wt002 committed on
Commit
1f27438
·
verified ·
1 Parent(s): e060a36

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +143 -384
app.py CHANGED
@@ -22,429 +22,188 @@ DEFAULT_API_URL = "https://agents-course-unit4-scoring.hf.space"
22
  #Load environment variables
23
  load_dotenv()
24
 
25
- import io
26
- import contextlib
27
- import traceback
28
- from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
29
- from smolagents import Tool, CodeAgent, DuckDuckGoSearchTool, FinalAnswerTool, HfApiModel
30
-
31
-
32
- class CodeLlamaTool(Tool):
33
- name = "code_llama_tool"
34
- description = "Solves reasoning/code questions using Meta Code Llama 7B Instruct"
35
-
36
- inputs = {
37
- "question": {
38
- "type": "string",
39
- "description": "The question requiring code-based or reasoning-based solution"
40
- }
41
- }
42
- output_type = "string"
43
-
44
- def __init__(self):
45
- self.model_id = "codellama/CodeLlama-7b-Instruct-hf"
46
- token = os.getenv("HF_TOKEN")
47
-
48
- self.tokenizer = AutoTokenizer.from_pretrained(self.model_id, token=token)
49
- self.model = AutoModelForCausalLM.from_pretrained(
50
- self.model_id,
51
- device_map="auto",
52
- torch_dtype="auto",
53
- token=token
54
- )
55
- self.pipeline = pipeline(
56
- "text-generation",
57
- model=self.model,
58
- tokenizer=self.tokenizer,
59
- max_new_tokens=512,
60
- temperature=0.2,
61
- truncation=True
62
- )
63
-
64
- def forward(self, question: str) -> str:
65
- prompt = f"""You are an AI that uses Python code to answer questions.
66
- Question: {question}
67
- Instructions:
68
- - If solving requires code, use a block like <tool>code</tool>.
69
- - Always end with <final>FINAL ANSWER</final> containing the final number or string.
70
- Example:
71
- Question: What is 5 * sqrt(36)?
72
- Answer:
73
- <tool>
74
- import math
75
- print(5 * math.sqrt(36))
76
- </tool>
77
- <final>30.0</final>
78
- Answer:"""
79
-
80
- response = self.pipeline(prompt)[0]["generated_text"]
81
- return self.parse_and_execute(response)
82
-
83
- def parse_and_execute(self, response: str) -> str:
84
  try:
85
- # Extract and run code if exists
86
- if "<tool>" in response and "</tool>" in response:
87
- code = response.split("<tool>")[1].split("</tool>")[0].strip()
88
- result = self._run_code(code)
89
- return f"FINAL ANSWER (code output): {result}"
90
-
91
- # Extract final result directly
92
- elif "<final>" in response and "</final>" in response:
93
- final = response.split("<final>")[1].split("</final>")[0].strip()
94
- return f"FINAL ANSWER: {final}"
95
-
96
- return f"Could not extract final answer.\n\n{response}"
97
-
98
  except Exception as e:
99
- return f"Error in parse_and_execute: {str(e)}\n\nFull response:\n{response}"
100
-
101
- def _run_code(self, code: str) -> str:
102
- buffer = io.StringIO()
103
- try:
104
- with contextlib.redirect_stdout(buffer):
105
- exec(code, {})
106
- return buffer.getvalue().strip()
107
- except Exception:
108
- return f"Error executing code:\n{traceback.format_exc()}"
109
-
110
-
111
-
112
 
113
- import requests
114
- from smolagents import Tool
115
-
116
- class ArxivSearchTool(Tool):
117
- name = "arxiv_search"
118
- description = "Search Arxiv for papers matching a query and return titles and links."
119
- inputs = {
120
- "query": {"type": "string", "description": "Search query for Arxiv papers"}
121
- }
122
- output_type = "string"
123
-
124
- def forward(self, query: str) -> str:
125
- url = "http://export.arxiv.org/api/query"
126
- params = {
127
- "search_query": query,
128
- "start": 0,
129
- "max_results": 3,
130
- "sortBy": "relevance",
131
- "sortOrder": "descending"
132
- }
133
  try:
