|
import ast
import hmac
import json
import os
import re
import tempfile
import urllib.parse
from datetime import datetime

import gradio as gr
import requests
from bs4 import BeautifulSoup
|
|
|
|
|
|
|
# Display name and description used until config.json overrides them.
SPACE_NAME = 'AI Assistant'
SPACE_DESCRIPTION = 'A customizable AI assistant'

# Built-in defaults. This must be a plain dict: doubled braces would build a
# set containing a dict, which raises "unhashable type: 'dict'" at import time.
DEFAULT_CONFIG = {
    'name': SPACE_NAME,
    'description': SPACE_DESCRIPTION,
    'system_prompt': "You are a pedagogically-minded academic assistant designed for introductory courses. Your approach follows constructivist learning principles: build on students' prior knowledge, scaffold complex concepts through graduated questioning, and use Socratic dialogue to guide discovery. Provide concise, evidence-based explanations that connect theory to lived experiences. Each response should model critical thinking by acknowledging multiple perspectives, identifying assumptions, and revealing conceptual relationships. Conclude with open-ended questions that promote higher-order thinking—analysis, synthesis, or evaluation—rather than recall.",
    'temperature': 0.7,
    'max_tokens': 750,
    'model': 'google/gemini-2.0-flash-001',
    'api_key_var': 'API_KEY',
    'theme': 'Default',
    # A real list rather than the string '[]': code that iterates the URLs
    # would otherwise walk the string character by character.
    'grounding_urls': [],
    'enable_dynamic_urls': True,
    'examples': ['Can you help me understand why the sky is blue?'],
    'locked': False,
}
|
|
|
|
|
def load_config():
    """Load configuration from config.json, falling back to DEFAULT_CONFIG.

    Returns:
        dict: The parsed contents of ``config.json`` when it exists and holds
        a JSON object; otherwise a copy of ``DEFAULT_CONFIG``. When the file
        is missing, a best-effort attempt is made to create it from the
        defaults.
    """
    try:
        with open('config.json', 'r', encoding='utf-8') as f:
            config = json.load(f)
        if not isinstance(config, dict):
            # Callers use config.get(...), so anything but an object is unusable.
            raise ValueError("config.json must contain a JSON object")
        print("✅ Loaded configuration from config.json")
        return config
    except FileNotFoundError:
        print("ℹ️ No config.json found, using default configuration")

        # Best effort only: a read-only filesystem must not stop startup.
        try:
            with open('config.json', 'w', encoding='utf-8') as f:
                json.dump(DEFAULT_CONFIG, f, indent=2)
            print("✅ Created config.json with default values")
        except (OSError, TypeError, ValueError) as write_error:
            print(f"ℹ️ Could not create config.json: {write_error}")
        return dict(DEFAULT_CONFIG)
    except Exception as e:
        print(f"⚠️ Error loading config.json: {e}, using defaults")
        return dict(DEFAULT_CONFIG)
|
|
|
|
|
def _coerce_url_list(value):
    """Return *value* as a list of URL strings.

    The grounding URLs may be stored either as a list or as a string holding
    a list literal (for example ``'[]'``). A string is parsed first as JSON and
    then as a Python literal; anything that is not a list ends up as ``[]``.
    """
    if isinstance(value, str):
        try:
            value = json.loads(value)
        except ValueError:
            try:
                value = ast.literal_eval(value)
            except (ValueError, SyntaxError):
                value = []
    if not isinstance(value, (list, tuple)):
        return []
    return [item for item in value if isinstance(item, str)]


# Effective configuration: config.json values with DEFAULT_CONFIG as fallback.
config = load_config()

SPACE_NAME = config.get('name', DEFAULT_CONFIG['name'])
SPACE_DESCRIPTION = config.get('description', DEFAULT_CONFIG['description'])
SYSTEM_PROMPT = config.get('system_prompt', DEFAULT_CONFIG['system_prompt'])
temperature = config.get('temperature', DEFAULT_CONFIG['temperature'])
max_tokens = config.get('max_tokens', DEFAULT_CONFIG['max_tokens'])
MODEL = config.get('model', DEFAULT_CONFIG['model'])
THEME = config.get('theme', DEFAULT_CONFIG['theme'])
# Normalised to a list so later iteration never walks a string such as '[]'
# one character at a time.
GROUNDING_URLS = _coerce_url_list(
    config.get('grounding_urls', DEFAULT_CONFIG['grounding_urls'])
)
ENABLE_DYNAMIC_URLS = config.get('enable_dynamic_urls', DEFAULT_CONFIG['enable_dynamic_urls'])

# Optional access code; None means the chat is not gated.
ACCESS_CODE = os.environ.get("ACCESS_CODE")

