|
""" |
|
Financial Document Analysis Workflow - Agno Workflow 2.0 Implementation

This workflow processes financial documents through a multi-agent pipeline built on
the step-based architecture introduced in Agno Workflow 2.0:

1. Data Extractor Agent: extracts structured financial data from the document.
2. Data Arrangement step: a custom function that has an arranger agent organize
   the extracted data into an Excel-ready layout.
3. Code Generator Agent: produces formatted Excel reports using its Python,
   shell, and file tools.

The three steps run sequentially, following Agno Workflow 2.0 conventions.
|
""" |
|
|
|
import json |
|
import time |
|
from pathlib import Path |
|
from typing import Optional, Dict, Any |
|
from textwrap import dedent |
|
import os |
|
|
|
from agno.agent import Agent |
|
from agno.models.google import Gemini |
|
from agno.tools.file import FileTools |
|
from agno.tools.shell import ShellTools |
|
from agno.tools.python import PythonTools |
|
from agno.workflow.v2.workflow import Workflow |
|
from agno.workflow.v2.types import StepInput, StepOutput |
|
from agno.workflow.v2.step import Step |
|
from agno.storage.sqlite import SqliteStorage |
|
from agno.utils.log import logger |
|
from pydantic import BaseModel, Field |
|
|
|
from config.settings import settings |
|
from utils.prompt_loader import prompt_loader |
|
from utils.shell_toolkit import RestrictedShellTools |
|
from utils.restricted_python_tools import RestrictedPythonTools |
|
|
|
|
|
class DataPoint(BaseModel):
    """One financial figure taken from a source document.

    ``field_name``, ``value`` and ``category`` must be supplied; ``period``,
    ``unit`` and ``confidence`` fall back to the defaults declared below.
    """

    # --- required attributes (``...`` marks a field as required) ---
    field_name: str = Field(..., description="Name of the financial data field")
    value: str = Field(..., description="Value of the field")
    category: str = Field(..., description="Financial category (revenue, expenses, assets, etc.)")

    # --- optional attributes; first positional argument is the default ---
    period: str = Field("", description="Time period if applicable")
    unit: str = Field("", description="Currency or measurement unit")
    confidence: float = Field(0.9, description="Confidence score 0-1")
|
|
|
|
|
class Metadata(BaseModel):
    """Document-level context attached to a set of extracted financial data.

    Every attribute has a default, so ``Metadata()`` is a valid placeholder
    when nothing is known about the document.
    """

    # First positional argument to ``Field`` is the default value.
    company_name: str = Field("Unknown Company", description="Company name")
    document_type: str = Field("Unknown", description="Type of financial document")
    reporting_period: str = Field("", description="Reporting period")
    currency: str = Field("", description="Primary currency used")
|
|
|
|
|
class ExtractedFinancialData(BaseModel):
    """Structured result of the extraction step.

    Holds the list of extracted data points, a textual summary, and
    document metadata (a fresh default ``Metadata`` instance if omitted).
    """

    # Required: ``...`` means no default value.
    data_points: list[DataPoint] = Field(..., description="List of extracted financial data points")
    summary: str = Field(..., description="Summary of the extracted data")

    # Optional: built per instance via the factory, never shared between models.
    metadata: Metadata = Field(default_factory=Metadata, description="Additional metadata")
