Spaces:
Sleeping
Sleeping
| import re | |
| import fitz | |
| from PIL import Image | |
| import pytesseract | |
| import gradio as gr | |
| import pandas as pd | |
| import os | |
# Tesseract options: page-segmentation mode 6 plus a character whitelist that
# restricts recognition to digits and the punctuation , . - +
config_val = "--psm 6 " "-c tessedit_char_whitelist=0123456789,.-+"


def _shift_rects_down(rects, dy):
    """Return a copy of *rects* with *dy* added to every y coordinate."""
    shifted = []
    for quad in rects:
        shifted.append([(x, y + dy) for x, y in quad])
    return shifted


# Each region is a four-point outline in PDF page coordinates; only its
# bounding box is used when clipping (see get_bounding_rect).
# The trailing comments give the Form 1040 line each region feeds, taken from
# the index order of line_mapping in save_to_csv_flat.

# Rectangles for Form 1040 Pages 1 & 2
page1_rects = [
    [(464, 399), (576, 399), (575, 409), (462, 410)],  # Line 1
    [(462, 519), (577, 518), (577, 531), (463, 529)],  # Line 2b
    [(225, 517), (340, 518), (339, 530), (224, 530)],  # Line 2a
    [(225, 530), (339, 532), (340, 541), (225, 542)],  # Line 3a
    [(464, 531), (576, 531), (576, 542), (464, 542)],  # Line 3b
    [(464, 589), (578, 589), (577, 602), (464, 602)],  # Line 7
    [(463, 624), (578, 626), (576, 639), (464, 637)],  # Line 10
    [(462, 652), (576, 651), (577, 661), (464, 663)],  # Line 12
    [(463, 661), (578, 664), (578, 676), (462, 674)],  # Line 13
    # NOTE(review): first point has y=699 where 684 might be expected; the
    # bounding box is unaffected because another corner already supplies 684.
    [(464, 699), (578, 684), (578, 699), (464, 699)],  # Line 15
]

page2_rects = [
    [(462, 15), (575, 15), (576, 26), (463, 26)],      # Line 16
    [(462, 62), (577, 63), (579, 75), (462, 73)],      # Line 20, Schedule 3
    [(463, 98), (576, 98), (578, 110), (462, 110)],    # Line 23, Schedule 2
    [(461, 111), (576, 111), (578, 123), (459, 122)],  # Line 24
]

schedule1_rects = [
    [(470, 204), (579, 203), (577, 216), (471, 216)],  # Schedule 1 Line 3
    [(470, 228), (577, 229), (576, 240), (470, 240)],  # Schedule 1 Line 5
    [(362, 274), (466, 274), (468, 288), (360, 288)],  # Schedule 1 Line 8
]

# The Form 1040 regions are used 23 units lower than measured above.
# NOTE(review): the reason for the 23-unit offset is not recorded here - confirm.
adjusted_page1_rects = _shift_rects_down(page1_rects, 23)
adjusted_page2_rects = _shift_rects_down(page2_rects, 23)
def get_bounding_rect(points):
    """Return the axis-aligned ``fitz.Rect`` enclosing every point in *points*.

    Each point is read by index: element 0 is taken as x, element 1 as y.
    The rectangle runs from the smallest x/y to the largest x/y found.
    """
    x_coords = list(map(lambda point: point[0], points))
    y_coords = list(map(lambda point: point[1], points))
    left, top = min(x_coords), min(y_coords)
    right, bottom = max(x_coords), max(y_coords)
    return fitz.Rect(left, top, right, bottom)
# Directory and file name of the CSV handed back to the UI. The directory is
# the same one save_to_csv_flat uses by default.
_CSV_OUTPUT_DIR = "/home/user/app/files"
_CSV_FILE_NAME = "Client_Output_Data_Form_1040.csv"

# Schedule 1 line numbers, in the same order as schedule1_rects.
_SCHEDULE1_LINE_NUMBERS = (3, 5, 8)


def _open_pdf(source):
    """Open *source* as a PyMuPDF document.

    *source* is either a filesystem path (``str``) or an object exposing
    ``seek``/``read`` that yields the raw PDF bytes.
    """
    if isinstance(source, str):
        return fitz.open(source)
    source.seek(0)
    return fitz.open(stream=source.read(), filetype="pdf")


def _ocr_regions(page, regions, zoom):
    """OCR every region of *page* and return the cleaned numeric strings."""
    values = []
    for rect_points in regions:
        clip = get_bounding_rect(rect_points)
        pix = page.get_pixmap(matrix=zoom, clip=clip)
        cropped = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
        width, height = cropped.size
        # Only the right-hand 60% of the clip is sent to OCR.
        # NOTE(review): presumably this skips the printed line label - confirm.
        value_img = cropped.crop((int(0.4 * width), 0, width, height))
        raw = pytesseract.image_to_string(value_img, config=config_val).strip()
        # Drop anything Tesseract returned outside digits and , . - +
        values.append(re.sub(r"[^\d,.\-+]", "", raw))
    return values


