import hmac
import json
import os
import re
import tempfile
import urllib.parse
from datetime import datetime

import gradio as gr
import requests
from bs4 import BeautifulSoup
|
|
|
|
|
|
|
# --- Space configuration ---------------------------------------------------

# Display name: used as the page title and in exported transcripts.
SPACE_NAME = "Britannica Wiki Search"

# Optional blurb rendered below the title.
SPACE_DESCRIPTION = ""

# System prompt; grounding context is appended to it for every request.
SYSTEM_PROMPT = """You are a research aid specializing in academic literature search and analysis. Your expertise spans discovering peer-reviewed sources, assessing research methodologies, synthesizing findings across studies, and delivering properly formatted citations. When responding, anchor claims in specific sources from provided URL contexts, differentiate between direct evidence and interpretive analysis, and note any limitations or contradictory results. Employ clear, accessible language that demystifies complex research, and propose connected research directions when appropriate. Your purpose is to serve as an informed research tool supporting users through initial concept development, exploratory investigation, information collection, and source compilation."""

# Model identifier sent to the OpenRouter chat-completions endpoint.
MODEL = "openai/gpt-4o-mini-search-preview"

# Pages whose text is fetched and appended to the system prompt.
GROUNDING_URLS = ["https://www.wikipedia.org/", "https://www.britannica.com/"]

# Shared access code; an empty value leaves the Space public.
ACCESS_CODE = os.environ.get("SPACE_ACCESS_CODE", "")

# When True, URLs found in a user message are fetched as extra context.
ENABLE_DYNAMIC_URLS = True

# When True, generate_response() also asks a RAG provider for context.
ENABLE_VECTOR_RAG = False

# Not referenced by any other code visible in this file.
RAG_DATA = None
|
|
|
|
|
# OpenRouter API key read from the environment. Surrounding whitespace is
# removed, and a missing, empty, or whitespace-only value is normalised to
# None so that every later check can rely on plain truthiness.
API_KEY = (os.environ.get("OPENROUTER_API_KEY") or "").strip() or None
|
|
|
|
|
def validate_api_key():
    """Check the module-level ``API_KEY`` and log the outcome to stdout.

    Returns:
        bool: ``False`` when no key is configured. ``True`` when a key is
        present, including a key without the ``sk-or-`` prefix (that case
        only logs a format warning).
    """
    # NOTE(review): some emoji prefixes in these log lines appear to be
    # mis-encoded (mojibake); they are kept as found pending confirmation
    # of the intended characters.
    if not API_KEY:
        print("β οΈ API KEY CONFIGURATION ERROR:")
        print(" Variable name: OPENROUTER_API_KEY")
        print(" Status: Not set or empty")
        print(" Action needed: Set 'OPENROUTER_API_KEY' in HuggingFace Space secrets")
        print(" Expected format: sk-or-xxxxxxxxxx")
        return False

    if not API_KEY.startswith('sk-or-'):
        # Only a short prefix of the secret may reach the logs. A key of 10
        # characters or fewer would be revealed whole by that prefix, so it
        # is described by length instead of being printed.
        if len(API_KEY) > 10:
            preview = f"{API_KEY[:10]}..."
        else:
            preview = f"(hidden, {len(API_KEY)} characters)"
        print("β οΈ API KEY FORMAT WARNING:")
        print(" Variable name: OPENROUTER_API_KEY")
        print(f" Current value: {preview}")
        print(" Expected format: sk-or-xxxxxxxxxx")
        print(" Note: OpenRouter keys should start with 'sk-or-'")
        return True

    print("✅ API Key configured successfully")
    print(" Variable: OPENROUTER_API_KEY")
    print(" Format: Valid OpenRouter key")
    return True
|
|
|
|
|
# Result of the start-up key check, read later by the status panel.
try:
    API_KEY_VALID = validate_api_key()
except NameError:
    # A name needed by the check is not defined yet: treat as "no valid key".
    API_KEY_VALID = False
