Yago Bolivar
Refactor speech_to_text.py to implement a singleton ASR pipeline, enhance error handling, and introduce SpeechToTextTool for better integration. Update spreadsheet_tool.py to support querying and improve parsing functionality, including CSV support. Enhance video_processing_tool.py with new tasks for metadata extraction and frame extraction, while improving object detection capabilities and initialization checks.
87aa741
import re | |
from smolagents.tools import Tool | |
from typing import Dict, List, Optional | |
# Core Markdown-table parsing function (wrapped by MarkdownTableParserTool below)
def _is_table_row(line: str) -> bool:
    """Return True when *line* both starts and ends with a pipe character."""
    return line.startswith('|') and line.endswith('|')


def _split_table_row(line: str) -> List[str]:
    """Split a pipe-delimited row into whitespace-stripped cell strings.

    Exactly one bounding pipe is removed from each side. ``str.strip('|')``
    would remove *every* leading/trailing pipe and thereby swallow empty
    edge cells such as the last cell of ``|1||``.
    """
    return [cell.strip() for cell in line[1:-1].split('|')]


def _parse_markdown_table_string(markdown_text: str) -> Optional[Dict[str, List[str]]]:
    """
    Parse the first valid Markdown table found in a string.

    Blank lines are discarded before scanning. A table starts at the first
    pair of consecutive (non-blank) lines where both lines start and end
    with ``|``, both split into the same number of cells, and every cell of
    the second line contains at least one ``-`` and only ``-``, ``:`` or
    spaces. Following lines are consumed as data rows until a line that does
    not start and end with ``|``; rows whose cell count differs from the
    header count are skipped.

    Args:
        markdown_text: Text that may contain a Markdown table.

    Returns:
        A dictionary mapping each header to the list of cell strings in that
        column (all headers, including a row-label header such as ``*``, are
        treated alike), or ``None`` if no valid table is found. Headers with
        identical text share a single key and therefore a single list.
    """
    # Drop blank lines up front; every remaining line is stripped once here.
    lines = [line.strip() for line in markdown_text.split('\n') if line.strip()]
    allowed_separator_chars = set('-: ')

    for index, (header_line, separator_line) in enumerate(zip(lines, lines[1:])):
        if not (_is_table_row(header_line) and _is_table_row(separator_line)):
            continue

        headers = _split_table_row(header_line)
        separators = _split_table_row(separator_line)
        if len(headers) != len(separators):
            continue

        # Each separator cell needs a dash and may only hold '-', ':' or ' '.
        if not all('-' in sep and set(sep) <= allowed_separator_chars for sep in separators):
            continue

        # Header + separator confirmed: collect the data rows that follow.
        table: Dict[str, List[str]] = {header: [] for header in headers}
        for row in lines[index + 2:]:
            if not _is_table_row(row):
                break  # first non-table line ends the table
            cells = _split_table_row(row)
            if len(cells) != len(headers):
                continue  # malformed row: skip it but keep reading
            for header, cell in zip(headers, cells):
                table[header].append(cell)
        return table

    return None
class MarkdownTableParserTool(Tool):
    """
    Tool wrapper around the module-level Markdown table parser.

    Given a text string, it extracts the first valid Markdown table and hands
    it back as a Python data structure suitable for further analysis.
    """

    # Tool metadata: identifier, human-readable summary and I/O schema.
    name = "markdown_table_parser"
    description = "Parses the first valid Markdown table found in a string and returns it as a dictionary."
    inputs = {
        'markdown_text': {
            'type': 'string',
            'description': 'The string containing the Markdown table.',
        },
    }
    outputs = {
        'parsed_table': {
            'type': 'object',
            'description': 'A dictionary representing the table (headers as keys, lists of cell content as values), or null if no table is found.',
        },
    }
    output_type = "object"  # Or dict/None

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base class, then mark the tool as initialized."""
        super().__init__(*args, **kwargs)
        self.is_initialized = True

    def forward(self, markdown_text: str) -> Optional[Dict[str, List[str]]]:
        """
        Delegate to ``_parse_markdown_table_string``.

        Returns the parsed table dictionary, or ``None`` when the text holds
        no valid table.
        """
        parsed_table = _parse_markdown_table_string(markdown_text)
        return parsed_table
# Optional public alias: keeps the original function name available for any
# other part of the system that expects to call `parse_markdown_table`.
parse_markdown_table = _parse_markdown_table_string
if __name__ == '__main__':
    # Manual smoke test: run the tool on a handful of sample inputs and print
    # what comes back.

    def _report(parsed, failure_message):
        """Print every column of *parsed*, or *failure_message* if it is falsy."""
        if parsed:
            for header, column_data in parsed.items():
                print(f"Header: {header}, Data: {column_data}")
        else:
            print(failure_message)

    tool_instance = MarkdownTableParserTool()

    # Table whose first header ('*') labels the rows.
    example_table = """
|*|a|b|c|d|e|
|---|---|---|---|---|---|
|a|a|b|c|b|d|
|b|b|c|a|e|c|
|c|c|a|b|b|a|
|d|b|e|b|e|d|
|e|d|b|a|d|c|
"""
    parsed = tool_instance.forward(example_table)
    print("Parsed GAIA example:")
    _report(parsed, "Failed to parse table.")

    # Table embedded between unrelated lines of text.
    example_table_2 = """
Some text before
| Name | Age | City |
|-------|-----|-----------|
| Alice | 30 | New York |
| Bob | 24 | Paris |
| Carol | 45 | London |
Some text after
"""
    parsed_2 = tool_instance.forward(example_table_2)
    # "\n" (not "\\n") so a blank line is printed rather than a literal backslash-n.
    print("\nParsed Table 2 (with surrounding text):")
    _report(parsed_2, "Failed to parse table 2.")

    # Header and separator only. The parser returns a non-empty dict of empty
    # lists here, which is truthy, so the headers are printed with empty data.
    empty_table_with_header = """
| Header1 | Header2 |
|---------|---------|
"""
    parsed_empty = tool_instance.forward(empty_table_with_header)
    print("\nParsed Empty Table with Header:")
    _report(parsed_empty, "Failed to parse table (empty with header).")

    # Separator row does not match the header's column count.
    malformed_table = """
| Header1 | Header2 |
|--- ---|
| cell1 | cell2 |
"""
    parsed_malformed = tool_instance.forward(malformed_table)
    print("\nParsed Malformed Table:")
    _report(parsed_malformed, "Failed to parse malformed table.")

    # Plain text: here a falsy result is the expected outcome.
    no_table_text = "This is just some text without a table."
    parsed_no_table = tool_instance.forward(no_table_text)
    print("\nParsed Text Without Table:")
    if parsed_no_table:
        print("Error: Should not have parsed a table.")
    else:
        print("Correctly found no table.")