project-chimera / src /chimera /core /orchestrator.py
mgbam's picture
Update src/chimera/core/orchestrator.py
9ce91b0 verified
# src/chimera/core/orchestrator.py
import asyncio
from ..api_clients import gemini_client, serp_client # , external_apis
from ..utils.logging_config import logger
from ..utils import data_processing
async def run_analysis(user_query: str) -> str:
"""
Main orchestration logic for Project Chimera.
1. Interprets user query (simple keyword check for now).
2. Calls relevant APIs concurrently.
3. Gathers data.
4. Formats data and creates a prompt for Gemini.
5. Calls Gemini for analysis.
6. Returns the result.
"""
logger.info(f"Received query: {user_query}")
# Step 1: Basic Query Interpretation (Replace with more sophisticated logic/LLM call if needed)
tasks = []
if "news" in user_query.lower() or "search" in user_query.lower():
# Extract keywords or use the whole query for SERP
search_term = user_query # Or extract better term
logger.info("Adding SERP task.")
tasks.append(asyncio.create_task(serp_client.search_google(search_term, num_results=5)))
# Add conditions for other APIs based on keywords (e.g., "weather", "stock", "earthquake")
# if "weather" in user_query.lower():
# location = "New York" # Extract location from query
# logger.info("Adding Weather task.")
# tasks.append(asyncio.create_task(external_apis.get_weather(location)))
# --- Start of Corrected Section ---
if not tasks:
logger.warning("No relevant APIs identified for the query. Proceeding without external API data.")
# Fallback: Just send the raw query to Gemini? Or ask user for clarification?
# For now, just send the query directly for general knowledge analysis
pass # This 'pass' is now correctly indented under the 'if not tasks:' block.
# It signifies doing nothing specific if no tasks were added.
# --- Lines below are now correctly dedented ---
# Step 2 & 3: Call APIs Concurrently and Gather Data
api_results = {} # Initialize api_results regardless of whether tasks were added
if tasks: # Only execute gather if there are tasks
logger.info(f"Gathering data from {len(tasks)} API(s)...")
results = await asyncio.gather(*tasks, return_exceptions=True) # Collect all results/exceptions
logger.info("API data gathering complete.")
# Process results (basic example)
# Make sure results list is not empty before accessing results[0]
if results and isinstance(results[0], dict) and "organic_results" in results[0]:
api_results["serp"] = results[0]
# Add checks and assignments for other potential API results
# Handle potential errors from asyncio.gather
for i, result in enumerate(results):
if isinstance(result, Exception):
logger.error(f"API call task {i} failed: {result}")
# Decide how to handle partial failures - inform Gemini? Return error?
# For now, we'll just log it and proceed with whatever data we got.
elif isinstance(result, dict) and "error" in result:
logger.error(f"API call task {i} reported an error: {result['error']}")
# --- End of Corrected Section ---
# Step 4: Format Data and Create Gemini Prompt
# Process the gathered data into a readable format for the LLM
formatted_data = data_processing.format_api_data_for_llm(api_results) # Pass potentially empty api_results
# Construct the final prompt
# This is CRITICAL - prompt engineering is key here!
prompt = f"""
Analyze the following user query and synthesized real-time data to provide insights, identify patterns, potential solutions, or opportunities.
User Query: "{user_query}"
Synthesized Data:
---
{formatted_data if formatted_data else "No additional real-time data was gathered for this query."}
---
Based on the query and the data (if provided), please provide a comprehensive analysis. Consider potential implications, connections between data points, and answer the user's core question or request. If suggesting solutions or opportunities, be specific and justify your reasoning.
"""
# Step 5: Call Gemini
logger.info("Sending final prompt to Gemini for analysis.")
analysis_result = await gemini_client.generate_analysis(prompt)
# Step 6: Return Result
logger.info("Analysis complete.")
return analysis_result