# Captured from a Hugging Face Space page (Space status at capture time: Sleeping).
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
from sentence_transformers import SentenceTransformer | |
from sklearn.metrics.pairwise import cosine_similarity | |
import torch | |
import json | |
import os | |
import glob | |
from pathlib import Path | |
from datetime import datetime | |
import requests | |
from collections import defaultdict | |
import re | |
from urllib.parse import quote | |
from xml.etree import ElementTree as ET | |
import base64 | |
from PIL import Image | |
# ----------------------------------------- | |
# Session State Initialization | |
# ----------------------------------------- | |
# Default value for every session-state key seeded at start-up. A key is only
# written when it is absent, so values set during earlier script runs of the
# same session are left untouched.
_SESSION_DEFAULTS = {
    'search_history': [],
    'last_voice_input': "",
    'transcript_history': [],
    'should_rerun': False,
    'search_columns': [],
    'initial_search_done': False,
    'tts_voice': "en-US-AriaNeural",
    'arxiv_last_query': "",
    'old_val': None,
    'current_file': None,
    'file_content': "",
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# ----------------------------------------- | |
# Utility Functions | |
# ----------------------------------------- | |
def highlight_text(text, query):
    """Wrap case-insensitive occurrences of ``query`` in ``text`` with ``**``.

    Args:
        text: Text to decorate. ``None`` is treated as an empty string and any
            other non-string value is converted with ``str`` before matching,
            so a missing or numeric field does not raise.
        query: Literal substring to look for. Regex metacharacters are escaped,
            so the query is never interpreted as a pattern.

    Returns:
        ``text`` unchanged when ``query`` is empty; otherwise the text with
        every match wrapped in Markdown bold markers, keeping the casing found
        in ``text``.
    """
    if not query:
        return text
    if text is None:
        return ""
    if not isinstance(text, str):
        text = str(text)
    pattern = re.compile(re.escape(query), re.IGNORECASE)
    return pattern.sub(lambda m: f"**{m.group(0)}**", text)
def fetch_dataset_rows():
    """Fetch the first rows of the omega-multimodal dataset from Hugging Face.

    Fields whose name contains ``embed``, ``vector`` or ``encoding`` and whose
    value is a string are parsed into lists of floats; a value that cannot be
    parsed is left as the original string. On success the non-embedding column
    names are stored in ``st.session_state['search_columns']``.

    Returns:
        A ``pandas.DataFrame`` built from the fetched rows, or the result of
        ``load_example_data()`` when the request fails, the status is not 200,
        the payload has no ``'rows'`` key, or the payload cannot be processed.
    """
    url = "https://datasets-server.huggingface.co/first-rows?dataset=omegalabsinc%2Fomega-multimodal&config=default&split=train"
    try:
        response = requests.get(url, timeout=30)
        if response.status_code == 200:
            data = response.json()
            if 'rows' in data:
                processed_rows = []
                for row_data in data['rows']:
                    row = row_data.get('row', row_data)
                    # Convert embed fields from strings to arrays
                    for key in row:
                        looks_like_embedding = any(
                            term in key.lower() for term in ['embed', 'vector', 'encoding']
                        )
                        if not (looks_like_embedding and isinstance(row[key], str)):
                            continue
                        try:
                            row[key] = [
                                float(x.strip())
                                for x in row[key].strip('[]').split(',')
                                if x.strip()
                            ]
                        except ValueError:
                            # Not a numeric list after all; keep the raw string.
                            continue
                    processed_rows.append(row)
                df = pd.DataFrame(processed_rows)
                st.session_state['search_columns'] = [
                    col for col in df.columns
                    if col not in ['video_embed', 'description_embed', 'audio_embed']
                ]
                return df
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError):
        # Best-effort fetch: network failures, undecodable JSON and unexpected
        # payload shapes all fall through to the bundled example data.
        # KeyboardInterrupt / SystemExit are deliberately not caught.
        pass
    return load_example_data()
