File size: 17,785 Bytes
cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 c9b74e0 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 0daea93 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 90b0a17 cfeb3a6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 |
"""
Financial Document Analysis Workflow using Agno Workflows
Clean, pure-python implementation with structured outputs to avoid JSON parsing issues
"""
import json
import shutil
import time
from pathlib import Path
from typing import Dict, List, Optional, Iterator

from pydantic import BaseModel, Field

from agno.agent import Agent, RunResponse
from agno.media import File
from agno.models.google import Gemini
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
from agno.utils.log import logger
from agno.workflow import Workflow

from config.settings import settings
from utils.prompt_loader import prompt_loader
# Structured Output Models to avoid JSON parsing issues
class DataPoint(BaseModel):
    """A single financial figure extracted from a document.

    Used inside ExtractedFinancialData, which serves as the structured-output
    schema for the data-extractor agent, so the Field descriptions also act as
    schema documentation for the model.
    """
    # Required fields (Field(...) marks them mandatory).
    field_name: str = Field(..., description="Name of the financial data field")
    value: str = Field(..., description="Value of the field")
    category: str = Field(..., description="Financial category (revenue, expenses, assets, etc.)")
    # Optional metadata with defaults.
    period: str = Field(default="", description="Time period if applicable")
    unit: str = Field(default="", description="Currency or measurement unit")
    confidence: float = Field(default=0.9, description="Confidence score 0-1")
class ExtractedFinancialData(BaseModel):
    """Structured output for the data-extraction phase.

    This is the response_model of the data-extractor agent; using a pydantic
    schema here avoids hand-rolled JSON parsing of the LLM reply.
    """
    company_name: str = Field(default="", description="Company name")
    document_type: str = Field(..., description="Type of financial document")
    reporting_period: str = Field(default="", description="Reporting period")
    # All individual figures found in the document.
    data_points: List[DataPoint] = Field(..., description="All extracted financial data points")
    summary: str = Field(..., description="Brief summary of extracted data")
class FinancialCategory(BaseModel):
    """One category of organized financial data (e.g. revenue, expenses)."""
    category_name: str = Field(..., description="Name of the financial category")
    description: str = Field(..., description="Description of what this category contains")
    # Flat key/value mapping; values are kept as strings for display purposes.
    data_items: Dict[str, str] = Field(..., description="Key-value pairs of financial data")
    totals: Dict[str, str] = Field(default_factory=dict, description="Any calculated totals")
class ArrangedFinancialData(BaseModel):
    """Structured output for the data-arrangement phase.

    NOTE(review): not currently wired up as a response_model — the arranger
    agent uses tools instead (Gemini cannot combine structured outputs with
    tools, per the comment on the agent definition).
    """
    categories: List[FinancialCategory] = Field(..., description="Organized financial categories")
    key_metrics: Dict[str, str] = Field(default_factory=dict, description="Key financial metrics")
    insights: List[str] = Field(default_factory=list, description="Financial insights and analysis")
    summary: str = Field(..., description="Summary of arranged data")
class GeneratedCode(BaseModel):
    """Structured output for the code-generation phase.

    NOTE(review): not currently used as a response_model by the code-generator
    agent (it returns free-form text) — confirm whether this is still needed.
    """
    code: str = Field(..., description="Generated Python code for Excel creation")
    description: str = Field(..., description="Description of what the code does")
    output_filename: str = Field(..., description="Expected output filename")
    execution_notes: str = Field(default="", description="Notes about code execution")
