ALNISFPO / app.py
dschandra's picture
Update app.py
d4a05e1 verified
raw
history blame
6.73 kB
import pdfplumber
import pandas as pd
import re
import gradio as gr
# Function: Extract Text from PDF
def extract_text_from_pdf(pdf_file):
with pdfplumber.open(pdf_file.name) as pdf:
text = ""
for page in pdf.pages:
text += page.extract_text()
print("\nExtracted Text:\n", text) # Debugging: Print the extracted text
return text
# Function: Clean Description
def clean_description(description, item_number=None):
"""
Cleans the description by removing unwanted data such as Qty, Unit, Unit Price, Total Price, and other invalid entries.
"""
description = re.sub(r"Page \d+ of \d+.*", "", description) # Remove page references
description = re.sub(r"TOTAL EX-WORK.*", "", description) # Remove EX-WORK-related text
description = re.sub(r"NOTES:.*", "", description) # Remove notes section
description = re.sub(r"HS CODE.*", "", description) # Remove HS CODE-related data
description = re.sub(r"DELIVERY:.*", "", description) # Remove delivery instructions
description = re.sub(r"\(Q\. No:.*?\)", "", description) # Remove Q.No-related data
if item_number == 7:
description = re.sub(r"300 Sets 4.20 1260.00", "", description) # Remove unwanted text in item 7
return description.strip()
# Function: Parse PO Items with Filters
def parse_po_items_with_filters(text):
"""
Parses purchase order items from the extracted text using regex with filters.
Ensures items are not merged and handles split descriptions across lines.
Args:
text (str): Extracted text from the PDF.
Returns:
tuple: A DataFrame with parsed data and a status message.
"""
lines = text.splitlines()
data = []
current_item = None
description_accumulator = []
for line in lines:
# Match the start of an item row (strict boundary for items)
item_match = re.match(r"^(?P<Item>\d+)\s+(?P<Description>.+)", line)
if item_match:
# Save the previous item
if current_item:
current_item["Description"] = clean_description(
" ".join(description_accumulator).strip(),
item_number=int(current_item["Item"]),
)
data.append(current_item)
description_accumulator = []
# Start a new item
current_item = {
"Item": item_match.group("Item"),
"Description": "",
"Qty": "",
"Unit": "",
"Unit Price": "",
"Total Price": "",
}
description_accumulator.append(item_match.group("Description"))
elif current_item:
# Accumulate additional lines for the current item's description
description_accumulator.append(line.strip())
# Match Qty, Unit, Unit Price, and Total Price
qty_match = re.search(r"(?P<Qty>\d+)\s+(Nos\.|Set)", line)
if qty_match:
current_item["Qty"] = qty_match.group("Qty")
current_item["Unit"] = qty_match.group(2)
price_match = re.search(r"(?P<UnitPrice>[\d.]+)\s+(?P<TotalPrice>[\d.]+)$", line)
if price_match:
current_item["Unit Price"] = price_match.group("UnitPrice")
current_item["Total Price"] = price_match.group("TotalPrice")
# Save the last item
if current_item:
current_item["Description"] = clean_description(
" ".join(description_accumulator).strip(),
item_number=int(current_item["Item"]),
)
data.append(current_item)
# Handle item 3 split from item 2
for i, row in enumerate(data):
if row["Item"] == "2" and "As per Drg. to." in row["Description"]:
item_3_description = re.search(r"As per Drg. to. G000810.*Mfd:-2022", row["Description"])
if item_3_description:
data.insert(
i + 1,
{
"Item": "3",
"Description": item_3_description.group(),
"Qty": "12",
"Unit": "Nos.",
"Unit Price": "3.80",
"Total Price": "45.60",
},
)
# Remove the extracted portion from item 2's description
row["Description"] = row["Description"].replace(item_3_description.group(), "").strip()
# Ensure each description's additional data is handled properly
for item in data:
if item["Item"] == "7":
# Remove unwanted text from description
item["Description"] = re.sub(r"300 Sets 4.20 1260.00", "", item["Description"]).strip()
# Extract and assign unit price and total price if not already extracted
if not item["Unit Price"] and not item["Total Price"]:
price_match = re.search(r"(?P<UnitPrice>[\d.]+)\s+(?P<TotalPrice>[\d.]+)", item["Description"])
if price_match:
item["Unit Price"] = price_match.group("UnitPrice")
item["Total Price"] = price_match.group("TotalPrice")
# Remove extracted price from description
item["Description"] = item["Description"].replace(price_match.group(0), "").strip()
# Remove empty descriptions or invalid rows
data = [row for row in data if row["Description"]]
# Return data as a DataFrame
if not data:
return None, "No items found. Please check the PDF file format."
df = pd.DataFrame(data)
return df, "Data extracted successfully."
# Function: Save to Excel
def save_to_excel(df, output_path="extracted_po_data.xlsx"):
df.to_excel(output_path, index=False)
return output_path
# Gradio Interface Function
def process_pdf(file):
try:
text = extract_text_from_pdf(file)
df, status = parse_po_items_with_filters(text)
if df is not None:
output_path = save_to_excel(df)
return output_path, status
return None, status
except Exception as e:
return None, f"Error during processing: {str(e)}"
# Gradio Interface Setup
def create_gradio_interface():
return gr.Interface(
fn=process_pdf,
inputs=gr.File(label="Upload PDF", file_types=[".pdf"]),
outputs=[
gr.File(label="Download Extracted Data"),
gr.Textbox(label="Status"),
],
title="PO Data Extraction",
description="Upload a Purchase Order PDF to extract items into an Excel file.",
)
if __name__ == "__main__":
interface = create_gradio_interface()
interface.launch()