|
|
import os |
|
|
import requests |
|
|
import gradio as gr |
|
|
from dotenv import load_dotenv |
|
|
import re |
|
|
from typing import Optional |
|
|
import base64 |
|
|
from time import sleep |
|
|
|
|
|
|
|
|
def get_book_recommendations(prompt: str) -> str:
    """
    Fetch the top book match for a search query from the Google Books API.

    Args:
        prompt: User search query, e.g., "thriller novel by Gillian Flynn".

    Returns:
        Markdown text with the title, authors and description of the first
        result, or a short plain-text message when the query is blank, the
        API key is missing, nothing matched, or the request failed.
    """
    # Reject blank input before doing any work: there is nothing to search for.
    search_query = (prompt or "").strip()
    if not search_query:
        return "Please enter a search query."

    load_dotenv()

    api_key = os.getenv("GOOGLE_BOOKS_API_KEY")
    if not api_key:
        return "API key not found."

    url = "https://www.googleapis.com/books/v1/volumes"
    # Let requests percent-encode the values so characters such as '&' or '#'
    # in the user's text cannot truncate the query or inject extra parameters.
    params = {"q": search_query, "key": api_key}

    try:
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        books = response.json().get("items", [])
        if not books:
            return "No books found."

        volume_info = books[0].get("volumeInfo", {})
        title = volume_info.get("title", "Unknown Title")
        authors = ", ".join(volume_info.get("authors", ["Unknown Author"]))
        description = volume_info.get("description", "No description available.")

        return f"π **{title}** by *{authors}*\n\n{description}"
    except Exception as e:
        # requests error messages can embed the request URL, which carries the
        # API key as a query parameter; mask it before showing it to the user.
        return f"Error: {str(e).replace(api_key, '[redacted]')}"
|
|
|
|
|
|
|
|
def get_random_recipe(tags: str = "", exclude_ingredients: str = "") -> str:
    """
    Fetch one random recipe from the Spoonacular API and format it as text.

    Args:
        tags (str): Comma-separated tags like 'vegetarian,dessert'.
        exclude_ingredients (str): Comma-separated ingredients to exclude.

    Returns:
        str: A formatted string with title, preparation time, ingredients,
        and numbered cooking steps, or a short message when the API key is
        missing, the request fails, or no recipe is returned.
    """
    load_dotenv()

    recipe_api_key = os.getenv("SPOONACULAR_API_KEY")
    if not recipe_api_key:
        return "β API key not found in environment."

    url = "https://api.spoonacular.com/recipes/random"
    params = {
        "number": 1,
        # Tolerate None from callers that omit the optional fields.
        "tags": (tags or "").strip(),
        "excludeIngredients": (exclude_ingredients or "").strip(),
        "apiKey": recipe_api_key,
    }

    # Report transport failures separately from an unreadable body so the
    # message matches what actually went wrong.
    try:
        response = requests.get(url, params=params, timeout=10)
    except requests.RequestException:
        return "β Failed to reach the recipe API."

    try:
        data = response.json()
    except ValueError:
        return "β Failed to parse JSON from API response."

    if response.status_code != 200:
        return f"β API Error: {data.get('message', 'Unknown error')}"

    recipes = data.get("recipes")
    if not recipes:
        return "β No recipes returned."

    recipe = recipes[0]

    title = recipe.get("title", "Unknown")
    ready_in = recipe.get("readyInMinutes", "?")
    # Skip entries that lack the expected key instead of raising KeyError.
    ingredients = [
        item["name"]
        for item in recipe.get("extendedIngredients", [])
        if item.get("name")
    ]
    steps = [
        step["step"].strip()
        for block in recipe.get("analyzedInstructions", [])
        for step in block.get("steps", [])
        if step.get("step")
    ]

    lines = [
        f"π½οΈ **Title**: {title}",
        f"π **Ready in**: {ready_in} minutes",
        f"π **Ingredients**: {', '.join(ingredients) if ingredients else 'N/A'}",
        "π¨βπ³ **Steps**:",
    ]
    lines.extend(f" {idx}. {step}" for idx, step in enumerate(steps, 1))

    return "\n".join(lines).strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_now_playing_movies(retries: int = 3) -> str:
    """
    Fetch up to five movies currently in theaters from the TMDB API.

    Args:
        retries: Maximum number of request attempts. Values below 1 are
            treated as 1. Failed attempts are separated by a one-second pause.

    Returns:
        Markdown text with the title and overview of each movie, or a short
        message when the API key is missing, nothing is playing, or every
        attempt failed.
    """
    # Load .env here as the other fetchers do, so this works even when it is
    # the first function called in the process.
    load_dotenv()

    api_key = os.getenv("TMDB_API_KEY")
    if not api_key:
        return "TMDB API key not found."

    url = "https://api.themoviedb.org/3/movie/now_playing"
    params = {"api_key": api_key, "language": "en-US", "page": 1}
    # Guarantee at least one attempt so the function always returns a string.
    attempts = max(1, int(retries))

    for attempt in range(1, attempts + 1):
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            movies = response.json().get("results", [])
            if not movies:
                return "No movies currently playing."

            formatted = []
            for movie in movies[:5]:
                title = movie.get("title", "Untitled")
                # `or` also covers an empty-string overview, not just a missing key.
                overview = movie.get("overview") or "No description available."
                formatted.append(f"π¬ **{title}**\n{overview}\n")
            return "\n".join(formatted)

        except Exception as e:
            if attempt == attempts:
                # requests error messages can embed the request URL, which
                # carries the API key; mask it before showing it to the user.
                reason = str(e).replace(api_key, "[redacted]")
                return f"β Failed after {attempts} attempts: {reason}"
            sleep(1)
