Spaces:
Running
Running
import pandas as pd | |
from langchain_core.prompts import ChatPromptTemplate | |
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer | |
from huggingface_hub import login | |
import torch | |
import os | |
# The Hugging Face API token is supplied through the Space's secrets.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    raise ValueError(
        "HF_TOKEN environment variable not set. "
        "Please set it in Hugging Face Space Settings under Secrets."
    )

# Model configuration. The default is a lightweight model; swap in a larger
# one (e.g. mistralai/Mixtral-8x7B-Instruct-v0.1) on paid Spaces with a GPU.
MODEL_NAME = "facebook/opt-125m"

# Build the text-generation pipeline once at import time. It stays None when
# any step fails, so callers can detect the failure and report it themselves.
llm_pipeline = None
try:
    # Authenticate against the Hugging Face Hub before downloading anything.
    login(token=HF_TOKEN)
    print("Successfully logged in to Hugging Face Hub")

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    model = AutoModelForCausalLM.from_pretrained(MODEL_NAME)

    llm_pipeline = pipeline(
        "text-generation",
        model=model,
        tokenizer=tokenizer,
        # First CUDA device when one is available, otherwise the CPU (-1).
        device=0 if torch.cuda.is_available() else -1,
        # Cap the length of each generated response.
        max_new_tokens=500,
        # Pad with the end-of-sequence token id.
        pad_token_id=tokenizer.eos_token_id,
    )
except Exception as load_error:
    print(f"Failed to load model: {load_error!s}")
# Function to parse and extract information from the chunks
def parse(dom_chunks, parse_description):
    """Extract data matching a description from DOM chunks using the local LLM.

    Each chunk is inserted into an extraction prompt and sent to the
    module-level ``llm_pipeline``. Only the model's continuation is searched
    for a Markdown table: the prompt itself contains pipe characters, so
    searching the full (prompt + continuation) text would always match inside
    the instructions instead of the generated table.

    Args:
        dom_chunks: Iterable of text chunks; one LLM call is made per chunk.
        parse_description: Description of the data to extract.

    Returns:
        The per-chunk results joined with newlines. A chunk whose generated
        text contains no ``|`` contributes an empty string.

    Raises:
        ValueError: If ``llm_pipeline`` is ``None`` (model loading failed).
    """
    if llm_pipeline is None:
        raise ValueError("LLM pipeline not initialized. Check model loading and ensure HF_TOKEN is set in Space Secrets.")

    # Prompt template; {dom_content} and {parse_description} are filled per chunk.
    template = (
        "You are tasked with extracting specific information from the following text content: {dom_content}. "
        "Please follow these instructions carefully:\n\n"
        "1. **Task:** Extract data from the provided text that matches the description: {parse_description}.\n"
        "2. **Output Format:** Return the extracted data ONLY as one or more Markdown tables. Each table MUST be correctly formatted.\n"
        "3. **Markdown Table Format:** Each table must adhere to the following Markdown format:\n"
        "   - Start with a header row, clearly labeling each column, separated by pipes (|).\n"
        "   - Follow the header row with an alignment row, using hyphens (-) to indicate column alignment (e.g., --- for left alignment).\n"
        "   - Subsequent rows should contain the data, with cells aligned according to the alignment row.\n"
        "   - Use pipes (|) to separate columns in each data row.\n"
        "4. **No Explanations:** Do not include any introductory or explanatory text before or after the table(s).\n"
        "5. **Empty Response:** If no information matches the description, return an empty string ('').\n"
        "6. **Multiple Tables:** If the text contains multiple tables matching the description, return each table separately, following the Markdown format for each.\n"
        "7. **Accuracy:** The extracted data must be accurate and reflect the information in the provided text.\n"
    )

    # Materialize once so any iterable (including generators) has a length
    # for the progress message below.
    chunks = list(dom_chunks)
    total = len(chunks)

    parsed_results = []
    for i, chunk in enumerate(chunks, start=1):
        prompt = template.format(dom_content=chunk, parse_description=parse_description)

        # return_full_text=False asks the pipeline for the continuation only,
        # so the prompt's own "|" characters cannot be mistaken for a table.
        response = llm_pipeline(
            prompt,
            max_length=2000,
            truncation=True,
            return_full_text=False,
        )
        generated = response[0]["generated_text"]

        # Defensive: drop the prompt if it is still echoed at the start.
        if generated.startswith(prompt):
            generated = generated[len(prompt):]

        # Keep everything from the first pipe onward; no pipe means no table.
        start_idx = generated.find("|")
        result = generated[start_idx:] if start_idx != -1 else ""

        print(f"Parsed batch {i} of {total}")
        parsed_results.append(result)

    # Return the parsed results as a single string
    return "\n".join(parsed_results)
