from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import os
import platform
import psutil
import time
import uuid
import tiktoken

app = FastAPI()

REPO_ID = "TheBloke/TinyLlama-1.1B-Chat-v1.0-GGUF"
FILENAME = "tinyllama-1.1b-chat-v1.0.Q4_K_M.gguf"
MODEL_DIR = "models"
MODEL_PATH = os.path.join(MODEL_DIR, FILENAME)

# Download the model file on first run; reuse the local copy afterwards.
if not os.path.exists(MODEL_PATH):
    print(f"⬇️ Downloading {FILENAME} from Hugging Face...")
    try:
        model_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=FILENAME,
            cache_dir=MODEL_DIR,
            local_dir=MODEL_DIR,
            local_dir_use_symlinks=False
        )
        print(f"✅ Model downloaded to: {model_path}")
    except Exception as e:
        print(f"❌ Error downloading model: {e}")
        exit(1)
else:
    print(f"✅ Model already available at: {MODEL_PATH}")
    model_path = MODEL_PATH

# psutil.cpu_count(logical=False) can return None on some platforms, so fall back
# to the logical count (or 1) when choosing the thread count.
logical_cores = psutil.cpu_count(logical=True)
physical_cores = psutil.cpu_count(logical=False)
recommended_threads = max(1, physical_cores or logical_cores or 1)

print(f"Detected physical cores: {physical_cores}, logical cores: {logical_cores}")
print(f"Using n_threads: {recommended_threads}")

# Load the model for CPU-only inference (n_gpu_layers=0) with a 1024-token context
# window. use_mlock=True asks the OS to keep the model weights resident in RAM.
try:
    llm = Llama(
        model_path=model_path,
        n_ctx=1024,
        n_threads=recommended_threads,
        use_mlock=True,
        n_gpu_layers=0,
        chat_format="chatml",
        verbose=False
    )
    print("🚀 Llama model loaded successfully!")
except Exception as e:
    print(f"❌ Error loading Llama model: {e}")
    exit(1)

# tiktoken's cl100k_base is not TinyLlama's tokenizer; it is only used here as a
# rough token estimator for history budgeting, with a character-count fallback.
try:
    encoding = tiktoken.get_encoding("cl100k_base")
except Exception:
    print("⚠️ Could not load tiktoken 'cl100k_base' encoding. Using basic len() for token estimation.")
    encoding = None

chat_histories = {}
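# NOTE: chat_histories lives only in this process's memory, so sessions are lost on
# restart and are not shared across workers; a persistent or shared store would be
# needed for that, which is outside the scope of this file.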

# Token budget for the prompt history, kept well below n_ctx=1024 so the reply
# (max_tokens=150) and chat-template overhead still fit.
MAX_CONTEXT_TOKENS = 800


def count_tokens_in_message(message):
    """Estimates tokens in a single message using tiktoken or a character heuristic."""
    if encoding:
        return len(encoding.encode(message.get("content", "")))
    else:
        # Rough fallback: assume ~4 characters per token.
        return len(message.get("content", "")) // 4


def get_message_token_length(messages):
    """Calculates total tokens for a list of messages."""
    total_tokens = 0
    for message in messages:
        total_tokens += count_tokens_in_message(message)
    return total_tokens


def truncate_history(history, max_tokens):
    """
    Truncates the chat history to fit within max_tokens.
    Keeps the system message and recent messages.
    """
    if not history:
        return []

    # Always keep the system message.
    system_message = history[0]
    truncated_history = [system_message]
    current_tokens = count_tokens_in_message(system_message)

    # Add messages newest-first until the budget is exhausted, inserting them
    # just after the system message so chronological order is preserved.
    for message in reversed(history[1:]):
        message_tokens = count_tokens_in_message(message)
        if current_tokens + message_tokens <= max_tokens:
            truncated_history.insert(1, message)
            current_tokens += message_tokens
        else:
            break

    return truncated_history
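
# Illustrative behavior (message names here are hypothetical): given a history
# [system, m1, m2, m3] and a budget that only covers the system message plus m2
# and m3, truncate_history returns [system, m2, m3]; the oldest non-system
# messages are dropped first.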


@app.get("/")
def root():
    return {"message": "✅ Data Analysis AI API is live and optimized!"}


@app.get("/get_sys")
def get_sys_specs():
    """Returns system specifications including CPU, RAM, and OS details."""
    memory = psutil.virtual_memory()
    return {
        "CPU": {
            "physical_cores": physical_cores,
            "logical_cores": logical_cores,
            "max_freq_mhz": psutil.cpu_freq().max if psutil.cpu_freq() else "N/A",
            "cpu_usage_percent": psutil.cpu_percent(interval=1)
        },
        "RAM": {
            "total_GB": round(memory.total / (1024 ** 3), 2),
            "available_GB": round(memory.available / (1024 ** 3), 2),
            "usage_percent": memory.percent
        },
        "System": {
            "platform": platform.platform(),
            "architecture": platform.machine(),
            "python_version": platform.python_version()
        },
        "Model_Config": {
            "model_name": FILENAME,
            "n_ctx": llm.n_ctx(),
            # Report the values this app configured rather than querying the Llama
            # object, whose attribute names vary between llama-cpp-python versions.
            "n_threads": recommended_threads,
            "use_mlock": True
        }
    }


@app.get("/process_list")
def process_list():
    """Returns a list of processes consuming significant CPU or memory."""
    # The first cpu_percent() call for a process always returns 0.0, so prime the
    # per-process counters, wait a second, then take the real reading.
    for proc in psutil.process_iter():
        try:
            proc.cpu_percent(interval=None)
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass
    time.sleep(1)

    processes = []
    for proc in psutil.process_iter(['pid', 'name']):
        try:
            cpu = proc.cpu_percent(interval=None)
            mem = proc.memory_percent()
            if cpu > 5 or mem > 2:
                processes.append({
                    "pid": proc.pid,
                    "name": proc.name(),
                    "cpu_percent": round(cpu, 2),
                    "memory_percent": round(mem, 2)
                })
        except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
            pass

    processes.sort(key=lambda x: x['cpu_percent'], reverse=True)
    return {"heavy_processes": processes}


@app.post("/generate")
async def generate(request: Request):
    """
    Generates a response from the LLM, maintaining chat context.
    Expects a JSON body with 'prompt' and optionally 'session_id'.
    If 'session_id' is not provided, a new one will be generated.
    """
    data = await request.json()
    prompt = data.get("prompt", "").strip()
    session_id = data.get("session_id")

    if not prompt:
        # Returning a (dict, int) tuple would not set the HTTP status in FastAPI,
        # so use an explicit JSONResponse.
        return JSONResponse(status_code=400, content={"error": "Prompt cannot be empty"})

    system_prompt_content = (
        "You are a helpful AI assistant for data analysis. "
        "You are designed to provide concise and actionable suggestions based on the data provided or questions asked. "
        "**Always refer to the information given in the current conversation context.** "
        "Keep your responses focused on data insights and actionable steps for report generation. "
        "Do not claim to have no memory if the information is present in the conversation history."
    )

    if not session_id:
        # First request without a session: create one and seed it with the system prompt.
        session_id = str(uuid.uuid4())
        chat_histories[session_id] = [
            {"role": "system", "content": system_prompt_content}
        ]
        print(f"🆕 New session created: {session_id}")
    elif session_id not in chat_histories:
        # Unknown session id (e.g. after a restart): start a fresh history.
        chat_histories[session_id] = [
            {"role": "system", "content": system_prompt_content}
        ]
        print(f"⚠️ Session ID {session_id} not found, re-initializing history.")
    else:
        # Existing session: make sure the system prompt is present and up to date.
        if chat_histories[session_id][0]["role"] == "system":
            chat_histories[session_id][0]["content"] = system_prompt_content
        else:
            chat_histories[session_id].insert(0, {"role": "system", "content": system_prompt_content})

    print(f"🧾 Prompt received for session {session_id}: {prompt}")

    # Build the candidate message list: existing history plus the new user message.
    current_messages = list(chat_histories[session_id])
    current_messages.append({"role": "user", "content": prompt})

    # Budget left for the history itself: the total budget minus the new prompt
    # and a 100-token safety margin for chat-template overhead.
    effective_max_history_tokens = max(
        0, MAX_CONTEXT_TOKENS - count_tokens_in_message({"role": "user", "content": prompt}) - 100
    )

    if get_message_token_length(current_messages) > MAX_CONTEXT_TOKENS:
        print(f"✂️ Truncating history for session {session_id}. Current tokens: {get_message_token_length(current_messages)}")
        chat_histories[session_id] = truncate_history(current_messages, effective_max_history_tokens)

        # Truncation may have dropped the new user message; re-append it if so.
        if not (chat_histories[session_id] and
                chat_histories[session_id][-1]["role"] == "user" and
                chat_histories[session_id][-1]["content"] == prompt):
            chat_histories[session_id].append({"role": "user", "content": prompt})
        print(f"✅ History truncated. New tokens: {get_message_token_length(chat_histories[session_id])}")
    else:
        chat_histories[session_id] = current_messages

    try:
        response = llm.create_chat_completion(
            messages=chat_histories[session_id],
            max_tokens=150,
            temperature=0.7,
            stop=["</s>"]
        )

        ai_response_content = response["choices"][0]["message"]["content"].strip()

        # Persist the assistant's reply so the next turn sees it in context.
        chat_histories[session_id].append({"role": "assistant", "content": ai_response_content})

        return {
            "response": ai_response_content,
            "session_id": session_id,
            "current_context_tokens": get_message_token_length(chat_histories[session_id])
        }
    except Exception as e:
        print(f"❌ Error during generation for session {session_id}: {e}")
        # Roll back the user message that triggered the failure so a retry starts clean.
        if chat_histories[session_id] and chat_histories[session_id][-1]["role"] == "user":
            chat_histories[session_id].pop()
        return JSONResponse(
            status_code=500,
            content={"error": f"Failed to generate response: {e}. Please try again.", "session_id": session_id}
        )
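

# A minimal way to run this app locally (a sketch: the module name "main", host,
# and port below are assumptions, not part of the original configuration):
#
#     uvicorn main:app --host 0.0.0.0 --port 8000
#
# or, equivalently, run this file directly:
if __name__ == "__main__":
    import uvicorn  # assumes uvicorn is installed alongside FastAPI
    uvicorn.run(app, host="0.0.0.0", port=8000)

# Example call to the /generate endpoint (illustrative; the port matches the
# assumption above, and the prompt/session_id are placeholders):
#
#   curl -X POST http://localhost:8000/generate \
#        -H "Content-Type: application/json" \
#        -d '{"prompt": "Suggest next steps for analyzing monthly sales data", "session_id": "demo"}'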