Spaces:
Running
Running
import gradio as gr | |
import json | |
import zipfile | |
import io | |
import os | |
from datetime import datetime | |
from dotenv import load_dotenv | |
import requests | |
from bs4 import BeautifulSoup | |
import tempfile | |
from pathlib import Path | |
# from scraping_service import get_grounding_context_crawl4ai, fetch_url_content_crawl4ai | |
# Temporary mock functions for testing | |
def get_grounding_context_crawl4ai(urls):
    """Mock replacement for the Crawl4AI grounding fetch.

    ``urls`` is accepted for signature compatibility but is not read; a fixed
    placeholder string is returned so callers can be exercised without the
    real scraping service.
    """
    placeholder = "\n\n[URL content would be fetched here]\n\n"
    return placeholder
def fetch_url_content_crawl4ai(url):
    """Mock replacement for the Crawl4AI single-URL fetch.

    Nothing is downloaded; the returned placeholder sentence simply embeds
    ``url``.
    """
    placeholder = f"[Content from {url} would be fetched here]"
    return placeholder
# Optional RAG support: HAS_RAG records whether rag_tool could be imported.
try:
    from rag_tool import RAGTool
except ImportError:
    # Keep both names defined so later code can branch on HAS_RAG.
    RAGTool = None
    HAS_RAG = False
else:
    HAS_RAG = True
# Load environment variables from .env file (python-dotenv) before any code
# below reads os.environ.
load_dotenv()
# Template for generated space app (based on mvp_simple.py).
#
# The text is rendered with SPACE_TEMPLATE.format(**config), so:
#   - {placeholder} fields are filled in from the generation config;
#   - literal braces that must survive into the generated app.py are doubled
#     ({{ and }});
#   - backslash escapes meant for the generated file are doubled (\\n).
#
# NOTE(review): the generated app records a successful access-code check in a
# module-level flag (_access_granted_global). That flag looks like it is shared
# by every session served by the process rather than being per-visitor --
# confirm whether that is acceptable before relying on it for access control.
SPACE_TEMPLATE = '''import gradio as gr
import os
import requests
import json
import asyncio
from crawl4ai import AsyncWebCrawler

# Configuration
SPACE_NAME = "{name}"
SPACE_DESCRIPTION = "{description}"
SYSTEM_PROMPT = """{system_prompt}"""
MODEL = "{model}"
GROUNDING_URLS = {grounding_urls}
# Get access code from environment variable for security
ACCESS_CODE = os.environ.get("SPACE_ACCESS_CODE", "{access_code}")
ENABLE_DYNAMIC_URLS = {enable_dynamic_urls}
ENABLE_VECTOR_RAG = {enable_vector_rag}
RAG_DATA = {rag_data_json}

# Get API key from environment - customizable variable name
API_KEY = os.environ.get("{api_key_var}")

async def fetch_url_content_async(url, crawler):
    """Fetch and extract text content from a URL using Crawl4AI"""
    try:
        result = await crawler.arun(
            url=url,
            bypass_cache=True,
            word_count_threshold=10,
            excluded_tags=['script', 'style', 'nav', 'header', 'footer'],
            remove_overlay_elements=True
        )

        if result.success:
            content = result.markdown or result.cleaned_html or ""
            # Truncate to ~4000 characters
            if len(content) > 4000:
                content = content[:4000] + "..."
            return content
        else:
            return f"Error fetching {{url}}: Failed to retrieve content"
    except Exception as e:
        return f"Error fetching {{url}}: {{str(e)}}"

def fetch_url_content(url):
    """Synchronous wrapper for URL fetching"""
    async def fetch():
        async with AsyncWebCrawler(verbose=False) as crawler:
            return await fetch_url_content_async(url, crawler)

    try:
        return asyncio.run(fetch())
    except Exception as e:
        return f"Error fetching {{url}}: {{str(e)}}"

# Global cache for URL content to avoid re-crawling in generated spaces
_url_content_cache = {{}}

def get_grounding_context():
    """Fetch context from grounding URLs with caching"""
    if not GROUNDING_URLS:
        return ""

    # Create cache key from URLs
    cache_key = tuple(sorted([url for url in GROUNDING_URLS if url and url.strip()]))

    # Check cache first
    if cache_key in _url_content_cache:
        return _url_content_cache[cache_key]

    context_parts = []
    for i, url in enumerate(GROUNDING_URLS, 1):
        if url.strip():
            content = fetch_url_content(url.strip())
            context_parts.append(f"Context from URL {{i}} ({{url}}):\\n{{content}}")

    if context_parts:
        result = "\\n\\n" + "\\n\\n".join(context_parts) + "\\n\\n"
    else:
        result = ""

    # Cache the result
    _url_content_cache[cache_key] = result
    return result

import re

def extract_urls_from_text(text):
    """Extract URLs from text using regex"""
    url_pattern = r'https?://[^\\s<>"{{}}|\\^`\\[\\]"]+'
    return re.findall(url_pattern, text)

# Initialize RAG context if enabled
if ENABLE_VECTOR_RAG and RAG_DATA:
    try:
        import faiss
        import numpy as np
        import base64

        class SimpleRAGContext:
            def __init__(self, rag_data):
                # Deserialize FAISS index
                index_bytes = base64.b64decode(rag_data['index_base64'])
                self.index = faiss.deserialize_index(index_bytes)

                # Restore chunks and mappings
                self.chunks = rag_data['chunks']
                self.chunk_ids = rag_data['chunk_ids']

            def get_context(self, query, max_chunks=3):
                """Get relevant context - simplified version"""
                # In production, you'd compute query embedding here
                # For now, return a simple message
                return "\\n\\n[RAG context would be retrieved here based on similarity search]\\n\\n"

        rag_context_provider = SimpleRAGContext(RAG_DATA)
    except Exception as e:
        print(f"Failed to initialize RAG: {{e}}")
        rag_context_provider = None
else:
    rag_context_provider = None

def generate_response(message, history):
    """Generate response using OpenRouter API"""

    if not API_KEY:
        return "Please set your {api_key_var} in the Space settings."

    # Get grounding context
    grounding_context = get_grounding_context()

    # Add RAG context if available
    if ENABLE_VECTOR_RAG and rag_context_provider:
        rag_context = rag_context_provider.get_context(message)
        if rag_context:
            grounding_context += rag_context

    # If dynamic URLs are enabled, check message for URLs to fetch
    if ENABLE_DYNAMIC_URLS:
        urls_in_message = extract_urls_from_text(message)
        if urls_in_message:
            # Fetch content from URLs mentioned in the message
            dynamic_context_parts = []
            for url in urls_in_message[:3]:  # Limit to 3 URLs per message
                content = fetch_url_content(url)
                dynamic_context_parts.append(f"\\n\\nDynamic context from {{url}}:\\n{{content}}")
            if dynamic_context_parts:
                grounding_context += "\\n".join(dynamic_context_parts)

    # Build enhanced system prompt with grounding context
    enhanced_system_prompt = SYSTEM_PROMPT + grounding_context

    # Build messages array for the API
    messages = [{{"role": "system", "content": enhanced_system_prompt}}]

    # Add conversation history - compatible with Gradio 5.x format
    for chat in history:
        if isinstance(chat, dict):
            # New format: {{"role": "user", "content": "..."}} or {{"role": "assistant", "content": "..."}}
            messages.append(chat)
        else:
            # Legacy format: ("user msg", "bot msg")
            user_msg, bot_msg = chat
            messages.append({{"role": "user", "content": user_msg}})
            if bot_msg:
                messages.append({{"role": "assistant", "content": bot_msg}})

    # Add current message
    messages.append({{"role": "user", "content": message}})

    # Make API request
    try:
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={{
                "Authorization": f"Bearer {{API_KEY}}",
                "Content-Type": "application/json"
            }},
            json={{
                "model": MODEL,
                "messages": messages,
                "temperature": {temperature},
                "max_tokens": {max_tokens}
            }},
            # Without a timeout a stalled connection would block this worker forever
            timeout=60
        )

        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        else:
            return f"Error: {{response.status_code}} - {{response.text}}"

    except Exception as e:
        return f"Error: {{str(e)}}"

# Access code verification
access_granted = gr.State(False)
_access_granted_global = False  # Global fallback

def verify_access_code(code):
    \"\"\"Verify the access code\"\"\"
    global _access_granted_global
    if not ACCESS_CODE:
        _access_granted_global = True
        return gr.update(visible=False), gr.update(visible=True), gr.update(value=True)

    if code == ACCESS_CODE:
        _access_granted_global = True
        return gr.update(visible=False), gr.update(visible=True), gr.update(value=True)
    else:
        _access_granted_global = False
        return gr.update(visible=True, value="❌ Incorrect access code. Please try again."), gr.update(visible=False), gr.update(value=False)

def protected_generate_response(message, history):
    \"\"\"Protected response function that checks access\"\"\"
    # Check if access is granted via the global variable
    if ACCESS_CODE and not _access_granted_global:
        return "Please enter the access code to continue."
    return generate_response(message, history)

# Create interface with access code protection
with gr.Blocks(title=SPACE_NAME) as demo:
    gr.Markdown(f"# {{SPACE_NAME}}")
    gr.Markdown(SPACE_DESCRIPTION)

    # Access code section (shown only if ACCESS_CODE is set)
    with gr.Column(visible=bool(ACCESS_CODE)) as access_section:
        gr.Markdown("### 🔐 Access Required")
        gr.Markdown("Please enter the access code provided by your instructor:")

        access_input = gr.Textbox(
            label="Access Code",
            placeholder="Enter access code...",
            type="password"
        )
        access_btn = gr.Button("Submit", variant="primary")
        access_error = gr.Markdown(visible=False)

    # Main chat interface (hidden until access granted)
    with gr.Column(visible=not bool(ACCESS_CODE)) as chat_section:
        chat_interface = gr.ChatInterface(
            fn=protected_generate_response,
            title="",  # Title already shown above
            description="",  # Description already shown above
            examples=None
        )

    # Connect access verification
    if ACCESS_CODE:
        access_btn.click(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )
        access_input.submit(
            verify_access_code,
            inputs=[access_input],
            outputs=[access_error, chat_section, access_granted]
        )

if __name__ == "__main__":
    demo.launch()
'''
# Available models (OpenRouter model ids offered in the model selector).
# "openai/gpt-4o-nano" was replaced: OpenAI's "nano" tier is published as
# gpt-4.1-nano, and an unknown id makes every request with that model fail.
MODELS = [
    "google/gemma-3-27b-it",
    "google/gemini-2.0-flash-001",
    "mistralai/mistral-medium",
    "openai/gpt-4.1-nano",
    "anthropic/claude-3.5-haiku",
]
def fetch_url_content(url):
    """Download ``url`` and return its visible text, whitespace-normalised.

    Script and style elements are removed before text extraction, and the
    result is cut to 4000 characters plus "..." when longer. Any exception is
    reported as an "Error fetching ..." string instead of being raised.
    """
    try:
        page = requests.get(url, timeout=10)
        page.raise_for_status()
        soup = BeautifulSoup(page.content, 'html.parser')

        # Drop markup that never renders as readable text
        for tag in soup(["script", "style"]):
            tag.decompose()

        raw_text = soup.get_text()

        # Collapse the text into space-separated, non-empty fragments
        fragments = []
        for raw_line in raw_text.splitlines():
            for piece in raw_line.strip().split(" "):
                piece = piece.strip()
                if piece:
                    fragments.append(piece)
        cleaned = ' '.join(fragments)

        # Truncate to ~4000 characters
        if len(cleaned) > 4000:
            cleaned = cleaned[:4000] + "..."
        return cleaned
    except Exception as e:
        return f"Error fetching {url}: {str(e)}"
