seinfeld / app.py
warshanks
Refac
a2f0cd0
import gradio as gr
import asyncio
import threading
import os
from io import BytesIO
from dotenv import load_dotenv
from google import genai
from google.genai.types import Part, FileData, Tool, GenerateContentConfig, GoogleSearch, Content
# Import Discord functionality
import discord
from discord import app_commands
from discord.ext import commands
load_dotenv()
# Environment variables
GOOGLE_KEY = os.getenv("GOOGLE_KEY")
DISCORD_TOKEN = os.getenv("DISCORD_TOKEN")
CHANNEL_ID = os.getenv("CHANNEL_ID")
ADDITIONAL_CHANNELS = os.getenv("ADDITIONAL_CHANNELS", "")


def _parse_channel_ids(raw):
    """Parse a comma-separated string of Discord channel IDs into ints.

    Blank entries are ignored. Entries that are not valid integers are
    reported and skipped instead of raising, so a typo in the channel
    configuration cannot abort the import of this module.

    Args:
        raw: Comma-separated channel IDs, or None/empty when unset.

    Returns:
        The successfully parsed IDs, in their original order.
    """
    parsed = []
    for token in (raw or "").split(","):
        token = token.strip()
        if not token:
            continue
        try:
            parsed.append(int(token))
        except ValueError:
            print(f"Ignoring invalid Discord channel ID: {token!r}")
    return parsed


# Parse channel IDs for Discord bot: the primary channel comes first,
# followed by any additional channels.
ADDITIONAL_IDS = _parse_channel_ids(ADDITIONAL_CHANNELS)
TARGET_CHANNEL_IDS = _parse_channel_ids(CHANNEL_ID) + ADDITIONAL_IDS
# Model configuration - centralized model definitions
# Keys are the shorthand names; values are the full model identifiers.
MODEL_DEFINITIONS = dict(
    flash="gemini-2.0-flash",
    pro="gemini-2.5-pro-preview-05-06",
    image="imagen-3.0-generate-002",
)

# Default models
chat_model_id = MODEL_DEFINITIONS["flash"]  # chat starts on the flash model
image_model_id = MODEL_DEFINITIONS["image"]
# Initialize Google client (stays None when no API key is configured)
google_client = genai.Client(api_key=GOOGLE_KEY) if GOOGLE_KEY else None

# Default system instruction (fallback if environment variable not set)
DEFAULT_SYSTEM_INSTRUCTION = ""

# Get system instruction from environment variable or use default
SYSTEM_INSTRUCTION = os.environ.get("SYSTEM_INSTRUCTION", DEFAULT_SYSTEM_INSTRUCTION)
def respond_with_gemini(message, history):
    """Generate response using Google Gemini API with Seinfeld personality.

    Args:
        message: The new user message to send to the model.
        history: Earlier turns as an iterable of (user_msg, assistant_msg)
            pairs; a falsy half of a pair is skipped.

    Returns:
        The model's reply text, always a non-empty string. A fixed notice is
        returned when no Google client is configured, and a canned
        Seinfeld-style fallback is returned when the API call fails or the
        reply carries no text.
    """
    if not google_client:
        return "I need a Google API key to work! Set the GOOGLE_KEY environment variable."

    # Fallback to a Seinfeld-style response whenever no usable reply exists
    fallback = "What's the deal with API errors? I mean, you type something in, the computer thinks about it, and then... nothing! It's like asking your friend a question and they just stare at you. 'Hey, how are you?' *silence* 'Hello?' *more silence* It's the digital equivalent of being ignored at a party!"

    try:
        # Format history for Gemini API
        formatted_history = []
        for user_msg, assistant_msg in history:
            if user_msg:
                formatted_history.append(Content(role="user", parts=[Part(text=user_msg)]))
            if assistant_msg:
                formatted_history.append(Content(role="model", parts=[Part(text=assistant_msg)]))

        # Initialize Google Search tool
        google_search_tool = Tool(google_search=GoogleSearch())

        # Create chat
        chat = google_client.chats.create(
            model=chat_model_id,
            history=formatted_history,
            config=GenerateContentConfig(
                system_instruction=SYSTEM_INSTRUCTION,
                tools=[google_search_tool],
                response_modalities=["TEXT"],
            ),
        )

        # Send message and get response
        response = chat.send_message(message)
        reply = response.text
    except Exception as e:
        print(f"Error with Gemini API: {e}")
        return fallback

    # The SDK's `text` accessor can come back None/empty (e.g. a reply with no
    # text parts). Callers measure, iterate and send the result, so never hand
    # them anything but a non-empty string.
    if not reply:
        print("Gemini API returned no text in the response")
        return fallback
    return reply
def respond_gradio(message, history: list[tuple[str, str]]):
    """Gradio chat callback that streams the Gemini reply.

    The complete reply is fetched first via respond_with_gemini() and then
    yielded as progressively longer prefixes, one extra character per step.
    """
    # Use Gemini with Seinfeld personality and default parameters
    full_reply = respond_with_gemini(message, history)

    # Emit ever-longer prefixes so the UI shows the text being "typed"
    for end in range(1, len(full_reply) + 1):
        yield full_reply[:end]
