# Spaces: Sleeping  (page-scrape residue from the Hugging Face Spaces viewer; not part of the program)
import gradio as gr | |
import tempfile | |
import os | |
import requests | |
import json | |
import re | |
from bs4 import BeautifulSoup | |
from datetime import datetime | |
import urllib.parse | |
# Configuration
# Display name and one-line description rendered at the top of the Gradio page.
SPACE_NAME = "English3180 Helper"
SPACE_DESCRIPTION = "A bot to help brainstorm"
# Default configuration values
# These are the fallbacks used when config.json is absent or cannot be read;
# the values actually in effect live in SYSTEM_PROMPT / temperature / max_tokens.
DEFAULT_SYSTEM_PROMPT = """You are a pedagogically-minded academic assistant designed for an elective in the English Department at Brooklyn College. The subject of the course is literature and cultural diversity. Here is the course description: In our class, we will focus on US multi-ethnic and multi-racial literature, mostly novels, poems, and essays. We will think through several themes, which resonate with the experiences of diverse communities in the US: the development of individual and communal identities, complex understandings of home, the impact of urban space, the importance of work, and others. We will also consider formal questions and the literary forms and genres deployed to engage with multi-ethnic and multi-racial identities and communities.
We will ask a variety of questions: How do novels and memoirs trace the development of the identities of their characters? How do ethnicity and race (but also gender, sexuality, religion, immigration, age, class, education, and language) figure in the process of identity formation? How do fictional accounts conceive of home, of work, of local and national identities, of relationships and, in doing so, differ from memoir and other nonfiction? How do these texts use elements of the urban space – domestic spaces such as apartments or brownstones or public spaces such as subways, streets, parks, schools, or libraries – to depict, interrogate, and negotiate the process of identity formation? What larger historical and social processes do novels and memoirs emphasize? What are the literary forms that shape and are shaped by these trajectories?
Your approach follows constructivist learning principles: build on students' prior knowledge, scaffold complex concepts through graduated questioning, and use Socratic dialogue to guide discovery. Provide concise, evidence-based explanations that connect theory to lived experiences. Each response should model critical thinking by acknowledging multiple perspectives, identifying assumptions, and revealing conceptual relationships. Conclude with open-ended questions that promote higher-order thinking—analysis, synthesis, or evaluation—rather than recall. DO NOT PROVIDE STUDENTS WITH IDEAS. Students must propose an idea before getting feedback."""
# Default sampling temperature and completion-length cap for the chat model.
DEFAULT_TEMPERATURE = 1.2
DEFAULT_MAX_TOKENS = 1400
# Try to load configuration from file (if modified by faculty).
# A missing, unreadable, or malformed config.json falls back to the built-in
# defaults; anything other than "file not found" is reported so a broken file
# does not go unnoticed.
try:
    with open('config.json', 'r', encoding='utf-8') as f:
        saved_config = json.load(f)
    if not isinstance(saved_config, dict):
        raise ValueError("config.json must contain a JSON object")
    SYSTEM_PROMPT = saved_config.get('system_prompt', DEFAULT_SYSTEM_PROMPT)
    temperature = saved_config.get('temperature', DEFAULT_TEMPERATURE)
    max_tokens = saved_config.get('max_tokens', DEFAULT_MAX_TOKENS)
    print("✅ Loaded configuration from config.json")
except (OSError, ValueError) as config_error:
    # json.JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
    if not isinstance(config_error, FileNotFoundError):
        print(f"⚠️ Ignoring config.json: {config_error}")
    # Use defaults if no config file or error
    SYSTEM_PROMPT = DEFAULT_SYSTEM_PROMPT
    temperature = DEFAULT_TEMPERATURE
    max_tokens = DEFAULT_MAX_TOKENS
    print("ℹ️ Using default configuration")
