Spaces:
Sleeping
Sleeping
import os, uuid, requests, gradio as gr, pdfplumber, docx, pandas as pd | |
from PIL import Image | |
import whisper | |
GROQ_API_KEY = os.getenv("GROQ_API_KEY") | |
GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions" | |
MODEL_NAME = "llama3-8b-8192" | |
def extract_text_from_file(file): | |
if file is None: return "" | |
ext = os.path.splitext(file.name)[-1].lower() | |
try: | |
if ext == ".pdf": | |
with pdfplumber.open(file.name) as pdf: | |
return "\n".join([page.extract_text() or "" for page in pdf.pages]) | |
elif ext == ".docx": | |
return "\n".join([p.text for p in docx.Document(file.name).paragraphs]) | |
elif ext in [".xls", ".xlsx", ".csv"]: | |
df = pd.read_excel(file.name) if ext != ".csv" else pd.read_csv(file.name) | |
return df.to_string(index=False) | |
elif ext.endswith((".png", ".jpg", ".jpeg", ".bmp")): | |
return "Image uploaded. You can ask about it." | |
elif ext.endswith((".txt", ".py", ".json", ".html", ".md", ".css", ".js")): | |
with open(file.name, encoding="utf-8", errors="ignore") as f: | |
return f.read() | |
except Exception as e: | |
return f"File extraction failed: {e}" | |
def transcribe_audio(audio_path): | |
try: | |
if not audio_path: return "" | |
model = whisper.load_model("base") | |
result = model.transcribe(audio_path) | |
return result["text"] | |
except Exception as e: | |
return f"Audio transcription failed: {e}" | |
def chat_response(history, message): | |
try: | |
headers = { | |
"Authorization": f"Bearer {GROQ_API_KEY}", | |
"Content-Type": "application/json" | |
} | |
messages = [{"role": "system", "content": "You are Neobot, a helpful assistant."}] | |
for role, content in history: | |
messages.append({"role": role, "content": content}) | |
messages.append({"role": "user", "content": message}) | |
payload = { | |
"model": MODEL_NAME, | |
"messages": messages | |
} | |
response = requests.post(GROQ_API_URL, headers=headers, json=payload) | |
response.raise_for_status() | |
return response.json()["choices"][0]["message"]["content"].strip() | |
except requests.exceptions.HTTPError as http_err: | |
return f"HTTP error: {http_err}\nResponse: {response.text}" | |
except Exception as e: | |
return f"Groq response failed: {e}" | |
def format_for_chatbot(history): | |
return [{"role": role, "content": content} for role, content in history] | |
def process_input(msg, file, audio, history): | |
file_text = extract_text_from_file(file) if file else "" | |
audio_text = transcribe_audio(audio) if audio else "" | |
full_input = "\n".join([i for i in [msg, file_text, audio_text] if i]) | |
if not full_input.strip(): | |
return history, gr.Textbox.update(value=""), None, None, gr.Textbox.update(value="β οΈ No input provided."), gr.update(value=format_for_chatbot(history)) | |
reply = chat_response(history, full_input) | |
history.append(("user", msg.strip() or file_text or audio_text)) | |
history.append(("assistant", reply)) | |
status_text = "β Response generated." if "failed" not in reply.lower() else f"β οΈ Error: {reply}" | |
return history, gr.Textbox.update(value=""), None, None, gr.Textbox.update(value=status_text), gr.update(value=format_for_chatbot(history)) | |
def save_chat_history(history): | |
try: | |
os.makedirs("saved_chats", exist_ok=True) | |
title = history[-2][1][:30].replace(" ", "_") if len(history) >= 2 else str(uuid.uuid4()) | |
filename = f"saved_chats/{title}.txt" | |
with open(filename, "w", encoding="utf-8") as f: | |
for role, text in history: | |
f.write(f"{role}: {text}\n") | |
return gr.Textbox.update(value="Chat saved successfully.") | |
except Exception as e: | |
return gr.Textbox.update(value=f"Failed to save chat: {e}") | |
def get_saved_chats(): | |
if not os.path.exists("saved_chats"): return [] | |
return sorted([f for f in os.listdir("saved_chats") if f.endswith(".txt")]) | |
def load_selected_chat(filename): | |
filepath = os.path.join("saved_chats", filename) | |
if not os.path.exists(filepath): return [], gr.Textbox.update(value="File not found.") | |
history = [] | |
with open(filepath, "r", encoding="utf-8") as f: | |
for line in f: | |
if ":" in line: | |
role, content = line.strip().split(":", 1) | |
history.append((role.strip(), content.strip())) | |
return history, gr.Textbox.update(value=f"Chat '{filename}' loaded.") | |
def clear_chat(): return [], gr.Textbox.update(value="π§Ή Chat cleared. Start fresh!") | |
custom_css = """ | |
.gradio-container { max-width: 1200px !important; margin: auto !important; } | |
select { appearance: auto !important; } | |
""" | |
with gr.Blocks(css=custom_css, title="Neobot - Chatbot") as demo: | |
gr.Markdown("# π€ Neobot - Advanced AI Chatbot") | |
chatbot = gr.Chatbot(label="Chat History", height=500, show_copy_button=True, type="messages") | |
message_input = gr.Textbox(placeholder="Type your message or upload a file...", scale=4) | |
send_button = gr.Button("Send π€", scale=1) | |
file_upload = gr.File(label="π Upload File", file_types=[".pdf", ".docx", ".txt", ".xlsx", ".xls", ".csv", ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".py", ".js", ".html", ".css", ".json", ".md"]) | |
audio_upload = gr.Audio(label="π€ Upload Audio", type="filepath") | |
save_button = gr.Button("πΎ Save Chat") | |
clear_button = gr.Button("π New Chat") | |
refresh_button = gr.Button("π Refresh List") | |
saved_chats_dropdown = gr.Dropdown(label="π Load Saved Chat", choices=get_saved_chats(), type="value") | |
load_button = gr.Button("π Load Chat") | |
status_message = gr.Textbox(label="Status", interactive=False) | |
chat_history = gr.State([]) | |
send_button.click(process_input, [message_input, file_upload, audio_upload, chat_history], | |
[chat_history, message_input, file_upload, audio_upload, status_message, chatbot]) | |
message_input.submit(process_input, [message_input, file_upload, audio_upload, chat_history], | |
[chat_history, message_input, file_upload, audio_upload, status_message, chatbot]) | |
save_button.click(save_chat_history, [chat_history], [status_message]) \ | |
.then(lambda: gr.Dropdown.update(choices=get_saved_chats()), None, [saved_chats_dropdown]) | |
clear_button.click(clear_chat, None, [chat_history, status_message]) \ | |
.then(lambda: [], None, [chatbot]) | |
refresh_button.click(lambda: gr.Dropdown.update(choices=get_saved_chats()), None, [saved_chats_dropdown]) | |
load_button.click(load_selected_chat, [saved_chats_dropdown], [chat_history, status_message]) \ | |
.then(lambda h: format_for_chatbot(h), [chat_history], [chatbot]) | |
# β Launch your app and confirm it's running | |
demo.launch() | |
print("β Neobot is now live! Listening for messages...") |