Spaces:
Running
Running
#!/usr/bin/env python3 | |
# app.py — CHB v7.1 Professional Frontend (CLI + API, animated splash, ChatGPT-style TUI)
# Requires: rich, fastapi, uvicorn | |
# Brain file expected: multimodular_modul_v7.py (same folder) | |
import os, sys, time, json, argparse, pathlib, base64, random, string, datetime, threading | |
from typing import Any, Dict, Tuple, Optional | |
# -------------------- Optional deps (graceful) -------------------- | |
try: | |
from rich.console import Console | |
from rich.panel import Panel | |
from rich.table import Table | |
from rich.layout import Layout | |
from rich.live import Live | |
from rich.align import Align | |
from rich.markdown import Markdown | |
from rich.text import Text | |
from rich.box import ROUNDED | |
except Exception as e: | |
print("Missing 'rich'. Install: pip install rich") | |
raise | |
try: | |
from fastapi import FastAPI, Request, HTTPException | |
import uvicorn | |
except Exception: | |
FastAPI = None # API disabled if FastAPI not installed | |
# -------------------- Load brain module -------------------- | |
# Module names tried, in order, when locating the brain implementation.
BRAIN_CANDIDATES = [
    "multimodular_modul_v7",  # provided by user
]

# Class names instantiated (without arguments) when a candidate module exposes
# no ready-made AGENT/agent object.
_BRAIN_CLASS_NAMES = ("SuperAgentV7", "MultimodalBrain", "SuperAgent", "Agent", "Brain")


def load_brain():
    """Locate and return the brain object, or None when no candidate works.

    For each name in BRAIN_CANDIDATES the module is imported and the first
    match of the following is returned:

    1. a module attribute named ``AGENT`` or ``agent``;
    2. an instance of the first class found among _BRAIN_CLASS_NAMES;
    3. the module itself.

    A candidate that simply is not installed is skipped silently. Any other
    failure (an error raised while importing the module or while constructing
    the class) is reported on stderr before moving on to the next candidate,
    so a broken brain is distinguishable from a missing one.
    """
    for name in BRAIN_CANDIDATES:
        try:
            mod = __import__(name)
        except Exception as exc:
            # Only "this very module does not exist" is an expected outcome.
            absent = isinstance(exc, ModuleNotFoundError) and exc.name == name
            if not absent:
                print(f"[brain] importing {name!r} failed: {exc!r}", file=sys.stderr)
            continue

        # Common exports
        for attr in ("AGENT", "agent"):
            if hasattr(mod, attr):
                return getattr(mod, attr)

        for cls_name in _BRAIN_CLASS_NAMES:
            if not hasattr(mod, cls_name):
                continue
            try:
                return getattr(mod, cls_name)()
            except Exception as exc:
                print(f"[brain] {name}.{cls_name}() failed: {exc!r}", file=sys.stderr)
                break  # give up on this module and try the next candidate
        else:
            # No known export at all: hand back the module itself.
            return mod
    return None


# Shared brain instance used by both the API handlers and the CLI.
BRAIN = load_brain()
def brain_call(fn_candidates, *args, **kwargs) -> Tuple[bool, Any]:
    """Invoke the first available brain function out of *fn_candidates*.

    The names are probed on BRAIN first and then, if BRAIN has a truthy
    ``chb`` attribute, on that nested object. The first callable attribute
    found is called with ``*args, **kwargs``; no further names are tried
    after that, whether the call succeeds or raises.

    Returns:
        ``(True, result)`` on success, otherwise ``(False, message)`` where
        the message says that the brain is not loaded, that the call raised,
        or that none of the names exists.
    """
    if BRAIN is None:
        return False, "Brain not loaded"

    def attempt(target, label):
        # Returns an outcome tuple, or None when target has none of the names.
        for fn_name in fn_candidates:
            candidate = getattr(target, fn_name, None)
            if not callable(candidate):
                continue
            try:
                return True, candidate(*args, **kwargs)
            except Exception as exc:
                return False, f"{label}{fn_name} error: {exc}"
        return None

    # direct on brain
    outcome = attempt(BRAIN, "")
    if outcome is None:
        # nested .chb
        inner = getattr(BRAIN, "chb", None)
        if inner:
            outcome = attempt(inner, "chb.")
    if outcome is None:
        return False, f"Missing: {', '.join(fn_candidates)}"
    return outcome