def load_example_data():
    """Return a one-row DataFrame of built-in sample data used as a fallback."""
    sample_record = dict(
        video_id="cd21da96-fcca-4c94-a60f-0b1e4e1e29fc",
        youtube_id="IO-vwtyicn4",
        description="This video shows a close-up of an ancient text carved into a surface.",
        views=45489,
        start_time=1452,
        end_time=1458,
        video_embed=[0.014160037972033024, -0.003111184574663639, -0.016604168340563774],
        description_embed=[-0.05835828185081482, 0.02589797042310238, 0.11952091753482819],
    )
    return pd.DataFrame([sample_record])
def load_dataset():
    """Return the dataset to search, exactly as produced by ``fetch_dataset_rows``."""
    return fetch_dataset_rows()
def _coerce_embedding(value):
    """Return ``value`` as a non-empty list of floats, or ``None`` if unusable."""
    if isinstance(value, str):
        parts = [piece.strip() for piece in value.strip('[]').split(',')]
        parts = [piece for piece in parts if piece]
    elif isinstance(value, (list, tuple, np.ndarray)):
        parts = value
    else:
        return None
    try:
        vector = [float(piece) for piece in parts]
    except (TypeError, ValueError):
        return None
    return vector or None


def _embedding_matrix(values):
    """Stack one embedding column into an array with one row per input value.

    Values that cannot be parsed, or whose length differs from the first usable
    vector, become all-zero rows so that row ``i`` of the matrix always belongs
    to row ``i`` of the dataset. Returns ``None`` when no value is usable.
    """
    vectors = [_coerce_embedding(value) for value in values]
    dim = next((len(vec) for vec in vectors if vec is not None), None)
    if dim is None:
        return None
    matrix = np.zeros((len(vectors), dim))
    for position, vec in enumerate(vectors):
        if vec is not None and len(vec) == dim:
            matrix[position] = vec
    return matrix


def prepare_features(dataset, fallback_dim=384):
    """Build the video and text embedding matrices for ``dataset``.

    Every column whose name contains ``embed``, ``vector`` or ``encoding`` is
    treated as an embedding column. Cells may be bracketed comma-separated
    strings, lists, tuples or NumPy arrays.

    Args:
        dataset: DataFrame holding the rows to index.
        fallback_dim: Width of the random matrices returned when no embedding
            column can be used.

    Returns:
        A ``(video_embeds, text_embeds)`` tuple of arrays with one row per
        dataset row. ``video_embeds`` comes from ``'video_embed'`` if present,
        otherwise from the first usable embedding column. ``text_embeds`` comes
        from ``'description_embed'`` if present, otherwise it is the same array
        as ``video_embeds``. With no usable embedding column both are random
        normal matrices of shape ``(len(dataset), fallback_dim)``.
    """
    num_rows = len(dataset)
    try:
        embed_cols = [
            col for col in dataset.columns
            if any(term in str(col).lower() for term in ['embed', 'vector', 'encoding'])
        ]
        embeddings = {}
        for col in embed_cols:
            matrix = _embedding_matrix(dataset[col])
            if matrix is not None:
                embeddings[col] = matrix
        if embeddings:
            video_embeds = embeddings.get('video_embed')
            if video_embeds is None:
                video_embeds = next(iter(embeddings.values()))
            text_embeds = embeddings.get('description_embed')
            if text_embeds is None:
                text_embeds = video_embeds
            return video_embeds, text_embeds
    except (AttributeError, KeyError, TypeError, ValueError):
        # An unexpected dataset shape must not stop the app from starting;
        # fall through to the random placeholder embeddings below.
        pass
    return np.random.randn(num_rows, fallback_dim), np.random.randn(num_rows, fallback_dim)
