Priyanshukr-1's picture
Update app.py
b03785b verified
raw
history blame
11.5 kB
from fastapi import FastAPI, Request
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import os
import platform
import psutil
import multiprocessing
import time
import uuid # For generating unique session IDs
import tiktoken # For estimating token count
app = FastAPI()
# === Model Config ===
REPO_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf" # Q4_K_M is a good balance of size and quality
MODEL_DIR = "models"
MODEL_PATH = os.path.join(MODEL_DIR, FILENAME)
# === Download if model not available ===
if not os.path.exists(MODEL_PATH):
print(f"⬇️ Downloading {FILENAME} from Hugging Face...")
try:
model_path = hf_hub_download(
repo_id=REPO_ID,
filename=FILENAME,
cache_dir=MODEL_DIR,
local_dir=MODEL_DIR,
local_dir_use_symlinks=False
)
print(f"βœ… Model downloaded to: {model_path}")
except Exception as e:
print(f"❌ Error downloading model: {e}")
# Exit or handle error appropriately if model download fails
exit(1)
else:
print(f"βœ… Model already available at: {MODEL_PATH}")
model_path = MODEL_PATH
# === Optimal thread usage ===
logical_cores = psutil.cpu_count(logical=True)
physical_cores = psutil.cpu_count(logical=False)
recommended_threads = max(1, physical_cores) # Ensure at least 1 thread
print(f"Detected physical cores: {physical_cores}, logical cores: {logical_cores}")
print(f"Using n_threads: {recommended_threads}")
# === Load the model ===
try:
llm = Llama(
model_path=model_path,
n_ctx=1024, # Context window size for the model
n_threads=recommended_threads,
use_mlock=True, # Lock model in RAM for faster access
n_gpu_layers=0, # CPU only
chat_format="chatml", # TinyLlama Chat uses ChatML format
verbose=False
)
print("πŸš€ Llama model loaded successfully!")
except Exception as e:
print(f"❌ Error loading Llama model: {e}")
exit(1)
# Initialize tiktoken encoder for token counting (approximate for GGUF models, but good enough)
try:
encoding = tiktoken.get_encoding("cl100k_base")
except Exception:
print("⚠️ Could not load tiktoken 'cl100k_base' encoding. Using basic len() for token estimation.")
encoding = None
# === Global dictionary to store chat histories per session ===
chat_histories = {}
# === Context Truncation Settings ===
# Max tokens for the entire conversation history (input to the model)
# This should be less than n_ctx to leave room for the new prompt and generated response.
MAX_CONTEXT_TOKENS = 800 # Keep total input context below this, leaving 224 tokens for new prompt + response
def count_tokens_in_message(message):
"""Estimates tokens in a single message using tiktoken or simple char count."""
if encoding:
return len(encoding.encode(message.get("content", "")))
else:
# Fallback for when tiktoken isn't available or for simple estimation
return len(message.get("content", "")) // 4 # Rough estimate: 1 token ~ 4 characters
def get_message_token_length(messages):
"""Calculates total tokens for a list of messages."""
total_tokens = 0
for message in messages:
total_tokens += count_tokens_in_message(message)
return total_tokens
def truncate_history(history, max_tokens):
"""
Truncates the chat history to fit within max_tokens.
Keeps the system message and recent messages.
"""
if not history:
return []
# Always keep the system message
system_message = history[0]
truncated_history = [system_message]
current_tokens = count_tokens_in_message(system_message)
# Add messages from most recent, until max_tokens is reached
# Iterate from second-to-last to first user/assistant message
for message in reversed(history[1:]):
message_tokens = count_tokens_in_message(message)
if current_tokens + message_tokens <= max_tokens:
truncated_history.insert(1, message) # Insert after system message to maintain order
current_tokens += message_tokens
else:
break # Stop adding if next message exceeds limit
return truncated_history
@app.get("/")
def root():
return {"message": "βœ… Data Analysis AI API is live and optimized!"}
@app.get("/get_sys")
def get_sys_specs():
"""Returns system specifications including CPU, RAM, and OS details."""
memory = psutil.virtual_memory()
return {
"CPU": {
"physical_cores": physical_cores,
"logical_cores": logical_cores,
"max_freq_mhz": psutil.cpu_freq().max if psutil.cpu_freq() else "N/A",
"cpu_usage_percent": psutil.cpu_percent(interval=1) # CPU usage over 1 second
},
"RAM": {
"total_GB": round(memory.total / (1024 ** 3), 2),
"available_GB": round(memory.available / (1024 ** 3), 2),
"usage_percent": memory.percent
},
"System": {
"platform": platform.platform(),
"architecture": platform.machine(),
"python_version": platform.python_version()
},
"Model_Config": {
"model_name": FILENAME,
"n_ctx": llm.n_ctx(),
"n_threads": llm.n_threads(),
"use_mlock": llm.use_mlock()
}
}
@app.get("/process_list")
def process_list():
"""Returns a list of processes consuming significant CPU."""
time.sleep(1) # Let CPU settle for accurate measurement
processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
try:
cpu = proc.cpu_percent()
mem = proc.memory_percent()
# Filter processes using more than 5% CPU or 2% memory
if cpu > 5 or mem > 2:
processes.append({
"pid": proc.pid,
"name": proc.name(),
"cpu_percent": round(cpu, 2),
"memory_percent": round(mem, 2)
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
# Sort by CPU usage descending
processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
return {"heavy_processes": processes}
@app.post("/generate")
async def generate(request: Request):
"""
Generates a response from the LLM, maintaining chat context.
Expects a JSON body with 'prompt' and optionally 'session_id'.
If 'session_id' is not provided, a new one will be generated.
"""
data = await request.json()
prompt = data.get("prompt", "").strip()
session_id = data.get("session_id")
if not prompt:
return {"error": "Prompt cannot be empty"}, 400
# Define the system prompt with an emphasis on using context
system_prompt_content = (
"You are a helpful AI assistant for data analysis. "
"You are designed to provide concise and actionable suggestions based on the data provided or questions asked. "
"**Always refer to the information given in the current conversation context.** "
"Keep your responses focused on data insights and actionable steps for report generation. "
"Do not claim to have no memory if the information is present in the conversation history."
)
# Generate a new session ID if not provided (for new conversations)
if not session_id:
session_id = str(uuid.uuid4())
# Initialize chat history for a new session with a system message
chat_histories[session_id] = [
{"role": "system", "content": system_prompt_content}
]
print(f"πŸ†• New session created: {session_id}")
elif session_id not in chat_histories:
# If a session ID is provided but not found, re-initialize it
chat_histories[session_id] = [
{"role": "system", "content": system_prompt_content}
]
print(f"⚠️ Session ID {session_id} not found, re-initializing history.")
else:
# Ensure the system message is always the most up-to-date one
if chat_histories[session_id][0]["role"] == "system":
chat_histories[session_id][0]["content"] = system_prompt_content
else:
# This case should ideally not happen if history is managed correctly
chat_histories[session_id].insert(0, {"role": "system", "content": system_prompt_content})
print(f"🧾 Prompt received for session {session_id}: {prompt}")
# Add the user's new message to a temporary list to check total length
current_messages = list(chat_histories[session_id]) # Create a copy
current_messages.append({"role": "user", "content": prompt})
# Truncate history if it exceeds the max context tokens
# We subtract a buffer for the new prompt itself and the expected response
# A rough estimate for prompt + response: 100 tokens (prompt) + 100 tokens (response) = 200 tokens
effective_max_history_tokens = MAX_CONTEXT_TOKENS - count_tokens_in_message({"role": "user", "content": prompt}) - 100 # Buffer for response
if get_message_token_length(current_messages) > MAX_CONTEXT_TOKENS:
print(f"βœ‚οΈ Truncating history for session {session_id}. Current tokens: {get_message_token_length(current_messages)}")
chat_histories[session_id] = truncate_history(current_messages, effective_max_history_tokens)
# Re-add the current user prompt after truncation if it was removed
# (This logic ensures the current prompt is always the last user message)
if not (chat_histories[session_id] and
chat_histories[session_id][-1]["role"] == "user" and
chat_histories[session_id][-1]["content"] == prompt):
chat_histories[session_id].append({"role": "user", "content": prompt})
print(f"βœ… History truncated. New tokens: {get_message_token_length(chat_histories[session_id])}")
else:
chat_histories[session_id] = current_messages # If not truncated, just update with the new message
try:
# Pass the (potentially truncated) chat history for context
response = llm.create_chat_completion(
messages=chat_histories[session_id],
max_tokens=150, # Further limit response length to encourage conciseness and speed
temperature=0.7, # Adjust temperature for creativity vs. coherence (0.0-1.0)
stop=["</s>"] # Stop sequence for TinyLlama Chat
)
ai_response_content = response["choices"][0]["message"]["content"].strip()
# Add the AI's response to the history for future turns
chat_histories[session_id].append({"role": "assistant", "content": ai_response_content})
return {
"response": ai_response_content,
"session_id": session_id, # Return the session_id so the client can use it for subsequent requests
"current_context_tokens": get_message_token_length(chat_histories[session_id])
}
except Exception as e:
print(f"❌ Error during generation for session {session_id}: {e}")
# Remove the last user message from history if generation failed to prevent bad state
if chat_histories[session_id] and chat_histories[session_id][-1]["role"] == "user":
chat_histories[session_id].pop()
return {"error": f"Failed to generate response: {e}. Please try again.", "session_id": session_id}, 500