wt002 committed on
Commit
84992c5
·
verified ·
1 Parent(s): 3985578

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +179 -166
app.py CHANGED
@@ -1,210 +1,223 @@
1
  import os
2
  import gradio as gr
3
  import requests
 
4
  import pandas as pd
5
- from smolagents import CodeAgent, OpenAIServerModel, DuckDuckGoSearchTool, VisitWebpageTool, tool, \
6
- FinalAnswerTool, PythonInterpreterTool, SpeechToTextTool, ToolCallingAgent
7
- import yaml
8
- import importlib
 
9
  from io import BytesIO
10
- import tempfile
11
- import base64
12
  from youtube_transcript_api import YouTubeTranscriptApi
13
- from youtube_transcript_api._errors import TranscriptsDisabled, NoTranscriptFound, VideoUnavailable
14
- from urllib.parse import urlparse, parse_qs
15
- import json
16
- import whisper
17
- import re
18
-
19
-
20
 
21
  # (Keep Constants as is)
22
  # --- Constants ---
23
  DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
24
 
 
 
25
 
26
@tool
def transcribe_audio_file(file_path: str) -> str:
    """
    Transcribe a local MP3 audio file using Whisper.

    Args:
        file_path: Full path to the .mp3 audio file.

    Returns:
        A JSON-formatted string: on success
        {"success": true, "transcript": [{"start": 0.0, "end": 5.2, "text": "..."}, ...]}
        otherwise
        {"success": false, "error": "Reason why transcription failed"}.
    """
    try:
        if not os.path.exists(file_path):
            return json.dumps({"success": False, "error": "File does not exist."})

        if not file_path.lower().endswith(".mp3"):
            return json.dumps({"success": False, "error": "Invalid file type. Only MP3 files are supported."})

        # 'base' balances speed and accuracy; 'tiny'/'small'/'medium'/'large' also work.
        model = whisper.load_model("base")
        result = model.transcribe(file_path, verbose=False, word_timestamps=False)

        segments = [
            {"start": seg["start"], "end": seg["end"], "text": seg["text"].strip()}
            for seg in result["segments"]
        ]
        return json.dumps({"success": True, "transcript": segments})

    except Exception as exc:
        # Surface any Whisper / IO failure as a JSON error payload.
        return json.dumps({"success": False, "error": str(exc)})
 
 
 
 
 
70
 
 
 
 
 
 
 
 
 
 
 
 
 
 
71
 
72
@tool
def get_youtube_transcript(video_url: str) -> str:
    """
    Retrieve the transcript of a YouTube video, including timestamps.

    Fetches the English transcript (automatically generated subtitles are
    also supported); each snippet carries its start time, duration, and text.

    Args:
        video_url: The full URL of the YouTube video (e.g., https://www.youtube.com/watch?v=12345)

    Returns:
        A JSON-formatted string: on success
        {"success": true, "transcript": [{"start": 0.0, "duration": 1.54, "text": "..."}, ...]}
        otherwise
        {"success": false, "error": "Reason why the transcript could not be retrieved"}.
    """
    try:
        # The video ID travels in the "v" query parameter of the URL.
        query = parse_qs(urlparse(video_url).query)
        video_id = query.get("v", [None])[0]

        if not video_id:
            return json.dumps({"success": False, "error": "Invalid YouTube URL. Could not extract video ID."})

        snippets = YouTubeTranscriptApi().fetch(video_id)
        transcript = [
            {"start": s.start, "duration": s.duration, "text": s.text}
            for s in snippets
        ]
        return json.dumps({"success": True, "transcript": transcript})

    except VideoUnavailable:
        return json.dumps({"success": False, "error": "The video is unavailable."})
    except TranscriptsDisabled:
        return json.dumps({"success": False, "error": "Transcripts are disabled for this video."})
    except NoTranscriptFound:
        return json.dumps({"success": False, "error": "No transcript found for this video."})
    except Exception as exc:
        return json.dumps({"success": False, "error": str(exc)})
125
 