class VideoSearch:
    """Semantic search over the video dataset.

    A query is embedded with the ``all-MiniLM-L6-v2`` sentence-transformer and
    scored against the stored video and description embeddings by cosine
    similarity; the two scores are averaged with equal weight.
    """

    # Columns that hold raw embeddings and are never copied into results.
    _EMBED_COLUMNS = ('video_embed', 'description_embed', 'audio_embed')

    def __init__(self):
        self.text_model = SentenceTransformer('all-MiniLM-L6-v2')
        self.dataset = load_dataset()
        self.video_embeds, self.text_embeds = prepare_features(self.dataset)

    def _to_results(self, frame, scores):
        """Pair each row of ``frame`` with its score as a result dictionary.

        Rows are exported with ``to_dict('records')`` so column names that are
        not valid Python identifiers are preserved as dictionary keys.
        """
        visible = [col for col in frame.columns if col not in self._EMBED_COLUMNS]
        records = frame[visible].to_dict('records')
        return [
            {'relevance_score': float(score), **record}
            for score, record in zip(scores, records)
        ]

    def _similarity(self, query_embedding, matrix):
        """Return the cosine similarity of the query to every dataset row.

        When ``matrix`` cannot be compared with the query (not 2-D, a row
        count different from the dataset, or a different embedding width) a
        zero score is returned for every row instead of raising.
        """
        n_rows = len(self.dataset)
        matrix = np.asarray(matrix)
        comparable = (
            matrix.ndim == 2
            and matrix.shape[0] == n_rows
            and matrix.shape[1] == len(query_embedding)
        )
        if not comparable:
            return np.zeros(n_rows)
        return cosine_similarity([query_embedding], matrix)[0]

    def search(self, query, column=None, top_k=20):
        """Return up to ``top_k`` result dictionaries for ``query``.

        Args:
            query: Free-text query. An empty, whitespace-only or ``None`` query
                returns the first ``top_k`` rows with a neutral score of 1.0.
            column: Optional dataset column. When it names an existing column
                (and is not ``"All Fields"``), only rows whose value contains
                ``query`` as a literal, case-insensitive substring are kept.
            top_k: Maximum number of results; non-positive values yield ``[]``.

        Returns:
            A list of dictionaries ordered by descending relevance. Each holds
            ``'relevance_score'`` plus every non-embedding column of the row.
        """
        if top_k <= 0 or len(self.dataset) == 0:
            return []

        # If no query, return the leading rows with a neutral relevance score
        if not query or not query.strip():
            head = self.dataset.head(top_k)
            return self._to_results(head, [1.0] * len(head))

        # Semantic search
        query_embedding = self.text_model.encode([query])[0]
        video_sims = self._similarity(query_embedding, self.video_embeds)
        text_sims = self._similarity(query_embedding, self.text_embeds)
        combined_sims = 0.5 * video_sims + 0.5 * text_sims

        # If a column is selected (not All Fields), strictly filter by textual match
        candidates = self.dataset
        if column and column in self.dataset.columns and column != "All Fields":
            # regex=False: the user's text is a literal, not a pattern, so
            # characters such as "(" or "+" cannot raise or change the match.
            mask = (
                self.dataset[column]
                .astype(str)
                .str.contains(query, case=False, na=False, regex=False)
                .to_numpy(dtype=bool)
            )
            combined_sims = combined_sims[mask]
            candidates = self.dataset[mask]

        # Get top results
        top_k = min(top_k, len(combined_sims))
        if top_k == 0:
            return []
        top_indices = np.argsort(combined_sims)[-top_k:][::-1]
        return self._to_results(candidates.iloc[top_indices], combined_sims[top_indices])