def get_grounding_context(urls):
    """Build one context string from the text of every non-blank URL.

    Each usable entry is fetched with ``fetch_url_content`` and labelled with
    its 1-based position in ``urls``. Returns "" when ``urls`` is empty or
    holds no usable entries.
    """
    if not urls:
        return ""

    sections = []
    for position, raw_url in enumerate(urls, start=1):
        if not raw_url or not raw_url.strip():
            continue
        body = fetch_url_content(raw_url.strip())
        sections.append(f"Context from URL {position} ({raw_url}):\n{body}")

    if not sections:
        return ""
    return "\n\n" + "\n\n".join(sections) + "\n\n"
def create_readme(config):
    """Generate the README.md text with deployment instructions.

    Args:
        config: Mapping that must provide 'name', 'description',
            'api_key_var', 'access_code', 'model', 'temperature' and
            'max_tokens'; 'enable_dynamic_urls' is optional.

    Returns:
        The full README as a string: Spaces YAML front matter, deployment
        steps, configuration summary, customization and troubleshooting notes,
        and a generation timestamp.

    Raises:
        KeyError: If a required key is missing from ``config``.
    """
    access_code = config['access_code']

    # The access-control step only exists when an access code is configured,
    # which shifts the numbering of the steps that follow it.
    if access_code:
        api_key_step, test_step = 5, 6
        access_section = f"""### Step 4: Configure Access Control
Your Space is configured with access code protection. Students will need to enter the access code to use the chatbot.

1. Go to Settings (gear icon)
2. Click "Variables and secrets"
3. Click "New secret"
4. Name: `SPACE_ACCESS_CODE`
5. Value: `{access_code}`
6. Click "Add"

**Important**: The access code is now stored securely as an environment variable and is not visible in your app code.

To disable access protection:
1. Go to Settings → Variables and secrets
2. Delete the `SPACE_ACCESS_CODE` secret
3. The Space will rebuild automatically with no access protection

"""
    else:
        api_key_step, test_step = 4, 5
        access_section = ""

    # Assigned (not returned) so the optional sections below are appended.
    readme_content = f"""---
title: {config['name']}
emoji: 🤖
colorFrom: blue
colorTo: red
sdk: gradio
sdk_version: 5.35.0
app_file: app.py
pinned: false
---

# {config['name']}

{config['description']}

## Quick Deploy to HuggingFace Spaces

### Step 1: Create the Space
1. Go to https://huggingface.co/spaces
2. Click "Create new Space"
3. Choose a name for your Space
4. Select **Gradio** as the SDK
5. Set visibility (Public/Private)
6. Click "Create Space"

### Step 2: Upload Files
1. In your new Space, click "Files" tab
2. Upload these files from the zip:
   - `app.py`
   - `requirements.txt`
3. Wait for "Building" to complete

### Step 3: Add API Key
1. Go to Settings (gear icon)
2. Click "Variables and secrets"
3. Click "New secret"
4. Name: `{config['api_key_var']}`
5. Value: Your OpenRouter API key
6. Click "Add"

{access_section}### Step {api_key_step}: Get Your API Key
1. Go to https://openrouter.ai/keys
2. Sign up/login if needed
3. Click "Create Key"
4. Copy the key (starts with `sk-or-`)

### Step {test_step}: Test Your Space
- Go back to "App" tab
- Your Space should be running!
- Try the example prompts or ask a question

## Configuration

- **Model**: {config['model']}
- **Temperature**: {config['temperature']}
- **Max Tokens**: {config['max_tokens']}
- **API Key Variable**: {config['api_key_var']}"""

    # Add optional configuration items
    if access_code:
        readme_content += f"""
- **Access Code**: {access_code} (Students need this to access the chatbot)"""

    if config.get('enable_dynamic_urls'):
        readme_content += """
- **Dynamic URL Fetching**: Enabled (Assistant can fetch URLs mentioned in conversations)"""

    readme_content += f"""

## Customization

To modify your Space:
1. Edit `app.py` in your Space
2. Update configuration variables at the top
3. Changes deploy automatically

## Troubleshooting

- **"Please set your {config['api_key_var']}"**: Add the secret in Space settings
- **Error 401**: Invalid API key or no credits
- **Error 429**: Rate limit - wait and try again
- **Build failed**: Check requirements.txt formatting

## More Help

- HuggingFace Spaces: https://huggingface.co/docs/hub/spaces
- OpenRouter Docs: https://openrouter.ai/docs
- Gradio Docs: https://gradio.app/docs

---

Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} with Chat U/I Helper
"""
    return readme_content