134
- response = requests.get(url, params=params, timeout=10)
135
- response.raise_for_status()
136
- # Simple parse titles and links (basic, for demo)
137
- import xml.etree.ElementTree as ET
138
- root = ET.fromstring(response.content)
139
- ns = {"atom": "http://www.w3.org/2005/Atom"}
140
-
141
- entries = root.findall("atom:entry", ns)
142
- results = []
143
- for entry in entries:
144
- title = entry.find("atom:title", ns).text.strip().replace('\n', ' ')
145
- link = entry.find("atom:id", ns).text.strip()
146
- results.append(f"{title}\n{link}")
147
- return "\n\n".join(results) if results else "No results found."
148
  except Exception as e:
149
- return f"Error during Arxiv search: {e}"
150
-
151
-
152
- from transformers import pipeline
153
- from smolagents import Tool
154
- from PIL import Image
155
-
156
- class HuggingFaceDocumentQATool(Tool):
157
- name = "document_qa"
158
- description = "Answer questions from document images (e.g., scanned invoices)."
159
- inputs = {
160
- "image_path": {"type": "string", "description": "Path to the image file"},
161
- "question": {"type": "string", "description": "Question to ask about the document"}
162
- }
163
- output_type = "string"
164
-
165
- def __init__(self):
166
- self.pipeline = pipeline("document-question-answering", model="impira/layoutlm-document-qa")
167
-
168
- def forward(self, image_path: str, question: str) -> str:
169
- image = Image.open(image_path)
170
- result = self.pipeline(image, question=question)
171
- return result[0]['answer']
172
-
173
-
174
- from transformers import BlipProcessor, BlipForQuestionAnswering
175
-
176
- class HuggingFaceImageQATool(Tool):
177
- name = "image_qa"
178
- description = "Answer questions about an image."
179
- inputs = {
180
- "image_path": {"type": "string", "description": "Path to image"},
181
- "question": {"type": "string", "description": "Question about the image"}
182
- }
183
- output_type = "string"
184
-
185
- def __init__(self):
186
- self.processor = BlipProcessor.from_pretrained("Salesforce/blip-vqa-base")
187
- self.model = BlipForQuestionAnswering.from_pretrained("Salesforce/blip-vqa-base")
188
-
189
- def forward(self, image_path: str, question: str) -> str:
190
- image = Image.open(image_path)
191
- inputs = self.processor(image, question, return_tensors="pt")
192
- out = self.model.generate(**inputs)
193
- return self.processor.decode(out[0], skip_special_tokens=True)
194
-
195
-
196
- from transformers import pipeline
197
-
198
- class HuggingFaceTranslationTool(Tool):
199
- name = "translate"
200
- description = "Translate text from English to another language."
201
- inputs = {
202
- "text": {"type": "string", "description": "Text to translate"}
203
- }
204
- output_type = "string"
205
-
206
- def __init__(self):
207
- self.translator = pipeline("translation", model="Helsinki-NLP/opus-mt-en-fr")
208
-
209
- def forward(self, text: str) -> str:
210
- return self.translator(text)[0]["translation_text"]
211
 
212
-
213
- import io
214
- import contextlib
215
-
216
- class PythonCodeExecutionTool(Tool):
217
- name = "run_python"
218
- description = "Execute Python code and return result."
219
- inputs = {
220
- "code": {"type": "string", "description": "Python code to execute"}
221
- }
222
- output_type = "string"
223
-
224
- def forward(self, code: str) -> str:
225
- output = io.StringIO()
226
  try:
227
- with contextlib.redirect_stdout(output):
228
- exec(code, {})
229
- return output.getvalue().strip()
 
 
 
 
 
 
 
 
 
 
 
 
 
230
  except Exception as e:
231
- return f"Error: {str(e)}"
232
-
233
-
234
 
