Data_Extractor_Using_Gemini / workflow /financial_workflow.py
methunraj
refactor: remove unused OpenAIChat import from financial workflow
92cc167
raw
history blame
17.8 kB
"""
Financial Document Analysis Workflow using Agno Workflows
Clean, pure-python implementation with structured outputs to avoid JSON parsing issues
"""
import json
from pathlib import Path
from typing import Dict, List, Optional, Iterator
from pydantic import BaseModel, Field
from agno.agent import Agent, RunResponse
from agno.models.google import Gemini
from agno.media import File
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.workflow import Workflow
from agno.utils.log import logger
from agno.tools.shell import ShellTools
from config.settings import settings
from utils.prompt_loader import prompt_loader
# Structured Output Models to avoid JSON parsing issues
class DataPoint(BaseModel):
"""Individual financial data point"""
field_name: str = Field(..., description="Name of the financial data field")
value: str = Field(..., description="Value of the field")
category: str = Field(..., description="Financial category (revenue, expenses, assets, etc.)")
period: str = Field(default="", description="Time period if applicable")
unit: str = Field(default="", description="Currency or measurement unit")
confidence: float = Field(default=0.9, description="Confidence score 0-1")
class ExtractedFinancialData(BaseModel):
"""Structured output for data extraction phase"""
company_name: str = Field(default="", description="Company name")
document_type: str = Field(..., description="Type of financial document")
reporting_period: str = Field(default="", description="Reporting period")
data_points: List[DataPoint] = Field(..., description="All extracted financial data points")
summary: str = Field(..., description="Brief summary of extracted data")
class FinancialCategory(BaseModel):
"""A category of organized financial data"""
category_name: str = Field(..., description="Name of the financial category")
description: str = Field(..., description="Description of what this category contains")
data_items: Dict[str, str] = Field(..., description="Key-value pairs of financial data")
totals: Dict[str, str] = Field(default_factory=dict, description="Any calculated totals")
class ArrangedFinancialData(BaseModel):
"""Structured output for data arrangement phase"""
categories: List[FinancialCategory] = Field(..., description="Organized financial categories")
key_metrics: Dict[str, str] = Field(default_factory=dict, description="Key financial metrics")
insights: List[str] = Field(default_factory=list, description="Financial insights and analysis")
summary: str = Field(..., description="Summary of arranged data")
class GeneratedCode(BaseModel):
"""Structured output for code generation phase"""
code: str = Field(..., description="Generated Python code for Excel creation")
description: str = Field(..., description="Description of what the code does")
output_filename: str = Field(..., description="Expected output filename")
execution_notes: str = Field(default="", description="Notes about code execution")
class FinancialDocumentWorkflow(Workflow):
"""
Pure Python workflow for financial document analysis
Uses structured outputs to eliminate JSON parsing issues
"""
description: str = "Financial document analysis workflow with data extraction, organization, and Excel generation"
# Data Extractor Agent - Structured output eliminates JSON parsing issues
data_extractor: Agent = Agent(
model=Gemini(id=settings.DATA_EXTRACTOR_MODEL,thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,api_key=settings.GOOGLE_API_KEY),
description="Expert financial data extraction specialist",
instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
response_model=ExtractedFinancialData,
structured_outputs=True,
debug_mode=True,
)
# Data Arranger Agent - Organizes data into categories for Excel
data_arranger: Agent = Agent(
model=Gemini(id=settings.DATA_ARRANGER_MODEL,api_key=settings.GOOGLE_API_KEY),
description="Financial data organization and analysis expert",
instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
tools=[FileTools()], # FileTools for saving arranged data
# NOTE: Cannot use structured_outputs with tools in Gemini - choosing tools over structured outputs
markdown=True,
debug_mode=True,
add_memory_references=True,
add_session_summary_references=True,
exponential_backoff=True,
retries=10,
)
# Code Generator Agent - Creates Excel generation code
code_generator = Agent(
model=Gemini(
id=settings.CODE_GENERATOR_MODEL,
api_key=settings.GOOGLE_API_KEY
),
description="Excel report generator that analyzes JSON data and creates formatted workbooks using shell execution on any OS",
goal="Generate a professional Excel report from arranged_financial_data.json with multiple worksheets, formatting, and charts",
instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
expected_output="A Financial_Report_YYYYMMDD_HHMMSS.xlsx file containing formatted data from the JSON with multiple worksheets, professional styling, and relevant charts",
additional_context="This agent must work on Windows, Mac, and Linux. Always use os.path for file operations and handle path separators correctly. Include proper error handling for cross-platform compatibility.",
tools=[
ShellTools(),
FileTools(save_files=True, read_files=True, list_files=True),
PythonTools(pip_install=True, save_and_run=False, run_code=False)
],
markdown=False,
show_tool_calls=True,
debug_mode=True,
retries=10,
add_datetime_to_instructions=True,
delay_between_retries=10
)
def __init__(self, session_id: str = None, **kwargs):
super().__init__(session_id=session_id, **kwargs)
self.session_id = session_id or f"financial_workflow_{int(__import__('time').time())}"
self.session_output_dir = Path(settings.TEMP_DIR) / self.session_id / "output"
self.session_input_dir = Path(settings.TEMP_DIR) / self.session_id / "input"
self.session_temp_dir = Path(settings.TEMP_DIR) / self.session_id / "temp"
# Create all session directories
self.session_output_dir.mkdir(parents=True, exist_ok=True)
self.session_input_dir.mkdir(parents=True, exist_ok=True)
self.session_temp_dir.mkdir(parents=True, exist_ok=True)
# Configure tools with correct base directories after initialization
self._configure_agent_tools()
logger.info(f"FinancialDocumentWorkflow initialized with session: {self.session_id}")
def clear_cache(self):
"""Clear workflow session cache and temporary files."""
try:
# Clear session state
self.session_state.clear()
logger.info(f"Cleared workflow cache for session: {self.session_id}")
# Clean up temporary files (keep input and output)
if self.session_temp_dir.exists():
import shutil
try:
shutil.rmtree(self.session_temp_dir)
self.session_temp_dir.mkdir(parents=True, exist_ok=True)
logger.info(f"Cleaned temporary files for session: {self.session_id}")
except Exception as e:
logger.warning(f"Could not clean temp directory: {e}")
except Exception as e:
logger.error(f"Error clearing workflow cache: {e}")
def cleanup_session(self):
"""Complete cleanup of session including all files."""
try:
# Clear cache first
self.clear_cache()
# Remove entire session directory
session_dir = Path(settings.TEMP_DIR) / self.session_id
if session_dir.exists():
import shutil
try:
shutil.rmtree(session_dir)
logger.info(f"Completely removed session directory: {session_dir}")
except Exception as e:
logger.warning(f"Could not remove session directory: {e}")
except Exception as e:
logger.error(f"Error during session cleanup: {e}")
def _configure_agent_tools(self):
"""Configure agent tools with the correct base directories"""
# Configure data arranger's FileTools with session output directory
if hasattr(self.data_arranger, 'tools') and self.data_arranger.tools:
for tool in self.data_arranger.tools:
if isinstance(tool, FileTools):
tool.base_dir = self.session_output_dir
# Configure code generator's tools with session output directory
if hasattr(self.code_generator, 'tools') and self.code_generator.tools:
for tool in self.code_generator.tools:
if isinstance(tool, FileTools):
tool.base_dir = self.session_output_dir
elif isinstance(tool, PythonTools):
tool.base_dir = self.session_output_dir
def run(self, file_path: str = None, **kwargs) -> RunResponse:
"""
Main workflow execution method
Pure Python workflow execution - no streaming, no JSON parsing issues
"""
# Handle file_path from parameter or attribute
if file_path is None:
file_path = getattr(self, 'file_path', None)
if file_path is None:
raise ValueError("file_path must be provided either as parameter or set as attribute")
logger.info(f"Processing financial document: {file_path}")
# Remove use_cache parameter since it's not defined in the method signature
use_cache = kwargs.get('use_cache', True)
# Check cache first if enabled
if use_cache and "final_results" in self.session_state:
logger.info("Returning cached results")
return RunResponse(
run_id=self.run_id,
content=self.session_state["final_results"]
)
try:
# Step 1: Extract Financial Data
logger.info("Step 1: Extracting financial data...")
# Check for cached extraction
if use_cache and "extracted_data" in self.session_state:
extracted_data = ExtractedFinancialData.model_validate(
self.session_state["extracted_data"]
)
logger.info("Using cached extraction data")
else:
document = File(filepath=file_path)
extraction_prompt = prompt_loader.load_prompt("workflow/data_extraction", file_path=file_path)
extraction_response: RunResponse = self.data_extractor.run(
extraction_prompt,
files=[document]
)
extracted_data: ExtractedFinancialData = extraction_response.content
# Cache the result
self.session_state["extracted_data"] = extracted_data.model_dump()
logger.info(f"Extracted {len(extracted_data.data_points)} data points")
# Step 2: Arrange and Organize Data
logger.info("Step 2: Organizing financial data...")
if use_cache and "arrangement_response" in self.session_state:
arrangement_content = self.session_state["arrangement_response"]
logger.info("Using cached arrangement data")
else:
# Debug: Check extracted data before passing to prompt
extracted_json = extracted_data.model_dump_json(indent=2)
logger.debug(f"Extracted data size: {len(extracted_json)} characters")
logger.debug(f"First 200 chars of extracted data: {extracted_json[:200]}...")
arrangement_prompt = prompt_loader.load_prompt("workflow/data_arrangement",
extracted_data=extracted_json)
# Debug: Check if prompt contains the actual data or just the placeholder
if "{extracted_data}" in arrangement_prompt:
logger.error("CRITICAL: Variable substitution failed! Prompt still contains {extracted_data} placeholder")
logger.error(f"Prompt length: {len(arrangement_prompt)}")
else:
logger.info(f"Variable substitution successful. Prompt length: {len(arrangement_prompt)}")
arrangement_response: RunResponse = self.data_arranger.run(arrangement_prompt)
arrangement_content = arrangement_response.content
# Cache the result
self.session_state["arrangement_response"] = arrangement_content
logger.info("Data organization completed - check output directory for arranged_financial_data.json")
# Step 3: Generate and Execute Excel Code
logger.info("Step 3: Generating and executing Excel code...")
if use_cache and "code_generation_response" in self.session_state:
code_generation_content = self.session_state["code_generation_response"]
execution_success = self.session_state.get("execution_success", False)
logger.info("Using cached code generation results")
else:
code_prompt = prompt_loader.load_prompt("workflow/code_generation")
code_response: RunResponse = self.code_generator.run(code_prompt)
code_generation_content = code_response.content
# Simple check for execution success based on response content
execution_success = (
"error" not in code_generation_content.lower() or
"success" in code_generation_content.lower() or
"completed" in code_generation_content.lower()
)
# Cache the results
self.session_state["code_generation_response"] = code_generation_content
self.session_state["execution_success"] = execution_success
logger.info(f"Code generation and execution completed: {'βœ… Success' if execution_success else '❌ Failed'}")
# Prepare final results
# List actual output files
output_files = []
if self.session_output_dir.exists():
output_files = [f.name for f in self.session_output_dir.iterdir() if f.is_file()]
results_summary = f"""
# Financial Document Analysis Complete
## Document Information
- **Company**: {extracted_data.company_name or 'Not specified'}
- **Document Type**: {extracted_data.document_type}
- **Reporting Period**: {extracted_data.reporting_period or 'Not specified'}
## Processing Summary
- **Data Points Extracted**: {len(extracted_data.data_points)}
- **Data Organization**: {'βœ… Completed' if arrangement_content else '❌ Failed'}
- **Excel Creation**: {'βœ… Success' if execution_success else '❌ Failed'}
## Data Organization Results
{arrangement_content[:500] + '...' if arrangement_content and len(arrangement_content) > 500 else arrangement_content or 'No arrangement data available'}
## Tool Execution Summary
**Data Arranger**: Used FileTools to save organized data to JSON
**Code Generator**: Used PythonTools and FileTools for Excel generation
## Code Generation Results
{code_generation_content[:500] + '...' if code_generation_content and len(code_generation_content) > 500 else code_generation_content or 'No code generation results available'}
## Generated Files ({len(output_files)} files)
{chr(10).join(f"- **{file}**" for file in output_files) if output_files else "- No files generated"}
## Output Directory
πŸ“ `{self.session_output_dir}`
---
*Generated using Agno Workflows with FileTools and PythonTools integration*
*Note: Due to Gemini limitations, structured outputs were used for data extraction only*
"""
# Cache final results
self.session_state["final_results"] = results_summary
return RunResponse(
run_id=self.run_id,
content=results_summary
)
except Exception as e:
error_message = f"❌ Workflow failed: {str(e)}"
logger.error(f"Financial workflow error: {e}", exc_info=True)
return RunResponse(
run_id=self.run_id,
content=error_message
)
def get_processing_status(self) -> Dict[str, str]:
"""Get the current processing status"""
status = {
"extraction": "completed" if "extracted_data" in self.session_state else "pending",
"arrangement": "completed" if "arranged_data" in self.session_state else "pending",
"code_generation": "completed" if "generated_code" in self.session_state else "pending",
"final_results": "completed" if "final_results" in self.session_state else "pending"
}
return status