Spaces:
Sleeping
Sleeping
import os | |
import gradio as gr | |
import json, re | |
import openai | |
from datetime import datetime | |
import pdfplumber, docx, pandas as pd | |
from PIL import Image | |
openai.api_key = os.environ.get("GROQ_API_KEY") | |
openai.api_base = "https://api.groq.com/openai/v1" | |
# Chat with Groq | |
def chat_with_groq(message, state): | |
if state is None: | |
state = {"oai_history": [], "chatbot_ui": []} | |
oai_history = state["oai_history"] | |
chatbot_ui = state["chatbot_ui"] | |
if not oai_history: | |
oai_history.append({"role": "system", "content": "You are Neobot π€ β helpful and concise."}) | |
oai_history.append({"role": "user", "content": message}) | |
try: | |
response = openai.ChatCompletion.create( | |
model="llama3-70b-8192", | |
messages=oai_history | |
) | |
reply = response["choices"][0]["message"]["content"] | |
except Exception as e: | |
reply = f"β Error: {e}" | |
oai_history.append({"role": "assistant", "content": reply}) | |
chatbot_ui.append({"role": "user", "content": message}) | |
chatbot_ui.append({"role": "assistant", "content": reply}) | |
return "", chatbot_ui, {"oai_history": oai_history, "chatbot_ui": chatbot_ui} | |
# Save chat | |
def save_session(state): | |
if not state or not state.get("oai_history"): | |
return "β No chat to save" | |
oai_history = state["oai_history"] | |
prompt = next((m["content"] for m in oai_history if m["role"] == "user"), "chat") | |
title = re.sub(r"[^\w\s]", "", prompt).strip() | |
title = " ".join(title.split()[:6]) or "Chat" | |
timestamp = datetime.now().strftime("%b %d %Y %H-%M") | |
filename = f"{title} - {timestamp}.json" | |
with open(filename, "w", encoding="utf-8") as f: | |
json.dump(oai_history, f, indent=2, ensure_ascii=False) | |
return f"β Chat saved as: {filename[:-5]}" | |
def list_saved_files(): | |
return sorted([f[:-5] for f in os.listdir() if f.endswith(".json")]) | |
def load_chat(name): | |
filename = f"{name}.json" | |
try: | |
with open(filename, "r", encoding="utf-8") as f: | |
oai_history = json.load(f) | |
chatbot_ui = [] | |
for m in oai_history: | |
if m["role"] in ["user", "assistant"]: | |
chatbot_ui.append({"role": m["role"], "content": m["content"]}) | |
return chatbot_ui, {"oai_history": oai_history, "chatbot_ui": chatbot_ui}, f"β Loaded {name}" | |
except Exception as e: | |
return [], {"oai_history": [], "chatbot_ui": []}, f"β Could not load {name}: {e}" | |
# Transcription | |
def transcribe_audio(file): | |
if not file: return "" | |
audio_file = open(file, "rb") | |
transcript = openai.Audio.transcribe("whisper-1", audio_file) | |
return transcript["text"] | |
# File processor | |
def process_file(file): | |
if file is None: | |
return "" | |
filename = file.name | |
ext = filename.split(".")[-1].lower() | |
try: | |
if ext == "pdf": | |
with pdfplumber.open(file.name) as pdf: | |
text = "\n".join([p.extract_text() for p in pdf.pages if p.extract_text()]) | |
elif ext == "docx": | |
doc = docx.Document(file.name) | |
text = "\n".join([p.text for p in doc.paragraphs]) | |
elif ext in ["csv", "xlsx"]: | |
df = pd.read_csv(file.name) if ext == "csv" else pd.read_excel(file.name) | |
text = df.to_string() | |
elif ext in ["png", "jpg", "jpeg"]: | |
Image.open(file.name) # just validate | |
text = "Image uploaded. Please describe what you want to know." | |
else: | |
text = file.read().decode("utf-8") | |
if len(text) > 8000: | |
text = text[:8000] | |
response = openai.ChatCompletion.create( | |
model="llama3-70b-8192", | |
messages=[ | |
{"role": "system", "content": "Summarize or explain the content uploaded."}, | |
{"role": "user", "content": text} | |
] | |
) | |
return response["choices"][0]["message"]["content"] | |
except Exception as e: | |
return f"β Error processing file: {e}" | |
# UI | |
with gr.Blocks(css=""" | |
body { background: #fff; color: #000; } | |
#title { font-size: 2rem; text-align: center; margin-top: 1rem; } | |
.gr-chatbot { height: 70vh !important; } | |
textarea, input[type='text'] { background: #f0f0f0; border-radius: 8px; } | |
.gr-button { border-radius: 8px; background: #000; color: white; } | |
""") as demo: | |
state = gr.State({"oai_history": [], "chatbot_ui": []}) | |
gr.Markdown("# π€ Neobot - Always Listening", elem_id="title") | |
chatbot = gr.Chatbot(type="messages") | |
with gr.Row(): | |
chat_input = gr.Textbox(placeholder="Type here or use mic...", scale=6, show_label=False) | |
send_btn = gr.Button("π", scale=1) | |
with gr.Row(): | |
mic_audio = gr.Audio(type="filepath", label="ποΈ Record Voice") | |
mic_audio.change(transcribe_audio, [mic_audio], [chat_input]) | |
with gr.Row(): | |
file_upload = gr.File(label="π Upload file", file_types=[".pdf", ".docx", ".txt", ".csv", ".xlsx", ".jpg", ".png"]) | |
process_btn = gr.Button("π Analyze File") | |
with gr.Row(): | |
new_btn = gr.Button("π New Chat") | |
save_btn = gr.Button("πΎ Save Chat") | |
dropdown = gr.Dropdown(label="π Saved Chats", choices=list_saved_files(), interactive=True) | |
load_btn = gr.Button("π₯ Load Chat") | |
status = gr.Markdown() | |
send_btn.click(chat_with_groq, [chat_input, state], [chat_input, chatbot, state]) | |
chat_input.submit(chat_with_groq, [chat_input, state], [chat_input, chatbot, state]) | |
process_btn.click(process_file, [file_upload], [chat_input]) | |
new_btn.click(lambda: ("", [], {"oai_history": [], "chatbot_ui": []}), [], [chat_input, chatbot, state]) | |
save_btn.click(save_session, [state], [status]) | |
save_btn.click(lambda: gr.update(choices=list_saved_files()), [], [dropdown]) | |
load_btn.click(load_chat, [dropdown], [chatbot, state, status]) | |
demo.launch() | |