# # SPDX-FileCopyrightText: Hadad # SPDX-License-Identifier: Apache-2.0 # import json # Import JSON module to parse and handle JSON data import uuid # Import UUID module to generate unique identifiers for sessions from typing import List, Dict, Any # Import type hints for lists, dictionaries, and generic types from datetime import datetime # Import datetime to get and format current date/time from config import * # Import all configuration variables including 'auth' and 'restrictions' from src.utils.session_mapping import get_host # Import function to get server info by session ID from src.utils.ip_generator import generate_ip # Import function to generate random IP for headers from src.utils.helper import mark # Import function to mark a server as busy/unavailable from src.ui.reasoning import styles # Import function to apply CSS styling to reasoning output import httpx # Import httpx for async HTTP requests with streaming support async def jarvis( session_id: str, # Unique session identifier to maintain consistent server assignment model: str, # AI model name to specify which model to use history: List[Dict[str, str]], # List of previous conversation messages with roles and content user_message: str, # Latest user input message to send to the AI model mode: str, # Mode string to guide AI behavior, e.g., '/think' or '/no_think' files=None, # Optional files or attachments to include with the user message temperature: float = 0.6, # Sampling temperature controlling randomness in token generation top_k: int = 20, # Limit token selection to top_k probable tokens min_p: float = 0, # Minimum probability threshold for token selection top_p: float = 0.95, # Nucleus sampling cumulative probability threshold repetition_penalty: float = 1, # Penalty factor to reduce token repetition ): """ Asynchronously send a chat request to a Jarvis AI server and handle streaming response incrementally. This function manages server selection based on the session ID, retries requests on specific error codes, and yields incremental parts of the AI-generated response as they arrive. It integrates CSS styling into the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed first inside a styled HTML block, followed by the main content streamed normally. Args: session_id (str): Identifier for the user session to maintain consistent server assignment. model (str): Name of the AI model to use for generating the response. history (List[Dict[str, str]]): List of previous messages in the conversation. user_message (str): The current message from the user to send to the AI model. mode (str): Contextual instructions to guide the AI model's response style. files (optional): Additional files or attachments to include with the user message. temperature (float): Controls randomness in token generation. top_k (int): Limits token selection to top_k probable tokens. min_p (float): Minimum probability threshold for token selection. top_p (float): Nucleus sampling cumulative probability threshold. repetition_penalty (float): Factor to reduce token repetition. Yields: str: Incremental strings of AI-generated response streamed from the server. Reasoning is wrapped in a styled HTML details block and streamed incrementally only if mode is not '/no_think'. After reasoning finishes, the main content is streamed normally. Notes: The function attempts to send the request to a server assigned for the session. If the server returns a specific error code indicating it is busy, it retries with another server. If all servers are busy or fail, it yields a message indicating the server is busy. """ tried = set() # Track servers already tried to avoid repeated retries # Loop until all available servers have been tried without success while len(tried) < len(auth): # Get server setup info assigned for this session, including endpoint, token, and error code setup = get_host(session_id) server = setup["jarvis"] # Server identifier host = setup["endpoint"] # API endpoint URL token = setup["token"] # Authorization token error = setup["error"] # HTTP error code triggering retry tried.add(server) # Mark this server as tried # Format current date/time string for system instructions date = datetime.now().strftime("%A, %B %d, %Y, %I:%M %p %Z") # Combine mode instructions, usage restrictions, and date into system instructions string instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n" # Copy conversation history to avoid mutating original messages = history.copy() # Insert system instructions as first message messages.insert(0, {"role": "system", "content": instructions}) # Prepare user message dict, include files if provided msg = {"role": "user", "content": user_message} if files: msg["files"] = files messages.append(msg) # Append user message to conversation # Prepare HTTP headers with authorization and randomized client IP headers = { "Authorization": f"Bearer {token}", # Bearer token for API access "Content-Type": "application/json", # JSON content type "X-Forwarded-For": generate_ip() # Random IP to simulate different client origins } # Prepare JSON payload with model parameters and conversation messages payload = { "model": model, "messages": messages, "stream": True, "temperature": temperature, "top_k": top_k, "min_p": min_p, "top_p": top_p, "repetition_penalty": repetition_penalty, } # Initialize accumulators and flags for streamed response parts reasoning = "" # Accumulate reasoning text reasoning_check = None # Flag to detect presence of reasoning in response reasoning_done = False # Flag marking reasoning completion content = "" # Accumulate main content text try: # Create async HTTP client with no timeout for long streaming async with httpx.AsyncClient(timeout=None) as client: # Open async streaming POST request to Jarvis server async with client.stream("POST", host, headers=headers, json=payload) as response: # Iterate asynchronously over each line of streaming response async for chunk in response.aiter_lines(): # Skip lines not starting with "data:" if not chunk.strip().startswith("data:"): continue try: # Parse JSON data after "data:" prefix data = json.loads(chunk[5:]) # Extract incremental delta message from first choice choice = data["choices"][0]["delta"] # On first delta received, detect if 'reasoning' field is present and non-empty if reasoning_check is None: # Initialize reasoning_check to empty string if reasoning exists and is non-empty, else None reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None # If reasoning is present and mode is not '/no_think' and reasoning not done if ( reasoning_check == "" # Reasoning detected in response and mode != "/no_think" # Mode allows reasoning output and not reasoning_done # Reasoning phase not finished yet and "reasoning" in choice # Current delta includes reasoning part and choice["reasoning"] # Reasoning content is not empty ): reasoning += choice["reasoning"] # Append incremental reasoning text # Yield reasoning wrapped in styled HTML block with details expanded yield styles(reasoning=reasoning, content="", expanded=True) continue # Continue streaming reasoning increments # When reasoning ends and content starts, mark reasoning done, yield empty string, then content if ( reasoning_check == "" # Reasoning was detected previously and mode != "/no_think" # Mode allows reasoning output and not reasoning_done # Reasoning phase not finished yet and "content" in choice # Current delta includes content part and choice["content"] # Content is not empty ): reasoning_done = True # Mark reasoning phase complete yield "" # Yield empty string to signal end of reasoning block content += choice["content"] # Start accumulating content text yield content # Yield first part of content continue # Continue streaming content increments # If no reasoning present or reasoning done, accumulate content and yield incrementally if ( (reasoning_check is None or reasoning_done or mode == "/no_think") # No reasoning or reasoning finished or mode disables reasoning and "content" in choice # Current delta includes content part and choice["content"] # Content is not empty ): content += choice["content"] # Append incremental content text yield content # Yield updated content string except Exception: # Ignore exceptions during JSON parsing or key access and continue streaming continue return # Exit function after successful streaming completion except httpx.HTTPStatusError as e: # If server returns specific error code indicating busy, retry with another server if e.response.status_code == error: continue # Try next available server else: # For other HTTP errors, mark this server as busy mark(server) except Exception: # For other exceptions (network errors, timeouts), mark server as busy mark(server) # If all servers tried and none succeeded, yield busy message yield "The server is currently busy. Please wait a moment or try again later." return # End of function