|
import gradio as gr |
|
import tempfile |
|
import os |
|
import requests |
|
import json |
|
import re |
|
from bs4 import BeautifulSoup |
|
from datetime import datetime |
|
import urllib.parse |
|
|
|
|
|
|
|
# Title and subtitle rendered at the top of the Gradio page.
SPACE_NAME = "CCNY Data Science Assistant"
SPACE_DESCRIPTION = "Assistant for Foundations of Data Science at CCNY"

# System prompt placed first in every chat-completion request.
#
# The literal marker ``{{today}}`` is a placeholder for the current date; this
# is a plain (non-f) string, so the double braces are kept verbatim here.
#
# NOTE(review): "[email protected]" looks like an e-mail obfuscation
# placeholder rather than a real address -- confirm the instructor's address
# and restore it.
SYSTEM_PROMPT = """You are a supportive course assistant for Professor Zach Muhlbauer's Foundations of Data Science course (CSC 10800 R) at City College of New York.

Core Responsibilities
* Course Navigation: Help students locate and understand syllabus information, assignment details, schedules, readings, and course materials
* Learning Facilitation: Guide students toward understanding concepts through Socratic questioning and scaffolding rather than providing direct answers
* Administrative Support: Provide accurate information about deadlines, exam dates, office hours, and course policies

Response Guidelines
* Precision First: Be exact with all dates, times, deadlines, and scheduling information
* Warm Tone: Use encouraging, supportive language that builds student confidence
* Pedagogical Approach: Never provide direct answers to assignments or assessments; instead, ask guiding questions that help students discover solutions independently
* Scope Boundaries: Only address questions related to this specific course (CSC 10800 R) - syllabus, schedule, assignments, readings, and general data science learning support
* Specific Protocols: For questions about grades, redirect students to contact Professor Muhlbauer at the email provided in the course materials for all related inquiries: [email protected]

For Scheduling/Deadlines:
* Always cross-reference the most current course schedule
* Provide specific dates and times
* Remind students of upcoming deadlines proactively when relevant
* NB: If {{today}} is after the last date of the course, then do NOT make up new dates based on Tue/Thu schedule of the class

For Conceptual Questions:
* Use Socratic method to guide discovery
* Provide analogies or real-world applications to clarify abstract concepts
* Reference specific course materials where students can find more information
* Encourage active engagement with the material

Uncertainty Protocol:
When uncertain about any information:
* Explicitly state your uncertainty
* Direct students to verify information through official course materials
* Suggest contacting Professor Muhlbauer for clarification
* Provide general guidance on where to find authoritative information

Engagement Style:
* Begin responses with acknowledgment of the student's question
* Use warm, encouraging language
* Ask follow-up questions to better understand student needs
* Celebrate student progress and effort

Remember: Your goal is to support student learning and success while maintaining the pedagogical integrity of the course. You are a learning facilitator, not an answer provider."""
|
# OpenRouter model identifier sent with every completion request.
MODEL = "google/gemini-2.0-flash-001"

# Course pages whose text is fetched and appended to the system prompt.
GROUNDING_URLS = [
    "https://zmuhls.github.io/ccny-data-science/schedule/",
    "https://zmuhls.github.io/ccny-data-science/syllabus/",
    "https://zmuhls.github.io/ccny-data-science/portfolio/",
    "https://zmuhls.github.io/ccny-data-science/activities/",
]

# Optional shared access code; None (variable unset) leaves the chat public.
ACCESS_CODE = os.environ.get("SPACE_ACCESS_CODE")

# When True, URLs typed into a chat message are fetched as extra context.
ENABLE_DYNAMIC_URLS = True

# OpenRouter credential. Surrounding whitespace is removed, and a value that
# is blank after stripping is treated as "not configured" (None).
API_KEY = os.environ.get("OPENROUTER_API_KEY")
if API_KEY:
    API_KEY = API_KEY.strip() or None
