# NOTE(review): removed Hugging Face Spaces page residue ("Spaces:" / "Sleeping")
# that was captured along with the source; it was not part of the program.
import os
import pandas as pd
import json
import gradio as gr
from openai import AzureOpenAI
from PyPDF2 import PdfReader  # NOTE(review): unused in this file — confirm before removing
from gradio.themes.base import Base
from gradio.themes.utils import colors, fonts, sizes  # NOTE(review): `fonts` is unused
import base64
class BaseTheme(Base):
    """App-wide Gradio theme: orange primary / blue secondary hues, large text."""

    def __init__(
        self,
        *,
        primary_hue: colors.Color | str = colors.orange,
        secondary_hue: colors.Color | str = colors.blue,
        neutral_hue: colors.Color | str = colors.gray,
        spacing_size: sizes.Size | str = sizes.spacing_md,
        radius_size: sizes.Size | str = sizes.radius_md,
        text_size: sizes.Size | str = sizes.text_lg,
    ):
        # Every option is forwarded verbatim to gradio's Base theme; this
        # subclass only changes the defaults.
        theme_kwargs = {
            "primary_hue": primary_hue,
            "secondary_hue": secondary_hue,
            "neutral_hue": neutral_hue,
            "spacing_size": spacing_size,
            "radius_size": radius_size,
            "text_size": text_size,
        }
        super().__init__(**theme_kwargs)
# Single theme instance shared by the Blocks app below.
basetheme = BaseTheme()

# Injected into the page on load: if the URL does not already request
# Gradio's dark theme, add `__theme=dark` and reload once.
js_func = """
function refresh() {
    const url = new URL(window.location);
    if (url.searchParams.get('__theme') !== 'dark') {
        url.searchParams.set('__theme', 'dark');
        window.location.href = url.href;
    }
}
"""
# ===============================
# Azure OpenAI setup
# ===============================
# Fail fast with a clear message when credentials are missing.  The original
# code did `os.environ[var] = os.getenv(var)` — a no-op when the variable is
# set, and an opaque `TypeError: str expected, not None` when it is not.
for _required_var in ("AZURE_OPENAI_ENDPOINT", "AZURE_OPENAI_API_KEY"):
    if not os.getenv(_required_var):
        raise RuntimeError(
            f"Environment variable {_required_var} must be set before starting the app."
        )

# The AzureOpenAI client reads AZURE_OPENAI_ENDPOINT / AZURE_OPENAI_API_KEY
# from the environment.
client = AzureOpenAI(
    api_version="2023-05-15",
    azure_deployment="gpt-4o",  # Replace with your actual model deployment name
)
# ===============================
# Helper Functions
# ===============================
def parse_field_definitions(field_text):
    """
    Parse multiline user input of the form ``Field Name: Description``.

    Returns a dict mapping each field name to its description.  Blank lines
    are skipped; a line without a colon becomes a field with an empty
    description.  Only the first colon separates name from description, so
    descriptions may themselves contain colons.
    """
    parsed = {}
    for raw_line in field_text.split("\n"):
        entry = raw_line.strip()
        if not entry:
            continue
        name, sep, description = entry.partition(":")
        if sep:
            parsed[name.strip()] = description.strip()
        else:
            parsed[entry] = ""
    return parsed
def read_file_metadata(file_path):
    """Return (column names, first-row sample of the first two columns) for a CSV.

    The sample dict is empty when the file has no data rows.
    """
    df = pd.read_csv(file_path)
    columns = df.columns.tolist()
    preview_cols = columns[:2]
    sample = {} if df.empty else df[preview_cols].iloc[0].to_dict()
    return columns, sample
def read_excel_metadata(file_path):
    """Return (column names, first-row sample of the first two columns) for an Excel file.

    The sample dict is empty when the sheet has no data rows.
    """
    df = pd.read_excel(file_path)
    columns = df.columns.tolist()
    preview_cols = columns[:2]
    sample = {} if df.empty else df[preview_cols].iloc[0].to_dict()
    return columns, sample
