# Athspi's picture
# Update app.py
# 01ff908 verified
import asyncio
import html
import json
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import List, Dict, Tuple, Any, Optional, Union

import google.generativeai as genai
import gradio as gr
import requests
from openai import OpenAI
@dataclass
class CognitiveStep:
    """Record of a single stage of the reasoning pipeline.

    Field order is part of the constructor signature (positional use is
    ``CognitiveStep(name, description, ...)``), so it must not be reordered.
    """

    # Short title of the stage.
    name: str
    # One-line explanation of what the stage does.
    description: str
    # Text produced by the stage; empty until filled in.
    content: str = ""
    # Free-form extra data; a fresh dict is created per instance.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Wall-clock duration of the stage in seconds.
    execution_time: float = 0.0
class CognitiveArchitecture:
    """Multi-model reasoning pipeline.

    Routes prompts to Gemini, Mistral, GPT-4o (Azure inference endpoint) and
    OpenRouter-hosted models, chains them into a nine-stage pipeline
    (`hierarchical_reasoning`), and renders the recorded stages as HTML
    (`visualize_thought_process`).

    Model-call failures are reported as strings (for example
    ``"Error with Mistral: ..."``) rather than raised, so the pipeline keeps
    running and the failure text shows up as the stage content.
    """

    def __init__(self, debug_mode: bool = False):
        """Read API keys from the environment and set up the model clients.

        Args:
            debug_mode: When true, `log` prints messages and missing API keys
                only produce a warning instead of raising.

        Raises:
            ValueError: If an API key is missing and `debug_mode` is false.
        """
        self.debug_mode = debug_mode
        self.api_keys = {
            "GEMINI": os.environ.get("GEMINI_API_KEY"),
            "MISTRAL": os.environ.get("MISTRAL_API_KEY"),
            "OPENROUTER": os.environ.get("OPENROUTER_API_KEY"),
            "AZURE": os.environ.get("AZURE_API_KEY")
        }
        self.validate_keys()
        # Initialize all AI models
        genai.configure(api_key=self.api_keys["GEMINI"])
        self.gemini_model = genai.GenerativeModel(
            "gemini-2.0-pro-exp-02-05",
            generation_config={"temperature": 0.5, "max_output_tokens": 8192}
        )
        self.gpt4o_client = OpenAI(
            base_url="https://models.inference.ai.azure.com",
            api_key=self.api_keys["AZURE"]
        )
        # Role name -> model identifier sent to the respective API.
        self.models = {
            "DeepSeek": "deepseek/deepseek-chat:free",
            "Qwen": "qwen/qwen-vl-plus:free",
            "Llama": "meta-llama/llama-3.3-70b-instruct:free",
            "Mistral": "mistral-large-latest",
            "GPT4o": "gpt-4o"
        }
        self.headers = {
            "OpenRouter": {
                "Authorization": f"Bearer {self.api_keys['OPENROUTER']}",
                "Content-Type": "application/json"
            },
            "Mistral": {
                "Authorization": f"Bearer {self.api_keys['MISTRAL']}",
                "Content-Type": "application/json",
                "Accept": "application/json"
            }
        }
        self.memory = []
        self.thinking_steps = []
        self.executor = ThreadPoolExecutor(max_workers=5)

    def validate_keys(self):
        """Check that every API key is set.

        Raises:
            ValueError: If any key is missing and `debug_mode` is false; in
                debug mode a warning is printed instead.
        """
        missing_keys = [key for key, value in self.api_keys.items() if not value]
        if not missing_keys:
            return
        if self.debug_mode:
            print(f"Warning: Missing API keys: {', '.join(missing_keys)}")
        else:
            raise ValueError(f"Missing API keys: {', '.join(missing_keys)}")

    def log(self, message: str, level: str = "INFO"):
        """Print a timestamped message; does nothing unless `debug_mode` is on."""
        if self.debug_mode:
            timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
            print(f"[{timestamp}] [{level}] {message}")

    async def call_model_async(self, model_role: str, prompt: str,
                               context: Optional[List[Dict]] = None) -> str:
        """Send a prompt to the back end registered for `model_role`.

        The blocking client call runs in a worker thread so several calls can
        overlap under `asyncio.gather`.

        Args:
            model_role: "Gemini", "Mistral", "GPT4o", "DeepSeek", or any other
                role/model name, which is sent to OpenRouter.
            prompt: User prompt; used when `context` is empty.
            context: Optional chat history that replaces the single-message
                prompt (ignored by the Gemini branch).

        Returns:
            The model's text, or an error description string. Exceptions are
            caught and reported in the returned string, never raised.
        """
        self.log(f"Calling {model_role} model")
        start_time = time.time()
        # Roles with a dedicated synchronous caller.
        direct_callers = {
            "Mistral": self._call_mistral,
            "GPT4o": self._call_gpt4o,
            "DeepSeek": self._call_deepseek,
        }
        try:
            if model_role == "Gemini":
                response = await asyncio.to_thread(
                    self.gemini_model.generate_content, prompt
                )
                result = response.text
            elif model_role in direct_callers:
                result = await asyncio.to_thread(
                    direct_callers[model_role], prompt, context
                )
            else:
                # Handle OpenRouter models
                result = await asyncio.to_thread(
                    self._call_openrouter, model_role, prompt, context
                )
            execution_time = time.time() - start_time
            self.log(f"{model_role} completed in {execution_time:.2f}s")
            return result
        except Exception as e:
            self.log(f"Model Error ({model_role}): {str(e)}", "ERROR")
            return f"Error with {model_role}: {str(e)}"

    def call_model(self, model_role: str, prompt: str,
                   context: Optional[List[Dict]] = None) -> str:
        """Synchronous wrapper around `call_model_async`.

        Uses `asyncio.run`, so it must not be called from a running event loop.
        """
        return asyncio.run(self.call_model_async(model_role, prompt, context))

    def _call_openrouter(self, model_role: str, prompt: str,
                         context: Optional[List[Dict]] = None) -> str:
        """Blocking OpenRouter chat-completion call for roles without a dedicated caller.

        The timeout is enforced by `requests` itself so the worker thread is
        actually released after 30 seconds. Network errors propagate to the
        caller (`call_model_async` turns them into an error string).
        """
        payload = {
            # Unknown roles are passed through as the model identifier.
            "model": self.models.get(model_role, model_role),
            "messages": context if context else [{"role": "user", "content": prompt}],
            "temperature": 0.7,
            "max_tokens": 3096,
            "top_p": 0.9
        }
        response = requests.post(
            "https://openrouter.ai/api/v1/chat/completions",
            headers=self.headers["OpenRouter"],
            json=payload,
            timeout=30
        )
        if response.status_code == 200:
            return response.json()['choices'][0]['message']['content']
        return f"API Error {response.status_code}: {response.text}"

    def _call_mistral(self, prompt: str, context: Optional[List[Dict]] = None) -> str:
        """Blocking call to the Mistral chat-completions endpoint.

        Returns the reply text, or an ``"API Error <status>"`` / ``"Error: ..."``
        string on failure (failures are logged, not raised).
        """
        try:
            payload = {
                "model": self.models["Mistral"],
                "messages": context if context else [{"role": "user", "content": prompt}],
                "temperature": 0.7,
                "max_tokens": 4096,
                "top_p": 0.9
            }
            response = requests.post(
                "https://api.mistral.ai/v1/chat/completions",
                headers=self.headers["Mistral"],
                json=payload,
                timeout=30
            )
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            self.log(f"Mistral API error: {response.status_code} - {response.text}", "ERROR")
            return f"API Error {response.status_code}"
        except Exception as e:
            self.log(f"Mistral API Error: {str(e)}", "ERROR")
            return f"Error: {str(e)}"

    def _call_gpt4o(self, prompt: str, context: Optional[List[Dict]] = None) -> str:
        """Blocking GPT-4o call with up to two retries, two seconds apart.

        Returns the reply text (empty string if the API returns no content),
        or an ``"Error after 2 retries: ..."`` string once all attempts fail.
        """
        max_retries = 2
        messages = context if context else [
            {"role": "system", "content": "You are an expert analyst with multi-step reasoning capabilities."},
            {"role": "user", "content": prompt}
        ]
        for attempt in range(max_retries + 1):
            try:
                response = self.gpt4o_client.chat.completions.create(
                    model=self.models["GPT4o"],
                    messages=messages,
                    temperature=0.7,
                    top_p=0.95,
                    max_tokens=2000
                )
                # `content` may be None; keep the declared `str` return type.
                return response.choices[0].message.content or ""
            except Exception as e:
                if attempt < max_retries:
                    self.log(f"GPT-4o Error, retrying ({attempt + 1}/{max_retries}): {str(e)}", "WARNING")
                    time.sleep(2)  # Backoff before retry
                else:
                    self.log(f"GPT-4o Error after retries: {str(e)}", "ERROR")
                    return f"Error after {max_retries} retries: {str(e)}"

    def _call_deepseek(self, prompt: str, context: Optional[List[Dict]] = None) -> str:
        """Blocking DeepSeek call, routed through OpenRouter.

        Returns the reply text, or an ``"API Error <status>"`` / ``"Error: ..."``
        string on failure (failures are logged, not raised).
        """
        try:
            if context:
                # Forward only the role/content pair of each message.
                messages = [{"role": m["role"], "content": m["content"]} for m in context]
            else:
                messages = [{"role": "user", "content": prompt}]
            payload = {
                "model": self.models["DeepSeek"],
                "messages": messages,
                "max_tokens": 4000,
                "temperature": 0.5
            }
            response = requests.post(
                "https://openrouter.ai/api/v1/chat/completions",
                headers=self.headers["OpenRouter"],
                json=payload,
                timeout=45
            )
            if response.status_code == 200:
                return response.json()['choices'][0]['message']['content']
            self.log(f"DeepSeek API error: {response.status_code} - {response.text}", "ERROR")
            return f"API Error {response.status_code}"
        except Exception as e:
            self.log(f"DeepSeek API Error: {str(e)}", "ERROR")
            return f"Error: {str(e)}"

    async def _run_stage(self, name: str, description: str, model_role: str, prompt: str) -> str:
        """Run one single-model stage, time it, record it in `thinking_steps`, and return its text."""
        started = time.time()
        content = await self.call_model_async(model_role, prompt)
        self.thinking_steps.append(CognitiveStep(
            name=name,
            description=description,
            content=content,
            execution_time=time.time() - started
        ))
        return content

    async def hierarchical_reasoning(self, query: str) -> Tuple[str, dict]:
        """Run the nine-stage reasoning pipeline for `query`.

        Stage 2 issues three model calls concurrently; all other stages run
        one after another because each prompt embeds earlier results.
        `thinking_steps` is reset at the start and filled with one
        `CognitiveStep` per stage; the finished run is appended to `memory`.

        Returns:
            ``(final_text, structured_data)`` where `final_text` is the output
            of the validation stage. On an unexpected exception the result is
            ``("Cognitive processing failed: ...", {})``.
        """
        self.thinking_steps = []
        try:
            # Stage 1: Conceptual Decomposition (Mistral)
            decomposition = await self._run_stage(
                "Conceptual Decomposition",
                "Breaking down the query into its foundational components",
                "Mistral",
                f"""Decompose the following query into detailed components:
QUERY: "{query}"
Output format:
- Primary Intent: [What is the main goal]
- Implicit Assumptions: [List all unstated assumptions]
- Required Knowledge Domains: [Specific domains needed to answer]
- Potential Biases: [Cognitive biases that might affect reasoning]
- Key Constraints: [Limitations or boundaries]
- Sub-Questions: [List of component questions needed to fully address]
"""
            )

            # Stage 2: Parallel Deep Analysis (Multiple models concurrently)
            analysis_tasks = [
                self.call_model_async(
                    "GPT4o",
                    f"""Analyze this query using first principles thinking:
QUERY: {query}
DECOMPOSITION: {decomposition}
Include multiple angles of analysis, potential solution paths, and identify knowledge gaps."""
                ),
                self.call_model_async(
                    "DeepSeek",
                    f"""Generate a systematic analysis framework for addressing:
"{query}"
Focus on:
1. Deep structure of the problem
2. Alternative perspectives
3. Root causes and implications
4. Knowledge requirements
"""
                ),
                self.call_model_async(
                    "Mistral",
                    f"""Create a comprehensive concept map for the query:
"{query}"
Map out:
- Core concepts
- Their relationships
- Dependencies
- Decision points
- Critical factors
"""
                )
            ]
            analysis_start = time.time()
            analysis_results = await asyncio.gather(*analysis_tasks)
            analysis_time = time.time() - analysis_start
            # Combine the analyses with attribution
            combined_analysis = f"""
## GPT-4o Analysis
{analysis_results[0]}
## DeepSeek Analysis
{analysis_results[1]}
## Mistral Concept Map
{analysis_results[2]}
"""
            self.thinking_steps.append(CognitiveStep(
                name="Multi-Model Deep Analysis",
                description="Parallel processing across different reasoning systems",
                content=combined_analysis,
                execution_time=analysis_time
            ))

            # Stage 3: Contextual Grounding (Qwen)
            context = await self._run_stage(
                "Contextual Grounding",
                "Establishing broader context and knowledge framework",
                "Qwen",
                f"""Generate comprehensive context for addressing this query:
"{query}"
Include:
- Relevant background information
- Historical context
- Current state of the art
- Common misconceptions
- Established frameworks
- Similar problems and their solutions
"""
            )

            # Stage 4: Critical Evaluation (Llama)
            critique = await self._run_stage(
                "Critical Evaluation",
                "Rigorously challenging the analysis through critical thinking",
                "Llama",
                f"""Perform a comprehensive critique of the analysis so far:
QUERY: {query}
DECOMPOSITION: {decomposition}
ANALYSIS: {combined_analysis}
CONTEXT: {context}
Evaluate for:
- Logical fallacies
- Gaps in reasoning
- Unfounded assumptions
- Alternative interpretations
- Counterarguments
- Strength of evidence
"""
            )

            # Stage 5: Ethical Consideration (DeepSeek)
            ethics = await self._run_stage(
                "Ethical Analysis",
                "Evaluating moral implications and ethical considerations",
                "DeepSeek",
                f"""Analyze the ethical dimensions of responding to:
"{query}"
Consider:
- Stakeholder impacts
- Value conflicts
- Potential for harm
- Justice and fairness implications
- Transparency requirements
- Long-term consequences
- Ethical frameworks applicable (deontological, utilitarian, virtue ethics, etc.)
Provide concrete ethical recommendations.
"""
            )

            # Stage 6: Innovation Generation (DeepSeek)
            innovation = await self._run_stage(
                "Innovation Generation",
                "Creating novel approaches and unconventional perspectives",
                "DeepSeek",
                f"""Generate innovative approaches and novel perspectives for addressing:
"{query}"
Go beyond conventional thinking to propose:
- Creative frameworks
- Interdisciplinary approaches
- Unexpected connections
- Paradigm shifts
- Breakthrough methodologies
"""
            )

            # Stage 7: Integration (Gemini)
            integration = await self._run_stage(
                "Integration",
                "Synthesizing all insights into a unified framework",
                "Gemini",
                f"""Integrate all preceding analyses into a coherent framework:
COMPONENTS:
- Decomposition: {decomposition}
- Analysis: {combined_analysis}
- Context: {context}
- Critique: {critique}
- Ethics: {ethics}
- Innovation: {innovation}
Create a unified, comprehensive understanding that resolves contradictions
and synthesizes insights from all components. Structure your integration
systematically, addressing each major aspect of the query.
"""
            )

            # Stage 8: Response Synthesis (GPT-4o)
            synthesis = await self._run_stage(
                "Response Synthesis",
                "Crafting a comprehensive answer from the integrated analysis",
                "GPT4o",
                f"""Synthesize a complete response based on all analysis:
ORIGINAL QUERY: "{query}"
INTEGRATION FRAMEWORK: {integration}
Create a comprehensive, well-structured response that:
1. Directly addresses the core query
2. Incorporates key insights from all analyses
3. Presents multiple perspectives where relevant
4. Acknowledges limitations and uncertainties
5. Provides actionable conclusions
Format your response for clarity and impact.
"""
            )

            # Stage 9: Validation & Refinement (DeepSeek)
            validation = await self._run_stage(
                "Validation & Refinement",
                "Final quality assurance and optimization",
                "DeepSeek",
                f"""Validate and refine this comprehensive response:
ORIGINAL QUERY: "{query}"
PROPOSED RESPONSE:
{synthesis}
Please evaluate this response for:
- Accuracy and factual correctness
- Completeness (addressing all aspects of the query)
- Clarity and coherence
- Logical consistency
- Relevance to the original query
- Balance and fairness
Then provide an optimized final version that addresses any identified issues
while maintaining the core insights and structure.
"""
            )

            # Extract metadata for analysis
            structured_data = {
                "components": self.extract_structured_data(decomposition),
                "analysis": self.extract_structured_data(combined_analysis),
                "validation": self.extract_structured_data(validation),
                "execution_metrics": {
                    "total_time": sum(step.execution_time for step in self.thinking_steps),
                    "step_times": {step.name: step.execution_time for step in self.thinking_steps}
                }
            }
            # Add to memory for future reference
            self.memory.append({
                "query": query,
                "response": validation,
                "thinking_steps": [
                    {"name": step.name, "content": step.content} for step in self.thinking_steps
                ],
                "timestamp": time.time()
            })
            return validation, structured_data
        except Exception as e:
            error_msg = f"Reasoning Error: {str(e)}"
            self.log(error_msg, "ERROR")
            return f"Cognitive processing failed: {error_msg}", {}

    def extract_structured_data(self, text: str) -> dict:
        """Best-effort conversion of model output into a dict.

        Strategies are tried in order and the first that yields data wins:
        1. the outermost ``{...}`` span parsed as JSON;
        2. markdown ``#``/``##`` headings mapped to the text below them;
        3. ``- key: value`` / ``* key: value`` bullets;
        4. plain ``key: value`` pairs;
        5. ``{"content": text}`` as the fallback.

        Keys are stripped, lower-cased and have spaces replaced by underscores.
        Any exception yields ``{"error": ..., "raw_text": text}``.
        """
        try:
            # Strategy 1: JSON extraction
            json_match = re.search(r'\{.*\}', text, re.DOTALL)
            if json_match:
                try:
                    return json.loads(json_match.group(0))
                except json.JSONDecodeError:
                    pass  # Continue to next strategy

            def normalize(pairs):
                return {
                    key.strip().lower().replace(' ', '_'): value.strip()
                    for key, value in pairs
                }

            # Strategy 2: Markdown list parsing
            section_pattern = r'##?\s+(.+?)\n(.*?)(?=##?\s+|\Z)'
            sections = re.findall(section_pattern, text, re.DOTALL)
            if sections:
                return normalize(sections)

            # Strategy 3: Bullet point parsing
            # Bullets are recognized only at the start of a line, so hyphens or
            # asterisks inside a value ("well-structured") no longer cut it short.
            bullet_pattern = r'^\s*[-*]\s+([^:\n]+):\s*(.*?)(?=^\s*[-*]\s|\Z)'
            bullets = re.findall(bullet_pattern, text, re.DOTALL | re.MULTILINE)
            if bullets:
                return normalize(bullets)

            # Strategy 4: Key-value line parsing
            line_pattern = r'([^:]+):\s*(.*)'
            lines = re.findall(line_pattern, text)
            if lines:
                return normalize(lines)

            # Fallback
            return {"content": text}
        except Exception as e:
            self.log(f"Parsing Error: {str(e)}", "ERROR")
            return {"error": "Failed to parse response", "raw_text": text}

    def visualize_thought_process(self) -> str:
        """Render `thinking_steps` as an HTML timeline plus per-step details.

        Step names, descriptions and contents are HTML-escaped because they
        contain model output. When the total recorded time is zero, the
        timeline bars get equal widths.

        Returns:
            An HTML fragment, or an error ``<div>`` when no steps are recorded.
        """
        if not self.thinking_steps:
            return "<div class='error'>No thinking process data available</div>"
        step_count = len(self.thinking_steps)
        total_time = sum(step.execution_time for step in self.thinking_steps)
        vis = ["<div class='cognitive-process'>"]
        vis.append("<h2>Cognitive Process Breakdown</h2>")
        vis.append(f"<div class='total-time'>Total Processing Time: {total_time:.2f}s</div>")
        # Add timeline visualization
        vis.append("<div class='timeline'>")
        for step in self.thinking_steps:
            if total_time > 0:
                percentage = (step.execution_time / total_time) * 100
            else:
                percentage = 100 / step_count
            safe_name = html.escape(str(step.name))
            vis.append(f"""
<div class='timeline-bar' style='width: {percentage}%;'>
<div class='step-name'>{safe_name}</div>
<div class='step-time'>{step.execution_time:.2f}s</div>
</div>
""")
        vis.append("</div>")
        # Add detailed step breakdown
        for i, step in enumerate(self.thinking_steps):
            safe_name = html.escape(str(step.name))
            safe_description = html.escape(str(step.description))
            safe_content = html.escape(str(step.content))
            vis.append(f"""
<div class='process-step' id='step-{i}'>
<div class='step-header'>
<h3>{safe_name}</h3>
<div class='step-info'>
<span class='step-number'>Step {i+1}/{step_count}</span>
<span class='step-time'>{step.execution_time:.2f}s</span>
</div>
</div>
<div class='step-description'>{safe_description}</div>
<pre class='step-content'>{safe_content}</pre>
</div>
""")
        vis.append("</div>")
        return "\n".join(vis)
