import gradio as gr
import fitz  # PyMuPDF
import easyocr
import os
import tempfile
import numpy as np
import json
import cv2
import re
import csv
import io
import time
import gc
import requests
from datetime import datetime
import pandas as pd
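
# A likely requirements.txt for this Space, inferred from the imports above
# (exact package names are an assumption, not confirmed by the source):
#   gradio, PyMuPDF, easyocr, opencv-python-headless, numpy, pandas, requests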

# Configuration
JSON_SAVE_FOLDER = "processed_json"
os.makedirs(JSON_SAVE_FOLDER, exist_ok=True)

# Initialize the EasyOCR reader for Hindi and English, forcing CPU usage
def init_ocr():
    return easyocr.Reader(['hi', 'en'], gpu=False)

reader = init_ocr()
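
# process_page_safely renders a single PDF page to an image, runs EasyOCR on
# it, and returns a dict with the page number, extracted text, mean confidence,
# and pixel dimensions. Failures are retried a few times before an error dict
# is returned, so one bad page does not abort the whole document.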
def process_page_safely(page, page_num, attempt=1):
    try:
        # Render the page at 200 DPI and wrap the pixmap buffer as a NumPy image
        pix = page.get_pixmap(dpi=200)
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
        if pix.n == 4:
            img_data = cv2.cvtColor(img_data, cv2.COLOR_RGBA2RGB)

        # Very large pages are split in half so EasyOCR stays within memory limits
        max_pixels = 2000 * 2000
        if img_data.shape[0] * img_data.shape[1] > max_pixels:
            half = img_data.shape[0] // 2
            top_part = img_data[:half, :]
            bottom_part = img_data[half:, :]
            results_top = reader.readtext(top_part, detail=1, batch_size=1)
            results_bottom = reader.readtext(bottom_part, detail=1, batch_size=1)
            results = results_top + results_bottom
        else:
            results = reader.readtext(img_data, detail=1, batch_size=1)

        full_text = []
        confidence_scores = []
        for (bbox, text, confidence) in results:
            # Note: this substitution is aggressive. It rewrites EVERY o/O and
            # l/L as digits to undo common OCR confusions in voter IDs, which
            # will also corrupt ordinary Latin words in the output.
            cleaned_text = re.sub(r'[oO]', '0', text)
            cleaned_text = re.sub(r'[lL]', '1', cleaned_text)
            full_text.append(cleaned_text)
            confidence_scores.append(confidence)

        avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0
        return {
            "page": page_num,
            "text": "\n".join(full_text),
            "confidence": avg_confidence,
            "dimensions": {"width": pix.width, "height": pix.height}
        }
    except Exception as e:
        # Retry up to three attempts total, pausing and collecting garbage in between
        if attempt < 3:
            time.sleep(1)
            gc.collect()
            return process_page_safely(page, page_num, attempt + 1)
        return {"error": f"Page {page_num} error after {attempt} attempts: {str(e)}"}
def process_pdf(pdf_file, progress=gr.Progress()):
    all_json = []
    errors = []

    # Write the uploaded bytes to a temporary file so PyMuPDF can open it
    with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tf:
        tf.write(pdf_file)
        temp_pdf_path = tf.name

    try:
        with fitz.open(temp_pdf_path) as doc:
            total_pages = len(doc)
            for i in range(total_pages):
                progress(i / total_pages, desc=f"Processing page {i+1}/{total_pages}")
                page = doc.load_page(i)
                page_result = process_page_safely(page, i + 1)
                if "error" in page_result:
                    errors.append(page_result["error"])
                    continue
                all_json.append(page_result)
                time.sleep(0.5)
                gc.collect()

        # Generate a timestamped filename and save the JSON with UTF-8 encoding
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        json_filename = f"processed_{timestamp}.json"
        json_path = os.path.join(JSON_SAVE_FOLDER, json_filename)
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(all_json, f, indent=2, ensure_ascii=False)

        return (
            all_json,   # For the JSON display
            json_path,  # For the file download
            "\n".join(errors) if errors else "No errors"  # For the error display
        )
    except Exception as e:
        return (None, None, f"Processing error: {str(e)}")
    finally:
        # Clean up the temporary PDF regardless of success or failure
        try:
            if os.path.exists(temp_pdf_path):
                os.unlink(temp_pdf_path)
        except OSError:
            pass
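
# chunk_json_by_char_limit packs page entries into lists whose serialized JSON
# stays under char_limit, so each LLM request fits comfortably within the
# model's context window.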
def chunk_json_by_char_limit(data, char_limit=3500):
    chunks = []
    current_chunk = []
    current_length = 0
    for entry in data:
        entry_str = json.dumps(entry, ensure_ascii=False)
        entry_length = len(entry_str)
        # Start a new chunk when adding this entry would exceed the limit
        # (the current_chunk guard avoids emitting an empty first chunk)
        if current_chunk and current_length + entry_length > char_limit:
            chunks.append(current_chunk)
            current_chunk = [entry]
            current_length = entry_length
        else:
            current_chunk.append(entry)
            current_length += entry_length
    if current_chunk:
        chunks.append(current_chunk)
    return chunks
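
# call_llm_api sends each chunk to OpenRouter's chat-completions endpoint and
# asks the model to emit CSV rows; the header row is kept only from the first
# chunk so the concatenated output parses as a single CSV.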
def call_llm_api(api_key, json_file_path, repeated_info, debug_mode):
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            full_data = json.load(f)

        # Split the extracted pages into prompt-sized chunks
        json_chunks = chunk_json_by_char_limit(full_data, char_limit=3500)
        all_csv_chunks = []
        header_preserved = False
        debug_info = ""

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

        for idx, chunk in enumerate(json_chunks):
            prompt = f"""
{repeated_info}

Below is a portion of the voter data in JSON format. Please extract all entries into a CSV format with the following columns:
विधानसभा, सेक्शन, मतदाता ID, मतदाता का नाम, अभिभावक का नाम, घर संख्या, आयु, लिंग, फोटो उपलब्ध?

Rules:
1. Use exactly these column headers in Hindi as shown above
2. Separate values with COMMAS (,)
3. For photo availability, use "हाँ" or "नहीं"
4. Do NOT include any extra explanation — only CSV

JSON Data:
{json.dumps(chunk, ensure_ascii=False)}

Respond with ONLY the CSV data (including header ONLY in the first chunk).
""".strip()

            payload = {
                "model": "google/gemma-3n-e4b-it:free",
                "messages": [
                    {"role": "user", "content": prompt}
                ],
                "temperature": 0.1,
                "max_tokens": 2048
            }

            try:
                response = requests.post(
                    "https://openrouter.ai/api/v1/chat/completions",
                    headers=headers,
                    json=payload,
                    timeout=120
                )
            except Exception as e:
                return (
                    pd.DataFrame({"Error": [f"Network error: {str(e)}"]}),
                    None,
                    debug_info,
                    "Failed"
                )

            if debug_mode:
                debug_info += f"\n--- Chunk {idx+1} ---\nStatus: {response.status_code}\n{response.text}\n"

            if response.status_code != 200:
                return (
                    pd.DataFrame({"Error": [f"API Error on chunk {idx+1}: {response.text}"]}),
                    None,
                    debug_info,
                    "Failed"
                )

            chunk_csv = response.json()["choices"][0]["message"]["content"].strip()

            # Keep the header row from the first chunk only
            lines = chunk_csv.splitlines()
            if not header_preserved:
                all_csv_chunks.append(chunk_csv)
                header_preserved = True
            elif len(lines) > 1:
                all_csv_chunks.append("\n".join(lines[1:]))
            else:
                all_csv_chunks.append("")  # empty or malformed chunk

            # Pause between requests to stay within rate limits
            time.sleep(1.5)

        # Combine the CSV chunks; utf-8-sig adds a BOM so Excel opens Hindi text correctly
        combined_csv = "\n".join(all_csv_chunks)
        csv_filename = f"output_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename)
        with open(csv_path, 'w', encoding='utf-8-sig', newline='') as f:
            f.write(combined_csv)

        # Attempt to parse the combined CSV into a DataFrame for display
        try:
            df = pd.read_csv(io.StringIO(combined_csv))
        except Exception as e:
            df = pd.DataFrame({"Error": [f"CSV Parsing Error: {str(e)}", combined_csv]})

        return (
            df,
            csv_path,
            debug_info if debug_mode else "",
            "Success"
        )
    except Exception as e:
        return (
            pd.DataFrame({"Error": [str(e)]}),
            None,
            f"Unexpected error: {str(e)}",
            "Failed"
        )
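
# The UI has two tabs: the first OCRs an uploaded PDF into page-level JSON,
# the second sends that JSON to the LLM for CSV conversion. The hidden
# json_download component carries the saved JSON path between the two tabs.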
# Gradio interface
with gr.Blocks(title="Hindi PDF Processor with LLM API") as demo:
    gr.Markdown("## 📄 Hindi PDF Processor with LLM API")
    gr.Markdown("Process PDFs to extract text and convert to structured CSV using LLM")

    with gr.Tab("PDF Processing"):
        with gr.Row():
            with gr.Column():
                pdf_input = gr.File(label="Upload PDF File", type="binary")
                pdf_submit = gr.Button("Process PDF")
            with gr.Column():
                json_display = gr.JSON(label="Extracted JSON Data")
                pdf_errors = gr.Textbox(label="Processing Errors")
                # Hidden: this component holds the saved JSON path for the LLM tab
                json_download = gr.File(label="Download JSON File", visible=False)

    with gr.Tab("LLM API Processing"):
        with gr.Row():
            with gr.Column():
                api_key = gr.Textbox(label="OpenRouter API Key", type="password")
                repeated_info = gr.Textbox(
                    label="Additional Instructions",
                    value="Extract voter information from the following text:"
                )
                debug_mode = gr.Checkbox(label="Enable Debug Mode")
                api_submit = gr.Button("Call LLM API")
            with gr.Column():
                dataframe_output = gr.Dataframe(label="CSV Output", wrap=True)
                csv_download = gr.File(label="Download CSV File")
                api_debug = gr.Textbox(label="Debug Information", visible=False)
                api_status = gr.Textbox(label="API Status", visible=False)

    # PDF processing
    pdf_submit.click(
        process_pdf,
        inputs=[pdf_input],
        outputs=[json_display, json_download, pdf_errors]
    )

    # LLM API processing (json_download supplies the saved JSON file path)
    api_submit.click(
        call_llm_api,
        inputs=[api_key, json_download, repeated_info, debug_mode],
        outputs=[dataframe_output, csv_download, api_debug, api_status]
    )

    # Show or hide the debug textbox based on the checkbox
    debug_mode.change(
        lambda x: gr.update(visible=x),
        inputs=[debug_mode],
        outputs=[api_debug]
    )

    # Make the API status textbox visible once a call is made
    api_submit.click(
        lambda: gr.update(visible=True),
        inputs=None,
        outputs=[api_status]
    )

if __name__ == "__main__":
    demo.launch()