# tools/orchestrator.py
import os
import json
import re
import logging
from huggingface_hub import whoami
from model_logic import call_model_stream, MODELS_BY_PROVIDER, get_default_model_display_name_for_provider
from memory_logic import retrieve_memories_semantic, retrieve_rules_semantic
from tools.websearch import search_and_scrape_duckduckgo, scrape_url
from tools.space_builder import create_huggingface_space, update_huggingface_space_file, list_huggingface_space_files, get_huggingface_space_file_content
import prompts
from utils import format_insights_for_prompt
logger = logging.getLogger(__name__)
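# Runtime configuration, overridable via environment variables.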
WEB_SEARCH_ENABLED = os.getenv("WEB_SEARCH_ENABLED", "true").lower() == "true"
TOOL_DECISION_PROVIDER = os.getenv("TOOL_DECISION_PROVIDER", "groq")
TOOL_DECISION_MODEL_ID = os.getenv("TOOL_DECISION_MODEL", "llama3-8b-8192")
MAX_HISTORY_TURNS = int(os.getenv("MAX_HISTORY_TURNS", "7"))
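# Decide which tool, if any, should handle the user's request. Cheap checks run
# first; only plausibly tool-worthy inputs pay for an LLM decision call.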
def decide_on_tool(user_input: str, chat_history_for_prompt: list, initial_insights_ctx_str: str):
user_input_lower = user_input.lower()
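    # Fast path: an explicit URL in the message always triggers scraping,
    # with no LLM decision call needed.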
if "http://" in user_input or "https://" in user_input:
url_match = re.search(r'(https?://[^\s]+)', user_input)
if url_match:
return "scrape_url_and_report", {"url": url_match.group(1)}
tool_trigger_keywords = ["search", "build", "create", "make", "update", "modify", "change", "fix", "list", "show", "files", "get", "read", "edit"]
if len(user_input.split()) > 3 or "?" in user_input or any(w in user_input_lower for w in tool_trigger_keywords):
history_snippet = "\n".join([f"{msg['role']}: {msg['content'][:100]}" for msg in chat_history_for_prompt[-2:]])
guideline_snippet = initial_insights_ctx_str[:200].replace('\n', ' ')
tool_user_prompt = prompts.get_tool_user_prompt(user_input, history_snippet, guideline_snippet)
tool_decision_messages = [{"role": "system", "content": prompts.TOOL_SYSTEM_PROMPT}, {"role": "user", "content": tool_user_prompt}]
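        # Resolve the configured model ID to its display name, falling back to
        # the provider's default model.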
        tool_model_display = next(
            (dn for dn, mid in MODELS_BY_PROVIDER.get(TOOL_DECISION_PROVIDER.lower(), {}).get("models", {}).items() if mid == TOOL_DECISION_MODEL_ID),
            None,
        ) or get_default_model_display_name_for_provider(TOOL_DECISION_PROVIDER)
if tool_model_display:
try:
                tool_resp_raw = "".join(call_model_stream(provider=TOOL_DECISION_PROVIDER, model_display_name=tool_model_display, messages=tool_decision_messages, temperature=0.0, max_tokens=2048))
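                # The model may wrap its JSON in prose; extract the outermost braces.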
json_match_tool = re.search(r"\{.*\}", tool_resp_raw, re.DOTALL)
if json_match_tool:
action_data = json.loads(json_match_tool.group(0))
return action_data.get("action", "quick_respond"), action_data.get("action_input", {})
except Exception as e:
logger.error(f"Tool decision LLM error: {e}")
return "quick_respond", {}
def orchestrate_and_respond(user_input: str, provider_name: str, model_display_name: str, chat_history_for_prompt: list[dict], custom_system_prompt: str = None, ui_api_key_override: str = None):
request_id = os.urandom(4).hex()
logger.info(f"ORCHESTRATOR [{request_id}] Start. User: '{user_input[:50]}...'")
history_str_for_prompt = "\n".join([f"{t['role']}: {t['content']}" for t in chat_history_for_prompt])
yield "status", "[Checking guidelines...]"
initial_insights = retrieve_rules_semantic(f"{user_input}\n{history_str_for_prompt}", k=5)
initial_insights_ctx_str, parsed_initial_insights_list = format_insights_for_prompt(initial_insights)
yield "status", "[Choosing best approach...]"
action_type, action_input = decide_on_tool(user_input, chat_history_for_prompt, initial_insights_ctx_str)
logger.info(f"ORCHESTRATOR [{request_id}]: Tool Decision: Action='{action_type}', Input='{action_input}'")
yield "status", f"[Path: {action_type}]"
final_system_prompt = custom_system_prompt or prompts.DEFAULT_SYSTEM_PROMPT
context_str = None
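    # Dispatch on the chosen action. Each tool branch fills context_str with its
    # result (or a failure note), which is folded into the final prompt below.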
if action_type == "create_huggingface_space":
params = ["space_name", "sdk", "description"]
if all(p in action_input for p in params):
try:
hf_token = os.getenv("HF_TOKEN")
                if not hf_token:
                    raise ValueError("Hugging Face token (HF_TOKEN) not found.")
yield "status", "[Tool: Verifying user identity...]"
user_info = whoami(token=hf_token)
owner = user_info.get("name")
                if not owner:
                    raise ValueError("Could not determine owner from HF token.")
action_input["owner"] = owner
yield "status", "[Tool: Generating space content...]"
space_gen_messages = [
{"role": "system", "content": prompts.SPACE_GENERATION_SYSTEM_PROMPT},
{"role": "user", "content": prompts.get_space_generation_user_prompt(**action_input)}
]
                markdown_content = "".join(call_model_stream(provider_name, model_display_name, space_gen_messages, ui_api_key_override, temperature=0.1, max_tokens=4096))
yield "status", "[Tool: Creating Space...]"
result = create_huggingface_space(
owner=owner,
space_name=action_input["space_name"],
sdk=action_input["sdk"],
markdown_content=markdown_content.strip()
)
context_str = f"Tool Result (Create Space): {result.get('result') or result.get('error', 'Unknown outcome')}"
except Exception as e:
context_str = f"Tool Failed: An error occurred during space creation process - {e}"
else:
context_str = "Tool Failed: Missing parameters for create_huggingface_space. Required: " + ", ".join(params)
elif action_type == "list_huggingface_space_files":
params = ["owner", "space_name"]
if all(p in action_input for p in params):
yield "status", "[Tool: Listing files...]"
result = list_huggingface_space_files(**action_input)
if "error" in result:
context_str = f"Tool Result (List Files): Error - {result['error']}"
else:
files_list = result.get("files", [])
files_str = "\n- ".join(files_list) if files_list else "No files found."
context_str = f"Tool Result (List Files):\nStatus: {result.get('status', 'OK')}\nFiles:\n- {files_str}"
else:
context_str = "Tool Failed: Missing parameters for list_huggingface_space_files. Required: " + ", ".join(params)
elif action_type == "get_huggingface_space_file_content":
params = ["owner", "space_name", "file_path"]
if all(p in action_input for p in params):
yield "status", "[Tool: Reading file content...]"
result = get_huggingface_space_file_content(**action_input)
if "error" in result:
context_str = f"Tool Result (Get Content): Error - {result['error']}"
else:
context_str = f"Tool Result (Get Content for '{action_input['file_path']}'):\nStatus: {result.get('status', 'OK')}\n```\n{result.get('content', '')}\n```"
else:
context_str = "Tool Failed: Missing parameters for get_huggingface_space_file_content. Required: " + ", ".join(params)
elif action_type == "update_huggingface_space_file":
params = ["owner", "space_name", "file_path", "new_content", "commit_message"]
if all(p in action_input for p in params):
yield "status", "[Tool: Updating file...]"
result = update_huggingface_space_file(**action_input)
context_str = f"Tool Result (Update File): {result.get('result') or result.get('error', 'Unknown outcome')}"
else:
context_str = "Tool Failed: Missing parameters for update_huggingface_space_file. Required: " + ", ".join(params)
elif action_type == "search_duckduckgo_and_report" and WEB_SEARCH_ENABLED:
query = action_input.get("search_engine_query")
if query:
yield "status", f"[Web: '{query[:60]}'...]"
results = search_and_scrape_duckduckgo(query, num_results=2)
context_str = "Web Content:\n" + "\n".join([f"Source {i+1} ({r.get('url','N/A')}):\n{r.get('content', r.get('error', 'N/A'))[:3000]}\n---" for i, r in enumerate(results)])
elif action_type == "scrape_url_and_report" and WEB_SEARCH_ENABLED:
url = action_input.get("url")
if url:
yield "status", f"[Web: '{url[:60]}'...]"
result = scrape_url(url)
context_str = f"Web Content for {url}:\n{result.get('content', result.get('error', 'No content scraped.'))}"
elif action_type == "answer_using_conversation_memory":
yield "status", "[Searching conversation memory...]"
mems = retrieve_memories_semantic(f"User query: {user_input}\nContext:\n{history_str_for_prompt[-1000:]}", k=2)
context_str = "Relevant Past Interactions:\n" + "\n".join([f"- User:{m.get('user_input','')}->AI:{m.get('bot_response','')} (Takeaway:{m.get('metrics',{}).get('takeaway','N/A')})" for m in mems]) if mems else "No relevant past interactions found."
final_user_prompt = prompts.get_final_response_prompt(history_str_for_prompt, initial_insights_ctx_str, user_input, context_str)
final_llm_messages = [{"role": "system", "content": final_system_prompt}, {"role": "user", "content": final_user_prompt}]
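    # Stream the answer chunk-by-chunk; errors are surfaced inline in the stream
    # rather than aborting the response.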
streamed_response = ""
try:
for chunk in call_model_stream(provider_name, model_display_name, final_llm_messages, ui_api_key_override, max_tokens=4096):
streamed_response += chunk
yield "response_chunk", chunk
except Exception as e:
        err_text = f"\n\n(Error: {e})"
        streamed_response += err_text
        yield "response_chunk", err_text
final_bot_text = streamed_response.strip() or "(No response)"
logger.info(f"ORCHESTRATOR [{request_id}]: Finished. Response length: {len(final_bot_text)}")
yield "final_response_and_insights", {"response": final_bot_text, "insights_used": parsed_initial_insights_list}