def create_requirements(enable_vector_rag=False):
    """Return the requirements.txt text for a generated Space.

    The base list is always present; the FAISS and NumPy pins are appended
    only when ``enable_vector_rag`` is truthy. Lines are newline-separated
    with no trailing newline.
    """
    packages = [
        "gradio>=5.35.0",
        "requests>=2.32.3",
        "crawl4ai>=0.4.0",
        "aiofiles>=24.0",
    ]
    if enable_vector_rag:
        packages += ["faiss-cpu==1.7.4", "numpy==1.24.3"]
    return "\n".join(packages)
def _escape_for_py_string(value, multiline=False):
    """Escape ``value`` for embedding inside a double-quoted Python literal.

    With ``multiline=True`` the target is a triple-double-quoted literal, so
    real newlines are kept and only backslashes and double quotes are escaped.
    Otherwise JSON string escaping is used, which also escapes newlines and
    control characters; ``ensure_ascii=False`` avoids surrogate-pair escapes
    that Python would not recombine.
    """
    text = "" if value is None else str(value)
    if multiline:
        return text.replace("\\", "\\\\").replace('"', '\\"')
    return json.dumps(text, ensure_ascii=False)[1:-1]


def generate_zip(name, description, system_prompt, model, api_key_var, temperature, max_tokens, examples_text, access_code="", enable_dynamic_urls=False, url1="", url2="", url3="", url4="", enable_vector_rag=False, rag_data=None):
    """Generate the deployable zip file and return its filename.

    The archive contains app.py (rendered from SPACE_TEMPLATE),
    requirements.txt, README.md and config.json, and is written to the
    current working directory.

    Args:
        name, description, system_prompt, model, api_key_var: Text settings
            embedded in the generated app.
        temperature, max_tokens: Sampling settings; ``max_tokens`` is coerced
            with ``int()``.
        examples_text: Newline-separated example prompts; defaults are used
            when blank.
        access_code: Only written to the README; app.py and config.json get
            an empty value because the code is meant to live in a secret.
        enable_dynamic_urls, enable_vector_rag: Feature flags for the app.
        url1..url4: Optional grounding URLs; blank entries are dropped.
        rag_data: Optional JSON-serialisable RAG payload.

    Returns:
        The name of the zip file that was written.
    """
    # Process examples
    default_examples = [
        "Hello! How can you help me?",
        "Tell me something interesting",
        "What can you do?"
    ]
    examples_list = [ex.strip() for ex in (examples_text or "").split('\n') if ex.strip()]
    examples_json = json.dumps(examples_list or default_examples)

    # Process grounding URLs
    grounding_urls = [url.strip() for url in (url1, url2, url3, url4) if url and url.strip()]

    # Raw configuration: used for the README and the config.json backup
    config = {
        'name': name,
        'description': description,
        'system_prompt': system_prompt,
        'model': model,
        'api_key_var': api_key_var,
        'temperature': temperature,
        'max_tokens': int(max_tokens),
        'examples': examples_json,
        'grounding_urls': json.dumps(grounding_urls),
        'access_code': "",  # Access code stored in environment variable for security
        'enable_dynamic_urls': enable_dynamic_urls,
        'enable_vector_rag': enable_vector_rag,
        'rag_data_json': json.dumps(rag_data) if rag_data else 'None'
    }

    # Values that land inside string literals of the generated source must be
    # escaped, otherwise a quote, backslash or newline in user text yields a
    # broken (or attacker-shaped) app.py.
    template_values = dict(config)
    for key in ('name', 'description', 'model', 'api_key_var'):
        template_values[key] = _escape_for_py_string(config[key])
    template_values['system_prompt'] = _escape_for_py_string(system_prompt, multiline=True)
    if rag_data:
        # JSON is not always valid Python (null/true/false), so the generated
        # app decodes it at import time; the template already imports json.
        template_values['rag_data_json'] = f"json.loads({json.dumps(rag_data)!r})"

    # Generate files
    app_content = SPACE_TEMPLATE.format(**template_values)

    # Pass original access_code to README for documentation
    readme_config = config.copy()
    readme_config['access_code'] = access_code or ""
    readme_content = create_readme(readme_config)
    requirements_content = create_requirements(enable_vector_rag)

    # Create zip file with clean naming; path separators and characters that
    # are reserved on common filesystems are replaced as well.
    unsafe_chars = str.maketrans({ch: '_' for ch in ' -/\\:*?"<>|'})
    safe_name = (name or "").strip().lower().translate(unsafe_chars) or "space"
    filename = f"{safe_name}.zip"

    with zipfile.ZipFile(filename, 'w', zipfile.ZIP_DEFLATED) as zip_file:
        zip_file.writestr('app.py', app_content)
        zip_file.writestr('requirements.txt', requirements_content)
        zip_file.writestr('README.md', readme_content)
        zip_file.writestr('config.json', json.dumps(config, indent=2))

    return filename
