# Spaces:
# Sleeping
# Sleeping
"""Tools to handle multimodal understanding."""
import os | |
import io | |
import re | |
import requests | |
import librosa | |
import soundfile as sf | |
import pandas as pd | |
from llama_index.core.tools import FunctionTool | |
from huggingface_hub import InferenceClient | |
from transformers import pipeline | |
# Base URL of the scoring service; task files are fetched from
# DEFAULT_API_URL + "/files/<id>".
DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
def transcribe_audio(file_id: str) -> str:
    """
    Transcribe an English audio file identified by its id.

    The file is downloaded with ``_get_file``, decoded with ``soundfile``,
    down-mixed to mono, resampled to 16 kHz when necessary and then passed to
    the ``openai/whisper-tiny`` speech-recognition pipeline.

    Args:
        file_id: Id of the audio file to download and transcribe.

    Returns:
        The stripped transcription text, or an error message string when the
        file cannot be fetched or decoded as audio.
    """
    try:
        audio, sr = sf.read(_get_file(file_id))
        # soundfile yields (frames, channels) for multi-channel files, while
        # the resampler and the ASR pipeline work on a single 1-D channel.
        if audio.ndim > 1:
            audio = audio.mean(axis=1)
        if sr != 16000:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)
    except Exception:
        # Tool boundary: report the failure as text instead of raising, but
        # let KeyboardInterrupt / SystemExit propagate (no bare ``except``).
        return "Error: Invalid file. This file is either not an audio file or the id does not exist."
    asr = pipeline("automatic-speech-recognition", model="openai/whisper-tiny")
    # Decoding options such as the language are forwarded to the model's
    # ``generate`` call through ``generate_kwargs``.
    output = asr(audio, generate_kwargs={"language": "en"})
    return output["text"].strip()
def transcribe_audio_hf(file_id: str) -> str:
    """
    Transcribe an audio file identified by its id via the HF Inference API.

    The raw bytes of the file are downloaded with ``_get_file`` and sent to
    the ``openai/whisper-small`` model through ``InferenceClient``. The API
    token is read from the ``HF_TOKEN`` environment variable.

    Args:
        file_id: Id of the audio file to download and transcribe.

    Returns:
        The transcription text, or an error message string when the file
        cannot be fetched.
    """
    try:
        audio_bytes = _get_file(file_id).read()
    except Exception:
        # Tool boundary: report the failure as text instead of raising, but
        # let KeyboardInterrupt / SystemExit propagate (no bare ``except``).
        return "Error: Invalid file. This file is either not an audio file or the id does not exist."
    client = InferenceClient(
        provider="hf-inference",
        api_key=os.getenv("HF_TOKEN"),
    )
    output = client.automatic_speech_recognition(audio_bytes, model="openai/whisper-small")
    # The client may return an output object exposing ``.text``; fall back to
    # the raw value when it is already a plain string.
    return getattr(output, "text", output)
def get_transcription_tool():
    """Build the ``FunctionTool`` that exposes ``transcribe_audio``."""
    tool_description = "Transcribes an audio file identified by its id."
    return FunctionTool.from_defaults(fn=transcribe_audio, description=tool_description)
def answer_image_question(question: str, file_id: str) -> str:
    """
    Answer a question about an image identified by its id.

    The question and the image URL (``DEFAULT_API_URL`` + ``/files/<id>``)
    are sent as a single user message to ``Qwen/Qwen2.5-VL-32B-Instruct``
    through the HF Inference API; the reply is passed through
    ``remove_think`` before being returned.

    Args:
        question: The question to ask about the image.
        file_id: Id of the image file.

    Returns:
        The model's answer after ``remove_think`` has been applied.
    """
    vlm_client = InferenceClient(
        provider="hf-inference",
        api_key=os.getenv("HF_TOKEN"),
    )

    # Assemble the multimodal user message piece by piece.
    text_part = {"type": "text", "text": question}
    image_part = {
        "type": "image_url",
        "image_url": {"url": DEFAULT_API_URL + f"/files/{file_id}"},
    }
    user_message = {"role": "user", "content": [text_part, image_part]}

    response = vlm_client.chat.completions.create(
        model="Qwen/Qwen2.5-VL-32B-Instruct",
        messages=[user_message],
        max_tokens=512,
    )
    raw_answer = response.choices[0].message.content
    return remove_think(raw_answer)
def get_image_qa_tool():
    """Build the ``FunctionTool`` that exposes ``answer_image_question``."""
    tool_description = "Answer a question about a given image. The image is identified by a file id."
    return FunctionTool.from_defaults(fn=answer_image_question, description=tool_description)