|
|
|
|
|
def validate_api_key():
    """Check the OpenRouter API key configuration and log the outcome.

    Reads the module-level ``API_KEY`` and prints a diagnostic to stdout.

    Returns:
        bool: ``False`` when no key is set. ``True`` otherwise -- a key that
        does not start with ``sk-or-`` only produces a warning and is still
        reported as usable.
    """
    # NOTE(review): the leading glyphs in these messages look like mis-encoded
    # emoji; they are kept as found because the intended characters cannot be
    # recovered with certainty -- confirm against the original file.
    if not API_KEY:
        print("β οΈ API KEY CONFIGURATION ERROR:")
        print(" Variable name: OPENROUTER_API_KEY")
        print(" Status: Not set or empty")
        print(" Action needed: Set 'OPENROUTER_API_KEY' in HuggingFace Space secrets")
        print(" Expected format: sk-or-xxxxxxxxxx")
        return False

    if not API_KEY.startswith('sk-or-'):
        # Only ever log a short prefix of the secret. Keys of 10 characters or
        # fewer are not echoed at all, since a "prefix" would be the whole key.
        if len(API_KEY) > 10:
            preview = f"{API_KEY[:10]}..."
        else:
            preview = "(too short to preview)"
        print("β οΈ API KEY FORMAT WARNING:")
        print(" Variable name: OPENROUTER_API_KEY")
        print(f" Current value: {preview}")
        print(" Expected format: sk-or-xxxxxxxxxx")
        print(" Note: OpenRouter keys should start with 'sk-or-'")
        return True

    # This literal was split across two lines by a garbled glyph; it is
    # restored as a single check-mark character.
    print("✅ API Key configured successfully")
    print(" Variable: OPENROUTER_API_KEY")
    print(" Format: Valid OpenRouter key")
    return True


# Evaluate once at import time. validate_api_key() reads the global API_KEY;
# if that name is not defined the lookup raises NameError, which is treated
# as "no valid key".
try:
    API_KEY_VALID = validate_api_key()
except NameError:
    API_KEY_VALID = False
|
|
|
def validate_url_domain(url):
    """Return True when *url* parses to a network location containing a dot.

    This is a shape check only: the scheme is not inspected and no DNS lookup
    is performed.

    Args:
        url: Candidate URL. Values that cannot be parsed yield ``False``.

    Returns:
        bool: ``True`` if the parsed ``netloc`` is non-empty and contains ".".
    """
    from urllib.parse import urlparse

    try:
        netloc = urlparse(url).netloc
        return bool(netloc) and '.' in netloc
    except (ValueError, TypeError, AttributeError):
        # ValueError: malformed URL (e.g. an unterminated IPv6 literal).
        # TypeError / AttributeError: *url* is not a string.
        return False
|
|
|
def fetch_url_content(url):
    """Download *url* and return its readable text, capped at about 4000 characters.

    The page is parsed with BeautifulSoup; script, style and page-chrome
    elements are removed, and the text of the first ``<main>``, ``<article>``
    or ``<div>`` whose class contains "content" is used (the whole document
    as a last resort).

    Args:
        url: Address to fetch. It is checked with ``validate_url_domain`` first.

    Returns:
        str: The extracted text, or a human-readable diagnostic string when
        the URL is rejected, the request fails, or no text is found. This
        function does not raise for network or parsing errors.
    """
    if not validate_url_domain(url):
        return f"Invalid URL format: {url}"

    # Browser-like headers, sent with the request.
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate',
        'Connection': 'keep-alive',
    }

    try:
        response = requests.get(url, timeout=15, headers=headers)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Drop elements that carry no page content.
        for element in soup(["script", "style", "nav", "header", "footer", "aside", "form", "button"]):
            element.decompose()

        main_content = (
            soup.find('main')
            or soup.find('article')
            or soup.find('div', class_=lambda x: bool(x and 'content' in x.lower()))
            or soup
        )
        text = main_content.get_text()

        # Break each line into phrases on runs of two spaces and discard
        # fragments of two characters or fewer. Splitting on a single space
        # here would apply that length filter to individual words and delete
        # every short word ("5", "PM", "to") from the result.
        lines = (line.strip() for line in text.splitlines())
        chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
        text = ' '.join(chunk for chunk in chunks if chunk and len(chunk) > 2)

        if len(text) > 4000:
            truncated = text[:4000]
            last_period = truncated.rfind('.')
            if last_period > 3000:
                # End on a sentence boundary when one is reasonably close.
                text = truncated[:last_period + 1]
            else:
                text = truncated + "..."

        return text if text.strip() else "No readable content found at this URL"

    except requests.exceptions.Timeout:
        return f"Timeout error fetching {url} (15s limit exceeded)"
    except requests.exceptions.RequestException as e:
        return f"Error fetching {url}: {str(e)}"
    except Exception as e:
        # Deliberate catch-all: any parsing problem becomes a diagnostic string.
        return f"Error processing content from {url}: {str(e)}"
