# NOTE(review): removed non-code chat-export residue ("Spaces:" / "Sleeping")
# that preceded the module source.
# src/chimera/core/orchestrator.py
import asyncio

from ..api_clients import gemini_client, serp_client  # , external_apis
from ..utils import data_processing
from ..utils.logging_config import logger


async def run_analysis(user_query: str) -> str:
    """
    Main orchestration logic for Project Chimera.

    1. Interprets the user query (simple keyword check for now).
    2. Calls the relevant external APIs concurrently.
    3. Gathers the results, logging (but tolerating) partial failures.
    4. Formats the data and builds a prompt for Gemini.
    5. Calls Gemini for analysis.
    6. Returns the result.

    Args:
        user_query: Raw query string supplied by the user.

    Returns:
        Gemini's analysis text for the query plus any gathered API data.
    """
    logger.info("Received query: %s", user_query)

    # Step 1: basic query interpretation (replace with more sophisticated
    # logic / an LLM call if needed). Task names are tracked alongside the
    # tasks so each gather() result can be matched to its API reliably,
    # instead of assuming a fixed position in the results list.
    task_names: list[str] = []
    tasks: list[asyncio.Task] = []

    if "news" in user_query.lower() or "search" in user_query.lower():
        search_term = user_query  # TODO: extract a sharper search term
        logger.info("Adding SERP task.")
        task_names.append("serp")
        tasks.append(asyncio.create_task(
            serp_client.search_google(search_term, num_results=5)))

    # Add conditions for other APIs based on keywords, e.g.:
    # if "weather" in user_query.lower():
    #     location = ...  # extract location from query
    #     task_names.append("weather")
    #     tasks.append(asyncio.create_task(external_apis.get_weather(location)))

    if not tasks:
        # Fallback: no external data was requested; the raw query is sent to
        # Gemini on its own for general-knowledge analysis.
        logger.warning("No relevant APIs identified for the query. "
                       "Proceeding without external API data.")

    # Steps 2 & 3: call the APIs concurrently and collect their data.
    api_results: dict = {}  # stays empty when no tasks were scheduled
    if tasks:
        logger.info("Gathering data from %d API(s)...", len(tasks))
        # return_exceptions=True keeps one failed call from cancelling the
        # rest; failures come back as Exception instances in the list.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        logger.info("API data gathering complete.")

        for name, result in zip(task_names, results):
            if isinstance(result, Exception):
                # Partial failure: log it and proceed with whatever we got.
                logger.error("API call '%s' failed: %s", name, result)
            elif isinstance(result, dict) and "error" in result:
                logger.error("API call '%s' reported an error: %s",
                             name, result["error"])
            elif name == "serp" and isinstance(result, dict) \
                    and "organic_results" in result:
                api_results["serp"] = result
            # Add handling for other APIs' result shapes here.

    # Step 4: format the gathered data and build the Gemini prompt.
    # Prompt engineering is CRITICAL here.
    formatted_data = data_processing.format_api_data_for_llm(api_results)
    prompt = f"""
Analyze the following user query and synthesized real-time data to provide insights, identify patterns, potential solutions, or opportunities.
User Query: "{user_query}"
Synthesized Data:
---
{formatted_data if formatted_data else "No additional real-time data was gathered for this query."}
---
Based on the query and the data (if provided), please provide a comprehensive analysis. Consider potential implications, connections between data points, and answer the user's core question or request. If suggesting solutions or opportunities, be specific and justify your reasoning.
"""

    # Step 5: call Gemini with the assembled prompt.
    logger.info("Sending final prompt to Gemini for analysis.")
    analysis_result = await gemini_client.generate_analysis(prompt)

    # Step 6: return the result.
    logger.info("Analysis complete.")
    return analysis_result