class FinancialDocumentWorkflow(Workflow):
    """
    Pure Python workflow for financial document analysis.

    Three-step pipeline:
      1. Extract structured financial data from a document (structured output).
      2. Arrange the data into categories and save it to JSON (FileTools).
      3. Generate and execute Python code that builds an Excel report.

    Each step's result is cached in ``session_state`` so repeated runs with
    ``use_cache=True`` skip completed steps.
    """
    description: str = "Financial document analysis workflow with data extraction, organization, and Excel generation"

    # Data Extractor Agent - structured output (response_model) eliminates
    # JSON parsing issues with the model's reply.
    data_extractor: Agent = Agent(
        model=Gemini(
            id=settings.DATA_EXTRACTOR_MODEL,
            thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Expert financial data extraction specialist",
        instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
        response_model=ExtractedFinancialData,
        structured_outputs=True,
        debug_mode=True,
    )

    # Data Arranger Agent - organizes data into categories for Excel.
    # NOTE: Cannot use structured_outputs with tools in Gemini - choosing
    # tools (FileTools, to save the arranged data) over structured outputs.
    data_arranger: Agent = Agent(
        model=Gemini(id=settings.DATA_ARRANGER_MODEL, api_key=settings.GOOGLE_API_KEY),
        description="Financial data organization and analysis expert",
        instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
        tools=[FileTools()],  # FileTools for saving arranged data
        markdown=True,
        debug_mode=True,
        add_memory_references=True,
        add_session_summary_references=True,
        exponential_backoff=True,
        retries=10,
    )

    # Code Generator Agent - creates and executes Excel generation code.
    # Annotated `: Agent` for consistency with the sibling agent attributes.
    code_generator: Agent = Agent(
        model=Gemini(
            id=settings.CODE_GENERATOR_MODEL,
            api_key=settings.GOOGLE_API_KEY
        ),
        description="Excel report generator that analyzes JSON data and creates formatted workbooks using shell execution on any OS",
        goal="Generate a professional Excel report from arranged_financial_data.json with multiple worksheets, formatting, and charts",
        instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
        expected_output="A Financial_Report_YYYYMMDD_HHMMSS.xlsx file containing formatted data from the JSON with multiple worksheets, professional styling, and relevant charts",
        additional_context="This agent must work on Windows, Mac, and Linux. Always use os.path for file operations and handle path separators correctly. Include proper error handling for cross-platform compatibility.",
        tools=[
            ShellTools(),
            FileTools(save_files=True, read_files=True, list_files=True),
            PythonTools(pip_install=True, save_and_run=False, run_code=False)
        ],
        markdown=False,
        show_tool_calls=True,
        debug_mode=True,
        retries=10,
        add_datetime_to_instructions=True,
        delay_between_retries=10
    )

    def __init__(self, session_id: str = None, **kwargs):
        """Create session-scoped working directories and wire up agent tools.

        Args:
            session_id: Optional explicit session id; a timestamped one is
                generated when omitted.
        """
        super().__init__(session_id=session_id, **kwargs)
        self.session_id = session_id or f"financial_workflow_{int(time.time())}"
        session_dir = Path(settings.TEMP_DIR) / self.session_id
        self.session_output_dir = session_dir / "output"
        self.session_input_dir = session_dir / "input"
        self.session_temp_dir = session_dir / "temp"
        # Create all session directories up front so agents/tools can rely on them.
        for directory in (self.session_output_dir, self.session_input_dir, self.session_temp_dir):
            directory.mkdir(parents=True, exist_ok=True)
        # Point the agents' file/python tools at the session output directory.
        self._configure_agent_tools()
        logger.info(f"FinancialDocumentWorkflow initialized with session: {self.session_id}")

    def clear_cache(self):
        """Clear workflow session cache and temporary files (keeps input and output)."""
        try:
            # Clear cached step results.
            self.session_state.clear()
            logger.info(f"Cleared workflow cache for session: {self.session_id}")
            # Clean up temporary files, then recreate the empty temp dir.
            if self.session_temp_dir.exists():
                try:
                    shutil.rmtree(self.session_temp_dir)
                    self.session_temp_dir.mkdir(parents=True, exist_ok=True)
                    logger.info(f"Cleaned temporary files for session: {self.session_id}")
                except Exception as e:
                    logger.warning(f"Could not clean temp directory: {e}")
        except Exception as e:
            logger.error(f"Error clearing workflow cache: {e}")

    def cleanup_session(self):
        """Complete cleanup of the session including all files (input and output too)."""
        try:
            # Clear cache and temp files first.
            self.clear_cache()
            # Remove the entire session directory tree.
            session_dir = Path(settings.TEMP_DIR) / self.session_id
            if session_dir.exists():
                try:
                    shutil.rmtree(session_dir)
                    logger.info(f"Completely removed session directory: {session_dir}")
                except Exception as e:
                    logger.warning(f"Could not remove session directory: {e}")
        except Exception as e:
            logger.error(f"Error during session cleanup: {e}")

    def _configure_agent_tools(self):
        """Point agent tools' base directories at this session's output directory.

        The agents are class attributes, so their tools are constructed before
        the session directories exist; this re-targets them per instance.
        """
        if getattr(self.data_arranger, 'tools', None):
            for tool in self.data_arranger.tools:
                if isinstance(tool, FileTools):
                    tool.base_dir = self.session_output_dir
        if getattr(self.code_generator, 'tools', None):
            for tool in self.code_generator.tools:
                if isinstance(tool, (FileTools, PythonTools)):
                    tool.base_dir = self.session_output_dir

    def run(self, file_path: str = None, **kwargs) -> RunResponse:
        """Execute the full workflow for one document.

        Args:
            file_path: Path to the financial document; may instead be set as
                a ``file_path`` attribute on the workflow before calling.
            **kwargs: ``use_cache`` (bool, default True) controls whether
                cached step results from session_state are reused.

        Returns:
            RunResponse whose content is a markdown results summary, or an
            error message if any step raised.

        Raises:
            ValueError: if no file path is supplied via parameter or attribute.
        """
        # Resolve file_path from parameter or instance attribute.
        if file_path is None:
            file_path = getattr(self, 'file_path', None)
        if file_path is None:
            raise ValueError("file_path must be provided either as parameter or set as attribute")
        logger.info(f"Processing financial document: {file_path}")

        use_cache = kwargs.get('use_cache', True)
        # Fast path: full results already cached for this session.
        if use_cache and "final_results" in self.session_state:
            logger.info("Returning cached results")
            return RunResponse(
                run_id=self.run_id,
                content=self.session_state["final_results"]
            )

        try:
            extracted_data = self._extract_financial_data(file_path, use_cache)
            arrangement_content = self._arrange_financial_data(extracted_data, use_cache)
            code_generation_content, execution_success = self._generate_excel_report(use_cache)

            results_summary = self._build_results_summary(
                extracted_data, arrangement_content,
                code_generation_content, execution_success
            )
            self.session_state["final_results"] = results_summary
            return RunResponse(run_id=self.run_id, content=results_summary)
        except Exception as e:
            error_message = f"❌ Workflow failed: {str(e)}"
            logger.error(f"Financial workflow error: {e}", exc_info=True)
            return RunResponse(run_id=self.run_id, content=error_message)

    def _extract_financial_data(self, file_path: str, use_cache: bool) -> ExtractedFinancialData:
        """Step 1: extract structured data; cached under 'extracted_data'."""
        logger.info("Step 1: Extracting financial data...")
        if use_cache and "extracted_data" in self.session_state:
            logger.info("Using cached extraction data")
            return ExtractedFinancialData.model_validate(self.session_state["extracted_data"])
        document = File(filepath=file_path)
        extraction_prompt = prompt_loader.load_prompt("workflow/data_extraction", file_path=file_path)
        extraction_response: RunResponse = self.data_extractor.run(
            extraction_prompt,
            files=[document]
        )
        extracted_data: ExtractedFinancialData = extraction_response.content
        self.session_state["extracted_data"] = extracted_data.model_dump()
        logger.info(f"Extracted {len(extracted_data.data_points)} data points")
        return extracted_data

    def _arrange_financial_data(self, extracted_data: ExtractedFinancialData, use_cache: bool):
        """Step 2: organize data into categories; cached under 'arrangement_response'."""
        logger.info("Step 2: Organizing financial data...")
        if use_cache and "arrangement_response" in self.session_state:
            logger.info("Using cached arrangement data")
            return self.session_state["arrangement_response"]
        extracted_json = extracted_data.model_dump_json(indent=2)
        logger.debug(f"Extracted data size: {len(extracted_json)} characters")
        logger.debug(f"First 200 chars of extracted data: {extracted_json[:200]}...")
        arrangement_prompt = prompt_loader.load_prompt(
            "workflow/data_arrangement", extracted_data=extracted_json
        )
        # Guard: detect a template whose {extracted_data} placeholder was never
        # substituted, which would send the agent an empty prompt payload.
        if "{extracted_data}" in arrangement_prompt:
            logger.error("CRITICAL: Variable substitution failed! Prompt still contains {extracted_data} placeholder")
            logger.error(f"Prompt length: {len(arrangement_prompt)}")
        else:
            logger.info(f"Variable substitution successful. Prompt length: {len(arrangement_prompt)}")
        arrangement_response: RunResponse = self.data_arranger.run(arrangement_prompt)
        arrangement_content = arrangement_response.content
        self.session_state["arrangement_response"] = arrangement_content
        logger.info("Data organization completed - check output directory for arranged_financial_data.json")
        return arrangement_content

    def _generate_excel_report(self, use_cache: bool):
        """Step 3: generate/execute Excel code; returns (content, success flag).

        Cached under 'code_generation_response' / 'execution_success'.
        """
        logger.info("Step 3: Generating and executing Excel code...")
        if use_cache and "code_generation_response" in self.session_state:
            logger.info("Using cached code generation results")
            return (
                self.session_state["code_generation_response"],
                self.session_state.get("execution_success", False),
            )
        code_prompt = prompt_loader.load_prompt("workflow/code_generation")
        code_response: RunResponse = self.code_generator.run(code_prompt)
        code_generation_content = code_response.content
        # Heuristic success check on the agent's reply text.
        # NOTE(review): because "no error mentioned" is OR-ed with the positive
        # markers, a reply containing both "error" and "success" still counts
        # as success — kept as-is to preserve existing behavior; confirm intent.
        content_lower = code_generation_content.lower()
        execution_success = (
            "error" not in content_lower
            or "success" in content_lower
            or "completed" in content_lower
        )
        self.session_state["code_generation_response"] = code_generation_content
        self.session_state["execution_success"] = execution_success
        logger.info(f"Code generation and execution completed: {'✅ Success' if execution_success else '❌ Failed'}")
        return code_generation_content, execution_success

    def _build_results_summary(self, extracted_data: ExtractedFinancialData,
                               arrangement_content, code_generation_content,
                               execution_success: bool) -> str:
        """Render the final markdown report, listing files in the output dir."""
        output_files = []
        if self.session_output_dir.exists():
            output_files = [f.name for f in self.session_output_dir.iterdir() if f.is_file()]

        def _preview(text, empty_msg):
            # Truncate long agent replies for the summary.
            if text and len(text) > 500:
                return text[:500] + '...'
            return text or empty_msg

        results_summary = f"""
# Financial Document Analysis Complete

## Document Information
- **Company**: {extracted_data.company_name or 'Not specified'}
- **Document Type**: {extracted_data.document_type}
- **Reporting Period**: {extracted_data.reporting_period or 'Not specified'}

## Processing Summary
- **Data Points Extracted**: {len(extracted_data.data_points)}
- **Data Organization**: {'✅ Completed' if arrangement_content else '❌ Failed'}
- **Excel Creation**: {'✅ Success' if execution_success else '❌ Failed'}

## Data Organization Results
{_preview(arrangement_content, 'No arrangement data available')}

## Tool Execution Summary
**Data Arranger**: Used FileTools to save organized data to JSON
**Code Generator**: Used PythonTools and FileTools for Excel generation

## Code Generation Results
{_preview(code_generation_content, 'No code generation results available')}

## Generated Files ({len(output_files)} files)
{chr(10).join(f"- **{name}**" for name in output_files) if output_files else "- No files generated"}

## Output Directory
📁 `{self.session_output_dir}`

---
*Generated using Agno Workflows with FileTools and PythonTools integration*
*Note: Due to Gemini limitations, structured outputs were used for data extraction only*
"""
        return results_summary

    def get_processing_status(self) -> Dict[str, str]:
        """Get the current processing status of each workflow step.

        BUGFIX: keys now match the session_state keys actually written by
        run() ('arrangement_response', 'code_generation_response'); the old
        'arranged_data'/'generated_code' keys were never written, so those
        steps always reported 'pending'.
        """
        def _status(key: str) -> str:
            return "completed" if key in self.session_state else "pending"

        return {
            "extraction": _status("extracted_data"),
            "arrangement": _status("arrangement_response"),
            "code_generation": _status("code_generation_response"),
            "final_results": _status("final_results"),
        }