# NOTE: the three lines that stood here ("Spaces:" / "Sleeping" / "Sleeping")
# were a Hugging Face Spaces page-status banner captured by the scrape, not
# Python source; kept only as this comment so the file parses.
import os | |
import pandas as pd | |
import json | |
import gradio as gr | |
from openai import AzureOpenAI | |
from PyPDF2 import PdfReader | |
from gradio.themes.base import Base | |
from gradio.themes.utils import colors, fonts, sizes | |
import base64 | |
class BaseTheme(Base):
    """Custom Gradio theme: orange primary, blue secondary, large base text.

    Only the defaults differ from Gradio's stock ``Base`` theme; every
    option is forwarded to the parent unchanged.
    """

    def __init__(
        self,
        *,
        primary_hue: colors.Color | str = colors.orange,
        secondary_hue: colors.Color | str = colors.blue,
        neutral_hue: colors.Color | str = colors.gray,
        spacing_size: sizes.Size | str = sizes.spacing_md,
        radius_size: sizes.Size | str = sizes.radius_md,
        text_size: sizes.Size | str = sizes.text_lg,
    ):
        # Collect the options once, then delegate straight to the base theme.
        theme_options = dict(
            primary_hue=primary_hue,
            secondary_hue=secondary_hue,
            neutral_hue=neutral_hue,
            spacing_size=spacing_size,
            radius_size=radius_size,
            text_size=text_size,
        )
        super().__init__(**theme_options)
# Theme instance used by the Blocks app below.
basetheme = BaseTheme()

# Client-side snippet passed to gr.Blocks(js=...): if the page was not
# requested with `__theme=dark`, set the query parameter and reload once
# so the UI always renders in dark mode.
js_func = """
function refresh() {
    const url = new URL(window.location);
    if (url.searchParams.get('__theme') !== 'dark') {
        url.searchParams.set('__theme', 'dark');
        window.location.href = url.href;
    }
}
"""
# ===============================
# Azure OpenAI setup
# ===============================
# Fail fast with a clear message when credentials are missing. The original
# pattern `os.environ[k] = os.getenv(k)` was a no-op round-trip that raised
# an opaque `TypeError: str expected, not NoneType` if the variable was unset.
for _required_env in ("AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_KEY"):
    if not os.getenv(_required_env):
        raise RuntimeError(
            f"Environment variable {_required_env} must be set before starting the app."
        )

# The client picks up AZURE_OPENAI_ENDPOINT / AZURE_OPENAI_API_KEY from the
# environment; only version and deployment are passed explicitly.
client = AzureOpenAI(
    api_version="2023-05-15",
    azure_deployment="gpt-4o",  # Replace with your actual model deployment name
)
# ===============================
# Helper Functions
# ===============================
def parse_field_definitions(field_text):
    """Parse multiline 'Field Name: Description' text into a dict.

    Each non-empty line is split on its first colon: the left part becomes
    the key and the right part the value (both stripped). A line with no
    colon becomes a key with an empty description. Blank lines are skipped.
    """
    parsed = {}
    for raw_line in field_text.split("\n"):
        entry = raw_line.strip()
        if not entry:
            continue
        # partition() splits on the first colon only; with no colon it
        # yields (entry, "", ""), giving the empty-description fallback.
        name, _, description = entry.partition(":")
        parsed[name.strip()] = description.strip()
    return parsed
def read_file_metadata(file_path):
    """Return (column names, example row) metadata for a CSV file.

    The example is a dict built from the first two columns of the first
    data row; it is empty when the file has no data rows.
    """
    frame = pd.read_csv(file_path)
    columns = frame.columns.tolist()
    preview_cols = columns[:2]
    example = {} if frame.empty else frame[preview_cols].iloc[0].to_dict()
    return columns, example
def read_excel_metadata(file_path):
    """Return (column names, example row) metadata for an Excel file.

    Mirrors read_file_metadata but loads via pandas' Excel reader. The
    example dict covers the first two columns of the first data row, or
    is empty for a sheet with no data rows.
    """
    frame = pd.read_excel(file_path)
    columns = frame.columns.tolist()
    preview_cols = columns[:2]
    example = {} if frame.empty else frame[preview_cols].iloc[0].to_dict()
    return columns, example