def extract_numeric_values(pdf_file, schedule1_file=None, client_name="Unknown Client"):
    """OCR the numeric fields of a Form 1040 (and optional Schedule 1) PDF.

    The first two pages of *pdf_file* are scanned at the regions listed in
    ``adjusted_page1_rects`` / ``adjusted_page2_rects``; when *schedule1_file*
    is given, its first page is scanned at ``schedule1_rects``. The values are
    appended as one row to the output CSV via ``save_to_csv_flat``.

    Args:
        pdf_file: Path or binary file-like object of the main Form 1040 PDF.
        schedule1_file: Optional path or file-like object of the Schedule 1 PDF.
        client_name: Name stored in the first CSV column; must be non-blank.

    Returns:
        A ``(text, csv_path)`` tuple. ``text`` lists one extracted value per
        line and ``csv_path`` is the file that was written. On any failure the
        tuple is ``(error message, None)``; no exception propagates.
    """
    try:
        if not client_name or client_name.strip() == "":
            return "Error: Client name is required.", None
        # An empty upload widget delivers None; report that clearly instead of
        # letting it surface as an AttributeError message.
        if pdf_file is None:
            return "Error: Main Form 1040 PDF is required.", None

        zoom = fitz.Matrix(2, 2)  # render clips at 2x scale before OCR

        doc = _open_pdf(pdf_file)
        try:
            if len(doc) < 2:
                return "Error: Main PDF must have at least 2 pages.", None
            form_values = (
                _ocr_regions(doc[0], adjusted_page1_rects, zoom)
                + _ocr_regions(doc[1], adjusted_page2_rects, zoom)
            )
        finally:
            # Always release the document, including on the early return above.
            doc.close()

        output = [f"1040 Value {i + 1}: {val}" for i, val in enumerate(form_values)]

        schedule1_values = []
        if schedule1_file:
            doc = _open_pdf(schedule1_file)
            try:
                if len(doc) >= 1:
                    schedule1_values = _ocr_regions(doc[0], schedule1_rects, zoom)
            finally:
                doc.close()
            output += [
                f"Schedule 1 Line {line_no}: {val}"
                for line_no, val in zip(_SCHEDULE1_LINE_NUMBERS, schedule1_values)
            ]

        # Pass the path explicitly so the file offered for download is exactly
        # the file that was written.
        csv_path = os.path.join(_CSV_OUTPUT_DIR, _CSV_FILE_NAME)
        save_to_csv_flat(
            form_values, schedule1_values, client_name=client_name, csv_path=csv_path
        )
        return "\n".join(output), csv_path
    except Exception as e:
        # UI boundary: turn any failure into a message for the output textbox.
        return f"Error occurred:\n{str(e)}", None
def save_to_csv_flat(all_extracted_values, schedule1_values, client_name="Unknown Client", csv_path=None):
    """Append one client's extracted values as a row of the output CSV.

    Args:
        all_extracted_values: Form 1040 values in region order (page 1 then
            page 2); position ``i`` is written to the column that
            ``line_mapping`` assigns to index ``i``. Blank entries become "0".
        schedule1_values: Up to three Schedule 1 values (Lines 3, 5, 8), or an
            empty list / None when no Schedule 1 was processed.
        client_name: Value for the "Client Name" column.
        csv_path: Destination file. Defaults to
            ``/home/user/app/files/Client_Output_Data_Form_1040.csv``.

    Returns:
        The path of the CSV file that was written.
    """
    if csv_path is None:
        # Adjust this to your Hugging Face Space's files directory
        output_dir = "/home/user/app/files"
        csv_path = os.path.join(output_dir, "Client_Output_Data_Form_1040.csv")
    else:
        output_dir = os.path.dirname(csv_path)
    if output_dir:
        # Create the directory the file is actually written to.
        os.makedirs(output_dir, exist_ok=True)

    # Header components
    header_level_1 = [
        "Client Name", "Gross Comp", "Taxable Wages", "Taxable Interest Income: Sch. B", "Tax- Exempt Interest",
        "Qualified Dividends", "Ordinary Dividends", "Long Term Capital Gain or Loss",
        "Other Adjustments (from Schedule 1)", "Business Income or Loss (Schedule C)",
        "Rent/ Royalty (Schedule E)", "Other Income", "Standard Deduction", "Qualified Business Income Deduction",
        "Taxable Income", "Tax", "", "", "Total Tax",
    ]
    header_level_2 = [
        "", "W2 Box 5", "Line 1", "Line 2b", "Line 2a", "Line 3a", "Line 3b", "Line 7",
        "Line 10", "Schedule 1, Line 3", "Schedule 1, Line 5", "Schedule 1, Line 8",
        "Line 12", "Line 13", "Line 15", "Line 16", "Line 20, Schedule 3", "Line 23, Schedule 2", "Line 24",
    ]

    # Flatten headers for CSV: "<level 1> - <level 2>", or whichever one is
    # non-empty when the other is blank.
    flat_columns = [
        f"{h1.strip()} - {h2.strip()}" if h1.strip() and h2.strip()
        else (h1.strip() + h2.strip())
        for h1, h2 in zip(header_level_1, header_level_2)
    ]

    # Column name -> index into all_extracted_values
    line_mapping = {
        "Taxable Wages - Line 1": 0,
        "Taxable Interest Income: Sch. B - Line 2b": 1,
        "Tax- Exempt Interest - Line 2a": 2,
        "Qualified Dividends - Line 3a": 3,
        "Ordinary Dividends - Line 3b": 4,
        "Long Term Capital Gain or Loss - Line 7": 5,
        "Other Adjustments (from Schedule 1) - Line 10": 6,
        "Standard Deduction - Line 12": 7,
        "Qualified Business Income Deduction - Line 13": 8,
        "Taxable Income - Line 15": 9,
        "Tax - Line 16": 10,
        "Line 20, Schedule 3": 11,
        "Line 23, Schedule 2": 12,
        "Total Tax - Line 24": 13,
    }
    schedule1_columns = (
        "Business Income or Loss (Schedule C) - Schedule 1, Line 3",
        "Rent/ Royalty (Schedule E) - Schedule 1, Line 5",
        "Other Income - Schedule 1, Line 8",
    )

    # Start from an all-blank row; unfilled columns are written as empty cells.
    new_row = dict.fromkeys(flat_columns, "")
    new_row[flat_columns[0]] = client_name

    # Map Page 1-2 values; a blank OCR result is recorded as "0".
    for column, idx in line_mapping.items():
        if idx < len(all_extracted_values):
            new_row[column] = all_extracted_values[idx] or "0"

    # Add Schedule 1 values; zip tolerates fewer than three entries.
    for column, value in zip(schedule1_columns, schedule1_values or []):
        new_row[column] = value or "0"

    new_df = pd.DataFrame([new_row], columns=flat_columns)

    # Append and save. Existing rows are read back strictly as text so that
    # values such as "500", "1.50", "007" or "NA" are not re-typed (500.0, 1.5,
    # 7, NaN) by pandas' dtype/NA inference on every append.
    if os.path.exists(csv_path) and os.path.getsize(csv_path) > 0:
        existing = pd.read_csv(csv_path, dtype=str, keep_default_na=False)
        df = pd.concat([existing, new_df], ignore_index=True)
    else:
        df = new_df
    df.to_csv(csv_path, index=False)
    print(f" Data saved to CSV: {csv_path}")
    return csv_path
