Spaces:
Sleeping
Sleeping
# algoforge_prime/app.py | |
import gradio as gr
import os
import time  # For progress updates and timing

# --- Core Logic Imports ---
# Initialize clients first to ensure API keys are loaded before other modules use them.
from core.llm_clients import initialize_all_clients, is_gemini_api_configured, is_hf_api_configured  # Use getters

initialize_all_clients()  # CRITICAL: Call initialization first

# Now get the status AFTER initialization. These two flags drive which models
# are registered below and which status banner the UI shows.
GEMINI_API_READY = is_gemini_api_configured()
HF_API_READY = is_hf_api_configured()

# NOTE: the remaining imports deliberately come after initialize_all_clients()
# (see the comment above), so do not let an import sorter hoist them to the top.
from core.generation_engine import generate_initial_solutions
from core.evaluation_engine import evaluate_solution_candidate, EvaluationResultOutput  # Using the renamed class
from core.evolution_engine import evolve_solution
from prompts.system_prompts import get_system_prompt
from prompts.prompt_templates import format_code_test_analysis_user_prompt
from core.safe_executor import execute_python_code_with_tests, ExecutionResult  # For re-evaluating evolved code
# --- Application Configuration (Models, Defaults) --- | |
# Registry of selectable models: UI label -> {"id": backend model id, "type": backend kind}.
# Insertion order is the order in which the labels appear in the model dropdown.
AVAILABLE_MODELS_CONFIG = {}
# Label pre-selected in the dropdown; resolved below depending on which APIs are ready.
UI_DEFAULT_MODEL_KEY = None
GEMINI_1_5_PRO_LATEST_ID = "gemini-1.5-pro-latest"
GEMINI_1_5_FLASH_LATEST_ID = "gemini-1.5-flash-latest"

# --- Gemini models (preferred default when the API is configured) ---
if not GEMINI_API_READY:
    print("WARNING: app.py - Gemini API not configured; Gemini models will be unavailable.")
else:
    AVAILABLE_MODELS_CONFIG["Google Gemini 1.5 Pro (API - Recommended)"] = {
        "id": GEMINI_1_5_PRO_LATEST_ID,
        "type": "google_gemini",
    }
    AVAILABLE_MODELS_CONFIG["Google Gemini 1.5 Flash (API - Fast)"] = {
        "id": GEMINI_1_5_FLASH_LATEST_ID,
        "type": "google_gemini",
    }
    AVAILABLE_MODELS_CONFIG["Google Gemini 1.0 Pro (API - Legacy)"] = {
        "id": "gemini-1.0-pro-latest",
        "type": "google_gemini",
    }
    UI_DEFAULT_MODEL_KEY = "Google Gemini 1.5 Pro (API - Recommended)"
    # Defensive fallback in case the Pro entry is ever removed from the registry above.
    if UI_DEFAULT_MODEL_KEY not in AVAILABLE_MODELS_CONFIG:
        UI_DEFAULT_MODEL_KEY = "Google Gemini 1.5 Flash (API - Fast)"
    print(f"INFO: app.py - Gemini models populated. Default set to: {UI_DEFAULT_MODEL_KEY}")

# --- Hugging Face models (become the default only when Gemini did not set one) ---
if not HF_API_READY:
    print("WARNING: app.py - Hugging Face API not configured; HF models will be unavailable.")
else:
    AVAILABLE_MODELS_CONFIG["Google Gemma 2B (HF - Quick Test)"] = {
        "id": "google/gemma-2b-it",
        "type": "hf",
    }
    AVAILABLE_MODELS_CONFIG["Mistral 7B Instruct (HF)"] = {
        "id": "mistralai/Mistral-7B-Instruct-v0.2",
        "type": "hf",
    }
    AVAILABLE_MODELS_CONFIG["CodeLlama 7B Instruct (HF)"] = {
        "id": "codellama/CodeLlama-7b-Instruct-hf",
        "type": "hf",
    }
    if UI_DEFAULT_MODEL_KEY:
        print("INFO: app.py - HF models also populated as alternatives.")
    else:
        UI_DEFAULT_MODEL_KEY = "Google Gemma 2B (HF - Quick Test)"
        print("INFO: app.py - HF models populated; default set as Gemini was not available.")