|
|
|
def extract_urls_from_text(text):
    """Return the distinct http(s) URLs found in *text*, in order of appearance.

    Trailing sentence punctuation is removed from each match, as is a closing
    parenthesis that has no matching "(" inside the URL (so a link written as
    "(https://example.com/x)" is recovered without the ")"). Candidates that
    contain no "." or are 10 characters or shorter are discarded.

    Args:
        text: Free-form text; anything that is not a non-empty string yields [].

    Returns:
        list[str]: De-duplicated URLs.
    """
    if not text or not isinstance(text, str):
        return []

    url_pattern = r'https?://[^\s<>"{}|\\^`\[\]"]+'
    trailing_punctuation = '.,!?;:'

    validated_urls = []
    for url in re.findall(url_pattern, text):
        url = url.rstrip(trailing_punctuation)
        # Balanced parentheses belong to the URL; an unmatched ")" is prose.
        while url.endswith(')') and url.count(')') > url.count('('):
            url = url[:-1].rstrip(trailing_punctuation)
        if '.' in url and len(url) > 10:
            validated_urls.append(url)

    # dict.fromkeys removes repeats while keeping first-seen order.
    return list(dict.fromkeys(validated_urls))
|
|
|
|
|
# Successfully fetched page text, keyed by stripped URL.
_url_content_cache = {}

# Prefixes of the diagnostic strings fetch_url_content() returns for failures
# that may succeed on a later attempt; such results are never cached.
_TRANSIENT_FETCH_ERROR_PREFIXES = (
    "Timeout error fetching",
    "Error fetching",
    "Error processing content from",
)


def get_grounding_context():
    """Build the grounding text appended to the system prompt.

    Each non-blank entry of ``GROUNDING_URLS`` is fetched with
    ``fetch_url_content`` and labelled PRIMARY (positions 1-2) or SECONDARY.
    Page text is cached per URL in ``_url_content_cache``; a fetch that
    returned an error diagnostic is not cached, so it is retried on the next
    call instead of being served for the rest of the process lifetime.

    Returns:
        str: The labelled sections joined by blank lines and wrapped in blank
        lines, or "" when there is nothing to include.
    """
    if not GROUNDING_URLS:
        return ""

    context_parts = []
    for i, url in enumerate(GROUNDING_URLS, 1):
        # Skip None / blank entries.
        if not (url and url.strip()):
            continue

        clean_url = url.strip()
        content = _url_content_cache.get(clean_url)
        if content is None:
            content = fetch_url_content(clean_url)
            if not content.startswith(_TRANSIENT_FETCH_ERROR_PREFIXES):
                _url_content_cache[clean_url] = content

        priority_label = "PRIMARY" if i <= 2 else "SECONDARY"
        context_parts.append(f"[{priority_label}] Context from URL {i} ({url}):\n{content}")

    if not context_parts:
        return ""
    return "\n\n" + "\n\n".join(context_parts) + "\n\n"
|
|
|
def export_conversation_to_markdown(conversation_history):
    """Render a chat transcript as a Markdown document.

    Two history shapes are understood:

    * dicts with ``role`` / ``content`` keys -- only the ``user`` and
      ``assistant`` roles are rendered;
    * ``(user, assistant)`` sequences of at least two items -- empty halves
      are omitted.

    Args:
        conversation_history: Iterable of history entries in either shape.

    Returns:
        str: Markdown text starting with a timestamped header, or
        "No conversation to export." when the history is empty.
    """
    if not conversation_history:
        return "No conversation to export."

    timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    sections = [f"# Conversation Export\nGenerated on: {timestamp}\n\n---\n\n"]

    # Number of user turns seen so far; an assistant reply reuses the number
    # of the user message that precedes it.
    turn = 0
    for entry in conversation_history:
        if isinstance(entry, dict):
            speaker = entry.get('role', 'unknown')
            body = entry.get('content', '')
            if speaker == 'user':
                turn += 1
                sections.append(f"## User Message {turn}\n\n{body}\n\n")
            elif speaker == 'assistant':
                sections.append(f"## Assistant Response {turn}\n\n{body}\n\n---\n\n")
            continue

        if not isinstance(entry, (list, tuple)) or len(entry) < 2:
            continue

        turn += 1
        question, answer = entry[0], entry[1]
        if question:
            sections.append(f"## User Message {turn}\n\n{question}\n\n")
        if answer:
            sections.append(f"## Assistant Response {turn}\n\n{answer}\n\n---\n\n")

    return "".join(sections)
