import multiprocessing
import os
import platform
import time
import uuid  # For generating unique session IDs

import psutil
import tiktoken  # For estimating token count
from fastapi import FastAPI, Request
from huggingface_hub import hf_hub_download
from llama_cpp import Llama

app = FastAPI()

# === Model Config ===
REPO_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf"  # Q4_K_M is a good balance of size and quality
MODEL_DIR = "models"
MODEL_PATH = os.path.join(MODEL_DIR, FILENAME)

# === Download if model not available ===
if not os.path.exists(MODEL_PATH):
    print(f"⬇️ Downloading {FILENAME} from Hugging Face...")
    try:
        model_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=FILENAME,
            cache_dir=MODEL_DIR,
            local_dir=MODEL_DIR,
            local_dir_use_symlinks=False,
        )
        print(f"✅ Model downloaded to: {model_path}")
    except Exception as e:
        print(f"❌ Error downloading model: {e}")
        # The service is useless without the model, so abort start-up.
        # SystemExit is raised directly because the `exit()` helper is injected by
        # the `site` module and is not guaranteed to exist in every runtime.
        raise SystemExit(1) from e
else:
    print(f"✅ Model already available at: {MODEL_PATH}")
    model_path = MODEL_PATH

# === Optimal thread usage ===
logical_cores = psutil.cpu_count(logical=True)
physical_cores = psutil.cpu_count(logical=False)
# psutil returns None when a core count cannot be determined; fall back to the
# logical count and finally to a single thread instead of crashing in max().
recommended_threads = max(1, physical_cores or logical_cores or 1)
print(f"Detected physical cores: {physical_cores}, logical cores: {logical_cores}")
print(f"Using n_threads: {recommended_threads}")

# === Load the model ===
try:
    llm = Llama(
        model_path=model_path,
        n_ctx=1024,  # Context window size for the model
        n_threads=recommended_threads,
        use_mlock=True,  # Lock model in RAM for faster access
        n_gpu_layers=0,  # CPU only
        # TinyLlama-1.1B-Chat-v1.0 was tuned with the Zephyr prompt template
        # (<|system|> / <|user|> / <|assistant|> turns closed by </s>); ChatML was
        # only used by the earlier v0.x chat checkpoints.
        chat_format="zephyr",
        verbose=False,
    )
    print("🚀 Llama model loaded successfully!")
except Exception as e:
    print(f"❌ Error loading Llama model: {e}")
    raise SystemExit(1) from e

# Initialize tiktoken encoder for token counting (approximate for GGUF models, but good enough)
# For TinyLlama, we'll use a generic encoder or one that's close enough.
# 'cl100k_base' is tiktoken's encoding for OpenAI models. It is used here only as an
# approximation of the GGUF model's own tokenizer, so every token count in this module
# is an estimate. For exact counts the model's own tokenizer would have to be used.
try:
    encoding = tiktoken.get_encoding("cl100k_base")
except Exception:
    print("⚠️ Could not load tiktoken 'cl100k_base' encoding. Using basic len() for token estimation.")
    encoding = None

# === Global dictionary to store chat histories per session ===
# Maps session_id -> list of {"role": ..., "content": ...} messages.
# NOTE(review): sessions are never evicted, so this grows with every new session id.
chat_histories = {}

# === Context Truncation Settings ===
# Max (estimated) tokens for the conversation sent to the model. It must stay below
# the model's n_ctx so there is room left for the generated response.
MAX_CONTEXT_TOKENS = 800

# Estimated tokens held back for the model's reply when trimming old history.
_RESPONSE_TOKEN_BUFFER = 200

# System message that opens every conversation.
_SYSTEM_PROMPT = (
    "You are a helpful AI assistant for data analysis. "
    "Provide concise and actionable suggestions based on the data provided or questions asked. "
    "Keep your responses focused on data insights and actionable steps for report generation."
)


def _new_history():
    """Return a fresh chat history containing only the system message."""
    return [{"role": "system", "content": _SYSTEM_PROMPT}]


def _error_response(status_code, message, **extra):
    """Build a JSON error body of the form {"error": message, ...} with a real HTTP status.

    FastAPI does not interpret a ``(body, status)`` tuple the way Flask does; such a
    tuple is serialised as a JSON array and sent with status 200.
    """
    from fastapi.responses import JSONResponse  # local import keeps this block self-contained

    return JSONResponse(status_code=status_code, content={"error": message, **extra})


