from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import os
import platform
import psutil
import time
import uuid
import tiktoken

app = FastAPI()
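
# Overview: this service downloads a quantized TinyLlama GGUF model on first start,
# loads it with llama-cpp-python on the CPU, and exposes four endpoints:
#   GET  /              - health check
#   GET  /get_sys       - CPU / RAM / OS and model configuration details
#   GET  /process_list  - processes with significant CPU or memory usage
#   POST /generate      - chat-style generation with per-session context handling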

REPO_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf"
MODEL_DIR = "models"
MODEL_PATH = os.path.join(MODEL_DIR, FILENAME)

# Download the GGUF model from Hugging Face on the first run; reuse the local copy afterwards.
if not os.path.exists(MODEL_PATH):
    print(f"⬇️ Downloading {FILENAME} from Hugging Face...")
    try:
        model_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=FILENAME,
            cache_dir=MODEL_DIR,
            local_dir=MODEL_DIR,
            # Recent huggingface_hub releases deprecate local_dir_use_symlinks; harmless to keep here.
            local_dir_use_symlinks=False
        )
        print(f"✅ Model downloaded to: {model_path}")
    except Exception as e:
        print(f"❌ Error downloading model: {e}")
        exit(1)
else:
    print(f"✅ Model already available at: {MODEL_PATH}")
    model_path = MODEL_PATH

logical_cores = psutil.cpu_count(logical=True)
physical_cores = psutil.cpu_count(logical=False)
# psutil.cpu_count(logical=False) can return None on some platforms, so fall back to logical cores.
recommended_threads = max(1, physical_cores or logical_cores or 1)

print(f"Detected physical cores: {physical_cores}, logical cores: {logical_cores}")
print(f"Using n_threads: {recommended_threads}")

try:
    llm = Llama(
        model_path=model_path,
        n_ctx=1024,                      # context window size in tokens
        n_threads=recommended_threads,
        use_mlock=True,                  # lock model pages in RAM to avoid swapping
        n_gpu_layers=0,                  # CPU-only inference
        chat_format="chatml",
        verbose=False
    )
    print("Llama model loaded successfully!")
except Exception as e:
    print(f"❌ Error loading Llama model: {e}")
    exit(1)

# tiktoken's cl100k_base encoding is not TinyLlama's tokenizer, but it is a close enough
# approximation for budgeting context length.
try:
    encoding = tiktoken.get_encoding("cl100k_base")
except Exception:
    print("⚠️ Could not load tiktoken 'cl100k_base' encoding. Using basic len() for token estimation.")
    encoding = None

chat_histories = {}

MAX_CONTEXT_TOKENS = 800
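# Notes on the two settings above: chat_histories keeps every conversation in process
# memory keyed by session_id, so histories are lost on restart. MAX_CONTEXT_TOKENS = 800
# keeps the estimated prompt inside n_ctx = 1024; /generate additionally subtracts a
# ~200-token margin before truncating to absorb error in the approximate token counts.
# A history just under the cap plus the full 256-token reply can still approach n_ctx,
# so a lower cap would be the conservative choice.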


def count_tokens_in_message(message):
    """Estimates tokens in a single message using tiktoken or a simple character count."""
    if encoding:
        return len(encoding.encode(message.get("content", "")))
    else:
        # Rough fallback: assume ~4 characters per token.
        return len(message.get("content", "")) // 4


def get_message_token_length(messages):
    """Calculates the total token estimate for a list of messages."""
    total_tokens = 0
    for message in messages:
        total_tokens += count_tokens_in_message(message)
    return total_tokens


def truncate_history(history, max_tokens):
    """
    Truncates the chat history to fit within max_tokens.
    Keeps the system message and the most recent messages.
    """
    if not history:
        return []

    # The first entry is always the system message (see /generate).
    system_message = history[0]
    truncated_history = [system_message]
    current_tokens = count_tokens_in_message(system_message)

    # Walk the remaining messages newest-first and keep as many as fit within the budget.
    for message in reversed(history[1:]):
        message_tokens = count_tokens_in_message(message)
        if current_tokens + message_tokens <= max_tokens:
            truncated_history.insert(1, message)
            current_tokens += message_tokens
        else:
            break

    return truncated_history
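
# Illustration with hypothetical numbers: for a history [system, u1, a1, u2, a2] where
# each message is estimated at ~200 tokens and max_tokens is 700, the function keeps
# [system, u2, a2]: the system prompt plus the most recent messages that fit.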


@app.get("/")
def root():
    return {"message": "✅ Data Analysis AI API is live and optimized!"}


@app.get("/get_sys")
def get_sys_specs():
    """Returns system specifications including CPU, RAM, OS, and model configuration details."""
    memory = psutil.virtual_memory()
    return {
        "CPU": {
            "physical_cores": physical_cores,
            "logical_cores": logical_cores,
            "max_freq_mhz": psutil.cpu_freq().max if psutil.cpu_freq() else "N/A",
            "cpu_usage_percent": psutil.cpu_percent(interval=1)
        },
        "RAM": {
            "total_GB": round(memory.total / (1024 ** 3), 2),
            "available_GB": round(memory.available / (1024 ** 3), 2),
            "usage_percent": memory.percent
        },
        "System": {
            "platform": platform.platform(),
            "architecture": platform.machine(),
            "python_version": platform.python_version()
        },
        "Model_Config": {
            "model_name": FILENAME,
            "n_ctx": llm.n_ctx(),
            # Llama exposes n_ctx() as a method but no simple getters for the thread count
            # or mlock flag, so report the values this script configured.
            "n_threads": recommended_threads,
            "use_mlock": True
        }
    }


@app.get("/process_list")
def process_list():
    """Returns a list of processes consuming significant CPU or memory."""
    # cpu_percent() measures usage since its previous call on the same Process object,
    # so prime it for every process, wait one second, then read the real values.
    procs = list(psutil.process_iter(['pid', 'name']))
    for proc in procs:
        try:
            proc.cpu_percent()
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    time.sleep(1)

    processes = []
    for proc in procs:
        try:
            cpu = proc.cpu_percent()
            mem = proc.memory_percent()
            if cpu > 5 or mem > 2:
                processes.append({
                    "pid": proc.pid,
                    "name": proc.name(),
                    "cpu_percent": round(cpu, 2),
                    "memory_percent": round(mem, 2)
                })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass

    processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
    return {"heavy_processes": processes}


@app.post("/generate")
async def generate(request: Request):
    """
    Generates a response from the LLM, maintaining chat context.
    Expects a JSON body with 'prompt' and optionally 'session_id'.
    If 'session_id' is not provided, a new one will be generated.
    """
    data = await request.json()
    prompt = data.get("prompt", "").strip()
    session_id = data.get("session_id")

    if not prompt:
        return JSONResponse(status_code=400, content={"error": "Prompt cannot be empty"})

    system_message = {
        "role": "system",
        "content": (
            "You are a helpful AI assistant for data analysis. Provide concise and actionable "
            "suggestions based on the data provided or questions asked. Keep your responses focused "
            "on data insights and actionable steps for report generation."
        )
    }

    if not session_id:
        # No session supplied: start a new conversation.
        session_id = str(uuid.uuid4())
        chat_histories[session_id] = [system_message]
        print(f"New session created: {session_id}")
    elif session_id not in chat_histories:
        # Unknown session (e.g. after a restart): re-initialize its history.
        chat_histories[session_id] = [system_message]
        print(f"⚠️ Session ID {session_id} not found, re-initializing history.")

    print(f"🧾 Prompt received for session {session_id}: {prompt}")

    current_messages = list(chat_histories[session_id])
    current_messages.append({"role": "user", "content": prompt})

    # Reserve room for the new prompt plus a ~200-token safety margin for estimation error,
    # and never let the history budget go negative.
    effective_max_history_tokens = max(
        0,
        MAX_CONTEXT_TOKENS - count_tokens_in_message({"role": "user", "content": prompt}) - 200
    )

    if get_message_token_length(current_messages) > MAX_CONTEXT_TOKENS:
        print(f"✂️ Truncating history for session {session_id}. "
              f"Current tokens: {get_message_token_length(current_messages)}")
        chat_histories[session_id] = truncate_history(current_messages, effective_max_history_tokens)

        # Make sure the latest user prompt survived truncation.
        if chat_histories[session_id][-1]["role"] != "user" or chat_histories[session_id][-1]["content"] != prompt:
            chat_histories[session_id].append({"role": "user", "content": prompt})
        print(f"✅ History truncated. New tokens: {get_message_token_length(chat_histories[session_id])}")
    else:
        chat_histories[session_id] = current_messages

    try:
        response = llm.create_chat_completion(
            messages=chat_histories[session_id],
            max_tokens=256,
            temperature=0.7,
            stop=["</s>"]
        )

        ai_response_content = response["choices"][0]["message"]["content"].strip()

        # Record the assistant reply so the next turn sees it.
        chat_histories[session_id].append({"role": "assistant", "content": ai_response_content})

        return {
            "response": ai_response_content,
            "session_id": session_id,
            "current_context_tokens": get_message_token_length(chat_histories[session_id])
        }
    except Exception as e:
        print(f"❌ Error during generation for session {session_id}: {e}")
        # Drop the failed user turn so a retry does not duplicate it.
        if chat_histories[session_id] and chat_histories[session_id][-1]["role"] == "user":
            chat_histories[session_id].pop()
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to generate response: {e}. Please try again.", "session_id": session_id}
        )
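

# Optional convenience for local runs: a __main__ block that starts uvicorn directly.
# It assumes uvicorn is installed and uses port 8000 as an arbitrary default;
# "uvicorn <module_name>:app" from the command line works just as well.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example interaction with /generate (illustrative; the session_id value is returned by
# the first call and echoed back on later calls to keep the conversation context):
#   curl -X POST http://localhost:8000/generate \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "Summarize the key trends in my sales data."}'
#   curl -X POST http://localhost:8000/generate \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "What should go in the report?", "session_id": "<id from the first call>"}'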