|
import gradio as gr |
|
import os |
|
import requests |
|
import json |
|
import asyncio |
|
from crawl4ai import AsyncWebCrawler |
|
|
|
|
|
SPACE_NAME = "My Custom Space" |
|
SPACE_DESCRIPTION = "An AI research assistant tailored for academic inquiry and scholarly dialogue" |
|
SYSTEM_PROMPT = """You are a research assistant that provides link-grounded information through Crawl4AI web fetching. Use MLA documentation for parenthetical citations and bibliographic entries. This assistant is designed for students and researchers conducting academic inquiry. Your main responsibilities include: analyzing academic sources, fact-checking claims with evidence, providing properly cited research summaries, and helping users navigate scholarly information. Ground all responses in provided URL contexts and any additional URLs you're instructed to fetch. Never rely on memory for factual claims.""" |
|
MODEL = "google/gemma-3-27b-it" |
|
GROUNDING_URLS = [] |
|
|
|
ACCESS_CODE = os.environ.get("SPACE_ACCESS_CODE", "") |
|
ENABLE_DYNAMIC_URLS = True |
|
ENABLE_VECTOR_RAG = False |
|
RAG_DATA = None |
|
|
|
|
|
API_KEY = os.environ.get("OPENROUTER_API_KEY") |
|
|
|
async def fetch_url_content_async(url, crawler): |
|
"""Fetch and extract text content from a URL using Crawl4AI""" |
|
try: |
|
result = await crawler.arun( |
|
url=url, |
|
bypass_cache=True, |
|
word_count_threshold=10, |
|
excluded_tags=['script', 'style', 'nav', 'header', 'footer'], |
|
remove_overlay_elements=True |
|
) |
|
|
|
if result.success: |
|
content = result.markdown or result.cleaned_html or "" |
|
|
|
if len(content) > 4000: |
|
content = content[:4000] + "..." |
|
return content |
|
else: |
|
return f"Error fetching {url}: Failed to retrieve content" |
|
except Exception as e: |
|
return f"Error fetching {url}: {str(e)}" |
|
|
|
def fetch_url_content(url): |
|
"""Synchronous wrapper for URL fetching""" |
|
async def fetch(): |
|
async with AsyncWebCrawler(verbose=False) as crawler: |
|
return await fetch_url_content_async(url, crawler) |
|
|
|
try: |
|
return asyncio.run(fetch()) |
|
except Exception as e: |
|
return f"Error fetching {url}: {str(e)}" |
|
|
|
|
|
_url_content_cache = {} |
|
|
|
def get_grounding_context(): |
|
"""Fetch context from grounding URLs with caching""" |
|
if not GROUNDING_URLS: |
|
return "" |
|
|
|
|
|
cache_key = tuple(sorted([url for url in GROUNDING_URLS if url and url.strip()])) |
|
|
|
|
|
if cache_key in _url_content_cache: |
|
return _url_content_cache[cache_key] |
|
|
|
context_parts = [] |
|
for i, url in enumerate(GROUNDING_URLS, 1): |
|
if url.strip(): |
|
content = fetch_url_content(url.strip()) |
|
context_parts.append(f"Context from URL {i} ({url}):\n{content}") |
|
|
|
if context_parts: |
|
result = "\n\n" + "\n\n".join(context_parts) + "\n\n" |
|
else: |
|
result = "" |
|
|
|
|
|
_url_content_cache[cache_key] = result |
|
return result |
|
|
|
import re |
|
|
|
def extract_urls_from_text(text): |
|
"""Extract URLs from text using regex""" |
|
url_pattern = r'https?://[^\s<>"{}|\^`\[\]"]+' |
|
return re.findall(url_pattern, text) |
|
|
|
|
|
if ENABLE_VECTOR_RAG and RAG_DATA: |
|
try: |
|
import faiss |
|
import numpy as np |
|
import base64 |
|
|
|
class SimpleRAGContext: |
|
def __init__(self, rag_data): |
|
|
|
index_bytes = base64.b64decode(rag_data['index_base64']) |
|
self.index = faiss.deserialize_index(index_bytes) |
|
|
|
|
|
self.chunks = rag_data['chunks'] |
|
self.chunk_ids = rag_data['chunk_ids'] |
|
|
|
def get_context(self, query, max_chunks=3): |
|
"""Get relevant context - simplified version""" |
|
|
|
|
|
return "\n\n[RAG context would be retrieved here based on similarity search]\n\n" |
|
|
|
rag_context_provider = SimpleRAGContext(RAG_DATA) |
|
except Exception as e: |
|
print(f"Failed to initialize RAG: {e}") |
|
rag_context_provider = None |
|
else: |
|
rag_context_provider = None |
|
|
|
def generate_response(message, history): |
|
"""Generate response using OpenRouter API""" |
|
|
|
if not API_KEY: |
|
return "Please set your OPENROUTER_API_KEY in the Space settings." |
|
|
|
|
|
grounding_context = get_grounding_context() |
|
|
|
|
|
if ENABLE_VECTOR_RAG and rag_context_provider: |
|
rag_context = rag_context_provider.get_context(message) |
|
if rag_context: |
|
grounding_context += rag_context |
|
|
|
|
|
if ENABLE_DYNAMIC_URLS: |
|
urls_in_message = extract_urls_from_text(message) |
|
if urls_in_message: |
|
|
|
dynamic_context_parts = [] |
|
for url in urls_in_message[:3]: |
|
content = fetch_url_content(url) |
|
dynamic_context_parts.append(f"\n\nDynamic context from {url}:\n{content}") |
|
if dynamic_context_parts: |
|
grounding_context += "\n".join(dynamic_context_parts) |
|
|
|
|
|
enhanced_system_prompt = SYSTEM_PROMPT + grounding_context |
|
|
|
|
|
messages = [{"role": "system", "content": enhanced_system_prompt}] |
|
|
|
|
|
for chat in history: |
|
if isinstance(chat, dict): |
|
|
|
messages.append(chat) |
|
else: |
|
|
|
user_msg, bot_msg = chat |
|
messages.append({"role": "user", "content": user_msg}) |
|
if bot_msg: |
|
messages.append({"role": "assistant", "content": bot_msg}) |
|
|
|
|
|
messages.append({"role": "user", "content": message}) |
|
|
|
|
|
try: |
|
response = requests.post( |
|
url="https://openrouter.ai/api/v1/chat/completions", |
|
headers={ |
|
"Authorization": f"Bearer {API_KEY}", |
|
"Content-Type": "application/json" |
|
}, |
|
json={ |
|
"model": MODEL, |
|
"messages": messages, |
|
"temperature": 0.7, |
|
"max_tokens": 500 |
|
} |
|
) |
|
|
|
if response.status_code == 200: |
|
return response.json()['choices'][0]['message']['content'] |
|
else: |
|
return f"Error: {response.status_code} - {response.text}" |
|
|
|
except Exception as e: |
|
return f"Error: {str(e)}" |
|
|
|
|
|
access_granted = gr.State(False) |
|
|
|
def verify_access_code(code): |
|
"""Verify the access code""" |
|
if not ACCESS_CODE: |
|
return gr.update(visible=False), gr.update(visible=True), True |
|
|
|
if code == ACCESS_CODE: |
|
return gr.update(visible=False), gr.update(visible=True), True |
|
else: |
|
return gr.update(visible=True, value="❌ Incorrect access code. Please try again."), gr.update(visible=False), False |
|
|
|
def protected_generate_response(message, history): |
|
"""Protected response function that checks access""" |
|
|
|
if ACCESS_CODE and not access_granted.value: |
|
return "Please enter the access code to continue." |
|
return generate_response(message, history) |
|
|
|
|
|
with gr.Blocks(title=SPACE_NAME) as demo: |
|
gr.Markdown(f"# {SPACE_NAME}") |
|
gr.Markdown(SPACE_DESCRIPTION) |
|
|
|
|
|
with gr.Column(visible=bool(ACCESS_CODE)) as access_section: |
|
gr.Markdown("### 🔐 Access Required") |
|
gr.Markdown("Please enter the access code provided by your instructor:") |
|
|
|
access_input = gr.Textbox( |
|
label="Access Code", |
|
placeholder="Enter access code...", |
|
type="password" |
|
) |
|
access_btn = gr.Button("Submit", variant="primary") |
|
access_error = gr.Markdown(visible=False) |
|
|
|
|
|
with gr.Column(visible=not bool(ACCESS_CODE)) as chat_section: |
|
chat_interface = gr.ChatInterface( |
|
fn=protected_generate_response, |
|
title="", |
|
description="", |
|
examples=["Hello! How can you help me?", "Tell me something interesting", "What can you do?"] |
|
) |
|
|
|
|
|
if ACCESS_CODE: |
|
access_btn.click( |
|
verify_access_code, |
|
inputs=[access_input], |
|
outputs=[access_error, chat_section, access_granted] |
|
) |
|
access_input.submit( |
|
verify_access_code, |
|
inputs=[access_input], |
|
outputs=[access_error, chat_section, access_granted] |
|
) |
|
|
|
if __name__ == "__main__": |
|
demo.launch() |
|
|