Yago Bolivar
Refactor speech_to_text.py to implement a singleton ASR pipeline, enhance error handling, and introduce SpeechToTextTool for better integration. Update spreadsheet_tool.py to support querying and improve parsing functionality, including CSV support. Enhance video_processing_tool.py with new tasks for metadata extraction and frame extraction, while improving object detection capabilities and initialization checks.
87aa741
import os
import pandas as pd
from typing import Dict, List, Union, Tuple, Any, Optional
import numpy as np
from smolagents.tools import Tool


class SpreadsheetTool(Tool):
    """
    Parses spreadsheet files (e.g., .xlsx, .xls, .csv) and extracts tabular data for analysis or allows querying.
    Useful for reading, processing, and converting spreadsheet content to Python data structures.
    """
    name = "spreadsheet_processor"
    description = "Parses a spreadsheet file (e.g., .xlsx, .xls, .csv) and can perform queries. Returns extracted data or query results."
    inputs = {
        'file_path': {'type': 'string', 'description': 'Path to the spreadsheet file.'},
        'query_instructions': {'type': 'string', 'description': 'Optional. Instructions for querying the data (e.g., "Sum column A"). If None, parses the whole sheet.', 'nullable': True}
    }
    outputs = {'result': {'type': 'object', 'description': 'A dictionary containing parsed sheet data, query results, or an error message.'}}
    output_type = "object"

    def __init__(self, *args, **kwargs):
        """Initialize the SpreadsheetTool."""
        super().__init__(*args, **kwargs)
        self.is_initialized = True
    # Main entry point for the agent
    def forward(self, file_path: str, query_instructions: Optional[str] = None) -> Dict[str, Any]:
        if not os.path.exists(file_path):
            return {"error": f"File not found: {file_path}"}

        # Determine file type for appropriate parsing
        _, file_extension = os.path.splitext(file_path)
        file_extension = file_extension.lower()

        parsed_data = None
        if file_extension in ['.xlsx', '.xls']:
            parsed_data = self._parse_excel(file_path)
        elif file_extension == '.csv':
            parsed_data = self._parse_csv(file_path)
        else:
            return {"error": f"Unsupported file type: {file_extension}. Supported types: .xlsx, .xls, .csv"}

        if parsed_data.get("error"):
            return parsed_data  # Return error from parsing step

        if query_instructions:
            return self._query_data(parsed_data, query_instructions)
        else:
            # If no query, return the parsed data and summary
            return {
                "parsed_sheets": parsed_data.get("sheets"),
                "summary": parsed_data.get("summary"),
                "message": "Spreadsheet parsed successfully."
            }
    def _parse_excel(self, file_path: str) -> Dict[str, Any]:
        """Parse an Excel spreadsheet and extract useful information."""
        try:
            excel_file = pd.ExcelFile(file_path)
            sheet_names = excel_file.sheet_names
            sheets = {}
            for sheet_name in sheet_names:
                sheets[sheet_name] = pd.read_excel(excel_file, sheet_name=sheet_name)
            summary = self._create_summary(sheets)
            return {"sheets": sheets, "sheet_names": sheet_names, "summary": summary, "error": None}
        except Exception as e:
            return {"error": f"Error parsing Excel spreadsheet: {str(e)}"}
    def _parse_csv(self, file_path: str) -> Dict[str, Any]:
        """Parse a CSV file."""
        try:
            df = pd.read_csv(file_path)
            # CSVs don't have multiple sheets, so we adapt the structure
            sheet_name = os.path.splitext(os.path.basename(file_path))[0]
            sheets = {sheet_name: df}
            summary = self._create_summary(sheets)
            return {"sheets": sheets, "sheet_names": [sheet_name], "summary": summary, "error": None}
        except Exception as e:
            return {"error": f"Error parsing CSV file: {str(e)}"}
    def _create_summary(self, sheets_dict: Dict[str, pd.DataFrame]) -> Dict[str, Any]:
        """Create a summary of the spreadsheet contents."""
        summary = {}
        for sheet_name, df in sheets_dict.items():
            summary[sheet_name] = {
                "shape": df.shape,
                "columns": df.columns.tolist(),
                "numeric_columns": df.select_dtypes(include=[np.number]).columns.tolist(),
                "text_columns": df.select_dtypes(include=['object']).columns.tolist(),
                "has_nulls": df.isnull().any().any(),
                "first_few_rows": df.head(3).to_dict('records')
            }
        return summary
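
    # Illustrative output (not executed): for the dummy Excel workbook built in the
    # __main__ block below, the summary entry for 'Sheet1' would look roughly like:
    #   {"Sheet1": {"shape": (5, 3),
    #               "columns": ["colA", "colB", "colC"],
    #               "numeric_columns": ["colA", "colC"],
    #               "text_columns": ["colB"],
    #               "has_nulls": False,
    #               "first_few_rows": [{"colA": 1, "colB": "apple", "colC": 10.1}, ...]}}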
    # Renamed from query_data to _query_data and adjusted arguments
    def _query_data(self, parsed_data_dict: Dict[str, Any], query_instructions: str) -> Dict[str, Any]:
        """
        Execute a query on the spreadsheet data based on instructions.
        This is a simplified placeholder. A real implementation would need robust query parsing.
        """
        if parsed_data_dict.get("error"):
            return {"error": parsed_data_dict["error"]}

        sheets = parsed_data_dict.get("sheets")
        if not sheets:
            return {"error": "No sheets data available for querying."}

        # Placeholder for actual query logic.
        # This would involve parsing `query_instructions` (e.g., using regex, NLP, or a DSL)
        # and applying pandas operations; a hedged sketch of one such approach follows
        # after this class definition.
        # For now, return a message indicating the query was received, plus basic info.
        results = {}
        explanation = f"Query instruction received: '{query_instructions}'. Advanced query execution is not fully implemented. " \
                      f"Returning summary of available sheets: {list(sheets.keys())}."

        # Example: if the query asks for a sum, try to sum the first numeric column of the first sheet.
        if "sum" in query_instructions.lower():
            first_sheet_name = next(iter(sheets))
            df = sheets[first_sheet_name]
            numeric_cols = df.select_dtypes(include=[np.number]).columns
            if not numeric_cols.empty:
                col_to_sum = numeric_cols[0]
                try:
                    total_sum = df[col_to_sum].sum()
                    results[f'{first_sheet_name}_{col_to_sum}_sum'] = total_sum
                    explanation += f" Example sum of column '{col_to_sum}' in sheet '{first_sheet_name}': {total_sum}."
                except Exception as e:
                    explanation += f" Could not perform example sum: {e}."
            else:
                explanation += " No numeric columns found for example sum."

        return {"query_results": results, "explanation": explanation, "original_query": query_instructions}
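
# --- Hypothetical extension (illustration only; not used by SpreadsheetTool) ---
# The placeholder in `_query_data` notes that a real implementation would parse
# `query_instructions` and map them onto pandas operations. The helper below is a
# minimal sketch of one possible approach, assuming queries shaped like
# "<aggregation> <column> [from <sheet>]" (e.g., "sum colA from Sheet1"). The
# function name and the supported grammar are assumptions, not part of the tool above.
def _sketch_parse_and_run_query(sheets: Dict[str, pd.DataFrame], query: str) -> Dict[str, Any]:
    """Run a simple '<agg> <column> [from <sheet>]' query against parsed sheets."""
    import re

    # Map query keywords onto pandas Series aggregation method names.
    supported = {"sum": "sum", "mean": "mean", "average": "mean",
                 "min": "min", "max": "max", "count": "count"}
    match = re.match(r"^\s*(\w+)\s+(\S+)(?:\s+from\s+(\S+))?\s*$", query, flags=re.IGNORECASE)
    if not match:
        return {"error": f"Could not parse query: '{query}'"}

    agg_word, column, sheet_name = match.groups()
    agg = supported.get(agg_word.lower())
    if agg is None:
        return {"error": f"Unsupported aggregation: '{agg_word}'"}

    # Default to the first sheet when the query does not name one.
    sheet_name = sheet_name or next(iter(sheets))
    if sheet_name not in sheets:
        return {"error": f"Unknown sheet: '{sheet_name}'"}
    df = sheets[sheet_name]
    if column not in df.columns:
        return {"error": f"Unknown column '{column}' in sheet '{sheet_name}'"}

    value = getattr(df[column], agg)()
    return {"query_results": {f"{sheet_name}_{column}_{agg}": value}, "error": None}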

# Example usage (for direct testing)
if __name__ == '__main__':
    tool = SpreadsheetTool()

    # Create dummy files for testing
    dummy_excel_file = "dummy_test.xlsx"
    dummy_csv_file = "dummy_test.csv"

    # Create a dummy Excel file
    df_excel = pd.DataFrame({
        'colA': [1, 2, 3, 4, 5],
        'colB': ['apple', 'banana', 'cherry', 'date', 'elderberry'],
        'colC': [10.1, 20.2, 30.3, 40.4, 50.5]
    })
    with pd.ExcelWriter(dummy_excel_file) as writer:
        df_excel.to_excel(writer, sheet_name='Sheet1', index=False)
        df_excel.head(2).to_excel(writer, sheet_name='Sheet2', index=False)

    # Create a dummy CSV file
    df_csv = pd.DataFrame({
        'id': [101, 102, 103],
        'product': ['widget', 'gadget', 'gizmo'],
        'price': [19.99, 29.50, 15.00]
    })
    df_csv.to_csv(dummy_csv_file, index=False)

    print("--- Test 1: Parse Excel file (no query) ---")
    result1 = tool.forward(file_path=dummy_excel_file)
    print(result1)
    assert "error" not in result1 or result1["error"] is None
    assert "Sheet1" in result1["parsed_sheets"]

    print("\n--- Test 2: Parse CSV file (no query) ---")
    result2 = tool.forward(file_path=dummy_csv_file)
    print(result2)
    assert "error" not in result2 or result2["error"] is None
    assert dummy_csv_file.split('.')[0] in result2["parsed_sheets"]

    print("\n--- Test 3: Query Excel file (simple sum example) ---")
    result3 = tool.forward(file_path=dummy_excel_file, query_instructions="sum colA from Sheet1")
    print(result3)
    assert "error" not in result3 or result3["error"] is None
    assert "query_results" in result3
    if result3.get("query_results"):
        assert "Sheet1_colA_sum" in result3["query_results"]
        assert result3["query_results"]["Sheet1_colA_sum"] == 15

    print("\n--- Test 4: File not found ---")
    result4 = tool.forward(file_path="non_existent_file.xlsx")
    print(result4)
    assert result4["error"] is not None
    assert "File not found" in result4["error"]

    print("\n--- Test 5: Unsupported file type ---")
    dummy_txt_file = "dummy_test.txt"
    with open(dummy_txt_file, "w") as f:
        f.write("hello")
    result5 = tool.forward(file_path=dummy_txt_file)
    print(result5)
    assert result5["error"] is not None
    assert "Unsupported file type" in result5["error"]
    os.remove(dummy_txt_file)

    # Clean up dummy files
    if os.path.exists(dummy_excel_file):
        os.remove(dummy_excel_file)
    if os.path.exists(dummy_csv_file):
        os.remove(dummy_csv_file)

    print("\nSpreadsheetTool tests completed.")
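
# Integration note: the Tool base class imported above comes from smolagents, so an
# instance of SpreadsheetTool would typically be handed to an agent's tool list,
# roughly along the lines of CodeAgent(tools=[SpreadsheetTool()], model=...). The
# exact class and argument names depend on the installed smolagents version, so
# treat that call shape as an assumption to verify, not a documented API.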