# The API key is read from the environment variable named in the config.
# Missing, empty, and whitespace-only values all become None.
API_KEY_VAR = config.get('api_key_var', DEFAULT_CONFIG['api_key_var'])
API_KEY = (os.environ.get(API_KEY_VAR) or "").strip() or None
|
|
|
|
|
def validate_api_key():
    """Check the configured API key and print a diagnostic report.

    Returns:
        bool: ``False`` when no key is configured. ``True`` otherwise, even
        when the key lacks the ``sk-or-`` prefix (that case only prints a
        warning).
    """
    if not API_KEY:
        print("⚠️ API KEY CONFIGURATION ERROR:")
        print(f" Variable name: {API_KEY_VAR}")
        print(" Status: Not set or empty")
        print(f" Action needed: Set '{API_KEY_VAR}' in HuggingFace Space secrets")
        print(" Expected format: sk-or-xxxxxxxxxx")
        return False

    if not API_KEY.startswith('sk-or-'):
        # Both branches must interpolate; a plain "{API_KEY}" literal would
        # print the placeholder text instead of the value.
        shown_value = f"{API_KEY[:10]}..." if len(API_KEY) > 10 else API_KEY
        print("⚠️ API KEY FORMAT WARNING:")
        print(f" Variable name: {API_KEY_VAR}")
        print(f" Current value: {shown_value}")
        print(" Expected format: sk-or-xxxxxxxxxx")
        print(" Note: OpenRouter keys should start with 'sk-or-'")
        return True

    print("✅ API Key configured successfully")
    print(f" Variable: {API_KEY_VAR}")
    print(" Format: Valid OpenRouter key")
    return True
|
|
|
|
|
# Validate the API key once at import time. The flag defaults to False and is
# only replaced when validation runs; a NameError raised during the call
# leaves the default in place.
API_KEY_VALID = False
try:
    API_KEY_VALID = validate_api_key()
except NameError:
    pass
|
|
|
def validate_url_domain(url):
    """Return True when *url* looks like a fetchable http(s) URL.

    A URL is accepted when it is a string, uses the ``http`` or ``https``
    scheme, and has a network location containing a dot. Malformed input
    yields ``False`` rather than an exception.
    """
    if not isinstance(url, str):
        return False
    try:
        parsed = urllib.parse.urlparse(url)
    except ValueError:
        # urlparse rejects some malformed inputs, e.g. an unbalanced "[".
        return False
    if parsed.scheme not in ("http", "https"):
        return False
    return bool(parsed.netloc) and '.' in parsed.netloc
|
|
|
def _truncate_at_sentence(text, limit=4000, min_keep=3000):
    """Shorten *text* to at most *limit* characters.

    The cut is made after the last full stop when that leaves more than
    *min_keep* characters; otherwise the text is cut at *limit* and "..." is
    appended.
    """
    if len(text) <= limit:
        return text
    truncated = text[:limit]
    last_period = truncated.rfind('.')
    if last_period > min_keep:
        return truncated[:last_period + 1]
    return truncated + "..."


def fetch_url_content(url):
    """Fetch *url* and return its readable text, or an error message.

    The page is parsed with BeautifulSoup, non-content elements are removed,
    and the text of the first ``<main>``, ``<article>`` or "content" ``<div>``
    (falling back to the whole document) is cleaned up and truncated to about
    4000 characters.

    Returns:
        str: The extracted text, or a human-readable message describing why
        the URL could not be used. This function does not raise.
    """
    if not validate_url_domain(url):
        return f"Invalid URL format: {url}"

    # NOTE(review): URLs can come from user chat messages and are fetched from
    # the server; consider blocking private/internal hosts if that matters here.
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
        }

        response = requests.get(url, timeout=15, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Drop scripts, styling and page chrome before extracting text.
        for element in soup(["script", "style", "nav", "header", "footer", "aside", "form", "button"]):
            element.decompose()

        main_content = (
            soup.find('main')
            or soup.find('article')
            or soup.find('div', class_=lambda x: bool(x and 'content' in x.lower()))
            or soup
        )
        text = main_content.get_text()

        # Split each line into phrases on double spaces, so the length filter
        # removes stray fragments rather than every short word ("a", "is").
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = ' '.join(chunk for chunk in chunks if chunk and len(chunk) > 2)

        text = _truncate_at_sentence(text)

        return text if text.strip() else "No readable content found at this URL"

    except requests.exceptions.Timeout:
        return f"Timeout error fetching {url} (15s limit exceeded)"
    except requests.exceptions.RequestException as e:
        return f"Error fetching {url}: {str(e)}"
    except Exception as e:
        return f"Error processing content from {url}: {str(e)}"
|
|
|
def extract_urls_from_text(text):
    """Extract distinct http(s) URLs from *text*, in order of appearance.

    Trailing sentence punctuation is stripped from each match. Matches without
    a dot, or ten characters or shorter, are discarded. Non-string or empty
    input yields an empty list.

    Returns:
        list[str]: Unique URLs; each appears once even if repeated in *text*,
        so callers do not fetch the same page twice.
    """
    if not text or not isinstance(text, str):
        return []

    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]"]+'

    validated_urls = []
    seen = set()
    for url in re.findall(url_pattern, text):
        # Punctuation directly after a URL belongs to the sentence, not the URL.
        url = url.rstrip('.,!?;:')
        if '.' in url and len(url) > 10 and url not in seen:
            seen.add(url)
            validated_urls.append(url)

    return validated_urls