def create_column_mapping_prompt(file_metadata, user_fields):
    """Build the LLM prompt requesting a desired-field -> source-column mapping.

    file_metadata is a list of (file_path, column_names, sample_data) tuples;
    user_fields maps desired field names to their descriptions.  Returns the
    full prompt text, which instructs the model to answer with JSON only.
    """
    parts = [
        "You are given CSV/Excel data from different sources. The files contain columns with similar content but with different names.\n"
        "The user has provided the following desired fields and their descriptions:\n"
        f"{json.dumps(user_fields, indent=2)}\n\n"
        "For each file, here are the details (showing example data from the first two columns):\n\n"
    ]
    for path, columns, sample in file_metadata:
        parts.append(f"File: {path}\n")
        parts.append(f"Columns: {columns}\n")
        parts.append(f"Example Data (first two columns): {sample}\n\n")
    parts.append(
        "Your task is to map the existing column names from each file to the desired fields provided by the user. "
        "For each desired field, decide which column name in each file best represents it. "
        "If a field cannot be found, map it to an empty string.\n\n"
        "Return the mapping in JSON format with the following structure:\n"
        "{\n"
        ' "desired_field1": { "source_file1": "matched_column_name_or_empty", "source_file2": "matched_column_name_or_empty", ... },\n'
        ' "desired_field2": { ... },\n'
        " ...\n"
        "}\n\n"
        "Do not include any additional text in your response."
    )
    return "".join(parts)
def get_column_mapping(file_metadata, user_fields):
    """Ask the LLM for the column mapping and return it as a parsed dict.

    Raises ValueError (with the raw reply attached) when the model's
    response is not valid JSON.
    """
    prompt = create_column_mapping_prompt(file_metadata, user_fields)
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.1,  # low temperature: we want a deterministic mapping
        response_format={"type": "json_object"},
    )
    try:
        reply = completion.choices[0].message.content
        return json.loads(reply.strip())
    except Exception as e:
        raise ValueError(
            f"Error parsing LLM response: {e}\n\nResponse:\n{completion.choices[0].message.content}"
        )
def merge_files_with_mapping(file_paths, user_fields):
    """Merge CSV/Excel files into one DataFrame with the user's desired columns.

    Gathers per-file metadata, asks the LLM which source column corresponds to
    each desired field, renames columns accordingly, concatenates all rows,
    keeps only the desired columns (in the user's order), and writes the
    result to merged_data.csv.

    Raises ValueError when none of the paths is a readable CSV/Excel file.
    """
    file_metadata = []
    for path in file_paths:
        lowered = path.lower()
        if lowered.endswith(".csv"):
            columns, sample = read_file_metadata(path)
        elif lowered.endswith((".xlsx", ".xls")):
            columns, sample = read_excel_metadata(path)
        else:
            continue  # unsupported extension — skip silently
        file_metadata.append((path, columns, sample))

    # Ask the LLM for a column mapping (skip the call when nothing was readable).
    mapping = get_column_mapping(file_metadata, user_fields) if file_metadata else {}

    frames = []
    for path in file_paths:
        lowered = path.lower()
        if lowered.endswith(".csv"):
            df = pd.read_csv(path)
        elif lowered.endswith((".xlsx", ".xls")):
            df = pd.read_excel(path)
        else:
            continue
        rename_map = {}
        for desired_field, per_file in mapping.items():
            # The LLM may key its answer by full path or by bare file name.
            source = per_file.get(path, per_file.get(os.path.basename(path), ""))
            if source and source in df.columns:
                rename_map[source] = desired_field
        frames.append(df.rename(columns=rename_map))

    if not frames:
        raise ValueError("No valid CSV/Excel files to merge.")
    final_df = pd.concat(frames, ignore_index=True)
    # Only keep columns in the order the user specified.
    final_df = final_df.reindex(columns=list(user_fields.keys()))
    final_df.to_csv("merged_data.csv", index=False)
    return final_df
