# dolphinium
# feat: Integrate dynamic field suggestions from external API into analysis plan generation and UI
# f74c067
"""
Core data processing and analysis logic for the PharmaCircle AI Data Analyst.

This module orchestrates the main analysis workflow:
1. Takes a user's natural language query.
2. Uses the LLM to generate a structured analysis plan.
3. Executes parallel queries against Solr for quantitative and qualitative data.
4. Generates a data visualization using the LLM.
5. Synthesizes the findings into a comprehensive, user-facing report.
"""
import json | |
import re | |
import datetime | |
import pandas as pd | |
import matplotlib.pyplot as plt | |
import seaborn as sns | |
import os | |
import concurrent.futures | |
import copy | |
import google.generativeai as genai | |
from llm_prompts import ( | |
get_analysis_plan_prompt, | |
get_synthesis_report_prompt, | |
get_visualization_code_prompt | |
) | |
from extract_results import get_search_list_params | |
def parse_suggestions_from_report(report_text):
    """Pull the numbered follow-up suggestions out of a markdown report.

    Looks for a trailing section headed "Deeper Dive: Suggested Follow-up
    Analyses" or "Suggestions for Further Exploration" (case-insensitive)
    and returns each numbered item as a stripped string. Returns an empty
    list when no such section exists.
    """
    heading_pattern = (
        r"### (?:Deeper Dive: Suggested Follow-up Analyses"
        r"|Suggestions for Further Exploration)\s*\n(.*?)$"
    )
    section = re.search(heading_pattern, report_text, re.DOTALL | re.IGNORECASE)
    if section is None:
        return []
    # Capture the text after each "N." item marker, one per line.
    items = re.findall(r"^\s*\d+\.\s*(.*)", section.group(1), re.MULTILINE)
    return [item.strip() for item in items]
def llm_generate_analysis_plan_with_history(llm_model, natural_language_query, chat_history):
    """
    Build a structured analysis plan for a user query.

    Dynamic field suggestions are first fetched from an external API (best
    effort — a failure only logs a warning); the prompt, built with those
    fields and the chat history, is then sent to the LLM and its JSON reply
    parsed.

    Returns:
        tuple: ``(plan, search_fields)`` where ``plan`` is the parsed dict,
        or ``None`` on failure so the UI can still show ``search_fields``.
    """
    search_fields, search_name = [], ""
    try:
        # Best-effort call to the external field-suggestion API.
        search_fields, search_name = get_search_list_params(natural_language_query)
        print(f"Successfully retrieved {len(search_fields)} dynamic fields.")
    except Exception as e:
        print(f"Warning: Could not retrieve dynamic search fields. Proceeding without them. Error: {e}")
    # Build the prompt with the (potentially empty) search_fields.
    prompt = get_analysis_plan_prompt(natural_language_query, chat_history, search_fields)
    try:
        response = llm_model.generate_content(prompt)
        # Strip any markdown code fences the model wrapped around its JSON.
        stripped = re.sub(r'```json\s*|\s*```', '', response.text, flags=re.MULTILINE | re.DOTALL).strip()
        return json.loads(stripped), search_fields
    except Exception as e:
        raw = response.text if 'response' in locals() else 'N/A'
        print(f"Error in llm_generate_analysis_plan_with_history: {e}\nRaw Response:\n{raw}")
        # Plan generation failed; still surface search_fields for debugging in the UI.
        return None, search_fields
def execute_quantitative_query(solr_client, plan):
    """Execute the plan's JSON facet query and return aggregate data.

    Args:
        solr_client: Client exposing ``search(**params)`` with a
            ``raw_response`` dict on its result.
        plan: Analysis plan dict; must contain
            ``plan['quantitative_request']['json.facet']``.

    Returns:
        The ``facets`` section of the Solr response ({} if absent), or
        ``None`` when the plan has no facet request or the query fails.
    """
    # Guard: need a plan whose quantitative_request carries a json.facet.
    if not plan or 'json.facet' not in plan.get('quantitative_request', {}):
        return None
    try:
        params = {
            # '*:*' is Solr's match-all query, used when the plan supplies
            # no filter. (The previous default '*_*' was a wildcard term
            # query — almost certainly a typo for '*:*'.)
            "q": plan.get('query_filter', '*:*'),
            "rows": 0,  # aggregates only; no documents needed
            "json.facet": json.dumps(plan['quantitative_request']['json.facet']),
        }
        results = solr_client.search(**params)
        return results.raw_response.get("facets", {})
    except Exception as e:
        print(f"Error in quantitative query: {e}")
        return None
