Spaces:
Running
Running
import gradio as gr | |
import json | |
import zipfile | |
import io | |
import os | |
from datetime import datetime | |
from dotenv import load_dotenv | |
import requests | |
from bs4 import BeautifulSoup | |
import tempfile | |
from pathlib import Path | |
# from scraping_service import get_grounding_context_crawl4ai, fetch_url_content_crawl4ai | |
# Temporary mock functions for testing | |
def get_grounding_context_crawl4ai(urls): | |
return "\n\n[URL content would be fetched here]\n\n" | |
def fetch_url_content_crawl4ai(url): | |
return f"[Content from {url} would be fetched here]" | |
# Import RAG components | |
try: | |
from rag_tool import RAGTool | |
HAS_RAG = True | |
except ImportError: | |
HAS_RAG = False | |
RAGTool = None | |
# Load environment variables from .env file | |
load_dotenv() | |
# Template for generated space app (based on mvp_simple.py) | |
SPACE_TEMPLATE = '''import gradio as gr | |
import os | |
import requests | |
import json | |
import asyncio | |
from crawl4ai import AsyncWebCrawler | |
# Configuration | |
SPACE_NAME = "{name}" | |
SPACE_DESCRIPTION = "{description}" | |
SYSTEM_PROMPT = """{system_prompt}""" | |
MODEL = "{model}" | |
GROUNDING_URLS = {grounding_urls} | |
ACCESS_CODE = "{access_code}" | |
ENABLE_DYNAMIC_URLS = {enable_dynamic_urls} | |
ENABLE_VECTOR_RAG = {enable_vector_rag} | |
RAG_DATA = {rag_data_json} | |
# Get API key from environment - customizable variable name | |
API_KEY = os.environ.get("{api_key_var}") | |
async def fetch_url_content_async(url, crawler): | |
"""Fetch and extract text content from a URL using Crawl4AI""" | |
try: | |
result = await crawler.arun( | |
url=url, | |
bypass_cache=True, | |
word_count_threshold=10, | |
excluded_tags=['script', 'style', 'nav', 'header', 'footer'], | |
remove_overlay_elements=True | |
) | |
if result.success: | |
content = result.markdown or result.cleaned_html or "" | |
# Truncate to ~4000 characters | |
if len(content) > 4000: | |
content = content[:4000] + "..." | |
return content | |
else: | |
return f"Error fetching {{url}}: Failed to retrieve content" | |
except Exception as e: | |
return f"Error fetching {{url}}: {{str(e)}}" | |
def fetch_url_content(url): | |
"""Synchronous wrapper for URL fetching""" | |
async def fetch(): | |
async with AsyncWebCrawler(verbose=False) as crawler: | |
return await fetch_url_content_async(url, crawler) | |
try: | |
return asyncio.run(fetch()) | |
except Exception as e: | |
return f"Error fetching {{url}}: {{str(e)}}" | |
# Global cache for URL content to avoid re-crawling in generated spaces | |
_url_content_cache = {{}} | |
def get_grounding_context(): | |
"""Fetch context from grounding URLs with caching""" | |
if not GROUNDING_URLS: | |
return "" | |
# Create cache key from URLs | |
cache_key = tuple(sorted([url for url in GROUNDING_URLS if url and url.strip()])) | |
# Check cache first | |
if cache_key in _url_content_cache: | |
return _url_content_cache[cache_key] | |
context_parts = [] | |
for i, url in enumerate(GROUNDING_URLS, 1): | |
if url.strip(): | |
content = fetch_url_content(url.strip()) | |
context_parts.append(f"Context from URL {{i}} ({{url}}):\\n{{content}}") | |
if context_parts: | |
result = "\\n\\n" + "\\n\\n".join(context_parts) + "\\n\\n" | |
else: | |
result = "" | |
# Cache the result | |
_url_content_cache[cache_key] = result | |
return result | |
import re | |
def extract_urls_from_text(text): | |
"""Extract URLs from text using regex""" | |
url_pattern = r'https?://[^\\s<>"{{}}|\\^`\\[\\]"]+' | |
return re.findall(url_pattern, text) | |
# Initialize RAG context if enabled | |
if ENABLE_VECTOR_RAG and RAG_DATA: | |
try: | |
import faiss | |
import numpy as np | |
import base64 | |
class SimpleRAGContext: | |
def __init__(self, rag_data): | |
# Deserialize FAISS index | |
index_bytes = base64.b64decode(rag_data['index_base64']) | |
self.index = faiss.deserialize_index(index_bytes) | |
# Restore chunks and mappings | |
self.chunks = rag_data['chunks'] | |
self.chunk_ids = rag_data['chunk_ids'] | |
def get_context(self, query, max_chunks=3): | |
"""Get relevant context - simplified version""" | |
# In production, you'd compute query embedding here | |
# For now, return a simple message | |
return "\\n\\n[RAG context would be retrieved here based on similarity search]\\n\\n" | |
rag_context_provider = SimpleRAGContext(RAG_DATA) | |
except Exception as e: | |
print(f"Failed to initialize RAG: {{e}}") | |
rag_context_provider = None | |
else: | |
rag_context_provider = None | |
def generate_response(message, history): | |
"""Generate response using OpenRouter API""" | |
if not API_KEY: | |
return "Please set your {api_key_var} in the Space settings." | |
# Get grounding context | |
grounding_context = get_grounding_context() | |
# Add RAG context if available | |
if ENABLE_VECTOR_RAG and rag_context_provider: | |
rag_context = rag_context_provider.get_context(message) | |
if rag_context: | |
grounding_context += rag_context | |
# If dynamic URLs are enabled, check message for URLs to fetch | |
if ENABLE_DYNAMIC_URLS: | |
urls_in_message = extract_urls_from_text(message) | |
if urls_in_message: | |
# Fetch content from URLs mentioned in the message | |
dynamic_context_parts = [] | |
for url in urls_in_message[:3]: # Limit to 3 URLs per message | |
content = fetch_url_content(url) | |
dynamic_context_parts.append(f"\\n\\nDynamic context from {{url}}:\\n{{content}}") | |
if dynamic_context_parts: | |
grounding_context += "\\n".join(dynamic_context_parts) | |
# Build enhanced system prompt with grounding context | |
enhanced_system_prompt = SYSTEM_PROMPT + grounding_context | |
# Build messages array for the API | |
messages = [{{"role": "system", "content": enhanced_system_prompt}}] | |
# Add conversation history - compatible with Gradio 5.x format | |
for chat in history: | |
if isinstance(chat, dict): | |
# New format: {{"role": "user", "content": "..."}} or {{"role": "assistant", "content": "..."}} | |
messages.append(chat) | |
else: | |
# Legacy format: ("user msg", "bot msg") | |
user_msg, bot_msg = chat | |
messages.append({{"role": "user", "content": user_msg}}) | |
if bot_msg: | |
messages.append({{"role": "assistant", "content": bot_msg}}) | |
# Add current message | |
messages.append({{"role": "user", "content": message}}) | |
# Make API request | |
try: | |
response = requests.post( | |
url="https://openrouter.ai/api/v1/chat/completions", | |
headers={{ | |
"Authorization": f"Bearer {{API_KEY}}", | |
"Content-Type": "application/json" | |
}}, | |
json={{ | |
"model": MODEL, | |
"messages": messages, | |
"temperature": {temperature}, | |
"max_tokens": {max_tokens} | |
}} | |
) | |
if response.status_code == 200: | |
return response.json()['choices'][0]['message']['content'] | |
else: | |
return f"Error: {{response.status_code}} - {{response.text}}" | |
except Exception as e: | |
return f"Error: {{str(e)}}" | |
# Access code verification | |
access_granted = gr.State(False) | |
def verify_access_code(code): | |
\"\"\"Verify the access code\"\"\" | |
if not ACCESS_CODE: | |
return gr.update(visible=False), gr.update(visible=True), True | |
if code == ACCESS_CODE: | |
return gr.update(visible=False), gr.update(visible=True), True | |
else: | |
return gr.update(visible=True, value="❌ Incorrect access code. Please try again."), gr.update(visible=False), False | |
def protected_generate_response(message, history, access_state): | |
\"\"\"Protected response function that checks access\"\"\" | |
if not access_state: | |
return "Please enter the access code to continue." | |
return generate_response(message, history) | |
# Create interface with access code protection | |
with gr.Blocks(title=SPACE_NAME) as demo: | |
gr.Markdown(f"# {{SPACE_NAME}}") | |
gr.Markdown(SPACE_DESCRIPTION) | |
# Access code section (shown only if ACCESS_CODE is set) | |
with gr.Column(visible=bool(ACCESS_CODE)) as access_section: | |
gr.Markdown("### 🔐 Access Required") | |
gr.Markdown("Please enter the access code provided by your instructor:") | |
access_input = gr.Textbox( | |
label="Access Code", | |
placeholder="Enter access code...", | |
type="password" | |
) | |
access_btn = gr.Button("Submit", variant="primary") | |
access_error = gr.Markdown(visible=False) | |
# Main chat interface (hidden until access granted) | |
with gr.Column(visible=not bool(ACCESS_CODE)) as chat_section: | |
chat_interface = gr.ChatInterface( | |
fn=lambda msg, hist: protected_generate_response(msg, hist, access_granted.value), | |
title="", # Title already shown above | |
description="", # Description already shown above | |
examples={examples} | |
) | |
# Connect access verification | |
if ACCESS_CODE: | |
access_btn.click( | |
verify_access_code, | |
inputs=[access_input], | |
outputs=[access_error, chat_section, access_granted] | |
) | |
access_input.submit( | |
verify_access_code, | |
inputs=[access_input], | |
outputs=[access_error, chat_section, access_granted] | |
) | |
if __name__ == "__main__": | |
demo.launch() | |
''' | |
# Available models | |
MODELS = [ | |
"google/gemma-3-27b-it", | |
"google/gemini-2.0-flash-001", | |
"mistralai/mistral-medium", | |
"openai/gpt-4o-nano", | |
"anthropic/claude-3.5-haiku" | |
] | |
def fetch_url_content(url): | |
"""Fetch and extract text content from a URL""" | |
try: | |
response = requests.get(url, timeout=10) | |
response.raise_for_status() | |
soup = BeautifulSoup(response.content, 'html.parser') | |
# Remove script and style elements | |
for script in soup(["script", "style"]): | |
script.decompose() | |
# Get text content | |
text = soup.get_text() | |
# Clean up whitespace | |
lines = (line.strip() for line in text.splitlines()) | |
chunks = (phrase.strip() for line in lines for phrase in line.split(" ")) | |
text = ' '.join(chunk for chunk in chunks if chunk) | |
# Truncate to ~4000 characters | |
if len(text) > 4000: | |
text = text[:4000] + "..." | |
return text | |
except Exception as e: | |
return f"Error fetching {url}: {str(e)}" | |
def get_grounding_context(urls): | |
"""Fetch context from grounding URLs""" | |
if not urls: | |
return "" | |
context_parts = [] | |
for i, url in enumerate(urls, 1): | |
if url and url.strip(): | |
content = fetch_url_content(url.strip()) | |
context_parts.append(f"Context from URL {i} ({url}):\n{content}") | |
if context_parts: | |
return "\n\n" + "\n\n".join(context_parts) + "\n\n" | |
return "" | |
def create_readme(config): | |
"""Generate README with deployment instructions""" | |
return f"""--- | |
title: {config['name']} | |
emoji: 🤖 | |
colorFrom: blue | |
colorTo: red | |
sdk: gradio | |
sdk_version: 4.32.0 | |
app_file: app.py | |
pinned: false | |
--- | |
# {config['name']} | |
{config['description']} | |
## Quick Deploy to HuggingFace Spaces | |
### Step 1: Create the Space | |
1. Go to https://huggingface.co/spaces | |
2. Click "Create new Space" | |
3. Choose a name for your Space | |
4. Select **Gradio** as the SDK | |
5. Set visibility (Public/Private) | |
6. Click "Create Space" | |
### Step 2: Upload Files | |
1. In your new Space, click "Files" tab | |
2. Upload these files from the zip: | |
- `app.py` | |
- `requirements.txt` | |
3. Wait for "Building" to complete | |
### Step 3: Add API Key | |
1. Go to Settings (gear icon) | |
2. Click "Variables and secrets" | |
3. Click "New secret" | |
4. Name: `{config['api_key_var']}` | |
5. Value: Your OpenRouter API key | |
6. Click "Add" | |
{f'''### Step 4: Configure Access Control (Optional) | |
Your Space is configured with access code protection. Students will need to enter the access code to use the chatbot. | |
**Access Code**: `{config['access_code']}` | |
To disable access protection: | |
1. Edit `app.py` in your Space | |
2. Change `ACCESS_CODE = "{config['access_code']}"` to `ACCESS_CODE = ""` | |
3. The Space will rebuild automatically | |
''' if config['access_code'] else ''} | |
### Step {4 if not config['access_code'] else 5}: Get Your API Key | |
1. Go to https://openrouter.ai/keys | |
2. Sign up/login if needed | |
3. Click "Create Key" | |
4. Copy the key (starts with `sk-or-`) | |
### Step {5 if not config['access_code'] else 6}: Test Your Space | |
- Go back to "App" tab | |
- Your Space should be running! | |
- Try the example prompts or ask a question | |
## Configuration | |
- **Model**: {config['model']} | |
- **Temperature**: {config['temperature']} | |
- **Max Tokens**: {config['max_tokens']} | |
- **API Key Variable**: {config['api_key_var']}""" | |
# Add optional configuration items | |
if config['access_code']: | |
readme_content += f""" | |
- **Access Code**: {config['access_code']} (Students need this to access the chatbot)""" | |
if config.get('enable_dynamic_urls'): | |
readme_content += """ | |
- **Dynamic URL Fetching**: Enabled (Assistant can fetch URLs mentioned in conversations)""" | |
readme_content += f""" | |
## Customization | |
To modify your Space: | |
1. Edit `app.py` in your Space | |
2. Update configuration variables at the top | |
3. Changes deploy automatically | |
## Troubleshooting | |
- **"Please set your {config['api_key_var']}"**: Add the secret in Space settings | |
- **Error 401**: Invalid API key or no credits | |
- **Error 429**: Rate limit - wait and try again | |
- **Build failed**: Check requirements.txt formatting | |
## More Help | |
- HuggingFace Spaces: https://huggingface.co/docs/hub/spaces | |
- OpenRouter Docs: https://openrouter.ai/docs | |
- Gradio Docs: https://gradio.app/docs | |
--- | |
Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} with Chat U/I Helper | |
""" | |
return readme_content | |
def create_requirements(enable_vector_rag=False): | |
"""Generate requirements.txt""" | |
base_requirements = "gradio==4.44.1\nrequests==2.32.3\ncrawl4ai==0.4.245" | |
if enable_vector_rag: | |
base_requirements += "\nfaiss-cpu==1.7.4\nnumpy==1.24.3" | |
return base_requirements | |
def generate_zip(name, description, role_purpose, intended_audience, key_tasks, additional_context, model, api_key_var, temperature, max_tokens, examples_text, access_code="", enable_dynamic_urls=False, url1="", url2="", url3="", url4="", enable_vector_rag=False, rag_data=None): | |
"""Generate deployable zip file""" | |
# Process examples | |
if examples_text and examples_text.strip(): | |
examples_list = [ex.strip() for ex in examples_text.split('\n') if ex.strip()] | |
examples_json = json.dumps(examples_list) | |
else: | |
examples_json = json.dumps([ | |
"Hello! How can you help me?", | |
"Tell me something interesting", | |
"What can you do?" | |
]) | |
# Process grounding URLs | |
grounding_urls = [] | |
for url in [url1, url2, url3, url4]: | |
if url and url.strip(): | |
grounding_urls.append(url.strip()) | |
# Combine system prompt components | |
system_prompt_parts = [] | |
if role_purpose and role_purpose.strip(): | |
system_prompt_parts.append(role_purpose.strip()) | |
if intended_audience and intended_audience.strip(): | |
system_prompt_parts.append(intended_audience.strip()) | |
if key_tasks and key_tasks.strip(): | |
system_prompt_parts.append(key_tasks.strip()) | |
if additional_context and additional_context.strip(): | |
system_prompt_parts.append(additional_context.strip()) | |
combined_system_prompt = " ".join(system_prompt_parts) | |
# Create config | |
config = { | |
'name': name, | |
'description': description, | |
'system_prompt': combined_system_prompt, | |
'model': model, | |
'api_key_var': api_key_var, | |
'temperature': temperature, | |
'max_tokens': int(max_tokens), | |
'examples': examples_json, | |
'grounding_urls': json.dumps(grounding_urls), | |
'access_code': access_code or "", | |
'enable_dynamic_urls': enable_dynamic_urls, | |
'enable_vector_rag': enable_vector_rag, | |
'rag_data_json': json.dumps(rag_data) if rag_data else 'null' | |
} | |
# Generate files | |
app_content = SPACE_TEMPLATE.format(**config) | |
readme_content = create_readme(config) | |
requirements_content = create_requirements(enable_vector_rag) | |
# Create zip file with clean naming | |
filename = f"{name.lower().replace(' ', '_').replace('-', '_')}.zip" | |
# Create zip in memory and save to disk | |
zip_buffer = io.BytesIO() | |
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file: | |
zip_file.writestr('app.py', app_content) | |
zip_file.writestr('requirements.txt', requirements_content) | |
zip_file.writestr('README.md', readme_content) | |
zip_file.writestr('config.json', json.dumps(config, indent=2)) | |
# Write zip to file | |
zip_buffer.seek(0) | |
with open(filename, 'wb') as f: | |
f.write(zip_buffer.getvalue()) | |
return filename | |
# Define callback functions outside the interface | |
def toggle_rag_section(enable_rag): | |
"""Toggle visibility of RAG section""" | |
return gr.update(visible=enable_rag) | |
def process_documents(files, current_rag_tool): | |
"""Process uploaded documents""" | |
if not files: | |
return "Please upload files first", current_rag_tool | |
if not HAS_RAG: | |
return "RAG functionality not available. Please install required dependencies.", current_rag_tool | |
try: | |
# Initialize RAG tool if not exists | |
if not current_rag_tool: | |
current_rag_tool = RAGTool() | |
# Process files | |
result = current_rag_tool.process_uploaded_files(files) | |
if result['success']: | |
# Create status message | |
status_parts = [f"✅ {result['message']}"] | |
# Add file summary | |
if result['summary']['files_processed']: | |
status_parts.append("\n**Processed files:**") | |
for file_info in result['summary']['files_processed']: | |
status_parts.append(f"- {file_info['name']} ({file_info['chunks']} chunks)") | |
# Add errors if any | |
if result.get('errors'): | |
status_parts.append("\n**Errors:**") | |
for error in result['errors']: | |
status_parts.append(f"- {error['file']}: {error['error']}") | |
# Add index stats | |
if result.get('index_stats'): | |
stats = result['index_stats'] | |
status_parts.append(f"\n**Index stats:** {stats['total_chunks']} chunks, {stats['dimension']}D embeddings") | |
return "\n".join(status_parts), current_rag_tool | |
else: | |
return f"❌ {result['message']}", current_rag_tool | |
except Exception as e: | |
return f"❌ Error processing documents: {str(e)}", current_rag_tool | |
def on_generate(name, description, role_purpose, intended_audience, key_tasks, additional_context, model, api_key_var, temperature, max_tokens, examples_text, access_code, enable_dynamic_urls, url1, url2, url3, url4, enable_vector_rag, rag_tool_state): | |
if not name or not name.strip(): | |
return gr.update(value="Error: Please provide a Space Title", visible=True), gr.update(visible=False) | |
if not role_purpose or not role_purpose.strip(): | |
return gr.update(value="Error: Please provide a Role and Purpose for the assistant", visible=True), gr.update(visible=False) | |
try: | |
# Get RAG data if enabled | |
rag_data = None | |
if enable_vector_rag and rag_tool_state: | |
rag_data = rag_tool_state.get_serialized_data() | |
filename = generate_zip(name, description, role_purpose, intended_audience, key_tasks, additional_context, model, api_key_var, temperature, max_tokens, examples_text, access_code, enable_dynamic_urls, url1, url2, url3, url4, enable_vector_rag, rag_data) | |
success_msg = f"""**Deployment package ready!** | |
**File**: `{filename}` | |
**What's included:** | |
- `app.py` - Ready-to-deploy chat interface | |
- `requirements.txt` - Dependencies | |
- `README.md` - Step-by-step deployment guide | |
- `config.json` - Configuration backup | |
**Next steps:** | |
1. Download the zip file below | |
2. Follow the README instructions to deploy on HuggingFace Spaces | |
3. Set your `{api_key_var}` secret in Space settings | |
**Your Space will be live in minutes!**""" | |
return gr.update(value=success_msg, visible=True), gr.update(value=filename, visible=True) | |
except Exception as e: | |
return gr.update(value=f"Error: {str(e)}", visible=True), gr.update(visible=False) | |
# Global cache for URL content to avoid re-crawling | |
url_content_cache = {} | |
def get_cached_grounding_context(urls): | |
"""Get grounding context with caching to avoid re-crawling same URLs""" | |
if not urls: | |
return "" | |
# Filter valid URLs | |
valid_urls = [url for url in urls if url and url.strip()] | |
if not valid_urls: | |
return "" | |
# Create cache key from sorted URLs | |
cache_key = tuple(sorted(valid_urls)) | |
# Check if we already have this content cached | |
if cache_key in url_content_cache: | |
return url_content_cache[cache_key] | |
# If not cached, fetch using Crawl4AI | |
grounding_context = get_grounding_context_crawl4ai(valid_urls) | |
# Cache the result | |
url_content_cache[cache_key] = grounding_context | |
return grounding_context | |
def respond_with_cache_update(message, chat_history, url1="", url2="", url3="", url4=""): | |
"""Wrapper that updates cache status after responding""" | |
msg, history = respond(message, chat_history, url1, url2, url3, url4) | |
cache_status = get_cache_status() | |
return msg, history, cache_status | |
def respond(message, chat_history, url1="", url2="", url3="", url4=""): | |
# Make actual API request to OpenRouter | |
import os | |
import requests | |
# Get API key from environment | |
api_key = os.environ.get("OPENROUTER_API_KEY") | |
if not api_key: | |
response = "Please set your OPENROUTER_API_KEY in the Space settings to use the chat support." | |
chat_history.append([message, response]) | |
return "", chat_history | |
# Get grounding context from URLs using cached approach | |
grounding_urls = [url1, url2, url3, url4] | |
grounding_context = get_cached_grounding_context(grounding_urls) | |
# Build enhanced system prompt with grounding context | |
base_system_prompt = """You are an expert assistant specializing in Gradio configurations for HuggingFace Spaces. You have deep knowledge of: | |
- Gradio interface components and layouts | |
- HuggingFace Spaces configuration (YAML frontmatter, secrets, environment variables) | |
- Deployment best practices for Gradio apps on HuggingFace | |
- Space settings, SDK versions, and hardware requirements | |
- Troubleshooting common Gradio and HuggingFace Spaces issues | |
- Integration with various APIs and models through Gradio interfaces | |
Provide specific, technical guidance focused on Gradio implementation details and HuggingFace Spaces deployment. Include code examples when relevant. Keep responses concise and actionable.""" | |
enhanced_system_prompt = base_system_prompt + grounding_context | |
# Build conversation history for API | |
messages = [{ | |
"role": "system", | |
"content": enhanced_system_prompt | |
}] | |
# Add conversation history - Support both new messages format and legacy tuple format | |
for chat in chat_history: | |
if isinstance(chat, dict): | |
# New format: {"role": "user", "content": "..."} | |
messages.append(chat) | |
elif isinstance(chat, (list, tuple)) and len(chat) >= 2: | |
# Legacy format: ("user msg", "bot msg") | |
user_msg, assistant_msg = chat[0], chat[1] | |
if user_msg: | |
messages.append({"role": "user", "content": user_msg}) | |
if assistant_msg: | |
messages.append({"role": "assistant", "content": assistant_msg}) | |
# Add current message | |
messages.append({"role": "user", "content": message}) | |
try: | |
# Make API request to OpenRouter | |
response = requests.post( | |
url="https://openrouter.ai/api/v1/chat/completions", | |
headers={ | |
"Authorization": f"Bearer {api_key}", | |
"Content-Type": "application/json" | |
}, | |
json={ | |
"model": "google/gemini-2.0-flash-001", | |
"messages": messages, | |
"temperature": 0.7, | |
"max_tokens": 500 | |
} | |
) | |
if response.status_code == 200: | |
assistant_response = response.json()['choices'][0]['message']['content'] | |
else: | |
assistant_response = f"Error: {response.status_code} - {response.text}" | |
except Exception as e: | |
assistant_response = f"Error: {str(e)}" | |
chat_history.append({"role": "user", "content": message}) | |
chat_history.append({"role": "assistant", "content": assistant_response}) | |
return "", chat_history | |
def clear_chat(): | |
return "", [] | |
def clear_url_cache(): | |
"""Clear the URL content cache""" | |
global url_content_cache | |
url_content_cache.clear() | |
return "✅ URL cache cleared. Next request will re-fetch content." | |
def get_cache_status(): | |
"""Get current cache status""" | |
if not url_content_cache: | |
return "🔄 No URLs cached" | |
return f"💾 {len(url_content_cache)} URL set(s) cached" | |
def add_urls(count): | |
"""Show additional URL fields""" | |
if count == 2: | |
return (gr.update(visible=True), gr.update(visible=False), | |
gr.update(value="+ Add URLs"), gr.update(visible=True), 3) | |
elif count == 3: | |
return (gr.update(visible=True), gr.update(visible=True), | |
gr.update(value="Max URLs", interactive=False), gr.update(visible=True), 4) | |
else: | |
return (gr.update(), gr.update(), gr.update(), gr.update(), count) | |
def remove_urls(count): | |
"""Hide URL fields""" | |
if count == 4: | |
return (gr.update(visible=True), gr.update(visible=False, value=""), | |
gr.update(value="+ Add URLs", interactive=True), gr.update(visible=True), 3) | |
elif count == 3: | |
return (gr.update(visible=False, value=""), gr.update(visible=False, value=""), | |
gr.update(value="+ Add URLs", interactive=True), gr.update(visible=False), 2) | |
else: | |
return (gr.update(), gr.update(), gr.update(), gr.update(), count) | |
def add_chat_urls(count): | |
"""Show additional chat URL fields""" | |
if count == 2: | |
return (gr.update(visible=True), gr.update(visible=False), | |
gr.update(value="+ Add URLs"), gr.update(visible=True), 3) | |
elif count == 3: | |
return (gr.update(visible=True), gr.update(visible=True), | |
gr.update(value="Max URLs", interactive=False), gr.update(visible=True), 4) | |
else: | |
return (gr.update(), gr.update(), gr.update(), gr.update(), count) | |
def remove_chat_urls(count): | |
"""Hide chat URL fields""" | |
if count == 4: | |
return (gr.update(visible=True), gr.update(visible=False, value=""), | |
gr.update(value="+ Add URLs", interactive=True), gr.update(visible=True), 3) | |
elif count == 3: | |
return (gr.update(visible=False, value=""), gr.update(visible=False, value=""), | |
gr.update(value="+ Add URLs", interactive=True), gr.update(visible=False), 2) | |
else: | |
return (gr.update(), gr.update(), gr.update(), gr.update(), count) | |
def update_template_fields(choice): | |
"""Update assistant configuration fields based on template choice""" | |
if choice == "Use the research assistant template": | |
return ( | |
gr.update(value="You are a research assistant that provides link-grounded information through Crawl4AI web fetching. Use MLA documentation for parenthetical citations and bibliographic entries."), | |
gr.update(value="This assistant is designed for students and researchers conducting academic inquiry."), | |
gr.update(value="Your main responsibilities include: analyzing academic sources, fact-checking claims with evidence, providing properly cited research summaries, and helping users navigate scholarly information."), | |
gr.update(value="Ground all responses in provided URL contexts and any additional URLs you're instructed to fetch. Never rely on memory for factual claims."), | |
gr.update(value=True) # Enable dynamic URL fetching for research template | |
) | |
else: # Custom assistant from scratch | |
return ( | |
gr.update(value=""), | |
gr.update(value=""), | |
gr.update(value=""), | |
gr.update(value=""), | |
gr.update(value=False) # Disable dynamic URL fetching for custom template | |
) | |
# Create Gradio interface with proper tab structure | |
with gr.Blocks(title="Chat U/I Helper") as demo: | |
with gr.Tabs(): | |
with gr.Tab("Spaces Configuration"): | |
gr.Markdown("# Spaces Configuration") | |
gr.Markdown("Convert custom assistants from HuggingChat into chat interfaces with HuggingFace Spaces. Configure and download everything needed to deploy a simple HF space using Gradio.") | |
with gr.Column(): | |
name = gr.Textbox( | |
label="Space Title", | |
placeholder="My Course Helper", | |
value="My Custom Space" | |
) | |
description = gr.Textbox( | |
label="Space Description", | |
placeholder="A customizable AI chat interface for...", | |
lines=2, | |
value="An AI research assistant tailored for academic inquiry and scholarly dialogue" | |
) | |
model = gr.Dropdown( | |
label="Model", | |
choices=MODELS, | |
value=MODELS[0], | |
info="Choose based on the context and purposes of your space" | |
) | |
api_key_var = gr.Textbox( | |
label="API Key Variable Name", | |
value="OPENROUTER_API_KEY", | |
info="Name for the secret in HuggingFace Space settings" | |
) | |
access_code = gr.Textbox( | |
label="Access Code (Optional)", | |
placeholder="Leave empty for public access, or enter code for student access", | |
info="If set, students must enter this code to access the chatbot", | |
type="password" | |
) | |
with gr.Accordion("Assistant Configuration", open=True): | |
gr.Markdown("### Configure your assistant's behavior and capabilities") | |
template_choice = gr.Radio( | |
label="How would you like to get started?", | |
choices=[ | |
"Use the research assistant template", | |
"Create a custom assistant from scratch" | |
], | |
value="Use the research assistant template", | |
info="Choose a starting point for your assistant configuration" | |
) | |
role_purpose = gr.Textbox( | |
label="Role and Purpose", | |
placeholder="You are a research assistant that...", | |
lines=2, | |
value="You are a research assistant that provides link-grounded information through Crawl4AI web fetching. Use MLA documentation for parenthetical citations and bibliographic entries.", | |
info="Define what the assistant is and its primary function" | |
) | |
intended_audience = gr.Textbox( | |
label="Intended Audience", | |
placeholder="This assistant is designed for undergraduate students...", | |
lines=2, | |
value="This assistant is designed for students and researchers conducting academic inquiry.", | |
info="Specify who will be using this assistant and their context" | |
) | |
key_tasks = gr.Textbox( | |
label="Key Tasks", | |
placeholder="Your main responsibilities include...", | |
lines=3, | |
value="Your main responsibilities include: analyzing academic sources, fact-checking claims with evidence, providing properly cited research summaries, and helping users navigate scholarly information.", | |
info="List the specific tasks and capabilities the assistant should focus on" | |
) | |
additional_context = gr.Textbox( | |
label="Additional Context", | |
placeholder="Remember to always...", | |
lines=2, | |
value="Ground all responses in provided URL contexts and any additional URLs you're instructed to fetch. Never rely on memory for factual claims.", | |
info="Any additional instructions, constraints, or behavioral guidelines" | |
) | |
gr.Markdown("### Tool Settings") | |
enable_dynamic_urls = gr.Checkbox( | |
label="Enable Dynamic URL Fetching", | |
value=False, | |
info="Allow the assistant to fetch additional URLs mentioned in conversations (uses Crawl4AI)" | |
) | |
enable_vector_rag = gr.Checkbox( | |
label="Enable Document RAG", | |
value=False, | |
info="Upload documents for context-aware responses (PDF, DOCX, TXT, MD)", | |
visible=HAS_RAG | |
) | |
with gr.Column(visible=False) as rag_section: | |
gr.Markdown("### Document Upload") | |
file_upload = gr.File( | |
label="Upload Documents", | |
file_types=[".pdf", ".docx", ".txt", ".md"], | |
file_count="multiple", | |
type="filepath" | |
) | |
process_btn = gr.Button("Process Documents", variant="secondary") | |
rag_status = gr.Markdown() | |
# State to store RAG tool | |
rag_tool_state = gr.State(None) | |
examples_text = gr.Textbox( | |
label="Example Prompts (one per line)", | |
placeholder="Can you analyze this research paper: https://example.com/paper.pdf\nWhat are the latest findings on climate change adaptation?\nHelp me fact-check claims about renewable energy efficiency", | |
lines=3, | |
info="These will appear as clickable examples in the chat interface" | |
) | |
with gr.Accordion("URL Grounding (Optional)", open=False): | |
gr.Markdown("Add URLs to provide context. Content will be fetched and added to the system prompt.") | |
# Initial URL fields | |
url1 = gr.Textbox( | |
label="URL 1", | |
placeholder="https://example.com/page1", | |
info="First URL for context grounding" | |
) | |
url2 = gr.Textbox( | |
label="URL 2", | |
placeholder="https://example.com/page2", | |
info="Second URL for context grounding" | |
) | |
# Additional URL fields (initially hidden) | |
url3 = gr.Textbox( | |
label="URL 3", | |
placeholder="https://example.com/page3", | |
info="Third URL for context grounding", | |
visible=False | |
) | |
url4 = gr.Textbox( | |
label="URL 4", | |
placeholder="https://example.com/page4", | |
info="Fourth URL for context grounding", | |
visible=False | |
) | |
# URL management buttons | |
with gr.Row(): | |
add_url_btn = gr.Button("+ Add URLs", size="sm") | |
remove_url_btn = gr.Button("- Remove URLs", size="sm", visible=False) | |
url_count = gr.State(2) # Track number of visible URLs | |
with gr.Row(): | |
temperature = gr.Slider( | |
label="Temperature", | |
minimum=0, | |
maximum=2, | |
value=0.7, | |
step=0.1, | |
info="Higher = more creative, Lower = more focused" | |
) | |
max_tokens = gr.Slider( | |
label="Max Response Tokens", | |
minimum=50, | |
maximum=4096, | |
value=500, | |
step=50 | |
) | |
generate_btn = gr.Button("Generate Deployment Package", variant="primary") | |
status = gr.Markdown(visible=False) | |
download_file = gr.File(label="Download your zip package", visible=False) | |
# Connect the template choice radio button | |
template_choice.change( | |
update_template_fields, | |
inputs=[template_choice], | |
outputs=[role_purpose, intended_audience, key_tasks, additional_context, enable_dynamic_urls] | |
) | |
# Connect the URL management buttons | |
add_url_btn.click( | |
add_urls, | |
inputs=[url_count], | |
outputs=[url3, url4, add_url_btn, remove_url_btn, url_count] | |
) | |
remove_url_btn.click( | |
remove_urls, | |
inputs=[url_count], | |
outputs=[url3, url4, add_url_btn, remove_url_btn, url_count] | |
) | |
# Connect RAG functionality | |
enable_vector_rag.change( | |
toggle_rag_section, | |
inputs=[enable_vector_rag], | |
outputs=[rag_section] | |
) | |
process_btn.click( | |
process_documents, | |
inputs=[file_upload, rag_tool_state], | |
outputs=[rag_status, rag_tool_state] | |
) | |
# Connect the generate button | |
generate_btn.click( | |
on_generate, | |
inputs=[name, description, role_purpose, intended_audience, key_tasks, additional_context, model, api_key_var, temperature, max_tokens, examples_text, access_code, enable_dynamic_urls, url1, url2, url3, url4, enable_vector_rag, rag_tool_state], | |
outputs=[status, download_file] | |
) | |
with gr.Tab("Chat Support"): | |
gr.Markdown("# Chat Support") | |
gr.Markdown("Get personalized guidance on configuring chat assistants as HuggingFace Spaces for educational & research purposes.") | |
# Meta chat interface | |
with gr.Column(): | |
chatbot = gr.Chatbot( | |
value=[], | |
label="Chat Support Assistant", | |
height=400 | |
) | |
msg = gr.Textbox( | |
label="Ask about configuring chat UIs for courses, research, or custom HuggingFace Spaces", | |
placeholder="How can I configure a chat UI for my senior seminar?", | |
lines=2 | |
) | |
with gr.Accordion("URL Grounding (Optional)", open=False): | |
gr.Markdown("Add URLs to provide additional context for more informed responses") | |
chat_url1 = gr.Textbox( | |
label="URL 1", | |
value="https://huggingface.co/docs/hub/en/spaces-overview", | |
info="HuggingFace Spaces Overview" | |
) | |
chat_url2 = gr.Textbox( | |
label="URL 2", | |
value="", | |
placeholder="https://example.com/page2", | |
info="Additional context URL" | |
) | |
# Additional URL fields for chat (initially hidden) | |
chat_url3 = gr.Textbox( | |
label="URL 3", | |
placeholder="https://example.com/page3", | |
info="Additional context URL", | |
visible=False | |
) | |
chat_url4 = gr.Textbox( | |
label="URL 4", | |
placeholder="https://example.com/page4", | |
info="Additional context URL", | |
visible=False | |
) | |
# Chat URL management buttons | |
with gr.Row(): | |
add_chat_url_btn = gr.Button("+ Add URLs", size="sm") | |
remove_chat_url_btn = gr.Button("- Remove URLs", size="sm", visible=False) | |
chat_url_count = gr.State(2) # Track number of visible chat URLs | |
# Cache controls | |
with gr.Row(): | |
cache_status = gr.Markdown("🔄 No URLs cached") | |
clear_cache_btn = gr.Button("Clear URL Cache", size="sm") | |
with gr.Row(): | |
submit = gr.Button("Send", variant="primary") | |
clear = gr.Button("Clear") | |
gr.Examples( | |
examples=[ | |
"How do I set up a course assistant?", | |
"Which model should I use?", | |
"What's a good system prompt?", | |
"Why Gradio? What is it?", | |
"How do I customize the chat interface?", | |
"Can you help me troubleshoot?", | |
], | |
inputs=msg | |
) | |
# Connect the chat URL management buttons | |
add_chat_url_btn.click( | |
add_chat_urls, | |
inputs=[chat_url_count], | |
outputs=[chat_url3, chat_url4, add_chat_url_btn, remove_chat_url_btn, chat_url_count] | |
) | |
remove_chat_url_btn.click( | |
remove_chat_urls, | |
inputs=[chat_url_count], | |
outputs=[chat_url3, chat_url4, add_chat_url_btn, remove_chat_url_btn, chat_url_count] | |
) | |
# Connect cache controls | |
clear_cache_btn.click(clear_url_cache, outputs=[cache_status]) | |
# Connect the chat functionality | |
submit.click(respond_with_cache_update, [msg, chatbot, chat_url1, chat_url2, chat_url3, chat_url4], [msg, chatbot, cache_status]) | |
msg.submit(respond_with_cache_update, [msg, chatbot, chat_url1, chat_url2, chat_url3, chat_url4], [msg, chatbot, cache_status]) | |
clear.click(clear_chat, outputs=[msg, chatbot]) | |
if __name__ == "__main__": | |
demo.launch(share=True) |