#
# SPDX-FileCopyrightText: Hadad <[email protected]>
# SPDX-License-Identifier: Apache-2.0
#
import json  # Parse JSON payloads from the streaming response
import uuid  # Generate unique identifiers for sessions
from typing import List, Dict, Any  # Type hints for lists, dictionaries, and generic values
from datetime import datetime  # Get and format the current date/time
from config import *  # Configuration variables, including 'auth' and 'restrictions'
from src.utils.session_mapping import get_host  # Look up server info by session ID
from src.utils.ip_generator import generate_ip  # Generate a random IP for request headers
from src.utils.helper import mark  # Mark a server as busy/unavailable
from src.ui.reasoning import styles  # Apply CSS styling to reasoning output
import httpx  # Async HTTP client with streaming support


async def jarvis(
    session_id: str,  # Unique session identifier to maintain consistent server assignment
    model: str,  # Name of the AI model to use
    history: List[Dict[str, str]],  # Previous conversation messages with roles and content
    user_message: str,  # Latest user input to send to the model
    mode: str,  # Mode string guiding model behavior, e.g. '/think' or '/no_think'
    files=None,  # Optional files or attachments to include with the user message
    temperature: float = 0.6,  # Sampling temperature controlling randomness in token generation
    top_k: int = 20,  # Limit token selection to the top_k most probable tokens
    min_p: float = 0.0,  # Minimum probability threshold for token selection
    top_p: float = 0.95,  # Nucleus sampling cumulative probability threshold
    repetition_penalty: float = 1.0,  # Penalty factor to reduce token repetition
):
""" | |
Asynchronously send a chat request to a Jarvis AI server and handle streaming response incrementally. | |
This function manages server selection based on the session ID, retries requests on specific error codes, | |
and yields incremental parts of the AI-generated response as they arrive. It integrates CSS styling into | |
the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed | |
first inside a styled HTML block, followed by the main content streamed normally. | |
Args: | |
session_id (str): Identifier for the user session to maintain consistent server assignment. | |
model (str): Name of the AI model to use for generating the response. | |
history (List[Dict[str, str]]): List of previous messages in the conversation. | |
user_message (str): The current message from the user to send to the AI model. | |
mode (str): Contextual instructions to guide the AI model's response style. | |
files (optional): Additional files or attachments to include with the user message. | |
temperature (float): Controls randomness in token generation. | |
top_k (int): Limits token selection to top_k probable tokens. | |
min_p (float): Minimum probability threshold for token selection. | |
top_p (float): Nucleus sampling cumulative probability threshold. | |
repetition_penalty (float): Factor to reduce token repetition. | |
Yields: | |
str: Incremental strings of AI-generated response streamed from the server. | |
Reasoning is wrapped in a styled HTML details block and streamed incrementally only if mode is not '/no_think'. | |
After reasoning finishes, the main content is streamed normally. | |
Notes: | |
The function attempts to send the request to a server assigned for the session. | |
If the server returns a specific error code indicating it is busy, it retries with another server. | |
If all servers are busy or fail, it yields a message indicating the server is busy. | |
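
    Example:
        Illustrative usage only; assumes `config.auth` and the session mapping
        are configured with at least one reachable server, and that the model
        name below is a placeholder:

            async for part in jarvis(
                session_id=str(uuid.uuid4()),
                model="jarvis-model",  # placeholder
                history=[],
                user_message="Hello!",
                mode="/no_think",
            ):
                print(part)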
""" | |
    tried = set()  # Track servers already tried to avoid repeated retries
    # Loop until all available servers have been tried without success
    while len(tried) < len(auth):
        # Get the server setup assigned to this session: endpoint, token, and busy error code
        setup = get_host(session_id)
        server = setup["jarvis"]  # Server identifier
        host = setup["endpoint"]  # API endpoint URL
        token = setup["token"]  # Authorization token
        error = setup["error"]  # HTTP status code that triggers a retry on another server
        tried.add(server)  # Mark this server as tried
        # Format the current date/time string for the system instructions
        date = datetime.now().astimezone().strftime("%A, %B %d, %Y, %I:%M %p %Z")  # astimezone() attaches the local timezone so %Z is not empty
        # Combine mode instructions, usage restrictions, and the date into the system instructions
        instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"
        # Copy the conversation history to avoid mutating the original
        messages = history.copy()
        # Insert the system instructions as the first message
        messages.insert(0, {"role": "system", "content": instructions})
        # Prepare the user message dict, including files if provided
        msg = {"role": "user", "content": user_message}
        if files:
            msg["files"] = files
        messages.append(msg)  # Append the user message to the conversation
        # Prepare HTTP headers with authorization and a randomized client IP
        headers = {
            "Authorization": f"Bearer {token}",  # Bearer token for API access
            "Content-Type": "application/json",  # JSON content type
            "X-Forwarded-For": generate_ip()  # Random IP to simulate different client origins
        }
        # Prepare the JSON payload with model parameters and conversation messages
        payload = {
            "model": model,
            "messages": messages,
            "stream": True,
            "temperature": temperature,
            "top_k": top_k,
            "min_p": min_p,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
        }
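        # The payload mirrors the OpenAI chat-completions request schema; top_k,
        # min_p, and repetition_penalty are server-side sampling extensions
        # (vLLM-style) that the backend is assumed to accept.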
        # Initialize accumulators and flags for the streamed response parts
        reasoning = ""  # Accumulated reasoning text
        reasoning_check = None  # Flag to detect presence of reasoning in the response
        reasoning_done = False  # Flag marking reasoning completion
        content = ""  # Accumulated main content text
        try:
            # Create an async HTTP client with no timeout for long streaming
            async with httpx.AsyncClient(timeout=None) as client:
                # Open an async streaming POST request to the Jarvis server
                async with client.stream("POST", host, headers=headers, json=payload) as response:
                    # Raise HTTPStatusError for non-2xx responses so the
                    # busy-server retry logic below can react to the status code
                    response.raise_for_status()
                    # Iterate asynchronously over each line of the streaming response
                    async for chunk in response.aiter_lines():
                        # Skip lines not starting with "data:"
                        if not chunk.strip().startswith("data:"):
                            continue
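                        # Each "data:" line is assumed to follow the OpenAI-compatible
                        # streaming shape read below, e.g.:
                        #   data: {"choices": [{"delta": {"reasoning": "...", "content": "..."}}]}
                        # A terminal "data: [DONE]" sentinel, if the backend sends one,
                        # fails JSON parsing and is skipped by the except handler below.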
                        try:
                            # Parse the JSON data after the "data:" prefix
                            # (strip first so leading whitespace cannot corrupt the slice)
                            data = json.loads(chunk.strip()[5:])
                            # Extract the incremental delta message from the first choice
                            choice = data["choices"][0]["delta"]
                            # On the first delta received, detect whether a 'reasoning' field is present and non-empty
                            if reasoning_check is None:
                                # Set reasoning_check to "" if reasoning exists and is non-empty, else leave it None
                                reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
                            # Reasoning phase: reasoning present, mode allows it, and it has not finished
                            if (
                                reasoning_check == ""  # Reasoning detected in the response
                                and mode != "/no_think"  # Mode allows reasoning output
                                and not reasoning_done  # Reasoning phase not finished yet
                                and "reasoning" in choice  # Current delta includes a reasoning part
                                and choice["reasoning"]  # Reasoning content is not empty
                            ):
                                reasoning += choice["reasoning"]  # Append the incremental reasoning text
                                # Yield the reasoning wrapped in a styled HTML block, expanded
                                yield styles(reasoning=reasoning, content="", expanded=True)
                                continue  # Keep streaming reasoning increments
                            # Transition: reasoning ends and content starts
                            if (
                                reasoning_check == ""  # Reasoning was detected previously
                                and mode != "/no_think"  # Mode allows reasoning output
                                and not reasoning_done  # Reasoning phase not finished yet
                                and "content" in choice  # Current delta includes a content part
                                and choice["content"]  # Content is not empty
                            ):
                                reasoning_done = True  # Mark the reasoning phase complete
                                yield ""  # Empty yield signals the end of the reasoning block
                                content += choice["content"]  # Start accumulating content text
                                yield content  # Yield the first part of the content
                                continue  # Keep streaming content increments
                            # Content phase: no reasoning present, reasoning finished, or mode disables reasoning
                            if (
                                (reasoning_check is None or reasoning_done or mode == "/no_think")
                                and "content" in choice  # Current delta includes a content part
                                and choice["content"]  # Content is not empty
                            ):
                                content += choice["content"]  # Append the incremental content text
                                yield content  # Yield the updated content string
                        except Exception:
                            # Ignore JSON parsing or key access errors and continue streaming
                            continue
            return  # Exit after successful streaming completion
        except httpx.HTTPStatusError as e:
            # If the server returns its busy error code, retry with another server
            if e.response.status_code == error:
                continue  # Try the next available server
            else:
                # For other HTTP errors, mark this server as busy
                mark(server)
        except Exception:
            # For other exceptions (network errors, timeouts), mark the server as busy
            mark(server)
    # If all servers were tried and none succeeded, yield a busy message
    yield "The server is currently busy. Please wait a moment or try again later."
    return  # End of function