def create_agi_interface():
    """Build the Gradio UI for the reasoning pipeline.

    Returns:
        A `gr.Blocks` app ready for `.launch()`. If the architecture cannot be
        constructed because of a `ValueError`, a minimal `gr.Blocks` that only
        displays the error is returned instead, so the caller can launch the
        result either way.
    """
    try:
        agi = CognitiveArchitecture(debug_mode=True)
    except ValueError as e:
        # `launch()` has no `error_message` argument; show the problem in the
        # page and still hand back a launchable Blocks object.
        with gr.Blocks(title="Advanced AGI Reasoning Framework") as error_demo:
            gr.Markdown(f"## Configuration Error\n\n{e}")
        return error_demo

    css = """
.cognitive-process {
max-width: 1200px;
margin: 0 auto;
}
.total-time {
font-size: 1.2em;
font-weight: bold;
margin: 15px 0;
color: #2a4365;
}
.timeline {
display: flex;
height: 40px;
background: #f0f0f0;
margin: 20px 0;
border-radius: 4px;
overflow: hidden;
}
.timeline-bar {
height: 100%;
display: flex;
flex-direction: column;
justify-content: center;
align-items: center;
background: #4299e1;
color: white;
font-size: 0.8em;
position: relative;
min-width: 30px;
padding: 0 5px;
}
.timeline-bar:nth-child(odd) {
background: #3182ce;
}
.step-name, .step-time {
white-space: nowrap;
overflow: hidden;
text-overflow: ellipsis;
}
.process-step {
margin: 25px 0;
padding: 20px;
border: 1px solid #e0e0e0;
border-radius: 8px;
background: #fafafa;
box-shadow: 0 2px 4px rgba(0,0,0,0.05);
}
.step-header {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 10px;
}
.step-header h3 {
color: #2b6cb0;
margin: 0;
font-size: 1.2em;
}
.step-info {
display: flex;
gap: 15px;
font-size: 0.9em;
}
.step-number {
color: #4a5568;
}
.step-time {
color: #2d3748;
font-weight: bold;
}
.step-description {
color: #4a5568;
margin-bottom: 15px;
font-style: italic;
}
.step-content {
white-space: pre-wrap;
background: #f8f9fa;
padding: 15px;
border-radius: 6px;
border: 1px solid #eee;
font-family: monospace;
font-size: 0.9em;
overflow-x: auto;
max-height: 400px;
overflow-y: auto;
}
.error {
color: #e53e3e;
padding: 20px;
text-align: center;
font-weight: bold;
}
"""

    with gr.Blocks(title="Advanced AGI Reasoning Framework", theme=gr.themes.Soft(), css=css) as demo:
        gr.Markdown("# 🧠 Advanced AGI Cognitive Reasoning Framework")
        with gr.Row():
            with gr.Column(scale=3):
                input_box = gr.Textbox(
                    label="Input Query",
                    placeholder="Enter your complex request or question...",
                    lines=5
                )
            with gr.Column(scale=1):
                with gr.Row():
                    process_btn = gr.Button("Begin Cognitive Processing", variant="primary", size="lg")
                with gr.Row():
                    clear_btn = gr.Button("Clear", variant="secondary")
                with gr.Accordion("Advanced Options", open=False):
                    thinking_depth = gr.Slider(
                        minimum=1,
                        maximum=9,
                        value=9,
                        step=1,
                        label="Reasoning Depth",
                        info="Number of cognitive steps to perform"
                    )
        with gr.Tabs():
            with gr.TabItem("Response"):
                output = gr.Markdown()
            with gr.TabItem("Cognitive Process"):
                process_visual = gr.HTML()
            with gr.TabItem("Performance Metrics"):
                metrics = gr.JSON()

        # Gradio wires up progress reporting only when the tracker is declared
        # as a default argument of the event handler.
        async def process_query(query, depth, progress=gr.Progress()):
            """Run the pipeline for one query and build the three tab outputs."""
            agi.log(f"Processing query with depth {depth}: {query}")
            progress(0, desc="Initializing...")
            try:
                start_time = time.time()
                # NOTE(review): `depth` is only logged. The pipeline resets its
                # step list and always runs every stage, so the slider value is
                # not enforced here.
                final, metadata = await agi.hierarchical_reasoning(query)
                process_time = time.time() - start_time
                # Prepare performance metrics
                steps_data = [
                    {
                        "name": step.name,
                        "time": step.execution_time,
                        "percentage": (step.execution_time / process_time) * 100 if process_time > 0 else 0.0
                    }
                    for step in agi.thinking_steps
                ]
                metrics_data = {
                    "total_time": process_time,
                    "steps_completed": len(agi.thinking_steps),
                    "average_step_time": sum(s["time"] for s in steps_data) / len(steps_data) if steps_data else 0,
                    "steps": steps_data,
                    "metadata": metadata
                }
                return (
                    f"## Optimized Response\n{final}\n\n"
                    f"**Processing Time**: {process_time:.2f}s\n"
                    f"**Cognitive Steps Executed**: {len(agi.thinking_steps)}",
                    agi.visualize_thought_process(),
                    metrics_data
                )
            except Exception as e:
                return (
                    f"## Error Processing Query\n\nAn error occurred: {str(e)}",
                    # Escape the message: it is inserted into raw HTML.
                    f"<div class='error'>Processing error: {html.escape(str(e))}</div>",
                    {"error": str(e)}
                )

        def clear_interface():
            """Reset the three output panes (the input box is left untouched)."""
            return "", "", None

        process_btn.click(
            fn=process_query,
            inputs=[input_box, thinking_depth],
            outputs=[output, process_visual, metrics]
        )
        clear_btn.click(
            fn=clear_interface,
            inputs=[],
            outputs=[output, process_visual, metrics]
        )
    return demo
if __name__ == "__main__":
    # Script entry point: build the UI and serve it on all interfaces, port 7860.
    launch_options = {"server_name": "0.0.0.0", "server_port": 7860}
    create_agi_interface().launch(**launch_options)