# -------------------- API Keys & Limits -------------------- | |
# JSON file holding the issued API keys and their usage counters; the path is
# relative, so it resolves against the current working directory.
API_KEYS_FILE: str = "api_keys.json"
# Number of counted requests a single key may make per calendar day.
DAILY_LIMIT: int = 1000
# Held around every load/modify/save cycle on the key file.
API_LOCK = threading.Lock()
def _load_keys() -> Dict[str, Dict[str, Any]]:
    """Read the API-key store from ``API_KEYS_FILE``.

    Returns:
        The parsed key mapping, or an empty dict when the file is missing,
        unreadable, not valid JSON, or does not contain a JSON object.
    """
    try:
        # Context manager so the handle is closed even if parsing fails.
        with open(API_KEYS_FILE, "r", encoding="utf-8") as fh:
            data = json.load(fh)
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: malformed JSON or
        # undecodable bytes (JSONDecodeError/UnicodeDecodeError subclass it).
        return {}
    # Callers index the result by key string; anything but a dict is unusable.
    return data if isinstance(data, dict) else {}
def _save_keys(data: Dict[str, Any]):
    """Persist the API-key store to ``API_KEYS_FILE`` as indented JSON.

    The document is written to a sibling ``<API_KEYS_FILE>.tmp`` file first
    and then moved over the real file with ``os.replace``, so an interrupted
    write cannot leave a truncated key file behind.
    """
    tmp_path = f"{API_KEYS_FILE}.tmp"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2)
    os.replace(tmp_path, API_KEYS_FILE)
def _today() -> str:
    """Return the current local date as an ISO ``YYYY-MM-DD`` string."""
    current = datetime.date.today()
    return current.isoformat()
def api_key_generate() -> str:
    """Create a new API key, store it, and return it.

    The key is 25 ASCII letters/digits. Its stored record starts with a usage
    count of 0, today's date as ``last_reset``, and a ``created`` timestamp in
    UTC formatted as naive ISO-8601 followed by ``Z``.
    """
    # The key is a credential, so it must come from a CSPRNG; the `random`
    # module is predictable and unsuitable for secrets.
    import secrets

    alphabet = string.ascii_letters + string.digits
    with API_LOCK:
        data = _load_keys()
        key = "".join(secrets.choice(alphabet) for _ in range(25))
        # utcnow() is deprecated; take an aware UTC time and drop the tzinfo
        # so the stored text keeps its "...Z" shape.
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        data[key] = {
            "usage": 0,
            "last_reset": _today(),
            "created": now_utc.isoformat() + "Z",
        }
        _save_keys(data)
        return key
def api_key_revoke(key: str) -> bool:
    """Delete *key* from the key store.

    Returns:
        True when the key existed and was removed (the store is rewritten),
        False when it was not present (the store is left untouched).
    """
    with API_LOCK:
        store = _load_keys()
        if key not in store:
            return False
        store.pop(key)
        _save_keys(store)
    return True
def api_key_validate_and_count(key: str) -> bool:
    """Check *key* against the store and charge one request to it.

    The key's usage counter is zeroed when its ``last_reset`` date is not
    today. The call is rejected (nothing saved) when the key is unknown or its
    usage has reached DAILY_LIMIT; otherwise usage is incremented and the
    store is written back.

    Returns:
        True when the request is allowed and was counted, False otherwise.
    """
    with API_LOCK:
        store = _load_keys()
        if key not in store:
            return False
        entry = store[key]

        # reset day
        if entry.get("last_reset") != _today():
            entry["usage"] = 0
            entry["last_reset"] = _today()

        if entry["usage"] >= DAILY_LIMIT:
            return False

        entry["usage"] += 1
        _save_keys(store)
    return True
def api_stats() -> Dict[str, Any]:
    """Summarize the key store.

    Returns:
        A dict with ``active_keys`` (number of stored keys),
        ``limit_per_day`` (DAILY_LIMIT) and ``keys`` (the full stored mapping,
        key strings included).
    """
    with API_LOCK:
        store = _load_keys()
    summary: Dict[str, Any] = {"active_keys": len(store)}
    summary["limit_per_day"] = DAILY_LIMIT
    summary["keys"] = store
    return summary