# --- Final resolution: placeholder entry when nothing is usable, else make sure a default exists ---
if not AVAILABLE_MODELS_CONFIG:
    print("CRITICAL APP ERROR: No models could be configured. Check API keys and restart Space.")
    # The "none" type marks this entry as unusable; the orchestrator and the UI both check for it.
    AVAILABLE_MODELS_CONFIG["No Models Available (Check API Keys & Restart)"] = {"id": "dummy_error", "type": "none"}
    UI_DEFAULT_MODEL_KEY = "No Models Available (Check API Keys & Restart)"
elif not UI_DEFAULT_MODEL_KEY:
    # The registry is non-empty here, so its first label is a safe default.
    UI_DEFAULT_MODEL_KEY = next(iter(AVAILABLE_MODELS_CONFIG))
    print(f"WARNING: app.py - UI_DEFAULT_MODEL_KEY was not set by primary logic, falling back to: {UI_DEFAULT_MODEL_KEY}")
# --- Main Orchestration Logic for Gradio --- | |
def _build_llm_config(model_config, temp, max_tokens):
    """Return the settings dict handed to an engine for one pipeline stage.

    Args:
        model_config: Entry of AVAILABLE_MODELS_CONFIG; its "type" and "id" are read.
        temp: Sampling temperature for the stage.
        max_tokens: Token budget for the stage.
    """
    return {
        "type": model_config["type"],
        "model_id": model_config["id"],
        "temp": temp,
        "max_tokens": max_tokens,
    }


