Spaces:
Running
Running
File size: 11,398 Bytes
b5e7375 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 |
#
# SPDX-FileCopyrightText: Hadad <[email protected]>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import JSON module to parse and handle JSON data
import uuid # Import UUID module to generate unique identifiers for sessions
from typing import List, Dict, Any # Import type hints for lists, dictionaries, and generic types
from datetime import datetime # Import datetime to get and format current date/time
from config import * # Import all configuration variables including 'auth' and 'restrictions'
from src.utils.session_mapping import get_host # Import function to get server info by session ID
from src.utils.ip_generator import generate_ip # Import function to generate random IP for headers
from src.utils.helper import mark # Import function to mark a server as busy/unavailable
from src.ui.reasoning import styles # Import function to apply CSS styling to reasoning output
import httpx # Import httpx for async HTTP requests with streaming support
async def jarvis(
session_id: str, # Unique session identifier to maintain consistent server assignment
model: str, # AI model name to specify which model to use
history: List[Dict[str, str]], # List of previous conversation messages with roles and content
user_message: str, # Latest user input message to send to the AI model
mode: str, # Mode string to guide AI behavior, e.g., '/think' or '/no_think'
files=None, # Optional files or attachments to include with the user message
temperature: float = 0.6, # Sampling temperature controlling randomness in token generation
top_k: int = 20, # Limit token selection to top_k probable tokens
min_p: float = 0, # Minimum probability threshold for token selection
top_p: float = 0.95, # Nucleus sampling cumulative probability threshold
repetition_penalty: float = 1, # Penalty factor to reduce token repetition
):
"""
Asynchronously send a chat request to a Jarvis AI server and handle streaming response incrementally.
This function manages server selection based on the session ID, retries requests on specific error codes,
and yields incremental parts of the AI-generated response as they arrive. It integrates CSS styling into
the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed
first inside a styled HTML block, followed by the main content streamed normally.
Args:
session_id (str): Identifier for the user session to maintain consistent server assignment.
model (str): Name of the AI model to use for generating the response.
history (List[Dict[str, str]]): List of previous messages in the conversation.
user_message (str): The current message from the user to send to the AI model.
mode (str): Contextual instructions to guide the AI model's response style.
files (optional): Additional files or attachments to include with the user message.
temperature (float): Controls randomness in token generation.
top_k (int): Limits token selection to top_k probable tokens.
min_p (float): Minimum probability threshold for token selection.
top_p (float): Nucleus sampling cumulative probability threshold.
repetition_penalty (float): Factor to reduce token repetition.
Yields:
str: Incremental strings of AI-generated response streamed from the server.
Reasoning is wrapped in a styled HTML details block and streamed incrementally only if mode is not '/no_think'.
After reasoning finishes, the main content is streamed normally.
Notes:
The function attempts to send the request to a server assigned for the session.
If the server returns a specific error code indicating it is busy, it retries with another server.
If all servers are busy or fail, it yields a message indicating the server is busy.
"""
tried = set() # Track servers already tried to avoid repeated retries
# Loop until all available servers have been tried without success
while len(tried) < len(auth):
# Get server setup info assigned for this session, including endpoint, token, and error code
setup = get_host(session_id)
server = setup["jarvis"] # Server identifier
host = setup["endpoint"] # API endpoint URL
token = setup["token"] # Authorization token
error = setup["error"] # HTTP error code triggering retry
tried.add(server) # Mark this server as tried
# Format current date/time string for system instructions
date = datetime.now().strftime("%A, %B %d, %Y, %I:%M %p %Z")
# Combine mode instructions, usage restrictions, and date into system instructions string
instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"
# Copy conversation history to avoid mutating original
messages = history.copy()
# Insert system instructions as first message
messages.insert(0, {"role": "system", "content": instructions})
# Prepare user message dict, include files if provided
msg = {"role": "user", "content": user_message}
if files:
msg["files"] = files
messages.append(msg) # Append user message to conversation
# Prepare HTTP headers with authorization and randomized client IP
headers = {
"Authorization": f"Bearer {token}", # Bearer token for API access
"Content-Type": "application/json", # JSON content type
"X-Forwarded-For": generate_ip() # Random IP to simulate different client origins
}
# Prepare JSON payload with model parameters and conversation messages
payload = {
"model": model,
"messages": messages,
"stream": True,
"temperature": temperature,
"top_k": top_k,
"min_p": min_p,
"top_p": top_p,
"repetition_penalty": repetition_penalty,
}
# Initialize accumulators and flags for streamed response parts
reasoning = "" # Accumulate reasoning text
reasoning_check = None # Flag to detect presence of reasoning in response
reasoning_done = False # Flag marking reasoning completion
content = "" # Accumulate main content text
try:
# Create async HTTP client with no timeout for long streaming
async with httpx.AsyncClient(timeout=None) as client:
# Open async streaming POST request to Jarvis server
async with client.stream("POST", host, headers=headers, json=payload) as response:
# Iterate asynchronously over each line of streaming response
async for chunk in response.aiter_lines():
# Skip lines not starting with "data:"
if not chunk.strip().startswith("data:"):
continue
try:
# Parse JSON data after "data:" prefix
data = json.loads(chunk[5:])
# Extract incremental delta message from first choice
choice = data["choices"][0]["delta"]
# On first delta received, detect if 'reasoning' field is present and non-empty
if reasoning_check is None:
# Initialize reasoning_check to empty string if reasoning exists and is non-empty, else None
reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
# If reasoning is present and mode is not '/no_think' and reasoning not done
if (
reasoning_check == "" # Reasoning detected in response
and mode != "/no_think" # Mode allows reasoning output
and not reasoning_done # Reasoning phase not finished yet
and "reasoning" in choice # Current delta includes reasoning part
and choice["reasoning"] # Reasoning content is not empty
):
reasoning += choice["reasoning"] # Append incremental reasoning text
# Yield reasoning wrapped in styled HTML block with details expanded
yield styles(reasoning=reasoning, content="", expanded=True)
continue # Continue streaming reasoning increments
# When reasoning ends and content starts, mark reasoning done, yield empty string, then content
if (
reasoning_check == "" # Reasoning was detected previously
and mode != "/no_think" # Mode allows reasoning output
and not reasoning_done # Reasoning phase not finished yet
and "content" in choice # Current delta includes content part
and choice["content"] # Content is not empty
):
reasoning_done = True # Mark reasoning phase complete
yield "" # Yield empty string to signal end of reasoning block
content += choice["content"] # Start accumulating content text
yield content # Yield first part of content
continue # Continue streaming content increments
# If no reasoning present or reasoning done, accumulate content and yield incrementally
if (
(reasoning_check is None or reasoning_done or mode == "/no_think") # No reasoning or reasoning finished or mode disables reasoning
and "content" in choice # Current delta includes content part
and choice["content"] # Content is not empty
):
content += choice["content"] # Append incremental content text
yield content # Yield updated content string
except Exception:
# Ignore exceptions during JSON parsing or key access and continue streaming
continue
return # Exit function after successful streaming completion
except httpx.HTTPStatusError as e:
# If server returns specific error code indicating busy, retry with another server
if e.response.status_code == error:
continue # Try next available server
else:
# For other HTTP errors, mark this server as busy
mark(server)
except Exception:
# For other exceptions (network errors, timeouts), mark server as busy
mark(server)
# If all servers tried and none succeeded, yield busy message
yield "The server is currently busy. Please wait a moment or try again later."
return # End of function |