|
|
|
|
|
def get_music_recommendations(prompt: str, num_songs: int = 3, min_popularity: int = 0, year: str = "") -> str:
    """
    Fetch track recommendations for a query from the Spotify Web API.

    Args:
        prompt: Search query (genre, mood, etc.)
        num_songs: Number of tracks to request; coerced to an integer and
            clamped to 1-50.
        min_popularity: Minimum popularity score a track must have to be kept.
        year: Optional release year filter, e.g., '2022' or '2010-2020'.

    Returns:
        Markdown-formatted string of the matching tracks, or a short message
        when the query is blank, credentials are missing, nothing matched,
        or a request failed.
    """
    # Reject blank input before requesting a token: there is nothing to search.
    search_query = (prompt or "").strip()
    if not search_query:
        return "Please enter a music query."

    load_dotenv()
    client_id = os.getenv("SPOTIFY_CLIENT_ID")
    client_secret = os.getenv("SPOTIFY_CLIENT_SECRET")

    if not client_id or not client_secret:
        return "Spotify API credentials not found."

    # Client-credentials flow: the id/secret pair is sent as HTTP Basic auth.
    auth_str = f"{client_id}:{client_secret}"
    b64_auth_str = base64.b64encode(auth_str.encode()).decode()

    try:
        token_response = requests.post(
            "https://accounts.spotify.com/api/token",
            data={"grant_type": "client_credentials"},
            headers={"Authorization": f"Basic {b64_auth_str}"},
            # Same timeout as the other requests so a stalled token call
            # cannot hang the handler.
            timeout=10,
        )
        token_response.raise_for_status()
        access_token = token_response.json().get("access_token")
    except Exception as e:
        return f"Token Error: {str(e)}"

    if not access_token:
        return "Token Error: no access token in response."

    try:
        year_filter = (year or "").strip()
        if year_filter:
            search_query += f" year:{year_filter}"

        # UI sliders may deliver floats; the `limit` parameter must be a whole
        # number. NOTE(review): 50 is taken as Spotify's documented maximum.
        limit = min(max(int(num_songs), 1), 50)

        # Pass the query via `params` so requests percent-encodes it and user
        # text containing '&' or '#' cannot corrupt the URL.
        response = requests.get(
            "https://api.spotify.com/v1/search",
            params={"q": search_query, "type": "track", "limit": limit},
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=10,
        )
        response.raise_for_status()
        tracks = response.json().get("tracks", {}).get("items", [])

        if not tracks:
            return "No songs found."

        # A track without a popularity value is treated as popularity 0.
        filtered_tracks = [
            track for track in tracks
            if track.get("popularity", 0) >= min_popularity
        ]

        if not filtered_tracks:
            return f"No songs found with popularity β₯ {min_popularity}."

        output_lines = []
        for track in filtered_tracks:
            name = track["name"]
            artist = ", ".join(artist["name"] for artist in track["artists"])
            popularity = track.get("popularity", 0)
            spotify_url = track["external_urls"]["spotify"]
            preview_url = track.get("preview_url")

            entry = f"π΅ **{name}** by *{artist}* (Popularity: {popularity})\n[Listen on Spotify]({spotify_url})"
            if preview_url:
                entry += f"\nPreview: {preview_url}"
            output_lines.append(entry)

        return "\n\n---\n\n".join(output_lines)
    except Exception as e:
        return f"Error: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