# Model identifier sent with each chat request, and the Gradio theme looked up
# by name when the page is built.
MODEL = "anthropic/claude-3.5-sonnet"
THEME = "Soft"  # Gradio theme name
# URLs whose content is fetched and appended to the system prompt (none by default).
GROUNDING_URLS = []
# Get access code from environment variable for security.
# If ACCESS_CODE is not set, no access control is applied.
ACCESS_CODE = os.environ.get("ACCESS_CODE")
# When enabled, URLs found in a user's message are fetched as extra context.
ENABLE_DYNAMIC_URLS = True
# Get API key from the environment. A non-empty value is stripped of
# surrounding whitespace, and a value that is blank after stripping becomes None.
_raw_api_key = os.environ.get("API_KEY")
API_KEY = (_raw_api_key.strip() or None) if _raw_api_key else _raw_api_key
# API Key validation and logging
def validate_api_key():
    """Check the module-level API_KEY and log the outcome.

    Returns:
        False when no key is configured; True otherwise. A key that does not
        start with ``sk-or-`` only triggers a warning and is still accepted.
    """
    if not API_KEY:
        print("⚠️ API KEY CONFIGURATION ERROR:")
        print(" Variable name: API_KEY")
        print(" Status: Not set or empty")
        print(" Action needed: Set 'API_KEY' in HuggingFace Space secrets")
        print(" Expected format: sk-or-xxxxxxxxxx")
        return False

    if not API_KEY.startswith('sk-or-'):
        # Only ever log a truncated preview. The conditional is evaluated
        # before formatting so a short key is never written to the logs whole.
        preview = f"{API_KEY[:10]}..." if len(API_KEY) > 10 else "(too short to preview)"
        print("⚠️ API KEY FORMAT WARNING:")
        print(" Variable name: API_KEY")
        print(f" Current value: {preview}")
        print(" Expected format: sk-or-xxxxxxxxxx")
        print(" Note: OpenRouter keys should start with 'sk-or-'")
        return True  # Still try to use it

    print("✅ API Key configured successfully")
    print(" Variable: API_KEY")
    print(" Format: Valid OpenRouter key")
    return True


# Validate on startup
try:
    API_KEY_VALID = validate_api_key()
except NameError:
    # During template generation, API_KEY might not be defined yet
    API_KEY_VALID = False
def validate_url_domain(url):
    """Return True when *url* is an http(s) URL whose host part contains a dot.

    This is a cheap structural check only; it does not resolve the host or
    decide whether the target is safe to fetch.

    Args:
        url: Candidate URL string.

    Returns:
        True for ``http://`` / ``https://`` URLs with a dotted network
        location, False for anything else (including unparsable input).
    """
    from urllib.parse import urlparse

    try:
        parsed = urlparse(url)
    except (ValueError, AttributeError, TypeError):
        # urlparse raises ValueError for e.g. malformed IPv6 literals and
        # fails on non-string input.
        return False

    # Only web URLs can be fetched by the caller; reject other schemes up front.
    if parsed.scheme not in ("http", "https"):
        return False
    # Check for valid domain structure
    return bool(parsed.netloc) and '.' in parsed.netloc