235
-
236
- #from smolagents import Tool
237
- #from langchain_community.document_loaders import WikipediaLoader
238
-
239
- class WikiSearchTool(Tool):
240
- name = "wiki_search"
241
- description = "Search Wikipedia for a query and return up to 2 results."
242
- inputs = {
243
- "query": {"type": "string", "description": "The search term for Wikipedia."}
244
- }
245
- output_type = "string"
246
-
247
- def forward(self, query: str) -> str:
248
- search_docs = WikipediaLoader(query=query, load_max_docs=2).load()
249
-
250
- formatted_search_docs = "\n\n---\n\n".join(
251
- [
252
- f'<Document source="{doc.metadata.get("source", "Wikipedia")}" page="{doc.metadata.get("page", "")}"/>\n{doc.page_content}\n</Document>'
253
- for doc in search_docs
254
- ]
255
  )
256
- return formatted_search_docs
257
-
258
 
259
-
260
-
261
- class StringReverseTool(Tool):
262
- name = "reverse_message"
263
- description = "When you received a strange text, try to reverse it and perform action described in reversed message."
264
- inputs = {
265
- "message": {
266
- "type": "string",
267
- "description": "A message, which looks like strange and can be reversed to get actions to execute."
268
- }
269
- }
270
- output_type = "string"
271
-
272
- def __init__(self):
273
- return
274
-
275
- def forward(self, message: str):
276
- return message[::-1]
277
-
278
- class KeywordsExtractorTool(Tool):
279
- """Extracts top 5 keywords from a given text based on frequency."""
280
-
281
- name = "keywords_extractor"
282
- description = "This tool returns the 5 most frequent keywords occur in provided block of text."
283
-
284
- inputs = {
285
- "text": {
286
- "type": "string",
287
- "description": "Text to analyze for keywords.",
288
- }
289
- }
290
- output_type = "string"
291
-
292
- def forward(self, text: str) -> str:
293
  try:
294
- all_words = re.findall(r'\b\w+\b', text.lower())
295
- conjunctions = {'a', 'and', 'of', 'is', 'in', 'to', 'the'}
296
- filtered_words = []
297
- for w in all_words:
298
- if w not in conjunctions:
299
- filtered_words.push(w)
300
- word_counts = Counter(filtered_words)
301
- k = 5
302
- return heapq.nlargest(k, word_counts.items(), key=lambda x: x[1])
303
  except Exception as e:
304
- return f"Error during extracting most common words: {e}"
305
 
306
- @tool
307
- def parse_excel_to_json(task_id: str) -> dict:
308
- """
309
- For a given task_id fetch and parse an Excel file and save parsed data in structured JSON file.
310
- Args:
311
- task_id: An task ID to fetch.
312
 
313
- Returns:
314
- {
315
- "task_id": str,
316
- "sheets": {
317
- "SheetName1": [ {col1: val1, col2: val2, ...}, ... ],
318
- ...
319
- },
320
- "status": "Success" | "Error"
321
- }
322
- """
323
- url = f"https://agents-course-unit4-scoring.hf.space/files/{task_id}"
324
-
325
- try:
326
- response = requests.get(url, timeout=100)
327
- if response.status_code != 200:
328
- return {"task_id": task_id, "sheets": {}, "status": f"{response.status_code} - Failed"}
329
-
330
- xls_content = pd.ExcelFile(BytesIO(response.content))
331
- json_sheets = {}
332
-
333
- for sheet in xls_content.sheet_names:
334
- df = xls_content.parse(sheet)
335
- df = df.dropna(how="all")
336
- rows = df.head(20).to_dict(orient="records")
337
- json_sheets[sheet] = rows
338
-
339
- return {
340
- "task_id": task_id,
341
- "sheets": json_sheets,
342
- "status": "Success"
343
- }
344
-
345
- except Exception as e:
346
- return {
347
- "task_id": task_id,
348
- "sheets": {},
349
- "status": f"Error in parsing Excel file: {str(e)}"
350
- }
351
-
352
-
353
-
354
- class VideoTranscriptionTool(Tool):
355
- """Fetch transcripts from YouTube videos"""
356
- name = "transcript_video"
357
- description = "Fetch text transcript from YouTube movies with optional timestamps"
358
- inputs = {
359
- "url": {"type": "string", "description": "YouTube video URL or ID"},
360
- "include_timestamps": {"type": "boolean", "description": "If timestamps should be included in output", "nullable": True}
361
- }
362
- output_type = "string"
363
-
364
- def forward(self, url: str, include_timestamps: bool = False) -> str:
365
-
366
- if "youtube.com/watch" in url:
367
- video_id = url.split("v=")[1].split("&")[0]
368
- elif "youtu.be/" in url:
369
- video_id = url.split("youtu.be/")[1].split("?")[0]
370
- elif len(url.strip()) == 11: # Direct ID
371
- video_id = url.strip()
372
- else:
373
- return f"YouTube URL or ID: {url} is invalid!"
374
-
375
  try:
