Spaces:
Runtime error
Runtime error
File size: 8,847 Bytes
9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 44cb78f 7e165c0 44cb78f 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 7e165c0 9161e19 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import os
import json
import re
import logging
import time
from model_logic import call_model_stream, MODELS_BY_PROVIDER, get_default_model_display_name_for_provider
from memory_logic import retrieve_memories_semantic, retrieve_rules_semantic
from tools.websearch import search_and_scrape_duckduckgo, scrape_url
from tools.space_builder import create_huggingface_space, update_huggingface_space_file
import prompts
from utils import format_insights_for_prompt
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Web tools (search / scrape) are enabled unless WEB_SEARCH_ENABLED is set to
# something other than "true" (compared case-insensitively).
WEB_SEARCH_ENABLED = "true" == os.environ.get("WEB_SEARCH_ENABLED", "true").lower()

# Provider and model id used for the tool-decision LLM call.
TOOL_DECISION_PROVIDER = os.environ.get("TOOL_DECISION_PROVIDER", "groq")
TOOL_DECISION_MODEL_ID = os.environ.get("TOOL_DECISION_MODEL", "llama3-8b-8192")

# Integer read from MAX_HISTORY_TURNS; falls back to 7 when the variable is unset.
MAX_HISTORY_TURNS = int(os.environ.get("MAX_HISTORY_TURNS", 7))
def _strip_trailing_punctuation(url: str) -> str:
    """Remove sentence punctuation and unbalanced closing brackets from the end of *url*.

    A URL pasted into prose ("see https://example.com/page.") is matched
    together with the punctuation that follows it. Closing brackets are only
    removed when the URL holds more closers than openers, so URLs with balanced
    brackets, e.g. ".../Python_(programming_language)", are left intact.
    """
    bracket_pairs = ((")", "("), ("]", "["), (">", "<"))
    while True:
        trimmed = url.rstrip(".,;:!?'\"")
        for closer, opener in bracket_pairs:
            if trimmed.endswith(closer) and trimmed.count(closer) > trimmed.count(opener):
                trimmed = trimmed[:-1]
                break
        if trimmed == url:
            return url
        # Something was removed; re-check, since punctuation and brackets can
        # alternate (e.g. "...page).").
        url = trimmed


def decide_on_tool(user_input: str, chat_history_for_prompt: list, initial_insights_ctx_str: str):
    """Choose the action to take for *user_input*.

    Decision order:
      1. If the input contains an http(s) URL, request a scrape of that URL.
      2. If the input is longer than three words, contains "?", or contains one
         of the trigger keywords, ask the tool-decision model and parse the
         first JSON object found in its reply.
      3. Otherwise, or whenever step 2 yields nothing usable, answer directly.

    Args:
        user_input: The raw user message.
        chat_history_for_prompt: Chat messages; each is expected to be a dict
            with "role" and "content" entries. Only the last two are used.
        initial_insights_ctx_str: Guideline text; only the first 200
            characters are passed to the tool-decision prompt.

    Returns:
        A ``(action_type, action_input)`` tuple where ``action_type`` is a
        non-empty string and ``action_input`` is always a dict.
    """
    url_match = re.search(r'(https?://[^\s]+)', user_input)
    if url_match:
        return "scrape_url_and_report", {"url": _strip_trailing_punctuation(url_match.group(1))}

    user_input_lower = user_input.lower()
    tool_trigger_keywords = ["what is", "how to", "explain", "search for", "build", "create", "make", "update", "modify", "change", "fix"]
    needs_tool_decision = (
        len(user_input.split()) > 3
        or "?" in user_input
        or any(w in user_input_lower for w in tool_trigger_keywords)
    )
    if not needs_tool_decision:
        return "quick_respond", {}

    # Coerce content to str so a message with missing/None content cannot raise
    # here, outside the try block below.
    history_snippet = "\n".join(
        f"{msg.get('role', '')}: {str(msg.get('content') or '')[:100]}"
        for msg in chat_history_for_prompt[-2:]
    )
    guideline_snippet = initial_insights_ctx_str[:200].replace('\n', ' ')
    tool_user_prompt = prompts.get_tool_user_prompt(user_input, history_snippet, guideline_snippet)
    tool_decision_messages = [
        {"role": "system", "content": prompts.TOOL_SYSTEM_PROMPT},
        {"role": "user", "content": tool_user_prompt},
    ]

    # Map the configured model id back to its display name; fall back to the
    # provider's default display name when the id is not listed.
    provider_models = MODELS_BY_PROVIDER.get(TOOL_DECISION_PROVIDER.lower(), {}).get("models", {})
    tool_model_display = next(
        (dn for dn, mid in provider_models.items() if mid == TOOL_DECISION_MODEL_ID),
        None,
    ) or get_default_model_display_name_for_provider(TOOL_DECISION_PROVIDER)
    if not tool_model_display:
        return "quick_respond", {}

    try:
        tool_resp_raw = "".join(call_model_stream(
            provider=TOOL_DECISION_PROVIDER,
            model_display_name=tool_model_display,
            messages=tool_decision_messages,
            temperature=0.0,
            max_tokens=2048,
        ))
        # Greedy match from the first "{" to the last "}" so any text the model
        # wraps around the JSON object is ignored.
        json_match_tool = re.search(r"\{.*\}", tool_resp_raw, re.DOTALL)
        if json_match_tool:
            action_data = json.loads(json_match_tool.group(0))
            action_type = action_data.get("action", "quick_respond")
            if not isinstance(action_type, str) or not action_type:
                # e.g. {"action": null}: treat as "no tool selected".
                action_type = "quick_respond"
            action_input = action_data.get("action_input", {})
            if not isinstance(action_input, dict):
                action_input = {}
            return action_type, action_input
    except Exception as e:
        # Best-effort: any failure in the decision call or its parsing falls
        # back to a direct answer.
        logger.exception(f"Tool decision LLM error: {e}")
    return "quick_respond", {}
