Spaces:
Sleeping
Sleeping
import os | |
import json | |
import re | |
from typing import Tuple, Dict, Any | |
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM # Import AutoTokenizer and AutoModelForSeq2SeqLM | |
from tools.asr_tool import transcribe_audio | |
from tools.excel_tool import analyze_excel | |
from tools.search_tool import search_duckduckgo | |
from tools.math_tool import calculate_math # Make sure to import your math tool | |
class GaiaAgent: | |
def __init__(self): | |
token = os.getenv("HUGGINGFACEHUB_API_TOKEN") | |
if not token: | |
raise ValueError("Missing HUGGINGFACEHUB_API_TOKEN environment variable.") | |
# Specify the model and load tokenizer and model separately for better control | |
model_name = "google/flan-t5-large" | |
self.tokenizer = AutoTokenizer.from_pretrained(model_name, token=token) | |
self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name, token=token) | |
# Use the pipeline with the loaded model and tokenizer | |
self.llm = pipeline( | |
"text2text-generation", | |
model=self.model, | |
tokenizer=self.tokenizer, | |
device="cpu", # Consider "cuda" if you have a GPU | |
max_new_tokens=256, | |
do_sample=False, # Set to True if you want to use temperature and top_p/k | |
# temperature=0.1, # Removed, as it's not a valid pipeline initialization flag here | |
) | |
self.system_prompt = """You are a general AI assistant. I will ask you a question. Report your thoughts, and finish your answer with the following template: FINAL ANSWER: [YOUR FINAL ANSWER]. YOUR FINAL ANSWER should be a number OR as few words as possible OR a comma separated list of numbers and/or strings. If you are asked for a number, don't use comma to write your number neither use units such as $ or percent sign unless specified otherwise. If you are asked for a string, don't use articles, neither abbreviations (e.g. for cities), and write the digits in plain text unless specified otherwise. If you are asked for a comma separated list, apply the above rules depending of whether the element to be put in the list is a number or a string.""" | |
def extract_final_answer(self, text: str) -> str: | |
"""Extrahera det slutliga svaret från modellens output""" | |
final_answer_match = re.search(r'FINAL ANSWER:\s*(.+?)(?:\n|$)', text, re.IGNORECASE) | |
if final_answer_match: | |
return final_answer_match.group(1).strip() | |
sentences = text.strip().split('\n') | |
return sentences[-1].strip() if sentences else text.strip() | |
def needs_tool(self, question: str) -> Tuple[str, bool]: | |
"""Bestäm vilket verktyg som behövs baserat på frågan""" | |
question_lower = question.lower() | |
if any(ext in question_lower for ext in ['.mp3', '.wav', '.m4a', '.flac']): | |
return 'audio', True | |
if any(ext in question_lower for ext in ['.xlsx', '.xls', '.csv']): | |
return 'excel', True | |
if any(keyword in question_lower for keyword in ['search', 'find', 'lookup', 'http', 'www.', 'wikipedia', 'albums', 'discography', 'published', 'website']): | |
return 'search', True | |
if any(keyword in question_lower for keyword in ['calculate', 'compute', 'sum', 'average', 'count', 'what is', 'solve']): | |
return 'math', True | |
return 'llm', False | |
def process_with_tools(self, question: str, tool_type: str) -> Tuple[str, str]: | |
"""Bearbeta frågan med specifika verktyg""" | |
trace_log = f"Detected {tool_type} task. Processing...\n" | |
try: | |
if tool_type == 'audio': | |
audio_files = re.findall(r'\b[\w\-_]+\.(mp3|wav|m4a|flac)\b', question, re.IGNORECASE) | |
if audio_files: | |
result = transcribe_audio(audio_files[0]) | |
trace_log += f"Audio transcription: {result}\n" | |
return result, trace_log | |
else: | |
return "No audio file mentioned in the question.", trace_log | |
elif tool_type == 'excel': | |
excel_files = re.findall(r'\b[\w\-_]+\.(xlsx|xls|csv)\b', question, re.IGNORECASE) | |
if excel_files: | |
result = analyze_excel(excel_files[0]) | |
trace_log += f"Excel analysis: {result}\n" | |
return result, trace_log | |
else: | |
return "No Excel file mentioned in the question.", trace_log | |
elif tool_type == 'search': | |
search_query = question # This might need refinement to extract just the search query | |
result = search_duckduckgo(search_query) | |
trace_log += f"Search results: {result}\n" | |
return result, trace_log | |
elif tool_type == 'math': | |
math_expression_match = re.search(r'calculate (.+)', question, re.IGNORECASE) | |
if math_expression_match: | |
expression = math_expression_match.group(1).strip() | |
result = calculate_math(expression) | |
trace_log += f"Math calculation: {result}\n" | |
return result, trace_log | |
else: | |
return "No clear mathematical expression found in the question.", trace_log | |
except Exception as e: | |
trace_log += f"Error using {tool_type} tool: {str(e)}\n" | |
return f"Error: {str(e)}", trace_log | |
return "No valid input found for tool", trace_log | |
def reason_with_llm(self, question: str, context: str = "") -> Tuple[str, str]: | |
"""Använd LLM för reasoning med kontext""" | |
trace_log = "Using LLM for reasoning...\n" | |
# Combine system prompt, context, and question, ensuring it fits token limit | |
if context: | |
prompt = f"{self.system_prompt}\n\nContext: {context}\n\nQuestion: {question}\n\nPlease analyze this step by step and provide your final answer." | |
else: | |
prompt = f"{self.system_prompt}\n\nQuestion: {question}\n\nPlease analyze this step by step and provide your final answer." | |
# Tokenize and truncate if necessary | |
inputs = self.tokenizer(prompt, return_tensors="pt", truncation=True, max_length=self.tokenizer.model_max_length) | |
try: | |
# Generate response using the model's generate method for more control | |
# You can add generation arguments here, e.g., temperature, top_k, etc. | |
outputs = self.model.generate( | |
inputs.input_ids, | |
max_new_tokens=256, | |
do_sample=False, # Set to True to enable temperature and other sampling parameters | |
# temperature=0.1, # Example: Only if do_sample is True | |
) | |
response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
trace_log += f"LLM response: {response}\n" | |
return response, trace_log | |
except Exception as e: | |
trace_log += f"Error with LLM: {str(e)}\n" | |
return f"Error: {str(e)}", trace_log | |
def __call__(self, question: str) -> Tuple[str, str]: | |
"""Huvudfunktion som bearbetar frågan""" | |
total_trace = f"Processing question: {question}\n" | |
tool_type, needs_tool = self.needs_tool(question) | |
total_trace += f"Tool needed: {tool_type}\n" | |
context = "" | |
if needs_tool and tool_type != 'llm': | |
tool_result, tool_trace = self.process_with_tools(question, tool_type) | |
total_trace += tool_trace | |
context = tool_result | |
llm_response, llm_trace = self.reason_with_llm(question, context) | |
total_trace += llm_trace | |
final_answer = self.extract_final_answer(llm_response) | |
total_trace += f"Final answer extracted: {final_answer}\n" | |
return final_answer, total_trace | |