"""Agent tool collection: weather lookup, arithmetic helpers, text utilities,
and FAISS-backed retrieval over raw text, for use with LangChain agents."""

import base64
import json
import os
import re
from datetime import datetime
from typing import Dict, Optional

import pandas as pd
import requests
import whisper
from bs4 import BeautifulSoup
from dotenv import find_dotenv, load_dotenv
from langchain.chains import RetrievalQA
from langchain.chat_models import init_chat_model
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import (
    UnstructuredPDFLoader,
    UnstructuredPowerPointLoader,
    UnstructuredWordDocumentLoader,
    WebBaseLoader,
)
from langchain_community.tools import DuckDuckGoSearchResults, GoogleSearchResults
from langchain_community.utilities import GoogleSerperAPIWrapper
from langchain_community.vectorstores import FAISS
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.tools import tool
from langchain_huggingface.embeddings import HuggingFaceEmbeddings
from langchain_tavily import TavilySearch
from markdownify import markdownify as md
from youtube_transcript_api import YouTubeTranscriptApi
from yt_dlp import YoutubeDL

# Boilerplate section headings (Wikipedia-style) that carry no article body;
# presumably stripped by the HTML-cleaning code further down — confirm there.
UNWANTED_SECTIONS = {
    "references",
    "external links",
    "further reading",
    "see also",
    "notes",
}


@tool
def get_weather_info(location: str) -> str:
    """Fetches weather information for a given location.

    Usage:
    ```
    # Initialize the tool
    weather_info_tool = Tool(
        name="get_weather_info",
        func=get_weather_info,
        description="Fetches weather information for a given location.")
    ```

    Args:
        location (str): City name understood by OpenWeatherMap (e.g. "Paris").

    Returns:
        str: Human-readable summary of the current weather.

    Raises:
        requests.HTTPError: If the API responds with a non-2xx status
            (bad API key, unknown location, quota exceeded).
    """
    load_dotenv(find_dotenv())
    api_key = os.getenv("OPENWEATHERMAP_API_KEY")
    # Pass the query through `params` so `location` is URL-encoded; raw
    # f-string interpolation breaks for names with spaces ("New York") or
    # non-ASCII characters.
    res = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"q": location, "appid": api_key, "units": "metric"},
        timeout=15,
    )
    # Fail loudly on HTTP errors instead of surfacing an opaque KeyError
    # when the error payload lacks the expected keys.
    res.raise_for_status()
    data = res.json()
    humidity = data["main"]["humidity"]
    pressure = data["main"]["pressure"]
    wind = data["wind"]["speed"]
    description = data["weather"][0]["description"]
    temp = data["main"]["temp"]
    min_temp = data["main"]["temp_min"]
    max_temp = data["main"]["temp_max"]
    return (
        f"Weather in {location}: {description}, "
        f"Temperature: {temp}°C, Min: {min_temp}°C, Max: {max_temp}°C, "
        f"Humidity: {humidity}%, Pressure: {pressure} hPa, "
        f"Wind Speed: {wind} m/s"
    )


@tool
def add(a: int, b: int) -> int:
    """Adds two numbers together.

    Args:
        a (int): The first number.
        b (int): The second number.
    """
    return a + b


@tool
def get_sum(list_of_numbers: list[int]) -> int:
    """Sums a list of numbers.

    Args:
        list_of_numbers (list[int]): The list of numbers to sum.
    """
    return sum(list_of_numbers)


@tool
def subtract(a: int, b: int) -> int:
    """Subtracts the second number from the first.

    Args:
        a (int): The first number.
        b (int): The second number.
    """
    return a - b


@tool
def multiply(a: int, b: int) -> int:
    """Multiplies two numbers together.

    Args:
        a (int): The first number.
        b (int): The second number.
    """
    return a * b


@tool
def divide(a: int, b: int) -> float:
    """Divides the first number by the second.

    Args:
        a (int): The first number.
        b (int): The second number.

    Raises:
        ValueError: If ``b`` is zero.
    """
    if b == 0:
        raise ValueError("Cannot divide by zero.")
    return a / b


@tool
def get_current_time_and_date() -> str:
    """Returns the current time and date in ISO format."""
    return datetime.now().isoformat()


@tool
def reverse_text(text: str) -> str:
    """Reverses the given text.

    Args:
        text (str): The text to reverse.
    """
    return text[::-1]


def build_retriever(text: str):
    """Builds a retriever from the given text.

    Splits the text on Markdown headings, embeds the chunks with a local
    sentence-transformers model, and indexes them in an in-memory FAISS store.

    Args:
        text (str): The text to be used for retrieval.

    Returns:
        A FAISS retriever configured to return the top-3 matching chunks.
    """
    splitter = RecursiveCharacterTextSplitter(
        separators=["\n### ", "\n## ", "\n# "],
        chunk_size=1000,
        chunk_overlap=200,
    )
    chunks = splitter.split_text(text)
    docs = [Document(page_content=chunk) for chunk in chunks]
    hf_embed = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    index = FAISS.from_documents(docs, hf_embed)
    return index.as_retriever(search_kwargs={"k": 3})


def get_retrieval_qa(text: str):
    """Creates a RetrievalQA instance for the given text.

    Args:
        text (str): The text to be used for retrieval.

    Returns:
        RetrievalQA: A "stuff"-type QA chain over the text, returning source
        documents alongside answers.
    """
    retriever = build_retriever(text)
    llm = init_chat_model("groq:meta-llama/llama-4-scout-17b-16e-instruct")
    return RetrievalQA.from_chain_type(
        llm=llm,
        chain_type="stuff",
        retriever=retriever,
        return_source_documents=True,
    )


def clean_html(html: str) -> str:
    soup = BeautifulSoup(html, "html.parser")
    # 1. Remove