# Gradio UI
# The three inputs are passed positionally to
# extract_numeric_values(pdf_file, schedule1_file, client_name); the two
# outputs receive its (text, file path) return pair.
iface = gr.Interface(
    fn=extract_numeric_values,
    inputs=[
        gr.File(label="Upload Main Form 1040 PDF (Required)", file_types=[".pdf"]),
        gr.File(label="Upload Schedule 1 PDF (Optional)", file_types=[".pdf"]),
        gr.Textbox(label="Client Name", placeholder="Enter client name"),
    ],
    outputs=[
        gr.Textbox(label="Extracted Numeric Values", lines=20),
        # The file produced is a .csv, so label it as such.
        gr.File(label="Download CSV Output"),
    ],
    title="Tax PDF Extractor",
    description="Upload Form 1040 (at least 2 pages). Optionally upload Schedule 1 for extra fields.",
)
| # with gr.Blocks(title="Tax PDF Extractor") as demo: | |
| # gr.Markdown("## Tax PDF Extractor") | |
| # gr.Markdown("Upload Form 1040 (at least 2 pages). Optionally upload Schedule 1 for extra fields.") | |
| # client_name = gr.Textbox(label="Client Name (Required)", placeholder="Enter your full name") | |
| # form_1040 = gr.File(label="Upload Main Form 1040 PDF (Required)", file_types=[".pdf"]) | |
| # has_schedule1 = gr.Radio( | |
| # choices=["Yes", "No"], | |
| # label="Do you have Schedule 1?", | |
| # value="No" | |
| # ) | |
| # schedule1 = gr.File(label="Upload Schedule 1 PDF (Optional)", file_types=[".pdf"], visible=False) | |
| # # Show/hide schedule1 upload box | |
| # def toggle_schedule1(choice): | |
| # return gr.update(visible=choice == "Yes") | |
| # has_schedule1.change(fn=toggle_schedule1, inputs=has_schedule1, outputs=schedule1) | |
| # output_text = gr.Textbox(label="Extracted Numeric Values", lines=20) | |
| # output_file = gr.File(label="Download Excel Output") | |
| # def wrapper_extract(main_pdf, schedule1_pdf, client_name): | |
| # if not client_name: | |
| # return "Error: Client name is required.", None | |
| # return extract_numeric_values(main_pdf, schedule1_pdf) | |
| # submit_btn = gr.Button("Extract Data") | |
| # submit_btn.click( | |
| # fn=wrapper_extract, | |
| # inputs=[form_1040, schedule1, client_name], | |
| # outputs=[output_text, output_file] | |
| # ) | |
# Start the app; share=True additionally asks Gradio for a public share link.
_launch_options = {"share": True}
iface.launch(**_launch_options)