# ----------------------------------------- | |
# Arxiv Search Functions | |
# ----------------------------------------- | |
def arxiv_search(query, max_results=5):
    """Query the Arxiv Atom API and return the top results.

    Args:
        query: Search text; it is URL-quoted before being sent.
        max_results: Maximum number of entries requested from the API.

    Returns:
        A list of ``(title, summary, link)`` tuples. ``link`` is the entry's
        ``text/html`` link or ``None`` when the entry has none. An empty list
        is returned for an empty/blank/``None`` query, a failed or timed-out
        request, a non-200 response, or a response that is not valid XML.
    """
    if not query or not query.strip():
        return []
    base_url = "http://export.arxiv.org/api/query?"
    search_url = base_url + f"search_query={quote(query)}&start=0&max_results={max_results}"
    try:
        # Without a timeout a stalled connection would block the app forever.
        response = requests.get(search_url, timeout=30)
    except requests.RequestException:
        return []
    if response.status_code != 200:
        return []
    try:
        root = ET.fromstring(response.text)
    except ET.ParseError:
        return []

    ns = {'atom': 'http://www.w3.org/2005/Atom'}
    results = []
    for entry in root.findall('atom:entry', ns):
        # findtext tolerates a missing element, unlike find(...).text
        title = (entry.findtext('atom:title', default='', namespaces=ns) or '').strip()
        summary = (entry.findtext('atom:summary', default='', namespaces=ns) or '').strip()
        link = next(
            (
                node.get('href')
                for node in entry.findall('atom:link', ns)
                if node.get('type') == 'text/html'
            ),
            None,
        )
        results.append((title, summary, link))
    return results
def perform_arxiv_lookup(q, vocal_summary=True, titles_summary=True, full_audio=False):
    """Search Arxiv for ``q`` and render the results on the page.

    ``vocal_summary``, ``titles_summary`` and ``full_audio`` are accepted but
    not used by this function.
    """
    papers = arxiv_search(q, max_results=5)
    if papers:
        st.markdown(f"**Arxiv Search Results for '{q}':**")
        for rank, paper in enumerate(papers, start=1):
            title, summary, link = paper
            st.markdown(f"**{rank}. {title}**")
            st.write(summary)
            if link:
                st.markdown(f"[View Paper]({link})")
        return
    st.write("No Arxiv results found.")
# ----------------------------------------- | |
# File Manager | |
# ----------------------------------------- | |
def _list_managed_files():
    """Return the .txt, .md and .mp3 files found in the working directory."""
    return [path for pattern in ("*.txt", "*.md", "*.mp3") for path in glob.glob(pattern)]


def show_file_manager():
    """Display file manager interface for uploading and browsing local files.

    Uploads are written to the working directory under their base name only.
    Each distinct upload is written once: the uploader keeps returning the same
    file on every script run, so re-saving it each time would request a rerun
    on every run and would recreate a file right after it was deleted.
    """
    import hashlib

    st.subheader("π File Manager")
    col1, col2 = st.columns(2)
    with col1:
        uploaded_file = st.file_uploader("Upload File", type=['txt', 'md', 'mp3'])
        if uploaded_file:
            payload = uploaded_file.getvalue()
            # The file name comes from the client; drop any directory part so
            # the upload cannot be written outside the working directory.
            safe_name = os.path.basename(uploaded_file.name.replace("\\", "/"))
            signature = (safe_name, hashlib.sha256(payload).hexdigest())
            if safe_name and st.session_state.get('last_upload_signature') != signature:
                with open(safe_name, "wb") as f:
                    f.write(payload)
                st.session_state['last_upload_signature'] = signature
                st.success(f"Uploaded: {safe_name}")
                st.session_state.should_rerun = True
        else:
            # Uploader emptied: allow the same file to be uploaded again later.
            st.session_state['last_upload_signature'] = None
    with col2:
        if st.button("π Clear All Files"):
            for f in _list_managed_files():
                os.remove(f)
            st.success("All files cleared!")
            st.session_state.should_rerun = True
    files = _list_managed_files()
    if files:
        st.write("### Existing Files")
        for f in files:
            with st.expander(f"π {os.path.basename(f)}"):
                if f.endswith('.mp3'):
                    st.audio(f)
                else:
                    # errors='replace' keeps a non-UTF-8 file from crashing the page
                    with open(f, 'r', encoding='utf-8', errors='replace') as file:
                        text = file.read()
                    # Key on path + content: two files with identical text would
                    # otherwise produce identical widgets, while a key on the
                    # path alone would keep showing stale text after an edit.
                    digest = hashlib.sha256(text.encode('utf-8', errors='replace')).hexdigest()
                    st.text_area("Content", text, height=100, key=f"content_{f}_{digest}")
                if st.button(f"Delete {os.path.basename(f)}", key=f"del_{f}"):
                    os.remove(f)
                    st.session_state.should_rerun = True
