Spaces:
Running
Running
import os | |
from typing import List, Dict, Any, Optional | |
from openai import OpenAI | |
import json | |
from tools import SearchTool, FetchTool, SummarizeTool, FirecrawlScrapeTool | |
from dotenv import load_dotenv | |
from openai.types.chat import ChatCompletionMessage | |
from openai.types.chat.chat_completion import ChatCompletion | |
load_dotenv() | |
def print_section(title: str, content: str):
    """Write a titled section to stdout, framed by 80-character ``=`` rules.

    Layout: blank line, rule, title, rule, content, rule, blank line.

    Args:
        title: Heading shown between the first two rules.
        content: Body shown between the second and third rules.
    """
    rule = "=" * 80
    print(f"\n{rule}", f"{title}", rule, content, f"{rule}\n", sep="\n")
class PromptRefiner:
    """Rewrite an informal user query as a structured research prompt.

    The refiner sends a fixed system prompt plus the user's query to a chat
    completion endpoint and returns the model's reply.
    """

    DEFAULT_MODEL = "qwen-3-32b"

    # Instructions that force the model into the bracketed-section layout.
    SYSTEM_PROMPT = """You are a "Prompt Architect" for a Deep Research Tool. Your job is to take an informal user query and turn it into a clear, comprehensive, and structured research prompt.
Your output MUST follow this exact format:
[RESEARCH_OBJECTIVE]
A clear, single-sentence statement of what needs to be researched.
[CONTEXT]
- Domain/field of research
- Required background knowledge
- Any specific constraints or boundaries
[KEY_QUESTIONS]
1. First specific question to answer
2. Second specific question to answer
3. Third specific question to answer
(Add more if needed)
[OUTPUT_REQUIREMENTS]
- Format (e.g., structured report, bullet points)
- Depth of analysis
- Required citations or sources
- Length constraints
[KEY_TERMS]
- Term 1
- Term 2
- Term 3
(Add more if needed)
[CLARIFICATIONS_NEEDED]
- Any questions that need to be asked to the user
- Any assumptions made
"""

    def __init__(self, client, model: str = DEFAULT_MODEL):
        """Store the chat client and the model name to use.

        Args:
            client: Object exposing ``chat.completions.create(...)``.
            model: Model identifier passed to every completion request.
        """
        self.client = client
        self.model = model

    def refine(self, query: str) -> str:
        """Refine the user's query into a structured research prompt.

        Args:
            query: The raw user query.

        Returns:
            The model's structured prompt. If the model returns no text
            (``None`` or only whitespace) the original ``query`` is returned
            unchanged so downstream stages always receive a usable string.
        """
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM_PROMPT},
                {"role": "user", "content": query},
            ],
        )
        refined_query = response.choices[0].message.content
        # The message content may be None; never hand None to the planner.
        if refined_query and refined_query.strip():
            return refined_query
        return query
