File size: 6,176 Bytes
f43f2d3 f74c067 f43f2d3 f74c067 f43f2d3 f74c067 f43f2d3 f74c067 f43f2d3 f74c067 f43f2d3 f74c067 f43f2d3 f74c067 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 |
"""
Core data processing and analysis logic for the PharmaCircle AI Data Analyst.
This module orchestrates the main analysis workflow:
1. Takes a user's natural language query.
2. Uses the LLM to generate a structured analysis plan.
3. Executes parallel queries against Solr for quantitative and qualitative data.
4. Generates a data visualization using the LLM.
5. Synthesizes the findings into a comprehensive, user-facing report.
"""
import json
import re
import datetime
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
import concurrent.futures
import copy
import google.generativeai as genai
from llm_prompts import (
get_analysis_plan_prompt,
get_synthesis_report_prompt,
get_visualization_code_prompt
)
from extract_results import get_search_list_params
def parse_suggestions_from_report(report_text):
    """Extract the numbered follow-up suggestions from a markdown report.

    The suggestions are looked up under a ``###`` heading titled either
    "Deeper Dive: Suggested Follow-up Analyses" or
    "Suggestions for Further Exploration" (matched case-insensitively).

    Args:
        report_text: Full markdown text of the report. ``None``, an empty
            string, or a non-string value is treated as an empty report.

    Returns:
        A list with the text of each numbered item (``1. ...``) found in the
        suggestions section, in document order and stripped of surrounding
        whitespace. Returns an empty list when the heading is missing or the
        section contains no numbered items.
    """
    if not report_text or not isinstance(report_text, str):
        return []

    # The section ends at the next heading of the same or a higher level
    # (or at the end of the text), so numbered lists that belong to a later
    # section are not mistaken for suggestions.
    section_match = re.search(
        r"### (?:Deeper Dive: Suggested Follow-up Analyses|Suggestions for Further Exploration)\s*\n"
        r"(.*?)(?=^#{1,3}[ \t]|\Z)",
        report_text,
        re.DOTALL | re.IGNORECASE | re.MULTILINE,
    )
    if section_match is None:
        return []

    # Only horizontal whitespace is allowed around the item number so that an
    # empty item ("1." on its own line) cannot swallow the following line.
    raw_items = re.findall(
        r"^[ \t]*\d+\.[ \t]*(.*)",
        section_match.group(1),
        re.MULTILINE,
    )
    stripped_items = (item.strip() for item in raw_items)
    return [item for item in stripped_items if item]
def llm_generate_analysis_plan_with_history(llm_model, natural_language_query, chat_history):
    """Generate an analysis plan for a user query via the LLM.

    Dynamic field suggestions are first requested from
    ``get_search_list_params``; if that call fails, the plan is generated
    without them. The LLM response is expected to be a JSON object,
    optionally wrapped in a ```` ```json ```` fence.

    Args:
        llm_model: Object exposing ``generate_content(prompt)`` whose result
            has a ``text`` attribute.
        natural_language_query: The user's question.
        chat_history: Prior conversation, forwarded to the prompt builder.

    Returns:
        A ``(plan, search_fields)`` tuple. ``plan`` is the parsed JSON object,
        or ``None`` when the LLM call fails or its output is not a JSON
        object. ``search_fields`` is whatever the field-suggestion call
        returned (an empty list if it failed) and is always returned so the
        UI can display it.
    """
    search_fields = []
    try:
        # Call the external API to get dynamic field suggestions. The second
        # element of the result (the search name) is not used here.
        fetched_fields, _search_name = get_search_list_params(natural_language_query)
        print(f"Successfully retrieved {len(fetched_fields)} dynamic fields.")
        search_fields = fetched_fields
    except Exception as e:
        print(f"Warning: Could not retrieve dynamic search fields. Proceeding without them. Error: {e}")

    # Generate the prompt, including the (potentially empty) search_fields
    prompt = get_analysis_plan_prompt(natural_language_query, chat_history, search_fields)

    # Captured once, up front, so the error handler never has to touch
    # ``response`` again: the name may be unbound if the call itself failed,
    # and reading ``response.text`` may be the very thing that raised.
    raw_response_text = 'N/A'
    try:
        response = llm_model.generate_content(prompt)
        raw_response_text = response.text
        cleaned_text = re.sub(
            r'```json\s*|\s*```', '', raw_response_text, flags=re.MULTILINE | re.DOTALL
        ).strip()
        plan = json.loads(cleaned_text)
        if not isinstance(plan, dict):
            # Callers read the plan with ``plan.get(...)``; any other JSON
            # value is unusable as a plan.
            raise ValueError(f"Expected a JSON object for the plan, got {type(plan).__name__}")
        # Return the plan and the retrieved fields for UI display
        return plan, search_fields
    except Exception as e:
        print(f"Error in llm_generate_analysis_plan_with_history: {e}\nRaw Response:\n{raw_response_text}")
        # Return None for the plan but still return search_fields for debugging in the UI
        return None, search_fields