def fetch_url_content(url):
    """Download *url* and return its readable text, or a description of the failure.

    The page is fetched with browser-like headers (15 s timeout), stripped of
    script/navigation/form elements, reduced to the text of its main content
    area, and capped at roughly 4000 characters. Errors are never raised to
    the caller; they come back as a human-readable string instead.
    """
    if not validate_url_domain(url):
        return f"Invalid URL format: {url}"

    # Browser-like headers for better compatibility with sites that reject bots.
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive'
    }

    try:
        response = requests.get(url, timeout=15, headers=request_headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Drop page chrome and non-content elements before extracting text.
        for unwanted in soup(["script", "style", "nav", "header", "footer", "aside", "form", "button"]):
            unwanted.decompose()

        # Prefer <main>, then <article>, then a div whose class mentions
        # "content"; fall back to the whole document.
        content_root = (
            soup.find('main')
            or soup.find('article')
            or soup.find('div', class_=lambda x: bool(x and 'content' in x.lower()))
            or soup
        )

        # NOTE(review): fragments are produced by splitting on a single space,
        # so any fragment of one or two characters is discarded below.
        # Confirm that is intended.
        fragments = []
        for raw_line in content_root.get_text().splitlines():
            for piece in raw_line.strip().split(" "):
                piece = piece.strip()
                if len(piece) > 2:
                    fragments.append(piece)
        text = ' '.join(fragments)

        # Smart truncation: end on a sentence boundary when one falls late
        # enough in the first 4000 characters, otherwise cut hard with "...".
        if len(text) > 4000:
            head = text[:4000]
            sentence_end = head.rfind('.')
            text = head[:sentence_end + 1] if sentence_end > 3000 else head + "..."

        if text.strip():
            return text
        return "No readable content found at this URL"
    except requests.exceptions.Timeout:
        return f"Timeout error fetching {url} (15s limit exceeded)"
    except requests.exceptions.RequestException as e:
        return f"Error fetching {url}: {str(e)}"
    except Exception as e:
        return f"Error processing content from {url}: {str(e)}"
def extract_urls_from_text(text):
    """Return the http(s) URLs found in *text*, in order of appearance.

    Trailing sentence punctuation (``.,!?;:``) is trimmed from each match, and
    a match is kept only if it contains a dot and is longer than 10 characters.
    """
    trimmed_matches = (
        raw_match.rstrip('.,!?;:')
        for raw_match in re.findall(r'https?://[^\s<>"{}|\\^`\[\]"]+', text)
    )
    return [
        candidate
        for candidate in trimmed_matches
        if '.' in candidate and len(candidate) > 10
    ]
# Global cache for URL content to avoid re-crawling in generated spaces
_url_content_cache = {}


def get_grounding_context():
    """Fetch and concatenate the content of every configured grounding URL.

    Results are memoised in ``_url_content_cache`` for the life of the
    process, keyed by the usable URLs in their configured order.

    Returns:
        A string of labelled context sections wrapped in blank lines, or ""
        when no usable grounding URL is configured.
    """
    if not GROUNDING_URLS:
        return ""

    # Order is part of the key because the PRIMARY/SECONDARY labels below
    # depend on each URL's position in the list.
    cache_key = tuple(url for url in GROUNDING_URLS if url and url.strip())
    if cache_key in _url_content_cache:
        return _url_content_cache[cache_key]

    context_parts = []
    for i, url in enumerate(GROUNDING_URLS, 1):
        # Skip None/blank entries here too, matching the cache-key filter.
        if not url or not url.strip():
            continue
        content = fetch_url_content(url.strip())
        # The first two configured positions are labelled as primary sources.
        priority_label = "PRIMARY" if i <= 2 else "SECONDARY"
        context_parts.append(f"[{priority_label}] Context from URL {i} ({url}):\n{content}")

    result = "\n\n" + "\n\n".join(context_parts) + "\n\n" if context_parts else ""

    # Cache the result (fetch_url_content reports failures as text, so those
    # are cached as well).
    _url_content_cache[cache_key] = result
    return result
def export_conversation_to_markdown(conversation_history):
    """Render a chat history as a Markdown transcript.

    Args:
        conversation_history: Either role/content dicts
            (``{"role": "user" | "assistant", "content": ...}``) or legacy
            ``[user_msg, assistant_msg]`` pairs. Entries of any other shape,
            and dicts with other roles, are skipped.

    Returns:
        The Markdown document, or "No conversation to export." when the
        history is empty.
    """
    if not conversation_history:
        return "No conversation to export."

    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    # Blank lines around the rule matter: a "---" directly under a text line
    # is parsed by Markdown as a heading underline, not a horizontal rule.
    sections = [f"# Conversation Export\n\nGenerated on: {timestamp}\n\n---\n\n"]

    message_pair_count = 0
    for message in conversation_history:
        if isinstance(message, dict):
            role = message.get('role', 'unknown')
            content = message.get('content', '')
            if role == 'user':
                # Each user turn opens a new numbered exchange.
                message_pair_count += 1
                sections.append(f"## User Message {message_pair_count}\n\n{content}\n\n")
            elif role == 'assistant':
                sections.append(f"## Assistant Response {message_pair_count}\n\n{content}\n\n---\n\n")
        elif isinstance(message, (list, tuple)) and len(message) >= 2:
            # Handle legacy tuple format: ["user msg", "assistant msg"]
            message_pair_count += 1
            user_msg, assistant_msg = message[0], message[1]
            if user_msg:
                sections.append(f"## User Message {message_pair_count}\n\n{user_msg}\n\n")
            if assistant_msg:
                sections.append(f"## Assistant Response {message_pair_count}\n\n{assistant_msg}\n\n---\n\n")

    return "".join(sections)
def _history_to_messages(history):
    """Convert Gradio chat history into plain role/content message dicts."""
    converted = []
    for chat in history or []:
        if isinstance(chat, dict):
            # Modern format: {"role": ..., "content": ...}. Forward only the
            # two fields the chat-completions API expects, so any extra keys
            # carried on the history entry are not sent upstream.
            role, content = chat.get("role"), chat.get("content")
            if role and content is not None:
                converted.append({"role": role, "content": content})
        elif isinstance(chat, (list, tuple)) and len(chat) >= 2:
            # Legacy format: ["user msg", "assistant msg"] or ("user msg", "assistant msg")
            user_msg, assistant_msg = chat[0], chat[1]
            if user_msg:
                converted.append({"role": "user", "content": user_msg})
            if assistant_msg:
                converted.append({"role": "assistant", "content": assistant_msg})
    return converted


def generate_response(message, history):
    """Generate a reply to *message* through the OpenRouter chat-completions API.

    The system prompt is extended with grounding-URL content and, when
    ENABLE_DYNAMIC_URLS is set, with the content of up to three URLs found in
    the message. Sampling uses the module-level ``temperature`` and
    ``max_tokens`` so that values loaded from config.json or saved through the
    faculty panel take effect.

    Args:
        message: The user's current message text.
        history: Prior conversation, as role/content dicts or legacy
            ``[user, assistant]`` pairs.

    Returns:
        The assistant's reply text, or a Markdown-formatted error message.
        Failures are reported in the return value, not raised.
    """
    # Enhanced API key validation with helpful messages
    if not API_KEY:
        error_msg = (
            "🔑 **API Key Required**\n\n"
            "Please configure your OpenRouter API key:\n"
            "1. Go to Settings (⚙️) in your HuggingFace Space\n"
            "2. Click 'Variables and secrets'\n"
            "3. Add secret: **API_KEY**\n"
            "4. Value: Your OpenRouter API key (starts with `sk-or-`)\n\n"
            "Get your API key at: https://openrouter.ai/keys"
        )
        print("❌ API request failed: No API key configured for API_KEY")
        return error_msg

    # Get grounding context
    grounding_context = get_grounding_context()

    # If dynamic URLs are enabled, fetch content for URLs mentioned in the message
    if ENABLE_DYNAMIC_URLS:
        urls_in_message = extract_urls_from_text(message)
        dynamic_context_parts = [
            f"\n\nDynamic context from {url}:\n{fetch_url_content(url)}"
            for url in urls_in_message[:3]  # Limit to 3 URLs per message
        ]
        if dynamic_context_parts:
            grounding_context += "\n".join(dynamic_context_parts)

    # Build enhanced system prompt with grounding context
    enhanced_system_prompt = SYSTEM_PROMPT + grounding_context

    # System prompt, then prior turns, then the current message
    messages = [{"role": "system", "content": enhanced_system_prompt}]
    messages.extend(_history_to_messages(history))
    messages.append({"role": "user", "content": message})

    # Make API request with enhanced error handling
    try:
        print("🔄 Making API request to OpenRouter...")
        print(f" Model: {MODEL}")
        print(f" Messages: {len(messages)} in conversation")
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://huggingface.co",  # Required by some providers
                "X-Title": "HuggingFace Space"  # Helpful for tracking
            },
            json={
                "model": MODEL,
                "messages": messages,
                # Use the live settings rather than literals so configuration
                # changes actually reach the model.
                "temperature": temperature,
                "max_tokens": max_tokens
            },
            timeout=30
        )
        print(f"📡 API Response: {response.status_code}")

        if response.status_code == 200:
            try:
                result = response.json()
                # Validate the response structure step by step so the log
                # says which level was missing.
                if 'choices' not in result or not result['choices']:
                    print(f"⚠️ API response missing choices: {result}")
                    return "API Error: No response choices available"
                first_choice = result['choices'][0]
                if 'message' not in first_choice:
                    print(f"⚠️ API response missing message: {result}")
                    return "API Error: No message in response"
                if 'content' not in first_choice['message']:
                    print(f"⚠️ API response missing content: {result}")
                    return "API Error: No content in message"
                content = first_choice['message']['content']
                # Check for empty content
                if not content or content.strip() == "":
                    print("⚠️ API returned empty content")
                    return "API Error: Empty response content"
                print("✅ API request successful")
                return content
            except (KeyError, IndexError, json.JSONDecodeError) as e:
                print(f"❌ Failed to parse API response: {str(e)}")
                return f"API Error: Failed to parse response - {str(e)}"

        if response.status_code == 401:
            error_msg = (
                "🔐 **Authentication Error**\n\n"
                "Your API key appears to be invalid or expired.\n\n"
                "**Troubleshooting:**\n"
                "1. Check that your **API_KEY** secret is set correctly\n"
                "2. Verify your API key at: https://openrouter.ai/keys\n"
                "3. Ensure your key starts with `sk-or-`\n"
                "4. Check that you have credits on your OpenRouter account"
            )
            print(f"❌ API authentication failed: {response.status_code} - {response.text[:200]}")
            return error_msg

        if response.status_code == 429:
            error_msg = (
                "⏱️ **Rate Limit Exceeded**\n\n"
                "Too many requests. Please wait a moment and try again.\n\n"
                "**Troubleshooting:**\n"
                "1. Wait 30-60 seconds before trying again\n"
                "2. Check your OpenRouter usage limits\n"
                "3. Consider upgrading your OpenRouter plan"
            )
            print(f"❌ Rate limit exceeded: {response.status_code}")
            return error_msg

        if response.status_code == 400:
            try:
                error_data = response.json()
                error_message = error_data.get('error', {}).get('message', 'Unknown error')
            except (ValueError, AttributeError):
                # Body was not JSON, or not shaped like {"error": {...}}.
                error_message = response.text
            # The message is formatted and searched below; make sure it is text.
            error_message = str(error_message)
            error_msg = (
                "⚠️ **Request Error**\n\n"
                "The API request was invalid:\n"
                f"`{error_message}`\n\n"
            )
            if "model" in error_message.lower():
                error_msg += f"**Model Issue:** The model `{MODEL}` may not be available.\n"
                error_msg += "Try switching to a different model in your Space configuration."
            print(f"❌ Bad request: {response.status_code} - {error_message}")
            return error_msg

        error_msg = (
            f"🚫 **API Error {response.status_code}**\n\n"
            "An unexpected error occurred. Please try again.\n\n"
            "If this persists, check:\n"
            "1. OpenRouter service status\n"
            "2. Your API key and credits\n"
            "3. The model availability"
        )
        print(f"❌ API error: {response.status_code} - {response.text[:200]}")
        return error_msg

    except requests.exceptions.Timeout:
        error_msg = (
            "⏰ **Request Timeout**\n\n"
            "The API request took too long (30s limit).\n\n"
            "**Troubleshooting:**\n"
            "1. Try again with a shorter message\n"
            "2. Check your internet connection\n"
            "3. Try a different model"
        )
        print("❌ Request timeout after 30 seconds")
        return error_msg
    except requests.exceptions.ConnectionError:
        error_msg = (
            "🌐 **Connection Error**\n\n"
            "Could not connect to OpenRouter API.\n\n"
            "**Troubleshooting:**\n"
            "1. Check your internet connection\n"
            "2. Check OpenRouter service status\n"
            "3. Try again in a few moments"
        )
        print("❌ Connection error to OpenRouter API")
        return error_msg
    except Exception as e:
        # Last-resort boundary: the chat UI should show an error, not crash.
        error_msg = (
            "❌ **Unexpected Error**\n\n"
            "An unexpected error occurred:\n"
            f"`{str(e)}`\n\n"
            "Please try again or contact support if this persists."
        )
        print(f"❌ Unexpected error: {str(e)}")
        return error_msg