126
# --- Basic Agent Definition ---
# ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
class BasicAgent:
    """CodeAgent wrapper that answers scoring-service questions, optionally
    enriching the question with the task's attached file before running."""

    def __init__(self):
        # NOTE(review): `import importlib` alone does not guarantee the
        # `importlib.resources` submodule attribute is loaded; import it
        # explicitly so prompt loading below cannot raise AttributeError.
        import importlib.resources

        model = OpenAIServerModel(api_key=os.environ.get("OPENAI_API_KEY"), model_id="gpt-4o")

        self.code_agent = CodeAgent(
            tools=[PythonInterpreterTool(), DuckDuckGoSearchTool(), VisitWebpageTool(),
                   transcribe_audio_file, get_youtube_transcript, FinalAnswerTool()],
            model=model,
            max_steps=20,
            name="hf_agent_course_final_assignment_solver",
            prompt_templates=yaml.safe_load(
                importlib.resources.files("prompts").joinpath("code_agent.yaml").read_text()
            )
        )
        print("BasicAgent initialized.")

    def __call__(self, task_id: str, question: str, file_name: str) -> str:
        """Run the agent on a question; returns the text after 'FINAL ANSWER:'."""
        if file_name:
            question = self.enrich_question_with_associated_file_details(task_id, question, file_name)

        final_result = self.code_agent.run(question)

        # Extract text after "FINAL ANSWER:" (case-insensitive, trims whitespace).
        match = re.search(r'final answer:\s*(.*)', str(final_result), re.IGNORECASE | re.DOTALL)
        if match:
            return match.group(1).strip()

        # Fallback in case the pattern is not found.
        return str(final_result).strip()

    # Bug fix: this method was defined twice with identical bodies; the dead
    # duplicate has been removed.
    def enrich_question_with_associated_file_details(self, task_id: str, question: str, file_name: str) -> str:
        """Download the task's attached file and inline its content/location
        into the question text (mp3 -> temp-file path, py -> source, xlsx ->
        CSV dump, png -> base64)."""
        api_url = DEFAULT_API_URL
        get_associated_files_url = f"{api_url}/files/{task_id}"
        response = requests.get(get_associated_files_url, timeout=15)
        response.raise_for_status()

        if file_name.endswith(".mp3"):
            # Keep the file on disk (delete=False) so the transcription tool
            # can open it later by path.
            with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
                tmp_file.write(response.content)
                file_path = tmp_file.name
            return question + "\n\nMentioned .mp3 file local path is: " + file_path
        elif file_name.endswith(".py"):
            file_content = response.text
            return question + "\n\nBelow is mentioned Python file:\n\n```python\n" + file_content + "\n```\n"
        elif file_name.endswith(".xlsx"):
            xlsx_io = BytesIO(response.content)
            df = pd.read_excel(xlsx_io)
            file_content = df.to_csv(index=False)
            return question + "\n\nBelow is mentioned excel file in CSV format:\n\n```csv\n" + file_content + "\n```\n"
        elif file_name.endswith(".png"):
            base64_str = base64.b64encode(response.content).decode('utf-8')
            return question + "\n\nBelow is the .png image in base64 format:\n\n```base64\n" + base64_str + "\n```\n"

        # Robustness fix: previously an unrecognized extension fell off the end
        # and returned None; pass the question through unchanged instead.
        return question
207
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
208
 
209
  def run_and_submit_all( profile: gr.OAuthProfile | None):