def execute_quantitative_query(solr_client, plan):
    """Execute the facet query of an analysis plan to get aggregate data.

    Args:
        solr_client: Client exposing ``search(**params)`` whose result has a
            ``raw_response`` dict.
        plan: Analysis plan dict. Must contain
            ``plan['quantitative_request']['json.facet']``; the optional
            ``plan['query_filter']`` is used as the Solr ``q`` parameter.

    Returns:
        The ``facets`` entry of the raw Solr response (``{}`` if absent), or
        ``None`` when the plan has no usable facet request or the query
        fails.
    """
    if not isinstance(plan, dict):
        return None
    quant_request = plan.get('quantitative_request')
    if not isinstance(quant_request, dict) or 'json.facet' not in quant_request:
        return None

    try:
        params = {
            # '*:*' is Solr's match-all-documents query; it is the fallback
            # when the plan carries no (or an empty) filter.
            "q": plan.get('query_filter') or '*:*',
            "rows": 0,  # only the aggregates are needed, not the documents
            "json.facet": json.dumps(quant_request['json.facet']),
        }
        results = solr_client.search(**params)
        return results.raw_response.get("facets", {})
    except Exception as e:
        print(f"Error in quantitative query: {e}")
        return None
def execute_qualitative_query(solr_client, plan):
    """Execute the grouping query of an analysis plan to get example docs.

    Args:
        solr_client: Client exposing ``search(**params)`` whose result has a
            ``grouped`` attribute.
        plan: Analysis plan dict. ``plan['qualitative_request']`` is a dict
            of extra Solr parameters; its keys override the defaults
            (``q``, ``rows``, ``fl``) set here. The optional
            ``plan['query_filter']`` is used as the Solr ``q`` parameter.

    Returns:
        ``results.grouped`` from the Solr response, or ``None`` when the plan
        has no usable qualitative request or the query fails.
    """
    if not isinstance(plan, dict):
        return None
    requested = plan.get('qualitative_request')
    if not isinstance(requested, dict):
        return None

    try:
        # Deep-copied so nothing done with the parameters downstream can
        # alter the caller's plan.
        qual_request = copy.deepcopy(requested)
        params = {
            # '*:*' is Solr's match-all-documents query; it is the fallback
            # when the plan carries no (or an empty) filter.
            "q": plan.get('query_filter') or '*:*',
            "rows": 3,  # Get a few examples per group
            "fl": "*,score",
            **qual_request,
        }
        results = solr_client.search(**params)
        return results.grouped
    except Exception as e:
        print(f"Error in qualitative query: {e}")
        return None
