File size: 17,785 Bytes
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90b0a17
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90b0a17
cfeb3a6
90b0a17
cfeb3a6
 
 
 
 
 
 
c9b74e0
cfeb3a6
90b0a17
cfeb3a6
 
 
 
 
 
 
 
 
90b0a17
cfeb3a6
 
 
 
90b0a17
cfeb3a6
 
 
90b0a17
cfeb3a6
 
 
 
 
 
 
0daea93
cfeb3a6
 
 
 
 
 
 
 
 
 
 
90b0a17
 
 
 
cfeb3a6
90b0a17
 
cfeb3a6
 
 
 
 
90b0a17
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90b0a17
cfeb3a6
90b0a17
cfeb3a6
 
90b0a17
 
 
 
 
 
 
cfeb3a6
 
90b0a17
 
 
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90b0a17
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90b0a17
 
 
 
 
 
 
cfeb3a6
90b0a17
 
 
 
 
 
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
90b0a17
cfeb3a6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
"""
Financial Document Analysis Workflow using Agno Workflows
Pure-Python pipeline that extracts financial data from a document, organizes it, and
generates an Excel report. The extraction step uses structured outputs to avoid JSON parsing issues.
"""

import json
import shutil
import time
import uuid
from pathlib import Path
from typing import Dict, List, Optional, Iterator

from pydantic import BaseModel, Field

from agno.agent import Agent, RunResponse
from agno.media import File
from agno.models.google import Gemini
from agno.tools.file import FileTools
from agno.tools.python import PythonTools
from agno.tools.shell import ShellTools
from agno.utils.log import logger
from agno.workflow import Workflow

from config.settings import settings
from utils.prompt_loader import prompt_loader


# Structured Output Models to avoid JSON parsing issues
class DataPoint(BaseModel):
    """Individual financial data point"""
    # One extracted value; nested inside ExtractedFinancialData.data_points.
    # NOTE: pydantic puts the class docstring and every Field description into
    # the model's JSON schema. Because ExtractedFinancialData is used as a
    # structured-output response_model, treat that text as part of the prompt:
    # editing it changes the schema the LLM is presumably given.
    field_name: str = Field(..., description="Name of the financial data field")
    # Kept as a raw string; no numeric parsing or normalization happens here.
    value: str = Field(..., description="Value of the field")
    # Free-form label; not restricted to a fixed set of categories.
    category: str = Field(..., description="Financial category (revenue, expenses, assets, etc.)")
    period: str = Field(default="", description="Time period if applicable")
    unit: str = Field(default="", description="Currency or measurement unit")
    # Documented as 0-1 but not bounds-validated (no ge/le constraints).
    confidence: float = Field(default=0.9, description="Confidence score 0-1")

class ExtractedFinancialData(BaseModel):
    """Structured output for data extraction phase"""
    # Used as the data_extractor agent's response_model (structured_outputs=True).
    # The workflow caches it in session_state via model_dump() and restores it
    # with model_validate(). The docstring and Field descriptions are part of
    # the generated JSON schema, so edit them with the same care as a prompt.
    company_name: str = Field(default="", description="Company name")
    document_type: str = Field(..., description="Type of financial document")
    reporting_period: str = Field(default="", description="Reporting period")
    # Required; may still be an empty list if the model returns none.
    data_points: List[DataPoint] = Field(..., description="All extracted financial data points")
    summary: str = Field(..., description="Brief summary of extracted data")

class FinancialCategory(BaseModel):
    """A category of organized financial data"""
    # Element type of ArrangedFinancialData.categories.
    # NOTE(review): not referenced elsewhere in this module. The data arranger
    # agent runs without a response_model. Confirm whether other modules use
    # this before changing or removing it.
    category_name: str = Field(..., description="Name of the financial category")
    description: str = Field(..., description="Description of what this category contains")
    # Arbitrary string keys and values; no fixed key schema is enforced.
    data_items: Dict[str, str] = Field(..., description="Key-value pairs of financial data")
    totals: Dict[str, str] = Field(default_factory=dict, description="Any calculated totals")

class ArrangedFinancialData(BaseModel):
    """Structured output for data arrangement phase"""
    # NOTE(review): not referenced elsewhere in this module. The arrangement
    # step consumes the data arranger's free-text reply instead of this model.
    # Presumably kept for documentation or future use; confirm before removing.
    categories: List[FinancialCategory] = Field(..., description="Organized financial categories")
    key_metrics: Dict[str, str] = Field(default_factory=dict, description="Key financial metrics")
    insights: List[str] = Field(default_factory=list, description="Financial insights and analysis")
    summary: str = Field(..., description="Summary of arranged data")