210
  """
 
1
  import os
2
  import gradio as gr
3
  import requests
4
+ import inspect
5
  import pandas as pd
6
+ from smolagents import tool, Tool, CodeAgent, DuckDuckGoSearchTool, HfApiModel, VisitWebpageTool, SpeechToTextTool, FinalAnswerTool
7
+ from dotenv import load_dotenv
8
+ import heapq
9
+ from collections import Counter
10
+ import re
11
  from io import BytesIO
 
 
12
  from youtube_transcript_api import YouTubeTranscriptApi
13
+ from langchain_community.tools.tavily_search import TavilySearchResults
14
+ from langchain_community.document_loaders import WikipediaLoader
15
+ from langchain_community.utilities import WikipediaAPIWrapper
16
+ from langchain_community.document_loaders import ArxivLoader
 
 
 
17
 
18
  # (Keep Constants as is)
19
  # --- Constants ---
20
  DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
21
 
22
+ #Load environment variables
23
+ load_dotenv()
24
 
25
+
26
+
27
+
28
# Consistency fix: `Tool` and `WikipediaLoader` were redundantly re-imported
# here although both are already imported at the top of the file.
class WikiSearchTool(Tool):
    """Smolagents tool that searches Wikipedia and returns up to two hits."""

    name = "wiki_search"
    description = "Search Wikipedia for a query and return up to 2 results."
    inputs = {
        "query": {"type": "string", "description": "The search term for Wikipedia."}
    }
    output_type = "string"

    def forward(self, query: str) -> str:
        search_docs = WikipediaLoader(query=query, load_max_docs=2).load()

        # Wrap each hit in a pseudo-XML <Document> envelope so the agent can
        # tell the results apart.
        formatted_search_docs = "\n\n---\n\n".join(
            [
                f'<Document source="{doc.metadata.get("source", "Wikipedia")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
                for doc in search_docs
            ]
        )
        return formatted_search_docs
49
+
50
+
51
+
52
+
53
class StringReverseTool(Tool):
    """Reverses a garbled-looking message so its instructions become readable."""

    name = "reverse_message"
    description = "When you received a strange text, try to reverse it and perform action described in reversed message."
    inputs = {
        "message": {
            "type": "string",
            "description": "A message, which looks like strange and can be reversed to get actions to execute."
        }
    }
    output_type = "string"

    def __init__(self):
        # Bug fix: the original override was a bare `return`, which skipped
        # smolagents' Tool.__init__ setup/validation entirely.
        super().__init__()

    def forward(self, message: str):
        return message[::-1]
69
 
70
class KeywordsExtractorTool(Tool):
    """Extracts top 5 keywords from a given text based on frequency."""

    name = "keywords_extractor"
    description = "This tool returns the 5 most frequent keywords occur in provided block of text."

    inputs = {
        "text": {
            "type": "string",
            "description": "Text to analyze for keywords.",
        }
    }
    output_type = "string"

    def forward(self, text: str) -> str:
        try:
            all_words = re.findall(r'\b\w+\b', text.lower())
            # Common function words excluded from the count.
            stop_words = {'a', 'and', 'of', 'is', 'in', 'to', 'the'}
            # Bug fix: the original called filtered_words.push(w) — Python
            # lists have no `push` method, so every call raised
            # AttributeError; filter while counting instead.
            word_counts = Counter(w for w in all_words if w not in stop_words)
            # Bug fix: the original returned heapq.nlargest(...) — a list of
            # (word, count) tuples — although output_type declares "string";
            # format the top five as a readable string.
            top_five = word_counts.most_common(5)
            return ", ".join(f"{word} ({count})" for word, count in top_five)
        except Exception as e:
            return f"Error during extracting most common words: {e}"
97
 
98
@tool
def parse_excel_to_json(task_id: str) -> dict:
    """
    For a given task_id, fetch the task's Excel file and return its parsed
    content as a JSON-serializable dict (nothing is written to disk).

    Args:
        task_id: A task ID to fetch.

    Returns:
        {
            "task_id": str,
            "sheets": {
                "SheetName1": [ {col1: val1, col2: val2, ...}, ... ],
                ...
            },
            "status": "Success" | "<error description>"
        }
    """
    # Consistency fix: reuse the module-level DEFAULT_API_URL constant rather
    # than duplicating the scoring-service URL as a hard-coded literal.
    url = f"{DEFAULT_API_URL}/files/{task_id}"

    try:
        response = requests.get(url, timeout=100)
        if response.status_code != 200:
            return {"task_id": task_id, "sheets": {}, "status": f"{response.status_code} - Failed"}

        workbook = pd.ExcelFile(BytesIO(response.content))
        json_sheets = {}

        for sheet in workbook.sheet_names:
            df = workbook.parse(sheet)
            df = df.dropna(how="all")
            # Cap each sheet at 20 rows to keep the agent's context small.
            json_sheets[sheet] = df.head(20).to_dict(orient="records")

        return {
            "task_id": task_id,
            "sheets": json_sheets,
            "status": "Success"
        }

    except Exception as e:
        return {
            "task_id": task_id,
            "sheets": {},
            "status": f"Error in parsing Excel file: {str(e)}"
        }
143
 
 
 
 
 
 
 
 
 
 
 
144
 
 
 
145
 
146
class VideoTranscriptionTool(Tool):
    """Fetch transcripts from YouTube videos"""

    name = "transcript_video"
    description = "Fetch text transcript from YouTube movies with optional timestamps"
    inputs = {
        "url": {"type": "string", "description": "YouTube video URL or ID"},
        "include_timestamps": {"type": "boolean", "description": "If timestamps should be included in output", "nullable": True}
    }
    output_type = "string"

    def forward(self, url: str, include_timestamps: bool = False) -> str:
        # Accept a full watch URL, a youtu.be short link, or a bare 11-char ID.
        if "youtube.com/watch" in url:
            video_id = url.split("v=")[1].split("&")[0]
        elif "youtu.be/" in url:
            video_id = url.split("youtu.be/")[1].split("?")[0]
        elif len(url.strip()) == 11:
            video_id = url.strip()
        else:
            return f"YouTube URL or ID: {url} is invalid!"

        try:
            parts = YouTubeTranscriptApi.get_transcript(video_id)

            if not include_timestamps:
                return " ".join(part['text'] for part in parts)

            lines = []
            for part in parts:
                # mm:ss label derived from the snippet's start offset.
                timestamp = f"{int(part['start']//60)}:{int(part['start']%60):02d}"
                lines.append(f"[{timestamp}] {part['text']}")
            return "\n".join(lines)

        except Exception as e:
            return f"Error in extracting YouTube transcript: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
181
 
182
class BasicAgent:
    """CodeAgent configured with the custom tool set defined above and an
    appended FINAL-ANSWER formatting instruction in its system prompt."""

    def __init__(self):
        token = os.environ.get("HF_API_TOKEN")
        model = HfApiModel(
            temperature=0.1,
            token=token
        )

        search_tool = DuckDuckGoSearchTool()
        wiki_search_tool = WikiSearchTool()
        str_reverse_tool = StringReverseTool()
        keywords_extract_tool = KeywordsExtractorTool()
        speech_to_text_tool = SpeechToTextTool()
        visit_webpage_tool = VisitWebpageTool()
        final_answer_tool = FinalAnswerTool()
        video_transcription_tool = VideoTranscriptionTool()

        # Plain string literal: the original used an f-string although the
        # text contains no placeholders.
        system_prompt = """