def llm_synthesize_enriched_report_stream(llm_model, query, quantitative_data, qualitative_data, plan):
    """Stream a synthesized report built from aggregates and example docs.

    The prompt is produced by ``get_synthesis_report_prompt`` from the query,
    both data sets and the plan, then sent to the LLM in streaming mode.

    Yields:
        The ``text`` of each streamed chunk, in order. If the LLM call or
        any chunk raises, the error is printed and a single apology sentence
        is yielded as the last item.
    """
    synthesis_prompt = get_synthesis_report_prompt(query, quantitative_data, qualitative_data, plan)
    try:
        # Both the request and the per-chunk text access happen inside the
        # try block, so a failure at either stage ends with the fallback.
        yield from (
            piece.text
            for piece in llm_model.generate_content(synthesis_prompt, stream=True)
        )
    except Exception as err:
        print(f"Error in llm_synthesize_enriched_report_stream: {err}")
        yield "Sorry, I was unable to generate a report for this data."
def llm_generate_visualization_code(llm_model, query_context, facet_data):
    """Ask the LLM for Python plotting code for the given query and data.

    Args:
        llm_model: Object exposing
            ``generate_content(prompt, generation_config=...)`` whose result
            has a ``text`` attribute.
        query_context: Description of the user's query, forwarded to the
            prompt builder.
        facet_data: Aggregate data to visualize, forwarded to the prompt
            builder.

    Returns:
        The response text with any ```` ```python ```` opening fence and
        closing ```` ``` ```` fence removed, or ``None`` if generation fails.
    """
    prompt = get_visualization_code_prompt(query_context, facet_data)

    # Captured once so the error handler never has to touch ``response``:
    # that name is unbound if building the config or the LLM call raised,
    # and reading ``response.text`` may itself be what failed.
    raw_response_text = 'N/A'
    try:
        generation_config = genai.types.GenerationConfig(temperature=0, max_output_tokens=2048)
        response = llm_model.generate_content(prompt, generation_config=generation_config)
        raw_response_text = response.text
        code = re.sub(r'^```python\s*|```$', '', raw_response_text, flags=re.MULTILINE)
        return code
    except Exception as e:
        print(f"Error in llm_generate_visualization_code: {e}\nRaw response: {raw_response_text}")
        return None
def execute_viz_code_and_get_path(viz_code, facet_data):
    """Run generated plotting code and save the figure it produces.

    The code is executed with ``facet_data``, ``plt``, ``sns`` and ``pd``
    available as globals and is expected to bind the resulting figure to a
    global named ``fig``.

    Args:
        viz_code: Python source to execute. Falsy values short-circuit.
        facet_data: Data made available to the executed code.

    Returns:
        Path of the PNG written under ``/tmp/plots``, or ``None`` when there
        is no code, the code does not define ``fig``, or anything fails.
    """
    if not viz_code:
        return None

    exec_globals = {}
    try:
        # exist_ok avoids the check-then-create race when several plots are
        # produced concurrently.
        os.makedirs('/tmp/plots', exist_ok=True)
        plot_path = f"/tmp/plots/plot_{datetime.datetime.now().timestamp()}.png"
        exec_globals = {'facet_data': facet_data, 'plt': plt, 'sns': sns, 'pd': pd}
        # NOTE(security): this executes arbitrary code with the privileges of
        # this process. It must only ever receive code from a trusted
        # generation step; consider sandboxing before exposing it further.
        exec(viz_code, exec_globals)
        fig = exec_globals.get('fig')
        if fig is None:
            return None
        fig.savefig(plot_path, bbox_inches='tight')
        return plot_path
    except Exception as e:
        print(f"ERROR executing visualization code: {e}\n---Code---\n{viz_code}")
        return None
    finally:
        # Release the figure on every path (success, missing output, or
        # failure part-way through) so figures do not accumulate in pyplot.
        leftover_fig = exec_globals.get('fig')
        if leftover_fig is not None:
            try:
                plt.close(leftover_fig)
            except Exception as close_error:
                # A failed close must not discard an already-saved plot.
                print(f"Warning: could not close figure: {close_error}")