import torch from transformers import AutoTokenizer, AutoModelForSeq2SeqLM from data_processor import DataProcessor from chart_generator import ChartGenerator from image_verifier import ImageVerifier from huggingface_hub import login import logging import time import os from dotenv import load_dotenv import ast import requests import json load_dotenv() class LLM_Agent: def __init__(self, data_path=None): logging.info("Initializing LLM_Agent") self.data_processor = DataProcessor(data_path) self.chart_generator = ChartGenerator(self.data_processor.data) self.image_verifier = ImageVerifier() # Use Hugging Face Hub model path for fine-tuned model model_path = "ArchCoder/fine-tuned-bart-large" self.query_tokenizer = AutoTokenizer.from_pretrained(model_path) self.query_model = AutoModelForSeq2SeqLM.from_pretrained(model_path) def validate_plot_args(plot_args): required_keys = ['x', 'y', 'chart_type'] if not all(key in plot_args for key in required_keys): return False if not isinstance(plot_args['y'], list): plot_args['y'] = [plot_args['y']] return True def process_request(self, data): start_time = time.time() logging.info(f"Processing request data: {data}") query = data.get('query', '') data_path = data.get('file_path') model_choice = data.get('model', 'bart') # Log file path and check existence if data_path: logging.info(f"Data path received: {data_path}") import os if not os.path.exists(data_path): logging.error(f"File does not exist at path: {data_path}") else: logging.info(f"File exists at path: {data_path}") # Re-initialize data processor and chart generator if a file is specified if data_path: self.data_processor = DataProcessor(data_path) # Log loaded columns loaded_columns = self.data_processor.get_columns() logging.info(f"Loaded columns from data: {loaded_columns}") self.chart_generator = ChartGenerator(self.data_processor.data) # Enhanced prompt for better model responses enhanced_prompt = ( "You are VizBot, an expert data visualization assistant. " "Given a user's natural language request about plotting data, output ONLY a valid Python dictionary with keys: x, y, chart_type, and color (if specified). " "Do not include any explanation or extra text.\n\n" "Example 1:\n" "User: plot the sales in the years with red line\n" "Output: {'x': 'Year', 'y': ['Sales'], 'chart_type': 'line', 'color': 'red'}\n\n" "Example 2:\n" "User: show employee expenses and net profit over the years\n" "Output: {'x': 'Year', 'y': ['Employee expense', 'Net profit'], 'chart_type': 'line'}\n\n" "Example 3:\n" "User: display the EBITDA for each year with a blue bar\n" "Output: {'x': 'Year', 'y': ['EBITDA'], 'chart_type': 'bar', 'color': 'blue'}\n\n" f"User: {query}\nOutput:" ) try: if model_choice == 'bart': # Use local fine-tuned BART model inputs = self.query_tokenizer(query, return_tensors="pt", max_length=512, truncation=True) outputs = self.query_model.generate(**inputs, max_length=100, num_return_sequences=1) response_text = self.query_tokenizer.decode(outputs[0], skip_special_tokens=True) elif model_choice == 'flan-t5-base': # Use Hugging Face Inference API with Flan-T5-Base model api_url = "https://api-inference.huggingface.co/models/google/flan-t5-base" headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACEHUB_API_TOKEN')}"} payload = {"inputs": enhanced_prompt} response = requests.post(api_url, headers=headers, json=payload, timeout=30) if response.status_code != 200: logging.error(f"Hugging Face API error: {response.status_code} {response.text}") response_text = "{'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}" else: try: resp_json = response.json() response_text = resp_json[0]['generated_text'] if isinstance(resp_json, list) else resp_json.get('generated_text', '') if not response_text: response_text = "{'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}" except Exception as e: logging.error(f"Error parsing Hugging Face API response: {e}, raw: {response.text}") response_text = "{'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}" elif model_choice == 'flan-ul2': # Use Hugging Face Inference API with Flan-T5-XXL model (best available) api_url = "https://api-inference.huggingface.co/models/google/flan-t5-xxl" headers = {"Authorization": f"Bearer {os.getenv('HUGGINGFACEHUB_API_TOKEN')}"} payload = {"inputs": enhanced_prompt} response = requests.post(api_url, headers=headers, json=payload, timeout=30) if response.status_code != 200: logging.error(f"Hugging Face API error: {response.status_code} {response.text}") response_text = "{'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}" else: try: resp_json = response.json() response_text = resp_json[0]['generated_text'] if isinstance(resp_json, list) else resp_json.get('generated_text', '') if not response_text: response_text = "{'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}" except Exception as e: logging.error(f"Error parsing Hugging Face API response: {e}, raw: {response.text}") response_text = "{'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'}" else: # Default fallback to local fine-tuned BART model inputs = self.query_tokenizer(query, return_tensors="pt", max_length=512, truncation=True) outputs = self.query_model.generate(**inputs, max_length=100, num_return_sequences=1) response_text = self.query_tokenizer.decode(outputs[0], skip_special_tokens=True) logging.info(f"LLM response text: {response_text}") # Clean and parse the response response_text = response_text.strip() if response_text.startswith("```") and response_text.endswith("```"): response_text = response_text[3:-3].strip() if response_text.startswith("python"): response_text = response_text[6:].strip() try: plot_args = ast.literal_eval(response_text) except (SyntaxError, ValueError) as e: logging.warning(f"Invalid LLM response: {e}. Response: {response_text}") plot_args = {'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'} if not LLM_Agent.validate_plot_args(plot_args): logging.warning("Invalid plot arguments. Using default.") plot_args = {'x': 'Year', 'y': ['Sales'], 'chart_type': 'line'} chart_path = self.chart_generator.generate_chart(plot_args) verified = self.image_verifier.verify(chart_path, query) end_time = time.time() logging.info(f"Processed request in {end_time - start_time} seconds") return { "response": response_text, "chart_path": chart_path, "verified": verified } except Exception as e: logging.error(f"Error processing request: {e}") end_time = time.time() logging.info(f"Processed request in {end_time - start_time} seconds") return { "response": f"Error: {str(e)}", "chart_path": "", "verified": False }