|
|
|
|
|
# Cache of assembled grounding context, keyed by the sorted tuple of URLs.
_url_content_cache = {}


def get_grounding_context():
    """Fetch and assemble context text from the configured grounding URLs.

    Each non-blank entry of ``GROUNDING_URLS`` is fetched and labelled
    PRIMARY (list positions 1-2) or SECONDARY (later positions). The result is
    cached per URL set, so the pages are fetched once per process.

    Returns:
        str: The labelled sections separated by blank lines, or "" when there
        is nothing to fetch.
    """
    if not GROUNDING_URLS:
        return ""

    def _is_usable(candidate):
        # Same test for the cache key and the fetch loop, so a None or blank
        # entry cannot crash the loop after passing the key computation.
        return isinstance(candidate, str) and bool(candidate.strip())

    cache_key = tuple(sorted(url for url in GROUNDING_URLS if _is_usable(url)))

    if cache_key in _url_content_cache:
        return _url_content_cache[cache_key]

    context_parts = []
    for i, url in enumerate(GROUNDING_URLS, 1):
        if _is_usable(url):
            content = fetch_url_content(url.strip())
            priority_label = "PRIMARY" if i <= 2 else "SECONDARY"
            context_parts.append(f"[{priority_label}] Context from URL {i} ({url}):\n{content}")

    # Blank lines separate the sections from each other and from the system
    # prompt this text gets appended to.
    if context_parts:
        result = "\n\n" + "\n\n".join(context_parts) + "\n\n"
    else:
        result = ""

    _url_content_cache[cache_key] = result
    return result
|
|
|
def export_conversation_to_markdown(conversation_history):
    """Render a conversation as a Markdown document.

    Args:
        conversation_history: A sequence of ``{"role": ..., "content": ...}``
            dicts and/or ``(user_message, assistant_message)`` pairs. Entries
            of any other shape are skipped.

    Returns:
        str: The Markdown text with a timestamped header and numbered
        user/assistant sections, or "No conversation to export." when the
        history is empty.
    """
    if not conversation_history:
        return "No conversation to export."

    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sections = [f"# Conversation Export\nGenerated on: {timestamp}\n\n---\n\n"]

    message_pair_count = 0
    for message in conversation_history:
        if isinstance(message, dict):
            role = message.get('role', 'unknown')
            content = message.get('content', '')

            if role == 'user':
                # A user turn starts a new numbered exchange; the assistant
                # reply that follows reuses the same number.
                message_pair_count += 1
                sections.append(f"## User Message {message_pair_count}\n\n{content}\n\n")
            elif role == 'assistant':
                sections.append(f"## Assistant Response {message_pair_count}\n\n{content}\n\n---\n\n")
        elif isinstance(message, (list, tuple)) and len(message) >= 2:
            message_pair_count += 1
            user_msg, assistant_msg = message[0], message[1]
            if user_msg:
                sections.append(f"## User Message {message_pair_count}\n\n{user_msg}\n\n")
            if assistant_msg:
                sections.append(f"## Assistant Response {message_pair_count}\n\n{assistant_msg}\n\n---\n\n")

    return "".join(sections)