# Define callback functions outside the interface
def toggle_rag_section(enable_rag):
    """Return a Gradio update that sets the RAG section's ``visible`` flag to ``enable_rag``."""
    visibility_update = gr.update(visible=enable_rag)
    return visibility_update
def process_documents(files, current_rag_tool):
    """Index uploaded documents with the RAG tool and describe the outcome.

    Args:
        files: Uploaded files; a falsy value short-circuits with a prompt to
            upload first.
        current_rag_tool: Existing RAG tool, or a falsy value to have a new
            ``RAGTool`` created.

    Returns:
        ``(status_message, rag_tool)``. The message is Markdown-style text;
        the tool is the one that was used (or the one passed in when nothing
        was processed). Exceptions are reported in the message, not raised.
    """
    if not files:
        return "Please upload files first", current_rag_tool
    if not HAS_RAG:
        return "RAG functionality not available. Please install required dependencies.", current_rag_tool

    try:
        # Initialize RAG tool if not exists
        current_rag_tool = current_rag_tool or RAGTool()

        outcome = current_rag_tool.process_uploaded_files(files)
        if not outcome['success']:
            return f"❌ {outcome['message']}", current_rag_tool

        report = [f"✅ {outcome['message']}"]

        # Per-file summary
        processed = outcome['summary']['files_processed']
        if processed:
            report.append("\n**Processed files:**")
            report.extend(f"- {info['name']} ({info['chunks']} chunks)" for info in processed)

        # Per-file errors, if any
        failures = outcome.get('errors')
        if failures:
            report.append("\n**Errors:**")
            report.extend(f"- {failure['file']}: {failure['error']}" for failure in failures)

        # Index statistics
        stats = outcome.get('index_stats')
        if stats:
            report.append(f"\n**Index stats:** {stats['total_chunks']} chunks, {stats['dimension']}D embeddings")

        return "\n".join(report), current_rag_tool
    except Exception as e:
        return f"❌ Error processing documents: {str(e)}", current_rag_tool
def update_sandbox_preview(config_data):
    """Build the sandbox preview for a generated configuration.

    Args:
        config_data: Mapping with the generated settings ('name',
            'description', 'system_prompt', 'model', 'temperature',
            'max_tokens', 'enable_dynamic_urls', 'enable_vector_rag',
            'filename'); every key is optional. A falsy value yields the
            placeholder preview.

    Returns:
        ``(preview_text, preview_html)``: a Markdown summary and a static
        HTML mock-up of the chat interface.
    """
    # Local import: the module's import block is not guaranteed to provide html.
    import html

    if not config_data:
        return "Generate a space configuration to see preview here.", "<div style='text-align: center; padding: 50px; color: #666;'>No preview available</div>"

    # A stored None would otherwise fail on slicing / len() below
    prompt = config_data.get('system_prompt', 'No system prompt configured')
    if prompt is None:
        prompt = 'No system prompt configured'
    prompt = str(prompt)
    prompt_excerpt = prompt[:500] + ('...' if len(prompt) > 500 else '')

    # Create preview info
    preview_text = f"""**Space Configuration:**

- **Name:** {config_data.get('name', 'N/A')}
- **Model:** {config_data.get('model', 'N/A')}
- **Temperature:** {config_data.get('temperature', 'N/A')}
- **Max Tokens:** {config_data.get('max_tokens', 'N/A')}
- **Dynamic URLs:** {'✅ Enabled' if config_data.get('enable_dynamic_urls') else '❌ Disabled'}
- **Vector RAG:** {'✅ Enabled' if config_data.get('enable_vector_rag') else '❌ Disabled'}

**System Prompt Preview:**
```
{prompt_excerpt}
```

**Deployment Package:** `{config_data.get('filename', 'Not generated')}`"""

    # User-entered values are escaped before being placed into markup so that
    # characters such as < > & " render literally instead of as HTML.
    def _safe(key, default):
        return html.escape(str(config_data.get(key, default)))

    # Create a basic HTML preview of the chat interface
    preview_html = f"""
    <div style="border: 1px solid #ddd; border-radius: 8px; padding: 20px; background: #f9f9f9;">
        <h3 style="margin-top: 0; color: #333;">{_safe('name', 'Chat Interface')}</h3>
        <p style="color: #666; margin-bottom: 20px;">{_safe('description', 'A customizable AI chat interface')}</p>
        <div style="border: 1px solid #ccc; border-radius: 4px; background: white; min-height: 200px; padding: 15px; margin-bottom: 15px;">
            <div style="color: #888; text-align: center; padding: 50px 0;">Chat Interface Preview</div>
            <div style="background: #f0f8ff; padding: 10px; border-radius: 4px; margin-bottom: 10px; border-left: 3px solid #0066cc;">
                <strong>Assistant:</strong> Hello! I'm ready to help you. How can I assist you today?
            </div>
        </div>
        <div style="border: 1px solid #ccc; border-radius: 4px; padding: 10px; background: white;">
            <input type="text" placeholder="Type your message here..." style="width: 70%; padding: 8px; border: 1px solid #ddd; border-radius: 4px; margin-right: 10px;">
            <button style="padding: 8px 15px; background: #0066cc; color: white; border: none; border-radius: 4px; cursor: pointer;">Send</button>
        </div>
        <div style="margin-top: 15px; padding: 10px; background: #f0f0f0; border-radius: 4px; font-size: 12px; color: #666;">
            <strong>Configuration:</strong> Model: {_safe('model', 'N/A')} | Temperature: {_safe('temperature', 'N/A')} | Max Tokens: {_safe('max_tokens', 'N/A')}
        </div>
    </div>
    """
    return preview_text, preview_html