|
|
|
|
|
def _build_chat_messages(system_prompt, message, history):
    """Assemble the request ``messages`` list: system prompt, prior turns, new user message."""
    messages = [{"role": "system", "content": system_prompt}]

    for chat in history or []:
        if isinstance(chat, dict):
            # Forward only role/content. History dicts handed over by the UI
            # may carry additional keys that are not part of the chat schema.
            if "role" in chat and "content" in chat:
                messages.append({"role": chat["role"], "content": chat["content"]})
        elif isinstance(chat, (list, tuple)) and len(chat) >= 2:
            user_msg, assistant_msg = chat[0], chat[1]
            if user_msg:
                messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})

    messages.append({"role": "user", "content": message})
    return messages


def _extract_completion_text(response):
    """Return the assistant text from a 200 response, or an "API Error: ..." string."""
    try:
        result = response.json()

        if 'choices' not in result or not result['choices']:
            print(f"β οΈ API response missing choices: {result}")
            return "API Error: No response choices available"
        if 'message' not in result['choices'][0]:
            print(f"β οΈ API response missing message: {result}")
            return "API Error: No message in response"
        if 'content' not in result['choices'][0]['message']:
            print(f"β οΈ API response missing content: {result}")
            return "API Error: No content in message"

        content = result['choices'][0]['message']['content']
        if not content or content.strip() == "":
            print("β οΈ API returned empty content")
            return "API Error: Empty response content"

        # This literal was split across two lines by a garbled glyph; it is
        # restored as a single check-mark character.
        print("✅ API request successful")
        return content

    except (KeyError, IndexError, TypeError, ValueError) as e:
        # ValueError covers JSON decoding failures; TypeError covers a body
        # whose structure is not the expected nesting of dicts and lists.
        print(f"β Failed to parse API response: {str(e)}")
        return f"API Error: Failed to parse response - {str(e)}"


def _describe_http_error(response):
    """Log a non-200 response and return the matching user-facing Markdown message."""
    status = response.status_code

    if status == 401:
        print(f"β API authentication failed: {response.status_code} - {response.text[:200]}")
        return (
            "π **Authentication Error**\n\n"
            "Your API key appears to be invalid or expired.\n\n"
            "**Troubleshooting:**\n"
            "1. Check that your **OPENROUTER_API_KEY** secret is set correctly\n"
            "2. Verify your API key at: https://openrouter.ai/keys\n"
            "3. Ensure your key starts with `sk-or-`\n"
            "4. Check that you have credits on your OpenRouter account"
        )

    if status == 429:
        print(f"β Rate limit exceeded: {response.status_code}")
        return (
            "β±οΈ **Rate Limit Exceeded**\n\n"
            "Too many requests. Please wait a moment and try again.\n\n"
            "**Troubleshooting:**\n"
            "1. Wait 30-60 seconds before trying again\n"
            "2. Check your OpenRouter usage limits\n"
            "3. Consider upgrading your OpenRouter plan"
        )

    if status == 400:
        try:
            error_data = response.json()
            error_message = error_data.get('error', {}).get('message', 'Unknown error')
        except (ValueError, AttributeError):
            # Body is not JSON, or not shaped as {"error": {...}}.
            error_message = response.text
        error_message = str(error_message)

        error_msg = "β οΈ **Request Error**\n\n"
        error_msg += "The API request was invalid:\n"
        error_msg += f"`{error_message}`\n\n"
        if "model" in error_message.lower():
            error_msg += f"**Model Issue:** The model `{MODEL}` may not be available.\n"
            error_msg += "Try switching to a different model in your Space configuration."
        print(f"β Bad request: {response.status_code} - {error_message}")
        return error_msg

    print(f"β API error: {response.status_code} - {response.text[:200]}")
    return (
        f"π« **API Error {response.status_code}**\n\n"
        "An unexpected error occurred. Please try again.\n\n"
        "If this persists, check:\n"
        "1. OpenRouter service status\n"
        "2. Your API key and credits\n"
        "3. The model availability"
    )


