# multimodular_modul version 7.0.py
"""
Multimodular Module — Multimodal SuperAgent v7.0
Upgrade from v6.0 -> v7.0 (in-place upgrade)
Features added:
- CHB (Close-to-Human Brain) acts as the universal "middle-man" for all inputs/outputs.
- User-driven retrieval plan flow (the AI generates queries; the client fetches and submits results).
- Opportunistic Creative Skill Vault (media & text benchmarks).
- Versioned Fact Store (VFS) enhancements: freshness, controversy scoring, provenance.
- Real-time Global Brain (facts, skills, media) sync via WebSocket + optional Hugging Face dataset push.
- Automatic local backups (JSON + SQLite), downloadable via an API endpoint.
- Self-upgrading modules: safe, signed modules can be auto-integrated into the runtime.
  * Auto-execution only when cryptographic signature verification and a sandbox (wasmtime or subprocess) are available.
  * If signing or a sandbox is not present, modules are stored but not auto-executed.
- Universal FastAPI endpoints and CLI demo preserved from v6, with additions.
Security & Safety:
- No CAPTCHA/TLS evasion. Respects robots.txt and the user-driven retrieval model.
- Self-upgrade requires signature verification (env GLOBAL_SYNC_SIGNING_PUBKEY).
- Default: local-only sync. Enable cloud sync via env variables (HUGGINGFACE_TOKEN, GLOBAL_SYNC_REPO).
- Media sync is allowed; personal/private data must be filtered before upload.
"""
from __future__ import annotations
import os, sys, json, time, uuid, shutil, tempfile, hashlib, base64, logging
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, List, Optional, Tuple
from pathlib import Path
import threading
import sqlite3
import zipfile
import hmac
import asyncio
# --------------------------
# Optional deps (feature unlocks)
# --------------------------
# pip install fastapi uvicorn pydantic requests websockets python-multipart cryptography wasmtime
try:
    import fastapi
    from fastapi import FastAPI, UploadFile, File, Form
    from fastapi.responses import FileResponse, JSONResponse
    from pydantic import BaseModel
    FASTAPI_AVAILABLE = True
except Exception:
    FASTAPI_AVAILABLE = False
try:
    import requests
except Exception:
    requests = None
try:
    import websockets
except Exception:
    websockets = None
try:
    from cryptography.hazmat.primitives import serialization, hashes
    from cryptography.hazmat.primitives.asymmetric import padding
    CRYPTO_AVAILABLE = True
except Exception:
    CRYPTO_AVAILABLE = False
try:
    import wasmtime
    WASM_AVAILABLE = True
except Exception:
    WASM_AVAILABLE = False
# --------------------------
# Logging
# --------------------------
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s: %(message)s")
log = logging.getLogger("Multimodular_v7")
# --------------------------
# Config & Paths (edit env or constants)
# --------------------------
BASE_DIR = Path(os.getenv("MM_BASE_DIR", Path(__file__).parent.resolve()))
DATA_DIR = Path(os.getenv("MM_DATA_DIR", BASE_DIR / "mm_data"))
BACKUP_DIR = Path(os.getenv("MM_BACKUP_DIR", DATA_DIR / "backups"))
TMP_DIR = Path(os.getenv("MM_TMP_DIR", BASE_DIR / "tmp"))
CACHE_DIR = Path(os.getenv("MM_CACHE_DIR", BASE_DIR / "cache"))
for d in (DATA_DIR, BACKUP_DIR, TMP_DIR, CACHE_DIR):
    d.mkdir(parents=True, exist_ok=True)
# Global sync config
GLOBAL_SYNC_ENABLED = os.getenv("MM_GLOBAL_SYNC_ENABLED", "false").lower() in ("1", "true", "yes")
HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN", None)
GLOBAL_SYNC_REPO = os.getenv("GLOBAL_SYNC_REPO", None)  # e.g., "username/mm_global_brain"
GLOBAL_SYNC_SIGNING_PUBKEY = os.getenv("GLOBAL_SYNC_SIGNING_PUBKEY", None)  # PEM public key for verifying modules
REALTIME_WS_PORT = int(os.getenv("MM_WS_PORT", "8765"))
# Auto-upgrade strictness: require signature & sandbox for auto-exec
AUTO_UPGRADE_REQUIRE_SIGN = True
AUTO_UPGRADE_REQUIRE_SANDBOX = True
# Backups
BACKUP_RETENTION = int(os.getenv("MM_BACKUP_RETENTION", "30"))  # keep last N backups
# CHB confidence threshold
CHB_MIN_CONFIDENCE = float(os.getenv("CHB_MIN_CONFIDENCE", "0.85"))
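# GLOBAL_SYNC_SIGNING_PUBKEY expects a PEM-encoded RSA public key; ModuleManager below verifies
# PKCS#1 v1.5 / SHA-256 signatures against it. One way to generate a keypair with the same
# `cryptography` package this module already uses (a sketch; key size is a suggested value):
#
#   from cryptography.hazmat.primitives.asymmetric import rsa
#   from cryptography.hazmat.primitives import serialization
#   key = rsa.generate_private_key(public_exponent=65537, key_size=3072)
#   priv_pem = key.private_bytes(serialization.Encoding.PEM,
#                                serialization.PrivateFormat.PKCS8,
#                                serialization.NoEncryption())
#   pub_pem = key.public_key().public_bytes(serialization.Encoding.PEM,
#                                           serialization.PublicFormat.SubjectPublicKeyInfo)
#   # keep priv_pem offline; export pub_pem via the GLOBAL_SYNC_SIGNING_PUBKEY env variable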
# --------------------------
# Utilities
# --------------------------
def uid(prefix="id"):
    return f"{prefix}_{uuid.uuid4().hex[:10]}"
def now_iso():
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def sha256_b64(data: bytes) -> str:
    return base64.urlsafe_b64encode(hashlib.sha256(data).digest()).decode()
def write_json(path: Path, data: Any):
    tmp = path.with_suffix(".tmp")
    with open(tmp, "w", encoding="utf-8") as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    tmp.replace(path)
def read_json(path: Path, default=None):
    if not path.exists():
        return default
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except Exception:
        return default
# --------------------------
# Database: local SQLite wrapper + JSON mirror
# --------------------------
class LocalDB:
    def __init__(self, path: Path):
        self.path = path
        # check_same_thread=False: the connection is shared with the WebSocket/FastAPI threads,
        # guarded by self.lock for writes.
        self.conn = sqlite3.connect(str(self.path), check_same_thread=False)
        self._init()
        self.lock = threading.Lock()
    def _init(self):
        cur = self.conn.cursor()
        cur.execute("""CREATE TABLE IF NOT EXISTS facts (
            id TEXT PRIMARY KEY,
            claim TEXT,
            value TEXT,
            confidence REAL,
            sources TEXT,
            first_seen TEXT,
            last_seen TEXT,
            controversy REAL,
            staleness REAL
        )""")
        cur.execute("""CREATE TABLE IF NOT EXISTS skills (
            id TEXT PRIMARY KEY,
            tag TEXT,
            kind TEXT,
            meta TEXT,
            score REAL,
            added_at TEXT,
            synced INTEGER DEFAULT 0
        )""")
        cur.execute("""CREATE TABLE IF NOT EXISTS modules (
            id TEXT PRIMARY KEY,
            name TEXT,
            code TEXT,
            meta TEXT,
            verified INTEGER DEFAULT 0,
            autointegrated INTEGER DEFAULT 0,
            added_at TEXT
        )""")
        cur.execute("""CREATE TABLE IF NOT EXISTS backups (
            id TEXT PRIMARY KEY,
            path TEXT,
            created_at TEXT
        )""")
        self.conn.commit()
    def upsert_fact(self, claim, value, confidence, sources, controversy=0.0, staleness=0.0):
        fid = sha256_b64(claim.encode())[:32]
        now = now_iso()
        with self.lock:
            cur = self.conn.cursor()
            cur.execute("SELECT id FROM facts WHERE id=?", (fid,))
            if cur.fetchone():
                cur.execute("""UPDATE facts SET value=?, confidence=?, sources=?, last_seen=?, controversy=?, staleness=? WHERE id=?""",
                            (value, float(confidence), json.dumps(sources), now, float(controversy), float(staleness), fid))
            else:
                cur.execute("""INSERT INTO facts (id,claim,value,confidence,sources,first_seen,last_seen,controversy,staleness)
                               VALUES (?,?,?,?,?,?,?,?,?)""",
                            (fid, claim, value, float(confidence), json.dumps(sources), now, now, float(controversy), float(staleness)))
            self.conn.commit()
        return fid
    def add_skill(self, tag, kind, meta, score):
        sid = uid("skill")
        now = now_iso()
        with self.lock:
            self.conn.execute("INSERT INTO skills (id,tag,kind,meta,score,added_at) VALUES (?,?,?,?,?,?)",
                              (sid, tag, kind, json.dumps(meta), float(score), now))
            self.conn.commit()
        return sid
    def add_module(self, name, code, meta, verified=0, autointegrated=0):
        mid = uid("mod")
        now = now_iso()
        with self.lock:
            self.conn.execute("INSERT INTO modules (id,name,code,meta,verified,autointegrated,added_at) VALUES (?,?,?,?,?,?,?)",
                              (mid, name, code, json.dumps(meta), int(verified), int(autointegrated), now))
            self.conn.commit()
        return mid
    def list_facts(self):
        cur = self.conn.cursor(); cur.execute("SELECT * FROM facts"); rows = cur.fetchall()
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, r)) for r in rows]
    def list_skills(self):
        cur = self.conn.cursor(); cur.execute("SELECT * FROM skills"); rows = cur.fetchall()
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, r)) for r in rows]
    def list_modules(self):
        cur = self.conn.cursor(); cur.execute("SELECT * FROM modules"); rows = cur.fetchall()
        cols = [c[0] for c in cur.description]
        return [dict(zip(cols, r)) for r in rows]
    def mark_module_verified(self, module_id, verified=1):
        with self.lock:
            self.conn.execute("UPDATE modules SET verified=? WHERE id=?", (int(verified), module_id))
            self.conn.commit()
    def mark_module_autointegrated(self, module_id, val=1):
        with self.lock:
            self.conn.execute("UPDATE modules SET autointegrated=? WHERE id=?", (int(val), module_id))
            self.conn.commit()
    def add_backup(self, path):
        bid = uid("bak")
        now = now_iso()
        with self.lock:
            self.conn.execute("INSERT INTO backups (id,path,created_at) VALUES (?,?,?)", (bid, str(path), now))
            self.conn.commit()
        self._prune_backups()
        return bid
    def _prune_backups(self):
        cur = self.conn.cursor(); cur.execute("SELECT id,path,created_at FROM backups ORDER BY created_at DESC")
        rows = cur.fetchall()
        if len(rows) <= BACKUP_RETENTION: return
        for r in rows[BACKUP_RETENTION:]:
            pid, p, _ = r
            try:
                if os.path.exists(p): os.remove(p)
            except Exception:
                pass
            self.conn.execute("DELETE FROM backups WHERE id=?", (pid,))
        self.conn.commit()
# --------------------------
# VFS and Creative Skill Vault (JSON + SQLite)
# --------------------------
class VFS:
    def __init__(self, db: LocalDB):
        self.db = db
    def store_fact(self, claim: str, value: str, sources: List[Dict[str, Any]], confidence: float, controversy: float = 0.0):
        # staleness computed from source dates (simple)
        staleness = 0.0
        for s in (sources or []):
            dt = s.get("date")
            if dt:
                try:
                    # naive parse as ISO; compute days
                    t = time.mktime(time.strptime(dt[:19], "%Y-%m-%dT%H:%M:%S"))
                    age_days = max(0, (time.time() - t) / 86400.0)
                    staleness = max(staleness, min(1.0, age_days / 365.0))
                except Exception:
                    continue
        fid = self.db.upsert_fact(claim, value, confidence, sources, controversy, staleness)
        return fid
    def query(self, q: str):
        # naive substring search
        res = self.db.list_facts()
        qlow = q.lower()
        return [r for r in res if qlow in (r.get("claim") or "").lower() or qlow in (r.get("value") or "").lower()]
class CreativeSkillVault:
    def __init__(self, db: LocalDB):
        self.db = db
    def add_benchmark(self, tag: str, kind: str, meta: Dict[str, Any], score: float):
        sid = self.db.add_skill(tag, kind, meta, score)
        return sid
    def top_by_tag(self, tag: str, k: int = 5):
        all_skills = self.db.list_skills()
        filtered = [s for s in all_skills if s.get("tag") == tag]
        filtered.sort(key=lambda x: x.get("score", 0), reverse=True)
        return filtered[:k]
# --------------------------
# Global Sync: Hugging Face push & WebSocket real-time (simple)
# --------------------------
class GlobalSync:
    def __init__(self, db: LocalDB, hf_token: Optional[str] = None, repo: Optional[str] = None):
        self.db = db
        self.hf_token = hf_token
        self.repo = repo
        self.ws_clients = set()
        self.ws_server_task = None
        self.loop = None
        self.lock = threading.Lock()
    # --- push facts/skills package to Hugging Face dataset via simple HTTP (requires token & repo)
    def push_to_hf(self, package: Dict[str, Any]) -> Tuple[bool, str]:
        if not (self.hf_token and self.repo and requests):
            return False, "huggingface not configured or requests missing"
        # Minimal implementation: upload JSON file to HF repo via API
        try:
            url = "https://huggingface.co/api/repos/create"
            # Note: a full implementation requires the hf_hub or dataset APIs; here we do a simple placeholder.
            # We recommend using the huggingface_hub library in production.
            return False, "HF push requires huggingface_hub implementation; configure HF client"
        except Exception as e:
            return False, str(e)
    # --- broadcast to connected WebSocket clients (realtime)
    async def ws_broadcast(self, message: Dict[str, Any]):
        if websockets is None:
            return
        data = json.dumps(message)
        clients = list(self.ws_clients)
        for ws in clients:
            try:
                await ws.send(data)
            except Exception:
                try:
                    self.ws_clients.remove(ws)
                except Exception:
                    pass
    # --- start a simple websocket server to accept other CHBs / clients that want live updates
    def start_ws_server(self, host="0.0.0.0", port=REALTIME_WS_PORT):
        if websockets is None:
            log.warning("websockets library missing; realtime sync disabled")
            return
        async def handler(websocket, path):
            log.info("WS client connected")
            self.ws_clients.add(websocket)
            try:
                async for msg in websocket:
                    # accept 'ping' or 'submit' messages
                    try:
                        data = json.loads(msg)
                        typ = data.get("type")
                        if typ == "submit_skill":
                            payload = data.get("payload")
                            # minimal processing: store skill locally and broadcast
                            tag = payload.get("tag", "global")
                            kind = payload.get("kind", "image")
                            meta = payload.get("meta", {})
                            score = float(payload.get("score", 0.5))
                            self.db.add_skill(tag, kind, meta, score)
                            await self.ws_broadcast({"type": "skill_added", "tag": tag, "kind": kind, "meta": meta, "score": score})
                    except Exception:
                        pass
            except Exception:
                pass
            finally:
                try: self.ws_clients.remove(websocket)
                except Exception: pass
                log.info("WS client disconnected")
        log.info("Starting WebSocket server on %s:%d", host, port)
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        start_server = websockets.serve(handler, host, port)
        self.ws_server_task = self.loop.run_until_complete(start_server)
        try:
            self.loop.run_forever()
        except Exception:
            pass
    def run_ws_in_thread(self, host="0.0.0.0", port=REALTIME_WS_PORT):
        t = threading.Thread(target=self.start_ws_server, args=(host, port), daemon=True)
        t.start()
        return t
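# A minimal client sketch for the realtime endpoint above (assumes the `websockets` package
# and the default MM_WS_PORT of 8765; the message shape follows the handler's "submit_skill" branch):
#
#   import asyncio, json, websockets
#
#   async def push_demo_skill():
#       async with websockets.connect("ws://localhost:8765") as ws:
#           await ws.send(json.dumps({
#               "type": "submit_skill",
#               "payload": {"tag": "motorcycle", "kind": "image",
#                           "meta": {"source": "demo"}, "score": 0.9},
#           }))
#           print(await ws.recv())  # expect a broadcast {"type": "skill_added", ...}
#
#   asyncio.run(push_demo_skill())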
# --------------------------
# ModuleManager: Verify & Sandbox auto-integration of learned modules
# --------------------------
class ModuleManager:
    def __init__(self, db: LocalDB, signing_pubkey_pem: Optional[str] = None):
        self.db = db
        self.signing_pubkey_pem = signing_pubkey_pem
        self.sandbox_available = WASM_AVAILABLE  # prefer WASM sandbox if available
        self.lock = threading.Lock()
    def verify_signature(self, code: bytes, signature_b64: str) -> bool:
        if not (CRYPTO_AVAILABLE and self.signing_pubkey_pem):
            log.warning("Crypto or public key not available, cannot verify signature")
            return False
        try:
            pub = serialization.load_pem_public_key(self.signing_pubkey_pem.encode())
            sig = base64.b64decode(signature_b64)
            pub.verify(sig, code, padding.PKCS1v15(), hashes.SHA256())
            return True
        except Exception as e:
            log.warning("signature verification failed: %s", e)
            return False
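    # How a module author can produce a signature that verify_signature() accepts
    # (a sketch using the `cryptography` package; file names and variables are illustrative):
    #
    #   from cryptography.hazmat.primitives import hashes, serialization
    #   from cryptography.hazmat.primitives.asymmetric import padding
    #   import base64
    #
    #   priv = serialization.load_pem_private_key(open("mm_signing_private.pem", "rb").read(), password=None)
    #   signature = priv.sign(code_bytes, padding.PKCS1v15(), hashes.SHA256())
    #   signature_b64 = base64.b64encode(signature).decode()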
    def sandbox_run_wasm(self, wasm_bytes: bytes, func_name: str = "run", inputs: Optional[dict] = None, timeout: int = 5) -> Tuple[bool, str]:
        if not WASM_AVAILABLE:
            return False, "wasm runtime not available"
        try:
            # create store & module
            engine = wasmtime.Engine()
            module = wasmtime.Module(engine, wasm_bytes)
            store = wasmtime.Store(engine)
            instance = wasmtime.Instance(store, module, [])
            # This is a very conservative pattern — real WASM modules need a standard interface;
            # here we only attempt a safe run if the module exposes the expected export.
            # For safety, we do not invoke arbitrary functions unless module authors follow the expected interface.
            # We attempt to call an exported function named 'run' that returns an int.
            if hasattr(instance.exports, func_name):
                fn = instance.exports.__getattr__(func_name)
                try:
                    res = fn()
                    return True, f"wasm-run-res:{res}"
                except Exception as e:
                    return False, f"wasm-run-exc:{e}"
            else:
                return False, "wasm module lacks 'run' export"
        except Exception as e:
            return False, f"wasm-failed:{e}"
    def sandbox_run_subprocess(self, code_str: str, timeout: int = 5) -> Tuple[bool, str]:
        # Very limited subprocess sandbox: write file, run in subprocess with restricted env and timeout.
        # NOTE: this is not fully secure against malicious code. Use real OS-level sandboxing for production.
        tmp = Path(TMP_DIR) / f"module_{uid()}.py"
        tmp.write_text(code_str, encoding="utf-8")
        import subprocess
        try:
            p = subprocess.run([sys.executable, str(tmp)], stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=timeout, check=False)
            out = p.stdout.decode()[:4000]
            err = p.stderr.decode()[:2000]
            return True, out + ("\nERR:\n" + err if err else "")
        except subprocess.TimeoutExpired:
            return False, "timeout"
        except Exception as e:
            return False, f"exec-error:{e}"
        finally:
            try: tmp.unlink()
            except Exception: pass
    def integrate_module(self, name: str, code: str, signature_b64: Optional[str] = None, autointegrate: bool = True) -> Dict[str, Any]:
        # Store module first
        meta = {"name": name, "signature": bool(signature_b64), "autointegrate": bool(autointegrate)}
        mid = self.db.add_module(name, code, meta, verified=0, autointegrated=0)
        # Verify signature if present and required
        if AUTO_UPGRADE_REQUIRE_SIGN:
            if not signature_b64 or not self.verify_signature(code.encode(), signature_b64):
                return {"ok": False, "reason": "signature_missing_or_invalid", "module_id": mid}
        # Sandbox-run tests
        ran_ok = False; run_info = None
        if self.sandbox_available and AUTO_UPGRADE_REQUIRE_SANDBOX:
            # expect code to be base64-encoded WASM (preferred) or a Python source string;
            # detect base64 WASM by heuristic (decode succeeds and the wasm magic bytes are present)
            raw = None
            try:
                raw = base64.b64decode(code)
            except Exception:
                raw = None
            try:
                if raw and raw[:4] == b"\x00asm":
                    ran_ok, run_info = self.sandbox_run_wasm(raw)
                else:
                    # treat as python source
                    ran_ok, run_info = self.sandbox_run_subprocess(code)
            except Exception as e:
                ran_ok, run_info = False, f"sandbox-error:{e}"
        else:
            # sandbox not available; do not autointegrate (security)
            ran_ok, run_info = False, "sandbox-not-available"
        # If the sandbox run succeeded and autointegration is allowed, mark the module verified and autointegrated
        if ran_ok and autointegrate:
            self.db.mark_module_verified(mid, verified=1)
            # For safety, we will NOT exec arbitrary Python into this process.
            # Instead, the module stays on disk as a safe package and is marked autointegrated; a separate process can load it.
            self.db.mark_module_autointegrated(mid, val=1)
            return {"ok": True, "module_id": mid, "sandbox_result": run_info}
        else:
            return {"ok": False, "module_id": mid, "sandbox_result": run_info}
# --------------------------
# CHB — Universal middleman (upgrades v6 behaviour)
# --------------------------
class CHB:
    def __init__(self, db: LocalDB, vfs: VFS, csv: CreativeSkillVault, module_mgr: ModuleManager, global_sync: GlobalSync):
        self.db = db
        self.vfs = vfs
        self.csv = csv
        self.module_mgr = module_mgr
        self.global_sync = global_sync
        # lightweight internal state
        self.min_conf = CHB_MIN_CONFIDENCE
    def perceive(self, incoming: Dict[str, Any]) -> Dict[str, Any]:
        # normalize inputs (text, image, audio, video, plan_results)
        kind = "text"
        if incoming.get("image") or incoming.get("image_path"): kind = "image"
        if incoming.get("audio") or incoming.get("audio_path"): kind = "audio"
        if incoming.get("video") or incoming.get("video_path"): kind = "video"
        if incoming.get("plan_results"): kind = "plan_results"
        return {"kind": kind, "payload": incoming}
    def plan(self, text: str) -> Dict[str, Any]:
        # produce a user-driven retrieval plan
        queries = []
        queries.append({"q": text, "type": "web", "max_results": 5})
        # image/video heuristics
        if any(k in text.lower() for k in ["image", "design", "render", "photo", "logo", "illustration", "concept"]):
            queries.append({"q": text + " high quality concept art", "type": "image", "max_results": 8})
        if any(k in text.lower() for k in ["video", "footage", "tour", "demo", "walkthrough"]):
            queries.append({"q": text + " video", "type": "video", "max_results": 4})
        plan = {"id": uid("plan"), "queries": queries, "created_at": now_iso(), "instructions": "Run these queries locally on the user's device and return structured results (web/images/videos/audio)."}
        return plan
    def verify(self, plan_results: Optional[Dict[str, Any]], local_tool_outputs: Optional[List[Dict[str, Any]]] = None) -> Dict[str, Any]:
        # compute reliability, controversy, citations
        citations = []
        reliability = 0.4
        controversy = 0.0
        if plan_results:
            web = plan_results.get("web") or []
            domains = set()
            for r in web:
                u = r.get("url") or r.get("link") or ""
                # extract the domain defensively; fall back to the raw string for short/relative URLs
                parts = u.split("/")
                domains.add(parts[2] if len(parts) > 2 else u)
                citations.append({"title": r.get("title"), "url": u, "date": r.get("date")})
            reliability = min(1.0, 0.2 + 0.1 * len(domains))
            # controversy: whether the two top results contradict each other (short-text heuristics)
            if len(web) >= 2:
                s0 = web[0].get("snippet", "").lower()
                s1 = web[1].get("snippet", "").lower()
                if any(w in s1 for w in ["not", "contradict", "dispute"]) or any(w in s0 for w in ["not", "contradict", "dispute"]):
                    controversy = 0.5
        # tool outputs contribution
        tool_bonus = 0.0
        for t in (local_tool_outputs or []):
            if t.get("tool") == "image" and t.get("ok"): tool_bonus += 0.2
            if t.get("tool") == "math" and not t.get("result", "").lower().startswith("math error"): tool_bonus += 0.2
        confidence = min(1.0, reliability * 0.6 + tool_bonus)
        return {"confidence": confidence, "reliability": reliability, "controversy": controversy, "citations": citations}
    def opportunistic_learning(self, plan_results: Dict[str, Any]):
        # extract images/videos/audio and store as skill benchmarks if quality is high
        images = plan_results.get("images", []) or []
        for im in images:
            path = im.get("path") or im.get("url")
            # naive quality score
            score = float(im.get("quality_score", 0.6))
            tag = im.get("tags", ["web"])[0] if im.get("tags") else "web"
            meta = {"source": path, "desc": im.get("caption") or im.get("alt") or "", "origin": "user_client"}
            self.csv.add_benchmark(tag, "image", meta, score)
        videos = plan_results.get("videos", []) or []
        for v in videos:
            path = v.get("path") or v.get("url")
            score = float(v.get("quality_score", 0.6))
            tag = v.get("tags", ["web"])[0] if v.get("tags") else "web"
            meta = {"source": path, "desc": v.get("caption") or "", "origin": "user_client"}
            self.csv.add_benchmark(tag, "video", meta, score)
        audios = plan_results.get("audios", []) or []
        for a in audios:
            path = a.get("path") or a.get("url")
            score = float(a.get("quality_score", 0.6))
            tag = a.get("tags", ["web"])[0] if a.get("tags") else "web"
            meta = {"source": path, "desc": a.get("caption") or "", "origin": "user_client"}
            self.csv.add_benchmark(tag, "audio", meta, score)
        # push to global sync immediately
        if GLOBAL_SYNC_ENABLED:
            payload = {"type": "skill_update", "time": now_iso(), "added": len(images) + len(videos) + len(audios)}
            # best-effort: broadcast via websocket, scheduled on the WS server's own event loop
            if self.global_sync and getattr(self.global_sync, "loop", None):
                try:
                    asyncio.run_coroutine_threadsafe(self.global_sync.ws_broadcast(payload), self.global_sync.loop)
                except Exception:
                    pass
    def handle_plan_results(self, plan_id: str, plan_results: Dict[str, Any], local_tool_outputs: Optional[List[Dict[str, Any]]] = None):
        # verify, opportunistic learn, store facts in VFS
        v = self.verify(plan_results, local_tool_outputs)
        # store a simple example fact: top web title as the claim
        web = plan_results.get("web", []) or []
        if web:
            top = web[0]
            claim = top.get("title", "").strip()
            value = top.get("snippet", "").strip()
            sources = [{"url": top.get("url") or top.get("link"), "title": top.get("title"), "date": top.get("date")}]
            self.vfs.store_fact(claim, value, sources, float(v.get("confidence", 0.4)), controversy=v.get("controversy", 0.0))
        # opportunistic learning
        self.opportunistic_learning(plan_results)
        return v
# --------------------------
# Auto-backup & Export
# --------------------------
class BackupManager:
    def __init__(self, db: LocalDB, data_dir: Path, backup_dir: Path):
        self.db = db
        self.data_dir = data_dir
        self.backup_dir = backup_dir
        self.lock = threading.Lock()
    def create_backup(self) -> str:
        with self.lock:
            ts = now_iso().replace(":", "-")
            out_path = self.backup_dir / f"mm_backup_{ts}.zip"
            with zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED) as zf:
                # include the SQLite DB file if it exists
                try:
                    dbf = Path(self.db.path)
                    if dbf.exists():
                        zf.write(str(dbf), arcname=dbf.name)
                except Exception:
                    pass
                # include JSON DB mirrors (facts/skills/modules)
                # try to export via DB list functions
                try:
                    facts = self.db.list_facts()
                    skills = self.db.list_skills()
                    mods = self.db.list_modules()
                    zf.writestr("facts.json", json.dumps(facts, ensure_ascii=False, indent=2))
                    zf.writestr("skills.json", json.dumps(skills, ensure_ascii=False, indent=2))
                    zf.writestr("modules.json", json.dumps(mods, ensure_ascii=False, indent=2))
                except Exception:
                    pass
            bid = self.db.add_backup(str(out_path))
            return str(out_path)
    def download_backup_path(self) -> Optional[str]:
        # return latest backup path
        cur = self.db.conn.cursor()
        cur.execute("SELECT id,path,created_at FROM backups ORDER BY created_at DESC LIMIT 1")
        r = cur.fetchone()
        if not r: return None
        return r[1]
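# Inspecting the newest backup archive (a sketch; `backup_mgr` stands for a BackupManager instance,
# and the zip contains the SQLite DB plus facts.json / skills.json / modules.json written by create_backup()):
#
#   import zipfile, json
#   latest = backup_mgr.download_backup_path()
#   with zipfile.ZipFile(latest) as zf:
#       facts = json.loads(zf.read("facts.json"))
#       print(len(facts), "facts in backup")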
# --------------------------
# Main Orchestrator (upgrades v6.SuperAgent)
# --------------------------
class SuperAgentV7:
    def __init__(self):
        self.db_path = DATA_DIR / "multimodular_v7.db"
        self.db = LocalDB(self.db_path)
        self.vfs = VFS(self.db)
        self.csv = CreativeSkillVault(self.db)
        self.global_sync = GlobalSync(self.db, hf_token=HUGGINGFACE_TOKEN, repo=GLOBAL_SYNC_REPO)
        # module manager uses GLOBAL_SYNC_SIGNING_PUBKEY if present
        self.module_mgr = ModuleManager(self.db, signing_pubkey_pem=GLOBAL_SYNC_SIGNING_PUBKEY)
        self.chb = CHB(self.db, self.vfs, self.csv, self.module_mgr, self.global_sync)
        self.backup_mgr = BackupManager(self.db, DATA_DIR, BACKUP_DIR)
        # start realtime ws server in background for incoming updates
        if websockets is not None:
            try:
                self.global_sync.run_ws_in_thread()
            except Exception as e:
                log.warning("ws server thread failed: %s", e)
    # --- user-driven search plan creation (CHB.plan)
    def plan_search(self, text: str) -> Dict[str, Any]:
        return self.chb.plan(text)
    # --- client submits results
    def submit_plan_results(self, plan_id: str, results: Dict[str, Any]) -> Dict[str, Any]:
        # Accept results; pass to CHB for verification & learning
        v = self.chb.handle_plan_results(plan_id, results)
        # create a backup after a major update
        try:
            bp = self.backup_mgr.create_backup()
            log.info("backup created: %s", bp)
        except Exception:
            log.exception("backup failed")
        # push to global sync (best-effort)
        if GLOBAL_SYNC_ENABLED:
            try:
                payload = {"type": "fact_skill_update", "ts": now_iso(), "payload": {"summary": "update", "plan_id": plan_id}}
                # attempt HF push or WS broadcast
                # HF push requires an implementation using huggingface_hub; we leave a placeholder
                if getattr(self.global_sync, "loop", None):
                    # schedule the broadcast on the WS server's own event loop
                    asyncio.run_coroutine_threadsafe(self.global_sync.ws_broadcast(payload), self.global_sync.loop)
            except Exception:
                pass
        return {"ok": True, "verify": v}
    # --- expose facts/skills
    def search_facts(self, q: str):
        return self.vfs.query(q)
    def top_skills(self, tag: str, k: int = 5):
        return self.csv.top_by_tag(tag, k)
    def add_module(self, name: str, code: str, signature_b64: Optional[str] = None, autointegrate: bool = True):
        return self.module_mgr.integrate_module(name, code, signature_b64, autointegrate)
    def download_latest_backup(self) -> Optional[str]:
        return self.backup_mgr.download_backup_path()
    # direct chat route that funnels through CHB
    def chat(self, text: str, plan_results: Optional[Dict[str, Any]] = None):
        # For direct chat, if the user provided plan_results (client retrieval), handle them
        if plan_results:
            v = self.chb.handle_plan_results(uid("plan"), plan_results)
            return {"status": "ok", "verify": v}
        # Simple path: CHB creates a plan (we re-use the simple plan + verify flow)
        plan = self.chb.plan(text)
        # no client retrieval performed: CHB can still reply from local knowledge (fallback)
        # For v7 we simply return the plan and ask the client to run it, or CHB attempts local inference (fallback)
        return {"status": "ok", "plan": plan, "hint": "Run the plan on the client and submit results via submit_plan_results"}
# --------------------------
# FastAPI endpoints (added/extended)
# --------------------------
if FASTAPI_AVAILABLE:
    app = FastAPI(title="Multimodular SuperAgent v7.0")
    AGENT = SuperAgentV7()
    class PlanIn(BaseModel):
        text: str
    # Route decorators register the handlers with the app; the paths mirror the handler names
    # and can be adjusted to taste.
    @app.post("/plan")
    async def api_plan_search(inp: PlanIn):
        plan = AGENT.plan_search(inp.text)
        return {"ok": True, "plan": plan}
    @app.post("/plan/results")
    async def api_submit_results(plan_id: str = Form(...), results: str = Form(...)):
        try:
            payload = json.loads(results)
        except Exception:
            return JSONResponse({"ok": False, "error": "invalid_json"}, status_code=400)
        out = AGENT.submit_plan_results(plan_id, payload)
        return out
    @app.post("/facts/search")
    async def api_facts_search(q: str = Form(...)):
        res = AGENT.search_facts(q)
        return {"ok": True, "results": res}
    @app.post("/skills/top")
    async def api_skills_top(tag: str = Form(...), k: int = Form(5)):
        res = AGENT.top_skills(tag, k)
        return {"ok": True, "results": res}
    @app.post("/modules/add")
    async def api_module_add(name: str = Form(...), code: str = Form(...), signature_b64: Optional[str] = Form(None)):
        out = AGENT.add_module(name, code, signature_b64)
        return out
    @app.get("/backups/latest")
    async def api_backup_download():
        p = AGENT.download_latest_backup()
        if not p or not os.path.exists(p):
            return JSONResponse({"ok": False, "error": "no_backup"}, status_code=404)
        return FileResponse(p, media_type="application/zip", filename=os.path.basename(p))
    @app.post("/chat")
    async def api_chat(text: str = Form(...), plan_results: Optional[str] = Form(None)):
        if plan_results:
            try:
                pr = json.loads(plan_results)
            except Exception:
                return JSONResponse({"ok": False, "error": "invalid_plan_results"}, status_code=400)
            out = AGENT.chat(text, pr)
        else:
            out = AGENT.chat(text, None)
        return out
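    # Example client calls against the routes registered above (a sketch using the `requests`
    # package; host/port assume the default --runserver settings and the paths chosen in the decorators):
    #
    #   import requests
    #   base = "http://localhost:8000"
    #   plan = requests.post(f"{base}/plan", json={"text": "solid-state battery energy density"}).json()
    #   facts = requests.post(f"{base}/facts/search", data={"q": "solid-state"}).json()
    #   top = requests.post(f"{base}/skills/top", data={"tag": "motorcycle", "k": 3}).json()
    #   with open("mm_backup.zip", "wb") as fh:
    #       fh.write(requests.get(f"{base}/backups/latest").content)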
else:
    app = None
    AGENT = SuperAgentV7()
# --------------------------
# CLI demo & execution
# --------------------------
def demo_run():
    print("Multimodular SuperAgent v7.0 - Demo")
    a = AGENT
    # 1) plan search
    plan = a.plan_search("futuristic electric motorcycle neon blue lights battery tech 2025")
    print("Plan:", plan)
    # Simulate client retrieval
    sample_results = {
        "web": [
            {"title": "Solid-state battery prototype reaches 500 Wh/kg", "url": "https://example.org/article", "snippet": "Researchers at X report 500 Wh/kg...", "date": now_iso()}
        ],
        "images": [
            {"path": str(BASE_DIR / "demo_motorcycle.png"), "quality_score": 0.92, "caption": "Futuristic motorcycle concept", "tags": ["motorcycle", "futuristic"]}
        ],
        "videos": [],
        "audios": []
    }
    # ensure demo image exists
    try:
        from PIL import Image, ImageDraw
        img = Image.new("RGB", (640, 480), (20, 20, 30))
        d = ImageDraw.Draw(img); d.text((20, 20), "Demo motorcycle", fill=(200, 200, 255))
        img.save(BASE_DIR / "demo_motorcycle.png")
    except Exception:
        pass
    res = a.submit_plan_results(plan["id"], sample_results)
    print("Submit results ->", res)
    # show facts
    facts = a.search_facts("solid-state")
    print("Facts:", facts)
    # download backup
    path = a.download_latest_backup()
    print("Backup path:", path)
if __name__ == "__main__":
    import argparse
    ap = argparse.ArgumentParser()
    ap.add_argument("--demo", action="store_true")
    ap.add_argument("--runserver", action="store_true")
    ap.add_argument("--port", type=int, default=8000)
    args = ap.parse_args()
    if args.demo:
        demo_run()
    elif args.runserver and FASTAPI_AVAILABLE:
        import uvicorn
        # pass the app object directly; the file name contains spaces, so an import string would not resolve
        uvicorn.run(app, host="0.0.0.0", port=args.port)
    else:
        print("Run with --demo or --runserver. FASTAPI available:", FASTAPI_AVAILABLE)