import os import gradio as gr import logging import uuid import pathlib from dotenv import load_dotenv from research_engine import ResearchEngine import time import traceback # Load environment variables load_dotenv() # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # Initialize the research engine with verbose=False for production research_engine = None # Dict to store session-specific research engines session_engines = {} def validate_api_keys(custom_openai_key=None): """Checks if required API keys are set""" missing_keys = [] if not os.getenv("BRAVE_API_KEY"): missing_keys.append("BRAVE_API_KEY") # Check for OpenAI key in either the environment or the custom key provided if not custom_openai_key and not os.getenv("OPENAI_API_KEY"): missing_keys.append("OPENAI_API_KEY") return missing_keys def get_engine_for_session(session_id, openai_api_key=None): """Get or create a research engine for the specific session with optional custom API key""" if session_id not in session_engines: logger.info(f"Creating new research engine for session {session_id}") # Set temporary API key if provided by user original_key = None if openai_api_key: logger.info("Using custom OpenAI API key provided by user") original_key = os.environ.get("OPENAI_API_KEY") os.environ["OPENAI_API_KEY"] = openai_api_key try: session_engines[session_id] = ResearchEngine(verbose=False) finally: # Restore original key if we changed it if original_key is not None: os.environ["OPENAI_API_KEY"] = original_key elif openai_api_key: # If there was no original key, remove the temporary one os.environ.pop("OPENAI_API_KEY", None) return session_engines[session_id] def cleanup_session(session_id): """Remove a session when it's no longer needed""" if session_id in session_engines: logger.info(f"Cleaning up session {session_id}") del session_engines[session_id] def process_message(message, history, session_id, openai_api_key=None): """ Process user message and update chat history. Args: message: User's message history: Chat history list session_id: Unique identifier for the session openai_api_key: Optional custom OpenAI API key Returns: Updated history """ # Validate API keys missing_keys = validate_api_keys(openai_api_key) if missing_keys: return history + [ {"role": "user", "content": message}, {"role": "assistant", "content": f"Error: Missing required API keys: {', '.join(missing_keys)}. Please set these in your .env file or input your OpenAI API key below."} ] # Add user message to history history.append({"role": "user", "content": message}) try: print(f"Starting research for: {message}") start_time = time.time() # Get the appropriate engine for this session, passing the API key if provided engine = get_engine_for_session(session_id, openai_api_key) # Set the API key for this specific request if provided original_key = None if openai_api_key: original_key = os.environ.get("OPENAI_API_KEY") os.environ["OPENAI_API_KEY"] = openai_api_key try: # Start the research process research_task = engine.research(message) finally: # Restore original key if we changed it if original_key is not None: os.environ["OPENAI_API_KEY"] = original_key elif openai_api_key: # If there was no original key, remove the temporary one os.environ.pop("OPENAI_API_KEY", None) # Print the research task output for debugging print(f"Research task result type: {type(research_task)}") print(f"Research task content: {research_task}") # If we get here, step 1 is complete history[-1] = {"role": "user", "content": message} history.append({"role": "assistant", "content": f"Researching... this may take a minute or two...\n\n**Step 1/4:** Refining your query..."}) yield history # We don't actually have real-time progress indication from the engine, # so we'll simulate it with a slight delay between steps time.sleep(1) history[-1] = {"role": "assistant", "content": f"Researching... this may take a minute or two...\n\n**Step 1/4:** Refining your query... ✓\n**Step 2/4:** Searching the web..."} yield history time.sleep(1) history[-1] = {"role": "assistant", "content": f"Researching... this may take a minute or two...\n\n**Step 1/4:** Refining your query... ✓\n**Step 2/4:** Searching the web... ✓\n**Step 3/4:** Analyzing results..."} yield history time.sleep(1) history[-1] = {"role": "assistant", "content": f"Researching... this may take a minute or two...\n\n**Step 1/4:** Refining your query... ✓\n**Step 2/4:** Searching the web... ✓\n**Step 3/4:** Analyzing results... ✓\n**Step 4/4:** Synthesizing information..."} yield history # Get response from research engine response = research_task["result"] end_time = time.time() processing_time = end_time - start_time # Add processing time for transparency response += f"\n\nResearch completed in {processing_time:.2f} seconds." # Update last message with the full response history[-1] = {"role": "assistant", "content": response} yield history except Exception as e: logger.exception("Error processing message") error_traceback = traceback.format_exc() error_message = f"An error occurred: {str(e)}\n\nTraceback: {error_traceback}" history[-1] = {"role": "assistant", "content": error_message} yield history # Define a basic theme with minimal customization - more styling in CSS custom_theme = gr.themes.Soft( primary_hue=gr.themes.colors.indigo, secondary_hue=gr.themes.colors.blue, neutral_hue=gr.themes.colors.slate, ) # Gradio versions have different ways of loading CSS, let's ensure compatibility css_file_path = pathlib.Path("assets/custom.css") if css_file_path.exists(): with open(css_file_path, 'r') as f: css_content = f.read() else: css_content = "" # Fallback empty CSS if file doesn't exist # Add the CSS as a style tag to ensure it works in all Gradio versions css_head = f""" """ # Create the Gradio interface with multiple CSS loading methods for compatibility with gr.Blocks( title="Web Research Agent", theme=custom_theme, css=css_content, head=css_head, # Older versions may use this ) as app: # Create a unique session ID for each user session_id = gr.State(lambda: str(uuid.uuid4())) with gr.Row(elem_classes=["container"]): with gr.Column(): with gr.Row(elem_classes=["app-header"]): gr.Markdown("""