dygoo committed on
Commit
a668c60
·
verified ·
1 Parent(s): a983eab

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +162 -64
app.py CHANGED
@@ -4,6 +4,7 @@ import requests
4
  import inspect
5
  import pandas as pd
6
  import smolagents
 
7
  from smolagents import DuckDuckGoSearchTool, VisitWebpageTool
8
  import time
9
  from functools import lru_cache
@@ -82,7 +83,14 @@ DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
82
  # Cache Wrapper
83
  @lru_cache(maxsize=100)
84
  def cached_search(query):
85
- return search_tool(query)
 
 
 
 
 
 
 
86
 
87
  # --- Basic Agent Definition ---
88
  # ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
@@ -97,105 +105,195 @@ class BasicAgent:
97
  self.history = []
98
  print(f"BasicAgent initialized with model: {model} and {len(self.tools)} tools.")
99
 
100
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
  def __call__(self, question: str) -> str:
102
- print(f"Agent received question (first 50 chars): {question[:50]}...")
103
- # Implement your agent logic here using self.model and self.tools
104
- final_answer = self.process_question(question)
105
- print(f"Agent returning answer: {final_answer[:50]}...")
106
- return final_answer
 
 
 
 
 
107
 
108
 
109
-
110
- def process_question(self, question:str) -> str:
111
  try:
112
  # Check if this is a request about a YouTube video
113
  youtube_patterns = ["youtube.com", "youtu.be", "watch youtube", "youtube video"]
114
  use_youtube_tool = any(pattern in question.lower() for pattern in youtube_patterns)
115
 
 
 
 
 
116
  if use_youtube_tool and any(isinstance(tool, YouTubeVideoTool) for tool in self.tools):
117
  # Extract potential YouTube URL or ID
118
  url_match = re.search(r'(?:https?:\/\/)?(?:www\.)?(?:youtube\.com|youtu\.be)\/[^\s]+', question)
119
  youtube_url = url_match.group(0) if url_match else question
120
 
 
121
  # Use YouTube tool
122
- youtube_info = next(tool for tool in self.tools
123
- if isinstance(tool, YouTubeVideoTool))(youtube_url)
 
 
124
 
125
- relevant_info = self._extract_key_info(youtube_info, question)
126
- return self._formulate_direct_answer(relevant_info, question)
 
 
 
 
 
 
 
127
  else:
128
- # Use regular search
129
- search_results = cached_search(question) if any(isinstance(tool, DuckDuckGoSearchTool) for tool in self.tools) else "No search results available."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
130
  relevant_info = self._extract_key_info(search_results, question)
131
  return self._formulate_direct_answer(relevant_info, question)
132
- except Exception as e:
133
- if "too many requests" in str(e).lower():
134
- time.sleep(2)
135
- try:
136
- search_results = cached_search(question)
137
- relevant_info = self._extract_key_info(search_results, question)
138
- return self._formulate_direct_answer(relevant_info, question)
139
- except:
140
- return self._get_fallback_answer(question)
141
- return self._get_fallback_answer(question)
142
 
143
 
144
 
145
  def _extract_key_info(self, search_results, question):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
146
  # Split results into sentences and find most relevant
147
  sentences = search_results.split('. ')
148
  if len(sentences) <= 3:
149
- return search_results[:250] # If few sentences, return first portion
150
 
151
- # Try to find sentence with keywords from question
152
  keywords = [w for w in question.lower().split() if len(w) > 3]
 
 
153
  for sentence in sentences:
154
  sentence_lower = sentence.lower()
155
  if any(keyword in sentence_lower for keyword in keywords):
156
- return sentence
 
 
 
 
 
 
157
 
158
  # Fallback to first few sentences
159
- return '. '.join(sentences[:2])
160
-
161
-
162
 
163
  def _formulate_direct_answer(self, relevant_info, question):
164
- if self.model and self.model.startswith('gemini'):
165
- try:
166
- # Configure the model
167
- generation_config = {
168
- "temperature": 0.7,
169
- "top_p": 0.95,
170
- "top_k": 40,
171
- "max_output_tokens": 1024,
172
- }
173
-
174
- safety_settings = {
175
- HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
176
- HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
177
- HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
178
- HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
179
- }
180
-
181
- # Initialize the model
182
- model = genai.GenerativeModel(
183
- model_name="gemini-pro", # Adjust as needed based on your model string
184
- generation_config=generation_config,
185
- safety_settings=safety_settings
186
- )
187
-
188
- # Prepare prompt and generate response
189
- prompt = f"Question: {question}\n\nRelevant information: {relevant_info}\n\nProvide a concise answer based only on the given information."
190
- response = model.generate_content(prompt)
191
  return response.text
 
 
 