def merge_tables_with_llm(tables, parse_description):
    """Merge a list of pandas DataFrames into one Markdown table via the local LLM.

    The DataFrames are rendered to Markdown, appended to a merge prompt, and
    sent to the module-level ``llm_pipeline``. Only the model's continuation
    is searched for the merged table, because the prompt itself contains pipe
    characters (in the instructions and in the input tables).

    Args:
        tables: Iterable of DataFrames to merge.
        parse_description: Description of what the tables contain.

    Returns:
        The generated text from its first ``|`` onward, or an empty string
        when ``tables`` is empty or the generated text contains no ``|``.

    Raises:
        ValueError: If ``llm_pipeline`` is ``None`` (model loading failed).
    """
    if llm_pipeline is None:
        raise ValueError("LLM pipeline not initialized. Check model loading and ensure HF_TOKEN is set in Space Secrets.")

    # Convert DataFrames to Markdown strings
    table_strings = [table.to_markdown(index=False) for table in tables]
    if not table_strings:
        # Nothing to merge; skip the LLM call entirely.
        return ""

    # Format ONLY the instruction template. The table text is appended
    # afterwards so that literal "{" / "}" inside table cells are never
    # interpreted as str.format replacement fields.
    instructions = (
        "You are tasked with merging the following Markdown tables into a single, comprehensive Markdown table.\n"
        "The tables contain information related to: {parse_description}.\n"
        "Please follow these instructions carefully:\n\n"
        "1. **Task:** Merge the data from the following tables into a single table that matches the description: {parse_description}.\n"
        "2. **Output Format:** Return the merged data ONLY as a single Markdown table. The table MUST be correctly formatted.\n"
        "3. **Markdown Table Format:** The table must adhere to the following Markdown format:\n"
        "   - Start with a header row, clearly labeling each column, separated by pipes (|).\n"
        "   - Follow the header row with an alignment row, using hyphens (-) to indicate column alignment (e.g., --- for left alignment).\n"
        "   - Subsequent rows should contain the data, with cells aligned according to the alignment row.\n"
        "   - Use pipes (|) to separate columns in each data row.\n"
        "4. **No Explanations:** Do not include any introductory or explanatory text before or after the table.\n"
        "5. **Empty Response:** If no information matches the description, return an empty string ('') if no data can be merged.\n"
        "6. **Duplicate Columns:** If there are duplicate columns, rename them to be unique.\n"
        "7. **Missing Values:** If there are missing values, fill them with 'N/A'.\n\n"
    ).format(parse_description=parse_description)

    merge_prompt = (
        instructions
        + "Here are the tables:\n\n"
        + "\n\n".join(table_strings)
        + "\n\nReturn the merged table in Markdown format:"
    )

    # return_full_text=False asks the pipeline for the continuation only, so
    # the pipes in the prompt and input tables are not mistaken for output.
    response = llm_pipeline(
        merge_prompt,
        max_length=2000,
        truncation=True,
        return_full_text=False,
    )
    generated = response[0]["generated_text"]

    # Defensive: drop the prompt if it is still echoed at the start.
    if generated.startswith(merge_prompt):
        generated = generated[len(merge_prompt):]

    # Keep everything from the first pipe onward; no pipe means no table.
    start_idx = generated.find("|")
    return generated[start_idx:] if start_idx != -1 else ""