# Spaces: Sleeping -- status text that appears to come from the hosting web page; not part of the module.
"""
Multi-modal agent for processing different file types and answering questions.
"""
import json
import logging
import os
import re
from typing import Dict, Any, List, Optional, Tuple

from agent.tools.file_handlers import extract_file_content
from agent.utils.data_processor import DataProcessor
from agent.utils.question_analyzer import QuestionAnalyzer
# Configure logging once at import time: INFO and above, with timestamp,
# logger name and level prefixed to every message.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)

# Module-wide logger shared by everything in this file.
logger = logging.getLogger('MultiModalAgent')
class MultiModalAgent:
    """
    Agent for processing different file types and answering questions.

    A question is analysed for a file reference; the referenced (or best
    matching) file is extracted and its content is handed to the
    ``DataProcessor`` method registered for the handler that read it.
    Extracted file content and final answers are cached on the instance.
    """

    # (question words, file extensions) pairs: when any of the words occurs in
    # the question as a whole word (optionally pluralised with "s"), files with
    # one of the extensions get a relevance bonus.
    _FILE_TYPE_HINTS = (
        (('excel', 'spreadsheet', 'xlsx'), ('.xlsx', '.xls')),
        (('csv',), ('.csv',)),
        (('text', 'txt'), ('.txt',)),
        (('pdf',), ('.pdf',)),
        (('image', 'picture', 'photo', 'photograph'),
         ('.jpg', '.jpeg', '.png', '.gif', '.bmp')),
        (('word', 'document', 'docx'), ('.docx',)),
        (('powerpoint', 'presentation', 'slides', 'pptx'), ('.pptx',)),
        (('json',), ('.json', '.jsonld')),
        (('zip', 'archive'), ('.zip',)),
        (('python', 'py', 'code', 'script'), ('.py',)),
        (('pdb', 'protein'), ('.pdb',)),
    )

    # Phrases that mark a question about the file types in the resource directory.
    _FILE_TYPE_QUESTION_PATTERNS = (
        'what file types', 'which file types', 'what kinds of files',
        'which kinds of files', 'what formats', 'which formats',
    )

    # Handler class name -> name of the DataProcessor method that consumes its content.
    _PROCESSOR_METHODS = {
        'ExcelHandler': 'process_excel_data',
        'CSVHandler': 'process_csv_data',
        'TextHandler': 'process_text_data',
        'PDFHandler': 'process_pdf_data',
        'ImageHandler': 'process_image_metadata',
        'DocxHandler': 'process_docx_data',
        'PptxHandler': 'process_pptx_data',
        'JsonHandler': 'process_json_data',
        'ZipHandler': 'process_zip_data',
        'PdbHandler': 'process_pdb_data',
        'PythonHandler': 'process_python_data',
        'JsonlHandler': 'process_jsonl_data',
    }

    # Score weights used by _find_most_relevant_file.
    _FILENAME_MATCH_SCORE = 2
    _FILE_TYPE_MATCH_SCORE = 3

    def __init__(self, resource_dir: str = 'resource'):
        """
        Initialize the agent.

        Args:
            resource_dir: Directory containing resource files
        """
        logger.info("Initializing MultiModalAgent")
        self.resource_dir = resource_dir
        self.question_analyzer = QuestionAnalyzer(resource_dir)
        self.data_processor = DataProcessor()
        # Cache for file content to avoid re-processing: file_path -> (content, handler)
        self.file_content_cache = {}
        # Cache for answers: cache key (see __call__) -> answer string
        self.answer_cache = {}

    def __call__(self, question: str, task_id: Optional[str] = None) -> str:
        """
        Process a question and return an answer.

        Any exception raised while answering is logged and turned into an
        error message string, so this method itself does not raise for
        processing failures.

        Args:
            question: The question to answer
            task_id: The task ID (optional)

        Returns:
            Answer to the question, or a human-readable error message
        """
        logger.info("Processing question: %s...", question[:100])
        if task_id:
            logger.info("Task ID: %s", task_id)

        # The same question may have different answers under different tasks,
        # so the task ID is part of the cache key when present.
        cache_key = f"{task_id}:{question}" if task_id else question
        if cache_key in self.answer_cache:
            logger.info("Answer found in cache")
            return self.answer_cache[cache_key]

        try:
            analysis = self.question_analyzer.analyze_question(question, task_id)
            logger.info("Question analysis: %s", analysis)

            file_path = analysis.get('file_path')
            if not file_path:
                # Handle general questions that don't require file processing.
                logger.info("No file reference found in question, trying to answer directly")
                direct_answer = self._answer_without_file(question)
                if direct_answer:
                    self.answer_cache[cache_key] = direct_answer
                    return direct_answer

                logger.info("Direct answering failed, looking for relevant files")
                file_path = self._find_most_relevant_file(question)
                if not file_path:
                    return self._explain_missing_file()

            # NOTE(review): paths picked by _find_most_relevant_file already
            # include resource_dir; confirm extract_file_content accepts that.
            cached = self.file_content_cache.get(file_path)
            if cached is not None:
                content, handler = cached
            else:
                content, handler = extract_file_content(file_path, self.resource_dir)
                if content is not None:
                    self.file_content_cache[file_path] = (content, handler)

            if content is None:
                logger.error("Failed to extract content from file: %s", file_path)
                return "I couldn't extract content from the specified file."

            answer = self._process_content(content, handler, question)
            self.answer_cache[cache_key] = answer
            return answer
        except Exception as e:
            logger.exception("Error processing question: %s", e)
            return f"An error occurred while processing your question: {e}"

    def _explain_missing_file(self) -> str:
        """
        Log diagnostics and build the reply used when no file could be selected.

        Returns:
            A message that distinguishes a missing resource directory from a
            directory that simply holds no relevant file
        """
        logger.warning("No relevant file found for the question")
        # List available entries purely for debugging.
        try:
            files = os.listdir(self.resource_dir)
            logger.info("Available files in %s: %s", self.resource_dir, files)
        except Exception as e:
            logger.error("Error listing files in resource directory: %s", e)

        if not os.path.exists(self.resource_dir):
            logger.error("Resource directory does not exist: %s", self.resource_dir)
            return f"Error: Resource directory not found at {self.resource_dir}. Please check the path."
        return "I couldn't find a relevant file to answer this question. Please provide more context or specify a file."

    def _list_resource_files(self) -> List[str]:
        """
        Return the sorted names of the regular files directly inside the resource directory.

        Sub-directories are skipped. Sorting makes later tie-breaking
        independent of the order in which the OS lists entries.

        Raises:
            OSError: If the resource directory cannot be listed
        """
        return sorted(
            name
            for name in os.listdir(self.resource_dir)
            if os.path.isfile(os.path.join(self.resource_dir, name))
        )

    def _answer_without_file(self, question: str) -> Optional[str]:
        """
        Try to answer the question without using a file.

        Only two kinds of question are recognised: how many files the resource
        directory holds, and which file extensions it contains. Both consider
        regular files only.

        Args:
            question: The question to answer

        Returns:
            Answer to the question, or None if the question can't be answered directly
        """
        question_lower = question.lower()

        # Question about the number of files in the resource directory.
        if 'how many files' in question_lower or 'number of files' in question_lower:
            try:
                file_count = len(self._list_resource_files())
            except Exception as e:
                logger.error("Error counting files: %s", e)
                return None
            return f"There are {file_count} files in the resource directory."

        # Question about the file types in the resource directory.
        if any(pattern in question_lower for pattern in self._FILE_TYPE_QUESTION_PATTERNS):
            try:
                extensions = {
                    os.path.splitext(name)[1] for name in self._list_resource_files()
                }
            except Exception as e:
                logger.error("Error analyzing file types: %s", e)
                return None
            extensions.discard('')  # Skip files without extension
            if extensions:
                return f"The resource directory contains files with the following extensions: {', '.join(sorted(extensions))}"
            return "The resource directory doesn't contain any files with extensions."

        return None

    @classmethod
    def _hinted_extensions(cls, question: str) -> frozenset:
        """
        Return the file extensions whose type is mentioned in the question.

        Matching is on whole words, so "py" matches "script.py" but not
        "happy". Every mentioned type contributes, not just the first one.

        Args:
            question: The question to inspect

        Returns:
            Lower-case extensions (with leading dot) hinted at by the question
        """
        words = set(re.findall(r'[a-z0-9]+', question.lower()))
        hinted = set()
        for keywords, extensions in cls._FILE_TYPE_HINTS:
            if any(kw in words or kw + 's' in words for kw in keywords):
                hinted.update(extensions)
        return frozenset(hinted)

    def _find_most_relevant_file(self, question: str) -> Optional[str]:
        """
        Find the most relevant file for a question.

        Each regular file in the resource directory is scored: a bonus for
        every question keyword contained in its name, plus a bonus when its
        extension belongs to a file type mentioned in the question. The
        highest-scoring file wins; ties go to the alphabetically first name.
        If nothing scores above zero, ``metadata.jsonl`` is used when present.

        Args:
            question: The question to answer

        Returns:
            Path to the most relevant file, or None if no relevant file is found
        """
        try:
            file_names = self._list_resource_files()
            if not file_names:
                logger.warning("No files found in the resource directory")
                return None

            # Lower-case once; empty keywords are dropped because the empty
            # string is a substring of every file name.
            keywords = {
                kw.lower()
                for kw in self.question_analyzer._extract_keywords(question)
                if kw
            }
            hinted_extensions = self._hinted_extensions(question)

            best_path, best_score = None, 0
            for file_name in file_names:
                name_lower = file_name.lower()
                score = self._FILENAME_MATCH_SCORE * sum(
                    kw in name_lower for kw in keywords
                )
                if os.path.splitext(name_lower)[1] in hinted_extensions:
                    score += self._FILE_TYPE_MATCH_SCORE
                # Strict comparison keeps the first of equally scored files.
                if score > best_score:
                    best_path = os.path.join(self.resource_dir, file_name)
                    best_score = score

            if best_path is not None:
                logger.info("Found relevant file: %s with score %s", best_path, best_score)
                return best_path

            # Nothing matched the question: fall back to the metadata file.
            metadata_path = os.path.join(self.resource_dir, 'metadata.jsonl')
            if os.path.exists(metadata_path):
                logger.info("No specific file found, defaulting to metadata.jsonl")
                return metadata_path

            logger.warning("No relevant file found for the question")
            return None
        except Exception as e:
            logger.error("Error finding relevant file: %s", e)
            return None

    def _process_content(self, content: Any, handler: Any, question: str) -> str:
        """
        Process the content based on file type.

        The handler's class name selects the ``DataProcessor`` method to call.
        Errors raised by that method are logged and reported in the returned
        string instead of propagating.

        Args:
            content: Extracted content from the file
            handler: File handler used to extract the content
            question: The question to answer

        Returns:
            Answer to the question, or a message describing why none was produced
        """
        try:
            handler_type = type(handler).__name__
            method_name = self._PROCESSOR_METHODS.get(handler_type)
            if method_name is None:
                logger.warning("Unknown handler type: %s", handler_type)
                return f"I don't know how to process content from a {handler_type}."
            return getattr(self.data_processor, method_name)(content, question)
        except Exception as e:
            logger.exception("Error processing content: %s", e)
            return f"An error occurred while processing the file content: {e}"