"""
Financial Document Analysis Workflow using Agno Workflows

Clean, pure-python implementation with structured outputs to avoid JSON parsing issues.

The workflow runs three agents in sequence:

1. ``data_extractor``  - reads the document and returns ``ExtractedFinancialData``.
2. ``data_arranger``   - organizes the extracted data (its prompt is built from the
   extracted JSON) and has ``FileTools`` available for saving output.
3. ``code_generator``  - has shell/file/python tools available to produce an Excel report.

Intermediate results are cached in ``session_state`` so a re-run of the same
session can skip completed steps.
"""

import json
import shutil
import time
from pathlib import Path
from typing import Dict, List, Optional, Iterator

from pydantic import BaseModel, Field
from agno.agent import Agent, RunResponse
from agno.models.google import Gemini
from agno.media import File
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.workflow import Workflow
from agno.utils.log import logger
from agno.tools.shell import ShellTools
from config.settings import settings
from utils.prompt_loader import prompt_loader


# Structured Output Models to avoid JSON parsing issues
class DataPoint(BaseModel):
    """Individual financial data point"""

    field_name: str = Field(..., description="Name of the financial data field")
    value: str = Field(..., description="Value of the field")
    category: str = Field(..., description="Financial category (revenue, expenses, assets, etc.)")
    period: str = Field(default="", description="Time period if applicable")
    unit: str = Field(default="", description="Currency or measurement unit")
    confidence: float = Field(default=0.9, description="Confidence score 0-1")


class ExtractedFinancialData(BaseModel):
    """Structured output for data extraction phase"""

    company_name: str = Field(default="", description="Company name")
    document_type: str = Field(..., description="Type of financial document")
    reporting_period: str = Field(default="", description="Reporting period")
    data_points: List[DataPoint] = Field(..., description="All extracted financial data points")
    summary: str = Field(..., description="Brief summary of extracted data")


class FinancialCategory(BaseModel):
    """A category of organized financial data"""

    category_name: str = Field(..., description="Name of the financial category")
    description: str = Field(..., description="Description of what this category contains")
    data_items: Dict[str, str] = Field(..., description="Key-value pairs of financial data")
    totals: Dict[str, str] = Field(default_factory=dict, description="Any calculated totals")


class ArrangedFinancialData(BaseModel):
    """Structured output for data arrangement phase"""

    categories: List[FinancialCategory] = Field(..., description="Organized financial categories")
    key_metrics: Dict[str, str] = Field(default_factory=dict, description="Key financial metrics")
    insights: List[str] = Field(default_factory=list, description="Financial insights and analysis")
    summary: str = Field(..., description="Summary of arranged data")


class GeneratedCode(BaseModel):
    """Structured output for code generation phase"""

    code: str = Field(..., description="Generated Python code for Excel creation")
    description: str = Field(..., description="Description of what the code does")
    output_filename: str = Field(..., description="Expected output filename")
    execution_notes: str = Field(default="", description="Notes about code execution")