def on_generate(name, description, system_prompt, enable_research_assistant, role_purpose, intended_audience, key_tasks, additional_context, custom_role_purpose, custom_intended_audience, custom_key_tasks, custom_additional_context, model, api_key_var, temperature, max_tokens, examples_text, access_code, enable_dynamic_urls, url1, url2, url3, url4, enable_vector_rag, rag_tool_state):
    """Validate the form, build the deployment zip and report the result.

    The ``custom_*`` parameters are accepted for the event wiring but are not
    read here.

    Returns:
        A 3-tuple on every path: an update for the status message, an update
        for the download component, and the preview configuration dict
        (``None`` when generation did not succeed).
    """

    def _failure(text):
        # Every exit must return the same number of outputs as the success
        # path, otherwise Gradio rejects the callback result.
        return gr.update(value=text, visible=True), gr.update(visible=False), None

    if not name or not name.strip():
        return _failure("Error: Please provide a Space Title")

    try:
        # Get RAG data if enabled
        rag_data = None
        if enable_vector_rag and rag_tool_state:
            rag_data = rag_tool_state.get_serialized_data()

        # Combine system prompt components if research assistant is enabled
        if enable_research_assistant:
            if not role_purpose or not role_purpose.strip():
                return _failure("Error: Please provide a Role and Purpose for the research assistant")

            research_fields = (role_purpose, intended_audience, key_tasks, additional_context)
            final_system_prompt = " ".join(
                field.strip() for field in research_fields if field and field.strip()
            )
        else:
            # Use the direct system prompt field
            if not system_prompt or not system_prompt.strip():
                return _failure("Error: Please provide a System Prompt for the assistant")
            final_system_prompt = system_prompt.strip()

        filename = generate_zip(name, description, final_system_prompt, model, api_key_var, temperature, max_tokens, examples_text, access_code, enable_dynamic_urls, url1, url2, url3, url4, enable_vector_rag, rag_data)

        success_msg = f"""**Deployment package ready!**

**File**: `{filename}`

**What's included:**
- `app.py` - Ready-to-deploy chat interface
- `requirements.txt` - Dependencies
- `README.md` - Step-by-step deployment guide
- `config.json` - Configuration backup

**Next steps:**
1. Download the zip file below
2. Follow the README instructions to deploy on HuggingFace Spaces
3. Set your `{api_key_var}` secret in Space settings

**Your Space will be live in minutes!**"""

        # Update sandbox preview
        config_data = {
            'name': name,
            'description': description,
            'system_prompt': final_system_prompt,
            'model': model,
            'temperature': temperature,
            'max_tokens': max_tokens,
            'enable_dynamic_urls': enable_dynamic_urls,
            'enable_vector_rag': enable_vector_rag,
            'filename': filename
        }

        return gr.update(value=success_msg, visible=True), gr.update(value=filename, visible=True), config_data

    except Exception as e:
        return _failure(f"Error: {str(e)}")
# Global cache for URL content to avoid re-crawling.
# Keys are tuples of the sorted, non-blank URL strings of a request; values
# are the grounding-context text fetched for that URL set.
url_content_cache = {}
def get_cached_grounding_context(urls):
    """Return grounding context for ``urls``, fetching each URL set only once.

    Blank and ``None`` entries are ignored. The remaining URLs, sorted, form
    the cache key, so the same set in a different order reuses the cached
    text. Returns "" when there is nothing usable to fetch.
    """
    if not urls:
        return ""

    usable_urls = [candidate for candidate in urls if candidate and candidate.strip()]
    if not usable_urls:
        return ""

    key = tuple(sorted(usable_urls))
    try:
        # Cache hit: skip the crawl entirely
        return url_content_cache[key]
    except KeyError:
        pass

    # Cache miss: fetch using Crawl4AI and remember the result
    fetched = get_grounding_context_crawl4ai(usable_urls)
    url_content_cache[key] = fetched
    return fetched
def respond_with_cache_update(message, chat_history, url1="", url2="", url3="", url4=""):
    """Run ``respond`` and append the current cache status to its result.

    Returns ``(message_box_value, chat_history, cache_status)``; the status is
    read after the response so it reflects any URLs fetched for this turn.
    """
    cleared_input, updated_history = respond(message, chat_history, url1, url2, url3, url4)
    return cleared_input, updated_history, get_cache_status()
def respond(message, chat_history, url1="", url2="", url3="", url4=""):
    """Answer ``message`` through OpenRouter and append the turn to the history.

    Args:
        message: The user's new message.
        chat_history: Prior turns, either role/content dicts or legacy
            ``(user, assistant)`` pairs; ``None`` is treated as empty. The
            list is extended in place.
        url1..url4: Optional grounding URLs whose (cached) content is appended
            to the system prompt.

    Returns:
        ``("", chat_history)`` -- an empty string to clear the input box and
        the history with the new user and assistant messages appended. API
        and network failures are reported as the assistant message.
    """
    if chat_history is None:
        chat_history = []

    # Get API key from environment
    api_key = os.environ.get("OPENROUTER_API_KEY")

    if not api_key:
        response = "Please set your OPENROUTER_API_KEY in the Space settings to use the chat support."
        chat_history.append({"role": "user", "content": message})
        chat_history.append({"role": "assistant", "content": response})
        return "", chat_history

    # Get grounding context from URLs using cached approach
    grounding_urls = [url1, url2, url3, url4]
    grounding_context = get_cached_grounding_context(grounding_urls)

    # Build enhanced system prompt with grounding context
    base_system_prompt = """You are an expert assistant specializing in Gradio configurations for HuggingFace Spaces. You have deep knowledge of:
- Gradio interface components and layouts
- HuggingFace Spaces configuration (YAML frontmatter, secrets, environment variables)
- Deployment best practices for Gradio apps on HuggingFace
- Space settings, SDK versions, and hardware requirements
- Troubleshooting common Gradio and HuggingFace Spaces issues
- Integration with various APIs and models through Gradio interfaces

Provide specific, technical guidance focused on Gradio implementation details and HuggingFace Spaces deployment. Include code examples when relevant. Keep responses concise and actionable."""

    enhanced_system_prompt = base_system_prompt + grounding_context

    # Build conversation history for API
    messages = [{
        "role": "system",
        "content": enhanced_system_prompt
    }]

    # Add conversation history - Support both new messages format and legacy tuple format
    for chat in chat_history:
        if isinstance(chat, dict):
            # Forward only the fields the chat-completions API defines; history
            # dicts may carry extra UI-only keys that the API has no use for.
            if "role" in chat and "content" in chat:
                messages.append({"role": chat["role"], "content": chat["content"]})
        elif isinstance(chat, (list, tuple)) and len(chat) >= 2:
            # Legacy format: ("user msg", "bot msg")
            user_msg, assistant_msg = chat[0], chat[1]
            if user_msg:
                messages.append({"role": "user", "content": user_msg})
            if assistant_msg:
                messages.append({"role": "assistant", "content": assistant_msg})

    # Add current message
    messages.append({"role": "user", "content": message})

    try:
        # Make API request to OpenRouter
        response = requests.post(
            url="https://openrouter.ai/api/v1/chat/completions",
            headers={
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            },
            json={
                "model": "google/gemini-2.0-flash-001",
                "messages": messages,
                "temperature": 0.7,
                "max_tokens": 500
            },
            # Without a timeout a stalled connection would block this worker forever
            timeout=60
        )

        if response.status_code == 200:
            assistant_response = response.json()['choices'][0]['message']['content']
        else:
            assistant_response = f"Error: {response.status_code} - {response.text}"

    except Exception as e:
        assistant_response = f"Error: {str(e)}"

    chat_history.append({"role": "user", "content": message})
    chat_history.append({"role": "assistant", "content": assistant_response})
    return "", chat_history
def clear_chat():
    """Return an empty message string and an empty history list to reset the chat."""
    empty_message, empty_history = "", []
    return empty_message, empty_history
def clear_url_cache():
    """Empty ``url_content_cache`` in place and return a confirmation message."""
    # The dict is mutated, not rebound, so no ``global`` declaration is needed.
    url_content_cache.clear()
    confirmation = "✅ URL cache cleared. Next request will re-fetch content."
    return confirmation
def get_cache_status():
    """Return a one-line summary of how many URL sets are in ``url_content_cache``."""
    cached_sets = len(url_content_cache)
    if cached_sets == 0:
        return "🔄 No URLs cached"
    return f"💾 {cached_sets} URL set(s) cached"
