shukdevdattaEX's picture
Update app.py
8c4798d verified
raw
history blame
29.4 kB
import gradio as gr
import base64
import io
import os
from openai import OpenAI
import PyPDF2
from PIL import Image
import speech_recognition as sr
import tempfile
import cv2
import numpy as np
from typing import List, Tuple, Optional
import json
import pydub
from pydub import AudioSegment
class MultimodalChatbot:
def __init__(self, api_key: str):
self.client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key,
)
self.model = "google/gemma-3n-e2b-it:free"
self.conversation_history = []
def encode_image_to_base64(self, image) -> str:
"""Convert PIL Image to base64 string"""
try:
if isinstance(image, str):
# If it's a file path
with open(image, "rb") as img_file:
return base64.b64encode(img_file.read()).decode('utf-8')
else:
# If it's a PIL Image
buffered = io.BytesIO()
# Convert to RGB if it's RGBA
if image.mode == 'RGBA':
image = image.convert('RGB')
image.save(buffered, format="JPEG", quality=85)
return base64.b64encode(buffered.getvalue()).decode('utf-8')
except Exception as e:
return f"Error encoding image: {str(e)}"
def extract_pdf_text(self, pdf_file) -> str:
"""Extract text from PDF file"""
try:
if hasattr(pdf_file, 'name'):
# Gradio file object
pdf_path = pdf_file.name
else:
pdf_path = pdf_file
text = ""
with open(pdf_path, 'rb') as file:
pdf_reader = PyPDF2.PdfReader(file)
for page_num, page in enumerate(pdf_reader.pages):
page_text = page.extract_text()
if page_text.strip():
text += f"Page {page_num + 1}:\n{page_text}\n\n"
return text.strip() if text.strip() else "No text could be extracted from this PDF."
except Exception as e:
return f"Error extracting PDF: {str(e)}"
def convert_audio_to_wav(self, audio_file) -> str:
"""Convert audio file to WAV format for speech recognition"""
try:
if hasattr(audio_file, 'name'):
audio_path = audio_file.name
else:
audio_path = audio_file
# Get file extension
file_ext = os.path.splitext(audio_path)[1].lower()
# If already WAV, return as is
if file_ext == '.wav':
return audio_path
# Convert to WAV using pydub
audio = AudioSegment.from_file(audio_path)
# Export as WAV with proper settings for speech recognition
wav_path = tempfile.mktemp(suffix='.wav')
audio.export(wav_path, format="wav", parameters=["-ac", "1", "-ar", "16000"])
return wav_path
except Exception as e:
raise Exception(f"Error converting audio: {str(e)}")
def transcribe_audio(self, audio_file) -> str:
"""Transcribe audio file to text"""
try:
recognizer = sr.Recognizer()
# Convert audio to WAV format
wav_path = self.convert_audio_to_wav(audio_file)
with sr.AudioFile(wav_path) as source:
# Adjust for ambient noise
recognizer.adjust_for_ambient_noise(source, duration=0.2)
audio_data = recognizer.record(source)
# Try Google Speech Recognition
try:
text = recognizer.recognize_google(audio_data)
return text
except sr.UnknownValueError:
return "Could not understand the audio. Please try with clearer audio."
except sr.RequestError as e:
# Fallback to offline recognition if available
try:
text = recognizer.recognize_sphinx(audio_data)
return text
except:
return f"Speech recognition service error: {str(e)}"
except Exception as e:
return f"Error transcribing audio: {str(e)}"
def process_video(self, video_file) -> Tuple[List[str], str]:
"""Extract frames from video and convert to base64"""
try:
if hasattr(video_file, 'name'):
video_path = video_file.name
else:
video_path = video_file
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
return [], "Error: Could not open video file"
frames = []
frame_descriptions = []
frame_count = 0
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
# Extract frames (every 60 frames or every 2 seconds)
frame_interval = max(60, int(fps * 2)) if fps > 0 else 60
while cap.read()[0] and len(frames) < 5: # Limit to 5 frames
ret, frame = cap.read()
if ret and frame_count % frame_interval == 0:
# Convert BGR to RGB
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
pil_image = Image.fromarray(rgb_frame)
# Resize image to reduce size
pil_image.thumbnail((800, 600), Image.Resampling.LANCZOS)
base64_frame = self.encode_image_to_base64(pil_image)
if not base64_frame.startswith("Error"):
frames.append(base64_frame)
timestamp = frame_count / fps if fps > 0 else frame_count
frame_descriptions.append(f"Frame at {timestamp:.1f}s")
frame_count += 1
cap.release()
description = f"Video processed: {len(frames)} frames extracted from {total_frames} total frames"
return frames, description
except Exception as e:
return [], f"Error processing video: {str(e)}"
def create_multimodal_message(self,
text_input: str = "",
pdf_file=None,
audio_file=None,
image_file=None,
video_file=None) -> dict:
"""Create a multimodal message for the API"""
content_parts = []
processing_info = []
# Add text content
if text_input:
content_parts.append({"type": "text", "text": text_input})
# Process PDF
if pdf_file is not None:
pdf_text = self.extract_pdf_text(pdf_file)
content_parts.append({
"type": "text",
"text": f"PDF Content:\n{pdf_text}"
})
processing_info.append("πŸ“„ PDF processed")
# Process Audio
if audio_file is not None:
audio_text = self.transcribe_audio(audio_file)
content_parts.append({
"type": "text",
"text": f"Audio Transcription:\n{audio_text}"
})
processing_info.append("🎀 Audio transcribed")
# Process Image - Use text-only approach since vision isn't supported
if image_file is not None:
# Since vision isn't supported, we'll describe what we can about the image
if hasattr(image_file, 'size'):
width, height = image_file.size
mode = image_file.mode
content_parts.append({
"type": "text",
"text": f"Image uploaded: {width}x{height} pixels, mode: {mode}. Note: Visual analysis not available with current model configuration."
})
else:
content_parts.append({
"type": "text",
"text": "Image uploaded. Note: Visual analysis not available with current model configuration."
})
processing_info.append("πŸ–ΌοΈ Image received (metadata only)")
# Process Video - Use text-only approach since vision isn't supported
if video_file is not None:
frames, video_desc = self.process_video(video_file)
content_parts.append({
"type": "text",
"text": f"Video uploaded: {video_desc}. Note: Visual analysis not available with current model configuration."
})
processing_info.append("πŸŽ₯ Video processed (metadata only)")
return {"role": "user", "content": content_parts}, processing_info
def chat(self,
text_input: str = "",
pdf_file=None,
audio_file=None,
image_file=None,
video_file=None,
history: List[Tuple[str, str]] = None) -> Tuple[List[Tuple[str, str]], str]:
"""Main chat function"""
if history is None:
history = []
try:
# Create user message summary for display
user_message_parts = []
if text_input:
user_message_parts.append(f"Text: {text_input}")
if pdf_file:
user_message_parts.append("πŸ“„ PDF uploaded")
if audio_file:
user_message_parts.append("🎀 Audio uploaded")
if image_file:
user_message_parts.append("πŸ–ΌοΈ Image uploaded")
if video_file:
user_message_parts.append("πŸŽ₯ Video uploaded")
user_display = " | ".join(user_message_parts)
# Create multimodal message
user_message, processing_info = self.create_multimodal_message(
text_input, pdf_file, audio_file, image_file, video_file
)
# Add processing info to display
if processing_info:
user_display += f"\n{' | '.join(processing_info)}"
# Add to conversation history
messages = [user_message]
# Get response from Gemma
completion = self.client.chat.completions.create(
extra_headers={
"HTTP-Referer": "https://multimodal-chatbot.local",
"X-Title": "Multimodal Chatbot",
},
model=self.model,
messages=messages,
max_tokens=2048,
temperature=0.7
)
bot_response = completion.choices[0].message.content
# Update history
history.append((user_display, bot_response))
return history, ""
except Exception as e:
error_msg = f"Error: {str(e)}"
history.append((user_display if 'user_display' in locals() else "Error in input", error_msg))
return history, ""
def create_interface():
"""Create the Gradio interface"""
with gr.Blocks(title="Multimodal Chatbot with Gemma 3n", theme=gr.themes.Soft()) as demo:
gr.Markdown("""
# πŸ€– Multimodal Chatbot with Gemma 3n
This chatbot can process multiple types of input:
- **Text**: Regular text messages
- **PDF**: Extract and analyze document content
- **Audio**: Transcribe speech to text (supports WAV, MP3, M4A, FLAC)
- **Images**: Upload images (metadata analysis only due to model limitations)
- **Video**: Upload videos (metadata analysis only due to model limitations)
**Setup**: Enter your OpenRouter API key below to get started
""")
# API Key Input Section
with gr.Row():
with gr.Column():
api_key_input = gr.Textbox(
label="πŸ”‘ OpenRouter API Key",
placeholder="Enter your OpenRouter API key here...",
type="password",
info="Your API key is not stored and only used for this session"
)
api_status = gr.Textbox(
label="Connection Status",
value="❌ API Key not provided",
interactive=False
)
# Tabbed Interface
with gr.Tabs():
# Text Chat Tab
with gr.TabItem("πŸ’¬ Text Chat"):
with gr.Row():
with gr.Column(scale=1):
text_input = gr.Textbox(
label="πŸ’¬ Text Input",
placeholder="Type your message here...",
lines=5
)
text_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
text_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
text_chatbot = gr.Chatbot(
label="Text Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
# PDF Chat Tab
with gr.TabItem("πŸ“„ PDF Chat"):
with gr.Row():
with gr.Column(scale=1):
pdf_input = gr.File(
label="πŸ“„ PDF Upload",
file_types=[".pdf"],
type="filepath"
)
pdf_text_input = gr.Textbox(
label="πŸ’¬ Question about PDF",
placeholder="Ask something about the PDF...",
lines=3
)
pdf_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
pdf_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
pdf_chatbot = gr.Chatbot(
label="PDF Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
# Audio Chat Tab
with gr.TabItem("🎀 Audio Chat"):
with gr.Row():
with gr.Column(scale=1):
audio_input = gr.File(
label="🎀 Audio Upload",
file_types=[".wav", ".mp3", ".m4a", ".flac", ".ogg"],
type="filepath"
)
audio_text_input = gr.Textbox(
label="πŸ’¬ Question about Audio",
placeholder="Ask something about the audio...",
lines=3
)
audio_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
audio_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
audio_chatbot = gr.Chatbot(
label="Audio Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
# Image Chat Tab
with gr.TabItem("πŸ–ΌοΈ Image Chat"):
with gr.Row():
with gr.Column(scale=1):
image_input = gr.Image(
label="πŸ–ΌοΈ Image Upload",
type="pil"
)
image_text_input = gr.Textbox(
label="πŸ’¬ Question about Image",
placeholder="Ask something about the image...",
lines=3
)
image_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
image_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
image_chatbot = gr.Chatbot(
label="Image Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
# Video Chat Tab
with gr.TabItem("πŸŽ₯ Video Chat"):
with gr.Row():
with gr.Column(scale=1):
video_input = gr.File(
label="πŸŽ₯ Video Upload",
file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
type="filepath"
)
video_text_input = gr.Textbox(
label="πŸ’¬ Question about Video",
placeholder="Ask something about the video...",
lines=3
)
video_submit_btn = gr.Button("πŸš€ Send", variant="primary", size="lg", interactive=False)
video_clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Column(scale=2):
video_chatbot = gr.Chatbot(
label="Video Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
# Combined Chat Tab
with gr.TabItem("🌟 Combined Chat"):
with gr.Row():
with gr.Column(scale=1):
combined_text_input = gr.Textbox(
label="πŸ’¬ Text Input",
placeholder="Type your message here...",
lines=3
)
combined_pdf_input = gr.File(
label="πŸ“„ PDF Upload",
file_types=[".pdf"],
type="filepath"
)
combined_audio_input = gr.File(
label="🎀 Audio Upload",
file_types=[".wav", ".mp3", ".m4a", ".flac", ".ogg"],
type="filepath"
)
combined_image_input = gr.Image(
label="πŸ–ΌοΈ Image Upload",
type="pil"
)
combined_video_input = gr.File(
label="πŸŽ₯ Video Upload",
file_types=[".mp4", ".avi", ".mov", ".mkv", ".webm"],
type="filepath"
)
combined_submit_btn = gr.Button("πŸš€ Send All", variant="primary", size="lg", interactive=False)
combined_clear_btn = gr.Button("πŸ—‘οΈ Clear All", variant="secondary")
with gr.Column(scale=2):
combined_chatbot = gr.Chatbot(
label="Combined Chat History",
height=600,
bubble_full_width=False,
show_copy_button=True
)
# Event handlers
def validate_api_key(api_key):
if not api_key or len(api_key.strip()) == 0:
return "❌ API Key not provided", *[gr.update(interactive=False) for _ in range(6)]
try:
# Test the API key by creating a client
test_client = OpenAI(
base_url="https://openrouter.ai/api/v1",
api_key=api_key.strip(),
)
return "βœ… API Key validated successfully", *[gr.update(interactive=True) for _ in range(6)]
except Exception as e:
return f"❌ API Key validation failed: {str(e)}", *[gr.update(interactive=False) for _ in range(6)]
def process_text_input(api_key, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, history=history)
def process_pdf_input(api_key, pdf, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, pdf_file=pdf, history=history)
def process_audio_input(api_key, audio, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, audio_file=audio, history=history)
def process_image_input(api_key, image, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, image_file=image, history=history)
def process_video_input(api_key, video, text, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text_input=text, video_file=video, history=history)
def process_combined_input(api_key, text, pdf, audio, image, video, history):
if not api_key or len(api_key.strip()) == 0:
if history is None:
history = []
history.append(("Error", "❌ Please provide a valid API key first"))
return history, ""
chatbot = MultimodalChatbot(api_key.strip())
return chatbot.chat(text, pdf, audio, image, video, history)
def clear_chat():
return [], ""
def clear_all_inputs():
return [], "", None, None, None, None
# API Key validation
api_key_input.change(
validate_api_key,
inputs=[api_key_input],
outputs=[api_status, text_submit_btn, pdf_submit_btn, audio_submit_btn,
image_submit_btn, video_submit_btn, combined_submit_btn]
)
# Text chat events
text_submit_btn.click(
process_text_input,
inputs=[api_key_input, text_input, text_chatbot],
outputs=[text_chatbot, text_input]
)
text_input.submit(
process_text_input,
inputs=[api_key_input, text_input, text_chatbot],
outputs=[text_chatbot, text_input]
)
text_clear_btn.click(clear_chat, outputs=[text_chatbot, text_input])
# PDF chat events
pdf_submit_btn.click(
process_pdf_input,
inputs=[api_key_input, pdf_input, pdf_text_input, pdf_chatbot],
outputs=[pdf_chatbot, pdf_text_input]
)
pdf_clear_btn.click(lambda: ([], "", None), outputs=[pdf_chatbot, pdf_text_input, pdf_input])
# Audio chat events
audio_submit_btn.click(
process_audio_input,
inputs=[api_key_input, audio_input, audio_text_input, audio_chatbot],
outputs=[audio_chatbot, audio_text_input]
)
audio_clear_btn.click(lambda: ([], "", None), outputs=[audio_chatbot, audio_text_input, audio_input])
# Image chat events
image_submit_btn.click(
process_image_input,
inputs=[api_key_input, image_input, image_text_input, image_chatbot],
outputs=[image_chatbot, image_text_input]
)
image_clear_btn.click(lambda: ([], "", None), outputs=[image_chatbot, image_text_input, image_input])
# Video chat events
video_submit_btn.click(
process_video_input,
inputs=[api_key_input, video_input, video_text_input, video_chatbot],
outputs=[video_chatbot, video_text_input]
)
video_clear_btn.click(lambda: ([], "", None), outputs=[video_chatbot, video_text_input, video_input])
# Combined chat events
combined_submit_btn.click(
process_combined_input,
inputs=[api_key_input, combined_text_input, combined_pdf_input,
combined_audio_input, combined_image_input, combined_video_input, combined_chatbot],
outputs=[combined_chatbot, combined_text_input]
)
combined_clear_btn.click(clear_all_inputs,
outputs=[combined_chatbot, combined_text_input, combined_pdf_input,
combined_audio_input, combined_image_input, combined_video_input])
# Examples and Instructions
gr.Markdown("""
### 🎯 How to Use Each Tab:
**πŸ’¬ Text Chat**: Simple text conversations with the AI
**πŸ“„ PDF Chat**: Upload a PDF and ask questions about its content
**🎀 Audio Chat**: Upload audio files for transcription and analysis
- Supports: WAV, MP3, M4A, FLAC, OGG formats
- Best results with clear speech and minimal background noise
**πŸ–ΌοΈ Image Chat**: Upload images (currently metadata only due to model limitations)
**πŸŽ₯ Video Chat**: Upload videos (currently metadata only due to model limitations)
**🌟 Combined Chat**: Use multiple input types together for comprehensive analysis
### πŸ”‘ Getting an API Key:
1. Go to [OpenRouter.ai](https://openrouter.ai)
2. Sign up for an account
3. Navigate to the API Keys section
4. Create a new API key
5. Copy and paste it in the field above
### ⚠️ Current Limitations:
- Image and video visual analysis not supported by the free Gemma 3n model
- Audio transcription requires internet connection for best results
- Large files may take longer to process
""")
return demo
if __name__ == "__main__":
# Required packages (install with pip):
required_packages = [
"gradio",
"openai",
"PyPDF2",
"Pillow",
"SpeechRecognition",
"opencv-python",
"numpy",
"pydub"
]
print("πŸš€ Multimodal Chatbot with Gemma 3n")
print("=" * 50)
print("Required packages:", ", ".join(required_packages))
print("\nπŸ“¦ To install: pip install " + " ".join(required_packages))
print("\n🎀 For audio processing, you may also need:")
print(" - ffmpeg (for audio conversion)")
print(" - sudo apt-get install espeak espeak-data libespeak1 libespeak-dev (for offline speech recognition)")
print("\nπŸ”‘ Get your API key from: https://openrouter.ai")
print("πŸ’‘ Enter your API key in the web interface when it loads")
demo = create_interface()
demo.launch(
share=True
)