|
|
|
|
|
def generate_response(message, history): |
|
"""Generate response using OpenRouter API""" |
|
|
|
|
|
if not API_KEY: |
|
error_msg = f"🔑 **API Key Required**\n\ |
|
" |
|
error_msg += f"Please configure your OpenRouter API key:\ |
|
" |
|
error_msg += f"1. Go to Settings (⚙️) in your HuggingFace Space\ |
|
" |
|
error_msg += f"2. Click 'Variables and secrets'\ |
|
" |
|
error_msg += f"3. Add secret: **{API_KEY_VAR}**\ |
|
" |
|
error_msg += f"4. Value: Your OpenRouter API key (starts with `sk-or-`)\n\ |
|
" |
|
error_msg += f"Get your API key at: https://openrouter.ai/keys" |
|
print(f"❌ API request failed: No API key configured for {API_KEY_VAR}") |
|
return error_msg |
|
|
|
|
|
grounding_context = get_grounding_context() |
|
|
|
|
|
|
|
if ENABLE_DYNAMIC_URLS: |
|
urls_in_message = extract_urls_from_text(message) |
|
if urls_in_message: |
|
|
|
dynamic_context_parts = [] |
|
for url in urls_in_message[:3]: |
|
content = fetch_url_content(url) |
|
dynamic_context_parts.append(f"\n\nDynamic context from {url}:\n{content}") |
|
if dynamic_context_parts: |
|
grounding_context += "\ |
|
".join(dynamic_context_parts) |
|
|
|
|
|
enhanced_system_prompt = SYSTEM_PROMPT + grounding_context |
|
|
|
|
|
messages = [{"role": "system", "content": enhanced_system_prompt}] |
|
|
|
|
|
for chat in history: |
|
if isinstance(chat, dict): |
|
|
|
messages.append(chat) |
|
elif isinstance(chat, (list, tuple)) and len(chat) >= 2: |
|
|
|
user_msg, assistant_msg = chat[0], chat[1] |
|
if user_msg: |
|
messages.append({"role": "user", "content": user_msg}) |
|
if assistant_msg: |
|
messages.append({"role": "assistant", "content": assistant_msg}) |
|
|
|
|
|
messages.append({"role": "user", "content": message}) |
|
|
|
|
|
try: |
|
print(f"🔄 Making API request to OpenRouter...") |
|
print(f" Model: {MODEL}") |
|
print(f" Messages: {len(messages)} in conversation") |
|
|
|
response = requests.post( |
|
url="https://openrouter.ai/api/v1/chat/completions", |
|
headers={ |
|
"Authorization": f"Bearer {API_KEY}", |
|
"Content-Type": "application/json", |
|
"HTTP-Referer": "https://huggingface.co", |
|
"X-Title": "HuggingFace Space" |
|
}, |
|
json={ |
|
"model": MODEL, |
|
"messages": messages, |
|
"temperature": 0.7, |
|
"max_tokens": 750 |
|
}, |
|
timeout=30 |
|
) |
|
|
|
print(f"📡 API Response: {response.status_code}") |
|
|
|
if response.status_code == 200: |
|
try: |
|
result = response.json() |
|
|
|
|
|
if 'choices' not in result or not result['choices']: |
|
print(f"⚠️ API response missing choices: {result}") |
|
return "API Error: No response choices available" |
|
elif 'message' not in result['choices'][0]: |
|
print(f"⚠️ API response missing message: {result}") |
|
return "API Error: No message in response" |
|
elif 'content' not in result['choices'][0]['message']: |
|
print(f"⚠️ API response missing content: {result}") |
|
return "API Error: No content in message" |
|
else: |
|
content = result['choices'][0]['message']['content'] |
|
|
|
|
|
if not content or content.strip() == "": |
|
print(f"⚠️ API returned empty content") |
|
return "API Error: Empty response content" |
|
|
|
print(f"✅ API request successful") |
|
return content |
|
|
|
except (KeyError, IndexError, json.JSONDecodeError) as e: |
|
print(f"❌ Failed to parse API response: {str(e)}") |
|
return f"API Error: Failed to parse response - {str(e)}" |
|
elif response.status_code == 401: |
|
error_msg = f"🔐 **Authentication Error**\n\ |
|
" |
|
error_msg += f"Your API key appears to be invalid or expired.\n\ |
|
" |
|
error_msg += f"**Troubleshooting:**\ |
|
" |
|
error_msg += f"1. Check that your **{API_KEY_VAR}** secret is set correctly\ |
|
" |
|
error_msg += f"2. Verify your API key at: https://openrouter.ai/keys\ |
|
" |
|
error_msg += f"3. Ensure your key starts with `sk-or-`\ |
|
" |
|
error_msg += f"4. Check that you have credits on your OpenRouter account" |
|
print(f"❌ API authentication failed: {response.status_code} - {response.text[:200]}") |
|
return error_msg |
|
elif response.status_code == 429: |
|
error_msg = f"⏱️ **Rate Limit Exceeded**\n\ |
|
" |
|
error_msg += f"Too many requests. Please wait a moment and try again.\n\ |
|
" |
|
error_msg += f"**Troubleshooting:**\ |
|
" |
|
error_msg += f"1. Wait 30-60 seconds before trying again\n" |
|
error_msg += f"2. Check your OpenRouter usage limits\n" |
|
error_msg += f"3. Consider upgrading your OpenRouter plan" |
|
print(f"❌ Rate limit exceeded: {response.status_code}") |
|
return error_msg |
|
elif response.status_code == 400: |
|
try: |
|
error_data = response.json() |
|
error_message = error_data.get('error', {}).get('message', 'Unknown error') |
|
except: |
|
error_message = response.text |
|
|
|
error_msg = f"⚠️ **Request Error**\n\ |
|
" |
|
error_msg += f"The API request was invalid:\n" |
|
error_msg += f"`{error_message}`\n\ |
|
" |
|
if "model" in error_message.lower(): |
|
error_msg += f"**Model Issue:** The model `{MODEL}` may not be available.\n" |
|
error_msg += f"Try switching to a different model in your Space configuration." |
|
print(f"❌ Bad request: {response.status_code} - {error_message}") |
|
return error_msg |
|
else: |
|
error_msg = f"🚫 **API Error {response.status_code}**\n\ |
|
" |
|
error_msg += f"An unexpected error occurred. Please try again.\n\ |
|
" |
|
error_msg += f"If this persists, check:\n" |
|
error_msg += f"1. OpenRouter service status\n" |
|
error_msg += f"2. Your API key and credits\n" |
|
error_msg += f"3. The model availability" |
|
print(f"❌ API error: {response.status_code} - {response.text[:200]}") |
|
return error_msg |
|
|
|
except requests.exceptions.Timeout: |
|
error_msg = f"⏰ **Request Timeout**\n\ |
|
" |
|
error_msg += f"The API request took too long (30s limit).\n\ |
|
" |
|
error_msg += f"**Troubleshooting:**\n" |
|
error_msg += f"1. Try again with a shorter message\n" |
|
error_msg += f"2. Check your internet connection\n" |
|
error_msg += f"3. Try a different model" |
|
print(f"❌ Request timeout after 30 seconds") |
|
return error_msg |
|
except requests.exceptions.ConnectionError: |
|
error_msg = f"🌐 **Connection Error**\n\ |
|
" |
|
error_msg += f"Could not connect to OpenRouter API.\n\ |
|
" |
|
error_msg += f"**Troubleshooting:**\n" |
|
error_msg += f"1. Check your internet connection\n" |
|
error_msg += f"2. Check OpenRouter service status\n" |
|
error_msg += f"3. Try again in a few moments" |
|
print(f"❌ Connection error to OpenRouter API") |
|
return error_msg |
|
except Exception as e: |
|
error_msg = "❌ **Unexpected Error**\n\ |
|
" |
|
error_msg += "An unexpected error occurred:\n" |
|
error_msg += f"`{str(e)}`\n\ |
|
" |
|
error_msg += "Please try again or contact support if this persists." |
|
print(f"❌ Unexpected error: {str(e)}") |
|
return error_msg |
|
|
|
|
|
# Session-state component that receives the result of the access check.
# NOTE(review): created at module level, outside the gr.Blocks context that
# later lists it as an event output — confirm Gradio accepts this.
access_granted = gr.State(False)

