import gradio as gr import fitz # PyMuPDF import easyocr import os import tempfile import numpy as np import json import cv2 import re import csv import io import time import gc import requests from datetime import datetime import pandas as pd # Configuration JSON_SAVE_FOLDER = "processed_json" os.makedirs(JSON_SAVE_FOLDER, exist_ok=True) # Initialize EasyOCR reader with CPU only def init_ocr(): return easyocr.Reader(['hi', 'en'], gpu=False) # Force CPU usage reader = init_ocr() def process_page_safely(page, page_num, attempt=1): try: pix = page.get_pixmap(dpi=200) img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n) if pix.n == 4: img_data = cv2.cvtColor(img_data, cv2.COLOR_RGBA2RGB) max_pixels = 2000 * 2000 if img_data.shape[0] * img_data.shape[1] > max_pixels: half = img_data.shape[0] // 2 top_part = img_data[:half, :] bottom_part = img_data[half:, :] results_top = reader.readtext(top_part, detail=1, batch_size=1) results_bottom = reader.readtext(bottom_part, detail=1, batch_size=1) results = results_top + results_bottom else: results = reader.readtext(img_data, detail=1, batch_size=1) full_text = [] confidence_scores = [] for (bbox, text, confidence) in results: cleaned_text = re.sub(r'[oO]', '0', text) cleaned_text = re.sub(r'[lL]', '1', cleaned_text) full_text.append(cleaned_text) confidence_scores.append(confidence) avg_confidence = sum(confidence_scores)/len(confidence_scores) if confidence_scores else 0 return { "page": page_num, "text": "\n".join(full_text), "confidence": avg_confidence, "dimensions": {"width": pix.width, "height": pix.height} } except Exception as e: if attempt <= 3: time.sleep(1) gc.collect() return process_page_safely(page, page_num, attempt+1) return {"error": f"Page {page_num} error after {attempt} attempts: {str(e)}"} def process_pdf(pdf_file, progress=gr.Progress()): all_json = [] errors = [] with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tf: tf.write(pdf_file) temp_pdf_path = tf.name try: with fitz.open(temp_pdf_path) as doc: total_pages = len(doc) for i in range(total_pages): progress(i/total_pages, desc=f"Processing page {i+1}/{total_pages}") page = doc.load_page(i) page_result = process_page_safely(page, i+1) if "error" in page_result: errors.append(page_result["error"]) continue all_json.append(page_result) time.sleep(0.5) gc.collect() # Generate timestamp for filename timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") json_filename = f"processed_{timestamp}.json" json_path = os.path.join(JSON_SAVE_FOLDER, json_filename) # Save JSON to file with UTF-8 encoding with open(json_path, 'w', encoding='utf-8') as f: json.dump(all_json, f, indent=2, ensure_ascii=False) return ( all_json, # For JSON display json_path, # For file download "\n".join(errors) if errors else "No errors" # For error display ) except Exception as e: return ( None, None, f"Processing error: {str(e)}" ) finally: try: if os.path.exists(temp_pdf_path): os.unlink(temp_pdf_path) except: pass def chunk_json_by_char_limit(data, char_limit=3500): chunks = [] current_chunk = [] current_length = 0 for entry in data: entry_str = json.dumps(entry, ensure_ascii=False) entry_length = len(entry_str) if current_length + entry_length > char_limit: chunks.append(current_chunk) current_chunk = [entry] current_length = entry_length else: current_chunk.append(entry) current_length += entry_length if current_chunk: chunks.append(current_chunk) return chunks def call_llm_api(api_key, json_file_path, repeated_info, debug_mode): try: with open(json_file_path, 'r', encoding='utf-8') as f: full_data = json.load(f) # NEW: chunk by char limit json_chunks = chunk_json_by_char_limit(full_data, char_limit=3500) all_csv_chunks = [] header_preserved = False debug_info = "" headers = { "Authorization": f"Bearer {api_key}", "Content-Type": "application/json" } for idx, chunk in enumerate(json_chunks): prompt = f""" {repeated_info} Below is a portion of the voter data in JSON format. Please extract all entries into a CSV format with the following columns: विधानसभा, सेक्शन, मतदाता ID, मतदाता का नाम, अभिभावक का नाम, घर संख्या, आयु, लिंग, फोटो उपलब्ध? Rules: 1. Use exactly these column headers in Hindi as shown above 2. Separate values with COMMAS (,) 3. For photo availability, use "हाँ" or "नहीं" 4. Do NOT include any extra explanation — only CSV JSON Data: {json.dumps(chunk, ensure_ascii=False)} Respond with ONLY the CSV data (including header ONLY in the first chunk). """.strip() payload = { "model": "google/gemma-3n-e4b-it:free", "messages": [ {"role": "user", "content": prompt} ], "temperature": 0.1, "max_tokens": 2048 } try: response = requests.post( "https://openrouter.ai/api/v1/chat/completions", headers=headers, json=payload, timeout=120 ) except Exception as e: return ( pd.DataFrame({"Error": [f"Network error: {str(e)}"]}), None, debug_info, False ) if debug_mode: debug_info += f"\n--- Chunk {idx+1} ---\nStatus: {response.status_code}\n{response.text}\n" if response.status_code != 200: return ( pd.DataFrame({"Error": [f"API Error on chunk {idx+1}: {response.text}"]}), None, debug_info, False ) chunk_csv = response.json()["choices"][0]["message"]["content"].strip() # Keep header for first chunk only lines = chunk_csv.splitlines() if not header_preserved: all_csv_chunks.append(chunk_csv) header_preserved = True else: if len(lines) > 1: all_csv_chunks.append("\n".join(lines[1:])) else: all_csv_chunks.append("") # if empty or malformed time.sleep(1.5) # Combine CSV results combined_csv = "\n".join(all_csv_chunks) csv_filename = f"output_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename) with open(csv_path, 'w', encoding='utf-8-sig', newline='') as f: f.write(combined_csv) # Attempt to parse CSV into DataFrame try: df = pd.read_csv(io.StringIO(combined_csv)) except Exception as e: df = pd.DataFrame({"Error": [f"CSV Parsing Error: {str(e)}", combined_csv]}) return ( df, csv_path, debug_info if debug_mode else "", True ) except Exception as e: return ( pd.DataFrame({"Error": [str(e)]}), None, f"Unexpected error: {str(e)}", False ) # Gradio interface with gr.Blocks(title="Hindi PDF Processor with LLM API") as demo: gr.Markdown("## 📄 Hindi PDF Processor with LLM API") gr.Markdown("Process PDFs to extract text and convert to structured CSV using LLM") with gr.Tab("PDF Processing"): with gr.Row(): with gr.Column(): pdf_input = gr.File(label="Upload PDF File", type="binary") pdf_submit = gr.Button("Process PDF") with gr.Column(): json_display = gr.JSON(label="Extracted JSON Data") pdf_errors = gr.Textbox(label="Processing Errors") json_download = gr.File(label="Download JSON File", visible=False) with gr.Tab("LLM API Processing"): with gr.Row(): with gr.Column(): api_key = gr.Textbox(label="OpenRouter API Key", type="password") repeated_info = gr.Textbox(label="Additional Instructions", value="Extract voter information from the following text:") debug_mode = gr.Checkbox(label="Enable Debug Mode") api_submit = gr.Button("Call LLM API") with gr.Column(): dataframe_output = gr.Dataframe(label="CSV Output", wrap=True) csv_download = gr.File(label="Download CSV File") api_debug = gr.Textbox(label="Debug Information", visible=False) api_status = gr.Textbox(label="API Status", visible=False) # PDF Processing pdf_submit.click( process_pdf, inputs=[pdf_input], outputs=[json_display, json_download, pdf_errors] ) # API Processing api_submit.click( call_llm_api, inputs=[api_key, json_download, repeated_info, debug_mode], outputs=[dataframe_output, csv_download, api_debug, api_status] ) # Show/hide debug based on checkbox debug_mode.change( lambda x: gr.update(visible=x), inputs=[debug_mode], outputs=[api_debug] ) # Update API status visibility api_submit.click( lambda: gr.update(visible=True), inputs=None, outputs=[api_status] ) if __name__ == "__main__": demo.launch()