class FinancialDocumentWorkflow(Workflow):
    """
    Pure Python workflow for financial document analysis.

    Uses structured outputs to eliminate JSON parsing issues.

    Session state keys written by ``run``:
        - ``extracted_data``: ``ExtractedFinancialData`` dumped to a dict
        - ``arrangement_response``: content returned by the data arranger
        - ``code_generation_response``: content returned by the code generator
        - ``execution_success``: bool heuristic for the Excel step
        - ``final_results``: the markdown summary returned to the caller

    NOTE(review): the agents below are class attributes, and
    ``_configure_agent_tools`` mutates their tools' ``base_dir`` per instance.
    If the framework does not copy agents per workflow instance, concurrent
    sessions would share the same tool objects - confirm before running
    multiple sessions in one process.
    """

    description: str = "Financial document analysis workflow with data extraction, organization, and Excel generation"

    # Data Extractor Agent - Structured output eliminates JSON parsing issues
    data_extractor: Agent = Agent(
        model=Gemini(
            id=settings.DATA_EXTRACTOR_MODEL,
            thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Expert financial data extraction specialist",
        instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
        response_model=ExtractedFinancialData,
        structured_outputs=True,
        debug_mode=True,
    )

    # Data Arranger Agent - Organizes data into categories for Excel
    data_arranger: Agent = Agent(
        model=Gemini(id=settings.DATA_ARRANGER_MODEL, api_key=settings.GOOGLE_API_KEY),
        description="Financial data organization and analysis expert",
        instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
        tools=[FileTools()],  # FileTools for saving arranged data
        # NOTE: Cannot use structured_outputs with tools in Gemini - choosing tools over structured outputs
        markdown=True,
        debug_mode=True,
        add_memory_references=True,
        add_session_summary_references=True,
        exponential_backoff=True,
        retries=10,
    )

    # Code Generator Agent - Creates Excel generation code
    code_generator = Agent(
        model=Gemini(
            id=settings.CODE_GENERATOR_MODEL,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Excel report generator that analyzes JSON data and creates formatted workbooks using shell execution on any OS",
        goal="Generate a professional Excel report from arranged_financial_data.json with multiple worksheets, formatting, and charts",
        instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
        expected_output="A Financial_Report_YYYYMMDD_HHMMSS.xlsx file containing formatted data from the JSON with multiple worksheets, professional styling, and relevant charts",
        additional_context="This agent must work on Windows, Mac, and Linux. Always use os.path for file operations and handle path separators correctly. Include proper error handling for cross-platform compatibility.",
        tools=[
            ShellTools(),
            FileTools(save_files=True, read_files=True, list_files=True),
            PythonTools(pip_install=True, save_and_run=False, run_code=False),
        ],
        markdown=False,
        show_tool_calls=True,
        debug_mode=True,
        retries=10,
        add_datetime_to_instructions=True,
        delay_between_retries=10,
    )

    def __init__(self, session_id: Optional[str] = None, **kwargs):
        """
        Create the workflow and its per-session working directories.

        Args:
            session_id: Identifier for this session. When omitted, a
                timestamp-based id (``financial_workflow_<epoch seconds>``)
                is generated.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(session_id=session_id, **kwargs)
        self.session_id = session_id or f"financial_workflow_{int(time.time())}"

        session_root = Path(settings.TEMP_DIR) / self.session_id
        self.session_output_dir = session_root / "output"
        self.session_input_dir = session_root / "input"
        self.session_temp_dir = session_root / "temp"

        # Create all session directories
        for directory in (self.session_output_dir, self.session_input_dir, self.session_temp_dir):
            directory.mkdir(parents=True, exist_ok=True)

        # Configure tools with correct base directories after initialization
        self._configure_agent_tools()

        logger.info(f"FinancialDocumentWorkflow initialized with session: {self.session_id}")

    def clear_cache(self):
        """
        Clear workflow session cache and temporary files.

        Empties ``session_state`` and recreates an empty temp directory.
        Input and output directories are left untouched. Best-effort: errors
        are logged, never raised.
        """
        try:
            # Clear session state
            self.session_state.clear()
            logger.info(f"Cleared workflow cache for session: {self.session_id}")

            # Clean up temporary files (keep input and output)
            if self.session_temp_dir.exists():
                try:
                    shutil.rmtree(self.session_temp_dir)
                    self.session_temp_dir.mkdir(parents=True, exist_ok=True)
                    logger.info(f"Cleaned temporary files for session: {self.session_id}")
                except Exception as e:
                    logger.warning(f"Could not clean temp directory: {e}")
        except Exception as e:
            logger.error(f"Error clearing workflow cache: {e}")

    def cleanup_session(self):
        """
        Complete cleanup of session including all files.

        Clears the cache, then removes the whole
        ``<TEMP_DIR>/<session_id>`` directory (input, output and temp).
        Best-effort: errors are logged, never raised.
        """
        try:
            # Clear cache first
            self.clear_cache()

            # Remove entire session directory
            session_dir = Path(settings.TEMP_DIR) / self.session_id
            if session_dir.exists():
                try:
                    shutil.rmtree(session_dir)
                    logger.info(f"Completely removed session directory: {session_dir}")
                except Exception as e:
                    logger.warning(f"Could not remove session directory: {e}")
        except Exception as e:
            logger.error(f"Error during session cleanup: {e}")

    def _configure_agent_tools(self):
        """Configure agent tools with the correct base directories"""
        # Data arranger: FileTools write into the session output directory
        for tool in getattr(self.data_arranger, "tools", None) or []:
            if isinstance(tool, FileTools):
                tool.base_dir = self.session_output_dir

        # Code generator: FileTools and PythonTools both use the session
        # output directory (ShellTools is left at its default, as before).
        for tool in getattr(self.code_generator, "tools", None) or []:
            if isinstance(tool, (FileTools, PythonTools)):
                tool.base_dir = self.session_output_dir

    # The ``file_path: str = None`` annotation is kept exactly as it was so
    # the method's signature stays identical for anything that inspects it.
    def run(self, file_path: str = None, **kwargs) -> RunResponse:
        """
        Main workflow execution method.

        Pure Python workflow execution - no streaming, no JSON parsing issues.

        Args:
            file_path: Path of the document to analyze. Falls back to a
                ``file_path`` attribute on the instance when omitted.
            **kwargs: ``use_cache`` (bool, default ``True``) controls whether
                results already present in ``session_state`` are reused.

        Returns:
            A ``RunResponse`` whose content is the markdown summary, or an
            error message starting with "❌ Workflow failed:" if any step
            raised.

        Raises:
            ValueError: If no file path is available from either the
                parameter or the instance attribute.
        """
        # Handle file_path from parameter or attribute
        if file_path is None:
            file_path = getattr(self, "file_path", None)
        if file_path is None:
            raise ValueError("file_path must be provided either as parameter or set as attribute")

        logger.info(f"Processing financial document: {file_path}")

        # use_cache is not part of the method signature; it is read from
        # **kwargs and defaults to True.
        use_cache = kwargs.get("use_cache", True)

        # Check cache first if enabled
        if use_cache and "final_results" in self.session_state:
            logger.info("Returning cached results")
            return RunResponse(
                run_id=self.run_id,
                content=self.session_state["final_results"],
            )

        try:
            extracted_data = self._extract_financial_data(file_path, use_cache)
            arrangement_content = self._arrange_financial_data(extracted_data, use_cache)
            code_generation_content, execution_success = self._generate_excel_report(use_cache)

            results_summary = self._build_results_summary(
                extracted_data,
                arrangement_content,
                code_generation_content,
                execution_success,
            )

            # Cache final results
            self.session_state["final_results"] = results_summary

            return RunResponse(
                run_id=self.run_id,
                content=results_summary,
            )
        except Exception as e:
            # Top-level boundary: report the failure to the caller as content
            # instead of propagating, and keep the traceback in the log.
            error_message = f"❌ Workflow failed: {str(e)}"
            logger.error(f"Financial workflow error: {e}", exc_info=True)
            return RunResponse(
                run_id=self.run_id,
                content=error_message,
            )

    def _extract_financial_data(self, file_path: str, use_cache: bool) -> ExtractedFinancialData:
        """Step 1: return extracted data, from cache or by running the extractor agent."""
        logger.info("Step 1: Extracting financial data...")

        # Check for cached extraction
        if use_cache and "extracted_data" in self.session_state:
            extracted_data = ExtractedFinancialData.model_validate(
                self.session_state["extracted_data"]
            )
            logger.info("Using cached extraction data")
            return extracted_data

        document = File(filepath=file_path)
        extraction_prompt = prompt_loader.load_prompt("workflow/data_extraction", file_path=file_path)
        extraction_response: RunResponse = self.data_extractor.run(
            extraction_prompt,
            files=[document],
        )
        extracted_data = extraction_response.content

        # Fail with a clear message if the response is not the structured
        # model, rather than an AttributeError on .model_dump() below.
        if not isinstance(extracted_data, ExtractedFinancialData):
            raise TypeError(
                "Data extraction did not return structured output "
                f"(got {type(extracted_data).__name__})"
            )

        # Cache the result
        self.session_state["extracted_data"] = extracted_data.model_dump()
        logger.info(f"Extracted {len(extracted_data.data_points)} data points")
        return extracted_data

    def _arrange_financial_data(self, extracted_data: ExtractedFinancialData, use_cache: bool):
        """Step 2: return the arranger agent's response content, from cache or a fresh run."""
        logger.info("Step 2: Organizing financial data...")

        if use_cache and "arrangement_response" in self.session_state:
            arrangement_content = self.session_state["arrangement_response"]
            logger.info("Using cached arrangement data")
            return arrangement_content

        # Debug: Check extracted data before passing to prompt
        extracted_json = extracted_data.model_dump_json(indent=2)
        logger.debug(f"Extracted data size: {len(extracted_json)} characters")
        logger.debug(f"First 200 chars of extracted data: {extracted_json[:200]}...")

        arrangement_prompt = prompt_loader.load_prompt(
            "workflow/data_arrangement",
            extracted_data=extracted_json,
        )

        # Debug: Check if prompt contains the actual data or just the placeholder
        if "{extracted_data}" in arrangement_prompt:
            logger.error("CRITICAL: Variable substitution failed! Prompt still contains {extracted_data} placeholder")
            logger.error(f"Prompt length: {len(arrangement_prompt)}")
        else:
            logger.info(f"Variable substitution successful. Prompt length: {len(arrangement_prompt)}")

        arrangement_response: RunResponse = self.data_arranger.run(arrangement_prompt)
        arrangement_content = arrangement_response.content

        # Cache the result
        self.session_state["arrangement_response"] = arrangement_content
        logger.info("Data organization completed - check output directory for arranged_financial_data.json")
        return arrangement_content

    def _generate_excel_report(self, use_cache: bool):
        """Step 3: return ``(response content, execution_success)`` for the code generator."""
        logger.info("Step 3: Generating and executing Excel code...")

        if use_cache and "code_generation_response" in self.session_state:
            code_generation_content = self.session_state["code_generation_response"]
            execution_success = self.session_state.get("execution_success", False)
            logger.info("Using cached code generation results")
            return code_generation_content, execution_success

        code_prompt = prompt_loader.load_prompt("workflow/code_generation")
        code_response: RunResponse = self.code_generator.run(code_prompt)
        code_generation_content = code_response.content

        execution_success = self._execution_succeeded(code_generation_content)

        # Cache the results
        self.session_state["code_generation_response"] = code_generation_content
        self.session_state["execution_success"] = execution_success

        logger.info(f"Code generation and execution completed: {'✅ Success' if execution_success else '❌ Failed'}")
        return code_generation_content, execution_success

    @staticmethod
    def _execution_succeeded(content) -> bool:
        """
        Simple check for execution success based on response content.

        An empty or missing response counts as failure. Otherwise the text is
        considered successful when it does not mention "error", or when it
        mentions "success" or "completed" (case-insensitive).
        """
        if not content:
            return False
        text = str(content).lower()
        return "error" not in text or "success" in text or "completed" in text

    @staticmethod
    def _preview(content, fallback: str, limit: int = 500) -> str:
        """Return ``content`` truncated to ``limit`` chars plus '...', or ``fallback`` when empty."""
        if not content:
            return fallback
        text = content if isinstance(content, str) else str(content)
        return text[:limit] + "..." if len(text) > limit else text

    def _build_results_summary(
        self,
        extracted_data: ExtractedFinancialData,
        arrangement_content,
        code_generation_content,
        execution_success: bool,
    ) -> str:
        """Build the final markdown report from the three step results and the output directory listing."""
        # List actual output files
        output_files = []
        if self.session_output_dir.exists():
            output_files = [f.name for f in self.session_output_dir.iterdir() if f.is_file()]

        if output_files:
            generated_files = "\n".join(f"- **{file}**" for file in output_files)
        else:
            generated_files = "- No files generated"

        lines = [
            "",
            "# Financial Document Analysis Complete",
            "",
            "## Document Information",
            f"- **Company**: {extracted_data.company_name or 'Not specified'}",
            f"- **Document Type**: {extracted_data.document_type}",
            f"- **Reporting Period**: {extracted_data.reporting_period or 'Not specified'}",
            "",
            "## Processing Summary",
            f"- **Data Points Extracted**: {len(extracted_data.data_points)}",
            f"- **Data Organization**: {'✅ Completed' if arrangement_content else '❌ Failed'}",
            f"- **Excel Creation**: {'✅ Success' if execution_success else '❌ Failed'}",
            "",
            "## Data Organization Results",
            self._preview(arrangement_content, "No arrangement data available"),
            "",
            "## Tool Execution Summary",
            "",
            "**Data Arranger**: Used FileTools to save organized data to JSON",
            "**Code Generator**: Used PythonTools and FileTools for Excel generation",
            "",
            "## Code Generation Results",
            self._preview(code_generation_content, "No code generation results available"),
            "",
            f"## Generated Files ({len(output_files)} files)",
            generated_files,
            "",
            "## Output Directory",
            f"📁 `{self.session_output_dir}`",
            "",
            "---",
            "*Generated using Agno Workflows with FileTools and PythonTools integration*",
            "*Note: Due to Gemini limitations, structured outputs were used for data extraction only*",
            "",
        ]
        return "\n".join(lines)

    def get_processing_status(self) -> Dict[str, str]:
        """
        Get the current processing status.

        Returns:
            A dict mapping each stage (``extraction``, ``arrangement``,
            ``code_generation``, ``final_results``) to ``"completed"`` or
            ``"pending"``, based on which keys ``run`` has stored in
            ``session_state``.
        """
        # Stage name -> session_state key written by run() for that stage.
        stage_keys = {
            "extraction": "extracted_data",
            "arrangement": "arrangement_response",
            "code_generation": "code_generation_response",
            "final_results": "final_results",
        }
        return {
            stage: "completed" if key in self.session_state else "pending"
            for stage, key in stage_keys.items()
        }