def count_tokens_in_message(message):
    """Estimate the number of tokens in a single message's ``content``.

    Uses tiktoken when the encoder loaded, otherwise a rough 4-characters-per-token
    heuristic. A missing or ``None`` content counts as zero tokens.
    """
    text = message.get("content") or ""
    if encoding is not None:
        # disallowed_special=() makes text such as "<|endoftext|>" encode as ordinary
        # text; by default tiktoken raises ValueError on special-token strings, which
        # would let user input crash the request.
        return len(encoding.encode(text, disallowed_special=()))
    return len(text) // 4  # Rough estimate: 1 token ~ 4 characters


def get_message_token_length(messages):
    """Return the total estimated token count for a list of messages."""
    return sum(count_tokens_in_message(message) for message in messages)


def truncate_history(history, max_tokens):
    """Trim a chat history so its estimated size fits within ``max_tokens``.

    The first message (the system message) is always kept, even if it alone exceeds
    the budget. The remaining messages are taken from newest to oldest until the next
    one would exceed ``max_tokens``; the result is returned in chronological order.
    An empty history yields an empty list.
    """
    if not history:
        return []

    system_message = history[0]
    used_tokens = count_tokens_in_message(system_message)

    kept_newest_first = []
    for message in reversed(history[1:]):
        message_tokens = count_tokens_in_message(message)
        if used_tokens + message_tokens > max_tokens:
            break  # Everything older is dropped as well
        kept_newest_first.append(message)
        used_tokens += message_tokens

    kept_newest_first.reverse()  # Restore chronological order
    return [system_message, *kept_newest_first]


@app.get("/")
def root():
    """Health-check endpoint."""
    return {"message": "✅ Data Analysis AI API is live and optimized!"}


def _llm_setting(name, default=None):
    """Read a setting from the loaded model, whether exposed as an attribute or a method."""
    value = getattr(llm, name, default)
    return value() if callable(value) else value


@app.get("/get_sys")
def get_sys_specs():
    """Returns system specifications including CPU, RAM, and OS details."""
    memory = psutil.virtual_memory()
    cpu_freq = psutil.cpu_freq()  # May be None on platforms that do not report it
    # Fallback for use_mlock when the Llama object has no such attribute of its own.
    # NOTE(review): assumes the flag lives on llm.model_params — confirm for the
    # installed llama-cpp-python version.
    mlock_fallback = getattr(getattr(llm, "model_params", None), "use_mlock", None)
    return {
        "CPU": {
            "physical_cores": physical_cores,
            "logical_cores": logical_cores,
            "max_freq_mhz": cpu_freq.max if cpu_freq else "N/A",
            "cpu_usage_percent": psutil.cpu_percent(interval=1),  # CPU usage over 1 second
        },
        "RAM": {
            "total_GB": round(memory.total / (1024 ** 3), 2),
            "available_GB": round(memory.available / (1024 ** 3), 2),
            "usage_percent": memory.percent,
        },
        "System": {
            "platform": platform.platform(),
            "architecture": platform.machine(),
            "python_version": platform.python_version(),
        },
        "Model_Config": {
            "model_name": FILENAME,
            # n_ctx is a method on Llama while n_threads is a plain attribute, so
            # calling llm.n_threads() fails; _llm_setting copes with either shape.
            "n_ctx": _llm_setting("n_ctx"),
            "n_threads": _llm_setting("n_threads", recommended_threads),
            "use_mlock": _llm_setting("use_mlock", mlock_fallback),
        },
    }


@app.get("/process_list")
def process_list():
    """Returns processes using more than 5% CPU or 2% memory, sorted by CPU usage."""
    unavailable = (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess)

    # The first cpu_percent() call on a process only records a baseline and returns
    # a meaningless 0.0, so prime every process, wait, and then take the real reading.
    candidates = []
    for proc in psutil.process_iter():
        try:
            proc.cpu_percent()
        except unavailable:
            continue
        candidates.append(proc)

    time.sleep(1)  # Measurement window between the baseline and the reading

    processes = []
    for proc in candidates:
        try:
            cpu = proc.cpu_percent()
            mem = proc.memory_percent()
            # Filter processes using more than 5% CPU or 2% memory
            if cpu > 5 or mem > 2:
                processes.append({
                    "pid": proc.pid,
                    "name": proc.name(),
                    "cpu_percent": round(cpu, 2),
                    "memory_percent": round(mem, 2),
                })
        except unavailable:
            pass  # Process exited or is not inspectable; skip it

    # Sort by CPU usage descending
    processes.sort(key=lambda entry: entry["cpu_percent"], reverse=True)
    return {"heavy_processes": processes}