async def keep_typing(channel):
    """Keep the typing indicator showing in `channel` until cancelled.

    Re-enters ``channel.typing()`` in five-second slices forever. Cancellation
    ends the loop quietly (it is logged, not re-raised); any other error is
    logged and swallowed.
    """
    channel_id = channel.id
    print(f"Starting typing indicator in channel {channel_id}")

    async def _typing_slice():
        # One five-second stretch with the indicator active
        async with channel.typing():
            await asyncio.sleep(5)

    try:
        while True:
            await _typing_slice()
    except asyncio.CancelledError:
        print(f"Typing indicator cancelled for channel {channel_id}")
    except Exception as err:
        print(f"Error in keep_typing: {type(err).__name__}: {str(err)}")
async def generate_image_bytes(prompt, google_client, image_model_id):
    """Generate an image using Gemini API and return the image bytes.

    Args:
        prompt: Text description of the image to generate.
        google_client: Client whose ``models.generate_images`` is called.
        image_model_id: Identifier of the image model to use.

    Returns:
        The ``image.image_bytes`` of the first generated image.

    Raises:
        Exception: If the response contains no generated images. Any error
            raised while generating is logged and re-raised unchanged.
    """
    nothing = object()  # sentinel meaning "the response held no images"
    try:
        # Run the API call in a separate thread to avoid blocking the event loop
        response = await asyncio.to_thread(
            lambda: google_client.models.generate_images(
                model=image_model_id,
                prompt=prompt,
                config=genai.types.GenerateImagesConfig(
                    number_of_images=1,
                    aspect_ratio="16:9",
                ),
            )
        )

        # Only the first generated image is ever used
        first_image = next(iter(response.generated_images), nothing)
        if first_image is not nothing:
            return first_image.image.image_bytes

        # If we get here, no images were generated
        print("ERROR: No images were generated in the response")
        raise Exception("No image was generated in the response")
    except Exception as err:
        print(f"Exception in image generation: {type(err).__name__}: {str(err)}")
        raise
async def handle_image_request(message, query, google_client, image_model_id):
    """Handle image generation requests from text messages.

    A request is any `query` that starts (case-insensitively) with
    "generate image:" or "create image:"; the text after the first colon is
    used as the prompt.

    Returns:
        True when `query` was an image request (whether or not generation
        succeeded), False when it was not and nothing was done.
    """
    lowered = query.lower()
    if not lowered.startswith(("generate image:", "create image:")):
        return False

    # Start continuous typing in the background
    typing_task = asyncio.create_task(keep_typing(message.channel))
    try:
        _, _, after_colon = query.partition(":")
        prompt = after_colon.strip()
        try:
            print(f"Generating image for prompt: {prompt[:30]}...")
            image_bytes = await generate_image_bytes(prompt, google_client, image_model_id)
            # Cancel typing before sending the response
            typing_task.cancel()
            # Send image directly from bytes without saving to disk
            attachment = discord.File(BytesIO(image_bytes), filename="generated_image.png")
            await message.reply("Here's your image:", file=attachment)
        except Exception as gen_error:
            print(f"Error generating image: {gen_error}")
            # Cancel typing before sending the response
            typing_task.cancel()
            await message.reply("Sorry, I couldn't generate that image.")
    except Exception as outer_error:
        # Make sure to cancel the typing task even if an error occurs
        typing_task.cancel()
        print(f"Exception during image generation: {outer_error}")
        raise outer_error
    return True
# Discord Bot Setup
discord_bot = None

# Size budget for one outgoing chunk of a long reply.
_DISCORD_CHUNK_LIMIT = 1900