def _parse_coordinates(location: str):
    """Return (lat, lon) floats if *location* is a numeric "lat,lon" pair in range, else None."""
    parts = [part.strip() for part in location.split(",")]
    if len(parts) != 2:
        return None
    try:
        lat, lon = float(parts[0]), float(parts[1])
    except ValueError:
        return None
    # Also rejects NaN, since every comparison with NaN is False.
    if not (-90 <= lat <= 90 and -180 <= lon <= 180):
        return None
    return lat, lon


def get_current_weather(location: str) -> str:
    """
    Fetch current weather for a city name or coordinates using OpenWeatherMap API.

    Args:
        location: A city name (e.g., "New York", optionally with
            comma-separated state/country such as "Paris, FR") or a numeric
            "lat,lon" pair (e.g., "44.34,10.99").

    Returns:
        A formatted weather report string, or a short message when the input
        is blank, the API key is missing, the place is not found, or a
        request failed.
    """
    # Reject blank input before doing any work.
    query = (location or "").strip()
    if not query:
        return "Please enter a location."

    load_dotenv()

    api_key = os.getenv("OPENWEATHER_API_KEY")
    if not api_key:
        return "OpenWeatherMap API key not found."

    try:
        # Only a pair of in-range numbers counts as coordinates; any other
        # text containing commas is geocoded as a place name.
        coords = _parse_coordinates(query)
        if coords is not None:
            lat, lon = coords
        else:
            # Drop whitespace around commas ("Paris, FR" -> "Paris,FR").
            place = ",".join(part.strip() for part in query.split(","))
            # HTTPS, so the API key in the query string is not sent in clear text.
            geo_resp = requests.get(
                "https://api.openweathermap.org/geo/1.0/direct",
                params={"q": place, "limit": 1, "appid": api_key},
                timeout=10,
            )
            geo_resp.raise_for_status()
            geo_data = geo_resp.json()
            if not geo_data:
                return f"Could not find location: {location}"
            lat = geo_data[0]["lat"]
            lon = geo_data[0]["lon"]

        weather_resp = requests.get(
            "https://api.openweathermap.org/data/2.5/weather",
            params={"lat": lat, "lon": lon, "appid": api_key, "units": "metric"},
            timeout=10,
        )
        weather_resp.raise_for_status()
        data = weather_resp.json()

        city = f"{data.get('name', 'Unknown')}, {data.get('sys', {}).get('country', '')}"
        condition = data["weather"][0]["description"].capitalize()
        temp = data["main"]["temp"]
        feels_like = data["main"]["feels_like"]
        humidity = data["main"]["humidity"]
        wind = data["wind"]["speed"]

        return (
            f"π€ **Weather in {city}**\n"
            f"Condition: {condition}\n"
            f"π‘ Temperature: {temp}Β°C (Feels like {feels_like}Β°C)\n"
            f"π§ Humidity: {humidity}%\n"
            f"π¬ Wind Speed: {wind} m/s"
        )

    except Exception as e:
        # requests error messages can embed the request URL, which carries the
        # API key as a query parameter; mask it before showing it to the user.
        return f"Error fetching weather: {str(e).replace(api_key, '[redacted]')}"