def _run_tool_action(action_type: str, action_input: dict, user_input: str, history_str_for_prompt: str,
                     provider_name: str, model_display_name: str, ui_api_key_override: str = None):
    """Run the tool selected for this turn.

    Generator: yields ``("status", text)`` progress events while working and
    *returns* (via ``StopIteration.value`` / ``yield from``) the context string
    to hand to the final prompt, or ``None`` when the action needs no tool.
    Exceptions raised by the tools propagate to the caller.
    """
    if action_type == "create_huggingface_space":
        params = ["owner", "space_name", "sdk", "description"]
        if not all(p in action_input for p in params):
            return "Tool Failed: Missing required parameters for create_huggingface_space. Required: " + ", ".join(params)
        yield "status", "[Tool: Generating space content...]"
        description = action_input["description"]
        owner = action_input["owner"]
        space_name = action_input["space_name"]
        sdk = action_input["sdk"]
        space_gen_user_prompt = prompts.get_space_generation_user_prompt(description, owner, space_name)
        space_gen_messages = [
            {"role": "system", "content": prompts.SPACE_GENERATION_SYSTEM_PROMPT},
            {"role": "user", "content": space_gen_user_prompt},
        ]
        # The space content is generated in full before the Space is created.
        markdown_content = "".join(call_model_stream(
            provider=provider_name, model_display_name=model_display_name,
            messages=space_gen_messages, api_key_override=ui_api_key_override,
            temperature=0.1, max_tokens=4096,
        ))
        yield "status", "[Tool: Creating Space with generated content...]"
        result = create_huggingface_space(
            owner=owner, space_name=space_name, sdk=sdk,
            markdown_content=markdown_content.strip(),
        )
        outcome = result.get('result') or result.get('error', 'Unknown outcome from tool.')
        return f"Tool Result (Create Space): {outcome}"

    if action_type == "update_huggingface_space_file":
        yield "status", "[Tool: Updating file...]"
        params = ["owner", "space_name", "file_path", "new_content", "commit_message"]
        if not all(p in action_input for p in params):
            return "Tool Failed: Missing required parameters for update_huggingface_space_file. Required: " + ", ".join(params)
        # action_input comes from the tool-decision model; an unexpected extra
        # key raises TypeError here, which the caller reports as a tool failure.
        result = update_huggingface_space_file(**action_input)
        outcome = result.get('result') or result.get('error', 'Unknown outcome from tool.')
        return f"Tool Result (Update File): {outcome}"

    if action_type in ("search_duckduckgo_and_report", "scrape_url_and_report"):
        if not WEB_SEARCH_ENABLED:
            # Tell the final model that nothing was fetched instead of
            # silently dropping the requested web action.
            return "Note: Web access is disabled in this deployment, so no web content was retrieved."
        if action_type == "search_duckduckgo_and_report":
            query = action_input.get("search_engine_query")
            if not query:
                return "Tool Failed: Missing 'search_engine_query' for web search."
            yield "status", f"[Web: '{query[:60]}'...]"
            results = search_and_scrape_duckduckgo(query, num_results=2)
            # Each source's text is capped at 3000 characters.
            return "Web Content:\n" + "\n".join([
                f"Source {i+1} ({r.get('url','N/A')}):\n{r.get('content', r.get('error', 'N/A'))[:3000]}\n---"
                for i, r in enumerate(results)
            ])
        url = action_input.get("url")
        if not url:
            return "Tool Failed: Missing 'url' for scraping."
        yield "status", f"[Web: '{url[:60]}'...]"
        result = scrape_url(url)
        return f"Web Content for {url}:\n{result.get('content', result.get('error', 'No content scraped.'))}"

    if action_type == "answer_using_conversation_memory":
        yield "status", "[Searching conversation memory...]"
        # Only the last 1000 characters of the history are used in the memory query.
        mems = retrieve_memories_semantic(f"User query: {user_input}\nContext:\n{history_str_for_prompt[-1000:]}", k=2)
        if not mems:
            return "No relevant past interactions found."
        return "Relevant Past Interactions:\n" + "\n".join([
            f"- User:{m.get('user_input','')}->AI:{m.get('bot_response','')} (Takeaway:{m.get('metrics',{}).get('takeaway','N/A')})"
            for m in mems
        ])

    # Any other action (e.g. "quick_respond") needs no tool context.
    return None