# -------------------- FastAPI (shared brain) -------------------- | |
# ASGI application object; stays None when FastAPI/uvicorn are not installed.
APP = None
if FastAPI:
    APP = FastAPI(title="CHB Universal Brain API", version="7.1")

    # NOTE(review): no route decorators or add_api_route() calls are visible in
    # this section, so as written none of the handlers below is attached to
    # APP. Confirm how, and under which paths, they are meant to be registered.

    def _require_api_key(req: Request) -> None:
        """Reject the request with 401 unless it carries a usable x-api-key.

        A successful check also counts one request against the key's quota.
        """
        key = req.headers.get("x-api-key")
        if not key or not api_key_validate_and_count(key):
            raise HTTPException(status_code=401, detail="Invalid key or limit reached")

    async def api_chat(req: Request):
        """Forward the JSON field ``message`` to the brain and return its reply."""
        _require_api_key(req)
        data = await req.json()
        msg = data.get("message", "")
        ok, res = brain_call(
            ["ctb_handle", "process_input", "chat", "chat_message", "handle_input"],
            input_data=msg,
        )
        if not ok:
            raise HTTPException(status_code=500, detail=str(res))
        return {"reply": res}

    async def api_search(req: Request):
        """Run a knowledge search for the JSON field ``query``."""
        _require_api_key(req)
        q = (await req.json()).get("query", "")
        ok, res = brain_call(["search_kb", "facts_search", "search_facts", "query_facts"], q)
        if not ok:
            raise HTTPException(status_code=500, detail=str(res))
        return {"results": res}

    async def api_upload(req: Request):
        """Submit a file to the brain, given as ``path`` or inline ``base64``.

        Inline data is written to a temporary ``upload_*.bin`` file in the
        working directory, which is removed again whether or not processing
        succeeds. Responds 400 for undecodable base64 or when neither field
        is supplied, and 500 when the brain call fails.
        """
        import tempfile

        _require_api_key(req)
        data = await req.json()
        # NOTE(review): `path` is taken verbatim from the client, so a caller
        # can point the brain at any file the server process can read.
        # Consider restricting it to an upload directory.
        path = data.get("path")
        b64 = data.get("base64")
        tmp_path = None
        try:
            if b64:
                try:
                    raw = base64.b64decode(b64)
                except (ValueError, TypeError) as exc:
                    raise HTTPException(status_code=400, detail="Invalid base64 payload") from exc
                # mkstemp yields a unique name, so concurrent uploads in the
                # same second cannot overwrite each other.
                fd, tmp_path = tempfile.mkstemp(prefix="upload_", suffix=".bin", dir=".")
                with os.fdopen(fd, "wb") as fh:
                    fh.write(raw)
                path = tmp_path
            if not path:
                raise HTTPException(status_code=400, detail="Provide 'path' or 'base64'")
            payload = _plan_results_for_path(path)
            ok, res = brain_call(
                ["submit_plan_results", "handle_plan_results", "submit_plan"],
                plan_id=f"api_upload_{int(time.time())}",
                results=payload,
            )
        finally:
            if tmp_path and os.path.exists(tmp_path):
                os.remove(tmp_path)
        if not ok:
            raise HTTPException(status_code=500, detail=str(res))
        return {"status": "ok", "details": res}

    def api_skills(tag: Optional[str] = None, top: int = 10):
        """Unauthenticated variant: always refuses and points at the POST form."""
        raise HTTPException(status_code=405, detail="Use POST /skills with x-api-key")

    async def api_skills_post(req: Request):
        """List skills; with a ``tag`` field, return the top ``top`` for that tag."""
        _require_api_key(req)
        data = await req.json()
        tag = data.get("tag", "")
        try:
            top = int(data.get("top", 10))
        except (TypeError, ValueError) as exc:
            raise HTTPException(status_code=400, detail="'top' must be an integer") from exc
        if tag:
            ok, res = brain_call(["top_skills", "top_by_tag"], tag, top)
        else:
            ok, res = brain_call(["list_skills", "get_skills"])
        if not ok:
            raise HTTPException(status_code=500, detail=str(res))
        return {"skills": res}

    def api_backup_get():
        """Unauthenticated variant: only points at the POST form."""
        return {"detail": "Use POST /backup with x-api-key"}

    async def api_backup(req: Request):
        """Return the latest backup path, creating a backup if none is available."""
        _require_api_key(req)
        ok, path = brain_call(["download_latest_backup", "latest_backup", "get_latest_backup"])
        if not ok or not path:
            ok2, created = brain_call(["backup_create", "create_backup", "create_backup_zip"])
            if not ok2:
                raise HTTPException(status_code=500, detail=str(created))
            path = created
        return {"backup_path": path}

    def api_status():
        """Public status (no key): only the key count and the daily limit."""
        return {"api": "online", "keys": api_stats()["active_keys"], "limit_per_day": DAILY_LIMIT}

    # NOTE(review): the two key-management handlers below perform no
    # authentication of their own; confirm that is intended before exposing them.

    def api_key_new():
        """Mint a new API key and return it."""
        return {"api_key": api_key_generate()}

    # This handler must NOT be named `api_key_revoke`: at module scope that
    # rebinds the key-store function of the same name. The handler then calls
    # itself and gets back a never-awaited coroutine, which is truthy, so every
    # revoke "succeeds" without deleting anything, including from the CLI.
    async def api_key_revoke_endpoint(req: Request):
        """Revoke the key given in the JSON field ``api_key``.

        Responds 400 when the field is missing and 404 when the key is unknown.
        """
        data = await req.json()
        key = data.get("api_key")
        if not key:
            raise HTTPException(status_code=400, detail="api_key required")
        if not api_key_revoke(key):
            raise HTTPException(status_code=404, detail="Key not found")
        return {"status": "revoked"}