|
|
|
|
|
|
|
|
|
|
|
# Book tab: one free-text query in, the top recommendation out as Markdown.
book_interface = gr.Interface(
    fn=get_book_recommendations,
    api_name="get_book_recommendations",
    inputs=gr.Textbox(label="Enter your book query"),
    outputs=gr.Markdown(label="Top Recommendation"),
)
|
|
|
|
|
# Recipe tab: two optional comma-separated filters in, a formatted recipe out.
recipe_interface = gr.Interface(
    fn=get_random_recipe,
    api_name="get_random_recipe",
    inputs=[
        gr.Textbox(
            label="Tags (e.g., vegetarian,dessert)",
            placeholder="Enter tags or leave blank",
        ),
        gr.Textbox(
            label="Exclude Ingredients",
            placeholder="e.g., nuts,gluten",
        ),
    ],
    outputs=gr.Markdown(label="Random Recipe"),
)
|
|
|
|
|
# Movie tab: takes no inputs, so the function runs with its default arguments.
movie_interface = gr.Interface(
    fn=get_now_playing_movies,
    api_name="get_now_playing_movies",
    inputs=[],
    outputs=gr.Markdown(label="Now Playing Movies"),
)
|
|
|
|
|
# Music tab: query text, result count, popularity floor and optional year
# filter, passed positionally to get_music_recommendations.
music_interface = gr.Interface(
    fn=get_music_recommendations,
    inputs=[
        gr.Textbox(label="Enter your music query (e.g., 'lofi beats', 'happy pop')"),
        # step=1 keeps both sliders on whole numbers; the song count is sent to
        # Spotify as an integer `limit`.
        # NOTE(review): without an explicit step Gradio appears to derive a
        # fractional step for the 1-10 range - confirm against the installed version.
        gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Number of Songs"),
        gr.Slider(minimum=0, maximum=100, value=0, step=1, label="Minimum Popularity"),
        gr.Textbox(label="Release Year (e.g., 2022 or 2015-2020)", placeholder="Optional"),
    ],
    outputs=gr.Markdown(label="Song Recommendations"),
    api_name="get_music_recommendations",
)
|
|
|
|
|
# Weather tab: a city name or "lat,lon" string in, a Markdown report out.
weather_interface = gr.Interface(
    fn=get_current_weather,
    api_name="get_current_weather",
    inputs=gr.Textbox(label="Enter a location (e.g., 'New York' or '44.34,10.99')"),
    outputs=gr.Markdown(label="Current Weather"),
)
|
|
|
|
|
|
|
|
# Top-level page: an intro banner, one tab per tool interface, then an example
# prompt and a demo-video link.
with gr.Blocks(title="DailyPal: Your Daily Discovery Assistant") as demo:
    # NOTE(review): the symbols before the heading and list items look like
    # mis-decoded emoji - confirm the file's encoding before editing them.
    gr.Markdown("""
    # π DailyPal
    Welcome to **DailyPal**, your all-in-one assistant to discover:

    - π Great books
    - π³ Random recipes
    - π¬ Movies now playing
    - π΅ Music recommendations
    - βοΈ Current weather updates

    ---
    βοΈ **How to use**:
    Just select a tab and enter your query or location. Get instant suggestions to brighten your day!
    """)

    # The first list holds the interfaces and the second their tab titles,
    # given in the same order.
    gr.TabbedInterface(
        [
            book_interface,
            recipe_interface,
            movie_interface,
            music_interface,
            weather_interface,
        ],
        [
            "Book Finder",
            "Random Recipe",
            "Now Playing Movies",
            "Music Recommendations",
            "Current Weather"
        ]
    )
    gr.Markdown("**Example prompt using mcp client like claude desktop**: I am in new york, if the weather is good, then get me a recipe to grill outdoors, if not get me a book on mars to read, and also get me the songs by weekend to listen.")
    gr.Markdown("### π [Watch Demo Video](https://drive.google.com/file/d/1DV0Plrhdr7kAWtLRQS91BnVuvRElPbOm/view?usp=drive_link)")


# Start the app only when this file is run as a script, not when imported.
if __name__ == "__main__":
    # NOTE(review): mcp_server=True presumably also exposes the interfaces'
    # functions as MCP tools - confirm against the Gradio documentation.
    demo.launch(mcp_server=True)