|
|
|
# Registered domains that may be fetched (the domain itself or any subdomain).
_ALLOWED_DOMAINS = (
    'wikipedia.org', 'britannica.com', 'arxiv.org',
    'pubmed.ncbi.nlm.nih.gov', 'scholar.google.com', 'researchgate.net',
)

# Labels accepted as the top-level domain (``mit.edu``) or directly in front
# of a two-letter country code (``www.gov.uk``, ``unimelb.edu.au``).
_ALLOWED_TLD_LABELS = ('edu', 'gov')


def validate_url_domain(url):
    """Return True when *url* is an http(s) URL on an allow-listed host.

    The check is made on the parsed hostname and on whole DNS labels, so
    look-alikes such as ``wikipedia.org.evil.com``,
    ``wikipedia.org@evil.com`` (allowed name in the userinfo part) or
    ``education-scam.com`` (allowed label as a substring) are rejected.

    Args:
        url: Candidate URL. Non-string or malformed values are rejected.

    Returns:
        bool: True if the URL may be fetched, False otherwise.
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
        scheme = parsed.scheme
        host = parsed.hostname
    except (AttributeError, TypeError, ValueError):
        # Raised for non-string input or a malformed netloc.
        return False

    if scheme not in ('http', 'https') or not host:
        return False

    # A fully-qualified name may carry a trailing dot.
    host = host.lower().rstrip('.')

    if any(host == domain or host.endswith('.' + domain)
           for domain in _ALLOWED_DOMAINS):
        return True

    labels = host.split('.')
    if len(labels) < 2:
        return False
    if labels[-1] in _ALLOWED_TLD_LABELS:
        return True
    # e.g. "gov.uk" / "edu.au": allowed label followed by a country code.
    return (
        labels[-2] in _ALLOWED_TLD_LABELS
        and len(labels[-1]) == 2
        and labels[-1].isalpha()
    )
|
|
|
def fetch_url_content(url, max_chars=4000, timeout=10):
    """Download *url* and return its visible text, whitespace-normalised.

    Failures are reported in the returned string rather than raised, so the
    result can always be embedded in a prompt.

    Args:
        url: Page to fetch; must pass ``validate_url_domain``.
        max_chars: Maximum number of text characters kept; longer text is
            cut and suffixed with ``"..."``.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        str: The extracted text, or a message describing why the page was
        refused or could not be fetched/processed.
    """
    if not validate_url_domain(url):
        return f"URL domain not in allowed list for security: {url}"

    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()

        # requests follows redirects, so the final location must be
        # re-checked: an allowed host could otherwise be used to pull
        # content from a host outside the allow-list.
        if response.url != url and not validate_url_domain(response.url):
            return f"URL domain not in allowed list for security: {response.url}"

        soup = BeautifulSoup(response.text, 'html.parser')

        # Script and style bodies are not visible page text.
        for tag in soup(["script", "style"]):
            tag.decompose()

        # A separator keeps the text of adjacent elements from being glued
        # together; split()/join() then collapses every whitespace run.
        text = ' '.join(soup.get_text(separator=' ').split())

        if len(text) > max_chars:
            text = text[:max_chars] + "..."

        return text
    except requests.exceptions.RequestException as e:
        return f"Error fetching {url}: {str(e)}"
    except Exception as e:
        # Best-effort boundary: any parsing problem becomes a message.
        return f"Error processing {url}: {str(e)}"
|
|
|
# Compiled once at import time; the pattern text itself is unchanged.
_URL_PATTERN = re.compile(
    r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
)

# Characters that end a sentence or quotation rather than a URL.
_URL_TRAILING_PUNCTUATION = ".,;:!?'\">"


def extract_urls_from_text(text):
    """Return the http(s) URLs found in *text*, in order of first appearance.

    The pattern also matches punctuation, so a URL at the end of a sentence
    or inside parentheses would otherwise keep the trailing ``.``, ``,`` or
    ``)``. Such characters are trimmed; a closing parenthesis is kept when
    it is balanced inside the URL (``.../Python_(programming_language)``).
    Duplicates are dropped so callers do not fetch the same page twice.

    Args:
        text: Text to scan. Non-string or empty input yields no URLs.

    Returns:
        list[str]: Unique URLs that contain a dot and are longer than 10
        characters.
    """
    if not text or not isinstance(text, str):
        return []

    found = []
    seen = set()
    for url in _URL_PATTERN.findall(text):
        url = url.rstrip(_URL_TRAILING_PUNCTUATION)
        # Drop closing parentheses that belong to the surrounding prose.
        while url.endswith(')') and url.count(')') > url.count('('):
            url = url[:-1].rstrip(_URL_TRAILING_PUNCTUATION)

        # Cheap plausibility filter: needs a dot and a minimum length.
        if '.' in url and len(url) > 10 and url not in seen:
            seen.add(url)
            found.append(url)

    return found
|
|
|
|
|
_url_content_cache = {} |
|
|
|
def get_grounding_context():
    """Return the fetched text of all ``GROUNDING_URLS`` as one prompt block.

    The combined text is cached in ``_url_content_cache`` under the sorted
    tuple of usable URLs, so the pages are fetched once per distinct
    configuration. Whatever ``fetch_url_content`` returned is cached,
    including its error messages.

    Returns:
        str: ``""`` when no usable URL is configured, otherwise the
        per-URL sections separated and surrounded by blank lines.
    """
    if not GROUNDING_URLS:
        return ""

    # One filtered view is used for both the cache key and the fetch loop,
    # so a None or blank entry cannot pass one and crash the other. The
    # 1-based position in the configured list is kept for the labels.
    usable = [
        (position, url)
        for position, url in enumerate(GROUNDING_URLS, 1)
        if isinstance(url, str) and url.strip()
    ]

    cache_key = tuple(sorted(url for _, url in usable))
    if cache_key in _url_content_cache:
        return _url_content_cache[cache_key]

    context_parts = [
        f"Context from URL {position} ({url}):\n{fetch_url_content(url.strip())}"
        for position, url in usable
    ]

    if context_parts:
        result = "\n\n" + "\n\n".join(context_parts) + "\n\n"
    else:
        result = ""

    _url_content_cache[cache_key] = result
    return result
|
|
|
def export_conversation_to_markdown(conversation_history):
    """Render a chat history as a Markdown document.

    Args:
        conversation_history: Sequence of messages. Each entry is either a
            dict with ``role``/``content`` keys or a ``(user, assistant)``
            pair. Dict entries whose role is neither ``"user"`` nor
            ``"assistant"`` and entries of any other shape are skipped.

    Returns:
        str: The Markdown text, or ``"No conversation to export."`` when
        the history is empty.
    """
    if not conversation_history:
        return "No conversation to export."

    # One timestamp for header and footer so the two can never disagree.
    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # NOTE(review): the emoji in the section headings appear to be
    # mis-encoded (mojibake); they are kept as found.
    user_heading = "## π€ User"
    assistant_heading = "## π€ Assistant"

    # Blank lines between the metadata lines make Markdown render them as
    # separate paragraphs instead of folding them into one.
    parts = [
        "# Conversation Export\n\n"
        f"Generated on: {timestamp}\n\n"
        f"Space: {SPACE_NAME}\n\n"
        "---\n\n"
    ]

    for message in conversation_history:
        if isinstance(message, dict):
            role = message.get("role", "unknown")
            content = message.get("content", "")
            if role == "user":
                parts.append(f"{user_heading}\n\n{content}\n\n")
            elif role == "assistant":
                parts.append(f"{assistant_heading}\n\n{content}\n\n")
        elif isinstance(message, (list, tuple)) and len(message) >= 2:
            user_msg, assistant_msg = message[0], message[1]
            if user_msg:
                parts.append(f"{user_heading}\n\n{user_msg}\n\n")
            if assistant_msg:
                parts.append(f"{assistant_heading}\n\n{assistant_msg}\n\n")

    parts.append(f"---\n\n*Exported from {SPACE_NAME} on {timestamp}*\n")

    return "".join(parts)
|
|
|
|
|
# Most recent conversation, kept so the export button can read it.
# NOTE(review): this is one module-level list, not per-session state.
chat_history_store = []
|
|
|
def _history_to_messages(history):
    """Convert chat history into a list of plain role/content message dicts."""
    converted = []
    for chat in history or []:
        if isinstance(chat, dict):
            # Forward only the two fields a chat-completions request needs;
            # any extra keys a UI attaches to history entries are dropped.
            if "role" in chat and "content" in chat:
                converted.append({"role": chat["role"], "content": chat["content"]})
        elif isinstance(chat, (list, tuple)) and len(chat) >= 2:
            user_msg, assistant_msg = chat[0], chat[1]
            if user_msg:
                converted.append({"role": "user", "content": user_msg})
            if assistant_msg:
                converted.append({"role": "assistant", "content": assistant_msg})
    return converted


def _missing_api_key_message():
    """Log and return the instructions shown when no API key is configured."""
    print("β API request failed: No API key configured for OPENROUTER_API_KEY")
    return (
        "π **API Key Required**\n\n"
        "Please configure your OpenRouter API key:\n"
        "1. Go to Settings (βοΈ) in your HuggingFace Space\n"
        "2. Click 'Variables and secrets'\n"
        "3. Add secret: **OPENROUTER_API_KEY**\n"
        "4. Value: Your OpenRouter API key (starts with `sk-or-`)\n\n"
        "Get your API key at: https://openrouter.ai/keys"
    )


def _api_error_message(response):
    """Log a non-200 API response and return the matching user-facing text."""
    status = response.status_code
    if status == 401:
        print(f"β API authentication failed: {status} - {response.text[:200]}")
        return (
            "π **Authentication Error**\n\n"
            "Your API key appears to be invalid or expired.\n\n"
            "**Troubleshooting:**\n"
            "1. Check that your **OPENROUTER_API_KEY** secret is set correctly\n"
            "2. Verify your API key at: https://openrouter.ai/keys\n"
            "3. Ensure your key starts with `sk-or-`\n"
            "4. Check that you have credits on your OpenRouter account"
        )
    if status == 429:
        print(f"β Rate limit exceeded: {status}")
        return (
            "β±οΈ **Rate Limit Exceeded**\n\n"
            "Too many requests. Please wait a moment and try again.\n\n"
            "If this persists, check your OpenRouter plan limits."
        )
    print(f"β API error: {status} - {response.text[:200]}")
    return (
        f"π« **API Error {status}**\n\n"
        "An unexpected error occurred. Please try again.\n\n"
        "If this persists, the issue may be with the OpenRouter service."
    )


def generate_response(message, history):
    """Generate the assistant reply for *message* via the OpenRouter API.

    The system prompt is extended with the grounding context, optional RAG
    context and, when enabled, the text of up to two URLs found in the
    message. No exception escapes: every failure is logged to stdout and
    returned as a user-facing Markdown message.

    Args:
        message: The new user message.
        history: Earlier turns, as role/content dicts or
            ``(user, assistant)`` pairs; ``None`` is treated as empty.

    Returns:
        str: The assistant's reply, or an error/help message.
    """
    # NOTE(review): several emoji prefixes in the messages of this function
    # and its helpers appear to be mis-encoded (mojibake); they are kept as
    # found pending confirmation of the intended characters.
    if not API_KEY:
        return _missing_api_key_message()

    grounding_context = get_grounding_context()

    if ENABLE_VECTOR_RAG:
        # The provider is not defined in this file; look it up defensively
        # so turning the flag on without one does not raise NameError.
        provider = globals().get("rag_context_provider")
        if provider:
            rag_context = provider.get_context(message)
            if rag_context:
                grounding_context += rag_context

    if ENABLE_DYNAMIC_URLS:
        # Only the first two URLs are fetched.
        dynamic_context_parts = [
            f"\n\nDynamic context from {url}:\n{fetch_url_content(url)}"
            for url in extract_urls_from_text(message)[:2]
        ]
        if dynamic_context_parts:
            grounding_context += "\n".join(dynamic_context_parts)

    messages = [{"role": "system", "content": SYSTEM_PROMPT + grounding_context}]
    messages.extend(_history_to_messages(history))
    messages.append({"role": "user", "content": message})

    try:
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://huggingface.co",
                "X-Title": "HuggingFace Space",
            },
            json={
                "model": MODEL,
                "messages": messages,
                "temperature": 0.7,
                "max_tokens": 1500,
                "stream": False,
            },
            timeout=30,
        )

        if response.status_code != 200:
            return _api_error_message(response)

        try:
            response_data = response.json()
            assistant_response = response_data['choices'][0]['message']['content']
        except (KeyError, IndexError, TypeError, ValueError) as e:
            # ValueError covers JSON decoding errors; TypeError covers a
            # payload whose "choices"/"message" is null or not a container.
            print(f"β Failed to parse API response: {str(e)}")
            return f"API Error: Failed to parse response - {str(e)}"

        print("✅ API request successful")
        return assistant_response

    except requests.exceptions.Timeout:
        print("β Request timeout")
        return (
            "β° **Request Timeout**\n\n"
            "The request took too long to complete. Please try again with a shorter message."
        )
    except requests.exceptions.ConnectionError:
        print("β Connection error")
        return (
            "π **Connection Error**\n\n"
            "Unable to connect to the AI service. Please check your internet connection and try again."
        )
    except Exception as e:
        # Last-resort boundary so the chat UI always receives a string.
        print(f"β Unexpected error: {str(e)}")
        return (
            "π₯ **Unexpected Error**\n\n"
            f"An unexpected error occurred: {str(e)}\n\n"
            "Please try again or contact support if this persists."
        )
|
|
|
|
|
# State component used as the third output of the access-code handlers.
access_granted = gr.State(False)
|
|
|
def verify_access_code(code):
    """Check a submitted access code and build the matching UI updates.

    Args:
        code: Text entered by the user; ``None`` is treated as empty.

    Returns:
        tuple: ``(error_update, chat_update, granted)``. The first two are
        ``gr.update`` objects for the error message and the chat section;
        the third is the boolean access result.
    """
    # NOTE(review): a successful check sets a module-level flag, so access
    # is granted for the whole process rather than for one session.
    global _access_granted_global

    if not ACCESS_CODE:
        return gr.update(visible=False), gr.update(visible=True), True

    # Compare as bytes in constant time: compare_digest rejects non-ASCII
    # str arguments, and '==' leaks timing about how much of the code
    # matched.
    supplied = (code or "").strip().encode("utf-8")
    expected = ACCESS_CODE.strip().encode("utf-8")

    if hmac.compare_digest(supplied, expected):
        _access_granted_global = True
        return gr.update(visible=False), gr.update(visible=True), True

    return (
        gr.update(value="β Incorrect access code. Please try again.", visible=True),
        gr.update(visible=False),
        False,
    )
|
|
|
def protected_generate_response(message, history):
    """Generate a reply only if access has been granted.

    Args:
        message: The new user message.
        history: Earlier turns of the conversation.

    Returns:
        str: The reply from ``generate_response``, or an access-denied
        message when an access code is configured and not yet verified.
    """
    # The flag is only read here, so no ``global`` declaration is needed.
    if ACCESS_CODE and not _access_granted_global:
        return "Access denied. Please enter the correct access code."
    return generate_response(message, history)
|
|
|
def store_and_generate_response(message, history):
    """Generate a reply and remember the conversation for later export.

    Args:
        message: The new user message.
        history: Earlier turns; ``None`` is treated as an empty history.

    Returns:
        str: The reply produced by ``protected_generate_response``.
    """
    # NOTE(review): the stored history is a single module-level value, so
    # it always holds the latest conversation of whichever caller ran last.
    global chat_history_store

    response = protected_generate_response(message, history)

    # Build a new list so the caller's history object is never mutated;
    # ``or []`` avoids a TypeError when no history is supplied.
    chat_history_store = list(history or []) + [
        {"role": "user", "content": message},
        {"role": "assistant", "content": response},
    ]

    return response
|
|
|
def export_current_conversation():
    """Write the stored conversation to a Markdown file for download.

    Returns:
        A ``gr.update`` that hides the file component when nothing is
        stored, or shows it pointing at the newly written file.
    """
    if not chat_history_store:
        return gr.update(visible=False)

    markdown_content = export_conversation_to_markdown(chat_history_store)

    # The transcript contains non-ASCII characters, so the encoding is set
    # explicitly instead of relying on the platform default.
    # delete=False: the file is still needed after this function returns.
    with tempfile.NamedTemporaryFile(
        mode='w', suffix='.md', delete=False, encoding='utf-8'
    ) as f:
        f.write(markdown_content)
        temp_file = f.name

    return gr.update(value=temp_file, visible=True)
|
|
|
def export_conversation(history):
    """Return *history* rendered as Markdown.

    Args:
        history: Chat history as accepted by
            ``export_conversation_to_markdown``.

    Returns:
        str: The Markdown text, or ``"No conversation to export."`` when
        the history is empty or ``None``.
    """
    if not history:
        return "No conversation to export."

    return export_conversation_to_markdown(history)
|
|
|
def get_configuration_status():
    """Build the Markdown summary shown in the configuration panel.

    Returns:
        str: One line per setting, joined with newlines. The URL, RAG and
        dynamic-URL lines appear only when the feature is configured.
    """
    # NOTE(review): several emoji prefixes below appear to be mis-encoded
    # (mojibake); they are kept as found pending confirmation of the
    # intended characters.
    if API_KEY_VALID:
        key_line = "✅ **API Key:** Configured and valid"
    else:
        key_line = "β **API Key:** Not configured - Set `OPENROUTER_API_KEY` in Space secrets"

    # Temperature and token limit mirror the literals used in the request
    # built by generate_response(); keep the two in sync.
    status_parts = [
        key_line,
        f"π€ **Model:** {MODEL}",
        "π‘οΈ **Temperature:** 0.7",
        "π **Max Tokens:** 1500",
    ]

    if GROUNDING_URLS:
        status_parts.append(f"π **URL Grounding:** {len(GROUNDING_URLS)} URLs configured")

    if ENABLE_DYNAMIC_URLS:
        status_parts.append("π **Dynamic URLs:** Enabled")

    if ENABLE_VECTOR_RAG:
        status_parts.append("π **Document RAG:** Enabled")

    if ACCESS_CODE:
        status_parts.append("π **Access Control:** Enabled")
    else:
        status_parts.append("π **Access:** Public")

    return "\n".join(status_parts)
|
|
|
|
|
# Process-wide access flag: starts granted when no access code is set.
_access_granted_global = not ACCESS_CODE
|
|
|
|
|
# --- User interface ----------------------------------------------------------
with gr.Blocks(title=SPACE_NAME) as demo:
    gr.Markdown(f"# {SPACE_NAME}")
    gr.Markdown(SPACE_DESCRIPTION)

    # The status panel starts expanded only when the API key check failed.
    with gr.Accordion("π Configuration Status", open=not API_KEY_VALID):
        gr.Markdown(get_configuration_status())

    # Access-code form, built only when an access code is configured.
    if ACCESS_CODE:
        with gr.Column(visible=True) as access_section:
            gr.Markdown("π **Access Required**")
            gr.Markdown("Please enter the access code to use this space.")

            access_input = gr.Textbox(
                label="Access Code",
                placeholder="Enter access code...",
                type="password"
            )
            access_btn = gr.Button("Submit", variant="primary")
            access_error = gr.Markdown(visible=False)

    # Chat area; hidden at start when an access code is configured.
    with gr.Column(visible=not bool(ACCESS_CODE)) as chat_section:
        chat_interface = gr.ChatInterface(
            fn=store_and_generate_response,
            title="",
            description="",
            examples=None,
            type="messages"
        )

        # Export controls: the file component stays hidden until an export
        # has produced a file.
        with gr.Row():
            export_btn = gr.Button("π₯ Export Conversation", variant="secondary", size="sm")
            export_file = gr.File(label="Download Conversation", visible=False)

        export_btn.click(
            export_current_conversation,
            outputs=[export_file]
        )

    # Both the button and pressing Enter in the textbox verify the code.
    if ACCESS_CODE:
        access_btn.click(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )
        access_input.submit(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )
|
|
|
# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()