def add_urls(count):
    """Reveal the next optional URL field.

    Returns a 5-tuple of four Gradio updates followed by the new count.
    NOTE(review): the four updates presumably target the third URL field, the
    fourth URL field, the add button and the remove button -- confirm against
    the event wiring. Any count other than 2 or 3 leaves everything unchanged.
    """
    if count == 2:
        # Going from two to three fields
        return (
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(value="+ Add URLs"),
            gr.update(visible=True),
            3,
        )
    if count == 3:
        # Going from three to four fields: the add control is disabled
        return (
            gr.update(visible=True),
            gr.update(visible=True),
            gr.update(value="Max URLs", interactive=False),
            gr.update(visible=True),
            4,
        )
    return (gr.update(), gr.update(), gr.update(), gr.update(), count)
def remove_urls(count):
    """Hide the most recently revealed optional URL field and clear its value.

    Returns a 5-tuple of four Gradio updates followed by the new count.
    NOTE(review): the four updates presumably target the third URL field, the
    fourth URL field, the add button and the remove button -- confirm against
    the event wiring. Any count other than 4 or 3 leaves everything unchanged.
    """
    if count == 4:
        # Going from four to three fields
        return (
            gr.update(visible=True),
            gr.update(visible=False, value=""),
            gr.update(value="+ Add URLs", interactive=True),
            gr.update(visible=True),
            3,
        )
    if count == 3:
        # Going from three to two fields: the remove control is hidden
        return (
            gr.update(visible=False, value=""),
            gr.update(visible=False, value=""),
            gr.update(value="+ Add URLs", interactive=True),
            gr.update(visible=False),
            2,
        )
    return (gr.update(), gr.update(), gr.update(), gr.update(), count)
def add_chat_urls(count):
    """Show additional chat URL fields.

    The chat panel follows exactly the same reveal sequence as the main URL
    fields, so this delegates to ``add_urls`` instead of repeating its
    branches. Returns the same 5-tuple: four Gradio updates and the new count.
    """
    return add_urls(count)
def remove_chat_urls(count):
    """Hide chat URL fields.

    The chat panel follows exactly the same hide sequence as the main URL
    fields, so this delegates to ``remove_urls`` instead of repeating its
    branches. Returns the same 5-tuple: four Gradio updates and the new count.
    """
    return remove_urls(count)
def toggle_research_assistant(enable_research):
    """Apply or clear the research-assistant template.

    Args:
        enable_research: Current value of the "Research Template" checkbox.

    Returns:
        A 9-tuple of component updates, in the order the checkbox's
        ``change`` event lists its outputs: research fields container,
        main system prompt, role/purpose, intended audience, key tasks,
        additional context, dynamic-URL checkbox, custom-categories
        checkbox, custom-categories fields container.
    """
    if enable_research:
        role_text = "You are a research assistant that provides link-grounded information through Crawl4AI web fetching. Use MLA documentation for parenthetical citations and bibliographic entries."
        audience_text = "This assistant is designed for students and researchers conducting academic inquiry."
        tasks_text = "Your main responsibilities include: analyzing academic sources, fact-checking claims with evidence, providing properly cited research summaries, and helping users navigate scholarly information."
        context_text = "Ground all responses in provided URL contexts and any additional URLs you're instructed to fetch. Never rely on memory for factual claims."
        # The main prompt is the four template sections joined by single spaces.
        combined_prompt = " ".join([role_text, audience_text, tasks_text, context_text])
        return (
            gr.update(visible=True),  # Show research detailed fields
            gr.update(value=combined_prompt),  # Update main system prompt
            gr.update(value=role_text),
            gr.update(value=audience_text),
            gr.update(value=tasks_text),
            gr.update(value=context_text),
            gr.update(value=True),  # Enable dynamic URL fetching for research template
            gr.update(value=False),  # Force disable custom categories checkbox
            gr.update(visible=False),  # Force hide custom categories fields
        )

    # Turning the template off must NOT touch the custom-categories
    # components. This handler also runs when toggle_custom_categories
    # unticks the research checkbox (a programmatic value change fires
    # ``.change`` too); forcing custom categories off here would undo the
    # user's choice and leave both modes disabled. Mutual exclusion is
    # already enforced by the enable branches.
    return (
        gr.update(visible=False),  # Hide research detailed fields
        gr.update(value=""),  # Clear main system prompt
        gr.update(value=""),  # Clear research fields
        gr.update(value=""),
        gr.update(value=""),
        gr.update(value=""),
        gr.update(value=False),  # Disable dynamic URL setting
        gr.update(),  # Leave custom categories checkbox as-is
        gr.update(),  # Leave custom categories fields as-is
    )
def update_system_prompt_from_fields(role_purpose, intended_audience, key_tasks, additional_context):
    """Rebuild the main system prompt from the four structured fields.

    Empty or whitespace-only fields are skipped; the rest are stripped and
    joined with single spaces in the order role, audience, tasks, context.

    Returns:
        A Gradio update that sets the system prompt textbox to the joined text.
    """
    sections = (role_purpose, intended_audience, key_tasks, additional_context)
    cleaned = [text.strip() for text in sections if text and text.strip()]
    return gr.update(value=" ".join(cleaned))
def toggle_custom_categories(enable_custom):
    """Show or hide the custom-categories fields.

    Args:
        enable_custom: Current value of the "Use Custom Categories" checkbox.

    Returns:
        A 3-tuple of component updates, in the order the checkbox's
        ``change`` event lists its outputs: custom-categories fields
        container, research-template checkbox, research fields container.
    """
    if enable_custom:
        return (
            gr.update(visible=True),  # Show custom categories fields
            gr.update(value=False),  # Force disable research assistant checkbox
            gr.update(visible=False),  # Force hide research assistant fields
        )

    # Turning custom categories off must NOT touch the research
    # components. This handler also runs when toggle_research_assistant
    # unticks the custom checkbox (a programmatic value change fires
    # ``.change`` too); forcing the research template off here would undo
    # the user's choice and leave both modes disabled. Mutual exclusion is
    # already enforced by the enable branches.
    return (
        gr.update(visible=False),  # Hide custom categories fields
        gr.update(),  # Leave research assistant checkbox as-is
        gr.update(),  # Leave research assistant fields as-is
    )