# -------------------- CLI (ChatGPT-style TUI) -------------------- | |
# Shared rich console used for all terminal output and input.
console = Console()

# Splash banner spelling "CHB" in block letters.
# NOTE(review): the artwork was reconstructed from mis-encoded text in which
# every glyph had collapsed to the same character and the spacing was lost;
# the per-line glyph counts match this rendering, but confirm it against the
# intended design.
CHB_ASCII = """
 ██████╗██╗  ██╗██████╗
██╔════╝██║  ██║██╔══██╗
██║     ███████║██████╔╝
██║     ██╔══██║██╔══██╗
╚██████╗██║  ██║██████╔╝
 ╚═════╝╚═╝  ╚═╝╚═════╝
"""
def boot_splash():
    """Show the startup banner and a short staged progress list.

    Clears the screen, prints the CHB banner panel centered, prints five
    status lines half a second apart, prints a ready line, pauses briefly and
    clears the screen again.
    """
    console.clear()
    banner = Text(CHB_ASCII, style="bold cyan")
    panel = Panel.fit(
        banner,
        title="Close-to-Human Brain v7.1",
        border_style="cyan",
        padding=(1, 4),
    )
    console.print(panel, justify="center")

    steps = (
        "Initializing Universal Brain",
        "Loading Core Modules",
        "Connecting Global Sync",
        "Activating CTB Pipeline",
        "Preparing Creative Skill Vault",
    )
    # NOTE(review): the leading glyphs on the two messages below were
    # mis-encoded beyond recovery; "→" and "✔" are stand-ins, so confirm the
    # intended symbols.
    for step in steps:
        console.print(f"[cyan]→ {step}...[/cyan]")
        time.sleep(0.5)
    console.print("[bold green]✔ Ready![/bold green]")
    time.sleep(0.6)
    console.clear()
def _layout() -> Layout:
    """Build the screen layout: header, body (sidebar + chat) and footer.

    Region names are ``header`` (size 5), ``body``, ``footer`` (size 3), and
    inside the body ``sidebar`` (size 26) and ``chat``.
    """
    header = Layout(name="header", size=5)
    body = Layout(name="body", ratio=1)
    footer = Layout(name="footer", size=3)

    root = Layout()
    root.split(header, body, footer)

    sidebar = Layout(name="sidebar", size=26)
    chat = Layout(name="chat", ratio=1)
    body.split_row(sidebar, chat)
    return root
# (icon, label) rows shown in the sidebar, top to bottom.
# NOTE(review): the icons were restored from mis-encoded text. Chat, Upload,
# Backup, Creative Vault and Offline decode unambiguously; the Search, Sync
# and API Keys icons are best guesses, so confirm them.
SIDEBAR_ITEMS = [
    ("💬", "Multimodal Chat"),
    ("🔍", "Knowledge Search"),
    ("📤", "Upload"),
    ("💾", "Backup/Restore"),
    ("🎨", "Creative Vault"),
    ("🔄", "Sync Status"),
    ("🔑", "API Keys"),
    ("📴", "Offline Mode"),
]

# Conversation transcript as (role, message) pairs; role is "user" or "chb".
CHAT_HISTORY = []
def status_bar() -> Text:
    """Build the one-line footer text: API state, key count, sync state, limit.

    Sync is shown as "Active" when the brain's sync-status call succeeds and
    its stringified result does not contain "off" (case-insensitive);
    otherwise "Idle".
    """
    stats = api_stats()
    sync_ok, sync = brain_call(["global_sync_status", "sync_status", "get_sync_status"])
    sync_txt = "Active" if sync_ok and "off" not in str(sync).lower() else "Idle"
    api_txt = "ON" if APP else "OFF"
    # NOTE(review): icons restored from mis-encoded text; only the hourglass
    # decodes unambiguously, so confirm the other three.
    line = Text(
        f"🌐 API: {api_txt}  🔑 Keys: {stats.get('active_keys', 0)}  "
        f"🔄 Sync: {sync_txt}  ⏳ Limit: {DAILY_LIMIT}/day"
    )
    line.stylize("cyan")
    return line