# Access code verification
access_granted = gr.State(False)
# NOTE(review): this flag is process-wide, not per-visitor. Once any visitor
# enters the correct code, the check in protected_generate_response passes for
# every session. Per-session gating would need the state passed into the chat
# function.
_access_granted_global = False  # Global fallback


def verify_access_code(code):
    """Check a submitted access code and return the matching UI updates.

    Args:
        code: Text entered in the access-code box.

    Returns:
        A 3-tuple of Gradio updates for (error message, chat section,
        access-granted state): on success the error is hidden and the chat
        shown; on failure the error is shown and the chat hidden.
    """
    global _access_granted_global
    import hmac

    # Constant-time comparison on bytes; compare_digest rejects non-ASCII str.
    code_matches = ACCESS_CODE is None or hmac.compare_digest(
        str(code or "").encode("utf-8"), ACCESS_CODE.encode("utf-8")
    )
    if code_matches:
        _access_granted_global = True
        return gr.update(visible=False), gr.update(visible=True), gr.update(value=True)

    # A failed attempt only affects the submitting session's UI. It must not
    # clear the shared flag, or one visitor's typo would lock out everyone who
    # has already authenticated.
    return (
        gr.update(visible=True, value="❌ Incorrect access code. Please try again."),
        gr.update(visible=False),
        gr.update(value=False),
    )