@app.post("/generate")
async def generate(request: Request):
    """
    Generates a response from the LLM, maintaining chat context.

    Expects a JSON object with 'prompt' and optionally 'session_id'. If 'session_id'
    is missing, a new one is generated; if it is unknown, its history is re-initialised.

    Returns {"response", "session_id", "current_context_tokens"} on success. Invalid
    input yields HTTP 400 and a generation failure yields HTTP 500, both with an
    {"error": ...} body.
    """
    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
        return _error_response(400, "Request body must be valid JSON")
    if not isinstance(data, dict):
        return _error_response(400, "Request body must be a JSON object")

    prompt = data.get("prompt", "")
    if prompt is None:
        prompt = ""
    if not isinstance(prompt, str):
        return _error_response(400, "Prompt must be a string")
    prompt = prompt.strip()
    if not prompt:
        return _error_response(400, "Prompt cannot be empty")

    session_id = data.get("session_id")
    if session_id and not isinstance(session_id, (str, int)):
        # Lists/objects are unhashable and cannot be used as a history key.
        return _error_response(400, "session_id must be a string")

    # Generate a new session ID if not provided (for new conversations)
    if not session_id:
        session_id = str(uuid.uuid4())
        chat_histories[session_id] = _new_history()
        print(f"🆕 New session created: {session_id}")
    elif session_id not in chat_histories:
        # If a session ID is provided but not found, re-initialize it
        chat_histories[session_id] = _new_history()
        print(f"⚠️ Session ID {session_id} not found, re-initializing history.")

    print(f"🧾 Prompt received for session {session_id}: {prompt}")

    user_message = {"role": "user", "content": prompt}
    prompt_tokens = count_tokens_in_message(user_message)
    history = chat_histories[session_id]
    total_tokens = get_message_token_length(history) + prompt_tokens

    if total_tokens > MAX_CONTEXT_TOKENS:
        print(f"✂️ Truncating history for session {session_id}. Current tokens: {total_tokens}")
        # Budget for the *previous* turns: whatever remains after the new prompt and
        # the reserved response buffer. The prompt is appended afterwards so it is
        # counted exactly once and can never be the message that gets dropped.
        history_budget = max(0, MAX_CONTEXT_TOKENS - prompt_tokens - _RESPONSE_TOKEN_BUFFER)
        messages = [*truncate_history(history, history_budget), user_message]
        print(f"✅ History truncated. New tokens: {get_message_token_length(messages)}")
    else:
        messages = [*history, user_message]

    try:
        # NOTE(review): this call is blocking, so the event loop is stalled while the
        # model generates.
        response = llm.create_chat_completion(
            messages=messages,
            max_tokens=256,  # Further limit response length for faster generation
            temperature=0.7,  # Adjust temperature for creativity vs. coherence (0.0-1.0)
            # End-of-turn marker for TinyLlama Chat. An empty string must not be used
            # as a stop sequence: it matches any text and cuts every reply to nothing.
            stop=["</s>"],
        )
        ai_response_content = (response["choices"][0]["message"]["content"] or "").strip()
    except Exception as e:
        print(f"❌ Error during generation for session {session_id}: {e}")
        # The stored history is only updated after a successful generation, so a
        # failure leaves the session exactly as it was before this request.
        return _error_response(
            500,
            f"Failed to generate response: {e}. Please try again.",
            session_id=session_id,
        )

    # Commit the user turn and the AI's response for future turns
    messages.append({"role": "assistant", "content": ai_response_content})
    chat_histories[session_id] = messages

    return {
        "response": ai_response_content,
        "session_id": session_id,  # Returned so the client can reuse it on subsequent requests
        "current_context_tokens": get_message_token_length(messages),
    }