def sidebar_panel(active: str) -> Panel:
    """Render the sidebar menu, highlighting the row whose label equals *active*."""
    menu = Table.grid(padding=(0, 1))
    for icon, label in SIDEBAR_ITEMS:
        if label == active:
            row_style = "bold white"
        else:
            row_style = "grey70"
        menu.add_row(Text(f"{icon} {label}", style=row_style))
    return Panel(menu, title="CHB", border_style="cyan", box=ROUNDED)
def header_panel() -> Panel:
    """Render the title bar panel."""
    title = Text("CHB Universal Brain v7.1", style="bold cyan")
    return Panel(title, border_style="cyan", box=ROUNDED)
def chat_panel() -> Panel:
    """Render the last 100 transcript entries as Markdown.

    Entries with role "user" are labelled **You:**, everything else **CHB:**.
    An empty transcript shows a greeting instead.
    """
    recent = CHAT_HISTORY[-100:]
    blocks = [
        f"**You:** {msg}\n\n" if role == "user" else f"**CHB:** {msg}\n\n"
        for role, msg in recent
    ]
    md = "".join(blocks)
    if not md:
        md = "**CHB:** Hello! Type your message. Use `/help` for commands.\n"
    return Panel(Markdown(md), border_style="cyan", box=ROUNDED)
def footer_panel() -> Panel:
    """Render the footer panel around a freshly computed status line."""
    status_line = status_bar()
    return Panel(status_line, border_style="cyan", box=ROUNDED)
def _plan_results_for_path(path: str) -> Dict[str, Any]:
    """Wrap *path* in the results payload submitted to the brain.

    The file extension (case-insensitive) selects the bucket: ``images``,
    ``videos`` or ``audios`` entries carry the path, a fixed quality score, an
    empty caption and an empty tag list. Any other extension goes under
    ``files`` with the file size in bytes, or 0 when the file does not exist.
    """
    target = pathlib.Path(path)
    ext = target.suffix.lower()

    # (payload key, quality score, extensions)
    media_kinds = (
        ("images", 0.9, (".png", ".jpg", ".jpeg", ".webp", ".bmp")),
        ("videos", 0.8, (".mp4", ".mov", ".mkv", ".webm")),
        ("audios", 0.8, (".mp3", ".wav", ".m4a", ".ogg")),
    )
    for bucket, score, extensions in media_kinds:
        if ext in extensions:
            entry = {"path": str(target), "quality_score": score, "caption": "", "tags": []}
            return {bucket: [entry]}

    size = target.stat().st_size if target.exists() else 0
    return {"files": [{"path": str(target), "meta": {"size": size}}]}