def protected_generate_response(message, history):
    """Gate generate_response behind the access code when one is configured.

    Returns the access prompt text if a code is required and has not been
    accepted yet; otherwise returns whatever generate_response produces.
    """
    access_required = ACCESS_CODE is not None
    # Access status is read from the module-level flag.
    if access_required and not _access_granted_global:
        return "Please enter the access code to continue."
    return generate_response(message, history)
# Global variable to store chat history for export
# NOTE(review): this store is shared by every session in the process, so the
# export reflects whichever conversation sent a message most recently.
chat_history_store = []


def store_and_generate_response(message, history):
    """Generate a reply and snapshot the whole conversation for later export.

    Args:
        message: The user's current message text.
        history: Prior conversation, as role/content dicts (the format the
            chat UI is configured with) or legacy ``[user, assistant]`` pairs.

    Returns:
        The response produced by protected_generate_response.
    """
    global chat_history_store

    # Generate response using the protected function
    response = protected_generate_response(message, history)

    # Rebuild the export transcript from scratch on every turn.
    transcript = []
    for entry in history or []:
        if isinstance(entry, dict):
            # Messages format: keep the user/assistant turns, which are the
            # only roles the exporter renders.
            role = entry.get("role")
            if role in ("user", "assistant"):
                transcript.append({"role": role, "content": entry.get("content", "")})
        elif isinstance(entry, (list, tuple)) and len(entry) >= 2:
            # Legacy format: [["user1", "bot1"], ["user2", "bot2"], ...]
            transcript.append({"role": "user", "content": entry[0]})
            transcript.append({"role": "assistant", "content": entry[1]})

    # Add the current exchange
    transcript.append({"role": "user", "content": message})
    transcript.append({"role": "assistant", "content": response})

    chat_history_store = transcript
    return response