|
|
|
|
|
class FinancialDocumentWorkflow(Workflow):
    """
    Financial document analysis workflow using Agno Workflow 2.0 step-based architecture.

    This workflow processes financial documents through three sequential steps:
    - Data extraction with structured outputs (``ExtractedFinancialData``)
    - Data arrangement for Excel compatibility (a custom callable step)
    - Excel report generation with formatting

    The arranger and code-generator agents' tools are rooted at the per-session
    output directory returned by ``settings.create_session_directories``.
    """

    def __init__(self, session_id: Optional[str] = None, **kwargs):
        """Build the agents, wire them into sequential steps and initialise the base workflow.

        Args:
            session_id: Optional session identifier. It is passed both to
                ``settings.create_session_directories`` and to ``Workflow.__init__``.
            **kwargs: Forwarded to ``Workflow.__init__``. ``debug_mode`` defaults
                to ``True`` but may be overridden by the caller.
        """
        # Directories must exist before the agents are built, because the
        # agents' tools are rooted at ``self.session_output_dir``.
        self._setup_session_directories(session_id)

        storage = SqliteStorage(
            table_name="financial_workflows",
            db_file="tmp/agno_workflows.db",
            mode="workflow_v2",
            auto_upgrade_schema=True,
        )

        self.data_extractor = self._create_data_extractor()
        self.data_arranger = self._create_data_arranger()
        self.code_generator = self._create_code_generator()

        data_extraction_step = Step(
            name="FinancialDataExtractor",
            agent=self.data_extractor,
            description="Expert financial data extraction specialist optimized for Gemini",
        )

        data_arrangement_step = Step(
            name="DataArrangement",
            executor=self._arrangement_function,
            description="User-defined callable step for data arrangement",
        )

        excel_generation_step = Step(
            name="ExcelReportGenerator",
            agent=self.code_generator,
            description="Excel report generator optimized for Gemini with cross-platform support",
        )

        # ``debug_mode=True`` used to be passed explicitly next to ``**kwargs``,
        # so a caller supplying ``debug_mode`` raised "got multiple values for
        # keyword argument". Keep True as the default but let callers override it.
        kwargs.setdefault("debug_mode", True)

        super().__init__(
            name="FinancialDocumentWorkflow",
            description=dedent("""\
                Financial document analysis workflow using Agno Workflow 2.0 with step-based execution.
                Processes financial documents through extraction, arrangement, and Excel report generation.
                Uses session state for caching and proper error recovery mechanisms.
            """),
            steps=[
                data_extraction_step,
                data_arrangement_step,
                excel_generation_step,
            ],
            session_id=session_id,
            storage=storage,
            **kwargs,
        )

        logger.info(f"FinancialDocumentWorkflow v2.0 initialized with session: {self.session_id}")
        logger.info(f"Session directories: {list(self.session_directories.keys())}")

    def _setup_session_directories(self, session_id: Optional[str] = None):
        """Create the session directories and store their paths on ``self``.

        Reads the ``output``, ``input``, ``temp`` and ``cache`` entries from the
        mapping returned by ``settings.create_session_directories``.
        """
        self.session_id = session_id
        self.session_directories = settings.create_session_directories(self.session_id)
        self.session_output_dir = self.session_directories["output"]
        self.session_input_dir = self.session_directories["input"]
        self.session_temp_dir = self.session_directories["temp"]
        self.session_cache_dir = self.session_directories["cache"]

    def _create_data_extractor(self) -> Agent:
        """Create the extraction agent.

        It has no tools and returns an ``ExtractedFinancialData`` instance via
        ``response_model``.
        """
        return Agent(
            model=Gemini(
                id=settings.DATA_EXTRACTOR_MODEL,
                thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
                api_key=settings.GOOGLE_API_KEY,
            ),
            name="FinancialDataExtractor",
            description="Expert financial data extraction specialist optimized for Gemini",
            instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
            response_model=ExtractedFinancialData,
            structured_outputs=True,
            debug_mode=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    def _create_data_arranger(self) -> Agent:
        """Create the arrangement agent, with shell and file tools rooted at the session output dir."""
        # Log directory diagnostics up front; the agent's tools fail later and
        # less clearly if the directory is missing or read-only.
        logger.info(f"Data arranger base directory: {self.session_output_dir}")
        logger.info(f"Directory exists: {self.session_output_dir.exists()}")
        logger.info(f"Directory is writable: {os.access(self.session_output_dir, os.W_OK)}")
        return Agent(
            model=Gemini(
                id=settings.DATA_ARRANGER_MODEL,
                thinking_budget=settings.DATA_ARRANGER_MODEL_THINKING_BUDGET,
                api_key=settings.GOOGLE_API_KEY,
            ),
            name="FinancialDataArranger",
            description="Financial data organization specialist optimized for Gemini",
            instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
            tools=[
                RestrictedShellTools(base_dir=self.session_output_dir),
                FileTools(base_dir=self.session_output_dir, save_files=True, read_files=True, list_files=True),
            ],
            markdown=False,
            debug_mode=True,
            add_memory_references=True,
            add_session_summary_references=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    def _create_code_generator(self) -> Agent:
        """Create the Excel-report agent, with shell, Python and file tools rooted at the session output dir."""
        return Agent(
            model=Gemini(
                id=settings.CODE_GENERATOR_MODEL,
                thinking_budget=settings.CODE_GENERATOR_MODEL_THINKING_BUDGET,
                api_key=settings.GOOGLE_API_KEY,
            ),
            name="ExcelReportGenerator",
            description="Excel report generator optimized for Gemini with cross-platform support",
            goal="Generate professional Excel reports from arranged financial data with multiple worksheets and formatting",
            instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
            expected_output="A professionally formatted Excel file with multiple worksheets, charts, and proper styling",
            additional_context=f"Working directory: {self.session_output_dir}. All files must be saved in this directory only.",
            tools=[
                RestrictedShellTools(base_dir=self.session_output_dir),
                RestrictedPythonTools(base_dir=self.session_output_dir),
                FileTools(base_dir=self.session_output_dir, save_files=True, read_files=True, list_files=True),
            ],
            markdown=False,
            show_tool_calls=True,
            debug_mode=True,
            add_datetime_to_instructions=True,
            retries=10,
            delay_between_retries=10,
            exponential_backoff=True,
        )

    @staticmethod
    def _stringify_step_content(content: Any) -> str:
        """Render a previous step's output as prompt text.

        Pydantic models, such as the extractor's parsed ``response_model``
        result, are dumped as indented JSON rather than their ``key=value``
        ``str()`` form. Dicts and lists are JSON-encoded, with non-JSON values
        deliberately stringified because the output only feeds an LLM prompt.
        Anything else falls back to ``str()``.
        """
        if isinstance(content, BaseModel):
            return content.model_dump_json(indent=2)
        if isinstance(content, (dict, list)):
            return json.dumps(content, indent=2, ensure_ascii=False, default=str)
        return str(content)

    def _arrangement_function(self, step_input: StepInput) -> StepOutput:
        """Executor of the ``DataArrangement`` step: let the arranger agent organize the extracted data.

        Args:
            step_input: Input supplied by the workflow. Only
                ``previous_step_content`` is read.

        Returns:
            StepOutput: ``success=True`` with the arranger's response content on
            success. ``success=False`` with an explanatory message when the
            previous step produced no content or any exception is raised.
        """
        try:
            previous_step_content = step_input.previous_step_content
            if previous_step_content is None or (
                isinstance(previous_step_content, str) and not previous_step_content.strip()
            ):
                # Without extracted data the arranger would be prompted with
                # the literal text "None" (or nothing) and "succeed" anyway.
                logger.error("Data arrangement skipped: previous step produced no content")
                return StepOutput(
                    content="Data arrangement failed: no extracted data was received from the previous step",
                    success=False,
                )

            logger.info("Starting data arrangement step")

            arrangement_prompt = prompt_loader.load_prompt("workflow/data_arrangement")
            extracted_data = self._stringify_step_content(previous_step_content)
            full_arrangement_prompt = (
                f"{arrangement_prompt}\n\n"
                f"Here is the extracted financial data to arrange:\n\n{extracted_data}"
            )

            response = self.data_arranger.run(full_arrangement_prompt)

            # Cache the result whenever a session-state mapping exists. The
            # previous truthiness test skipped an empty dict, so the first
            # entry could never be written.
            # NOTE(review): Workflow 2.0 may expose shared state under another
            # attribute (e.g. ``workflow_session_state``). Verify which applies.
            session_state = getattr(self, "session_state", None)
            if session_state is not None:
                cache_key = f"arrangement_{int(time.time())}"
                session_state[cache_key] = response.content
                logger.info(f"Cached arrangement results with key: {cache_key}")

            logger.info("Data arrangement completed successfully")
            return StepOutput(
                content=response.content,
                response=response,
                success=True,
            )

        except Exception as e:
            # Step boundary: report failure through StepOutput instead of
            # raising, but keep the traceback in the log.
            logger.exception(f"Data arrangement failed: {e}")
            return StepOutput(
                content=f"Data arrangement failed: {e}",
                success=False,
            )

    def run(self, file_path: Optional[str] = None, **kwargs):
        """
        Run the three-step workflow on a single financial document.

        The document is copied into the session input directory, wrapped as an
        ``agno.media.File``, and passed to ``Workflow.run`` together with the
        rendered ``workflow/data_extraction`` prompt.

        Args:
            file_path: Path (``str`` or ``Path``) to the financial document.
            **kwargs: Forwarded unchanged to ``Workflow.run``.

        Returns:
            Whatever ``Workflow.run`` returns for this execution.

        Raises:
            ValueError: If ``file_path`` is not provided.
            FileNotFoundError: If ``file_path`` is not an existing regular file.
        """
        # The former ``kwargs.get('file_path')`` fallback was unreachable: a
        # ``file_path`` keyword always binds to the named parameter.
        if file_path is None:
            logger.error("file_path is required but not provided")
            raise ValueError("file_path is required but not provided")

        start_time = time.perf_counter()  # monotonic, unaffected by clock changes

        try:
            file_path = Path(file_path).resolve()
            # ``is_file`` rather than ``exists`` rejects a directory here,
            # before ``read_bytes`` below can fail on it.
            if not file_path.is_file():
                logger.error(f"File not found: {file_path}")
                raise FileNotFoundError(f"File not found: {file_path}")

            # Keep a copy of the input with the session's other files.
            # NOTE(review): this copy is not used below. The File object and
            # the prompt both reference the original path. Confirm intent.
            input_file = self.session_input_dir / file_path.name
            input_file.write_bytes(file_path.read_bytes())

            logger.info(f"Starting financial document analysis for: {file_path.name}")

            from agno.media import File

            document = File(filepath=str(file_path))

            extraction_prompt = prompt_loader.load_prompt(
                "workflow/data_extraction",
                file_path=str(file_path),
                output_directory=str(self.session_output_dir),
            )

            result = super().run(
                message=extraction_prompt,
                files=[document],
                **kwargs,
            )

            execution_time = time.perf_counter() - start_time
            status = self._get_workflow_status()

            logger.info(f"Workflow completed successfully in {execution_time:.2f} seconds")
            logger.info(f"Results: {status}")

            return result

        except Exception as e:
            logger.exception(f"Workflow execution failed: {e}")
            raise

    def _get_workflow_status(self) -> Dict[str, Any]:
        """Summarize the session's output directory.

        Returns the session id, the output directory path, and the number of
        ``*.json`` and ``*.xlsx`` files directly inside that directory. The
        counts stay 0 if the directory does not exist.
        """
        status: Dict[str, Any] = {
            "session_id": self.session_id,
            "output_directory": str(self.session_output_dir),
            "json_files": 0,
            "excel_files": 0,
            "data_points": 0,  # NOTE(review): never populated here; always 0.
        }

        if self.session_output_dir.exists():
            status["json_files"] = sum(1 for _ in self.session_output_dir.glob("*.json"))
            status["excel_files"] = sum(1 for _ in self.session_output_dir.glob("*.xlsx"))

        return status
|
|
|
|
|
|
|
def create_financial_workflow(session_id: Optional[str] = None, **kwargs) -> FinancialDocumentWorkflow:
    """Factory for ``FinancialDocumentWorkflow`` (Workflow 2.0).

    Args:
        session_id: Session identifier forwarded to the workflow constructor;
            ``None`` is passed through unchanged.
        **kwargs: Any further keyword arguments, forwarded verbatim to the
            constructor.

    Returns:
        FinancialDocumentWorkflow: The newly constructed workflow instance.
    """
    workflow = FinancialDocumentWorkflow(session_id=session_id, **kwargs)
    return workflow