class GeneratedCode(BaseModel):
    """Structured output for code generation phase"""
    # NOTE(review): not referenced elsewhere in this module. The code generator
    # agent has no response_model and its reply is handled as plain text.
    # Confirm whether other modules use this before changing it.
    code: str = Field(..., description="Generated Python code for Excel creation")
    description: str = Field(..., description="Description of what the code does")
    output_filename: str = Field(..., description="Expected output filename")
    execution_notes: str = Field(default="", description="Notes about code execution")


class FinancialDocumentWorkflow(Workflow):
    """
    Pure Python workflow for financial document analysis.

    Runs three sequential steps, each backed by an Agno agent:

    1. ``data_extractor`` reads the input document and returns an
       ``ExtractedFinancialData`` structured output.
    2. ``data_arranger`` organizes the extracted data. It returns free text and
       has ``FileTools`` pointed at the session output directory.
    3. ``code_generator`` generates and runs Excel-creation code using shell,
       file and Python tools.

    Each step's result is cached in ``self.session_state`` so that a re-run of
    the same session can skip completed steps. Cached results are tied to the
    input file path and are discarded when a different document is processed.
    Every instance gets its own ``<TEMP_DIR>/<session_id>/{input,output,temp}``
    directories.
    """

    description: str = "Financial document analysis workflow with data extraction, organization, and Excel generation"

    # Data Extractor Agent - Structured output eliminates JSON parsing issues
    data_extractor: Agent = Agent(
        model=Gemini(
            id=settings.DATA_EXTRACTOR_MODEL,
            thinking_budget=settings.DATA_EXTRACTOR_MODEL_THINKING_BUDGET,
            api_key=settings.GOOGLE_API_KEY,
        ),
        description="Expert financial data extraction specialist",
        instructions=prompt_loader.load_instructions_as_list("agents/data_extractor"),
        response_model=ExtractedFinancialData,
        structured_outputs=True,
        debug_mode=True,
    )

    # Data Arranger Agent - Organizes data into categories for Excel
    data_arranger: Agent = Agent(
        model=Gemini(id=settings.DATA_ARRANGER_MODEL, api_key=settings.GOOGLE_API_KEY),
        description="Financial data organization and analysis expert",
        instructions=prompt_loader.load_instructions_as_list("agents/data_arranger"),
        tools=[FileTools()],  # FileTools for saving arranged data
        # NOTE: Cannot use structured_outputs with tools in Gemini - choosing tools over structured outputs
        markdown=True,
        debug_mode=True,
        add_memory_references=True,
        add_session_summary_references=True,
        exponential_backoff=True,
        retries=10,
    )

    # Code Generator Agent - Creates Excel generation code
    code_generator = Agent(
        model=Gemini(
            id=settings.CODE_GENERATOR_MODEL,
            api_key=settings.GOOGLE_API_KEY
        ),
        description="Excel report generator that analyzes JSON data and creates formatted workbooks using shell execution on any OS",
        goal="Generate a professional Excel report from arranged_financial_data.json with multiple worksheets, formatting, and charts",
        instructions=prompt_loader.load_instructions_as_list("agents/code_generator"),
        expected_output="A Financial_Report_YYYYMMDD_HHMMSS.xlsx file containing formatted data from the JSON with multiple worksheets, professional styling, and relevant charts",
        additional_context="This agent must work on Windows, Mac, and Linux. Always use os.path for file operations and handle path separators correctly. Include proper error handling for cross-platform compatibility.",
        tools=[
            ShellTools(),
            FileTools(save_files=True, read_files=True, list_files=True),
            PythonTools(pip_install=True, save_and_run=False, run_code=False)
        ],
        markdown=False,
        show_tool_calls=True,
        debug_mode=True,
        retries=10,
        add_datetime_to_instructions=True,
        delay_between_retries=10,
    )

    def __init__(self, session_id: Optional[str] = None, **kwargs):
        """
        Create the workflow and its per-session working directories.

        Args:
            session_id: Identifier for the session; also used as the name of
                the session directory under ``settings.TEMP_DIR``. When omitted,
                one is generated from the current Unix time plus a short random
                suffix. The suffix keeps workflows created within the same second
                from sharing directories and cached state.
            **kwargs: Forwarded unchanged to ``Workflow.__init__``.
        """
        super().__init__(session_id=session_id, **kwargs)
        self.session_id = session_id or f"financial_workflow_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        session_root = Path(settings.TEMP_DIR) / self.session_id
        self.session_output_dir = session_root / "output"
        self.session_input_dir = session_root / "input"
        self.session_temp_dir = session_root / "temp"

        # Create all session directories
        for directory in (self.session_output_dir, self.session_input_dir, self.session_temp_dir):
            directory.mkdir(parents=True, exist_ok=True)

        # Tools are built at class-definition time, so re-point them at this
        # session's output directory now that it exists.
        self._configure_agent_tools()

        logger.info(f"FinancialDocumentWorkflow initialized with session: {self.session_id}")

    def clear_cache(self):
        """
        Clear the cached step results and the session's temporary files.

        Empties ``self.session_state`` and recreates an empty ``temp``
        directory. The ``input`` and ``output`` directories are left untouched.
        This is best-effort: failures are logged, not raised.
        """
        try:
            self.session_state.clear()
            logger.info(f"Cleared workflow cache for session: {self.session_id}")

            # Clean up temporary files (keep input and output)
            if self.session_temp_dir.exists():
                try:
                    shutil.rmtree(self.session_temp_dir)
                    self.session_temp_dir.mkdir(parents=True, exist_ok=True)
                    logger.info(f"Cleaned temporary files for session: {self.session_id}")
                except OSError as e:
                    logger.warning(f"Could not clean temp directory: {e}")

        except Exception as e:
            logger.error(f"Error clearing workflow cache: {e}")

    def cleanup_session(self):
        """
        Clear the cache and delete the entire session directory.

        Removes ``<TEMP_DIR>/<session_id>``, including its input and output
        files. This is best-effort: failures are logged, not raised.
        """
        try:
            self.clear_cache()

            session_dir = Path(settings.TEMP_DIR) / self.session_id
            if session_dir.exists():
                try:
                    shutil.rmtree(session_dir)
                    logger.info(f"Completely removed session directory: {session_dir}")
                except OSError as e:
                    logger.warning(f"Could not remove session directory: {e}")

        except Exception as e:
            logger.error(f"Error during session cleanup: {e}")

    def _configure_agent_tools(self):
        """
        Point the file-based tools of the arranger and code generator at this
        session's output directory.

        NOTE(review): the agents are class attributes. Unless Agno copies them
        for each workflow instance, this mutates tools that every
        FinancialDocumentWorkflow shares, and the most recently created instance
        wins. Confirm this before running several sessions concurrently.
        """
        for agent in (self.data_arranger, self.code_generator):
            for tool in getattr(agent, "tools", None) or []:
                if isinstance(tool, (FileTools, PythonTools)):
                    tool.base_dir = self.session_output_dir

    def run(self, file_path: str = None, **kwargs) -> RunResponse:
        """
        Run extraction, arrangement and Excel generation for one document.

        Args:
            file_path: Path of the document to analyze. Falls back to a
                ``file_path`` attribute set on the instance.
            **kwargs: ``use_cache`` (bool, default True) reuses step results
                stored in ``self.session_state`` for the same input file.

        Returns:
            A RunResponse whose ``content`` is a Markdown summary. If any step
            raises, ``content`` is instead an error message starting with
            "❌ Workflow failed:".

        Raises:
            ValueError: If no file path is available.
        """
        if file_path is None:
            file_path = getattr(self, 'file_path', None)

        if file_path is None:
            raise ValueError("file_path must be provided either as parameter or set as attribute")

        logger.info(f"Processing financial document: {file_path}")

        # use_cache is read from **kwargs; it is not a named parameter of run().
        use_cache = kwargs.get('use_cache', True)

        # Cached step results belong to the document that produced them.
        self._invalidate_cache_for(file_path)

        if use_cache and "final_results" in self.session_state:
            logger.info("Returning cached results")
            return RunResponse(
                run_id=self.run_id,
                content=self.session_state["final_results"]
            )

        try:
            extracted_data = self._extract_financial_data(file_path, use_cache)
            arrangement_content = self._arrange_financial_data(extracted_data, use_cache)
            code_generation_content, execution_success = self._generate_excel_report(use_cache)

            results_summary = self._build_results_summary(
                extracted_data, arrangement_content, code_generation_content, execution_success
            )

            self.session_state["final_results"] = results_summary
            return RunResponse(
                run_id=self.run_id,
                content=results_summary
            )

        except Exception as e:
            # Top-level boundary: report the failure to the caller instead of raising.
            error_message = f"❌ Workflow failed: {str(e)}"
            logger.error(f"Financial workflow error: {e}", exc_info=True)
            return RunResponse(
                run_id=self.run_id,
                content=error_message
            )

    def _invalidate_cache_for(self, file_path) -> None:
        """
        Drop cached step results that were produced for a different document.

        Records ``str(file_path)`` under ``"source_file_path"`` in the session
        state. Existing state that has no recorded source path, such as state
        written before this key existed, is kept as-is.
        """
        source = str(file_path)
        cached_source = self.session_state.get("source_file_path")
        if cached_source is not None and cached_source != source:
            logger.info("Input document changed since the last run; discarding cached step results")
            for key in (
                "extracted_data",
                "arrangement_response",
                "code_generation_response",
                "execution_success",
                "final_results",
            ):
                self.session_state.pop(key, None)
        self.session_state["source_file_path"] = source

    def _extract_financial_data(self, file_path: str, use_cache: bool) -> ExtractedFinancialData:
        """Step 1: run the data extractor on *file_path*, or reuse the cached result."""
        logger.info("Step 1: Extracting financial data...")

        if use_cache and "extracted_data" in self.session_state:
            extracted_data = ExtractedFinancialData.model_validate(
                self.session_state["extracted_data"]
            )
            logger.info("Using cached extraction data")
            return extracted_data

        document = File(filepath=file_path)
        extraction_prompt = prompt_loader.load_prompt("workflow/data_extraction", file_path=file_path)

        extraction_response: RunResponse = self.data_extractor.run(
            extraction_prompt,
            files=[document]
        )
        extracted_data = self._coerce_extracted_data(extraction_response.content)

        self.session_state["extracted_data"] = extracted_data.model_dump()
        logger.info(f"Extracted {len(extracted_data.data_points)} data points")
        return extracted_data

    @staticmethod
    def _coerce_extracted_data(content: object) -> ExtractedFinancialData:
        """
        Normalize the extractor's ``RunResponse.content`` to ``ExtractedFinancialData``.

        Accepts the model itself, a dict, or a JSON string. Anything else, such
        as ``None``, raises ``ValueError`` with a descriptive message instead of
        failing later with an ``AttributeError``. Pydantic validation errors are
        ``ValueError`` subclasses.
        """
        if isinstance(content, ExtractedFinancialData):
            return content
        if isinstance(content, dict):
            return ExtractedFinancialData.model_validate(content)
        if isinstance(content, str):
            try:
                return ExtractedFinancialData.model_validate_json(content)
            except ValueError as err:
                raise ValueError(
                    "Data extractor returned text that does not match the ExtractedFinancialData schema"
                ) from err
        raise ValueError(
            f"Data extractor returned unexpected content of type {type(content).__name__}"
        )

    def _arrange_financial_data(self, extracted_data: ExtractedFinancialData, use_cache: bool):
        """Step 2: have the data arranger organize *extracted_data*, or reuse the cached reply."""
        logger.info("Step 2: Organizing financial data...")

        if use_cache and "arrangement_response" in self.session_state:
            arrangement_content = self.session_state["arrangement_response"]
            logger.info("Using cached arrangement data")
            return arrangement_content

        extracted_json = extracted_data.model_dump_json(indent=2)
        logger.debug(f"Extracted data size: {len(extracted_json)} characters")
        logger.debug(f"First 200 chars of extracted data: {extracted_json[:200]}...")

        arrangement_prompt = prompt_loader.load_prompt("workflow/data_arrangement",
                                                       extracted_data=extracted_json)

        # Guard against prompt templates that silently skip substitution.
        if "{extracted_data}" in arrangement_prompt:
            logger.error("CRITICAL: Variable substitution failed! Prompt still contains {extracted_data} placeholder")
            logger.error(f"Prompt length: {len(arrangement_prompt)}")
        else:
            logger.info(f"Variable substitution successful. Prompt length: {len(arrangement_prompt)}")

        arrangement_response: RunResponse = self.data_arranger.run(arrangement_prompt)
        arrangement_content = arrangement_response.content

        self.session_state["arrangement_response"] = arrangement_content
        logger.info("Data organization completed - check output directory for arranged_financial_data.json")
        return arrangement_content

    def _generate_excel_report(self, use_cache: bool):
        """
        Step 3: have the code generator build the Excel report, or reuse the cached reply.

        Returns:
            A ``(reply_content, execution_success)`` tuple.
        """
        logger.info("Step 3: Generating and executing Excel code...")

        if use_cache and "code_generation_response" in self.session_state:
            code_generation_content = self.session_state["code_generation_response"]
            execution_success = self.session_state.get("execution_success", False)
            logger.info("Using cached code generation results")
            return code_generation_content, execution_success

        code_prompt = prompt_loader.load_prompt("workflow/code_generation")
        code_response: RunResponse = self.code_generator.run(code_prompt)
        code_generation_content = code_response.content
        execution_success = self._looks_successful(code_generation_content)

        self.session_state["code_generation_response"] = code_generation_content
        self.session_state["execution_success"] = execution_success

        logger.info(f"Code generation and execution completed: {'✅ Success' if execution_success else '❌ Failed'}")
        return code_generation_content, execution_success

    @staticmethod
    def _looks_successful(content) -> bool:
        """
        Guess from the code generator's reply text whether it succeeded.

        The reply counts as successful unless it mentions "error" without also
        mentioning "success" or "completed". An empty or ``None`` reply counts
        as failure; previously ``None`` raised ``AttributeError`` and aborted
        the whole run.
        """
        if not content:
            return False
        text = str(content).lower()
        return "error" not in text or "success" in text or "completed" in text

    @staticmethod
    def _preview(text, fallback: str, limit: int = 500) -> str:
        """Return *text* truncated to *limit* chars (with '...'), or *fallback* if empty."""
        if not text:
            return fallback
        return text[:limit] + '...' if len(text) > limit else text

    def _build_results_summary(self, extracted_data: ExtractedFinancialData, arrangement_content,
                               code_generation_content, execution_success: bool) -> str:
        """Render the Markdown report that ``run`` returns, listing the files in the output directory."""
        output_files = []
        if self.session_output_dir.exists():
            output_files = sorted(f.name for f in self.session_output_dir.iterdir() if f.is_file())

        files_listing = "\n".join(f"- **{name}**" for name in output_files) if output_files else "- No files generated"

        return f"""
# Financial Document Analysis Complete

## Document Information
- **Company**: {extracted_data.company_name or 'Not specified'}
- **Document Type**: {extracted_data.document_type}
- **Reporting Period**: {extracted_data.reporting_period or 'Not specified'}

## Processing Summary
- **Data Points Extracted**: {len(extracted_data.data_points)}
- **Data Organization**: {'✅ Completed' if arrangement_content else '❌ Failed'}
- **Excel Creation**: {'✅ Success' if execution_success else '❌ Failed'}

## Data Organization Results
{self._preview(arrangement_content, 'No arrangement data available')}

## Tool Execution Summary
**Data Arranger**: Used FileTools to save organized data to JSON
**Code Generator**: Used PythonTools and FileTools for Excel generation

## Code Generation Results  
{self._preview(code_generation_content, 'No code generation results available')}

## Generated Files ({len(output_files)} files)
{files_listing}

## Output Directory
📁 `{self.session_output_dir}`

---
*Generated using Agno Workflows with FileTools and PythonTools integration*
*Note: Due to Gemini limitations, structured outputs were used for data extraction only*
            """

    def get_processing_status(self) -> Dict[str, str]:
        """
        Report which workflow steps have results in ``self.session_state``.

        Returns:
            A mapping of step name to ``"completed"`` or ``"pending"``. It checks
            the keys ``run`` actually writes (``"arrangement_response"`` and
            ``"code_generation_response"``). The ``"arranged_data"`` and
            ``"generated_code"`` keys are also accepted for backward compatibility.
        """
        def step_status(*keys: str) -> str:
            return "completed" if any(key in self.session_state for key in keys) else "pending"

        return {
            "extraction": step_status("extracted_data"),
            "arrangement": step_status("arrangement_response", "arranged_data"),
            "code_generation": step_status("code_generation_response", "generated_code"),
            "final_results": step_status("final_results"),
        }