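"""Multi-model analysis Space: a query is decomposed by Gemini and Mistral,
analyzed by GPT-4o, critiqued across several models, and synthesized by Gemini,
all behind a Gradio UI."""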
import os
import time
import gradio as gr
import requests
import json
import re
import google.generativeai as genai
from openai import OpenAI
from typing import List, Dict, Optional


class CognitiveArchitecture:
    def __init__(self):
        self.api_keys = {
            "GEMINI": os.environ.get("GEMINI_API_KEY"),
            "MISTRAL": os.environ.get("MISTRAL_API_KEY"),
            "OPENROUTER": os.environ.get("OPENROUTER_API_KEY"),
            "AZURE": os.environ.get("AZURE_API_KEY")
        }
        self.validate_keys()
        genai.configure(api_key=self.api_keys["GEMINI"])
        self.gemini_model = genai.GenerativeModel("gemini-2.0-pro-exp-02-05")
        self.gpt4o_client = OpenAI(
            base_url="https://models.inference.ai.azure.com",
            api_key=self.api_keys["AZURE"]
        )
        self.model_config = {
            "decomposition": ["gemini-2.0-pro-exp-02-05", "mistral-large-latest"],
            "analysis": ["gpt-4o"],
            "critique": ["gpt-4o", "meta-llama/llama-3.3-70b-instruct:free"]
        }
        self.headers = {
            "mistral": {"Authorization": f"Bearer {self.api_keys['MISTRAL']}"},
            "openrouter": {"Authorization": f"Bearer {self.api_keys['OPENROUTER']}"}
        }

    def validate_keys(self):
        for service, key in self.api_keys.items():
            if not key:
                raise ValueError(f"Missing API key: {service}")

    def safe_json_parse(self, text: str) -> dict:
        """Robust JSON parsing with multiple fallbacks."""
        try:
            return json.loads(text)
        except json.JSONDecodeError:
            try:
                # Try to extract a JSON object embedded in surrounding text
                json_str = re.search(r'\{.*\}', text, re.DOTALL)
                if json_str:
                    return json.loads(json_str.group())
                # Fall back to key-value parsing
                return self.parse_kv_text(text)
            except Exception as e:
                return {"error": f"JSON parsing failed: {str(e)}", "raw_response": text}

    def parse_kv_text(self, text: str) -> dict:
        """Fallback key-value parser."""
        parsed = {}
        current_key = None
        for line in text.split('\n'):
            if ':' in line:
                key, value = line.split(':', 1)
                parsed[key.strip()] = value.strip()
                current_key = key.strip()
            elif current_key:
                # Continuation line: append to the most recent key's value
                parsed[current_key] += " " + line.strip()
        return parsed
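
    # Hypothetical example: parse_kv_text("score: 0.9\nnotes: terse\nbut clear")
    # returns {"score": "0.9", "notes": "terse but clear"}.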

    def call_model(self, model: str, prompt: str) -> dict:
        """Robust model caller with error handling."""
        try:
            if "gemini" in model:
                response = self.gemini_model.generate_content(prompt)
                return {"success": True, "content": response.text}
            if "mistral" in model:
                return self.call_mistral(model, prompt)
            if "gpt-4o" in model:
                return self.call_azure_gpt4o(prompt)
            return self.call_openrouter(model, prompt)
        except Exception as e:
            return {"success": False, "error": str(e)}

    def call_mistral(self, model: str, prompt: str) -> dict:
        """Mistral API caller."""
        try:
            response = requests.post(
                "https://api.mistral.ai/v1/chat/completions",
                headers=self.headers["mistral"],
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.7,
                    "max_tokens": 3000
                },
                timeout=30
            )
            if response.status_code == 200:
                return {"success": True, "content": response.json()['choices'][0]['message']['content']}
            return {"success": False, "error": f"API Error {response.status_code}"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def call_azure_gpt4o(self, prompt: str) -> dict:
        """Azure GPT-4o caller."""
        try:
            response = self.gpt4o_client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=2000
            )
            return {"success": True, "content": response.choices[0].message.content}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def call_openrouter(self, model: str, prompt: str) -> dict:
        """OpenRouter caller."""
        try:
            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers=self.headers["openrouter"],
                json={
                    "model": model,
                    "messages": [{"role": "user", "content": prompt}],
                    "temperature": 0.7,
                    "max_tokens": 3000
                },
                timeout=30
            )
            if response.status_code == 200:
                return {"success": True, "content": response.json()['choices'][0]['message']['content']}
            return {"success": False, "error": f"API Error {response.status_code}"}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def analyze_query(self, query: str) -> dict:
        """Full analysis workflow with error resilience."""
        try:
            # Step 1: Multi-model decomposition
            decompositions = {}
            for model in self.model_config["decomposition"]:
                result = self.call_model(model, f"Decompose this query: {query}")
                if result["success"]:
                    decompositions[model] = self.safe_json_parse(result["content"])

            # Step 2: GPT-4o analysis
            analysis_prompt = f"""Analyze this query based on decompositions:
{json.dumps(decompositions, indent=2)}
Query: {query}
Provide detailed analysis in JSON format."""
            analysis = self.call_model("gpt-4o", analysis_prompt)
            if not analysis["success"]:
                return {"success": False, "error": "Analysis failed", "details": analysis["error"]}
            parsed_analysis = self.safe_json_parse(analysis["content"])

            # Step 3: Cross-model critique
            critiques = {}
            for model in self.model_config["critique"]:
                critique_prompt = f"""Critique this analysis:
{json.dumps(parsed_analysis, indent=2)}
Provide structured feedback in JSON format."""
                result = self.call_model(model, critique_prompt)
                if result["success"]:
                    critiques[model] = self.safe_json_parse(result["content"])

            # Step 4: Final synthesis
            synthesis_prompt = f"""Synthesize a final response considering:
Analysis: {json.dumps(parsed_analysis, indent=2)}
Critiques: {json.dumps(critiques, indent=2)}
Provide a comprehensive response with a confidence score."""
            synthesis = self.call_model("gemini-2.0-pro-exp-02-05", synthesis_prompt)
            return {
                "decompositions": decompositions,
                "analysis": parsed_analysis,
                "critiques": critiques,
                "synthesis": synthesis["content"] if synthesis["success"] else "Synthesis failed",
                "success": True
            }
        except Exception as e:
            return {"success": False, "error": str(e)}

def create_interface():
    try:
        analyzer = CognitiveArchitecture()
    except ValueError as e:
        # gr.Blocks.launch() has no error_message parameter; surface the
        # missing-key error in a minimal placeholder app instead.
        with gr.Blocks(title="AI Analysis Suite") as error_demo:
            gr.Markdown(f"## ❌ Configuration Error\n{e}")
        return error_demo
    with gr.Blocks(title="AI Analysis Suite", theme=gr.themes.Soft(), css="""
        .analysis-section { margin: 15px; padding: 15px; border: 1px solid #eee; border-radius: 8px; }
        pre { white-space: pre-wrap; background: #f8f9fa; padding: 10px; }
        .error { color: #dc3545; background: #ffeef0; padding: 10px; border-radius: 5px; }
    """) as demo:
gr.Markdown("# 🧠 Advanced AI Analysis System") | |
with gr.Row(): | |
input_box = gr.Textbox(label="Input Query", placeholder="Enter your query...", lines=3) | |
submit_btn = gr.Button("Analyze", variant="primary") | |
output_area = gr.Markdown() | |
debug_info = gr.JSON(label="Analysis Details") | |

        def process_query(query):
            start_time = time.time()
            result = analyzer.analyze_query(query)
            duration = time.time() - start_time
            if not result.get("success"):
                return f"## ❌ Analysis Error\n{result.get('error', 'Unknown error')}", {}
            # Keep the markdown flush-left so the gr.Markdown component renders
            # it as headings and bold text rather than an indented code block.
            formatted_output = f"""## Analysis Result

{result.get('synthesis', 'No synthesis available')}

**Processing Time**: {duration:.2f}s
**Models Used**: {len(result['decompositions'])} decomposition, {len(result['critiques'])} critique"""
            return formatted_output, {
                "decompositions": result["decompositions"],
                "analysis": result["analysis"],
                "critiques": result["critiques"]
            }

        submit_btn.click(
            process_query,
            inputs=input_box,
            outputs=[output_area, debug_info]
        )

    return demo


if __name__ == "__main__":
    create_interface().launch(server_port=7860)
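
# Local run (env vars required by validate_keys): set GEMINI_API_KEY,
# MISTRAL_API_KEY, OPENROUTER_API_KEY, and AZURE_API_KEY, then start the
# entry-point script (conventionally `python app.py` on Spaces) and open
# http://localhost:7860.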