""" | |
Financial Document Analysis Workflow using Agno Workflows | |
Clean, pure-python implementation with structured outputs to avoid JSON parsing issues | |
""" | |
import json | |
from pathlib import Path | |
from typing import Dict, List, Optional, Iterator | |
from pydantic import BaseModel, Field | |
from agno.agent import Agent, RunResponse | |
from agno.models.google import Gemini | |
from agno.media import File | |
from agno.tools.file import FileTools | |
from agno.tools.python import PythonTools | |
from agno.workflow import Workflow | |
from agno.utils.log import logger | |
from agno.tools.shell import ShellTools | |
from config.settings import settings | |
from utils.prompt_loader import prompt_loader | |
from agno.models.openai import OpenAIChat | |


# Structured output models to avoid JSON parsing issues
class DataPoint(BaseModel):
    """Individual financial data point"""

    field_name: str = Field(..., description="Name of the financial data field")
    value: str = Field(..., description="Value of the field")
    category: str = Field(..., description="Financial category (revenue, expenses, assets, etc.)")
    period: str = Field(default="", description="Time period if applicable")
    unit: str = Field(default="", description="Currency or measurement unit")
    confidence: float = Field(default=0.9, description="Confidence score 0-1")


class ExtractedFinancialData(BaseModel):
    """Structured output for the data extraction phase"""

    company_name: str = Field(default="", description="Company name")
    document_type: str = Field(..., description="Type of financial document")
    reporting_period: str = Field(default="", description="Reporting period")
    data_points: List[DataPoint] = Field(..., description="All extracted financial data points")
    summary: str = Field(..., description="Brief summary of extracted data")


class FinancialCategory(BaseModel):
    """A category of organized financial data"""

    category_name: str = Field(..., description="Name of the financial category")
    description: str = Field(..., description="Description of what this category contains")
    data_items: Dict[str, str] = Field(..., description="Key-value pairs of financial data")
    totals: Dict[str, str] = Field(default_factory=dict, description="Any calculated totals")


class ArrangedFinancialData(BaseModel):
    """Structured output for the data arrangement phase"""

    categories: List[FinancialCategory] = Field(..., description="Organized financial categories")
    key_metrics: Dict[str, str] = Field(default_factory=dict, description="Key financial metrics")
    insights: List[str] = Field(default_factory=list, description="Financial insights and analysis")
    summary: str = Field(..., description="Summary of arranged data")


class GeneratedCode(BaseModel):
    """Structured output for the code generation phase"""

    code: str = Field(..., description="Generated Python code for Excel creation")
    description: str = Field(..., description="Description of what the code does")
    output_filename: str = Field(..., description="Expected output filename")
    execution_notes: str = Field(default="", description="Notes about code execution")
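
# NOTE: These models are cached in session_state via .model_dump() and restored
# with .model_validate() (see FinancialDocumentWorkflow.run below), so their
# fields should stay JSON-serializable.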


class FinancialDocumentWorkflow(Workflow):
    """
    Pure-Python workflow for financial document analysis.

    Uses structured outputs to eliminate JSON parsing issues.
    """

    description: str = "Financial document analysis workflow with data extraction, organization, and Excel generation"

    # Data Extractor Agent - structured output eliminates JSON parsing issues
    data_extractor: Agent = Agent(
        model=Gemini(
            id=settings.DATA_EXTRACTOR_MODEL,
            thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Expert financial data extraction specialist",
        instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
        response_model=ExtractedFinancialData,
        structured_outputs=True,
        debug_mode=True,
    )

    # Data Arranger Agent - organizes data into categories for Excel
    data_arranger: Agent = Agent(
        model=Gemini(id=settings.DATA_ARRANGER_MODEL, api_key=settings.GOOGLE_API_KEY),
        description="Financial data organization and analysis expert",
        instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
        tools=[FileTools()],  # FileTools for saving arranged data
        # NOTE: Gemini cannot combine structured outputs with tools, so this agent
        # uses tools instead of a response_model.
        markdown=True,
        debug_mode=True,
        add_memory_references=True,
        add_session_summary_references=True,
        exponential_backoff=True,
        retries=10,
    )

    # Code Generator Agent - creates Excel generation code
    code_generator: Agent = Agent(
        model=Gemini(
            id=settings.CODE_GENERATOR_MODEL,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Excel report generator that analyzes JSON data and creates formatted workbooks using shell execution on any OS",
        goal="Generate a professional Excel report from arranged_financial_data.json with multiple worksheets, formatting, and charts",
        instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
        expected_output="A Financial_Report_YYYYMMDD_HHMMSS.xlsx file containing formatted data from the JSON with multiple worksheets, professional styling, and relevant charts",
        additional_context=(
            "This agent must work on Windows, Mac, and Linux. Always use os.path "
            "for file operations and handle path separators correctly. Include "
            "proper error handling for cross-platform compatibility."
        ),
        tools=[
            ShellTools(),
            FileTools(save_files=True, read_files=True, list_files=True),
            PythonTools(pip_install=True, save_and_run=False, run_code=False),
        ],
        markdown=False,
        show_tool_calls=True,
        debug_mode=True,
        retries=10,
        add_datetime_to_instructions=True,
        delay_between_retries=10,
    )
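
    # NOTE: The tool instances above are created with default base directories;
    # _configure_agent_tools() re-points them at the per-session output
    # directory once the session paths exist.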

    def __init__(self, session_id: Optional[str] = None, **kwargs):
        super().__init__(session_id=session_id, **kwargs)
        self.session_id = session_id or f"financial_workflow_{int(time.time())}"
        self.session_output_dir = Path(settings.TEMP_DIR) / self.session_id / "output"
        self.session_input_dir = Path(settings.TEMP_DIR) / self.session_id / "input"
        self.session_temp_dir = Path(settings.TEMP_DIR) / self.session_id / "temp"

        # Create all session directories
        self.session_output_dir.mkdir(parents=True, exist_ok=True)
        self.session_input_dir.mkdir(parents=True, exist_ok=True)
        self.session_temp_dir.mkdir(parents=True, exist_ok=True)

        # Configure tools with the correct base directories after initialization
        self._configure_agent_tools()
        logger.info(f"FinancialDocumentWorkflow initialized with session: {self.session_id}")

    def clear_cache(self):
        """Clear workflow session cache and temporary files."""
        try:
            # Clear session state
            self.session_state.clear()
            logger.info(f"Cleared workflow cache for session: {self.session_id}")

            # Clean up temporary files (keep input and output)
            if self.session_temp_dir.exists():
                try:
                    shutil.rmtree(self.session_temp_dir)
                    self.session_temp_dir.mkdir(parents=True, exist_ok=True)
                    logger.info(f"Cleaned temporary files for session: {self.session_id}")
                except Exception as e:
                    logger.warning(f"Could not clean temp directory: {e}")
        except Exception as e:
            logger.error(f"Error clearing workflow cache: {e}")

    def cleanup_session(self):
        """Complete cleanup of a session, including all files."""
        try:
            # Clear the cache first
            self.clear_cache()

            # Remove the entire session directory
            session_dir = Path(settings.TEMP_DIR) / self.session_id
            if session_dir.exists():
                try:
                    shutil.rmtree(session_dir)
                    logger.info(f"Completely removed session directory: {session_dir}")
                except Exception as e:
                    logger.warning(f"Could not remove session directory: {e}")
        except Exception as e:
            logger.error(f"Error during session cleanup: {e}")

    def _configure_agent_tools(self):
        """Configure agent tools with the correct base directories."""
        # Point the data arranger's FileTools at the session output directory
        if hasattr(self.data_arranger, "tools") and self.data_arranger.tools:
            for tool in self.data_arranger.tools:
                if isinstance(tool, FileTools):
                    tool.base_dir = self.session_output_dir

        # Point the code generator's tools at the same directory
        if hasattr(self.code_generator, "tools") and self.code_generator.tools:
            for tool in self.code_generator.tools:
                if isinstance(tool, FileTools):
                    tool.base_dir = self.session_output_dir
                elif isinstance(tool, PythonTools):
                    tool.base_dir = self.session_output_dir

    def run(self, file_path: Optional[str] = None, **kwargs) -> RunResponse:
        """
        Main workflow execution method.

        Pure-Python execution: no streaming, no JSON parsing issues.
        """
        # Accept file_path as a parameter or as a previously set attribute
        if file_path is None:
            file_path = getattr(self, "file_path", None)
        if file_path is None:
            raise ValueError("file_path must be provided either as a parameter or set as an attribute")

        logger.info(f"Processing financial document: {file_path}")

        # use_cache is read from kwargs so callers can opt out of cached results
        use_cache = kwargs.get("use_cache", True)

        # Return cached final results if caching is enabled
        if use_cache and "final_results" in self.session_state:
            logger.info("Returning cached results")
            return RunResponse(
                run_id=self.run_id,
                content=self.session_state["final_results"],
            )

        try:
            # Step 1: Extract financial data
            logger.info("Step 1: Extracting financial data...")

            # Check for a cached extraction
            if use_cache and "extracted_data" in self.session_state:
                extracted_data = ExtractedFinancialData.model_validate(
                    self.session_state["extracted_data"]
                )
                logger.info("Using cached extraction data")
            else:
                document = File(filepath=file_path)
                extraction_prompt = prompt_loader.load_prompt("workflow/data_extraction", file_path=file_path)
                extraction_response: RunResponse = self.data_extractor.run(
                    extraction_prompt,
                    files=[document],
                )
                extracted_data: ExtractedFinancialData = extraction_response.content

                # Cache the result
                self.session_state["extracted_data"] = extracted_data.model_dump()

            logger.info(f"Extracted {len(extracted_data.data_points)} data points")

            # Step 2: Arrange and organize data
            logger.info("Step 2: Organizing financial data...")

            if use_cache and "arrangement_response" in self.session_state:
                arrangement_content = self.session_state["arrangement_response"]
                logger.info("Using cached arrangement data")
            else:
                # Debug: inspect the extracted data before passing it to the prompt
                extracted_json = extracted_data.model_dump_json(indent=2)
                logger.debug(f"Extracted data size: {len(extracted_json)} characters")
                logger.debug(f"First 200 chars of extracted data: {extracted_json[:200]}...")

                arrangement_prompt = prompt_loader.load_prompt(
                    "workflow/data_arrangement",
                    extracted_data=extracted_json,
                )

                # Debug: verify the prompt contains the actual data, not the placeholder
                if "{extracted_data}" in arrangement_prompt:
                    logger.error("CRITICAL: Variable substitution failed! Prompt still contains the {extracted_data} placeholder")
                    logger.error(f"Prompt length: {len(arrangement_prompt)}")
                else:
                    logger.info(f"Variable substitution successful. Prompt length: {len(arrangement_prompt)}")

                arrangement_response: RunResponse = self.data_arranger.run(arrangement_prompt)
                arrangement_content = arrangement_response.content

                # Cache the result
                self.session_state["arrangement_response"] = arrangement_content

            logger.info("Data organization completed - check the output directory for arranged_financial_data.json")

            # Step 3: Generate and execute Excel code
            logger.info("Step 3: Generating and executing Excel code...")

            if use_cache and "code_generation_response" in self.session_state:
                code_generation_content = self.session_state["code_generation_response"]
                execution_success = self.session_state.get("execution_success", False)
                logger.info("Using cached code generation results")
            else:
                code_prompt = prompt_loader.load_prompt("workflow/code_generation")
                code_response: RunResponse = self.code_generator.run(code_prompt)
                code_generation_content = code_response.content

                # Simple heuristic: treat the run as successful if the response
                # reports success or completion, or at least never mentions an error
                content_lower = code_generation_content.lower()
                execution_success = (
                    "success" in content_lower
                    or "completed" in content_lower
                    or "error" not in content_lower
                )

                # Cache the results
                self.session_state["code_generation_response"] = code_generation_content
                self.session_state["execution_success"] = execution_success

            logger.info(f"Code generation and execution completed: {'✅ Success' if execution_success else '❌ Failed'}")

            # Prepare final results: list the actual output files
            output_files = []
            if self.session_output_dir.exists():
                output_files = [f.name for f in self.session_output_dir.iterdir() if f.is_file()]

            results_summary = f"""
# Financial Document Analysis Complete

## Document Information
- **Company**: {extracted_data.company_name or 'Not specified'}
- **Document Type**: {extracted_data.document_type}
- **Reporting Period**: {extracted_data.reporting_period or 'Not specified'}

## Processing Summary
- **Data Points Extracted**: {len(extracted_data.data_points)}
- **Data Organization**: {'✅ Completed' if arrangement_content else '❌ Failed'}
- **Excel Creation**: {'✅ Success' if execution_success else '❌ Failed'}

## Data Organization Results
{arrangement_content[:500] + '...' if arrangement_content and len(arrangement_content) > 500 else arrangement_content or 'No arrangement data available'}

## Tool Execution Summary
- **Data Arranger**: Used FileTools to save organized data to JSON
- **Code Generator**: Used PythonTools and FileTools for Excel generation

## Code Generation Results
{code_generation_content[:500] + '...' if code_generation_content and len(code_generation_content) > 500 else code_generation_content or 'No code generation results available'}

## Generated Files ({len(output_files)} files)
{chr(10).join(f"- **{file}**" for file in output_files) if output_files else "- No files generated"}

## Output Directory
📁 `{self.session_output_dir}`

---
*Generated using Agno Workflows with FileTools and PythonTools integration*
*Note: Due to Gemini limitations, structured outputs were used for data extraction only*
"""

            # Cache the final results
            self.session_state["final_results"] = results_summary

            return RunResponse(
                run_id=self.run_id,
                content=results_summary,
            )
        except Exception as e:
            error_message = f"❌ Workflow failed: {str(e)}"
            logger.error(f"Financial workflow error: {e}", exc_info=True)
            return RunResponse(
                run_id=self.run_id,
                content=error_message,
            )

    def get_processing_status(self) -> Dict[str, str]:
        """Get the current processing status."""
        # Keys here must match what run() actually stores in session_state
        status = {
            "extraction": "completed" if "extracted_data" in self.session_state else "pending",
            "arrangement": "completed" if "arrangement_response" in self.session_state else "pending",
            "code_generation": "completed" if "code_generation_response" in self.session_state else "pending",
            "final_results": "completed" if "final_results" in self.session_state else "pending",
        }
        return status
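

# Minimal usage sketch (illustrative only): the session id and document path
# below are assumptions, not part of this module.
if __name__ == "__main__":
    workflow = FinancialDocumentWorkflow(session_id="demo_session")
    result = workflow.run(file_path="statements/q4_report.pdf")  # hypothetical document
    print(result.content)
    print(workflow.get_processing_status())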