def generate_response(message, history):
    """Answer *message* by calling the OpenRouter chat-completions API.

    The system prompt is ``SYSTEM_PROMPT`` (with its ``{{today}}`` marker
    replaced by the current date) followed by the grounding context and, when
    ``ENABLE_DYNAMIC_URLS`` is set, the text of up to three URLs found in the
    message.

    Args:
        message: The new user message.
        history: Prior turns, either role/content dicts or
            ``(user, assistant)`` sequences.

    Returns:
        str: The assistant's reply, or a Markdown-formatted error message.
        Request failures are reported in the returned string, not raised.
    """
    # NOTE(review): several leading glyphs in the messages below look like
    # mis-encoded emoji; they are kept as found because the intended
    # characters cannot be recovered with certainty.
    if not API_KEY:
        print("β API request failed: No API key configured for OPENROUTER_API_KEY")
        return (
            "π **API Key Required**\n\n"
            "Please configure your OpenRouter API key:\n"
            "1. Go to Settings (βοΈ) in your HuggingFace Space\n"
            "2. Click 'Variables and secrets'\n"
            "3. Add secret: **OPENROUTER_API_KEY**\n"
            "4. Value: Your OpenRouter API key (starts with `sk-or-`)\n\n"
            "Get your API key at: https://openrouter.ai/keys"
        )

    grounding_context = get_grounding_context()

    if ENABLE_DYNAMIC_URLS:
        urls_in_message = extract_urls_from_text(message)
        # At most three user-supplied URLs are fetched per message.
        dynamic_context_parts = [
            f"\n\nDynamic context from {url}:\n{fetch_url_content(url)}"
            for url in urls_in_message[:3]
        ]
        if dynamic_context_parts:
            grounding_context += "\n".join(dynamic_context_parts)

    # The prompt refers to the current date through a literal "{{today}}"
    # marker; fill it in so the model sees an actual date. This uses the
    # server's local clock.
    today = datetime.now().strftime("%A, %B %d, %Y")
    enhanced_system_prompt = SYSTEM_PROMPT.replace("{{today}}", today) + grounding_context

    messages = _build_chat_messages(enhanced_system_prompt, message, history)

    try:
        print("π Making API request to OpenRouter...")
        print(f" Model: {MODEL}")
        print(f" Messages: {len(messages)} in conversation")

        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {API_KEY}",
                "Content-Type": "application/json",
                "HTTP-Referer": "https://huggingface.co",
                "X-Title": "HuggingFace Space",
            },
            json={
                "model": MODEL,
                "messages": messages,
                "temperature": 0.7,
                "max_tokens": 750,
            },
            timeout=30,
        )

        print(f"π‘ API Response: {response.status_code}")

        if response.status_code == 200:
            return _extract_completion_text(response)
        return _describe_http_error(response)

    except requests.exceptions.Timeout:
        print("β Request timeout after 30 seconds")
        return (
            "β° **Request Timeout**\n\n"
            "The API request took too long (30s limit).\n\n"
            "**Troubleshooting:**\n"
            "1. Try again with a shorter message\n"
            "2. Check your internet connection\n"
            "3. Try a different model"
        )
    except requests.exceptions.ConnectionError:
        print("β Connection error to OpenRouter API")
        return (
            "π **Connection Error**\n\n"
            "Could not connect to OpenRouter API.\n\n"
            "**Troubleshooting:**\n"
            "1. Check your internet connection\n"
            "2. Check OpenRouter service status\n"
            "3. Try again in a few moments"
        )
    except Exception as e:
        # Top-level boundary for the chat handler: report instead of raising.
        print(f"β Unexpected error: {str(e)}")
        return (
            "β **Unexpected Error**\n\n"
            "An unexpected error occurred:\n"
            f"`{str(e)}`\n\n"
            "Please try again or contact support if this persists."
        )
|
|
|
|
|
# Gradio state component used as the third output of the access-check events.
# NOTE(review): this component is created at module level rather than inside
# a gr.Blocks context -- confirm Gradio registers it as intended.
access_granted = gr.State(False)

# Flag consulted by protected_generate_response().
# NOTE(review): this is a module-level global, so one value is shared by every
# caller of this module; a correct code from one visitor sets it for all, and
# a wrong code clears it for all -- confirm this is acceptable.
_access_granted_global = False