376
- transcription = YouTubeTranscriptApi.get_transcript(video_id)
 
 
 
 
 
 
377
 
378
- if include_timestamps:
379
- formatted_transcription = []
380
- for part in transcription:
381
- timestamp = f"{int(part['start']//60)}:{int(part['start']%60):02d}"
382
- formatted_transcription.append(f"[{timestamp}] {part['text']}")
383
- return "\n".join(formatted_transcription)
384
- else:
385
- return " ".join([part['text'] for part in transcription])
386
 
387
- except Exception as e:
388
- return f"Error in extracting YouTube transcript: {str(e)}"
 
 
 
 
 
389
 
390
  class BasicAgent:
391
  def __init__(self):
392
  token = os.environ.get("HF_API_TOKEN")
393
  model = HfApiModel(
394
- temperature=0.1,
395
  token=token
396
  )
397
 
398
- # Existing tools
399
  search_tool = DuckDuckGoSearchTool()
400
  wiki_search_tool = WikiSearchTool()
401
- str_reverse_tool = StringReverseTool()
402
- keywords_extract_tool = KeywordsExtractorTool()
403
- speech_to_text_tool = SpeechToTextTool()
404
- visit_webpage_tool = VisitWebpageTool()
405
- final_answer_tool = FinalAnswerTool()
406
- video_transcription_tool = VideoTranscriptionTool()
407
-
408
- # ✅ New Llama Tool
409
- code_llama_tool = CodeLlamaTool()
410
- # ✅ Add Hugging Face default tools
411
  arxiv_search_tool = ArxivSearchTool()
412
  doc_qa_tool = HuggingFaceDocumentQATool()
413
- image_qa_tool = HuggingFaceImageQATool()
414
- translation_tool = HuggingFaceTranslationTool()
415
  python_tool = PythonCodeExecutionTool()
 
416
 
417
- system_prompt = f"""
418
- You are my general AI assistant. Your task is to answer the question I asked.
419
- First, provide an explanation of your reasoning, step by step, to arrive at the answer.
420
- Then, return your final answer in a single line, formatted as follows: "FINAL ANSWER: [YOUR FINAL ANSWER]".
421
- [YOUR FINAL ANSWER] should be a number, a string, or a comma-separated list of numbers and/or strings, depending on the question.
422
- If the answer is a number, do not use commas or units (e.g., $, %) unless specified.
423
- If the answer is a string, do not use articles or abbreviations (e.g., for cities), and write digits in plain text unless specified.
424
- If the answer is a comma-separated list, apply the above rules for each element based on whether it is a number or a string.
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
425
  """
426
 
427
  self.agent = CodeAgent(
428
  model=model,
429
- tools=[
430
- search_tool, wiki_search_tool, str_reverse_tool,
431
- keywords_extract_tool, speech_to_text_tool,
432
- visit_webpage_tool, final_answer_tool,
433
- parse_excel_to_json, video_transcription_tool,
434
- arxiv_search_tool,
435
- doc_qa_tool, image_qa_tool,
436
- translation_tool, python_tool,
437
- code_llama_tool # 🔧 Add here
438
- ],
439
- add_base_tools=True
440
  )
441
- self.agent.prompt_templates["system_prompt"] = self.agent.prompt_templates["system_prompt"] + system_prompt
 
442
 
443
  def __call__(self, question: str) -> str:
444
- print(f"Agent received question (first 50 chars): {question[:50]}...")
445
- answer = self.agent.run(question)
446
- print(f"Agent returning answer: {answer}")
447
- return answer
 
 
 
 
 
 
448
 
449
 
450
 
 
22
  #Load environment variables
23
  load_dotenv()
24
 