def export_current_conversation():
    """Write the stored conversation to a temporary .md file for download.

    Returns a Gradio update that hides the file component when nothing has
    been stored, or that points it at the new file and makes it visible.
    """
    if not chat_history_store:
        return gr.update(visible=False)

    document = export_conversation_to_markdown(chat_history_store)

    # delete=False keeps the file on disk after it is closed so it can be served.
    handle = tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8')
    with handle:
        handle.write(document)
    return gr.update(value=handle.name, visible=True)
def export_conversation(history):
    """Write *history* to a temporary .md file for download.

    Returns a Gradio update that hides the file component when *history* is
    empty, or that points it at the new file and makes it visible.
    """
    if not history:
        return gr.update(visible=False)

    document = export_conversation_to_markdown(history)

    # delete=False keeps the file on disk after it is closed so it can be served.
    handle = tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8')
    with handle:
        handle.write(document)
    return gr.update(value=handle.name, visible=True)
# Configuration status display
def get_configuration_status():
    """Build the Markdown summary shown in the "Configuration Status" panel.

    Reports API-key readiness, the model with the temperature and max-token
    settings currently in effect, any grounding URLs, whether an access code
    is set, and a preview of the system prompt.

    Returns:
        The status text as a single newline-joined Markdown string.
    """
    status_parts = []

    # API Key status
    status_parts.append("### 🔑 API Configuration")
    if API_KEY_VALID:
        status_parts.append("✅ **API Key:** Ready")
    else:
        status_parts.append("❌ **API Key:** Not configured")
        status_parts.append(" Set `API_KEY` in Space secrets")

    # Model and parameters: show the live settings, which may come from
    # config.json, rather than fixed numbers.
    status_parts.append("")  # Blank line
    status_parts.append("### 🤖 Model Settings")
    status_parts.append(f"**Model:** {MODEL.split('/')[-1]}")
    status_parts.append(f"**Temperature:** {temperature}")
    status_parts.append(f"**Max Tokens:** {max_tokens}")

    # URL Context if configured (only the first two are listed individually)
    if GROUNDING_URLS:
        status_parts.append("")  # Blank line
        status_parts.append("### 🔗 Context Sources")
        status_parts.append(f"**URLs Configured:** {len(GROUNDING_URLS)}")
        for i, url in enumerate(GROUNDING_URLS[:2], 1):
            status_parts.append(f" {i}. {url[:50]}{'...' if len(url) > 50 else ''}")
        if len(GROUNDING_URLS) > 2:
            status_parts.append(f" ... and {len(GROUNDING_URLS) - 2} more")

    # Access control
    if ACCESS_CODE is not None:
        status_parts.append("")  # Blank line
        status_parts.append("### 🔐 Access Control")
        status_parts.append("**Status:** Password protected")

    # System prompt: show at most the first 200 characters
    status_parts.append("")  # Blank line
    status_parts.append("### 📝 System Prompt")
    prompt_preview = SYSTEM_PROMPT[:200] + "..." if len(SYSTEM_PROMPT) > 200 else SYSTEM_PROMPT
    status_parts.append(f"```\n{prompt_preview}\n```")

    return "\n".join(status_parts)