def run_cli():
    """Run the interactive chat TUI until the user quits.

    Shows the splash, draws the layout inside a full-screen Live display and
    then loops: read a line, handle it as a slash command (/help, /upload,
    /search, /backup, /status, /keys, /offline, /clear, /quit, /exit) or send
    it to the brain as a chat message, and redraw the chat panel. Ctrl-C or
    end of input also ends the loop.
    """
    boot_splash()
    layout = _layout()
    layout["header"].update(header_panel())
    layout["sidebar"].update(sidebar_panel("Multimodal Chat"))
    layout["chat"].update(chat_panel())
    layout["footer"].update(footer_panel())

    # The bullet and prompt glyphs were restored from mis-encoded text.
    # NOTE(review): the dash separating command from description is a best
    # guess (em dash); confirm.
    help_text = (
        "**Commands**\n"
        "• /upload <path> — process media/file\n"
        "• /search <query> — knowledge search\n"
        "• /backup — create/show latest backup\n"
        "• /status — show global sync status\n"
        "• /keys new|revoke <key>|list — manage API keys\n"
        "• /offline — toggle offline mode\n"
        "• /clear — clear chat\n"
        "• /quit — exit\n"
    )

    def post(reply, echo=None, refresh_footer=False):
        """Append an exchange to the transcript and redraw what changed."""
        if echo is not None:
            CHAT_HISTORY.append(("user", echo))
        CHAT_HISTORY.append(("chb", reply))
        if refresh_footer:
            layout["footer"].update(footer_panel())
        layout["chat"].update(chat_panel())

    with Live(layout, refresh_per_second=10, screen=True):
        while True:
            try:
                console.print()  # spacing under Live
                user = console.input("[bold blue]You › [/bold blue]").strip()
            except (KeyboardInterrupt, EOFError):
                break
            if not user:
                continue
            if user in ("/quit", "/exit"):
                break

            # Slash commands (stay chat-first; sidebar is thematic)
            if user == "/help":
                post(help_text)

            elif user.startswith("/upload "):
                path = user.split(" ", 1)[1].strip().strip('"').strip("'")
                payload = _plan_results_for_path(path)
                _, res = brain_call(
                    ["submit_plan_results", "handle_plan_results", "submit_plan"],
                    plan_id=f"cli_upload_{int(time.time())}",
                    results=payload,
                )
                # On failure `res` already holds the error text.
                post(f"Upload processed: {res}", echo=user)

            elif user.startswith("/search "):
                q = user.split(" ", 1)[1].strip()
                _, res = brain_call(["search_kb", "facts_search", "search_facts", "query_facts"], q)
                post(f"Search results:\n{res}", echo=user)

            elif user == "/backup":
                ok, path = brain_call(["download_latest_backup", "latest_backup", "get_latest_backup"])
                if ok and path:
                    post(f"Latest backup: {path}", echo="/backup")
                else:
                    # `created` is the new backup on success and the error
                    # text on failure; either way it is what the user needs
                    # to see (not the stale result of the first lookup).
                    _, created = brain_call(["backup_create", "create_backup", "create_backup_zip"])
                    post(f"Backup: {created}", echo="/backup")

            elif user == "/status":
                _, res = brain_call(["global_sync_status", "sync_status", "get_sync_status"])
                post(f"Sync status: {res}", echo="/status", refresh_footer=True)

            elif user.startswith("/keys"):
                parts = user.split()
                if len(parts) == 2 and parts[1] == "new":
                    reply = f"New API key: `{api_key_generate()}`"
                elif len(parts) == 3 and parts[1] == "revoke":
                    reply = "Key revoked" if api_key_revoke(parts[2]) else "Key not found"
                else:
                    reply = f"Keys: {list(api_stats().get('keys', {}).keys())}"
                post(reply, echo=user, refresh_footer=True)

            elif user == "/offline":
                _, res = brain_call(["toggle_offline", "set_offline", "offline_toggle"])
                post(f"Offline toggled: {res}", echo="/offline", refresh_footer=True)

            elif user == "/clear":
                CHAT_HISTORY.clear()
                layout["chat"].update(chat_panel())

            else:
                # Normal chat -> brain
                CHAT_HISTORY.append(("user", user))
                ok, resp = brain_call(
                    ["ctb_handle", "process_input", "chat", "chat_message", "handle_input"],
                    input_data=user,
                )
                post(resp if ok else str(resp))
# -------------------- Entry -------------------- | |
def main():
    """Parse the command line and start the API server, the CLI, or both.

    ``--api`` alone runs uvicorn in the foreground (exiting with status 1 when
    FastAPI/uvicorn are missing). ``--both`` starts uvicorn in a daemon thread
    when the API is available and then runs the CLI. Every other combination
    runs the CLI only.
    """
    parser = argparse.ArgumentParser(description="CHB v7.1 Frontend")
    mode_flags = (
        ("--api", "Run API server only"),
        ("--cli", "Run CLI only"),
        ("--both", "Run CLI and API together"),
    )
    for flag, description in mode_flags:
        parser.add_argument(flag, action="store_true", help=description)
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", type=int, default=8000)
    args = parser.parse_args()

    if args.api and not FastAPI:
        console.print("[red]FastAPI/uvicorn not installed. Run: pip install fastapi uvicorn[/red]")
        sys.exit(1)

    api_only = args.api and not (args.cli or args.both)
    if api_only:
        if APP is None:
            console.print("[red]API unavailable.[/red]")
            sys.exit(1)
        uvicorn.run(APP, host=args.host, port=args.port)
        return

    if args.both and APP:
        # Run API in background thread + CLI
        server = threading.Thread(
            target=uvicorn.run,
            args=(APP,),
            kwargs={"host": args.host, "port": args.port, "log_level": "warning"},
            daemon=True,
        )
        server.start()

    # Reached for --both (with or without the API) and for the CLI-only default.
    run_cli()


if __name__ == "__main__":
    main()