def verify_access_code(code):
    """Check a submitted access code and produce the matching UI updates.

    Sets the module-level ``_access_granted_global`` flag to the outcome.
    When ``ACCESS_CODE`` is ``None`` every submission is accepted.

    Args:
        code: The text the user entered.

    Returns:
        tuple: Three ``gr.update`` values, in order: the error message
        component, the chat section, and the ``access_granted`` state.
    """
    global _access_granted_global

    # Local import: hmac is not among this file's top-level imports.
    import hmac

    if ACCESS_CODE is None:
        is_valid = True
    else:
        # Constant-time comparison so the check does not leak, through its
        # timing, how much of a guess is correct. Bytes are compared because
        # compare_digest() rejects non-ASCII str arguments.
        is_valid = isinstance(code, str) and hmac.compare_digest(
            code.encode("utf-8"), ACCESS_CODE.encode("utf-8")
        )

    _access_granted_global = is_valid

    if is_valid:
        return gr.update(visible=False), gr.update(visible=True), gr.update(value=True)

    return (
        gr.update(visible=True, value="β Incorrect access code. Please try again."),
        gr.update(visible=False),
        gr.update(value=False),
    )
|
|
|
def protected_generate_response(message, history):
    """Gate ``generate_response`` behind the access-code check.

    Args:
        message: The new user message.
        history: Prior conversation turns, passed through unchanged.

    Returns:
        str: A prompt to enter the access code when ``ACCESS_CODE`` is set and
        access has not been granted; otherwise whatever ``generate_response``
        returns.
    """
    locked = ACCESS_CODE is not None and not _access_granted_global
    if locked:
        return "Please enter the access code to continue."

    return generate_response(message, history)
|
|
|
|
|
# Transcript of the most recent conversation, read by
# export_current_conversation().
# NOTE(review): module-level global, so it is overwritten by every call to
# store_and_generate_response() regardless of who made it -- confirm this
# sharing is acceptable.
chat_history_store = []


def store_and_generate_response(message, history):
    """Generate a reply and record the full transcript for later export.

    ``chat_history_store`` is rebuilt on every call from *history* plus the
    new exchange. Both history shapes are recorded: role/content dicts and
    ``(user, assistant)`` sequences of at least two items.

    Args:
        message: The new user message.
        history: Prior conversation turns as supplied by the chat UI.

    Returns:
        str: The reply from ``protected_generate_response``.
    """
    global chat_history_store

    response = protected_generate_response(message, history)

    transcript = []
    for exchange in history or []:
        if isinstance(exchange, dict):
            # Dict-style history: keep the fields the exporter reads.
            transcript.append({
                "role": exchange.get("role", "unknown"),
                "content": exchange.get("content", ""),
            })
        elif isinstance(exchange, (list, tuple)) and len(exchange) >= 2:
            transcript.append({"role": "user", "content": exchange[0]})
            transcript.append({"role": "assistant", "content": exchange[1]})

    transcript.append({"role": "user", "content": message})
    transcript.append({"role": "assistant", "content": response})

    chat_history_store = transcript
    return response
|
|
|
def export_current_conversation():
    """Export the transcript held in ``chat_history_store`` as a Markdown file.

    Returns:
        A ``gr.update`` for the download component: hidden when nothing has
        been stored, otherwise visible and pointing at a newly written
        temporary ``.md`` file.
    """
    # export_conversation() performs the same empty check, Markdown rendering
    # and temp-file write; pass it the stored transcript.
    return export_conversation(chat_history_store)
|
|
|
def export_conversation(history):
    """Write *history* to a temporary Markdown file for download.

    Args:
        history: Conversation turns in a shape accepted by
            ``export_conversation_to_markdown``.

    Returns:
        A ``gr.update`` for the download component: hidden when *history* is
        empty, otherwise visible with ``value`` set to the file path.
    """
    if history:
        rendered = export_conversation_to_markdown(history)

        # delete=False keeps the file on disk after it is closed, so the
        # returned path stays valid.
        out_file = tempfile.NamedTemporaryFile(
            mode='w', suffix='.md', delete=False, encoding='utf-8'
        )
        with out_file:
            out_file.write(rendered)

        return gr.update(value=out_file.name, visible=True)

    return gr.update(visible=False)