def run_algoforge_simulation_orchestrator(
    problem_type_selected: str,
    problem_description_text: str,
    initial_hints_text: str,
    user_provided_tests_code: str,
    num_initial_solutions_to_gen: int,
    selected_model_ui_key: str,
    genesis_temp: float, genesis_max_tokens: int,
    critique_temp: float, critique_max_tokens: int,
    evolution_temp: float, evolution_max_tokens: int,
    progress=gr.Progress(track_tqdm=True)
):
    """Run one generate -> evaluate -> select -> evolve cycle for the Gradio UI.

    Stages:
        1. Genesis: ask the generation engine for candidate solutions.
        2. Critique: evaluate every candidate (combined score, optional test run).
        3. Selection: pick the highest-scoring viable candidate as champion.
        4. Evolution: ask the evolution engine to refine the champion and, for
           Python problems with user tests, re-run the tests and request an LLM
           explanation of the results.

    Args:
        problem_type_selected: Problem type label chosen in the UI.
        problem_description_text: Mandatory problem statement.
        initial_hints_text: Optional hints forwarded to the generation engine.
        user_provided_tests_code: Optional unit-test code; empty means no tests.
        num_initial_solutions_to_gen: Number of candidates requested from the
            generation engine.
        selected_model_ui_key: Key into AVAILABLE_MODELS_CONFIG.
        genesis_temp, genesis_max_tokens: LLM settings for stage 1.
        critique_temp, critique_max_tokens: LLM settings for stage 2 (the token
            budget is also the base for the post-evolution analysis call).
        evolution_temp, evolution_max_tokens: LLM settings for stage 4.
        progress: Gradio progress tracker, called with a fraction and a description.

    Returns:
        A 5-tuple of strings, in the order the click handler's outputs expect:
        (candidates markdown, champion markdown, evolved-solution markdown,
        interaction log, AI test-analysis markdown). On input/config errors the
        error message is returned in the first slot; when no candidate is
        viable it is returned in the second slot.
    """
    # Wall-clock start; the elapsed time is reported in the final log entry.
    start_time = time.time()
    progress(0, desc="Initializing AlgoForge Prime™...")
    log_entries = [f"**AlgoForge Prime™ Cycle Starting at {time.strftime('%Y-%m-%d %H:%M:%S')}**"]

    # Treat a missing (None) text input like an empty one so the .strip()
    # checks below report a clean validation error instead of raising.
    problem_description_text = problem_description_text or ""
    user_provided_tests_code = user_provided_tests_code or ""
    has_user_tests = bool(user_provided_tests_code.strip())

    if not problem_description_text.strip():
        error_msg = "CRITICAL INPUT ERROR: Problem Description is mandatory. Please describe the problem."
        log_entries.append(error_msg)
        return error_msg, "", "", "\n".join(log_entries), ""

    current_model_config = AVAILABLE_MODELS_CONFIG.get(selected_model_ui_key)
    # Type "none" is the placeholder entry registered when no API is configured.
    if not current_model_config or current_model_config["type"] == "none":
        error_msg = f"CRITICAL CONFIG ERROR: No valid LLM selected ('{selected_model_ui_key}'). API keys might be missing or failed initialization. Please check Space Secrets & restart."
        log_entries.append(error_msg)
        return error_msg, "", "", "\n".join(log_entries), ""

    log_entries.append(f"Selected Model: {selected_model_ui_key} (Type: {current_model_config['type']}, ID: {current_model_config['id']})")
    log_entries.append(f"Problem Type: {problem_type_selected}")
    log_entries.append(f"User Unit Tests Provided: {'Yes' if has_user_tests else 'No'}")

    llm_config_genesis = _build_llm_config(current_model_config, genesis_temp, genesis_max_tokens)
    llm_config_critique = _build_llm_config(current_model_config, critique_temp, critique_max_tokens)
    llm_config_evolution = _build_llm_config(current_model_config, evolution_temp, evolution_max_tokens)

    # --- STAGE 1: GENESIS ---
    progress(0.05, desc="Stage 1: Genesis Engine - Generating Solutions...")
    log_entries.append("\n**------ STAGE 1: GENESIS ENGINE ------**")
    initial_raw_solutions = generate_initial_solutions(
        problem_description_text, initial_hints_text, problem_type_selected,
        num_initial_solutions_to_gen, llm_config_genesis
    )
    log_entries.append(f"Genesis Engine produced {len(initial_raw_solutions)} raw solution candidate(s).")
    for i, sol_text in enumerate(initial_raw_solutions):
        log_entries.append(f" Candidate {i+1} (Raw Snippet): {str(sol_text)[:120]}...")

    # --- STAGE 2: CRITIQUE & AUTOMATED EVALUATION ---
    progress(0.25, desc="Stage 2: Critique Crucible - Evaluating Candidates...")
    log_entries.append("\n**------ STAGE 2: CRITIQUE CRUCIBLE & AUTOMATED EVALUATION ------**")
    evaluated_candidates_list = []
    # Scale progress by the number of candidates actually produced, not the
    # number requested: the two can differ, which would skew both the progress
    # fraction and the "i of N" label.
    total_candidates = len(initial_raw_solutions)
    for i, candidate_solution_text in enumerate(initial_raw_solutions):
        # Stage 2 occupies the 0.25 - 0.65 slice of the progress bar.
        current_progress = 0.25 + ((i + 1) / total_candidates) * 0.4
        progress(current_progress, desc=f"Evaluating Candidate {i+1} of {total_candidates}...")
        log_entries.append(f"\n--- Evaluating Candidate {i+1} ---")
        evaluation_output_obj = evaluate_solution_candidate(
            str(candidate_solution_text),
            problem_description_text, problem_type_selected,
            user_provided_tests_code, llm_config_critique
        )
        evaluated_candidates_list.append({
            "id": i + 1,
            "solution_text": str(candidate_solution_text),
            "evaluation_obj": evaluation_output_obj
        })
        log_entries.append(f" Combined Score: {evaluation_output_obj.combined_score}/10")
        execution_details = evaluation_output_obj.execution_details
        if execution_details:
            log_entries.append(f" Test Results: {execution_details.passed_tests}/{execution_details.total_tests} passed.")
            if execution_details.error:
                log_entries.append(f" Execution Error: {execution_details.error}")
        log_entries.append(f" LLM Critique (Snippet): {str(evaluation_output_obj.llm_critique_text)[:150]}...")

    initial_solutions_display_markdown = [
        f"**Candidate {data['id']}:**\n```python\n{data['solution_text']}\n```\n\n"
        f"**Evaluation Verdict (Combined Score: {data['evaluation_obj'].combined_score}/10):**\n{data['evaluation_obj'].get_display_critique()}\n---"
        for data in evaluated_candidates_list
    ]

    # --- STAGE 3: SELECTION OF CHAMPION ---
    progress(0.7, desc="Stage 3: Selecting Champion Candidate...")
    log_entries.append("\n**------ STAGE 3: CHAMPION SELECTION ------**")
    # Viable = has an evaluation with a positive score and a non-empty solution
    # text that is not an "ERROR..." marker string.
    potentially_viable_candidates = [
        cand for cand in evaluated_candidates_list
        if cand["evaluation_obj"] and cand["evaluation_obj"].combined_score > 0
        and cand["solution_text"] and not str(cand["solution_text"]).startswith("ERROR")
    ]
    if not potentially_viable_candidates:
        final_error_msg = "No viable candidate solutions found after generation and evaluation. All attempts may have failed or scored too low."
        log_entries.append(f" CRITICAL: {final_error_msg}")
        return "\n\n".join(initial_solutions_display_markdown), final_error_msg, "", "\n".join(log_entries), ""

    # max() returns the first of equally-scored candidates, matching a stable
    # descending sort followed by taking element 0.
    champion_candidate_data = max(
        potentially_viable_candidates,
        key=lambda cand: cand["evaluation_obj"].combined_score,
    )
    champion_evaluation = champion_candidate_data["evaluation_obj"]
    log_entries.append(f"Champion Selected: Candidate {champion_candidate_data['id']} "
                       f"(Solution Snippet: {str(champion_candidate_data['solution_text'])[:60]}...) "
                       f"with evaluation score {champion_evaluation.combined_score}/10.")
    champion_display_markdown = (
        f"**Champion Candidate ID: {champion_candidate_data['id']} "
        f"(Original Combined Score: {champion_evaluation.combined_score}/10):**\n"
        f"```python\n{champion_candidate_data['solution_text']}\n```\n\n"
        f"**Original Comprehensive Evaluation for this Champion:**\n{champion_evaluation.get_display_critique()}"
    )

    # --- STAGE 4: EVOLUTIONARY FORGE ---
    progress(0.75, desc="Stage 4: Evolutionary Forge - Refining Champion...")
    log_entries.append("\n**------ STAGE 4: EVOLUTIONARY FORGE ------**")
    evolved_solution_code = evolve_solution(
        str(champion_candidate_data["solution_text"]),
        champion_evaluation,
        problem_description_text,
        problem_type_selected,
        llm_config_evolution
    )
    log_entries.append(f"Raw Evolved Solution Text (Snippet): {str(evolved_solution_code)[:150]}...")

    evolved_solution_display_markdown = ""
    ai_test_analysis_markdown = ""
    if str(evolved_solution_code).startswith("ERROR"):
        evolved_solution_display_markdown = f"**Evolution Stage Failed:**\n{evolved_solution_code}"
    else:
        evolved_solution_display_markdown = f"**✨ AlgoForge Omega™ Evolved Artifact ✨:**\n```python\n{evolved_solution_code}\n```"
        # Re-run the user's tests only for Python problem types with tests supplied.
        if "python" in problem_type_selected.lower() and has_user_tests:
            progress(0.9, desc="Post-Evolution: Re-testing Evolved Code...")
            log_entries.append("\n--- Post-Evolution Test of Evolved Code ---")
            evolved_code_exec_result = execute_python_code_with_tests(
                str(evolved_solution_code), user_provided_tests_code, timeout_seconds=10
            )
            evolved_solution_display_markdown += (
                "\n\n**Post-Evolution Automated Test Results (Simulated):**\n"
                f" Tests Attempted: {evolved_code_exec_result.total_tests}\n"
                f" Tests Passed: {evolved_code_exec_result.passed_tests}\n"
                f" Execution Time: {evolved_code_exec_result.execution_time:.4f}s\n"
            )
            if evolved_code_exec_result.error:
                evolved_solution_display_markdown += f" Execution Error/Output: {evolved_code_exec_result.error}\n"
            elif evolved_code_exec_result.output:
                evolved_solution_display_markdown += f" Execution Output (stdout):\n```\n{evolved_code_exec_result.output[:300]}\n```\n"
            log_entries.append(f" Evolved Code Test Results: {evolved_code_exec_result}")

            if evolved_code_exec_result.total_tests > 0:
                progress(0.95, desc="Post-Evolution: AI Analyzing Test Results...")
                log_entries.append("\n--- AI Analysis of Evolved Code's Test Results ---")
                # Prefer the error text, then captured output, then a generic note.
                analysis_exec_summary = (
                    evolved_code_exec_result.error
                    or evolved_code_exec_result.output
                    or "Tests completed."
                )
                analysis_user_prompt = format_code_test_analysis_user_prompt(
                    str(evolved_solution_code),
                    user_provided_tests_code,
                    f"Passed: {evolved_code_exec_result.passed_tests}/{evolved_code_exec_result.total_tests}. Detail: {analysis_exec_summary}",
                )
                analysis_system_prompt = get_system_prompt("code_execution_explainer")
                # Fixed low temperature, and a slightly larger budget than the critique stage.
                llm_analysis_config = _build_llm_config(current_model_config, 0.3, critique_max_tokens + 150)

                from core.llm_clients import call_huggingface_api, call_gemini_api
                analysis_call_args = (
                    analysis_user_prompt,
                    llm_analysis_config["model_id"],
                    llm_analysis_config["temp"],
                    llm_analysis_config["max_tokens"],
                    analysis_system_prompt,
                )
                # Any other backend type leaves the response as None, in which
                # case the analysis panel simply stays empty.
                explanation_response_obj = None
                if llm_analysis_config["type"] == "hf":
                    explanation_response_obj = call_huggingface_api(*analysis_call_args)
                elif llm_analysis_config["type"] == "google_gemini":
                    explanation_response_obj = call_gemini_api(*analysis_call_args)

                if explanation_response_obj and explanation_response_obj.success:
                    ai_test_analysis_markdown = f"**AI Analysis of Evolved Code's Test Performance:**\n{explanation_response_obj.text}"
                    log_entries.append(f" AI Test Analysis (Snippet): {str(explanation_response_obj.text)[:100]}...")
                elif explanation_response_obj:
                    ai_test_analysis_markdown = f"**AI Analysis of Test Performance Failed:**\n{explanation_response_obj.error}"
                    log_entries.append(f" AI Test Analysis Error: {explanation_response_obj.error}")

    total_time = time.time() - start_time
    log_entries.append(f"\n**AlgoForge Omega™ Cycle Complete. Total time: {total_time:.2f} seconds.**")
    progress(1.0, desc="Cycle Complete!")
    return "\n\n".join(initial_solutions_display_markdown), champion_display_markdown, evolved_solution_display_markdown, "\n".join(log_entries), ai_test_analysis_markdown
