import os
import logging
import asyncio
import nest_asyncio
from datetime import datetime
import uuid

import aiohttp
import gradio as gr
from langfuse.llama_index import LlamaIndexInstrumentor
from llama_index.tools.duckduckgo import DuckDuckGoSearchToolSpec
from llama_index.tools.weather import OpenWeatherMapToolSpec
from llama_index.tools.playwright import PlaywrightToolSpec
from llama_index.core.tools import FunctionTool
from llama_index.core.agent.workflow import AgentWorkflow
from llama_index.core.workflow import Context
from llama_index.llms.huggingface_api import HuggingFaceInferenceAPI
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.readers.web import RssReader

# Configure logging
logging.getLogger("langfuse").setLevel(logging.WARNING)

# Initialize Langfuse instrumentor
instrumentor = LlamaIndexInstrumentor(
    public_key=os.environ.get("LANGFUSE_PUBLIC_KEY"),
    secret_key=os.environ.get("LANGFUSE_SECRET_KEY"),
    host=os.environ.get("LANGFUSE_HOST"),
)
instrumentor.start()
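
# From this point on, LlamaIndex calls are traced: each agent run below is
# wrapped in instrumentor.observe(...) and buffered events are pushed with
# instrumentor.flush() inside run_query().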

# Environment variables
hf_token = os.environ.get("HF_TOKEN")
openweather_api_key = os.environ.get("OPENWEATHER_API_KEY")
serper_api_key = os.environ.get("SERPER_API_KEY")

# Initialize LLM and conversation memory
llm = HuggingFaceInferenceAPI(
    model_name="Qwen/Qwen2.5-Coder-32B-Instruct",
    token=hf_token,
    task="conversational",
)
memory = ChatMemoryBuffer.from_defaults(token_limit=4096)
today_str = datetime.now().strftime("%B %d, %Y")
ANON_USER_ID = os.environ.get("ANON_USER_ID", uuid.uuid4().hex)
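
# Quick, optional smoke test of the inference endpoint (commented out; this is
# a sketch, and ChatMessage would need an extra import from
# llama_index.core.llms that the app does not otherwise use):
#   from llama_index.core.llms import ChatMessage
#   print(llm.chat([ChatMessage(role="user", content="Hello!")]))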

# Define tools

# DuckDuckGo web search
duckduckgo_tool = FunctionTool.from_defaults(
    DuckDuckGoSearchToolSpec().duckduckgo_full_search
)

# Weather tools
weather_tool = FunctionTool.from_defaults(
    OpenWeatherMapToolSpec(key=openweather_api_key).weather_at_location,
    name="current_weather",
    description="Get the current weather for a city/country.",
)
forecast_tool = FunctionTool.from_defaults(
    # "tommorrow" spelling matches the tool spec's method name
    OpenWeatherMapToolSpec(key=openweather_api_key).forecast_tommorrow_at_location,
    name="weather_forecast",
    description="Get tomorrow's weather forecast for a city/country.",
)
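
# Design note: the two weather tools above each build their own
# OpenWeatherMapToolSpec; a single shared instance would work just as well,
# e.g. (sketch):
#   weather_spec = OpenWeatherMapToolSpec(key=openweather_api_key)
#   weather_tool = FunctionTool.from_defaults(weather_spec.weather_at_location, ...)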

# Playwright tools setup
nest_asyncio.apply()
browser = asyncio.get_event_loop().run_until_complete(
    PlaywrightToolSpec.create_async_playwright_browser(
        headless=True,
        args=["--no-sandbox", "--disable-setuid-sandbox"],
    )
)
playwright_tool_spec = PlaywrightToolSpec.from_async_browser(browser)

navigate_tool = FunctionTool.from_defaults(
    playwright_tool_spec.navigate_to,
    name="web_navigate",
    description="Navigate to a URL.",
)
extract_text_tool = FunctionTool.from_defaults(
    playwright_tool_spec.extract_text,
    name="web_extract_text",
    description="Extract text from the current page.",
)
extract_links_tool = FunctionTool.from_defaults(
    playwright_tool_spec.extract_hyperlinks,
    name="web_extract_links",
    description="Extract hyperlinks from the current page.",
)

# Google News RSS tool
def fetch_google_news_rss():
    reader = RssReader(html_to_text=True)
    docs = reader.load_data(["https://news.google.com/rss"])
    return [
        {
            "title": doc.metadata.get("title", "").strip(),
            "url": doc.metadata.get("link", ""),
        }
        for doc in docs
    ]


google_rss_tool = FunctionTool.from_defaults(
    fetch_google_news_rss,
    name="fetch_google_news_rss",
    description="Get latest headlines and URLs from Google News RSS feed.",
)

# Serper news API tool
async def fetch_serper_news(query: str):
    if not serper_api_key:
        raise ValueError("Missing SERPER_API_KEY environment variable")
    url = f"https://google.serper.dev/news?q={query}&tbs=qdr%3Ad"
    headers = {"X-API-KEY": serper_api_key, "Content-Type": "application/json"}
    async with aiohttp.ClientSession() as session:
        async with session.get(url, headers=headers) as resp:
            resp.raise_for_status()
            return await resp.json()


serper_news_tool = FunctionTool.from_defaults(
    fetch_serper_news,
    name="fetch_news_from_serper",
    description="Fetch news articles on a topic via Serper API.",
)

# Create the agent workflow
tools = [
    duckduckgo_tool,
    navigate_tool,
    extract_text_tool,
    extract_links_tool,
    weather_tool,
    forecast_tool,
    google_rss_tool,
    serper_news_tool,
]
web_agent = AgentWorkflow.from_tools_or_functions(tools, llm=llm)
ctx = Context(web_agent)
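
# The Context is passed to every web_agent.run(...) call below so the workflow
# keeps its state across turns instead of starting fresh on each query.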

# Helpers to run agent queries (synchronous wrapper and traced async version)
def run_query_sync(query: str):
    """Run the async agent in a synchronous context."""
    return asyncio.get_event_loop().run_until_complete(
        web_agent.run(query, ctx=ctx)
    )

async def run_query(query: str):
    trace_id = f"agent-run-{uuid.uuid4().hex}"
    try:
        with instrumentor.observe(
            trace_id=trace_id,
            session_id="web-agent-session",
            user_id=ANON_USER_ID,
        ):
            return await web_agent.run(query, ctx=ctx)
    finally:
        instrumentor.flush()
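
# Usage sketch (commented out): outside Gradio, a single traced query could be
# run with, e.g.:
#   result = asyncio.get_event_loop().run_until_complete(run_query("latest AI news"))
#   print(result.response)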

# Gradio interface function
async def gradio_query(user_input, chat_history=None):
    history = chat_history or []
    result = await run_query(user_input)
    # Ensure text-only content and strip any role prefix
    resp_text = str(result.response)
    if resp_text.lower().startswith("assistant:"):
        resp_text = resp_text.split(":", 1)[1].strip()
    # Append OpenAI-style message dicts
    history.append({"role": "user", "content": user_input})
    history.append({"role": "assistant", "content": resp_text})
    return history, history

# Build and launch Gradio app
grb = gr.Blocks()
with grb:
    gr.Markdown("## AI Web Agent")
    chatbot = gr.Chatbot(type="messages")  # use openai-style messages
    txt = gr.Textbox(placeholder="Ask me anything...", show_label=False)
    txt.submit(gradio_query, [txt, chatbot], [chatbot, chatbot])
    gr.Button("Send").click(gradio_query, [txt, chatbot], [chatbot, chatbot])

if __name__ == "__main__":
    # share=True if you want a public Space link
    grb.launch()