# llm-excel-plotter-agent / llm_agent.py
# Author: Transcendental-Programmer
# fix: Update chart generation and LLM agent functionality (commit 4ac0bf8)
import torch
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from data_processor import DataProcessor
from chart_generator import ChartGenerator
from image_verifier import ImageVerifier
from huggingface_hub import login
import logging
import time
import os
from dotenv import load_dotenv
import ast
import requests
import json
load_dotenv()
class LLM_Agent:
    """Agent that turns natural-language plotting requests into verified charts.

    Pipeline: parse the user's query with either a local fine-tuned BART
    model or the Hugging Face Inference API (Flan-T5 variants), interpret
    the model output as a plot-argument dict, render the chart with
    ChartGenerator, and verify the produced image with ImageVerifier.
    """

    # Hosted model choices -> Hugging Face Inference API endpoints.
    # 'flan-ul2' is intentionally mapped to flan-t5-xxl (best available).
    _HF_API_URLS = {
        'flan-t5-base': "https://api-inference.huggingface.co/models/google/flan-t5-base",
        'flan-ul2': "https://api-inference.huggingface.co/models/google/flan-t5-xxl",
    }

    # Fallback plot spec used whenever the LLM response is missing/unusable.
    _FALLBACK_RESPONSE = "{'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}"

    def __init__(self, data_path=None):
        """Initialize the data pipeline and load the local fine-tuned model.

        Args:
            data_path: Optional path to the spreadsheet to plot from; may be
                replaced per-request via process_request's 'file_path'.
        """
        logging.info("Initializing LLM_Agent")
        self.data_processor = DataProcessor(data_path)
        self.chart_generator = ChartGenerator(self.data_processor.data)
        self.image_verifier = ImageVerifier()
        # Use Hugging Face Hub model path for fine-tuned model
        model_path = "ArchCoder/fine-tuned-bart-large"
        self.query_tokenizer = AutoTokenizer.from_pretrained(model_path)
        self.query_model = AutoModelForSeq2SeqLM.from_pretrained(model_path)

    @staticmethod
    def validate_plot_args(plot_args):
        """Validate a plot-argument dict and normalize 'y' to a list in place.

        Args:
            plot_args: dict expected to hold 'x', 'y', and 'chart_type'.

        Returns:
            True if all required keys are present (mutating plot_args['y']
            into a single-element list when it is not already a list),
            False otherwise.
        """
        # NOTE: was previously missing @staticmethod; it only worked because
        # callers invoked it via the class (LLM_Agent.validate_plot_args).
        required_keys = ('x', 'y', 'chart_type')
        if not all(key in plot_args for key in required_keys):
            return False
        if not isinstance(plot_args['y'], list):
            plot_args['y'] = [plot_args['y']]
        return True

    @staticmethod
    def _build_prompt(query):
        """Build the few-shot prompt sent to the hosted Flan-T5 models."""
        return (
            "You are VizBot, an expert data visualization assistant. "
            "Given a user's natural language request about plotting data, output ONLY a valid Python dictionary with keys: x, y, chart_type, and color (if specified). "
            "Do not include any explanation or extra text.\n\n"
            "Example 1:\n"
            "User: plot the sales in the years with red line\n"
            "Output: {'x': 'Year', 'y': ['Sales'], 'chart_type': 'line', 'color': 'red'}\n\n"
            "Example 2:\n"
            "User: show employee expenses and net profit over the years\n"
            "Output: {'x': 'Year', 'y': ['Employee expense', 'Net profit'], 'chart_type': 'line'}\n\n"
            "Example 3:\n"
            "User: display the EBITDA for each year with a blue bar\n"
            "Output: {'x': 'Year', 'y': ['EBITDA'], 'chart_type': 'bar', 'color': 'blue'}\n\n"
            f"User: {query}\nOutput:"
        )

    @staticmethod
    def _clean_response(response_text):
        """Strip whitespace and markdown code fences from model output."""
        response_text = response_text.strip()
        if response_text.startswith("```") and response_text.endswith("```"):
            response_text = response_text[3:-3].strip()
        # Fenced blocks often open with a language tag ("```python").
        if response_text.startswith("python"):
            response_text = response_text[6:].strip()
        return response_text

    def _generate_local(self, query):
        """Run the raw query through the local fine-tuned BART model.

        The fine-tuned model was trained on bare queries, so it receives
        the query directly rather than the few-shot prompt.
        """
        inputs = self.query_tokenizer(query, return_tensors="pt", max_length=512, truncation=True)
        outputs = self.query_model.generate(**inputs, max_length=100, num_return_sequences=1)
        return self.query_tokenizer.decode(outputs[0], skip_special_tokens=True)

    def _generate_via_api(self, api_url, prompt):
        """Call the Hugging Face Inference API and return the generated text.

        Falls back to _FALLBACK_RESPONSE on any HTTP error, empty output,
        or unparsable response body (best-effort; never raises for these).
        """
        headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACEHUB_API_TOKEN')}"}
        payload = {"inputs": prompt}
        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
        if response.status_code != 200:
            logging.error(f"Hugging Face API error: {response.status_code} {response.text}")
            return self._FALLBACK_RESPONSE
        try:
            resp_json = response.json()
            # The API returns either a list of generations or a single dict.
            response_text = resp_json[0]['generated_text'] if isinstance(resp_json, list) else resp_json.get('generated_text', '')
            return response_text or self._FALLBACK_RESPONSE
        except Exception as e:
            logging.error(f"Error parsing Hugging Face API response: {e}, raw: {response.text}")
            return self._FALLBACK_RESPONSE

    def process_request(self, data):
        """Handle one plotting request end to end.

        Args:
            data: dict with 'query' (natural-language request), optional
                'file_path' (spreadsheet to load), and optional 'model'
                ('bart' [default], 'flan-t5-base', or 'flan-ul2').

        Returns:
            dict with 'response' (cleaned LLM text or error message),
            'chart_path' (generated image path, '' on failure), and
            'verified' (bool result of image verification).
        """
        start_time = time.time()
        logging.info(f"Processing request data: {data}")
        query = data.get('query', '')
        data_path = data.get('file_path')
        model_choice = data.get('model', 'bart')
        # Re-initialize data processor and chart generator if a file is specified
        if data_path:
            logging.info(f"Data path received: {data_path}")
            if not os.path.exists(data_path):
                logging.error(f"File does not exist at path: {data_path}")
            else:
                logging.info(f"File exists at path: {data_path}")
            self.data_processor = DataProcessor(data_path)
            loaded_columns = self.data_processor.get_columns()
            logging.info(f"Loaded columns from data: {loaded_columns}")
            self.chart_generator = ChartGenerator(self.data_processor.data)
        enhanced_prompt = self._build_prompt(query)
        try:
            api_url = self._HF_API_URLS.get(model_choice)
            if api_url is not None:
                # Hosted model: send the few-shot prompt to the Inference API.
                response_text = self._generate_via_api(api_url, enhanced_prompt)
            else:
                # 'bart' and any unrecognized choice use the local model.
                response_text = self._generate_local(query)
            logging.info(f"LLM response text: {response_text}")
            # Clean and parse the response into plot arguments.
            response_text = self._clean_response(response_text)
            try:
                plot_args = ast.literal_eval(response_text)
            except (SyntaxError, ValueError) as e:
                logging.warning(f"Invalid LLM response: {e}. Response: {response_text}")
                plot_args = {'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}
            if not LLM_Agent.validate_plot_args(plot_args):
                logging.warning("Invalid plot arguments. Using default.")
                plot_args = {'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}
            chart_path = self.chart_generator.generate_chart(plot_args)
            verified = self.image_verifier.verify(chart_path, query)
            end_time = time.time()
            logging.info(f"Processed request in {end_time - start_time} seconds")
            return {
                "response": response_text,
                "chart_path": chart_path,
                "verified": verified
            }
        except Exception as e:
            logging.error(f"Error processing request: {e}")
            end_time = time.time()
            logging.info(f"Processed request in {end_time - start_time} seconds")
            return {
                "response": f"Error: {str(e)}",
                "chart_path": "",
                "verified": False
            }