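# NOTE: the six lines below are file-viewer page residue that was captured
# together with the source (apparently an avatar caption, commit message,
# commit id, viewer links and file size). They are not part of the program.
#   Priyanshukr-1's picture
#   Update app.py
#   01e79df verified
#   raw
#   history blame
#   11.2 kB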
from fastapi import FastAPI, Request
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import os
import platform
import psutil
import multiprocessing
import time
import uuid # For generating unique session IDs
import tiktoken # For estimating token count
app = FastAPI()

# === Model Config ===
REPO_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
# Q4_K_M is a good balance of size and quality
FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf"
MODEL_DIR = "models"
MODEL_PATH = os.path.join(MODEL_DIR, FILENAME)

# === Download if model not available ===
# After this section `model_path` always names the GGUF file to load:
# either the path returned by the download or the pre-existing MODEL_PATH.
if not os.path.exists(MODEL_PATH):
    print(f"⬇️ Downloading {FILENAME} from Hugging Face...")
    try:
        model_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=FILENAME,
            cache_dir=MODEL_DIR,
            local_dir=MODEL_DIR,
            local_dir_use_symlinks=False
        )
    except Exception as e:
        print(f"❌ Error downloading model: {e}")
        # The service cannot start without the model file. SystemExit is
        # raised directly because the `exit()` builtin is an interactive
        # helper injected by the `site` module and is not always present.
        raise SystemExit(1) from e
    else:
        print(f"βœ… Model downloaded to: {model_path}")
else:
    print(f"βœ… Model already available at: {MODEL_PATH}")
    model_path = MODEL_PATH
# === Optimal thread usage ===
logical_cores = psutil.cpu_count(logical=True)
physical_cores = psutil.cpu_count(logical=False)
# psutil.cpu_count() returns None when the count cannot be determined, and
# max(1, None) would raise TypeError. Fall back to the logical count, then
# to a single thread, so that n_threads is always a positive integer.
recommended_threads = max(1, physical_cores or logical_cores or 1)
print(f"Detected physical cores: {physical_cores}, logical cores: {logical_cores}")
print(f"Using n_threads: {recommended_threads}")
# === Load the model ===
# `llm` is the single, module-wide Llama instance used by the endpoints.
try:
    llm = Llama(
        model_path=model_path,
        n_ctx=1024,  # Context window size for the model
        n_threads=recommended_threads,
        use_mlock=True,  # Lock model in RAM for faster access
        n_gpu_layers=0,  # CPU only
        # TinyLlama-1.1B-Chat-v1.0 was fine-tuned with the Zephyr template
        # (<|system|> / <|user|> / <|assistant|> turns terminated by </s>),
        # not ChatML; ChatML markers are out-of-distribution for this model.
        chat_format="zephyr",
        verbose=False
    )
except Exception as e:
    print(f"❌ Error loading Llama model: {e}")
    # Nothing can be served without the model. SystemExit is raised directly
    # because the `exit()` builtin is an interactive helper from `site`.
    raise SystemExit(1) from e
else:
    print("πŸš€ Llama model loaded successfully!")
# Token counts are estimated with tiktoken's 'cl100k_base' encoding. That is
# not the GGUF model's own tokenizer, so every count is an approximation that
# is only used for budgeting the conversation history.
# `encoding` stays None when tiktoken cannot provide the encoding; callers
# then fall back to a character-based estimate.
encoding = None
try:
    encoding = tiktoken.get_encoding("cl100k_base")
except Exception:
    print("⚠️ Could not load tiktoken 'cl100k_base' encoding. Using basic len() for token estimation.")

# === Global dictionary to store chat histories per session ===
# Maps a session id to that session's list of {"role", "content"} messages.
chat_histories = {}

# === Context Truncation Settings ===
# Upper bound (in estimated tokens) for the messages sent to the model.
# It is kept below the model's n_ctx of 1024 so that 224 tokens remain for
# the new prompt and the generated response.
MAX_CONTEXT_TOKENS = 800
def count_tokens_in_message(message):
    """Estimate the number of tokens in one chat message.

    Args:
        message: A mapping with an optional "content" entry holding the
            message text. A missing or None content counts as empty text.

    Returns:
        The token count from the module-level tiktoken `encoding` when it is
        available, otherwise a rough estimate of one token per 4 characters.
    """
    text = message.get("content") or ""
    if encoding:
        # By default tiktoken raises ValueError when the text contains a
        # special-token string such as "<|endoftext|>". Prompts are untrusted
        # user input, so such strings are counted as ordinary text instead.
        return len(encoding.encode(text, disallowed_special=()))
    # Fallback for when tiktoken isn't available: 1 token ~ 4 characters.
    return len(text) // 4
def get_message_token_length(messages):
    """Return the summed token estimate of every message in *messages*."""
    return sum(count_tokens_in_message(entry) for entry in messages)
def truncate_history(history, max_tokens):
    """
    Trim a chat history so its estimated token count fits in max_tokens.

    The first entry (the system message) is always kept, even when it alone
    exceeds the budget. The remaining entries are considered from newest to
    oldest and kept until the first one that would overflow the budget;
    everything older than that entry is dropped. The kept entries are
    returned in their original chronological order. An empty history yields
    an empty list.
    """
    if not history:
        return []

    head = history[0]
    used = count_tokens_in_message(head)

    # Collected newest-first while walking backwards through the history.
    kept_newest_first = []
    for entry in reversed(history[1:]):
        cost = count_tokens_in_message(entry)
        if used + cost > max_tokens:
            break  # This entry does not fit; drop it and everything older
        kept_newest_first.append(entry)
        used += cost

    # Restore chronological order behind the system message.
    return [head, *reversed(kept_newest_first)]
@app.get("/")
def root():
    """Return a fixed status message showing that the API is reachable."""
    status = {"message": "βœ… Data Analysis AI API is live and optimized!"}
    return status
def _read_llm_setting(name, default=None):
    """Read a setting from the loaded Llama object in a version-tolerant way.

    The value is looked up on `llm` first and then on `llm.model_params`;
    if what is found is callable it is called to obtain the value.
    """
    value = getattr(llm, name, None)
    if value is None:
        value = getattr(getattr(llm, "model_params", None), name, default)
    return value() if callable(value) else value


@app.get("/get_sys")
def get_sys_specs():
    """Returns system specifications including CPU, RAM, and OS details.

    Returns:
        A dict with four sections: "CPU" (core counts, max frequency and a
        usage sample taken over one second), "RAM" (total/available GB and
        usage percent), "System" (platform, architecture, Python version)
        and "Model_Config" (model file name and loader settings).
    """
    memory = psutil.virtual_memory()
    # Queried once; psutil returns None when the frequency is unavailable.
    cpu_freq = psutil.cpu_freq()
    return {
        "CPU": {
            "physical_cores": physical_cores,
            "logical_cores": logical_cores,
            "max_freq_mhz": cpu_freq.max if cpu_freq else "N/A",
            "cpu_usage_percent": psutil.cpu_percent(interval=1)  # CPU usage over 1 second
        },
        "RAM": {
            "total_GB": round(memory.total / (1024 ** 3), 2),
            "available_GB": round(memory.available / (1024 ** 3), 2),
            "usage_percent": memory.percent
        },
        "System": {
            "platform": platform.platform(),
            "architecture": platform.machine(),
            "python_version": platform.python_version()
        },
        "Model_Config": {
            "model_name": FILENAME,
            "n_ctx": llm.n_ctx(),
            # In llama-cpp-python `n_threads` is a plain attribute and
            # `use_mlock` is stored on `model_params`; calling them as
            # methods on `llm` fails, so they are read through the helper.
            "n_threads": _read_llm_setting("n_threads", recommended_threads),
            "use_mlock": _read_llm_setting("use_mlock")
        }
    }
@app.get("/process_list")
def process_list():
    """Returns a list of processes consuming significant CPU or memory.

    CPU usage is measured over a one-second window: every process is primed
    first, the handler sleeps, and only then are the percentages read.

    Returns:
        {"heavy_processes": [...]} where each entry has "pid", "name",
        "cpu_percent" and "memory_percent", limited to processes above 5% CPU
        or 2% memory and sorted by CPU usage, highest first.
    """
    vanished = (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess)

    # Pass 1: prime the per-process CPU counters. The first cpu_percent()
    # call only records a baseline and returns a meaningless 0.0, so
    # sleeping *before* any call (as was done previously) measures nothing.
    candidates = []
    for proc in psutil.process_iter():
        try:
            proc.cpu_percent(interval=None)
        except vanished:
            continue  # Process went away or is not inspectable; skip it
        candidates.append(proc)

    time.sleep(1)  # Measurement window between baseline and reading

    # Pass 2: read usage relative to the baseline taken above.
    processes = []
    for proc in candidates:
        try:
            cpu = proc.cpu_percent(interval=None)
            mem = proc.memory_percent()
            # Filter processes using more than 5% CPU or 2% memory
            if cpu > 5 or mem > 2:
                processes.append({
                    "pid": proc.pid,
                    "name": proc.name(),
                    "cpu_percent": round(cpu, 2),
                    "memory_percent": round(mem, 2)
                })
        except vanished:
            continue  # Best effort: processes may exit during the window

    # Sort by CPU usage descending
    processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
    return {"heavy_processes": processes}
# System prompt that seeds every new or re-initialized session.
_SYSTEM_PROMPT = (
    "You are a helpful AI assistant for data analysis. "
    "Provide concise and actionable suggestions based on the data provided or questions asked. "
    "Keep your responses focused on data insights and actionable steps for report generation."
)


def _new_session_history():
    """Return a fresh chat history containing only the system message."""
    return [{"role": "system", "content": _SYSTEM_PROMPT}]


@app.post("/generate")
async def generate(request: Request):
    """
    Generates a response from the LLM, maintaining chat context.

    Expects a JSON object body with 'prompt' and optionally 'session_id'.
    If 'session_id' is not provided, a new one will be generated; if it is
    provided but unknown, its history is re-initialized.

    Returns:
        On success, a dict with "response", "session_id" and
        "current_context_tokens" (HTTP 200).
        HTTP 400 with an "error" entry when the body is not a JSON object or
        the prompt is missing, empty, or not a string.
        HTTP 500 with "error" and "session_id" when generation fails; the
        failed user message is removed from the stored history.
    """
    # Imported here so the handler is self-contained. Returning
    # `payload, status` from a FastAPI handler does NOT set the status code:
    # the tuple is serialized as a JSON array with HTTP 200.
    from fastapi.responses import JSONResponse

    try:
        data = await request.json()
    except ValueError:  # json.JSONDecodeError / undecodable body
        return JSONResponse(status_code=400, content={"error": "Request body must be valid JSON"})
    if not isinstance(data, dict):
        return JSONResponse(status_code=400, content={"error": "Request body must be a JSON object"})

    raw_prompt = data.get("prompt", "")
    prompt = raw_prompt.strip() if isinstance(raw_prompt, str) else ""
    session_id = data.get("session_id")
    if not prompt:
        return JSONResponse(status_code=400, content={"error": "Prompt cannot be empty"})

    if not session_id:
        # Generate a new session ID if not provided (for new conversations)
        session_id = str(uuid.uuid4())
        chat_histories[session_id] = _new_session_history()
        print(f"πŸ†• New session created: {session_id}")
    elif session_id not in chat_histories:
        # If a session ID is provided but not found, re-initialize it
        chat_histories[session_id] = _new_session_history()
        print(f"⚠️ Session ID {session_id} not found, re-initializing history.")

    print(f"🧾 Prompt received for session {session_id}: {prompt}")

    prior_history = chat_histories[session_id]
    user_message = {"role": "user", "content": prompt}
    messages = prior_history + [user_message]

    total_tokens = get_message_token_length(messages)
    if total_tokens > MAX_CONTEXT_TOKENS:
        print(f"βœ‚οΈ Truncating history for session {session_id}. Current tokens: {total_tokens}")
        # Budget for the *prior* history: the limit minus the new prompt and
        # a 200-token buffer for the response. Only the prior history is
        # truncated, so the prompt is counted once and an oversized prompt
        # does not cause all earlier turns to be discarded.
        history_budget = MAX_CONTEXT_TOKENS - count_tokens_in_message(user_message) - 200
        messages = truncate_history(prior_history, history_budget) + [user_message]
        print(f"βœ… History truncated. New tokens: {get_message_token_length(messages)}")
    chat_histories[session_id] = messages

    try:
        # NOTE(review): this is a synchronous call inside an async handler,
        # so other requests wait while a response is being generated.
        response = llm.create_chat_completion(
            messages=chat_histories[session_id],
            max_tokens=256,  # Further limit response length for faster generation
            temperature=0.7,  # Adjust temperature for creativity vs. coherence (0.0-1.0)
            stop=["</s>"]  # Stop sequence for TinyLlama Chat
        )
        ai_response_content = response["choices"][0]["message"]["content"].strip()
    except Exception as e:
        print(f"❌ Error during generation for session {session_id}: {e}")
        # Remove the last user message from history if generation failed to prevent bad state
        if chat_histories[session_id] and chat_histories[session_id][-1]["role"] == "user":
            chat_histories[session_id].pop()
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to generate response: {e}. Please try again.", "session_id": session_id},
        )
    else:
        # Add the AI's response to the history for future turns
        chat_histories[session_id].append({"role": "assistant", "content": ai_response_content})
        return {
            "response": ai_response_content,
            "session_id": session_id,  # Returned so the client can reuse it on subsequent requests
            "current_context_tokens": get_message_token_length(chat_histories[session_id])
        }