"""
Financial Document Analysis Workflow using Agno Workflows.

Clean, pure-Python implementation that uses structured outputs (for the
data-extraction step) to avoid JSON parsing issues.
"""
|
|
|
import json
import shutil
import time
from pathlib import Path
from typing import Dict, List, Optional, Iterator

from pydantic import BaseModel, Field

from agno.agent import Agent, RunResponse
from agno.media import File
from agno.models.google import Gemini
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
from agno.utils.log import logger
from agno.workflow import Workflow

from config.settings import settings
from utils.prompt_loader import prompt_loader
|
|
|
|
|
|
|
class DataPoint(BaseModel):
    """Individual financial data point"""

    # Required: no default is given, so validation fails if any is missing.
    field_name: str = Field(description="Name of the financial data field")
    value: str = Field(description="Value of the field")
    category: str = Field(description="Financial category (revenue, expenses, assets, etc.)")

    # Optional: fall back to these defaults when not supplied.
    period: str = Field("", description="Time period if applicable")
    unit: str = Field("", description="Currency or measurement unit")
    # NOTE: the 0-1 range is only documented, not enforced by validation.
    confidence: float = Field(0.9, description="Confidence score 0-1")
|
|
|
class ExtractedFinancialData(BaseModel):
    """Structured output for data extraction phase"""

    # Document-level metadata; company and period may be left blank.
    company_name: str = Field("", description="Company name")
    document_type: str = Field(description="Type of financial document")
    reporting_period: str = Field("", description="Reporting period")

    # Extracted content (both required).
    data_points: List[DataPoint] = Field(description="All extracted financial data points")
    summary: str = Field(description="Brief summary of extracted data")
|
|
|
class FinancialCategory(BaseModel):
    """A category of organized financial data"""

    # What the category is.
    category_name: str = Field(description="Name of the financial category")
    description: str = Field(description="Description of what this category contains")

    # Category payload: label -> value, both kept as strings.
    data_items: Dict[str, str] = Field(description="Key-value pairs of financial data")
    # Optional totals; default_factory gives each instance its own empty dict.
    totals: Dict[str, str] = Field(default_factory=dict, description="Any calculated totals")
|
|
|
class ArrangedFinancialData(BaseModel):
    """Structured output for data arrangement phase"""

    # Required list of organized categories.
    categories: List[FinancialCategory] = Field(description="Organized financial categories")

    # Optional extras; default_factory avoids sharing mutable defaults.
    key_metrics: Dict[str, str] = Field(default_factory=dict, description="Key financial metrics")
    insights: List[str] = Field(default_factory=list, description="Financial insights and analysis")

    # Required free-text summary.
    summary: str = Field(description="Summary of arranged data")
|
|
|
class GeneratedCode(BaseModel):
    """Structured output for code generation phase"""

    # Required: the code itself, what it does, and where it writes.
    code: str = Field(description="Generated Python code for Excel creation")
    description: str = Field(description="Description of what the code does")
    output_filename: str = Field(description="Expected output filename")

    # Optional free-form notes; empty when not supplied.
    execution_notes: str = Field("", description="Notes about code execution")