25
+ from duckduckgo_search import DDGS
26
+ import wikipedia
27
+ import arxiv
28
+ from transformers import pipeline
29
+ import os
30
+ import re
31
+ import ast
32
+ import subprocess
33
+ import sys
34
+
35
+ # ===== Search Tools =====
36
class DuckDuckGoSearchTool:
    """Web search tool backed by DuckDuckGo's text search endpoint."""

    def __init__(self, max_results=3):
        # Short usage hint surfaced to the agent when choosing tools.
        self.description = "Search web using DuckDuckGo. Input: search query"
        self.max_results = max_results

    def run(self, query: str) -> str:
        """Return up to ``max_results`` hits as title/URL/snippet paragraphs.

        Any failure (network, library, parsing) is reported as a string
        rather than raised, so the calling agent can keep going.
        """
        try:
            with DDGS() as session:
                hits = list(session.text(query, max_results=self.max_results))
            formatted = [
                f"Title: {hit['title']}\nURL: {hit['href']}\nSnippet: {hit['body']}"
                for hit in hits
            ]
            return "\n\n".join(formatted)
        except Exception as e:
            return f"Search error: {str(e)}"
 
 
 
 
 
 
 
 
 
 
 
 
51
 
52
class WikiSearchTool:
    """Fetch short Wikipedia summaries for a search phrase."""

    def __init__(self, sentences=3):
        # Short usage hint surfaced to the agent when choosing tools.
        self.description = "Get Wikipedia summaries. Input: search phrase"
        self.sentences = sentences

    def run(self, query: str) -> str:
        """Return a summary, or a readable message for disambiguation,
        missing pages, or any other library failure."""
        try:
            summary = wikipedia.summary(query, sentences=self.sentences)
        except wikipedia.DisambiguationError as err:
            return f"Disambiguation error. Options: {', '.join(err.options[:5])}"
        except wikipedia.PageError:
            return "Page not found"
        except Exception as err:
            return f"Wikipedia error: {str(err)}"
        return summary
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
66
 
67
class ArxivSearchTool:
    """Query arXiv for academic papers relevant to a search string."""

    def __init__(self, max_results=3):
        # Short usage hint surfaced to the agent when choosing tools.
        self.description = "Search academic papers on arXiv. Input: search query"
        self.max_results = max_results

    def run(self, query: str) -> str:
        """Return up to ``max_results`` papers (title, authors, date,
        truncated summary, URL) separated by blank lines."""
        try:
            # NOTE(review): Search.results() is deprecated in newer `arxiv`
            # releases in favour of arxiv.Client().results(search) — confirm
            # the pinned library version before changing this call.
            search = arxiv.Search(
                query=query,
                max_results=self.max_results,
                sort_by=arxiv.SortCriterion.Relevance,
            )
            formatted = []
            for paper in search.results():
                authors = ', '.join(a.name for a in paper.authors)
                formatted.append(
                    f"Title: {paper.title}\n"
                    f"Authors: {authors}\n"
                    f"Published: {paper.published.strftime('%Y-%m-%d')}\n"
                    f"Summary: {paper.summary[:250]}...\n"
                    f"URL: {paper.entry_id}"
                )
            return "\n\n".join(formatted)
        except Exception as e:
            return f"arXiv error: {str(e)}"
 
 
92
 
93
+ # ===== QA Tools =====
94
class HuggingFaceDocumentQATool:
    """Extractive question answering over plain document text.

    Uses a RoBERTa model fine-tuned on SQuAD2 via the transformers
    question-answering pipeline.
    """

    def __init__(self):
        # Input protocol: document text and question joined by '||'.
        self.description = "Answer questions from documents. Input: 'document_text||question'"
        self.model = pipeline(
            'question-answering',
            model='deepset/roberta-base-squad2',
            tokenizer='deepset/roberta-base-squad2'
        )

    def run(self, input_str: str) -> str:
        """Split the input on the first '||' into context and question,
        run the QA model, and return the extracted answer span."""
        try:
            if '||' not in input_str:
                return "Invalid format. Use: 'document_text||question'"
            context, question = input_str.split('||', 1)
            prediction = self.model(question=question, context=context)
            return prediction['answer']
        except Exception as e:
            return f"QA error: {str(e)}"