# ----------------------------------------- | |
# Editor: Allow user to select a text file and edit it | |
# ----------------------------------------- | |
def display_editor():
    """Let the user pick a local .txt/.md file, edit its text and save it back."""
    editable = [path for pattern in ("*.txt", "*.md") for path in glob.glob(pattern)]
    choice = st.selectbox("Select a file to edit:", ["None"] + editable)
    if choice == "None":
        return

    with open(choice, 'r', encoding='utf-8') as source:
        current_text = source.read()
    edited_text = st.text_area("βοΈ Edit Content:", value=current_text, height=300)

    if not st.button("πΎ Save"):
        return
    with open(choice, 'w', encoding='utf-8') as target:
        target.write(edited_text)
    st.success("File saved!")
    # Ask main() to rerun the script once this run finishes.
    st.session_state.should_rerun = True
# ----------------------------------------- | |
# Media (Images & Videos) | |
# ----------------------------------------- | |
def show_media():
    """Render the local image gallery and video list in two tabs.

    Images (.png/.jpg/.jpeg) are laid out round-robin across a user-selected
    number of columns; videos (.mp4/.webm/.mov) each get their own expander.
    """
    st.header("πΈ Images & π₯ Videos")
    tabs = st.tabs(["πΌ Images", "π₯ Video"])
    with tabs[0]:
        imgs = glob.glob("*.png") + glob.glob("*.jpg") + glob.glob("*.jpeg")
        if imgs:
            c = st.slider("Columns", 1, 5, 3)
            cols = st.columns(c)
            for i, f in enumerate(imgs):
                # Open inside a context manager so the file handle is released
                # as soon as the image has been handed to Streamlit.
                with cols[i % c], Image.open(f) as img:
                    st.image(img, use_column_width=True)
        else:
            st.write("No images found.")
    with tabs[1]:
        vids = glob.glob("*.mp4") + glob.glob("*.webm") + glob.glob("*.mov")
        if vids:
            for v in vids:
                with st.expander(f"π₯ {os.path.basename(v)}"):
                    st.video(v)
        else:
            st.write("No videos found.")
# ----------------------------------------- | |
# Video Search | |
# ----------------------------------------- | |
def display_video_search():
    """Render the video search form and, when triggered, the result list.

    A search runs when the button is pressed and also once automatically on
    the first run of the session. Each executed search is appended to
    ``st.session_state['search_history']`` with its first five results.
    """
    st.subheader("Search Videos")
    # Constructing VideoSearch loads the embedding model and fetches the
    # dataset. Streamlit re-executes the script on every interaction, so the
    # instance is built once per session and reused afterwards.
    if 'video_search_instance' not in st.session_state:
        st.session_state['video_search_instance'] = VideoSearch()
    search_instance = st.session_state['video_search_instance']

    col1, col2 = st.columns([3, 1])
    with col1:
        query = st.text_input("Enter your search query:", value="ancient" if not st.session_state['initial_search_done'] else "")
    with col2:
        search_column = st.selectbox("Search in field:", ["All Fields"] + st.session_state['search_columns'])
    col3, col4 = st.columns(2)
    with col3:
        num_results = st.slider("Number of results:", 1, 100, 20)
    with col4:
        search_button = st.button("π Search")

    if (search_button or not st.session_state['initial_search_done']) and query is not None:
        st.session_state['initial_search_done'] = True
        selected_column = None if search_column == "All Fields" else search_column
        with st.spinner("Searching..."):
            results = search_instance.search(query, selected_column, num_results)
        st.session_state['search_history'].append({
            'query': query,
            'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            'results': results[:5]
        })
        for i, result in enumerate(results, 1):
            # Rows are not guaranteed to carry every field, so read them
            # defensively instead of indexing the dictionary directly.
            description = result.get('description')
            description = "" if description is None else str(description)
            highlighted_desc = highlight_text(description, query)
            start_time = result.get('start_time')
            end_time = result.get('end_time')
            views = result.get('views')
            with st.expander(f"Result {i}: {description[:100]}...", expanded=(i == 1)):
                cols = st.columns([2, 1])
                with cols[0]:
                    st.markdown("**Description:**")
                    st.write(highlighted_desc)
                    if start_time is not None and end_time is not None:
                        st.markdown(f"**Time Range:** {start_time}s - {end_time}s")
                    if views is not None:
                        try:
                            views_text = f"{views:,}"
                        except (TypeError, ValueError):
                            # Non-numeric value: show it without grouping.
                            views_text = str(views)
                        st.markdown(f"**Views:** {views_text}")
                with cols[1]:
                    st.markdown(f"**Relevance Score:** {result['relevance_score']:.2%}")
                    if result.get('youtube_id'):
                        video_url = f"https://youtube.com/watch?v={result['youtube_id']}"
                        if start_time is not None:
                            video_url += f"&t={start_time}"
                        st.video(video_url)
