File size: 11,398 Bytes
b5e7375
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
#
# SPDX-FileCopyrightText: Hadad <[email protected]>
# SPDX-License-Identifier: Apache-2.0
#

import json  # Import JSON module to parse and handle JSON data
import uuid  # Import UUID module to generate unique identifiers for sessions
from typing import List, Dict, Any  # Import type hints for lists, dictionaries, and generic types
from datetime import datetime  # Import datetime to get and format current date/time
from config import *  # Import all configuration variables including 'auth' and 'restrictions'
from src.utils.session_mapping import get_host  # Import function to get server info by session ID
from src.utils.ip_generator import generate_ip  # Import function to generate random IP for headers
from src.utils.helper import mark  # Import function to mark a server as busy/unavailable
from src.ui.reasoning import styles  # Import function to apply CSS styling to reasoning output
import httpx  # Import httpx for async HTTP requests with streaming support

def _parse_delta(line: str) -> tuple:
    """
    Extract the incremental reasoning and content text from one line of the streamed response.

    Args:
        line (str): One raw line read from the streaming HTTP response.

    Returns:
        tuple: A (reasoning, content) pair of strings taken from the 'delta' object of the first choice.
               Both are empty strings when the line is not a 'data:' event, its payload is not valid JSON,
               the expected keys are missing, or the field is absent or not a string.
    """
    # Strip before slicing so leading whitespace cannot shift the "data:" prefix offset
    line = line.strip()
    if not line.startswith("data:"):
        return "", ""
    try:
        delta = json.loads(line[5:])["choices"][0]["delta"]
        reasoning_part = delta.get("reasoning")
        content_part = delta.get("content")
    except (ValueError, LookupError, TypeError, AttributeError):
        # Payload is not JSON or does not have the expected shape; skip this event and keep streaming
        return "", ""
    return (
        reasoning_part if isinstance(reasoning_part, str) else "",
        content_part if isinstance(content_part, str) else "",
    )