# Process-wide flag read by protected_generate_response(). It is shared by
# every visitor of this process, not stored per session.
_access_granted_global = False


def verify_access_code(code):
    """Check *code* against ACCESS_CODE and return UI updates.

    Returns:
        tuple: Updates for (error message, chat section, access state). On
        success the error is hidden and the chat shown; on failure an error
        is shown and the chat stays hidden.
    """
    global _access_granted_global

    if ACCESS_CODE is None:
        code_matches = True
    else:
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        supplied = (code or "").encode("utf-8")
        code_matches = hmac.compare_digest(supplied, ACCESS_CODE.encode("utf-8"))

    if code_matches:
        _access_granted_global = True
        return gr.update(visible=False), gr.update(visible=True), gr.update(value=True)

    # The shared flag is deliberately left untouched here: resetting it would
    # let one visitor's wrong guess lock out everyone already admitted.
    return (
        gr.update(visible=True, value="❌ Incorrect access code. Please try again."),
        gr.update(visible=False),
        gr.update(value=False),
    )
|
|
|
def protected_generate_response(message, history):
    """Generate a reply only when access has been granted.

    When an access code is configured and the access flag is not set, a
    prompt asking for the code is returned instead of calling the model.
    """
    gate_is_open = ACCESS_CODE is None or _access_granted_global
    if gate_is_open:
        return generate_response(message, history)
    return "Please enter the access code to continue."
|
|
|
|
|
# Most recent conversation, kept for the export button.
# NOTE(review): module-level and therefore shared by all visitors of this
# process; concurrent users overwrite each other's stored conversation.
chat_history_store = []


def store_and_generate_response(message, history):
    """Generate a reply and record the whole conversation for export.

    Args:
        message: The new user message.
        history: Prior turns, as role/content dicts or as
            ``(user, assistant)`` pairs.

    Returns:
        str: The reply from ``protected_generate_response``.
    """
    global chat_history_store

    response = protected_generate_response(message, history)

    # Rebuild the store from scratch on every call so it mirrors the chat.
    rebuilt = []
    for exchange in history or []:
        if isinstance(exchange, dict):
            # Dict entries are what the chat supplies in "messages" mode;
            # skipping them would leave only the latest exchange to export.
            if exchange.get("role") in ("user", "assistant"):
                rebuilt.append({"role": exchange["role"], "content": exchange.get("content", "")})
        elif isinstance(exchange, (list, tuple)) and len(exchange) >= 2:
            rebuilt.append({"role": "user", "content": exchange[0]})
            rebuilt.append({"role": "assistant", "content": exchange[1]})

    rebuilt.append({"role": "user", "content": message})
    rebuilt.append({"role": "assistant", "content": response})
    chat_history_store = rebuilt

    return response
|
|
|
def export_current_conversation():
    """Write the stored conversation to a temporary .md file for download.

    Returns a Gradio update that hides the file component when nothing has
    been stored, or points it at the new file and makes it visible.
    """
    if not chat_history_store:
        return gr.update(visible=False)

    rendered = export_conversation_to_markdown(chat_history_store)

    # delete=False keeps the file on disk after closing so it can be served.
    handle = tempfile.NamedTemporaryFile(
        mode='w', suffix='.md', delete=False, encoding='utf-8'
    )
    with handle:
        handle.write(rendered)

    return gr.update(value=handle.name, visible=True)