# --- Gradio UI Definition --- | |
# Header text rendered at the top of the page.
intro_markdown = """
# ✨ AlgoForge Omega™ ✨: Conceptual Demo with (Simulated) Execution
This version demonstrates a conceptual workflow for AI-assisted algorithm discovery and refinement,
featuring **(simulated) execution of generated Python code against user-provided unit tests**.
**API Keys Required in Space Secrets:**
- `GOOGLE_API_KEY` (Primary): For Google Gemini API models. Ensure the "Generative Language API" (or similar) is enabled for your project.
- `HF_TOKEN` (Secondary): For Hugging Face hosted models.
"""

# HTML status banner: one red line when neither API is ready, otherwise one
# green/orange line per API (Gemini first, then Hugging Face).
if GEMINI_API_READY or HF_API_READY:
    ui_token_status_md = "".join((
        "<p style='color:green;'>✅ Google Gemini API Configured.</p>"
        if GEMINI_API_READY
        else "<p style='color:orange;'>⚠️ **GOOGLE_API_KEY missing/failed.** Gemini models disabled.</p>",
        "<p style='color:green;'>✅ Hugging Face API Configured.</p>"
        if HF_API_READY
        else "<p style='color:orange;'>⚠️ **HF_TOKEN missing/failed.** HF models disabled.</p>",
    ))