def _split_for_discord(text, limit=_DISCORD_CHUNK_LIMIT):
    """Split `text` into non-empty chunks of at most `limit` characters.

    Chunks break after a sentence end (". ") where possible. A single sentence
    longer than `limit` is cut into `limit`-sized pieces, so no content is
    dropped and no chunk exceeds the limit.

    Args:
        text: The full text to split.
        limit: Maximum length of one chunk; must be at least 1.

    Returns:
        The chunks in order, each stripped of surrounding whitespace.

    Raises:
        ValueError: If `limit` is smaller than 1.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")

    # Keep the ". " separator attached to every sentence but the last, so the
    # pieces add up to exactly the original text.
    pieces = text.split(". ")
    sentences = [piece + ". " for piece in pieces[:-1]] + [pieces[-1]]

    chunks = []
    current = ""
    for sentence in sentences:
        if current and len(current) + len(sentence) > limit:
            chunks.append(current)
            current = ""
        # Hard-split a sentence that cannot fit in one chunk on its own
        while len(sentence) > limit:
            chunks.append(sentence[:limit])
            sentence = sentence[limit:]
        current += sentence
    if current:
        chunks.append(current)

    return [chunk.strip() for chunk in chunks if chunk.strip()]


async def setup_discord_bot():
    """Setup and run Discord bot.

    Does nothing (beyond logging) when DISCORD_TOKEN or TARGET_CHANNEL_IDS is
    empty. Otherwise it creates the bot, stores it in the module-level
    `discord_bot`, registers the event handlers and the `/model` and `/image`
    slash commands, and runs the bot with ``discord_bot.start``.
    """
    global discord_bot

    if not DISCORD_TOKEN or not TARGET_CHANNEL_IDS:
        print("Discord bot disabled: Missing DISCORD_TOKEN or channel IDs")
        return

    # Initialize Discord bot
    intents = discord.Intents.default()
    intents.message_content = True
    discord_bot = commands.Bot(command_prefix="~", intents=intents)

    @discord_bot.event
    async def on_ready():
        print(f"Discord bot logged in as {discord_bot.user}")
        try:
            synced = await discord_bot.tree.sync()
            print(f"Synced {len(synced)} command(s)")
        except Exception as e:
            print(f"Failed to sync commands: {e}")

    @discord_bot.event
    async def on_message(message):
        await discord_bot.process_commands(message)

        if message.channel.id not in TARGET_CHANNEL_IDS:
            return
        if message.author == discord_bot.user:
            return
        content = message.content
        # Skip command-style messages and empty messages
        if content.startswith(("!", "~")) or not content.strip():
            return

        # Check if this is an image generation request first
        if await handle_image_request(message, content, google_client, image_model_id):
            return

        # Show typing indicator while the model works. respond_with_gemini is
        # a blocking call, so run it in a worker thread to keep the event loop
        # responsive.
        async with message.channel.typing():
            response = await asyncio.to_thread(respond_with_gemini, content, [])

        # An empty message cannot be sent, and None cannot be measured
        if not response or not response.strip():
            await message.reply("Sorry, I couldn't come up with a response.")
            return

        # Split long responses
        chunks = _split_for_discord(response) if len(response) > 2000 else [response]
        first_chunk, *later_chunks = chunks
        # Only the first chunk is a reply; continuations are plain messages
        await message.reply(first_chunk)
        for chunk in later_chunks:
            await message.channel.send(chunk)

    @discord_bot.tree.command(name="model")
    @app_commands.describe(new_model_id="New model ID to use for Gemini API or shorthand ('flash', 'pro')")
    async def change_model(interaction: discord.Interaction, new_model_id: str):
        """Changes the Gemini chat model being used."""
        global chat_model_id

        # Outside a guild the user object has no guild_permissions attribute;
        # treat that the same as "not an administrator".
        permissions = getattr(interaction.user, "guild_permissions", None)
        if permissions is None or not permissions.administrator:
            await interaction.response.send_message("Only administrators can change the model.", ephemeral=True)
            return

        # Use centralized model definitions; unknown names are taken verbatim
        actual_model_id = MODEL_DEFINITIONS.get(new_model_id.lower(), new_model_id)
        old_model = chat_model_id
        chat_model_id = actual_model_id
        await interaction.response.send_message(f"Chat model changed from `{old_model}` to `{actual_model_id}`", ephemeral=True)

    @discord_bot.tree.command(name="image")
    @app_commands.describe(prompt="Description of the image you want to generate")
    async def generate_image_command(interaction: discord.Interaction, prompt: str):
        """Generates an image using Gemini API based on the provided prompt."""
        await interaction.response.defer(thinking=True)
        try:
            image_bytes = await generate_image_bytes(prompt, google_client, image_model_id)
            await interaction.followup.send(f"Generated image based on: {prompt}", file=discord.File(BytesIO(image_bytes), filename="generated_image.png"))
        except Exception as e:
            print(f"Error generating image: {e}")
            await interaction.followup.send("Sorry, I couldn't generate that image.")

    # Run the bot
    await discord_bot.start(DISCORD_TOKEN)
def run_discord_bot():
    """Thread entry point: drive setup_discord_bot() with asyncio.run.

    Any exception escaping the bot is logged rather than propagated.
    """
    try:
        bot_main = setup_discord_bot()
        asyncio.run(bot_main)
    except Exception as err:
        print(f"Discord bot error: {err}")
# Create Gradio interface
demo = gr.ChatInterface(
    respond_gradio,
    cache_examples=True,
    # Each example is a one-element row holding the prompt text
    examples=[
        [prompt]
        for prompt in (
            "What's the deal with airplane food?",
            "Why do people say 'after dark' when it's really after light?",
            "What's up with people who take forever to order at restaurants?",
            "Why do we park in driveways and drive on parkways?",
            "Tell me about the soup nazi",
            "What's your take on people who don't return shopping carts?",
        )
    ],
    description="Chat with Jerry Seinfeld! What's the deal with chatbots anyway?",
    title="🥨 Seinfeld Chatbot",
)
if __name__ == "__main__":
    # The Discord bot is optional: it needs both a token and a target channel
    if not (DISCORD_TOKEN and TARGET_CHANNEL_IDS):
        print("Discord bot disabled: Missing credentials or channel IDs")
    else:
        # Run the bot on a daemon thread so it does not block the Gradio UI
        discord_thread = threading.Thread(target=run_discord_bot, daemon=True)
        discord_thread.start()
        print("Discord bot starting in background...")

    # Launch Gradio interface
    demo.launch()