|
|
|
def export_conversation(history):
    """Write *history* to a temporary Markdown file and expose it for download.

    An empty history produces an update that hides the file component;
    otherwise the update carries the path of the written file.
    """
    if history:
        document = export_conversation_to_markdown(history)

        # The file must outlive this call, hence delete=False.
        with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as out_file:
            out_file.write(document)
            saved_path = out_file.name

        return gr.update(value=saved_path, visible=True)

    return gr.update(visible=False)
|
|
|
|
|
def get_configuration_status():
    """Build the Markdown summary shown in the "Configuration" accordion.

    Returns:
        str: Newline-joined lines covering name, model, theme, sampling
        settings, example prompts, grounding URLs, the system prompt, and a
        note when the API key is not configured.
    """
    status_parts = [
        f"**Name:** {SPACE_NAME}",
        f"**Model:** {MODEL}",
        f"**Theme:** {THEME}",
        # Report the loaded settings rather than fixed numbers.
        f"**Temperature:** {temperature}",
        f"**Max Response Tokens:** {max_tokens}",
        "",
        "",
    ]

    # Examples may be stored as a list or as a string holding a list literal.
    examples_list = config.get('examples', [])
    if isinstance(examples_list, str):
        try:
            examples_list = ast.literal_eval(examples_list)
        except (ValueError, SyntaxError):
            examples_list = []

    if examples_list:
        status_parts.append("**Example Prompts:**")
        status_parts.extend(f"• {example}" for example in examples_list[:5])
        if len(examples_list) > 5:
            status_parts.append(f"• ... and {len(examples_list) - 5} more")
    else:
        status_parts.append("**Example Prompts:** No example prompts configured")

    if GROUNDING_URLS:
        status_parts.append("")
        status_parts.append("**Grounding URLs:**")
        for i, url in enumerate(GROUNDING_URLS[:5], 1):
            status_parts.append(f"{i}. {url}")
        if len(GROUNDING_URLS) > 5:
            status_parts.append(f"... and {len(GROUNDING_URLS) - 5} more URLs")

    status_parts.append("")
    status_parts.append(f"**System Prompt:** {SYSTEM_PROMPT}")

    status_parts.append("")
    if not API_KEY_VALID:
        status_parts.append(f"**Note:** API key ({API_KEY_VAR}) not configured in Space secrets")

    return "\n".join(status_parts)
|
|
|
|
|
|
|
# Resolve the configured theme name on gr.themes, falling back to Default.
theme_class = getattr(gr.themes, THEME, gr.themes.Default)