113
 
114
+ # ===== Code Execution =====
115
+ class PythonCodeExecutionTool:
116
+ def __init__(self):
117
+ self.description = "Execute Python code. Input: valid Python code"
 
 
118
 
119
+ def run(self, code: str) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
120
  try:
121
+ # Isolate code in a clean environment
122
+ env = {}
123
+ exec(f"def __temp_func__():\n {indent_code(code)}", env)
124
+ output = env['__temp_func__']()
125
+ return str(output)
126
+ except Exception as e:
127
+ return f"Execution error: {str(e)}"
128
 
129
+ def indent_code(code: str) -> str:
130
+ """Add proper indentation for multiline code"""
131
+ return '\n '.join(code.splitlines())
 
 
 
 
 
132
 
133
+ # ===== Answer Formatting =====
134
class FinalAnswerTool:
    """Wrap an answer string in the canonical 'FINAL ANSWER:' prefix."""

    def __init__(self):
        # Short usage hint surfaced to the agent when choosing tools.
        self.description = "Format final answer. Input: answer content"

    def run(self, answer: str) -> str:
        """Return the answer prefixed for downstream answer extraction."""
        return f"FINAL ANSWER: {answer}"
140
 
141
class BasicAgent:
    """Question-answering agent wrapping a CodeAgent with a curated toolset.

    Fix: removed the redundant function-local ``import re`` in ``__call__``
    (``re`` is already imported at module level in this revision).

    NOTE(review): ``HfApiModel`` and ``CodeAgent`` are referenced here but
    the ``from smolagents import ...`` line was removed in this commit —
    confirm they are still imported elsewhere in the file, otherwise this
    class raises NameError at construction time.
    """

    def __init__(self):
        token = os.environ.get("HF_API_TOKEN")
        model = HfApiModel(
            temperature=0.0,  # Reduced for deterministic output
            token=token
        )

        # Curated toolset - remove redundant/conflicting tools
        search_tool = DuckDuckGoSearchTool()
        wiki_search_tool = WikiSearchTool()
        arxiv_search_tool = ArxivSearchTool()
        doc_qa_tool = HuggingFaceDocumentQATool()
        python_tool = PythonCodeExecutionTool()
        final_answer_tool = FinalAnswerTool()

        # Strategic tool selection
        tools = [
            search_tool,
            wiki_search_tool,
            arxiv_search_tool,
            doc_qa_tool,
            python_tool,
            final_answer_tool
        ]

        # Enhanced system prompt
        system_prompt = """
        You are a precision question-answering AI. Follow this protocol:
        1. Analyze the question type: factual, computational, or multi-step
        2. Select the optimal tool:
        - Use Search/Wiki/Arxiv for factual queries
        - Use Python tool for calculations
        - Use DocQA for document-based questions
        3. Execute necessary actions
        4. Verify answer matches question requirements
        5. Output FINAL ANSWER using this format:
        "FINAL ANSWER: [EXACT_RESULT]"

        Answer rules:
        - Numbers: Plain format (e.g., 1000000)
        - Strings: No articles/abbreviations (e.g., "Paris" not "city of Paris")
        - Lists: Comma-separated (e.g., "red,blue,green")
        - Never include units ($, kg, etc.) unless explicitly required
        - For true/false: Use "true" or "false" lowercase
        """

        self.agent = CodeAgent(
            model=model,
            tools=tools,
            add_base_tools=False  # Prevent tool conflicts
        )
        # Force strict prompt template
        self.agent.prompt_templates["system_prompt"] = system_prompt

    def __call__(self, question: str) -> str:
        """Run the agent on *question* and return the extracted final answer.

        Falls back to the raw agent output when no "FINAL ANSWER:" marker
        is present, and to a fixed message when the agent raises.
        """
        print(f"Processing: {question[:50]}...")
        try:
            result = self.agent.run(question)
            # Extract the text after the FINAL ANSWER marker, if any.
            match = re.search(r"FINAL ANSWER:\s*(.+)", result, re.IGNORECASE)
            return match.group(1).strip() if match else result
        except Exception as e:
            print(f"Error: {str(e)}")
            return "Unable to determine answer"
207
 
208
 
209