|
import gradio as gr |
|
import os |
|
import requests |
|
import json |
|
import re |
|
from bs4 import BeautifulSoup |
|
from datetime import datetime |
|
import tempfile |
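# Expected Space dependencies (a sketch; the actual requirements.txt may differ):
#   gradio, requests, beautifulsoup4   - core app
#   faiss-cpu, numpy                   - optional vector RAG path
#   sentence-transformers              - optional, embeds queries for RAG retrieval
#   crawl4ai                           - optional, browser-based web search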
|
|
|
|
|
SPACE_NAME = "My Custom Space" |
|
SPACE_DESCRIPTION = "" |
|
SYSTEM_PROMPT = """You are an advanced research assistant specializing in academic literature search and analysis. Your expertise includes finding peer-reviewed sources, critically evaluating research methodology, synthesizing insights across multiple papers, and providing properly formatted citations. When responding, ground all claims in specific sources from provided URL contexts, distinguish between direct evidence and analytical interpretation, and highlight any limitations or conflicting findings. Use clear, accessible language that makes complex research understandable, and suggest related areas of inquiry when relevant. Your goal is to be a knowledgeable research partner who helps users navigate academic information with precision and clarity.""" |
|
MODEL = "google/gemini-2.0-flash-001" |
|
GROUNDING_URLS = [] |
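# Example (hypothetical URLs): GROUNDING_URLS = ["https://example.com/syllabus", "https://example.com/reading-list"]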
|
|
|
ACCESS_CODE = os.environ.get("SPACE_ACCESS_CODE", "") |
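# An empty SPACE_ACCESS_CODE leaves the Space public (the access gate below is skipped).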
|
ENABLE_DYNAMIC_URLS = True |
|
ENABLE_VECTOR_RAG = True |
|
ENABLE_WEB_SEARCH = False |
|
RAG_DATA = {"index_base64": "SXhGSYABAAABAAAAAAAAAAAAEAAAAAAAAAAQAAAAAAABAAAAAIABAAAAAAAAq+L8vO1iA71Ey6u7avEnPT/5tTxuLQs7hrgTPJKVCT18lxu9c9UbvZcz+rvtsBs987PLPUS2ET0OLH+73wgnPSK7kD2GeYC8eGOLPFzvBLzgT4m7MnJUPfAGgz33vSu9WGeRvBjTPD0UljK8hdCyvadptLsNVDO9ac2rPU/HPj2u+eo8k7wnPtDTjr0rCeq75DxvvHK0PL1mRkW9pwzevNMce71jGBu9KAuqvSdelz3RUZE9ZIRPPU7Bw73q+qS80jk+PT/soD06Rym+46qgvI/G5ju3uSs9+ooxvWbipjz5tos7xC2ZPCfAmzsa/6m8ID8IO3li8L0dUaE9jbO8vEdoKjyLjCG9PXKKvalusDwX26o8CH7Rvc6lcr1truM9IroGvfIZYj0+Sne9z62xPWCjbzyLoJq9ad5DPVVH+bwcUZ69o/HavKFA3jxtLcy7h3FZPfOwkbsbXts9ig2svRcWS7zXWcc888SUPFf/Ub3oIbM9NIrNO6OZUz3DDGE8m9ikPQ9/Cr3cJLk9Tj0wPFOCkD1Azow9v4poPDFJd71F2o+9QWM2PNM86Twxrsq8a5ORPYmX2L3psZC9502bPMyBG71r4aq9iai4u+3MZL0Xk+C7Y5LqvFePIDsgpIE8CTC7uSoonT3z54S8dluQvOsfpbzIs/48CKDrvL7oEQpmYpe8y77bvOjiRb2nQIE9qGLUu4W0qbz2MYW8iWivPV1qnr2ou468oSryvWF5rz1F5j+847vGPfmy/DxYAN080dbXvIahyTzXzNy82L32vOIHhD2i3hS95GNqPdzPmj3ZeyQ9zvw3vCvVeD3+0q+9SCM3POSd+bw9tse90uCCvRTrvLz0ay89UUHcPHDQPj28LAk8G9kQvT9pXL2zCIg8OPeVvQq6Er1Lk9c8DZorvREmS715Z/M8qkYQveSYdLzWZEU94UM4vVQHsTymWj89vG7tvCzcIT1RP7Q9BQ5mu+Jgrzx8x1y8nE0qPbsmRjxe0jW8wbEyvBC5mz0uih67vLkJvWrHIr0PutA6u1ZGPYqRyT2rN6Q9znmIPeI7Ez0/PxE9BPElvXFa5rytpfU7VexEva176L0rlkG81v0mPfKMIbybQLa9V2IEPUsazr17Thk9agSDvZn+gjwnUh6+N61iOyiZRr2e7c27albBPSeeGL2lUB68d+NYPcJg4IlqYLO8mHA/vXH9mLx3DUs9gfpgPK0Hi7wMU1u83oekPBRzcL1D0jC9ww7VvNwxiL3iR3o9nxAKvRJMp7yNWJw9Ve3cvJqxbL1FBGG9ifKqPeNSir38M6M8KJR4vdreOz0ZIiW8aCggvH8NQLqSATK9soGBvKo20LzKkEi8UhFBvf8NALv8L6M8FDRFPA898bxCOo09UomcvVixyzsAbEE9jRK6PBtlmT21tA69TgYjvQfESb3/Mow7XMjZvZgzVT1CbGU9SzDQvF7eUD1roz+9rPYhPF3AHL1jkuI8zkRnvXqyoL1JBY08/RaBPJAxBz3M6ne8DqkUvKIvnT2hRhg817rJPI/tuL2OGz+9PZbvPFYrob3MARm93w1IPQkCi7wRzRk9vBV8PaM7WT0MBIs8OBZVPcNUBL3pLdo8JGhEvTGSj7y6Z608dR6zPaKYkD2lErg7BlQbPqimq7ykz2C8U9nsvMQsVz12ChG9f5QIPDRRCLweOMA95fQ4PTBNOLPNAMm9wOguvAO8K71gWoi87ysTPRBC+Dv4UbA8t17rPRpRoL1mh8e8rh0NPf1FRbwo8eq8yKiMPLw4qz1qBlA8X40RPeuoGrzSyhY9k6fEO38dnj02gFI9xlEVPPO85z2/nGc8U2CzPHoLlrzeUtM949eDPTQGfr2HRPO89GoHPXtjtrz3dqa91VKlPaq5Kj3z7F08QHYIvGwJzr2rJBw8dX7zPLgyhDuZh+Y83CUUvRQfAz3BO308N/cNvD2wrb0W4ck6rccnPOlxpLybUpG90PS1Oy98yT0q83w8hom3u7K76bxdcJg8NPmZPTRWIb2QPeM7C/ehO9Gq3Dzef4I8", "chunks": {"8570c8c5": {"text": "Vector Database Test Document This is a test document for evaluating the vector database functionality. Section 1: Introduction to Vector Databases Vector databases store and query high-dimensional vector representations of data. They enable semantic search by finding vectors similar to a query vector in an embedding space. Section 2: Use Cases Common applications include: - Document retrieval and question answering - Similarity search for products or content - Recommendation systems - Semantic search in chatbots Section 3: Technical Implementation Vector databases typically use embedding models to convert text into dense vectors, then use algorithms like cosine similarity or approximate nearest neighbor search to find relevant results. 
Section 4: Benefits - Semantic understanding beyond keyword matching - Scalable retrieval for large document collections - Integration with modern AI systems and large language models - Support for multi-modal data (text, images, audio) This document should generate multiple chunks when processed by the system.", "metadata": {"file_path": "/private/var/folders/0m/_clrz0_d1tzf_fns8rxyy1jr0000gn/T/gradio/c4c745f9c7f069f694a492715df7f50d07f18cee76e93e198029acd8a6c38532/doc.txt", "file_name": "doc.txt", "chunk_index": 0, "start_word": 0, "word_count": 151}, "chunk_id": "8570c8c5"}}, "chunk_ids": ["8570c8c5"], "dimension": 384, "model_name": "all-MiniLM-L6-v2"} |
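# RAG_DATA schema: a base64-serialized FAISS index ("index_base64"), chunk texts
# and metadata keyed by chunk id ("chunks"), the id ordering matching the index
# rows ("chunk_ids"), and the embedding dimension/model used to build the index.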
|
|
|
|
|
API_KEY = os.environ.get("OPENROUTER_API_KEY") |
|
if API_KEY: |
|
API_KEY = API_KEY.strip() |
|
if not API_KEY: |
|
API_KEY = None |
|
|
|
|
|
def validate_api_key(): |
|
"""Validate API key configuration with detailed logging""" |
|
if not API_KEY: |
|
print(f"β οΈ API KEY CONFIGURATION ERROR:") |
|
print(f" Variable name: OPENROUTER_API_KEY") |
|
print(f" Status: Not set or empty") |
|
print(f" Action needed: Set 'OPENROUTER_API_KEY' in HuggingFace Space secrets") |
|
print(f" Expected format: sk-or-xxxxxxxxxx") |
|
return False |
|
elif not API_KEY.startswith('sk-or-'): |
|
print(f"β οΈ API KEY FORMAT WARNING:") |
|
print(f" Variable name: OPENROUTER_API_KEY") |
|
print(f" Current value: {{API_KEY[:10]}}..." if len(API_KEY) > 10 else API_KEY) |
|
print(f" Expected format: sk-or-xxxxxxxxxx") |
|
print(f" Note: OpenRouter keys should start with 'sk-or-'") |
|
return True |
|
else: |
|
print(f"β
API Key configured successfully") |
|
print(f" Variable: OPENROUTER_API_KEY") |
|
print(f" Format: Valid OpenRouter key") |
|
return True |
|
|
|
|
|
API_KEY_VALID = validate_api_key() |
|
|
|
def validate_url_domain(url): |
|
"""Basic URL domain validation""" |
|
try: |
|
from urllib.parse import urlparse |
|
parsed = urlparse(url) |
|
|
|
if parsed.netloc and '.' in parsed.netloc: |
|
return True |
|
    except Exception:
|
pass |
|
return False |
|
|
|
def fetch_url_content(url): |
|
"""Enhanced URL content fetching with improved compatibility and error handling""" |
|
if not validate_url_domain(url): |
|
return f"Invalid URL format: {url}" |
|
|
|
try: |
|
|
|
headers = { |
|
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', |
|
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', |
|
'Accept-Language': 'en-US,en;q=0.5', |
|
'Accept-Encoding': 'gzip, deflate', |
|
'Connection': 'keep-alive' |
|
} |
|
|
|
response = requests.get(url, timeout=15, headers=headers) |
|
response.raise_for_status() |
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
|
|
for element in soup(["script", "style", "nav", "header", "footer", "aside", "form", "button"]): |
|
element.decompose() |
|
|
|
|
|
main_content = soup.find('main') or soup.find('article') or soup.find('div', class_=lambda x: bool(x and 'content' in x.lower())) or soup |
|
text = main_content.get_text() |
|
|
|
|
|
lines = (line.strip() for line in text.splitlines()) |
|
        # Split on double spaces so short words ("a", "of", ...) are not dropped
        # by the length filter below; splitting on every space would discard them.
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
|
text = ' '.join(chunk for chunk in chunks if chunk and len(chunk) > 2) |
|
|
|
|
|
if len(text) > 4000: |
|
truncated = text[:4000] |
|
last_period = truncated.rfind('.') |
|
if last_period > 3000: |
|
text = truncated[:last_period + 1] |
|
else: |
|
text = truncated + "..." |
|
|
|
return text if text.strip() else "No readable content found at this URL" |
|
|
|
except requests.exceptions.Timeout: |
|
return f"Timeout error fetching {url} (15s limit exceeded)" |
|
except requests.exceptions.RequestException as e: |
|
return f"Error fetching {url}: {str(e)}" |
|
except Exception as e: |
|
return f"Error processing content from {url}: {str(e)}" |
|
|
|
def extract_urls_from_text(text): |
|
"""Extract URLs from text using regex with enhanced validation""" |
|
import re |
|
    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]]+'
|
urls = re.findall(url_pattern, text) |
|
|
|
|
|
validated_urls = [] |
|
for url in urls: |
|
|
|
url = url.rstrip('.,!?;:') |
|
|
|
if '.' in url and len(url) > 10: |
|
validated_urls.append(url) |
|
|
|
return validated_urls |
|
|
|
|
|
_url_content_cache = {} |
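# Fetched grounding-URL content is cached here for the life of the process,
# keyed by the sorted tuple of configured URLs (see get_grounding_context).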
|
|
|
def get_grounding_context(): |
|
"""Fetch context from grounding URLs with caching""" |
|
if not GROUNDING_URLS: |
|
return "" |
|
|
|
|
|
cache_key = tuple(sorted([url for url in GROUNDING_URLS if url and url.strip()])) |
|
|
|
|
|
if cache_key in _url_content_cache: |
|
return _url_content_cache[cache_key] |
|
|
|
context_parts = [] |
|
for i, url in enumerate(GROUNDING_URLS, 1): |
|
if url.strip(): |
|
content = fetch_url_content(url.strip()) |
|
context_parts.append(f"Context from URL {i} ({url}):\n{content}") |
|
|
|
if context_parts: |
|
result = "\n\n" + "\n\n".join(context_parts) + "\n\n" |
|
else: |
|
result = "" |
|
|
|
|
|
_url_content_cache[cache_key] = result |
|
return result |
|
|
|
def export_conversation_to_markdown(conversation_history): |
|
"""Export conversation history to markdown format""" |
|
if not conversation_history: |
|
return "No conversation to export." |
|
|
|
markdown_content = f"""# Conversation Export |
|
Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} |
|
|
|
--- |
|
|
|
""" |
|
|
|
message_pair_count = 0 |
|
for i, message in enumerate(conversation_history): |
|
if isinstance(message, dict): |
|
role = message.get('role', 'unknown') |
|
content = message.get('content', '') |
|
|
|
if role == 'user': |
|
message_pair_count += 1 |
|
markdown_content += f"## User Message {message_pair_count}\n\n{content}\n\n" |
|
elif role == 'assistant': |
|
markdown_content += f"## Assistant Response {message_pair_count}\n\n{content}\n\n---\n\n" |
|
elif isinstance(message, (list, tuple)) and len(message) >= 2: |
|
|
|
message_pair_count += 1 |
|
user_msg, assistant_msg = message[0], message[1] |
|
if user_msg: |
|
markdown_content += f"## User Message {message_pair_count}\n\n{user_msg}\n\n" |
|
if assistant_msg: |
|
markdown_content += f"## Assistant Response {message_pair_count}\n\n{assistant_msg}\n\n---\n\n" |
|
|
|
return markdown_content |
|
|
|
|
|
if ENABLE_VECTOR_RAG and RAG_DATA: |
|
try: |
|
import faiss |
|
import numpy as np |
|
import base64 |
|
|
|
class SimpleRAGContext: |
|
            def __init__(self, rag_data):

                # Rebuild the serialized FAISS index; deserialize_index expects
                # a uint8 numpy array, not raw bytes.
                index_bytes = base64.b64decode(rag_data['index_base64'])

                self.index = faiss.deserialize_index(np.frombuffer(index_bytes, dtype=np.uint8))

                self.chunks = rag_data['chunks']

                self.chunk_ids = rag_data['chunk_ids']

                self.model_name = rag_data.get('model_name', 'all-MiniLM-L6-v2')

                self._embedder = None  # loaded lazily on first query

            def get_context(self, query, max_chunks=3):

                """Retrieve the most similar chunks via FAISS search.

                A minimal sketch: assumes sentence-transformers can load the
                model recorded in RAG_DATA; returns "" on any failure so chat
                still works without RAG.
                """

                try:

                    from sentence_transformers import SentenceTransformer

                    if not self.chunk_ids:
                        return ""

                    if self._embedder is None:
                        self._embedder = SentenceTransformer(self.model_name)

                    query_vec = np.asarray(self._embedder.encode([query]), dtype=np.float32)

                    k = min(max_chunks, len(self.chunk_ids))

                    _, indices = self.index.search(query_vec, k)

                    parts = [self.chunks[self.chunk_ids[i]]['text'] for i in indices[0] if 0 <= i < len(self.chunk_ids)]

                    if parts:
                        return "\n\nRelevant document context:\n\n" + "\n\n".join(parts) + "\n\n"

                except Exception as e:
                    print(f"RAG retrieval failed: {e}")

                return ""
|
|
|
rag_context_provider = SimpleRAGContext(RAG_DATA) |
|
except Exception as e: |
|
print(f"Failed to initialize RAG: {e}") |
|
rag_context_provider = None |
|
else: |
|
rag_context_provider = None |
|
|
|
def generate_response(message, history): |
|
"""Generate response using OpenRouter API""" |
|
|
|
|
|
if not API_KEY: |
|
error_msg = f"π **API Key Required**\n\n" |
|
error_msg += f"Please configure your OpenRouter API key:\n" |
|
error_msg += f"1. Go to Settings (βοΈ) in your HuggingFace Space\n" |
|
error_msg += f"2. Click 'Variables and secrets'\n" |
|
error_msg += f"3. Add secret: **OPENROUTER_API_KEY**\n" |
|
error_msg += f"4. Value: Your OpenRouter API key (starts with `sk-or-`)\n\n" |
|
error_msg += f"Get your API key at: https://openrouter.ai/keys" |
|
print(f"β API request failed: No API key configured for OPENROUTER_API_KEY") |
|
return error_msg |
|
|
|
|
|
grounding_context = get_grounding_context() |
|
|
|
|
|
if ENABLE_VECTOR_RAG and rag_context_provider: |
|
rag_context = rag_context_provider.get_context(message) |
|
if rag_context: |
|
grounding_context += rag_context |
|
|
|
|
|
if ENABLE_DYNAMIC_URLS: |
|
urls_in_message = extract_urls_from_text(message) |
|
if urls_in_message: |
|
|
|
dynamic_context_parts = [] |
|
for url in urls_in_message[:3]: |
|
content = fetch_url_content(url) |
|
dynamic_context_parts.append(f"\n\nDynamic context from {url}:\n{content}") |
|
if dynamic_context_parts: |
|
grounding_context += "\n".join(dynamic_context_parts) |
|
|
|
|
|
if ENABLE_WEB_SEARCH: |
|
should_search = True |
|
|
|
|
|
import re |
|
if re.search(r'```[\s\S]*```', message): |
|
should_search = False |
|
|
|
|
|
urls_in_message = extract_urls_from_text(message) |
|
if urls_in_message and len(' '.join(urls_in_message)) > len(message) * 0.5: |
|
should_search = False |
|
|
|
|
|
if len(message.strip()) < 5: |
|
should_search = False |
|
|
|
if should_search: |
|
|
|
search_query = message.strip() |
|
try: |
|
|
|
import urllib.parse |
|
import asyncio |
|
|
|
                async def search_with_crawl4ai(search_query):

                    try:

                        # crawl4ai's async API; the sync WebCrawler class has no
                        # astart/arun/aclose coroutines, so AsyncWebCrawler is
                        # used here (assumes a reasonably recent crawl4ai).
                        from crawl4ai import AsyncWebCrawler

                        encoded_query = urllib.parse.quote_plus(search_query)

                        search_url = f"https://duckduckgo.com/html/?q={encoded_query}"

                        # The async context manager handles startup and cleanup.
                        async with AsyncWebCrawler() as crawler:

                            result = await crawler.arun(url=search_url)

                            if result.success:

                                content = result.cleaned_html if result.cleaned_html else result.markdown

                                if content:

                                    lines = [line.strip() for line in content.split('\n') if line.strip()]

                                    cleaned_content = '\n'.join(lines)

                                    if len(cleaned_content) > 2000:

                                        cleaned_content = cleaned_content[:2000] + "..."

                                    return cleaned_content

                                else:

                                    return "No content extracted from search results"

                            else:

                                return f"Search failed: {getattr(result, 'error_message', 'Unknown error')}"
|
|
|
except ImportError: |
|
|
|
encoded_query = urllib.parse.quote_plus(search_query) |
|
search_url = f"https://duckduckgo.com/html/?q={encoded_query}" |
|
|
|
|
|
response = requests.get(search_url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=10) |
|
if response.status_code == 200: |
|
from bs4 import BeautifulSoup |
|
soup = BeautifulSoup(response.content, 'html.parser') |
|
|
|
|
|
for script in soup(["script", "style", "nav", "header", "footer"]): |
|
script.decompose() |
|
|
|
|
|
text = soup.get_text() |
|
|
|
|
|
lines = (line.strip() for line in text.splitlines()) |
|
chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) |
|
text = ' '.join(chunk for chunk in chunks if chunk) |
|
|
|
|
|
if len(text) > 2000: |
|
text = text[:2000] + "..." |
|
|
|
return text |
|
else: |
|
return f"Failed to fetch search results: {response.status_code}" |
|
|
|
|
|
                # asyncio.run exists on every Python version this app supports (3.7+).
                search_result = asyncio.run(search_with_crawl4ai(search_query))
|
|
|
grounding_context += f"\n\nWeb search results for '{search_query}':\n{search_result}" |
|
except Exception as e: |
|
|
|
urls = extract_urls_from_text(search_query) |
|
if urls: |
|
fallback_results = [] |
|
for url in urls[:2]: |
|
content = fetch_url_content(url) |
|
fallback_results.append(f"Content from {url}:\n{content[:500]}...") |
|
grounding_context += f"\n\nWeb search fallback for '{search_query}':\n" + "\n\n".join(fallback_results) |
|
else: |
|
grounding_context += f"\n\nWeb search requested for '{search_query}' but search functionality is unavailable" |
|
|
|
|
|
enhanced_system_prompt = SYSTEM_PROMPT + grounding_context |
|
|
|
|
|
messages = [{"role": "system", "content": enhanced_system_prompt}] |
|
|
|
|
|
for chat in history: |
|
        if isinstance(chat, dict) and chat.get('content'):

            # Forward only the role/content fields the chat completions API expects
            # (Gradio "messages" history can carry extra keys such as metadata).
            messages.append({"role": chat.get('role', 'user'), "content": chat['content']})
|
elif isinstance(chat, (list, tuple)) and len(chat) >= 2: |
|
|
|
user_msg, assistant_msg = chat[0], chat[1] |
|
if user_msg: |
|
messages.append({"role": "user", "content": user_msg}) |
|
if assistant_msg: |
|
messages.append({"role": "assistant", "content": assistant_msg}) |
|
|
|
|
|
messages.append({"role": "user", "content": message}) |
|
|
|
|
|
try: |
|
print(f"π Making API request to OpenRouter...") |
|
print(f" Model: {MODEL}") |
|
print(f" Messages: {len(messages)} in conversation") |
|
|
|
response = requests.post( |
|
url="https://openrouter.ai/api/v1/chat/completions", |
|
headers={ |
|
"Authorization": f"Bearer {API_KEY}", |
|
"Content-Type": "application/json", |
|
"HTTP-Referer": "https://huggingface.co", |
|
"X-Title": "HuggingFace Space" |
|
}, |
|
json={ |
|
"model": MODEL, |
|
"messages": messages, |
|
"temperature": 0.7, |
|
"max_tokens": 1500 |
|
}, |
|
timeout=30 |
|
) |
|
|
|
print(f"π‘ API Response: {response.status_code}") |
|
|
|
if response.status_code == 200: |
|
try: |
|
result = response.json() |
|
|
|
|
|
if 'choices' not in result or not result['choices']: |
|
print(f"β οΈ API response missing choices: {result}") |
|
return "API Error: No response choices available" |
|
elif 'message' not in result['choices'][0]: |
|
print(f"β οΈ API response missing message: {result}") |
|
return "API Error: No message in response" |
|
elif 'content' not in result['choices'][0]['message']: |
|
print(f"β οΈ API response missing content: {result}") |
|
return "API Error: No content in message" |
|
else: |
|
content = result['choices'][0]['message']['content'] |
|
|
|
|
|
if not content or content.strip() == "": |
|
print(f"β οΈ API returned empty content") |
|
return "API Error: Empty response content" |
|
|
|
print(f"β
API request successful") |
|
return content |
|
|
|
except (KeyError, IndexError, json.JSONDecodeError) as e: |
|
print(f"β Failed to parse API response: {str(e)}") |
|
return f"API Error: Failed to parse response - {str(e)}" |
|
elif response.status_code == 401: |
|
error_msg = f"π **Authentication Error**\n\n" |
|
error_msg += f"Your API key appears to be invalid or expired.\n\n" |
|
error_msg += f"**Troubleshooting:**\n" |
|
error_msg += f"1. Check that your **OPENROUTER_API_KEY** secret is set correctly\n" |
|
error_msg += f"2. Verify your API key at: https://openrouter.ai/keys\n" |
|
error_msg += f"3. Ensure your key starts with `sk-or-`\n" |
|
error_msg += f"4. Check that you have credits on your OpenRouter account" |
|
print(f"β API authentication failed: {response.status_code} - {response.text[:200]}") |
|
return error_msg |
|
elif response.status_code == 429: |
|
error_msg = f"β±οΈ **Rate Limit Exceeded**\n\n" |
|
error_msg += f"Too many requests. Please wait a moment and try again.\n\n" |
|
error_msg += f"**Troubleshooting:**\n" |
|
error_msg += f"1. Wait 30-60 seconds before trying again\n" |
|
error_msg += f"2. Check your OpenRouter usage limits\n" |
|
error_msg += f"3. Consider upgrading your OpenRouter plan" |
|
print(f"β Rate limit exceeded: {response.status_code}") |
|
return error_msg |
|
elif response.status_code == 400: |
|
try: |
|
error_data = response.json() |
|
error_message = error_data.get('error', {}).get('message', 'Unknown error') |
|
            except Exception:
|
error_message = response.text |
|
|
|
error_msg = f"β οΈ **Request Error**\n\n" |
|
error_msg += f"The API request was invalid:\n" |
|
error_msg += f"`{error_message}`\n\n" |
|
if "model" in error_message.lower(): |
|
error_msg += f"**Model Issue:** The model `{MODEL}` may not be available.\n" |
|
error_msg += f"Try switching to a different model in your Space configuration." |
|
print(f"β Bad request: {response.status_code} - {error_message}") |
|
return error_msg |
|
else: |
|
error_msg = f"π« **API Error {response.status_code}**\n\n" |
|
error_msg += f"An unexpected error occurred. Please try again.\n\n" |
|
error_msg += f"If this persists, check:\n" |
|
error_msg += f"1. OpenRouter service status\n" |
|
error_msg += f"2. Your API key and credits\n" |
|
error_msg += f"3. The model availability" |
|
print(f"β API error: {response.status_code} - {response.text[:200]}") |
|
return error_msg |
|
|
|
except requests.exceptions.Timeout: |
|
error_msg = f"β° **Request Timeout**\n\n" |
|
error_msg += f"The API request took too long (30s limit).\n\n" |
|
error_msg += f"**Troubleshooting:**\n" |
|
error_msg += f"1. Try again with a shorter message\n" |
|
error_msg += f"2. Check your internet connection\n" |
|
error_msg += f"3. Try a different model" |
|
print(f"β Request timeout after 30 seconds") |
|
return error_msg |
|
except requests.exceptions.ConnectionError: |
|
error_msg = f"π **Connection Error**\n\n" |
|
error_msg += f"Could not connect to OpenRouter API.\n\n" |
|
error_msg += f"**Troubleshooting:**\n" |
|
error_msg += f"1. Check your internet connection\n" |
|
error_msg += f"2. Check OpenRouter service status\n" |
|
error_msg += f"3. Try again in a few moments" |
|
print(f"β Connection error to OpenRouter API") |
|
return error_msg |
|
except Exception as e: |
|
error_msg = f"β **Unexpected Error**\n\n" |
|
error_msg += f"An unexpected error occurred:\n" |
|
error_msg += f"`{str(e)}`\n\n" |
|
error_msg += f"Please try again or contact support if this persists." |
|
print(f"β Unexpected error: {str(e)}") |
|
return error_msg |
|
|
|
|
|
access_granted = gr.State(False) |
|
_access_granted_global = False |
|
|
|
def verify_access_code(code): |
|
"""Verify the access code""" |
|
global _access_granted_global |
|
if not ACCESS_CODE: |
|
_access_granted_global = True |
|
        # gr.State outputs take raw values, not gr.update objects.
        return gr.update(visible=False), gr.update(visible=True), True
|
|
|
if code == ACCESS_CODE: |
|
_access_granted_global = True |
|
        return gr.update(visible=False), gr.update(visible=True), True
|
else: |
|
_access_granted_global = False |
|
return gr.update(visible=True, value="β Incorrect access code. Please try again."), gr.update(visible=False), gr.update(value=False) |
|
|
|
def protected_generate_response(message, history): |
|
"""Protected response function that checks access""" |
|
|
|
if ACCESS_CODE and not _access_granted_global: |
|
return "Please enter the access code to continue." |
|
return generate_response(message, history) |
|
|
|
def export_conversation(history): |
|
"""Export conversation to markdown file""" |
|
if not history: |
|
return gr.update(visible=False) |
|
|
|
markdown_content = export_conversation_to_markdown(history) |
|
|
|
|
|
    with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
|
f.write(markdown_content) |
|
temp_file = f.name |
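        # delete=False keeps the file on disk after the handle closes so Gradio can serve it.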
|
|
|
return gr.update(value=temp_file, visible=True) |
|
|
|
|
|
def get_configuration_status(): |
|
"""Generate a configuration status message for display""" |
|
status_parts = [] |
|
|
|
if API_KEY_VALID: |
|
status_parts.append("β
**API Key:** Configured and valid") |
|
else: |
|
status_parts.append("β **API Key:** Not configured - Set `OPENROUTER_API_KEY` in Space secrets") |
|
|
|
status_parts.append(f"π€ **Model:** {MODEL}") |
|
status_parts.append(f"π‘οΈ **Temperature:** 0.7") |
|
status_parts.append(f"π **Max Tokens:** 1500") |
|
|
|
if GROUNDING_URLS: |
|
status_parts.append(f"π **URL Grounding:** {len(GROUNDING_URLS)} URLs configured") |
|
|
|
if ENABLE_DYNAMIC_URLS: |
|
status_parts.append("π **Dynamic URLs:** Enabled") |
|
|
|
if ENABLE_WEB_SEARCH: |
|
status_parts.append("π **Web Search:** Enabled") |
|
|
|
if ENABLE_VECTOR_RAG: |
|
status_parts.append("π **Document RAG:** Enabled") |
|
|
|
if ACCESS_CODE: |
|
status_parts.append("π **Access Control:** Enabled") |
|
else: |
|
status_parts.append("π **Access:** Public") |
|
|
|
return "\n".join(status_parts) |
|
|
|
|
|
with gr.Blocks(title=SPACE_NAME) as demo: |
|
gr.Markdown(f"# {SPACE_NAME}") |
|
gr.Markdown(SPACE_DESCRIPTION) |
|
|
|
|
|
with gr.Accordion("π Configuration Status", open=not API_KEY_VALID): |
|
gr.Markdown(get_configuration_status()) |
|
|
|
|
|
with gr.Column(visible=bool(ACCESS_CODE)) as access_section: |
|
gr.Markdown("### π Access Required") |
|
gr.Markdown("Please enter the access code provided by your instructor:") |
|
|
|
access_input = gr.Textbox( |
|
label="Access Code", |
|
placeholder="Enter access code...", |
|
type="password" |
|
) |
|
access_btn = gr.Button("Submit", variant="primary") |
|
access_error = gr.Markdown(visible=False) |
|
|
|
|
|
with gr.Column(visible=not bool(ACCESS_CODE)) as chat_section: |
|
chat_interface = gr.ChatInterface( |
|
fn=protected_generate_response, |
|
title="", |
|
description="", |
|
examples=None, |
|
type="messages" |
|
) |
|
|
|
|
|
with gr.Row(): |
|
export_btn = gr.Button("Export Conversation", variant="secondary", size="sm") |
|
export_file = gr.File(label="Download Conversation", visible=False) |
|
|
|
|
|
export_btn.click( |
|
export_conversation, |
|
            # The ChatInterface itself is not an input component; pass its Chatbot,
            # which holds the message history.
            inputs=[chat_interface.chatbot],
|
outputs=[export_file] |
|
) |
|
|
|
|
|
if ACCESS_CODE: |
|
access_btn.click( |
|
verify_access_code, |
|
inputs=[access_input], |
|
outputs=[access_error, chat_section, access_granted] |
|
) |
|
access_input.submit( |
|
verify_access_code, |
|
inputs=[access_input], |
|
outputs=[access_error, chat_section, access_granted] |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |
|
|