# Create Gradio interface with proper tab structure
# Three tabs: "Configuration" (build and download a deployment package),
# "Support" (help chat with optional URL grounding) and "Sandbox" (preview
# placeholders). Event handlers are wired inside the tab that owns the
# components they touch.
with gr.Blocks(title="Chat U/I Helper") as demo:
    # Global state for cross-tab functionality
    sandbox_state = gr.State({})

    with gr.Tabs():
        with gr.Tab("Configuration"):
            gr.Markdown("# Spaces Configuration")
            gr.Markdown("Convert custom assistants from HuggingChat into chat interfaces with HuggingFace Spaces. Configure and download everything needed to deploy a simple HF space using Gradio.")

            with gr.Column():
                name = gr.Textbox(
                    label="Space Title",
                    placeholder="My Course Helper",
                    value="My Custom Space"
                )
                description = gr.Textbox(
                    label="Space Description",
                    placeholder="A customizable AI chat interface for...",
                    lines=2,
                    value=""
                )
                model = gr.Dropdown(
                    label="Model",
                    choices=MODELS,
                    value=MODELS[0],
                    info="Choose based on the context and purposes of your space"
                )
                api_key_var = gr.Textbox(
                    label="API Key Variable Name",
                    value="OPENROUTER_API_KEY",
                    info="Name for the secret in HuggingFace Space settings"
                )
                access_code = gr.Textbox(
                    label="Access Code (Optional)",
                    placeholder="Leave empty for public access, or enter code for student access",
                    info="If set, students must enter this code to access the chatbot",
                    type="password"
                )

                with gr.Accordion("Assistant Configuration", open=True):
                    gr.Markdown("### Configure your assistant's behavior and capabilities")
                    gr.Markdown("Define the system prompt and assistant settings. You can use pre-configured templates or custom fields.")

                    # Main system prompt field - always visible
                    system_prompt = gr.Textbox(
                        label="System Prompt",
                        placeholder="You are a helpful assistant that...",
                        lines=4,
                        value="",
                        info="Define the assistant's role, purpose, and behavior in a single prompt"
                    )

                    # Assistant configuration options
                    with gr.Row():
                        enable_research_assistant = gr.Checkbox(
                            label="Research Template",
                            value=False,
                            info="Enable to use pre-configured research assistant settings"
                        )
                        enable_custom_categories = gr.Checkbox(
                            label="Use Custom Categories",
                            value=False,
                            info="Enable structured fields for defining your assistant"
                        )

                    # Detailed fields for research assistant (initially hidden)
                    with gr.Column(visible=False) as research_detailed_fields:
                        gr.Markdown("*The system prompt above will be automatically populated with these fields when enabled*")
                        role_purpose = gr.Textbox(
                            label="Role and Purpose",
                            placeholder="You are a research assistant that...",
                            lines=2,
                            value="",
                            info="Define what the assistant is and its primary function"
                        )
                        intended_audience = gr.Textbox(
                            label="Intended Audience",
                            placeholder="This assistant is designed for undergraduate students...",
                            lines=2,
                            value="",
                            info="Specify who will be using this assistant and their context"
                        )
                        key_tasks = gr.Textbox(
                            label="Key Tasks",
                            placeholder="Your main responsibilities include...",
                            lines=3,
                            value="",
                            info="List the specific tasks and capabilities the assistant should focus on"
                        )
                        additional_context = gr.Textbox(
                            label="Additional Context",
                            placeholder="Remember to always...",
                            lines=2,
                            value="",
                            info="Any additional instructions, constraints, or behavioral guidelines"
                        )

                    # Custom categories fields (initially hidden)
                    with gr.Column(visible=False) as custom_categories_fields:
                        gr.Markdown("#### Custom Assistant Categories")
                        gr.Markdown("*The system prompt above will be automatically populated with these fields when enabled*")
                        custom_role_purpose = gr.Textbox(
                            label="Role and Purpose",
                            placeholder="Define what the assistant is and its primary function",
                            lines=2,
                            value="",
                            info="Define what the assistant is and its primary function"
                        )
                        custom_intended_audience = gr.Textbox(
                            label="Intended Audience",
                            placeholder="Specify who will be using this assistant and their context",
                            lines=2,
                            value="",
                            info="Specify who will be using this assistant and their context"
                        )
                        custom_key_tasks = gr.Textbox(
                            label="Key Tasks",
                            placeholder="List the specific tasks and capabilities the assistant should focus on",
                            lines=3,
                            value="",
                            info="List the specific tasks and capabilities the assistant should focus on"
                        )
                        custom_additional_context = gr.Textbox(
                            label="Additional Context",
                            placeholder="Any additional instructions, constraints, or behavioral guidelines",
                            lines=2,
                            value="",
                            info="Any additional instructions, constraints, or behavioral guidelines"
                        )

                with gr.Accordion("Tool Settings", open=False):
                    enable_dynamic_urls = gr.Checkbox(
                        label="Enable Dynamic URL Fetching",
                        value=False,
                        info="Allow the assistant to fetch additional URLs mentioned in conversations (uses Crawl4AI)"
                    )
                    enable_vector_rag = gr.Checkbox(
                        label="Enable Document RAG",
                        value=False,
                        info="Upload documents for context-aware responses (PDF, DOCX, TXT, MD)",
                        # Only offered when the optional rag_tool import succeeded.
                        visible=HAS_RAG
                    )
                    with gr.Column(visible=False) as rag_section:
                        gr.Markdown("### Document Upload")
                        file_upload = gr.File(
                            label="Upload Documents",
                            file_types=[".pdf", ".docx", ".txt", ".md"],
                            file_count="multiple",
                            type="filepath"
                        )
                        process_btn = gr.Button("Process Documents", variant="secondary")
                        rag_status = gr.Markdown()
                    # State to store RAG tool
                    rag_tool_state = gr.State(None)

                with gr.Accordion("URL Grounding (Optional)", open=False):
                    gr.Markdown("Add URLs to provide context. Content will be fetched and added to the system prompt.")

                    # Initial URL fields
                    url1 = gr.Textbox(
                        label="URL 1",
                        placeholder="https://example.com/page1",
                        info="First URL for context grounding"
                    )
                    url2 = gr.Textbox(
                        label="URL 2",
                        placeholder="https://example.com/page2",
                        info="Second URL for context grounding"
                    )

                    # Additional URL fields (initially hidden)
                    url3 = gr.Textbox(
                        label="URL 3",
                        placeholder="https://example.com/page3",
                        info="Third URL for context grounding",
                        visible=False
                    )
                    url4 = gr.Textbox(
                        label="URL 4",
                        placeholder="https://example.com/page4",
                        info="Fourth URL for context grounding",
                        visible=False
                    )

                    # URL management buttons
                    with gr.Row():
                        add_url_btn = gr.Button("+ Add URLs", size="sm")
                        remove_url_btn = gr.Button("- Remove URLs", size="sm", visible=False)
                    url_count = gr.State(2)  # Track number of visible URLs

                # Single example-prompts field. It must be created exactly once:
                # a second textbox bound to the same name would leave the first
                # one on screen but disconnected from the generate handler.
                examples_text = gr.Textbox(
                    label="Example Prompts (one per line)",
                    placeholder="Can you analyze this research paper: https://example.com/paper.pdf\nWhat are the latest findings on climate change adaptation?\nHelp me fact-check claims about renewable energy efficiency",
                    lines=3,
                    info="These will appear as clickable examples in the chat interface"
                )

                with gr.Row():
                    temperature = gr.Slider(
                        label="Temperature",
                        minimum=0,
                        maximum=2,
                        value=0.7,
                        step=0.1,
                        info="Higher = more creative, Lower = more focused"
                    )
                    max_tokens = gr.Slider(
                        label="Max Response Tokens",
                        minimum=50,
                        maximum=4096,
                        value=500,
                        step=50
                    )

                generate_btn = gr.Button("Generate Deployment Package", variant="primary")
                status = gr.Markdown(visible=False)
                download_file = gr.File(label="Download your zip package", visible=False)

            # Connect the research assistant checkbox
            enable_research_assistant.change(
                toggle_research_assistant,
                inputs=[enable_research_assistant],
                outputs=[research_detailed_fields, system_prompt, role_purpose, intended_audience, key_tasks, additional_context, enable_dynamic_urls, enable_custom_categories, custom_categories_fields]
            )

            # Connect the custom categories checkbox
            enable_custom_categories.change(
                toggle_custom_categories,
                inputs=[enable_custom_categories],
                outputs=[custom_categories_fields, enable_research_assistant, research_detailed_fields]
            )

            # Connect research assistant fields to update main system prompt
            for field in [role_purpose, intended_audience, key_tasks, additional_context]:
                field.change(
                    update_system_prompt_from_fields,
                    inputs=[role_purpose, intended_audience, key_tasks, additional_context],
                    outputs=[system_prompt]
                )

            # Connect custom categories fields to update main system prompt
            for field in [custom_role_purpose, custom_intended_audience, custom_key_tasks, custom_additional_context]:
                field.change(
                    update_system_prompt_from_fields,
                    inputs=[custom_role_purpose, custom_intended_audience, custom_key_tasks, custom_additional_context],
                    outputs=[system_prompt]
                )

            # Connect the URL management buttons
            add_url_btn.click(
                add_urls,
                inputs=[url_count],
                outputs=[url3, url4, add_url_btn, remove_url_btn, url_count]
            )
            remove_url_btn.click(
                remove_urls,
                inputs=[url_count],
                outputs=[url3, url4, add_url_btn, remove_url_btn, url_count]
            )

            # Connect RAG functionality
            enable_vector_rag.change(
                toggle_rag_section,
                inputs=[enable_vector_rag],
                outputs=[rag_section]
            )
            process_btn.click(
                process_documents,
                inputs=[file_upload, rag_tool_state],
                outputs=[rag_status, rag_tool_state]
            )

            # Connect the generate button
            generate_btn.click(
                on_generate,
                inputs=[name, description, system_prompt, enable_research_assistant, role_purpose, intended_audience, key_tasks, additional_context, custom_role_purpose, custom_intended_audience, custom_key_tasks, custom_additional_context, model, api_key_var, temperature, max_tokens, examples_text, access_code, enable_dynamic_urls, url1, url2, url3, url4, enable_vector_rag, rag_tool_state],
                outputs=[status, download_file, sandbox_state]
            )

        with gr.Tab("Support"):
            gr.Markdown("# Chat Support")
            gr.Markdown("Get personalized guidance on configuring chat assistants as HuggingFace Spaces for educational & research purposes.")

            # Meta chat interface
            with gr.Column():
                chatbot = gr.Chatbot(
                    value=[],
                    label="Chat Support Assistant",
                    height=400,
                    type="messages"
                )
                msg = gr.Textbox(
                    label="Ask about configuring chat UIs for courses, research, or custom HuggingFace Spaces",
                    placeholder="How can I configure a chat UI for my senior seminar?",
                    lines=2
                )

                with gr.Accordion("URL Grounding (Optional)", open=False):
                    gr.Markdown("Add URLs to provide additional context for more informed responses")
                    chat_url1 = gr.Textbox(
                        label="URL 1",
                        value="https://huggingface.co/docs/hub/en/spaces-overview",
                        info="HuggingFace Spaces Overview"
                    )
                    chat_url2 = gr.Textbox(
                        label="URL 2",
                        value="",
                        placeholder="https://example.com/page2",
                        info="Additional context URL"
                    )

                    # Additional URL fields for chat (initially hidden)
                    chat_url3 = gr.Textbox(
                        label="URL 3",
                        placeholder="https://example.com/page3",
                        info="Additional context URL",
                        visible=False
                    )
                    chat_url4 = gr.Textbox(
                        label="URL 4",
                        placeholder="https://example.com/page4",
                        info="Additional context URL",
                        visible=False
                    )

                    # Chat URL management buttons
                    with gr.Row():
                        add_chat_url_btn = gr.Button("+ Add URLs", size="sm")
                        remove_chat_url_btn = gr.Button("- Remove URLs", size="sm", visible=False)
                    chat_url_count = gr.State(2)  # Track number of visible chat URLs

                    # Cache controls
                    with gr.Row():
                        cache_status = gr.Markdown("🔄 No URLs cached")
                        clear_cache_btn = gr.Button("Clear URL Cache", size="sm")

                with gr.Row():
                    submit = gr.Button("Send", variant="primary")
                    clear = gr.Button("Clear")

                gr.Examples(
                    examples=[
                        "How do I set up a course assistant?",
                        "Which model should I use?",
                        "What's a good system prompt?",
                        "Why Gradio? What is it?",
                        "How do I customize the chat interface?",
                        "Can you help me troubleshoot?",
                    ],
                    inputs=msg
                )

            # Connect the chat URL management buttons
            add_chat_url_btn.click(
                add_chat_urls,
                inputs=[chat_url_count],
                outputs=[chat_url3, chat_url4, add_chat_url_btn, remove_chat_url_btn, chat_url_count]
            )
            remove_chat_url_btn.click(
                remove_chat_urls,
                inputs=[chat_url_count],
                outputs=[chat_url3, chat_url4, add_chat_url_btn, remove_chat_url_btn, chat_url_count]
            )

            # Connect cache controls
            clear_cache_btn.click(clear_url_cache, outputs=[cache_status])

            # Connect the chat functionality
            # Both the Send button and pressing Enter in the textbox run the same handler.
            submit.click(respond_with_cache_update, [msg, chatbot, chat_url1, chat_url2, chat_url3, chat_url4], [msg, chatbot, cache_status])
            msg.submit(respond_with_cache_update, [msg, chatbot, chat_url1, chat_url2, chat_url3, chat_url4], [msg, chatbot, cache_status])
            clear.click(clear_chat, outputs=[msg, chatbot])

        with gr.Tab("Sandbox"):
            gr.Markdown("# Generated Space Preview")
            gr.Markdown("Preview your generated HuggingFace Space before deployment.")

            with gr.Row():
                with gr.Column(scale=1):
                    preview_info_display = gr.Markdown("Generate a space configuration to see preview here.")
                with gr.Column(scale=2):
                    preview_iframe_display = gr.HTML("<div style='text-align: center; padding: 50px; color: #666;'>No preview available</div>")
if __name__ == "__main__":
    # Detect local development behind Codespaces / dev tunnels; in that case
    # bind to all interfaces and pass an empty allowed_paths list.
    configured_server_name = os.environ.get('GRADIO_SERVER_NAME', '')
    behind_dev_tunnel = bool(os.environ.get('CODESPACES')) or 'devtunnels.ms' in configured_server_name

    launch_options = {"share": True}
    if behind_dev_tunnel:
        launch_options["allowed_paths"] = []
        launch_options["server_name"] = "0.0.0.0"

    demo.launch(**launch_options)