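"""FastAPI service exposing a locally downloaded Mistral-7B-Instruct GGUF model
(via llama-cpp-python) for stateless data-analysis prompts, plus endpoints for
basic system diagnostics (/get_sys, /process_list)."""
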
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import os
import platform
import psutil
import sys
import time
import tiktoken
import logging

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

if not logger.handlers:
    logger.addHandler(handler)

app = FastAPI()

REPO_ID = "TheBloke/Mistral-7B-Instruct-v0.2-GGUF"
FILENAME = "mistral-7b-instruct-v0.2.Q4_K_M.gguf"
MODEL_DIR = "models"
MODEL_PATH = os.path.join(MODEL_DIR, FILENAME)

if not os.path.exists(MODEL_PATH):
    logger.info(f"⬇️ Downloading {FILENAME} from Hugging Face...")
    try:
        model_path = hf_hub_download(
            repo_id=REPO_ID,
            filename=FILENAME,
            cache_dir=MODEL_DIR,
            local_dir=MODEL_DIR,
            local_dir_use_symlinks=False
        )
        logger.info(f"✅ Model downloaded to: {model_path}")
    except Exception as e:
        logger.error(f"❌ Error downloading model: {e}")
        sys.exit(1)
else:
    logger.info(f"✅ Model already available at: {MODEL_PATH}")
    model_path = MODEL_PATH

logical_cores = psutil.cpu_count(logical=True)
physical_cores = psutil.cpu_count(logical=False)
# psutil.cpu_count(logical=False) can return None on some platforms, so fall back to 1.
recommended_threads = max(1, physical_cores or 1)

logger.info(f"Detected physical cores: {physical_cores}, logical cores: {logical_cores}")
logger.info(f"Using n_threads: {recommended_threads}")

try:
    llm = Llama(
        model_path=model_path,
        n_ctx=2048,
        n_threads=recommended_threads,
        use_mlock=True,     # lock model weights in RAM to avoid swapping
        n_gpu_layers=0,     # CPU-only inference
        chat_format="chatml",
        verbose=False
    )
    logger.info("✅ Llama model loaded successfully!")
except Exception as e:
    logger.error(f"❌ Error loading Llama model: {e}")
    sys.exit(1)

# tiktoken's cl100k_base is not Mistral's tokenizer, so the counts below are rough estimates only.
try:
    encoding = tiktoken.get_encoding("cl100k_base")
except Exception:
    logger.warning("⚠️ Could not load tiktoken 'cl100k_base' encoding. Token counts will be rough character-based estimates.")
    encoding = None


def count_tokens_in_text(text):
    """Estimate the token count of a text using tiktoken, falling back to a ~4-chars-per-token heuristic."""
    if encoding:
        return len(encoding.encode(text))
    # Rough heuristic: one token spans roughly four characters of English text.
    return len(text) // 4


@app.get("/") |
|
def root(): |
|
logger.info("Root endpoint accessed.") |
|
return {"message": "✅ Data Analysis AI API is live and optimized for speed (no context retention)!"} |
|
|
|
@app.get("/get_sys") |
|
def get_sys_specs(): |
|
"""Returns system specifications including CPU, RAM, and OS details.""" |
|
logger.info("System specs endpoint accessed.") |
|
memory = psutil.virtual_memory() |
|
return { |
|
"CPU": { |
|
"physical_cores": physical_cores, |
|
"logical_cores": logical_cores, |
|
"max_freq_mhz": psutil.cpu_freq().max if psutil.cpu_freq() else "N/A", |
|
"cpu_usage_percent": psutil.cpu_percent(interval=1) |
|
}, |
|
"RAM": { |
|
"total_GB": round(memory.total / (1024 ** 3), 2), |
|
"available_GB": round(memory.available / (1024 ** 3), 2), |
|
"usage_percent": memory.percent |
|
}, |
|
"System": { |
|
"platform": platform.platform(), |
|
"architecture": platform.machine(), |
|
"python_version": platform.python_version() |
|
}, |
|
"Model_Config": { |
|
"model_name": FILENAME, |
|
"n_ctx": llm.n_ctx(), |
|
"n_threads": llm.n_threads(), |
|
"use_mlock": llm.use_mlock() |
|
} |
|
} |
|
|
|
@app.get("/process_list") |
|
def process_list(): |
|
"""Returns a list of processes consuming significant CPU.""" |
|
logger.info("Process list endpoint accessed.") |
|
time.sleep(1) |
|
processes = [] |
|
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']): |
|
try: |
|
cpu = proc.cpu_percent() |
|
mem = proc.memory_percent() |
|
|
|
if cpu > 5 or mem > 2: |
|
processes.append({ |
|
"pid": proc.pid, |
|
"name": proc.name(), |
|
"cpu_percent": round(cpu, 2), |
|
"memory_percent": round(mem, 2) |
|
}) |
|
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): |
|
pass |
|
|
|
processes.sort(key=lambda x: x['cpu_percent'], reverse=True) |
|
return {"heavy_processes": processes} |
|
|
|
@app.post("/generate") |
|
async def generate(request: Request): |
|
""" |
|
Generates a response from the LLM without retaining chat context. |
|
Expects a JSON body with 'prompt'. |
|
""" |
|
logger.info("➡️ /generate endpoint received a request.") |
|
data = await request.json() |
|
user_input = data.get("prompt", "").strip() |
|
|
|
if not user_input: |
|
logger.warning("Prompt cannot be empty in /generate request.") |
|
return {"error": "Prompt cannot be empty"}, 400 |
|
|
|
|
|
system_prompt_content = ( |
|
"You are a highly efficient and objective data analysis API. You are the 'assistant'. " |
|
"Your sole function is to process the user's data and instructions, then output ONLY the requested analysis in the specified format. " |
|
"**Crucially, do NOT include any conversational text, greetings, introductions, conclusions, or any remarks about being an AI.** " |
|
"Respond directly with the content. Adhere strictly to all formatting requirements. " |
|
"If a request cannot be fulfilled, respond ONLY with: 'STATUS: FAILED_ANALYSIS; REASON: Unable to process this specific analytical request due to limitations.'" |
|
) |
|
|
|
|
|
|
|
user_content_template = f"""Please analyze the following data based on the instructions within it. |
|
Provide only the direct output as requested. Do not add any extra conversational text. |
|
|
|
--- DATA --- |
|
{user_input} |
|
""" |
|
|
|
|
|
messages_for_llm = [ |
|
{"role": "system", "content": system_prompt_content}, |
|
{"role": "user", "content": user_content_template} |
|
] |
|
|
|
|
|
prompt_tokens = count_tokens_in_text(user_input) |
|
|
|
logger.info(f"🧾 Original user input: {user_input}") |
|
logger.info(f"Tokens in prompt: {prompt_tokens}") |
|
|
|
try: |
|
response = llm.create_chat_completion( |
|
messages=messages_for_llm, |
|
max_tokens=800, |
|
|
|
temperature=0.2, |
|
|
|
stop=["<|im_end|>"] |
|
) |
|
ai_response_content = response["choices"][0]["message"]["content"].strip() |
|
|
|
response_token_count = count_tokens_in_text(ai_response_content) |
|
|
|
logger.info("✅ Response generated successfully.") |
|
return { |
|
"response": ai_response_content, |
|
"prompt_tokens": prompt_tokens, |
|
"response_token_count": response_token_count |
|
} |
|
except Exception as e: |
|
logger.error(f"❌ Error during generation: {e}", exc_info=True) |
|
return {"error": f"Failed to generate response: {e}. Please try again."}, 500 |