|
|
|
|
|
class FinancialDocumentWorkflow(Workflow):
    """
    Pure-Python workflow for financial document analysis.

    Uses structured outputs (for the extraction step) to avoid JSON parsing
    issues. ``run`` executes three steps and caches each step's result in
    ``self.session_state``, so a repeated run of the same session can skip
    work that already finished:

    1. ``data_extractor`` reads the document and returns ``ExtractedFinancialData``.
    2. ``data_arranger`` organizes that data; the workflow expects it to save
       ``arranged_financial_data.json`` in the session output directory.
    3. ``code_generator`` generates and runs code that builds the Excel report
       in the session output directory.

    NOTE(review): the three agents are class attributes. Unless the agno
    ``Workflow`` base class copies them per instance (not verified here),
    every instance shares the same Agent and tool objects. Because
    ``_configure_agent_tools`` re-points those shared tools' ``base_dir`` at
    the most recently created instance's output directory, workflows used
    concurrently may write into each other's session folders.
    """

    description: str = "Financial document analysis workflow with data extraction, organization, and Excel generation"

    # Step 1: structured extraction into ExtractedFinancialData.
    data_extractor: Agent = Agent(
        model=Gemini(
            id=settings.DATA_EXTRACTOR_MODEL,
            thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Expert financial data extraction specialist",
        instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
        response_model=ExtractedFinancialData,
        structured_outputs=True,
        debug_mode=True,
    )

    # Step 2: free-form (Markdown) organization of the extracted data. Its
    # FileTools base_dir is pointed at the session output dir in __init__.
    data_arranger: Agent = Agent(
        model=Gemini(id=settings.DATA_ARRANGER_MODEL, api_key=settings.GOOGLE_API_KEY),
        description="Financial data organization and analysis expert",
        instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
        tools=[FileTools()],
        markdown=True,
        debug_mode=True,
        add_memory_references=True,
        add_session_summary_references=True,
        exponential_backoff=True,
        retries=10,
    )

    # Step 3: writes and executes the Excel-building code via shell/file tools.
    code_generator = Agent(
        model=Gemini(
            id=settings.CODE_GENERATOR_MODEL,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Excel report generator that analyzes JSON data and creates formatted workbooks using shell execution on any OS",
        goal="Generate a professional Excel report from arranged_financial_data.json with multiple worksheets, formatting, and charts",
        instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
        expected_output="A Financial_Report_YYYYMMDD_HHMMSS.xlsx file containing formatted data from the JSON with multiple worksheets, professional styling, and relevant charts",
        additional_context="This agent must work on Windows, Mac, and Linux. Always use os.path for file operations and handle path separators correctly. Include proper error handling for cross-platform compatibility.",
        tools=[
            ShellTools(),
            FileTools(save_files=True, read_files=True, list_files=True),
            PythonTools(pip_install=True, save_and_run=False, run_code=False),
        ],
        markdown=False,
        show_tool_calls=True,
        debug_mode=True,
        retries=10,
        add_datetime_to_instructions=True,
        delay_between_retries=10,
    )

    def __init__(self, session_id: Optional[str] = None, **kwargs):
        """Create the per-session input/output/temp directories and wire the agent tools.

        Args:
            session_id: Identifier for this session. Defaults to
                ``financial_workflow_<unix seconds>``. Two workflows created in
                the same second without an explicit id share directories.
            **kwargs: Forwarded to the agno ``Workflow`` base class.
        """
        # Resolve the id before calling the base initializer so the base class
        # receives the same id that names the directories below.
        session_id = session_id or f"financial_workflow_{int(time.time())}"
        super().__init__(session_id=session_id, **kwargs)
        self.session_id = session_id

        session_root = Path(settings.TEMP_DIR) / self.session_id
        self.session_output_dir = session_root / "output"
        self.session_input_dir = session_root / "input"
        self.session_temp_dir = session_root / "temp"
        for directory in (self.session_output_dir, self.session_input_dir, self.session_temp_dir):
            directory.mkdir(parents=True, exist_ok=True)

        self._configure_agent_tools()

        logger.info(f"FinancialDocumentWorkflow initialized with session: {self.session_id}")

    def clear_cache(self):
        """Clear the cached step results and empty this session's temp directory.

        Best-effort: failures are logged, never raised.
        """
        try:
            self.session_state.clear()
            logger.info(f"Cleared workflow cache for session: {self.session_id}")

            if self.session_temp_dir.exists():
                try:
                    shutil.rmtree(self.session_temp_dir)
                    # Recreate it so later steps can still write temp files.
                    self.session_temp_dir.mkdir(parents=True, exist_ok=True)
                    logger.info(f"Cleaned temporary files for session: {self.session_id}")
                except Exception as e:
                    logger.warning(f"Could not clean temp directory: {e}")
        except Exception as e:
            logger.error(f"Error clearing workflow cache: {e}")

    def cleanup_session(self):
        """Clear the cache and delete this session's whole directory tree.

        Best-effort: failures are logged, never raised.
        """
        try:
            self.clear_cache()

            session_dir = Path(settings.TEMP_DIR) / self.session_id
            if session_dir.exists():
                try:
                    shutil.rmtree(session_dir)
                    logger.info(f"Completely removed session directory: {session_dir}")
                except Exception as e:
                    logger.warning(f"Could not remove session directory: {e}")
        except Exception as e:
            logger.error(f"Error during session cleanup: {e}")

    def _configure_agent_tools(self):
        """Point the file-handling tools' base_dir at this session's output directory."""
        # agent -> tool classes whose base_dir must be redirected.
        targets = (
            (self.data_arranger, (FileTools,)),
            (self.code_generator, (FileTools, PythonTools)),
        )
        for agent, tool_types in targets:
            for tool in getattr(agent, "tools", None) or []:
                if isinstance(tool, tool_types):
                    tool.base_dir = self.session_output_dir

    def run(self, file_path: Optional[str] = None, **kwargs) -> RunResponse:
        """Run extraction -> arrangement -> Excel generation for one document.

        Pure-Python, non-streaming execution.

        Args:
            file_path: Path of the financial document. When omitted, a
                ``file_path`` attribute on the workflow is used instead.
            **kwargs: ``use_cache`` (default True) reuses results already
                stored in ``session_state``, including the final summary.

        Returns:
            RunResponse whose content is a Markdown summary. If a step raised,
            the content is an error message instead.

        Raises:
            ValueError: if no file path is available.
        """
        if file_path is None:
            file_path = getattr(self, "file_path", None)

        if file_path is None:
            raise ValueError("file_path must be provided either as parameter or set as attribute")

        logger.info(f"Processing financial document: {file_path}")

        use_cache = kwargs.get("use_cache", True)

        if use_cache and "final_results" in self.session_state:
            logger.info("Returning cached results")
            return RunResponse(run_id=self.run_id, content=self.session_state["final_results"])

        try:
            extracted_data = self._run_extraction_step(file_path, use_cache)
            arrangement_content = self._run_arrangement_step(extracted_data, use_cache)
            code_generation_content, execution_success = self._run_excel_step(use_cache)

            results_summary = self._build_results_summary(
                extracted_data, arrangement_content, code_generation_content, execution_success
            )
            self.session_state["final_results"] = results_summary
            return RunResponse(run_id=self.run_id, content=results_summary)

        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of raising.
            error_message = f"❌ Workflow failed: {str(e)}"
            logger.error(f"Financial workflow error: {e}", exc_info=True)
            return RunResponse(run_id=self.run_id, content=error_message)

    def _run_extraction_step(self, file_path: str, use_cache: bool) -> ExtractedFinancialData:
        """Step 1: get ExtractedFinancialData from the cache or the data_extractor agent."""
        logger.info("Step 1: Extracting financial data...")

        if use_cache and "extracted_data" in self.session_state:
            extracted_data = ExtractedFinancialData.model_validate(self.session_state["extracted_data"])
            logger.info("Using cached extraction data")
        else:
            document = File(filepath=file_path)
            extraction_prompt = prompt_loader.load_prompt("workflow/data_extraction", file_path=file_path)
            extraction_response: RunResponse = self.data_extractor.run(extraction_prompt, files=[document])
            extracted_data = self._coerce_extracted_data(extraction_response.content)
            self.session_state["extracted_data"] = extracted_data.model_dump()

        logger.info(f"Extracted {len(extracted_data.data_points)} data points")
        return extracted_data

    @staticmethod
    def _coerce_extracted_data(content) -> ExtractedFinancialData:
        """Return the extractor's response content as ExtractedFinancialData.

        The agent is configured with ``response_model=ExtractedFinancialData``.
        A dict or raw JSON text is also accepted and validated, presumably what
        is returned when structured parsing fails (TODO confirm against the
        agno version in use).

        Raises:
            ValueError: if the content has any other type.
            pydantic.ValidationError: if a dict/str does not match the schema.
        """
        if isinstance(content, ExtractedFinancialData):
            return content
        if isinstance(content, dict):
            return ExtractedFinancialData.model_validate(content)
        if isinstance(content, str):
            return ExtractedFinancialData.model_validate_json(content)
        raise ValueError(
            f"Data extractor returned unexpected content type: {type(content).__name__}"
        )

    def _run_arrangement_step(self, extracted_data: ExtractedFinancialData, use_cache: bool):
        """Step 2: get the data_arranger reply, from the cache or a fresh agent run."""
        logger.info("Step 2: Organizing financial data...")

        if use_cache and "arrangement_response" in self.session_state:
            logger.info("Using cached arrangement data")
            return self.session_state["arrangement_response"]

        extracted_json = extracted_data.model_dump_json(indent=2)
        logger.debug(f"Extracted data size: {len(extracted_json)} characters")
        logger.debug(f"First 200 chars of extracted data: {extracted_json[:200]}...")

        arrangement_prompt = prompt_loader.load_prompt("workflow/data_arrangement", extracted_data=extracted_json)

        # A literal placeholder left in the prompt means the template was not filled.
        if "{extracted_data}" in arrangement_prompt:
            logger.error("CRITICAL: Variable substitution failed! Prompt still contains {extracted_data} placeholder")
            logger.error(f"Prompt length: {len(arrangement_prompt)}")
        else:
            logger.info(f"Variable substitution successful. Prompt length: {len(arrangement_prompt)}")

        arrangement_response: RunResponse = self.data_arranger.run(arrangement_prompt)
        arrangement_content = arrangement_response.content

        self.session_state["arrangement_response"] = arrangement_content
        logger.info("Data organization completed - check output directory for arranged_financial_data.json")
        return arrangement_content

    def _run_excel_step(self, use_cache: bool):
        """Step 3: run (or reuse) the code generator.

        Returns:
            A ``(code_generation_content, execution_success)`` pair.
        """
        logger.info("Step 3: Generating and executing Excel code...")

        if use_cache and "code_generation_response" in self.session_state:
            code_generation_content = self.session_state["code_generation_response"]
            execution_success = self.session_state.get("execution_success", False)
            logger.info("Using cached code generation results")
        else:
            code_prompt = prompt_loader.load_prompt("workflow/code_generation")
            code_response: RunResponse = self.code_generator.run(code_prompt)
            code_generation_content = code_response.content
            execution_success = self._looks_successful(code_generation_content)

            self.session_state["code_generation_response"] = code_generation_content
            self.session_state["execution_success"] = execution_success

        logger.info(f"Code generation and execution completed: {'✅ Success' if execution_success else '❌ Failed'}")
        return code_generation_content, execution_success

    @staticmethod
    def _looks_successful(response_text) -> bool:
        """Heuristically decide whether the code generator's reply reports success.

        A reply counts as successful when it does not mention "error", or when
        it explicitly mentions "success" or "completed". A missing or empty
        reply counts as failure; calling ``.lower()`` on ``None`` would
        otherwise abort the whole run.
        """
        if not response_text:
            return False
        text = str(response_text).lower()
        return "error" not in text or "success" in text or "completed" in text

    @staticmethod
    def _preview(text, placeholder: str, limit: int = 500) -> str:
        """Return ``text`` truncated to ``limit`` chars (+ "..."), or ``placeholder`` if empty."""
        if not text:
            return placeholder
        return text[:limit] + "..." if len(text) > limit else text

    def _build_results_summary(
        self,
        extracted_data: ExtractedFinancialData,
        arrangement_content,
        code_generation_content,
        execution_success: bool,
    ) -> str:
        """Render the Markdown report returned by ``run``."""
        output_files = []
        if self.session_output_dir.exists():
            # Sorted so the report lists files in a stable order.
            output_files = sorted(f.name for f in self.session_output_dir.iterdir() if f.is_file())

        # Built outside the f-string: backslashes are not allowed inside
        # f-string expressions before Python 3.12.
        generated_files = (
            "\n".join(f"- **{name}**" for name in output_files) if output_files else "- No files generated"
        )
        arrangement_preview = self._preview(arrangement_content, "No arrangement data available")
        code_preview = self._preview(code_generation_content, "No code generation results available")

        return f"""
# Financial Document Analysis Complete

## Document Information
- **Company**: {extracted_data.company_name or 'Not specified'}
- **Document Type**: {extracted_data.document_type}
- **Reporting Period**: {extracted_data.reporting_period or 'Not specified'}

## Processing Summary
- **Data Points Extracted**: {len(extracted_data.data_points)}
- **Data Organization**: {'✅ Completed' if arrangement_content else '❌ Failed'}
- **Excel Creation**: {'✅ Success' if execution_success else '❌ Failed'}

## Data Organization Results
{arrangement_preview}

## Tool Execution Summary
**Data Arranger**: Used FileTools to save organized data to JSON
**Code Generator**: Used PythonTools and FileTools for Excel generation

## Code Generation Results
{code_preview}

## Generated Files ({len(output_files)} files)
{generated_files}

## Output Directory
📁 `{self.session_output_dir}`

---
*Generated using Agno Workflows with FileTools and PythonTools integration*
*Note: Due to Gemini limitations, structured outputs were used for data extraction only*
"""

    def get_processing_status(self) -> Dict[str, str]:
        """Report which pipeline steps have cached results in ``session_state``.

        Returns:
            Mapping of step name -> ``"completed"`` or ``"pending"``.
        """
        # Each step maps to the session_state key that ``run`` writes for it.
        step_keys = {
            "extraction": "extracted_data",
            "arrangement": "arrangement_response",
            "code_generation": "code_generation_response",
            "final_results": "final_results",
        }
        return {
            step: "completed" if key in self.session_state else "pending"
            for step, key in step_keys.items()
        }