class ResearcherAgent:
    """Tool-using agent that researches a query and returns a text summary.

    The agent repeatedly asks the model what to do next, executes any tool
    calls the model requests, feeds the results back, and stops as soon as
    the model answers without requesting a tool.

    Each tool object is used through three members only: its ``name``
    attribute, its ``to_json()`` method (the function schema sent to the
    model), and calling it with keyword arguments.
    """

    DEFAULT_MODEL = "qwen-3-32b"
    DEFAULT_MAX_ITERATIONS = 10
    NO_FINDINGS = "No findings generated"

    SYSTEM_PROMPT = """You are a research agent that searches the web, reads contents of the urls, and summarizes findings.
Use below tools if you think you are not up to date with the latest information:
- search tool - to find relevant URLs
- firecrawl_scrape tool - to get content from the most promising URLs in markdown format
- summarize tool - to extract key information
Organize findings in a clear, structured format
Your final response should be a well-organized summary of all findings, with clear sections and bullet points where appropriate."""

    def __init__(
        self,
        client,
        model: str = DEFAULT_MODEL,
        max_iterations: int = DEFAULT_MAX_ITERATIONS,
    ):
        """Create the agent and register its tools.

        Args:
            client: Object exposing ``chat.completions.create(...)``.
            model: Model identifier passed to every completion request.
            max_iterations: Maximum number of model turns that may request
                tools before the agent forces a final answer.
        """
        self.client = client
        self.model = model
        self.max_iterations = max_iterations
        self.tools = [
            SearchTool(),
            # FetchTool(),
            SummarizeTool(),
            FirecrawlScrapeTool(),
        ]
        self.tools_json = [
            {"type": "function", "function": tool.to_json()}
            for tool in self.tools
        ]
        self.tools_map = {tool.name: tool for tool in self.tools}

    def research(self, query: str) -> str:
        """Perform web research on the given query and return summarized findings.

        Args:
            query: The research question to investigate.

        Returns:
            The model's final text answer, or ``"No findings generated"``
            when the model produces no text.
        """
        conversation_history = [
            {"role": "system", "content": self.SYSTEM_PROMPT},
            {"role": "user", "content": query},
        ]

        for _ in range(self.max_iterations):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=conversation_history,
                tools=self.tools_json,
            )
            message = response.choices[0].message
            if not message.tool_calls:
                return message.content or self.NO_FINDINGS

            conversation_history.append({
                "role": "assistant",
                "content": message.content if message.content else "",
                "tool_calls": message.tool_calls,
            })
            # Every tool_call_id must be answered by a tool message, even
            # when the call could not be executed; otherwise the next
            # request is malformed.
            conversation_history.extend(
                {
                    "tool_call_id": tool_call.id,
                    "role": "tool",
                    "name": tool_call.function.name,
                    "content": self._run_tool_call(tool_call),
                }
                for tool_call in message.tool_calls
            )

        # Tool budget exhausted: ask once more with tool use disabled so the
        # model has to summarize what it has gathered so far.
        response = self.client.chat.completions.create(
            model=self.model,
            messages=conversation_history,
            tools=self.tools_json,
            tool_choice="none",
        )
        return response.choices[0].message.content or self.NO_FINDINGS

    def _run_tool_call(self, tool_call) -> str:
        """Execute one requested tool call and return its result as text.

        Failures (unknown tool, unparsable arguments, tool exception) are
        returned as an ``"Error: ..."`` string instead of being raised, so
        the model can see what went wrong and adjust its next step.
        """
        tool_name = tool_call.function.name
        tool = self.tools_map.get(tool_name)
        if tool is None:
            available = ", ".join(self.tools_map)
            return f"Error: unknown tool '{tool_name}'. Available tools: {available}"

        try:
            # An empty argument string is treated as "no arguments".
            arguments = json.loads(tool_call.function.arguments or "{}")
        except json.JSONDecodeError as exc:
            return f"Error: could not parse arguments for tool '{tool_name}': {exc}"
        if not isinstance(arguments, dict):
            return f"Error: arguments for tool '{tool_name}' must be a JSON object"

        try:
            result = tool(**arguments)
        except Exception as exc:  # boundary: one failing tool must not abort the run
            return f"Error: tool '{tool_name}' failed: {exc}"
        # Tool message content is sent as text.
        return result if isinstance(result, str) else str(result)
class PlannerAgent:
    """Drive the research loop and accumulate findings in a scratchpad.

    Each round the planner model either declares ``ENOUGH_INFORMATION`` or
    asks for more research via ``NEED_RESEARCH`` / ``RESEARCH_QUERY:``. A
    requested query is delegated to a ``ResearcherAgent`` and the findings
    are appended to ``self.scratchpad``.
    """

    DEFAULT_MODEL = "qwen-3-32b"
    DEFAULT_MAX_ROUNDS = 10

    # ``{today}`` is filled in at call time so the planner never reasons from
    # a stale, hard-coded date.
    SYSTEM_PROMPT_TEMPLATE = """
You are a research planner that manages the research process.
Your responses MUST follow this exact format:
If you need more research:
NEED_RESEARCH
RESEARCH_QUERY: [specific query to research]
REASON: [why this research is needed]
If you have enough information:
ENOUGH_INFORMATION
SUMMARY: [brief summary of what we've learned]
NEXT_STEPS: [what should be done with this information]
Always evaluate:
1. Have we answered all key questions from the research objective?
2. Do we have enough depth and breadth of information?
3. Are there any gaps in our understanding?
4. Do we need to verify any information?
Current date is {today}.
"""

    # Sent when a reply contains no usable decision, instead of silently
    # re-submitting the same conversation.
    FORMAT_REMINDER = (
        "Your reply did not follow the required format. Respond with either "
        "NEED_RESEARCH followed by a 'RESEARCH_QUERY: ...' line, or "
        "ENOUGH_INFORMATION."
    )

    def __init__(
        self,
        client,
        model: str = DEFAULT_MODEL,
        max_rounds: int = DEFAULT_MAX_ROUNDS,
    ):
        """Create the planner and its researcher.

        Args:
            client: Object exposing ``chat.completions.create(...)``; also
                handed to the ``ResearcherAgent``.
            model: Model identifier passed to every planner request.
            max_rounds: Maximum number of planner turns per ``plan`` call.
        """
        self.client = client
        self.model = model
        self.max_rounds = max_rounds
        self.scratchpad = ""
        self.researcher = ResearcherAgent(client)

    def plan(self, refined_query: str) -> str:
        """Plan the research process and manage the scratchpad.

        Args:
            refined_query: The structured research prompt to work through.

        Returns:
            The scratchpad text: every researcher result gathered so far,
            each prefixed with ``"New findings:"``. It is returned when the
            planner reports ``ENOUGH_INFORMATION`` or when ``max_rounds``
            planner turns have been used.
        """
        from datetime import date

        system_prompt = self.SYSTEM_PROMPT_TEMPLATE.format(
            today=date.today().isoformat()
        )
        conversation_history = [
            {"role": "system", "content": system_prompt},
            {
                "role": "user",
                "content": f"Query: {refined_query}\nCurrent scratchpad:\n{self.scratchpad}",
            },
        ]

        for _ in range(self.max_rounds):
            response = self.client.chat.completions.create(
                model=self.model,
                messages=conversation_history,
            )
            # Content may be None; treat that as an empty (unparsable) reply.
            content = response.choices[0].message.content or ""
            conversation_history.append({"role": "assistant", "content": content})

            if "ENOUGH_INFORMATION" in content:
                return self.scratchpad

            research_query = self._extract_research_query(content)
            if not research_query:
                conversation_history.append(
                    {"role": "user", "content": self.FORMAT_REMINDER}
                )
                continue

            findings = self.researcher.research(research_query)
            self.scratchpad += f"\n\nNew findings:\n{findings}"
            conversation_history.append({
                "role": "user",
                "content": f"Updated scratchpad:\n{self.scratchpad}",
            })

        # Round budget exhausted: hand back whatever has been collected.
        return self.scratchpad

    @staticmethod
    def _extract_research_query(content: str) -> str:
        """Return the text after ``RESEARCH_QUERY:`` on its line, or ``""``.

        An empty string is returned when the reply lacks ``NEED_RESEARCH``,
        lacks the ``RESEARCH_QUERY:`` marker, or the query text is blank.
        """
        if "NEED_RESEARCH" not in content:
            return ""
        _, marker, tail = content.partition("RESEARCH_QUERY:")
        if not marker:
            return ""
        return tail.partition("\n")[0].strip()