def create_column_mapping_prompt(file_metadata, user_fields):
    """Build the LLM prompt asking for a desired-field -> column mapping.

    file_metadata is a list of (file_path, column_names, sample_data)
    tuples; user_fields maps desired field names to their descriptions.
    Returns the full prompt text as a single string.
    """
    parts = [
        "You are given CSV/Excel data from different sources. The files contain columns with similar content but with different names.\n"
        "The user has provided the following desired fields and their descriptions:\n"
        f"{json.dumps(user_fields, indent=2)}\n\n"
        "For each file, here are the details (showing example data from the first two columns):\n\n"
    ]
    for path, columns, sample in file_metadata:
        parts.append(f"File: {path}\n")
        parts.append(f"Columns: {columns}\n")
        parts.append(f"Example Data (first two columns): {sample}\n\n")
    parts.append(
        "Your task is to map the existing column names from each file to the desired fields provided by the user. "
        "For each desired field, decide which column name in each file best represents it. "
        "If a field cannot be found, map it to an empty string.\n\n"
        "Return the mapping in JSON format with the following structure:\n"
        "{\n"
        ' "desired_field1": { "source_file1": "matched_column_name_or_empty", "source_file2": "matched_column_name_or_empty", ... },\n'
        ' "desired_field2": { ... },\n'
        " ...\n"
        "}\n\n"
        "Do not include any additional text in your response."
    )
    return "".join(parts)
def get_column_mapping(file_metadata, user_fields):
    """Ask the Azure OpenAI deployment for the column mapping as a dict.

    Sends the prompt from create_column_mapping_prompt with JSON response
    mode enabled and parses the reply body. Raises ValueError (with the
    raw reply attached) when the response is not valid JSON.
    """
    prompt = create_column_mapping_prompt(file_metadata, user_fields)
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,
        response_format={"type": "json_object"},
    )
    raw_reply = completion.choices[0].message.content
    try:
        return json.loads(raw_reply.strip())
    except Exception as e:
        raise ValueError(
            f"Error parsing LLM response: {e}\n\nResponse:\n{raw_reply}"
        )
def merge_files_with_mapping(file_paths, user_fields):
    """Merge CSV/Excel files into one DataFrame via an LLM column mapping.

    Loads every supported file, asks the LLM how each desired field maps
    to each file's columns, renames the matched columns, concatenates all
    rows, keeps only the user's fields (in their given order), writes the
    result to 'merged_data.csv' and returns it. Unsupported extensions are
    skipped; raises ValueError when no supported file was provided.
    """
    def _load_table(path):
        # Return a DataFrame for supported extensions, None otherwise.
        lowered = path.lower()
        if lowered.endswith(".csv"):
            return pd.read_csv(path)
        if lowered.endswith((".xlsx", ".xls")):
            return pd.read_excel(path)
        return None

    tables = []    # (path, DataFrame) for every supported file
    metadata = []  # (path, columns, sample of first two columns) for the LLM
    for path in file_paths:
        frame = _load_table(path)
        if frame is None:
            continue
        columns = list(frame.columns)
        sample = frame[columns[:2]].iloc[0].to_dict() if len(frame) else {}
        tables.append((path, frame))
        metadata.append((path, columns, sample))

    # Ask the LLM for a column mapping (skip the call when nothing loaded).
    mapping = get_column_mapping(metadata, user_fields) if metadata else {}

    merged_frames = []
    for path, frame in tables:
        renames = {}
        for desired_field, per_file in mapping.items():
            # The LLM may key its answer by full path or by base name.
            source = per_file.get(path, per_file.get(os.path.basename(path), ""))
            if source and source in frame.columns:
                renames[source] = desired_field
        merged_frames.append(frame.rename(columns=renames))

    if not merged_frames:
        raise ValueError("No valid CSV/Excel files to merge.")

    final_df = pd.concat(merged_frames, ignore_index=True)
    # Keep only the user's fields, in the order they were specified.
    final_df = final_df.reindex(columns=list(user_fields.keys()))
    final_df.to_csv("merged_data.csv", index=False)
    return final_df
def extract_text_from_pdf(pdf_path):
    """Concatenate the extracted text of every page in the PDF.

    Pages where PyPDF2 returns no text (None or empty) contribute nothing.
    """
    reader = PdfReader(pdf_path)
    return "".join(page.extract_text() or "" for page in reader.pages)