def orchestrate_and_respond(user_input: str, provider_name: str, model_display_name: str, chat_history_for_prompt: list[dict], custom_system_prompt: str = None, ui_api_key_override: str = None):
    """Handle one user turn: retrieve guidelines, pick and run a tool, stream the answer.

    Generator yielding ``(event_type, payload)`` tuples:
      * ``("status", str)`` - progress messages.
      * ``("response_chunk", str)`` - pieces of the streamed model answer
        (including inline error text if streaming fails).
      * ``("final_response_and_insights", dict)`` - always the last event;
        the dict has keys ``"response"`` (full answer text, or
        ``"(No response)"`` when empty) and ``"insights_used"``.

    Args:
        user_input: The user's message for this turn.
        provider_name: Provider passed to ``call_model_stream``.
        model_display_name: Model display name passed to ``call_model_stream``.
        chat_history_for_prompt: Messages as dicts with "role" and "content".
        custom_system_prompt: Replaces ``prompts.DEFAULT_SYSTEM_PROMPT`` when truthy.
        ui_api_key_override: Forwarded to ``call_model_stream`` as ``api_key_override``.
    """
    request_id = os.urandom(4).hex()
    logger.info(f"ORCHESTRATOR [{request_id}] Start. User: '{user_input[:50]}...'")
    history_str_for_prompt = "\n".join([f"{t['role']}: {t['content']}" for t in chat_history_for_prompt])

    yield "status", "[Checking guidelines...]"
    initial_insights = retrieve_rules_semantic(f"{user_input}\n{history_str_for_prompt}", k=5)
    initial_insights_ctx_str, parsed_initial_insights_list = format_insights_for_prompt(initial_insights)

    yield "status", "[Choosing best approach...]"
    action_type, action_input = decide_on_tool(user_input, chat_history_for_prompt, initial_insights_ctx_str)
    logger.info(f"ORCHESTRATOR [{request_id}]: Tool Decision: Action='{action_type}', Input='{action_input}'")
    yield "status", f"[Path: {action_type}]"

    final_system_prompt = custom_system_prompt or prompts.DEFAULT_SYSTEM_PROMPT

    # A failing tool must not abort the turn: report the failure as context so
    # the final answer (and the closing event) are still produced.
    # GeneratorExit is not an Exception subclass, so closing this generator
    # still propagates normally.
    try:
        context_str = yield from _run_tool_action(
            action_type, action_input, user_input, history_str_for_prompt,
            provider_name, model_display_name, ui_api_key_override,
        )
    except Exception as e:
        logger.exception(f"ORCHESTRATOR [{request_id}]: Tool '{action_type}' failed: {e}")
        context_str = f"Tool Failed: {action_type} raised an error: {e}"

    final_user_prompt = prompts.get_final_response_prompt(history_str_for_prompt, initial_insights_ctx_str, user_input, context_str)
    final_llm_messages = [
        {"role": "system", "content": final_system_prompt},
        {"role": "user", "content": final_user_prompt},
    ]

    streamed_response = ""
    try:
        for chunk in call_model_stream(
            provider=provider_name, model_display_name=model_display_name,
            messages=final_llm_messages, api_key_override=ui_api_key_override,
            max_tokens=4096,
        ):
            # A chunk starting with "Error:" ends the stream; it is forwarded
            # as the last response chunk.
            if isinstance(chunk, str) and chunk.startswith("Error:"):
                error_text = f"\n{chunk}\n"
                streamed_response += error_text
                yield "response_chunk", error_text
                break
            streamed_response += chunk
            yield "response_chunk", chunk
    except Exception as e:
        error_text = f"\n\n(Error: {e})"
        streamed_response += error_text
        yield "response_chunk", error_text

    final_bot_text = streamed_response.strip() or "(No response)"
    logger.info(f"ORCHESTRATOR [{request_id}]: Finished. Response length: {len(final_bot_text)}")
    yield "final_response_and_insights", {"response": final_bot_text, "insights_used": parsed_initial_insights_list}