class ReporterAgent:
    """Turn accumulated research findings into a structured final report."""

    DEFAULT_MODEL = "qwen-3-32b"
    NO_REPORT = "No report generated"

    # Instructions that force the report into the bracketed-section layout.
    SYSTEM_PROMPT = """You are a research reporter that generates clear, well-structured reports.
Your report MUST follow this format:
[EXECUTIVE_SUMMARY]
A concise overview of the key findings and conclusions.
[MAIN_FINDINGS]
1. First major finding
- Supporting details
- Sources/references
2. Second major finding
- Supporting details
- Sources/references
(Add more as needed)
[ANALYSIS]
- Interpretation of the findings
- Connections between different pieces of information
- Implications or significance
[CONCLUSION]
- Summary of key takeaways
- Any remaining questions or areas for further research
[SOURCES]
- List of all sources used in the research"""

    def __init__(self, client, model: str = DEFAULT_MODEL):
        """Store the chat client and the model name to use.

        Args:
            client: Object exposing ``chat.completions.create(...)``.
            model: Model identifier passed to the completion request.
        """
        self.client = client
        self.model = model

    def generate_report(self, scratchpad: str, original_query: str) -> str:
        """Generate a final report based on the scratchpad content.

        Args:
            scratchpad: Research findings collected so far.
            original_query: The user's original, unrefined query.

        Returns:
            The model's report text, or ``"No report generated"`` when the
            model returns no text, so callers always receive a string.
        """
        user_prompt = (
            f"Original query: {original_query}\n\n"
            f"Research findings:\n{scratchpad}\n\n"
            "Generate a comprehensive report that answers the original query."
        )
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": self.SYSTEM_PROMPT},
                {"role": "user", "content": user_prompt},
            ],
        )
        report = response.choices[0].message.content
        # The message content may be None; keep the declared str return type.
        return report or self.NO_REPORT
def research(query: str) -> str:
    """Run the full pipeline: refine the query, research it, write the report.

    Args:
        query: The user's raw research question.

    Returns:
        The final report text. If ``CEREBRAS_API_KEY`` is not set, or if any
        stage raises, an ``"Error..."`` message string is returned instead
        of an exception being propagated.
    """
    key = os.environ.get("CEREBRAS_API_KEY")
    if not key:
        return "Error: Please set CEREBRAS_API_KEY environment variable"

    try:
        llm = OpenAI(base_url="https://api.cerebras.ai/v1", api_key=key)
        # Stage 1: structured prompt from the raw query.
        structured_prompt = PromptRefiner(llm).refine(query)
        # Stage 2: iterative planning and research.
        notes = PlannerAgent(llm).plan(structured_prompt)
        # Stage 3: final report, written against the original query.
        return ReporterAgent(llm).generate_report(notes, query)
    except Exception as err:
        return f"Error in research process: {err!s}"
# if __name__ == "__main__": | |
# while True: | |
# query = input("Enter your query: ") | |
# if query == "exit": | |
# break | |
# print(research(query)) |