async def jarvis(
    session_id: str,  # Unique session identifier to maintain consistent server assignment
    model: str,  # AI model name to specify which model to use
    history: List[Dict[str, str]],  # List of previous conversation messages with roles and content
    user_message: str,  # Latest user input message to send to the AI model
    mode: str,  # Mode string to guide AI behavior, e.g., '/think' or '/no_think'
    files=None,  # Optional files or attachments to include with the user message
    temperature: float = 0.6,  # Sampling temperature controlling randomness in token generation
    top_k: int = 20,  # Limit token selection to top_k probable tokens
    min_p: float = 0,  # Minimum probability threshold for token selection
    top_p: float = 0.95,  # Nucleus sampling cumulative probability threshold
    repetition_penalty: float = 1,  # Penalty factor to reduce token repetition
):
    """
    Asynchronously send a chat request to a Jarvis AI server and stream the response incrementally.

    The server is obtained from get_host() for the given session. Each yielded value is the full text
    accumulated so far (not just the newest fragment). While reasoning is being streamed and the mode is
    not '/no_think', the accumulated reasoning is yielded through styles(); once the first content
    fragment arrives, an empty string is yielded and the accumulated content is streamed from then on.

    Args:
        session_id (str): Identifier for the user session, passed to get_host() to pick a server.
        model (str): Name of the AI model to use for generating the response.
        history (List[Dict[str, str]]): Previous messages in the conversation; not mutated.
        user_message (str): The current message from the user to send to the AI model.
        mode (str): Instruction string placed at the top of the system message; '/no_think' suppresses
            reasoning output.
        files (optional): Attachments added to the user message under the 'files' key when truthy.
        temperature (float): Controls randomness in token generation.
        top_k (int): Limits token selection to top_k probable tokens.
        min_p (float): Minimum probability threshold for token selection.
        top_p (float): Nucleus sampling cumulative probability threshold.
        repetition_penalty (float): Factor to reduce token repetition.

    Yields:
        str: The styled reasoning block, an empty string marking the end of reasoning, or the accumulated
             content. If no attempt succeeds, a single message saying the server is busy.

    Notes:
        A response whose HTTP status equals the server's configured 'error' code triggers another attempt
        without marking the server; any other failure marks the server via mark() before the next attempt.
        The total number of attempts is capped so the loop always terminates.
    """
    tried = set()  # Servers already attempted during this call
    attempts = 0  # Total attempts made, including repeats of the same server
    # get_host() may hand back a server that was already tried (its selection policy is not visible
    # here); cap the attempts so a retryable status cannot keep this loop spinning forever.
    max_attempts = 2 * len(auth)

    # Use an aware local datetime so that %Z renders the timezone name instead of an empty string
    date = datetime.now().astimezone().strftime("%A, %B %d, %Y, %I:%M %p %Z")

    # Combine mode instructions, usage restrictions, and date into the system instructions string
    instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"

    # Prepare the user message, including files only when provided
    msg = {"role": "user", "content": user_message}
    if files:
        msg["files"] = files

    # Build a fresh list (system message, prior history, new user message) so 'history' is not mutated
    messages = [{"role": "system", "content": instructions}, *(history or []), msg]

    # The payload does not depend on the chosen server, so it is built once for all attempts
    payload = {
        "model": model,
        "messages": messages,
        "stream": True,
        "temperature": temperature,
        "top_k": top_k,
        "min_p": min_p,
        "top_p": top_p,
        "repetition_penalty": repetition_penalty,
    }

    show_reasoning = mode != "/no_think"  # Whether reasoning output is allowed for this request

    while len(tried) < len(auth) and attempts < max_attempts:
        attempts += 1

        # Get the server setup assigned for this session
        setup = get_host(session_id)
        server = setup["jarvis"]  # Server identifier
        host = setup["endpoint"]  # API endpoint URL
        token = setup["token"]  # Authorization token
        error = setup["error"]  # HTTP status code that triggers a retry without marking the server
        tried.add(server)

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "X-Forwarded-For": generate_ip(),  # Value produced by the project's IP generator helper
        }

        # Per-attempt stream state; reset so a retry starts from a clean slate
        reasoning = ""  # Accumulated reasoning text
        content = ""  # Accumulated main content text
        has_reasoning = False  # Set once any delta carries non-empty reasoning
        reasoning_done = False  # Set once content starts after reasoning

        try:
            # No timeout: the stream may legitimately stay open for a long time
            async with httpx.AsyncClient(timeout=None) as client:
                async with client.stream("POST", host, headers=headers, json=payload) as response:
                    # httpx only raises HTTPStatusError when asked to; without this call the
                    # status-code handling below would never run and error bodies would be
                    # consumed as an empty stream.
                    response.raise_for_status()

                    async for chunk in response.aiter_lines():
                        reasoning_part, content_part = _parse_delta(chunk)

                        if reasoning_part:
                            has_reasoning = True

                        # True while reasoning has been seen, is allowed by the mode, and content has not started
                        in_reasoning = has_reasoning and show_reasoning and not reasoning_done

                        if in_reasoning and reasoning_part:
                            reasoning += reasoning_part
                            # Yield the accumulated reasoning wrapped by styles() with the block expanded
                            yield styles(reasoning=reasoning, content="", expanded=True)
                            continue

                        if not content_part:
                            continue

                        if in_reasoning:
                            # First content after reasoning: close the reasoning phase
                            reasoning_done = True
                            yield ""  # Empty string signals the end of the reasoning block

                        content += content_part
                        yield content
            return  # Streaming completed successfully
        except httpx.HTTPStatusError as e:
            if e.response.status_code == error:
                continue  # Configured retry status: try again without marking the server
            # Any other HTTP error: mark this server before the next attempt
            mark(server)
        except Exception:
            # Network errors and any other failure: mark this server before the next attempt
            mark(server)

    # No attempt succeeded
    yield "The server is currently busy. Please wait a moment or try again later."
    return