def read_excel(file_id: str) -> str:
    """
    Load the Excel file with the given id and render it as a Markdown table.

    Args:
        file_id: Id of the Excel file to download.

    Returns:
        The sheet contents formatted by ``DataFrame.to_markdown``.
    """
    return pd.read_excel(_get_file(file_id)).to_markdown()
def get_excel_tool():
    """Build the ``FunctionTool`` that exposes ``read_excel``."""
    tool_description = "Convert an excel file that is identified by its file id into a markdown string."
    return FunctionTool.from_defaults(fn=read_excel, description=tool_description)
def analyse_excel(file_id: str) -> str:
    """
    Compute summary statistics for the Excel file with the given id.

    Args:
        file_id: Id of the Excel file to download.

    Returns:
        The output of ``DataFrame.describe`` rendered as a Markdown table,
        so the result is a string as the signature declares.
    """
    file_io = _get_file(file_id)
    df = pd.read_excel(file_io)
    # describe() yields a DataFrame; convert it to text to honour ``-> str``.
    return df.describe().to_markdown()
def get_excel_analysis_tool():
    """Build the ``FunctionTool`` that exposes ``analyse_excel``."""
    return FunctionTool.from_defaults(
        # The statistics tool must call the analysis function, not the
        # plain Excel-to-markdown reader.
        fn=analyse_excel,
        description="Analyse an excel file that is identified by its file id and get common statistics such as mean or max per column."
    )
def read_csv(file_id: str) -> str:
    """
    Load the CSV file with the given id and render it as a Markdown table.

    Args:
        file_id: Id of the CSV file to download.

    Returns:
        The table contents formatted by ``DataFrame.to_markdown``.
    """
    return pd.read_csv(_get_file(file_id)).to_markdown()
def get_csv_tool():
    """Build the ``FunctionTool`` that exposes ``read_csv``."""
    return FunctionTool.from_defaults(
        # CSV files must be parsed by the CSV reader; the Excel reader
        # cannot load them.
        fn=read_csv,
        description="Convert a csv file that is identified by its file id into a markdown string."
    )
def analyse_csv(file_id: str) -> str:
    """
    Compute summary statistics for the CSV file with the given id.

    Args:
        file_id: Id of the CSV file to download.

    Returns:
        The output of ``DataFrame.describe`` rendered as a Markdown table,
        so the result is a string as the signature declares.
    """
    file_io = _get_file(file_id)
    df = pd.read_csv(file_io)
    # describe() yields a DataFrame; convert it to text to honour ``-> str``.
    return df.describe().to_markdown()
def get_csv_analysis_tool():
    """Build the ``FunctionTool`` that exposes ``analyse_csv``."""
    return FunctionTool.from_defaults(
        # The statistics tool must call the CSV analysis function, not the
        # Excel-to-markdown reader.
        fn=analyse_csv,
        description="Analyse a csv file that is identified by its file id and get common statistics such as mean or max per column."
    )
def watch_video(video_url: str) -> str:
    """
    Placeholder for video understanding.

    Args:
        video_url: URL of the video; currently ignored.

    Returns:
        A fixed message instructing the caller that videos cannot be watched.
    """
    not_supported_message = (
        "You are not able to watch a Video yet. "
        "Reply with 'I don't know' to the question."
    )
    return not_supported_message
def get_video_tool():
    """Build the ``FunctionTool`` that exposes ``watch_video``."""
    tool_description = "Watch a video and get a content description as a string."
    return FunctionTool.from_defaults(fn=watch_video, description=tool_description)
def _get_file(task_id: str, timeout: float = 30.0) -> io.BytesIO:
    """
    Download the file attached to a task and return it as an in-memory buffer.

    Args:
        task_id: Id of the task/file, appended to ``DEFAULT_API_URL`` as
            ``/files/<task_id>``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may block indefinitely on a stalled server.

    Returns:
        A ``BytesIO`` positioned at the start of the downloaded content.

    Raises:
        FileNotFoundError: If the server does not answer with HTTP 200.
        requests.RequestException: On connection problems or timeout.
    """
    res = requests.get(DEFAULT_API_URL + f"/files/{task_id}", timeout=timeout)
    if res.status_code != 200:
        raise FileNotFoundError("Invalid file or task id.")
    return io.BytesIO(res.content)
def remove_think(output: str) -> str:
    """
    Remove every ``<think>...</think>`` section from an LLM output.

    Args:
        output: Raw model output; may be empty or ``None``.

    Returns:
        The output without think sections and with surrounding whitespace
        stripped. Falsy inputs (``""`` / ``None``) are returned unchanged.
    """
    if output:
        # DOTALL lets "." span newlines (reasoning is usually multi-line);
        # the non-greedy quantifier keeps text between two separate think
        # sections instead of deleting everything from the first opening
        # tag to the last closing tag.
        return re.sub(r"<think>.*?</think>", "", output, flags=re.DOTALL).strip()
    return output