192
 
193
- except Exception as e:
194
- print(f"Error using Gemini model: {e}")
195
- return f"Based on the search: {relevant_info}"
196
-
197
- return relevant_info
198
-
 
199
 
200
 
201
  def _get_fallback_answer(self, question):
 
4
  import inspect
5
  import pandas as pd
6
  import smolagents
7
+ import traceback
8
  from smolagents import DuckDuckGoSearchTool, VisitWebpageTool
9
  import time
10
  from functools import lru_cache
 
83
  # Cache Wrapper
84
  @lru_cache(maxsize=100)
85
  def cached_search(query):
86
+ try:
87
+ print(f"Performing search for: {query[:50]}...")
88
+ result = search_tool(query)
89
+ print(f"Search successful, returned {len(result)} characters")
90
+ return result
91
+ except Exception as e:
92
+ print(f"Search error: {str(e)}")
93
+ return f"Search error: {str(e)}"
94
 
95
  # --- Basic Agent Definition ---
96
  # ----- THIS IS WHERE YOU CAN BUILD WHAT YOU WANT ------
 
105
  self.history = []
106
  print(f"BasicAgent initialized with model: {model} and {len(self.tools)} tools.")
107
 
108
+
109
+ if self.model and self.model.startswith('gemini'):
110
+ try:
111
+ self._init_gemini_model()
112
+ print("Successfully initialized Gemini model")
113
+ except Exception as e:
114
+ print(f"Error initializing Gemini model: {e}")
115
+ print("Will try again when needed")
116
+ self.gemini_model = None
117
+ else:
118
+ self.gemini_model = None
119
+
120
+ def _init_gemini_model(self):
121
+
122
+ """Initialize the Gemini model with appropriate settings"""
123
+ generation_config = {
124
+ "temperature": 0.7,
125
+ "top_p": 0.95,
126
+ "top_k": 40,
127
+ "max_output_tokens": 1024,
128
+ }
129
+
130
+ safety_settings = {
131
+ HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
132
+ HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_NONE,
133
+ HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
134
+ HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
135
+ }
136
+
137
+ model_name = "gemini-pro"
138
+ if "gemini-2.0" in self.model:
139
+ model_name = "gemini-1.5-pro"
140
+
141
+ self.gemini_model = genai.GenerativeModel(
142
+ model_name=model_name,
143
+ generation_config=generation_config,
144
+ safety_settings=safety_settings
145
+ )
146
+
147
  def __call__(self, question: str) -> str:
148
+ print(f"Agent received question: {question[:50]}...")
149
+ try:
150
+ final_answer = self.process_question(question)
151
+ print(f"Agent returning answer: {final_answer[:50]}...")
152
+ return final_answer
153
+ except Exception as e:
154
+ print(f"Agent error: {str(e)}")
155
+ traceback.print_exc()
156
+ return f"I apologize, but I encountered an error while processing your question. Error: {str(e)}"
157
+
158
 
159
 
160
+ def process_question(self, question: str) -> str:
 
161
  try:
162
  # Check if this is a request about a YouTube video
163
  youtube_patterns = ["youtube.com", "youtu.be", "watch youtube", "youtube video"]
164
  use_youtube_tool = any(pattern in question.lower() for pattern in youtube_patterns)
165
 
166
+ search_results = ""
167
+ youtube_info = ""
168
+
169
+ # Step 1: Gather information
170
  if use_youtube_tool and any(isinstance(tool, YouTubeVideoTool) for tool in self.tools):
171
  # Extract potential YouTube URL or ID
172
  url_match = re.search(r'(?:https?:\/\/)?(?:www\.)?(?:youtube\.com|youtu\.be)\/[^\s]+', question)
173
  youtube_url = url_match.group(0) if url_match else question
174
 
175
+ print(f"Using YouTube tool with URL: {youtube_url}")
176
  # Use YouTube tool
177
+ youtube_tool_instance = next((tool for tool in self.tools if isinstance(tool, YouTubeVideoTool)), None)
178
+ if youtube_tool_instance:
179
+ youtube_info = youtube_tool_instance(youtube_url)
180
+ print(f"YouTube info retrieved: {len(youtube_info)} characters")
181
 