else:
    ui_token_status_md = "<p style='color:red;'>⚠️ **CRITICAL: NEITHER API IS CONFIGURED. APP NON-FUNCTIONAL.**</p>"
# Build the UI. Components are created inside nested context managers, so the
# order and nesting of the statements below define the page layout.
with gr.Blocks(theme=gr.themes.Soft(primary_hue="purple", secondary_hue="pink"), title="AlgoForge Omega™ Demo") as app_demo:
    gr.Markdown(intro_markdown)
    gr.HTML(ui_token_status_md)
    # True when at least one registered model is not the "none" placeholder entry.
    usable_models_available = any(
        AVAILABLE_MODELS_CONFIG.get(key, {}).get("type") != "none"
        for key in AVAILABLE_MODELS_CONFIG
    )
    if not usable_models_available:
        # No inputs or button are created in this case; only the error banner is shown.
        gr.Markdown("<h2 style='color:red;'>No LLM models are available for use. Check API keys and restart.</h2>")
    else:
        with gr.Row():
            # Left column: problem definition and model/parameter configuration.
            with gr.Column(scale=2):
                gr.Markdown("## 💡 1. Define the Challenge")
                problem_type_dropdown = gr.Dropdown(choices=["Python Algorithm with Tests", "Python Algorithm (Critique Only)", "General Algorithm Idea"], label="Problem Type", value="Python Algorithm with Tests")
                problem_description_textbox = gr.Textbox(lines=5, label="Problem Description")
                initial_hints_textbox = gr.Textbox(lines=3, label="Initial Hints (Optional)")
                user_tests_textbox = gr.Textbox(lines=6, label="Python Unit Tests (Optional, one `assert` per line)", placeholder="assert my_func(1) == 1")
                gr.Markdown("## ⚙️ 2. Configure The Forge")
                model_selection_dropdown = gr.Dropdown(choices=list(AVAILABLE_MODELS_CONFIG.keys()), value=UI_DEFAULT_MODEL_KEY, label="LLM Core Model")
                num_initial_solutions_slider = gr.Slider(1, 3, value=2, step=1, label="# Initial Solutions")
                # Per-stage temperature / token budget, collapsed by default.
                with gr.Accordion("Advanced LLM Parameters", open=False):
                    genesis_temp_slider = gr.Slider(0.0, 1.0, value=0.7, step=0.05, label="Genesis Temp")
                    genesis_max_tokens_slider = gr.Slider(256, 4096, value=1024, step=128, label="Genesis Max Tokens")
                    critique_temp_slider = gr.Slider(0.0, 1.0, value=0.4, step=0.05, label="Critique Temp")
                    critique_max_tokens_slider = gr.Slider(150, 2048, value=512, step=64, label="Critique Max Tokens")
                    evolution_temp_slider = gr.Slider(0.0, 1.0, value=0.75, step=0.05, label="Evolution Temp")
                    evolution_max_tokens_slider = gr.Slider(256, 4096, value=1536, step=128, label="Evolution Max Tokens")
                engage_button = gr.Button("🚀 ENGAGE ALGOFORGE OMEGA™ 🚀", variant="primary")
            # Right column: one tab per output of the orchestrator.
            with gr.Column(scale=3):
                gr.Markdown("## 🔥 3. The Forge's Output")
                with gr.Tabs():
                    with gr.TabItem("📜 Candidates & Evaluations"): output_initial_solutions_markdown = gr.Markdown()
                    with gr.TabItem("🏆 Champion"): output_champion_markdown = gr.Markdown()
                    with gr.TabItem("🌟 Evolved & Tested"):
                        output_evolved_markdown = gr.Markdown()
                        output_ai_test_analysis_markdown = gr.Markdown()
                    with gr.TabItem("🛠️ Log"): output_interaction_log_markdown = gr.Markdown()
        # `inputs` must follow the orchestrator's positional parameter order and
        # `outputs` the order of its returned 5-tuple (candidates, champion,
        # evolved, log, AI analysis).
        engage_button.click(
            fn=run_algoforge_simulation_orchestrator,
            inputs=[ problem_type_dropdown, problem_description_textbox, initial_hints_textbox, user_tests_textbox, num_initial_solutions_slider, model_selection_dropdown, genesis_temp_slider, genesis_max_tokens_slider, critique_temp_slider, critique_max_tokens_slider, evolution_temp_slider, evolution_max_tokens_slider ],
            outputs=[ output_initial_solutions_markdown, output_champion_markdown, output_evolved_markdown, output_interaction_log_markdown, output_ai_test_analysis_markdown ]
        )
    # NOTE(review): footer is placed at the Blocks level so it renders whether or
    # not models are available - confirm this matches the intended layout.
    gr.Markdown("---")
    gr.Markdown("**Disclaimer:** Conceptual Omega Demo. (Simulated) unit testing. **NEVER run untrusted LLM code without robust sandboxing.**")
# --- Entry Point for Running the Gradio App --- | |
if __name__ == "__main__":
    # Assemble the startup banner first, then emit it line by line.
    banner_rule = "=" * 80
    startup_lines = [
        banner_rule,
        "AlgoForge Omega™ Conceptual Demo - Launching...",
        f" Gemini API Ready: {GEMINI_API_READY}",
        f" HF API Ready: {HF_API_READY}",
    ]
    if not (GEMINI_API_READY or HF_API_READY):
        startup_lines.append(" CRITICAL WARNING: No API keys seem to be configured correctly. The application will likely be non-functional.")
    startup_lines.append(f" UI Default Model Key: {UI_DEFAULT_MODEL_KEY}")
    startup_lines.append(f" Available models for UI: {list(AVAILABLE_MODELS_CONFIG)}")
    startup_lines.append(banner_rule)
    for banner_line in startup_lines:
        print(banner_line)

    # Listen on all interfaces, with Gradio's debug mode enabled.
    app_demo.launch(debug=True, server_name="0.0.0.0")