def combine_all_data(file_paths, user_fields):
    """Merge all input files and persist the result as merged_all_data.csv.

    Parameters:
        file_paths: paths to the uploaded CSV/Excel files.
        user_fields: dict of desired field name -> description.

    Returns (final_df, absolute_path_to_csv).  The reindex to the desired
    columns already happens inside merge_files_with_mapping, so repeating it
    here (as the original did) was redundant and has been removed.
    """
    final_df = merge_files_with_mapping(file_paths, user_fields)
    final_df.to_csv("merged_all_data.csv", index=False)
    absolute_path = os.path.abspath("merged_all_data.csv")
    return final_df, absolute_path
# ===============================
# Gradio Interface Function
# ===============================
def process_data(files, field_text):
    """
    Gradio handler: merge the uploaded files into the user's desired fields.

    Parameters:
        files: list of uploaded CSV/Excel file objects (or None).
        field_text: multiline text, one "Field Name: Description" per line.

    Returns (DataFrame, csv_path) on success.  On failure returns
    (error_message, None): the Interface declares TWO outputs, so the
    original single-string returns made Gradio raise a "not enough output
    values" error instead of showing the message to the user.
    """
    # Parse the user's desired fields from the multiline text.
    user_fields = parse_field_definitions(field_text)
    if not user_fields:
        return (
            "No valid fields found. Please use the format:\n\nField Name: Description",
            None,
        )
    file_paths = [f.name for f in files] if files else []
    try:
        final_df, absolute_path = combine_all_data(file_paths, user_fields)
    except Exception as e:
        # Surface the failure in the UI rather than crashing the handler.
        return f"Error during processing: {e}", None
    return final_df, absolute_path
# Embed the logo as Base64 so the HTML header below needs no static-file route.
# NOTE(review): "Frame 1.png" must exist next to this script or startup fails.
with open("Frame 1.png", "rb") as logo_file:
    base64_logo = base64.b64encode(logo_file.read()).decode("utf-8")
# ===============================
# Gradio UI
# ===============================
# Blocks wrapper supplies the custom theme and the dark-mode JS; the embedded
# Interface wires process_data to the file upload + field textbox inputs.
with gr.Blocks(theme=basetheme,js=js_func,fill_height=True) as demo:
    # Add logo at the top using Base64 HTML (three-column grid: logo / title / spacer).
    with gr.Row():
        gr.HTML(
            f"""
            <div style="display: grid; grid-template-columns: 1fr 2fr 1fr; align-items: center;">
                <div style="justify-self: start;">
                    <img src="data:image/png;base64,{base64_logo}" alt="Logo" style="width: 150px; height: auto;">
                </div>
                <div style="justify-self: center;">
                    <h2 style="margin: 0; text-align: center;">AI Data Transformation with User-Selected Fields</h2>
                </div>
                <div></div>
            </div>
            """
        )
    gr.Interface(
        fn=process_data,
        inputs=[
            gr.File(label="Upload CSV/Excel files", file_count="multiple",file_types=[".csv", ".xlsx", ".xls"]),
            gr.Textbox(
                label="Desired Fields (one per line, use 'Field Name: Description' format)",
                placeholder="Example:\nName: Full name\nDOB: Date of birth\nAddress: Full address\n",
                lines=6,
            ),
        ],
        # Two outputs: the merged table and a downloadable CSV file path.
        outputs=[gr.Dataframe(label="Final Merged Data"),gr.File(label="Download CSV")],
        description=(
            "Upload one or more CSV/Excel files and enter your desired fields below. "
            "Type each field on a new line in the format:\n"
            "'Field Name: Description'\n\n"
            "The AI will automatically map and merge columns from your files to these fields."
        ),
    )
if __name__ == "__main__":
    # Launch the Gradio app (blocking call; serves on the default port).
    demo.launch()