import gradio as gr import fitz # PyMuPDF import easyocr import os import tempfile import numpy as np import json import cv2 import re import csv import io import time import gc import requests from datetime import datetime import pandas as pd from pathlib import Path # Configuration JSON_SAVE_FOLDER = "processed_json" os.makedirs(JSON_SAVE_FOLDER, exist_ok=True) # Initialize EasyOCR reader with CPU only def init_ocr(): return easyocr.Reader(['hi', 'en'], gpu=False) # Force CPU usage reader = init_ocr() def process_page_safely(page, page_num, attempt=1): try: pix = page.get_pixmap(dpi=200) img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n) if pix.n == 4: img_data = cv2.cvtColor(img_data, cv2.COLOR_RGBA2RGB) max_pixels = 2000 * 2000 if img_data.shape[0] * img_data.shape[1] > max_pixels: half = img_data.shape[0] // 2 top_part = img_data[:half, :] bottom_part = img_data[half:, :] results_top = reader.readtext(top_part, detail=1, batch_size=1) results_bottom = reader.readtext(bottom_part, detail=1, batch_size=1) results = results_top + results_bottom else: results = reader.readtext(img_data, detail=1, batch_size=1) full_text = [] confidence_scores = [] for (bbox, text, confidence) in results: cleaned_text = re.sub(r'[oO]', '0', text) cleaned_text = re.sub(r'[lL]', '1', cleaned_text) full_text.append(cleaned_text) confidence_scores.append(confidence) avg_confidence = sum(confidence_scores)/len(confidence_scores) if confidence_scores else 0 return { "page": page_num, "text": "\n".join(full_text), "confidence": avg_confidence, "dimensions": {"width": pix.width, "height": pix.height} } except Exception as e: if attempt <= 3: time.sleep(1) gc.collect() return process_page_safely(page, page_num, attempt+1) return {"error": f"Page {page_num} error after {attempt} attempts: {str(e)}"} def process_pdf(pdf_path, progress=gr.Progress()): all_json = [] errors = [] try: with fitz.open(pdf_path) as doc: total_pages = len(doc) for i in range(total_pages): progress(i/total_pages, desc=f"Processing page {i+1}/{total_pages}") page = doc.load_page(i) page_result = process_page_safely(page, i+1) if "error" in page_result: errors.append(page_result["error"]) continue all_json.append(page_result) time.sleep(0.5) gc.collect() return all_json, errors except Exception as e: return None, [f"Processing error: {str(e)}"] def process_folder(folder_path, progress=gr.Progress()): folder_name = os.path.basename(folder_path) all_pdfs_json = [] all_errors = [] # Get all PDF files in the folder pdf_files = [f for f in os.listdir(folder_path) if f.lower().endswith('.pdf')] if not pdf_files: return None, None, f"No PDF files found in folder: {folder_name}" # Process each PDF in the folder for i, pdf_file in enumerate(pdf_files): progress(i/len(pdf_files), desc=f"Processing {pdf_file} in {folder_name}") pdf_path = os.path.join(folder_path, pdf_file) # Create temp file (needed for fitz) with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tf: with open(pdf_path, 'rb') as f: tf.write(f.read()) temp_pdf_path = tf.name try: pdf_json, errors = process_pdf(temp_pdf_path, progress) if pdf_json: all_pdfs_json.extend(pdf_json) if errors: all_errors.extend(errors) finally: try: if os.path.exists(temp_pdf_path): os.unlink(temp_pdf_path) except: pass if not all_pdfs_json: return None, None, "\n".join(all_errors) if all_errors else "No data extracted from any PDF" # Save combined JSON for the folder timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") json_filename = f"{folder_name}_processed_{timestamp}.json" json_path = os.path.join(JSON_SAVE_FOLDER, json_filename) with open(json_path, 'w', encoding='utf-8') as f: json.dump(all_pdfs_json, f, indent=2, ensure_ascii=False) return all_pdfs_json, json_path, "\n".join(all_errors) if all_errors else "No errors" def process_folders(folder_paths, progress=gr.Progress()): all_results = [] all_json_paths = [] all_errors = [] # Ensure we don't process more than 5 folders folder_paths = folder_paths[:5] for i, folder_path in enumerate(folder_paths): progress(i/len(folder_paths), desc=f"Processing folder {i+1}/{len(folder_paths)}") json_data, json_path, errors = process_folder(folder_path, progress) if json_data: all_results.append({ "folder": os.path.basename(folder_path), "data": json_data }) if json_path: all_json_paths.append(json_path) if errors and errors != "No errors": all_errors.append(f"Folder {os.path.basename(folder_path)}: {errors}") return all_results, all_json_paths, "\n".join(all_errors) if all_errors else "No errors" def chunk_json_by_char_limit(data, char_limit=3500): chunks = [] current_chunk = [] current_length = 0 for entry in data: entry_str = json.dumps(entry, ensure_ascii=False) entry_length = len(entry_str) if current_length + entry_length > char_limit: chunks.append(current_chunk) current_chunk = [entry] current_length = entry_length else: current_chunk.append(entry) current_length += entry_length if current_chunk: chunks.append(current_chunk) return chunks def call_llm_api(api_key, json_file_paths, repeated_info, debug_mode): all_csv_data = {} all_debug_info = "" api_status = True for json_path in json_file_paths: try: with open(json_path, 'r', encoding='utf-8') as f: full_data = json.load(f) # Extract folder name from the JSON filename (format: foldername_processed_timestamp.json) folder_name = os.path.basename(json_path).split('_processed_')[0] json_chunks = chunk_json_by_char_limit(full_data, char_limit=3500) all_csv_chunks = [] header_preserved = False debug_info = f"Processing folder: {folder_name}\n" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } for idx, chunk in enumerate(json_chunks): prompt = f""" {repeated_info} Below is a portion of the voter data in JSON format. Please extract all entries into a CSV format with the following columns: विधानसभा, सेक्शन, मतदाता ID, मतदाता का नाम, अभिभावक का नाम, घर संख्या, आयु, लिंग, फोटो उपलब्ध? Rules: 1. Use exactly these column headers in Hindi as shown above 2. Separate values with COMMAS (,) 3. For photo availability, use "हाँ" or "नहीं" 4. Do NOT include any extra explanation — only CSV JSON Data: {json.dumps(chunk, ensure_ascii=False)} Respond with ONLY the CSV data (including header ONLY in the first chunk). """.strip() payload = { "model": "google/gemma-3n-e4b-it:free", "messages": [ {"role": "user", "content": prompt} ], "temperature": 0.1, "max_tokens": 2048 } try: response = requests.post( "https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=120 ) except Exception as e: all_csv_data[folder_name] = pd.DataFrame({"Error": [f"Network error: {str(e)}"]}) debug_info += f"\nError in chunk {idx+1}: {str(e)}\n" api_status = False continue if debug_mode: debug_info += f"\n--- Chunk {idx+1} ---\nStatus: {response.status_code}\n{response.text}\n" if response.status_code != 200: all_csv_data[folder_name] = pd.DataFrame({"Error": [f"API Error on chunk {idx+1}: {response.text}"]}) debug_info += f"\nAPI Error in chunk {idx+1}: {response.text}\n" api_status = False continue chunk_csv = response.json()["choices"][0]["message"]["content"].strip() # Keep header for first chunk only lines = chunk_csv.splitlines() if not header_preserved: all_csv_chunks.append(chunk_csv) header_preserved = True else: if len(lines) > 1: all_csv_chunks.append("\n".join(lines[1:])) else: all_csv_chunks.append("") # if empty or malformed time.sleep(1.5) # Combine CSV results for this folder combined_csv = "\n".join(all_csv_chunks) csv_filename = f"{folder_name}_output_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename) with open(csv_path, 'w', encoding='utf-8-sig', newline='') as f: f.write(combined_csv) # Attempt to parse CSV into DataFrame try: df = pd.read_csv(io.StringIO(combined_csv)) all_csv_data[folder_name] = df except Exception as e: all_csv_data[folder_name] = pd.DataFrame({"Error": [f"CSV Parsing Error: {str(e)}", combined_csv]}) api_status = False if debug_mode: all_debug_info += debug_info + "\n" except Exception as e: all_csv_data[folder_name] = pd.DataFrame({"Error": [str(e)]}) all_debug_info += f"\nError processing {folder_name}: {str(e)}\n" api_status = False # Prepare download files download_files = [] for folder_name in all_csv_data: csv_filename = f"{folder_name}_output.csv" csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename) all_csv_data[folder_name].to_csv(csv_path, index=False, encoding='utf-8-sig') download_files.append(csv_path) # If only one folder, return its DataFrame directly, otherwise return a dict of DataFrames if len(all_csv_data) == 1: df_output = list(all_csv_data.values())[0] else: df_output = pd.concat(all_csv_data.values(), keys=all_csv_data.keys()) return ( df_output, download_files[0] if len(download_files) == 1 else download_files, all_debug_info if debug_mode else "", api_status ) # Gradio interface with gr.Blocks(title="Hindi PDF Folder Processor with LLM API") as demo: gr.Markdown("## 📄 Hindi PDF Folder Processor with LLM API") gr.Markdown("Process folders of PDFs to extract text and convert to structured CSV using LLM") with gr.Tab("PDF Processing"): with gr.Row(): with gr.Column(): folder_input = gr.File( label="Upload Folder(s) (Select multiple)", file_count="multiple", file_types=[".pdf"] ) pdf_submit = gr.Button("Process PDF Folders") gr.Markdown("Note: Please select multiple folders (up to 5) containing PDFs") with gr.Column(): json_display = gr.JSON(label="Extracted JSON Data") pdf_errors = gr.Textbox(label="Processing Errors") json_download = gr.File(label="Download JSON Files", visible=False) with gr.Tab("LLM API Processing"): with gr.Row(): with gr.Column(): api_key = gr.Textbox(label="OpenRouter API Key", type="password") repeated_info = gr.Textbox( label="Additional Instructions", value="Extract voter information from the following text:" ) debug_mode = gr.Checkbox(label="Enable Debug Mode") api_submit = gr.Button("Call LLM API") with gr.Column(): dataframe_output = gr.Dataframe(label="CSV Output", wrap=True) csv_download = gr.File(label="Download CSV Files") api_debug = gr.Textbox(label="Debug Information", visible=False) api_status = gr.Textbox(label="API Status", visible=False) def process_selected_folders(files): # Filter out non-directory files and limit to 5 folders folder_paths = [] for file_info in files: file_path = file_info.name if os.path.isdir(file_path): folder_paths.append(file_path) if len(folder_paths) >= 5: break if not folder_paths: return None, None, "No valid folders selected or found in the upload" return process_folders(folder_paths) # PDF Processing pdf_submit.click( process_selected_folders, inputs=[folder_input], outputs=[json_display, json_download, pdf_errors] ) # API Processing api_submit.click( call_llm_api, inputs=[api_key, json_download, repeated_info, debug_mode], outputs=[dataframe_output, csv_download, api_debug, api_status] ) # Show/hide debug based on checkbox debug_mode.change( lambda x: gr.update(visible=x), inputs=[debug_mode], outputs=[api_debug] ) # Update API status visibility api_submit.click( lambda: gr.update(visible=True), inputs=None, outputs=[api_status] ) if __name__ == "__main__": demo.launch(share=True)