# ----------------------------------------- | |
# Main Application (Integrated) | |
# ----------------------------------------- | |
def _trigger_rerun():
    """Restart the script run with whichever rerun API this Streamlit offers."""
    rerun = getattr(st, "rerun", None) or st.experimental_rerun
    rerun()


def main():
    """Render the sidebar (action selector, history, settings) and the chosen page.

    At the end of the run, a pending ``st.session_state.should_rerun`` flag is
    cleared and the script is rerun.
    """
    st.sidebar.markdown("### π²BikeAIπ Multi-Agent Research")
    # We remove the "π€ Voice" option since voice input is removed
    tab_main = st.sidebar.radio("Action:", ["πΈ Media", "π ArXiv", "π Editor"])
    # Settings and search history in the sidebar
    with st.sidebar:
        st.subheader("βοΈ Settings & History")
        if st.button("ποΈ Clear History"):
            st.session_state['search_history'] = []
            _trigger_rerun()
        st.markdown("### Recent Searches")
        for entry in reversed(st.session_state['search_history'][-5:]):
            with st.expander(f"{entry['timestamp']}: {entry['query']}"):
                for i, result in enumerate(entry['results'], 1):
                    # Stored results may lack a description; show an empty one
                    # rather than failing the whole sidebar.
                    description = result.get('description')
                    description = "" if description is None else str(description)
                    st.write(f"{i}. {description[:100]}...")
        st.markdown("### TTS Voice (unused)")
        st.selectbox("TTS Voice:",
                     ["en-US-AriaNeural", "en-US-GuyNeural", "en-GB-SoniaNeural"],
                     key="tts_voice")
    # Main content based on selection
    if tab_main == "πΈ Media":
        # Show media and video search combined
        show_media()
        st.write("---")
        display_video_search()
    elif tab_main == "π ArXiv":
        st.subheader("Arxiv Search")
        q = st.text_input("Enter your Arxiv search query:", value=st.session_state['arxiv_last_query'])
        vocal_summary = st.checkbox("π Short Audio Summary (Placeholder - no TTS actually)", value=True)
        titles_summary = st.checkbox("π Titles Only", value=True)
        full_audio = st.checkbox("π Full Audio Results (Placeholder)", value=False)
        if st.button("π Arxiv Search"):
            st.session_state['arxiv_last_query'] = q
            perform_arxiv_lookup(q, vocal_summary=vocal_summary, titles_summary=titles_summary, full_audio=full_audio)
    elif tab_main == "π Editor":
        show_file_manager()
        st.write("---")
        display_editor()
    # Rerun if needed
    if st.session_state.should_rerun:
        st.session_state.should_rerun = False
        _trigger_rerun()


if __name__ == "__main__":
    main()