You are my general AI assistant. Your task is to answer the question I asked.
First, provide an explanation of your reasoning, step by step, to arrive at the answer.
Then, return your final answer in a single line, formatted as follows: "FINAL ANSWER: [YOUR FINAL ANSWER]".
[YOUR FINAL ANSWER] should be a number, a string, or a comma-separated list of numbers and/or strings, depending on the question.
If the answer is a number, do not use commas or units (e.g., $, %) unless specified.
If the answer is a string, do not use articles or abbreviations (e.g., for cities), and write digits in plain text unless specified.
If the answer is a comma-separated list, apply the above rules for each element based on whether it is a number or a string.
"""
        self.agent = CodeAgent(
            model=model,
            tools=[search_tool, wiki_search_tool, str_reverse_tool, keywords_extract_tool, speech_to_text_tool, visit_webpage_tool, final_answer_tool, parse_excel_to_json, video_transcription_tool],
            add_base_tools=True
        )
        self.agent.prompt_templates["system_prompt"] = self.agent.prompt_templates["system_prompt"] + system_prompt

    def __call__(self, question: str) -> str:
        print(f"Agent received question (first 50 chars): {question[:50]}...")
        answer = self.agent.run(question)
        print(f"Agent returning answer: {answer}")
        # Bug fix: honor the declared `-> str` return type — agent.run may
        # return a non-string object.
        return str(answer)
220
+
221
 
222
  def run_and_submit_all( profile: gr.OAuthProfile | None):
223
  """