def execute_qualitative_query(solr_client, plan):
    """Execute the plan's grouping query to fetch the best example docs.

    Args:
        solr_client: Client exposing ``search(**params)`` whose result has a
            ``grouped`` attribute.
        plan: Analysis plan dict containing a ``qualitative_request`` whose
            keys are merged into the Solr params.

    Returns:
        Solr's grouped response section, or ``None`` when the plan has no
        qualitative request or the query fails.
    """
    if not plan or 'qualitative_request' not in plan:
        return None
    try:
        # Deep-copy so merging into params can never mutate the caller's plan.
        qual_request = copy.deepcopy(plan['qualitative_request'])
        params = {
            # '*:*' is Solr's match-all query; the old '*_*' default was a
            # wildcard term query, almost certainly a typo.
            "q": plan.get('query_filter', '*:*'),
            "rows": 3,  # a few example documents per group
            "fl": "*,score",
            **qual_request,
        }
        results = solr_client.search(**params)
        return results.grouped
    except Exception as e:
        print(f"Error in qualitative query: {e}")
        return None
def llm_synthesize_enriched_report_stream(llm_model, query, quantitative_data, qualitative_data, plan):
    """
    Stream a report that synthesizes quantitative aggregates with
    qualitative example documents.

    Yields text chunks as the LLM produces them; if streaming fails at any
    point, a single apology message is yielded instead.
    """
    prompt = get_synthesis_report_prompt(query, quantitative_data, qualitative_data, plan)
    try:
        stream = llm_model.generate_content(prompt, stream=True)
        for piece in stream:
            yield piece.text
    except Exception as e:
        # Degrade gracefully: log and emit a user-facing fallback message.
        print(f"Error in llm_synthesize_enriched_report_stream: {e}")
        yield "Sorry, I was unable to generate a report for this data."
def llm_generate_visualization_code(llm_model, query_context, facet_data):
    """Generate Python plotting code for *facet_data* via the LLM.

    Returns:
        The generated code with any ```python fences stripped, or ``None``
        on failure.
    """
    prompt = get_visualization_code_prompt(query_context, facet_data)
    response = None
    try:
        # temperature=0 keeps chart generation deterministic and reproducible.
        generation_config = genai.types.GenerationConfig(temperature=0, max_output_tokens=2048)
        response = llm_model.generate_content(prompt, generation_config=generation_config)
        code = re.sub(r'^```python\s*|```$', '', response.text, flags=re.MULTILINE)
        return code
    except Exception as e:
        # BUG FIX: the old handler read ``response.text`` unconditionally,
        # which raised NameError whenever generate_content itself failed
        # (response was never bound). Guard it like the sibling
        # llm_generate_analysis_plan_with_history does.
        try:
            raw = response.text if response is not None else 'N/A'
        except Exception:
            raw = 'N/A'
        print(f"Error in llm_generate_visualization_code: {e}\nRaw response: {raw}")
        return None
def execute_viz_code_and_get_path(viz_code, facet_data):
    """Execute LLM-generated plotting code and save the resulting figure.

    The code is expected to bind a matplotlib figure to a variable named
    ``fig`` in its global namespace.

    SECURITY NOTE: ``exec`` runs arbitrary generated code in-process. This
    is only tolerable because the code comes from our own LLM pipeline —
    never route user-supplied code through this function.

    Args:
        viz_code: Python source produced by llm_generate_visualization_code.
        facet_data: Data exposed to the code as the ``facet_data`` global.

    Returns:
        Path to the saved PNG, or ``None`` when there is no code, no ``fig``
        was produced, or execution failed.
    """
    if not viz_code:
        return None
    try:
        # exist_ok avoids the check-then-create race of os.path.exists + makedirs.
        os.makedirs('/tmp/plots', exist_ok=True)
        plot_path = f"/tmp/plots/plot_{datetime.datetime.now().timestamp()}.png"
        exec_globals = {'facet_data': facet_data, 'plt': plt, 'sns': sns, 'pd': pd}
        exec(viz_code, exec_globals)  # trusted-source only; see security note
        fig = exec_globals.get('fig')
        if not fig:
            return None
        fig.savefig(plot_path, bbox_inches='tight')
        plt.close(fig)  # free the figure to avoid matplotlib memory buildup
        return plot_path
    except Exception as e:
        print(f"ERROR executing visualization code: {e}\n---Code---\n{viz_code}")
        return None