182
+ # Always search as backup or additional context
183
+ if any(isinstance(tool, DuckDuckGoSearchTool) for tool in self.tools):
184
+ search_results = cached_search(question)
185
+ print(f"Search results: {len(search_results)} characters")
186
+
187
+ # Determine what information to use
188
+ if youtube_info and "Error processing YouTube video" not in youtube_info:
189
+ primary_info = youtube_info
190
+ print("Using YouTube info as primary source")
191
  else:
192
+ primary_info = search_results
193
+ print("Using search results as primary source")
194
+
195
+ # Extract key information
196
+ relevant_info = self._extract_key_info(primary_info, question)
197
+ print(f"Extracted relevant info: {len(relevant_info)} characters")
198
+
199
+ # Formulate an answer
200
+ return self._formulate_direct_answer(relevant_info, question)
201
+
202
+ except Exception as e:
203
+ print(f"Error in process_question: {str(e)}")
204
+ traceback.print_exc()
205
+ if "too many requests" in str(e).lower():
206
+ time.sleep(2)
207
+ try:
208
+ search_results = cached_search(question)
209
  relevant_info = self._extract_key_info(search_results, question)
210
  return self._formulate_direct_answer(relevant_info, question)
211
+ except Exception as retry_error:
212
+ print(f"Error in retry: {str(retry_error)}")
213
+ return self._get_fallback_answer(question)
214
+ return self._get_fallback_answer(question)
215
+
 
 
 
 
 
216
 
217
 
218
 
219
  def _extract_key_info(self, search_results, question):
220
+ # Basic check for empty results
221
+ if not search_results or len(search_results) < 10:
222
+ return "No relevant information found."
223
+
224
+ # For YouTube transcripts, extract the most relevant portion
225
+ if "Transcript from YouTube video" in search_results:
226
+ # Keep only a limited chunk of the transcript (no sentence splitting is done here)
227
+ max_chars = 500 # Keep a reasonable chunk size
228
+ if len(search_results) > max_chars:
229
+ # Take the first max_chars characters that follow the header line (the start of the transcript, not the middle)
230
+ start_idx = search_results.find("\n") + 1 # Skip the first line which is the header
231
+ # Get content chunk
232
+ return search_results[start_idx:start_idx+max_chars]
233
+ return search_results
234
+
235
+ # For search results
236
  # Split results into sentences and find most relevant
237
  sentences = search_results.split('. ')
238
  if len(sentences) <= 3:
239
+ return search_results[:300]
240
 
241
+ # Try to find sentences with keywords from question
242
  keywords = [w for w in question.lower().split() if len(w) > 3]
243
+ relevant_sentences = [] # NEW LINE
244
+
245
  for sentence in sentences:
246
  sentence_lower = sentence.lower()
247
  if any(keyword in sentence_lower for keyword in keywords):
248
+ relevant_sentences.append(sentence)
249
+ if len(relevant_sentences) >= 3: # Get up to 3 relevant sentences
250
+ break
251
+
252
+ # If we found relevant sentences, use them
253
+ if relevant_sentences:
254
+ return '. '.join(relevant_sentences)
255
 
256
  # Fallback to first few sentences
257
+ return '. '.join(sentences[:3])
 
 
258
 
259
  def _formulate_direct_answer(self, relevant_info, question):
260
+
261
+ if not self.model:
262
+ return f"Based on available information: {relevant_info}"
263
+
264
+ if self.model.startswith('gemini'):
265
+ try:
266
+
267
+ if not hasattr(self, 'gemini_model') or self.gemini_model is None:
268
+ self._init_gemini_model()
269
+
270
+
271
+ prompt = f"""
272
+ Question: {question}
273
+
274
+ Relevant information: {relevant_info}
275
+
276
+ Instructions:
277
+ 1. Provide a concise answer based only on the given information
278
+ 2. If the information doesn't contain the answer, say so honestly
279
+ 3. Use only facts from the provided information
280
+ 4. Format your response as a direct answer to the user
281
+ """
282
+
283
+ response = self.gemini_model.generate_content(prompt)
284
+ if response and hasattr(response, 'text'):
 
 
285
  return response.text
286
+ else:
287
+ print("Gemini response was empty or invalid")
288
+ return f"Based on the information: {relevant_info[:200]}..."
289
 
290
+ except Exception as e:
291
+ print(f"Error using Gemini model: {e}")
292
+ traceback.print_exc()
293
+ return f"Based on the search: {relevant_info[:200]}..."
294
+
295
+ return f"Based on the information: {relevant_info[:200]}..."
296
+
297
 
298
 
299
  def _get_fallback_answer(self, question):