# Create interface with access code protection
# Dynamically set theme based on configuration (falls back to the default
# theme when THEME does not name a class in gr.themes).
theme_class = getattr(gr.themes, THEME, gr.themes.Default)
with gr.Blocks(title=SPACE_NAME, theme=theme_class()) as demo:
    gr.Markdown(f"# {SPACE_NAME}")
    gr.Markdown(SPACE_DESCRIPTION)

    # Access code section (shown only if ACCESS_CODE is set)
    with gr.Column(visible=(ACCESS_CODE is not None)) as access_section:
        gr.Markdown("### 🔐 Access Required")
        gr.Markdown("Please enter the access code provided by your instructor:")
        access_input = gr.Textbox(
            label="Access Code",
            placeholder="Enter access code...",
            type="password"
        )
        access_btn = gr.Button("Submit", variant="primary")
        access_error = gr.Markdown(visible=False)

    # Main chat interface (hidden until access granted)
    with gr.Column(visible=(ACCESS_CODE is None)) as chat_section:
        chat_interface = gr.ChatInterface(
            fn=store_and_generate_response,  # Use wrapper function to store history
            title="",  # Title already shown above
            description="",  # Description already shown above
            examples=['Can you pose a counterargument to my idea?', 'Can you help me expand my idea?'],
            type="messages"  # Use modern message format for better compatibility
        )

        # Export functionality
        with gr.Row():
            export_btn = gr.Button("📥 Export Conversation", variant="secondary", size="sm")
            export_file = gr.File(label="Download", visible=False)

        # Connect export functionality
        export_btn.click(
            export_current_conversation,
            outputs=[export_file]
        )

        # Configuration status (rendered once, when the page is built)
        with gr.Accordion("📊 Configuration Status", open=True):
            gr.Markdown(get_configuration_status())

    # Connect access verification
    if ACCESS_CODE is not None:
        access_btn.click(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )
        access_input.submit(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )

    # Faculty Configuration Section - appears at the bottom with password protection
    with gr.Accordion("🔧 Faculty Configuration", open=False, visible=True) as faculty_section:
        gr.Markdown("**Faculty Only:** Edit assistant configuration. Requires CONFIG_CODE secret.")

        # Check if faculty password is configured
        FACULTY_PASSWORD = os.environ.get("CONFIG_CODE", "").strip()

        if FACULTY_PASSWORD:
            faculty_auth_state = gr.State(False)

            def _read_saved_config():
                """Return config.json as a dict, or None if missing, unreadable, or malformed."""
                try:
                    with open('config.json', 'r') as f:
                        loaded = json.load(f)
                except (OSError, ValueError):
                    return None
                return loaded if isinstance(loaded, dict) else None

            # Authentication row
            with gr.Column() as faculty_auth_row:
                with gr.Row():
                    faculty_password_input = gr.Textbox(
                        label="Faculty Password",
                        type="password",
                        placeholder="Enter faculty configuration password",
                        scale=3
                    )
                    faculty_auth_btn = gr.Button("Unlock Configuration", variant="primary", scale=1)
                faculty_auth_status = gr.Markdown("")

            # Configuration editor (hidden until authenticated)
            with gr.Column(visible=False) as faculty_config_section:
                gr.Markdown("### Edit Assistant Configuration")
                gr.Markdown("⚠️ **Warning:** Changes will affect all users immediately.")

                # Load current configuration; without a usable file, start
                # from the settings currently in effect.
                current_config = _read_saved_config() or {
                    'system_prompt': SYSTEM_PROMPT,
                    'temperature': temperature,
                    'max_tokens': max_tokens,
                    'locked': False
                }

                # Editable fields
                edit_system_prompt = gr.Textbox(
                    label="System Prompt",
                    value=current_config.get('system_prompt', SYSTEM_PROMPT),
                    lines=5
                )
                with gr.Row():
                    edit_temperature = gr.Slider(
                        label="Temperature",
                        minimum=0,
                        maximum=2,
                        value=current_config.get('temperature', temperature),
                        step=0.1
                    )
                    edit_max_tokens = gr.Slider(
                        label="Max Tokens",
                        minimum=50,
                        maximum=4096,
                        value=current_config.get('max_tokens', max_tokens),
                        step=50
                    )
                config_locked = gr.Checkbox(
                    label="Lock Configuration (Prevent further edits)",
                    value=current_config.get('locked', False)
                )
                with gr.Row():
                    save_config_btn = gr.Button("💾 Save Configuration", variant="primary")
                    reset_config_btn = gr.Button("↩️ Reset to Defaults", variant="secondary")
                config_status = gr.Markdown("")

            # Faculty authentication function
            def verify_faculty_password(password):
                """Check the faculty password and return updates for
                (status text, auth row, config section, auth state)."""
                import hmac

                # Constant-time comparison on bytes; compare_digest rejects
                # non-ASCII str.
                if hmac.compare_digest(
                    str(password or "").encode("utf-8"),
                    FACULTY_PASSWORD.encode("utf-8"),
                ):
                    return (
                        gr.update(value="✅ Authentication successful!"),
                        gr.update(visible=False),  # Hide auth row
                        gr.update(visible=True),  # Show config section
                        True  # Update auth state
                    )
                return (
                    gr.update(value="❌ Invalid password"),
                    gr.update(visible=True),  # Keep auth row visible
                    gr.update(visible=False),  # Keep config hidden
                    False  # Auth failed
                )

            # Save configuration function
            def save_configuration(new_prompt, new_temp, new_tokens, lock_config, is_authenticated):
                """Persist the edited settings to config.json and apply them
                to the running app. Returns a status message."""
                global SYSTEM_PROMPT, temperature, max_tokens

                if not is_authenticated:
                    return "❌ Not authenticated"

                # Check if configuration is already locked
                existing_config = _read_saved_config()
                if existing_config and existing_config.get('locked', False):
                    return "🔒 Configuration is locked and cannot be modified"

                # Save new configuration
                new_config = {
                    'system_prompt': new_prompt,
                    'temperature': new_temp,
                    'max_tokens': int(new_tokens),
                    'locked': lock_config,
                    'last_modified': datetime.now().isoformat(),
                    'modified_by': 'faculty'
                }
                try:
                    with open('config.json', 'w') as f:
                        json.dump(new_config, f, indent=2)
                except (OSError, TypeError, ValueError) as e:
                    return f"❌ Error saving configuration: {str(e)}"

                # Update global variables only after the file was written
                SYSTEM_PROMPT = new_prompt
                temperature = new_temp
                max_tokens = int(new_tokens)
                return f"✅ Configuration saved successfully at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"

            # Reset configuration function
            def reset_configuration(is_authenticated):
                """Put the built-in defaults back into the editor fields.
                Returns (status, prompt, temperature, max tokens) updates;
                nothing is written to disk."""
                if not is_authenticated:
                    return "❌ Not authenticated", gr.update(), gr.update(), gr.update()

                # Check if locked
                existing_config = _read_saved_config()
                if existing_config and existing_config.get('locked', False):
                    return "🔒 Configuration is locked", gr.update(), gr.update(), gr.update()

                # Reset to the shipped defaults, not the (possibly edited)
                # values currently in effect.
                return (
                    "↩️ Reset to default values",
                    gr.update(value=DEFAULT_SYSTEM_PROMPT),
                    gr.update(value=DEFAULT_TEMPERATURE),
                    gr.update(value=DEFAULT_MAX_TOKENS)
                )

            # Connect authentication
            faculty_auth_btn.click(
                verify_faculty_password,
                inputs=[faculty_password_input],
                outputs=[faculty_auth_status, faculty_auth_row, faculty_config_section, faculty_auth_state]
            )
            faculty_password_input.submit(
                verify_faculty_password,
                inputs=[faculty_password_input],
                outputs=[faculty_auth_status, faculty_auth_row, faculty_config_section, faculty_auth_state]
            )

            # Connect configuration buttons
            save_config_btn.click(
                save_configuration,
                inputs=[edit_system_prompt, edit_temperature, edit_max_tokens, config_locked, faculty_auth_state],
                outputs=[config_status]
            )
            reset_config_btn.click(
                reset_configuration,
                inputs=[faculty_auth_state],
                outputs=[config_status, edit_system_prompt, edit_temperature, edit_max_tokens]
            )
        else:
            gr.Markdown("ℹ️ Faculty configuration is not enabled. Set CONFIG_CODE in Space secrets to enable.")

if __name__ == "__main__":
    demo.launch()