with gr.Blocks(title=SPACE_NAME, theme=theme_class()) as demo:
    gr.Markdown(f"# {SPACE_NAME}")
    gr.Markdown(SPACE_DESCRIPTION)

    # Access gate: visible only when an ACCESS_CODE is configured.
    with gr.Column(visible=(ACCESS_CODE is not None)) as access_section:
        gr.Markdown("### 🔐 Access Required")
        gr.Markdown("Please enter the access code provided by your instructor:")

        access_input = gr.Textbox(
            label="Access Code",
            placeholder="Enter access code...",
            type="password"
        )
        access_btn = gr.Button("Submit", variant="primary")
        access_error = gr.Markdown(visible=False)

    # Main chat area: hidden until the access code is accepted.
    with gr.Column(visible=(ACCESS_CODE is None)) as chat_section:
        # Examples may be stored as a list or as a string holding a list literal.
        examples = config.get('examples', [])
        if isinstance(examples, str):
            try:
                examples = ast.literal_eval(examples)
            except (ValueError, SyntaxError):
                examples = []

        chat_interface = gr.ChatInterface(
            fn=store_and_generate_response,
            title="",
            description="",
            examples=examples if examples else None,
            type="messages"
        )

        with gr.Row():
            export_btn = gr.Button("📥 Export Conversation", variant="secondary", size="sm")
            export_file = gr.File(label="Download", visible=False)

        export_btn.click(
            export_current_conversation,
            outputs=[export_file]
        )

        with gr.Accordion("Configuration", open=False):
            gr.Markdown(get_configuration_status())

    # Both the button and pressing Enter in the textbox submit the code.
    if ACCESS_CODE is not None:
        access_btn.click(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )
        access_input.submit(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )

    with gr.Accordion("🔧 Faculty Configuration", open=False, visible=True) as faculty_section:
        gr.Markdown("**Faculty Only:** Edit assistant configuration. Requires CONFIG_CODE secret.")

        FACULTY_PASSWORD = os.environ.get("CONFIG_CODE", "").strip()

        if FACULTY_PASSWORD:
            faculty_auth_state = gr.State(False)

            # Password prompt, hidden once authentication succeeds.
            with gr.Column() as faculty_auth_row:
                with gr.Row():
                    faculty_password_input = gr.Textbox(
                        label="Faculty Password",
                        type="password",
                        placeholder="Enter faculty configuration password",
                        scale=3
                    )
                    faculty_auth_btn = gr.Button("Unlock Configuration", variant="primary", scale=1)
                faculty_auth_status = gr.Markdown("")

            # Editing form, revealed after authentication.
            with gr.Column(visible=False) as faculty_config_section:
                gr.Markdown("### Edit Assistant Configuration")
                gr.Markdown("⚠️ **Warning:** Changes will affect all users immediately.")

                # Pre-fill the form from the file on disk, or from defaults.
                try:
                    with open('config.json', 'r') as f:
                        current_config = json.load(f)
                except (OSError, ValueError):
                    current_config = DEFAULT_CONFIG.copy()
                if not isinstance(current_config, dict):
                    current_config = DEFAULT_CONFIG.copy()

                edit_system_prompt = gr.Textbox(
                    label="System Prompt",
                    value=current_config.get('system_prompt', SYSTEM_PROMPT),
                    lines=5
                )

                edit_model = gr.Dropdown(
                    label="Model",
                    choices=[
                        "google/gemini-2.0-flash-001",
                        "google/gemma-3-27b-it",
                        "anthropic/claude-3.5-sonnet",
                        "anthropic/claude-3.5-haiku",
                        "openai/gpt-4o-mini-search-preview",
                        "openai/gpt-4.1-nano",
                        "nvidia/llama-3.1-nemotron-70b-instruct",
                        "mistralai/devstral-small"
                    ],
                    value=current_config.get('model', MODEL)
                )

                examples_value = current_config.get('examples', [])
                if isinstance(examples_value, list):
                    examples_text_value = "\n".join(str(item) for item in examples_value)
                else:
                    examples_text_value = ""

                edit_examples = gr.Textbox(
                    label="Example Prompts (one per line)",
                    value=examples_text_value,
                    lines=3,
                    placeholder="What can you help me with?\nExplain this concept\nHelp me understand..."
                )

                with gr.Row():
                    edit_temperature = gr.Slider(
                        label="Temperature",
                        minimum=0,
                        maximum=2,
                        value=current_config.get('temperature', 0.7),
                        step=0.1
                    )
                    edit_max_tokens = gr.Slider(
                        label="Max Tokens",
                        minimum=50,
                        maximum=4096,
                        value=current_config.get('max_tokens', 750),
                        step=50
                    )

                gr.Markdown("### URL Grounding")
                grounding_urls_value = current_config.get('grounding_urls', [])
                if isinstance(grounding_urls_value, str):
                    try:
                        grounding_urls_value = ast.literal_eval(grounding_urls_value)
                    except (ValueError, SyntaxError):
                        grounding_urls_value = []
                if not isinstance(grounding_urls_value, (list, tuple)):
                    grounding_urls_value = []

                # Ten fixed URL slots; the first two are labelled Primary.
                url_fields = []
                for i in range(10):
                    url_value = grounding_urls_value[i] if i < len(grounding_urls_value) else ""
                    url_field = gr.Textbox(
                        label=f"URL {i+1}" + (" (Primary)" if i < 2 else " (Secondary)"),
                        value=url_value,
                        placeholder="https://..."
                    )
                    url_fields.append(url_field)

                config_locked = gr.Checkbox(
                    label="Lock Configuration (Prevent further edits)",
                    value=current_config.get('locked', False)
                )

                with gr.Row():
                    save_config_btn = gr.Button("Save Configuration", variant="primary")
                    reset_config_btn = gr.Button("Reset to Defaults", variant="secondary")

                config_status = gr.Markdown("")

            def verify_faculty_password(password):
                """Check the faculty password; return status, visibility updates, and auth flag."""
                # Constant-time comparison on bytes (compare_digest rejects
                # non-ASCII str arguments).
                supplied = (password or "").encode("utf-8")
                if hmac.compare_digest(supplied, FACULTY_PASSWORD.encode("utf-8")):
                    return (
                        gr.update(value="Authentication successful!"),
                        gr.update(visible=False),
                        gr.update(visible=True),
                        True
                    )
                return (
                    gr.update(value="Invalid password"),
                    gr.update(visible=True),
                    gr.update(visible=False),
                    False
                )

            def save_configuration(new_prompt, new_model, new_examples, new_temp, new_tokens,
                                   *url_values, lock_config=None, is_authenticated=None):
                """Persist the edited configuration to config.json.

                The event wiring passes every input positionally, so the lock
                checkbox and the auth flag arrive as the last two entries of
                ``url_values``. They may also be given by keyword.

                Returns:
                    str: A status message for the config_status Markdown.
                """
                trailing = list(url_values)
                if is_authenticated is None and trailing:
                    is_authenticated = trailing.pop()
                if lock_config is None and trailing:
                    lock_config = trailing.pop()

                if not is_authenticated:
                    return "Not authenticated"

                # Start from the file on disk so keys not shown in the form survive.
                try:
                    with open('config.json', 'r') as f:
                        current_full_config = json.load(f)
                except (OSError, ValueError):
                    current_full_config = DEFAULT_CONFIG.copy()
                if not isinstance(current_full_config, dict):
                    current_full_config = DEFAULT_CONFIG.copy()

                if current_full_config.get('locked', False):
                    return "Configuration is locked and cannot be modified"

                examples_list = [ex.strip() for ex in (new_examples or "").split('\n') if ex.strip()]
                grounding_urls = [
                    url.strip() for url in trailing
                    if isinstance(url, str) and url.strip()
                ]

                current_full_config.update({
                    'system_prompt': new_prompt,
                    'model': new_model,
                    'examples': examples_list,
                    'temperature': new_temp,
                    'max_tokens': int(new_tokens),
                    'grounding_urls': grounding_urls,
                    'locked': bool(lock_config),
                    'last_modified': datetime.now().isoformat(),
                    'last_modified_by': 'faculty'
                })

                try:
                    with open('config.json', 'w') as f:
                        json.dump(current_full_config, f, indent=2)

                    # Commit to the Space repository when credentials are present.
                    hf_token = os.environ.get("HF_TOKEN")
                    space_id = os.environ.get("SPACE_ID")

                    if hf_token and space_id:
                        try:
                            from huggingface_hub import HfApi
                            api = HfApi()
                            api.upload_file(
                                path_or_fileobj="config.json",
                                path_in_repo="config.json",
                                repo_id=space_id,
                                repo_type="space",
                                commit_message=f"Update configuration by faculty at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
                            )
                            return f"✅ Configuration saved and committed to repository at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n🔄 **Space will restart automatically** to apply changes."
                        except Exception as commit_error:
                            print(f"Note: Could not auto-commit to repository: {commit_error}")
                            return f"✅ Configuration saved locally at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n🔄 **Manual Restart Required**\nFor changes to take effect:\n1. Go to Settings (⚙️)\n2. Click 'Factory reboot'\n3. Wait ~30 seconds for restart"
                    else:
                        return f"✅ Configuration saved at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n🔄 **Manual Restart Required**\nFor changes to take effect:\n1. Go to Settings (⚙️)\n2. Click 'Factory reboot'\n3. Wait ~30 seconds for restart"
                except Exception as e:
                    return f"❌ Error saving configuration: {str(e)}"

            def reset_configuration(is_authenticated):
                """Return updates that restore the form fields to DEFAULT_CONFIG.

                Returns:
                    tuple: One status string followed by an update for each of
                    the five settings fields and every URL field. Only the
                    form is changed; nothing is written to disk.
                """
                # Every exit must match the number of wired outputs:
                # status + 5 settings + one per URL field.
                field_count = 5 + len(url_fields)

                if not is_authenticated:
                    return tuple(["Not authenticated"] + [gr.update() for _ in range(field_count)])

                # Uses the module-level json; a function-local "import json"
                # would make this read fail before the lock is ever checked.
                try:
                    with open('config.json', 'r') as f:
                        existing_config = json.load(f)
                except (OSError, ValueError):
                    existing_config = {}
                if isinstance(existing_config, dict) and existing_config.get('locked', False):
                    return tuple(["Configuration is locked"] + [gr.update() for _ in range(field_count)])

                default_examples = DEFAULT_CONFIG.get('examples', [])
                if isinstance(default_examples, list):
                    examples_text = "\n".join(default_examples)
                else:
                    examples_text = ""

                default_urls = DEFAULT_CONFIG.get('grounding_urls', [])
                if isinstance(default_urls, str):
                    try:
                        default_urls = json.loads(default_urls)
                    except ValueError:
                        default_urls = []
                if not isinstance(default_urls, list):
                    default_urls = []

                updates = [
                    "Reset to default values",
                    gr.update(value=DEFAULT_CONFIG.get('system_prompt', SYSTEM_PROMPT)),
                    gr.update(value=DEFAULT_CONFIG.get('model', MODEL)),
                    gr.update(value=examples_text),
                    gr.update(value=DEFAULT_CONFIG.get('temperature', 0.7)),
                    gr.update(value=DEFAULT_CONFIG.get('max_tokens', 750))
                ]

                for i in range(len(url_fields)):
                    url_value = default_urls[i] if i < len(default_urls) else ""
                    updates.append(gr.update(value=url_value))

                return tuple(updates)

            faculty_auth_btn.click(
                verify_faculty_password,
                inputs=[faculty_password_input],
                outputs=[faculty_auth_status, faculty_auth_row, faculty_config_section, faculty_auth_state]
            )

            faculty_password_input.submit(
                verify_faculty_password,
                inputs=[faculty_password_input],
                outputs=[faculty_auth_status, faculty_auth_row, faculty_config_section, faculty_auth_state]
            )

            save_config_btn.click(
                save_configuration,
                inputs=[edit_system_prompt, edit_model, edit_examples, edit_temperature, edit_max_tokens] + url_fields + [config_locked, faculty_auth_state],
                outputs=[config_status]
            )

            reset_config_btn.click(
                reset_configuration,
                inputs=[faculty_auth_state],
                outputs=[config_status, edit_system_prompt, edit_model, edit_examples, edit_temperature, edit_max_tokens] + url_fields
            )
        else:
            gr.Markdown("Faculty configuration is not enabled. Set CONFIG_CODE in Space secrets to enable.")

if __name__ == "__main__":
    demo.launch()
|
|