def map_pdf_to_csv_structure(pdf_path, csv_df, user_fields):
    """Extract PDF text and ask the LLM to shape it like the merged CSV.

    The prompt shows the CSV's headers plus an example first row; the
    LLM's JSON reply is turned into a DataFrame and re-indexed to the
    user's desired columns. Raises ValueError on an unparsable reply.
    """
    pdf_text = extract_text_from_pdf(pdf_path)
    headers = list(csv_df.columns)
    example_row = csv_df.iloc[0].to_dict() if len(csv_df) > 0 else {}
    prompt = (
        f"Based on the following document text extracted from a government project in Thailand:\n{pdf_text}\n\n"
        f"Please map the information to JSON format using the following structure:\n"
        f"Column Headers: {headers}\n"
        f"Example Data (from the first row of the CSV): {example_row}\n\n"
        "For each column header, extract the corresponding value from the document text. "
        "If a column header is not applicable or data is missing, use an empty string.\n\n"
        "Return only JSON with no additional explanations."
    )
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        temperature=0,
        response_format={"type": "json_object"},
    )
    raw_reply = completion.choices[0].message.content
    try:
        parsed = json.loads(raw_reply.strip())
    except Exception as e:
        raise ValueError(
            f"Error parsing LLM response for PDF mapping: {e}\n\nResponse:\n{raw_reply}"
        )
    if len(parsed) == 1:
        # A single top-level key usually wraps the actual records; unwrap it.
        parsed = next(iter(parsed.values()))
    new_data_df = pd.DataFrame(parsed)
    return new_data_df.reindex(columns=list(user_fields.keys()))
def combine_all_data(file_paths, pdf_file, user_fields):
    """Merge tabular files, optionally append PDF-derived rows, save CSV.

    The PDF is only consulted when a path is given and exists on disk.
    Writes the combined frame to 'merged_all_data.csv' and returns it,
    restricted to the user's desired columns in their given order.
    """
    combined = merge_files_with_mapping(file_paths, user_fields)
    if pdf_file and os.path.exists(pdf_file):
        pdf_rows = map_pdf_to_csv_structure(pdf_file, combined, user_fields)
        combined = pd.concat([combined, pdf_rows], ignore_index=True)
    combined = combined.reindex(columns=list(user_fields.keys()))
    combined.to_csv("merged_all_data.csv", index=False)
    return combined
# ===============================
# Gradio Interface Function
# ===============================
def process_data(files, pdf_file, field_text):
    """Gradio entry point: merge uploads into one DataFrame.

    files      -- list of uploaded CSV/Excel file objects (or None)
    pdf_file   -- optional uploaded PDF file object (or None)
    field_text -- multiline 'Field Name: Description' definitions

    Returns the merged DataFrame on success, or an error string that
    Gradio displays to the user.
    """
    user_fields = parse_field_definitions(field_text)
    if not user_fields:
        return "No valid fields found. Please use the format:\n\nField Name: Description"
    paths = [upload.name for upload in files] if files else []
    pdf_path = pdf_file.name if pdf_file is not None else None
    try:
        return combine_all_data(paths, pdf_path, user_fields)
    except Exception as e:
        return f"Error during processing: {e}"
# Inline the header logo as Base64 so the HTML needs no static-file route.
# Guarded: a missing "Frame 1.png" previously crashed app startup with
# FileNotFoundError; now the header simply renders without a logo.
if os.path.exists("Frame 1.png"):
    with open("Frame 1.png", "rb") as logo_file:
        base64_logo = base64.b64encode(logo_file.read()).decode("utf-8")
else:
    base64_logo = ""  # <img> src becomes empty; page still loads
# ===============================
# Gradio UI
# ===============================
with gr.Blocks(theme=basetheme, js=js_func, fill_height=True) as demo:
    # Header row: Base64-inlined logo on the left, centered page title.
    header_html = f"""
    <div style="display: grid; grid-template-columns: 1fr 2fr 1fr; align-items: center;">
        <div style="justify-self: start;">
            <img src="data:image/png;base64,{base64_logo}" alt="Logo" style="width: 150px; height: auto;">
        </div>
        <div style="justify-self: center;">
            <h2 style="margin: 0; text-align: center;">AI Data Transformation with User-Selected Fields</h2>
        </div>
        <div></div>
    </div>
    """
    with gr.Row():
        gr.HTML(header_html)

    # Usage instructions shown above the inputs.
    interface_description = (
        "Upload one or more CSV/Excel files, optionally a PDF file, and enter your desired fields below. "
        "Type each field on a new line in the format:\n"
        "'Field Name: Description'\n\n"
        "The AI will automatically map and merge columns from your files to these fields, "
        "then optionally extract matching data from the PDF."
    )
    gr.Interface(
        fn=process_data,
        inputs=[
            gr.File(
                label="Upload CSV/Excel files",
                file_count="multiple",
                file_types=[".csv", ".xlsx", ".xls"],
            ),
            gr.File(label="Upload PDF file (optional)", file_types=[".pdf"]),
            gr.Textbox(
                label="Desired Fields (one per line, use 'Field Name: Description' format)",
                placeholder="Example:\nName: Full name\nDOB: Date of birth\nAddress: Full address\n",
                lines=6,
            ),
        ],
        outputs=gr.Dataframe(label="Final Merged Data"),
        description=interface_description,
    )
if __name__ == "__main__":
    # Start the Gradio server only when executed as a script.
    demo.launch()