|
|
|
|
|
def get_configuration_status():
    """Build the Markdown text shown in the "Configuration Status" panel.

    Summarises the module-level settings: API key validity, model, the
    temperature and token limit, grounding URLs (the first three are listed),
    dynamic URL fetching, access control, and the full system prompt.

    Returns:
        str: One status item per line, joined with newlines.
    """
    # NOTE(review): several leading glyphs below look like mis-encoded emoji;
    # they are kept as found because the intended characters cannot be
    # recovered with certainty.
    status_parts = []

    if API_KEY_VALID:
        # This literal was split across two lines by a garbled glyph; it is
        # restored as a single check-mark character.
        status_parts.append("✅ **API Key:** Configured and valid")
    else:
        status_parts.append("β **API Key:** Not configured - Set `OPENROUTER_API_KEY` in Space secrets")

    status_parts.append(f"π€ **Model:** {MODEL}")
    status_parts.append("π‘οΈ **Temperature:** 0.7")
    status_parts.append("π **Max Tokens:** 750")

    if GROUNDING_URLS:
        status_parts.append(f"π **URL Grounding:** {len(GROUNDING_URLS)} URLs configured")
        for i, url in enumerate(GROUNDING_URLS[:3], 1):
            priority_label = "Primary" if i <= 2 else "Secondary"
            status_parts.append(f" - [{priority_label}] {url}")
        if len(GROUNDING_URLS) > 3:
            status_parts.append(f" - ... and {len(GROUNDING_URLS) - 3} more URLs")
    else:
        status_parts.append("π **URL Grounding:** No URLs configured")

    if ENABLE_DYNAMIC_URLS:
        status_parts.append("π **Dynamic URLs:** Enabled")
    else:
        status_parts.append("π **Dynamic URLs:** Disabled")

    if ACCESS_CODE is not None:
        status_parts.append("π **Access Control:** Enabled")
    else:
        status_parts.append("π **Access:** Public Chatbot")

    status_parts.append("")
    status_parts.append("**System Prompt:**")
    status_parts.append(f"{SYSTEM_PROMPT}")

    return "\n".join(status_parts)
|
|
|
|
|
# Gradio UI definition. Statement order is preserved exactly; the nesting is
# reconstructed from the syntax.
# NOTE(review): confirm that the export row and the event wiring sit at the
# intended nesting level.
with gr.Blocks(title=SPACE_NAME) as demo:
    gr.Markdown(f"# {SPACE_NAME}")
    gr.Markdown(SPACE_DESCRIPTION)

    # Status panel; expanded by default only when the API key is not valid.
    with gr.Accordion("π Configuration Status", open=not API_KEY_VALID):
        gr.Markdown(get_configuration_status())

    # Access-code form, shown only when an access code is configured.
    with gr.Column(visible=(ACCESS_CODE is not None)) as access_section:
        gr.Markdown("### π Access Required")
        gr.Markdown("Please enter the access code provided by your instructor:")

        access_input = gr.Textbox(
            label="Access Code",
            placeholder="Enter access code...",
            type="password"
        )
        access_btn = gr.Button("Submit", variant="primary")
        access_error = gr.Markdown(visible=False)

    # Chat area; initially visible only when no access code is configured.
    with gr.Column(visible=(ACCESS_CODE is None)) as chat_section:
        chat_interface = gr.ChatInterface(
            fn=store_and_generate_response,
            title="",
            description="",
            examples=['When is the social coding portfolio due?', 'How can I reach Prof. Muhlbauer?', 'Explain Python data types to me', 'What kind of programming language is Python?'],
            type="messages"
        )

        with gr.Row():
            export_btn = gr.Button("π₯ Export Conversation", variant="secondary", size="sm")
            export_file = gr.File(label="Download Conversation", visible=False)

        # The export handler takes no inputs; it reads the module-level
        # chat_history_store.
        export_btn.click(
            export_current_conversation,
            outputs=[export_file]
        )

    # Wire the access form: both the button and pressing Enter in the textbox
    # run the same check and update the error text, the chat section and the
    # access_granted state.
    if ACCESS_CODE is not None:
        access_btn.click(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )
        access_input.submit(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )

# Start the app only when this file is executed directly.
if __name__ == "__main__":
    demo.launch()
|
|