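"""Gradio Space: Hindi PDF Folder Processor with LLM API.

Pipeline (as implemented below): OCR folders of Hindi/English PDFs with EasyOCR
(CPU only), save the extracted pages as JSON, then send the JSON in size-limited
chunks to an OpenRouter-hosted model to produce a voter-list CSV.
"""
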
import gradio as gr
import fitz  # PyMuPDF
import easyocr
import os
import tempfile
import numpy as np
import json
import cv2
import re
import csv
import io
import time
import gc
import requests
from datetime import datetime
import pandas as pd
from pathlib import Path

# Configuration
JSON_SAVE_FOLDER = "processed_json"
os.makedirs(JSON_SAVE_FOLDER, exist_ok=True)

# Initialize EasyOCR reader once at module load (Hindi + English, CPU only)
def init_ocr():
    return easyocr.Reader(['hi', 'en'], gpu=False)  # Force CPU usage

reader = init_ocr()

def process_page_safely(page, page_num, attempt=1):
    """Render a single PDF page and OCR it, retrying up to 3 times on failure."""
    try:
        pix = page.get_pixmap(dpi=200)
        img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
        if pix.n == 4:
            img_data = cv2.cvtColor(img_data, cv2.COLOR_RGBA2RGB)

        # Split very large pages in half to keep EasyOCR's memory usage down
        max_pixels = 2000 * 2000
        if img_data.shape[0] * img_data.shape[1] > max_pixels:
            half = img_data.shape[0] // 2
            top_part = img_data[:half, :]
            bottom_part = img_data[half:, :]
            results_top = reader.readtext(top_part, detail=1, batch_size=1)
            results_bottom = reader.readtext(bottom_part, detail=1, batch_size=1)
            results = results_top + results_bottom
        else:
            results = reader.readtext(img_data, detail=1, batch_size=1)

        full_text = []
        confidence_scores = []
        for (bbox, text, confidence) in results:
            # Crude OCR normalisation: map Latin o/O to 0 and l/L to 1
            # (aimed at numeric fields such as voter IDs and house numbers)
            cleaned_text = re.sub(r'[oO]', '0', text)
            cleaned_text = re.sub(r'[lL]', '1', cleaned_text)
            full_text.append(cleaned_text)
            confidence_scores.append(confidence)

        avg_confidence = sum(confidence_scores) / len(confidence_scores) if confidence_scores else 0
        return {
            "page": page_num,
            "text": "\n".join(full_text),
            "confidence": avg_confidence,
            "dimensions": {"width": pix.width, "height": pix.height}
        }
    except Exception as e:
        if attempt <= 3:
            time.sleep(1)
            gc.collect()
            return process_page_safely(page, page_num, attempt + 1)
        return {"error": f"Page {page_num} error after {attempt} attempts: {str(e)}"}

def process_pdf(pdf_path, progress=gr.Progress()):
    """OCR every page of a PDF, returning (list of page dicts, list of error strings)."""
    all_json = []
    errors = []
    try:
        with fitz.open(pdf_path) as doc:
            total_pages = len(doc)
            for i in range(total_pages):
                progress(i / total_pages, desc=f"Processing page {i+1}/{total_pages}")
                page = doc.load_page(i)
                page_result = process_page_safely(page, i + 1)
                if "error" in page_result:
                    errors.append(page_result["error"])
                    continue
                all_json.append(page_result)
                # Brief pause and GC between pages to keep memory in check on CPU-only Spaces
                time.sleep(0.5)
                gc.collect()
        return all_json, errors
    except Exception as e:
        return None, [f"Processing error: {str(e)}"]

def process_folder(folder_path, progress=gr.Progress()):
    """Process every PDF in a folder and save the combined page data as one JSON file."""
    folder_name = os.path.basename(folder_path)
    all_pdfs_json = []
    all_errors = []

    # Get all PDF files in the folder
    pdf_files = [f for f in os.listdir(folder_path) if f.lower().endswith('.pdf')]
    if not pdf_files:
        return None, None, f"No PDF files found in folder: {folder_name}"

    # Process each PDF in the folder
    for i, pdf_file in enumerate(pdf_files):
        progress(i / len(pdf_files), desc=f"Processing {pdf_file} in {folder_name}")
        pdf_path = os.path.join(folder_path, pdf_file)

        # Create a temp copy (opened by fitz below)
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tf:
            with open(pdf_path, 'rb') as f:
                tf.write(f.read())
            temp_pdf_path = tf.name

        try:
            pdf_json, errors = process_pdf(temp_pdf_path, progress)
            if pdf_json:
                all_pdfs_json.extend(pdf_json)
            if errors:
                all_errors.extend(errors)
        finally:
            try:
                if os.path.exists(temp_pdf_path):
                    os.unlink(temp_pdf_path)
            except OSError:
                pass

    if not all_pdfs_json:
        return None, None, "\n".join(all_errors) if all_errors else "No data extracted from any PDF"

    # Save combined JSON for the folder
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    json_filename = f"{folder_name}_processed_{timestamp}.json"
    json_path = os.path.join(JSON_SAVE_FOLDER, json_filename)
    with open(json_path, 'w', encoding='utf-8') as f:
        json.dump(all_pdfs_json, f, indent=2, ensure_ascii=False)

    return all_pdfs_json, json_path, "\n".join(all_errors) if all_errors else "No errors"

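# Usage sketch (not executed by the app): running the OCR stage directly on a local
# folder of PDFs; "sample_pdfs" is a hypothetical path used only for illustration.
#
#     pages, json_path, errors = process_folder("sample_pdfs")
#     if pages:
#         print(f"Saved {len(pages)} page records to {json_path}")
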
def process_folders(folder_paths, progress=gr.Progress()):
    all_results = []
    all_json_paths = []
    all_errors = []

    # Ensure we don't process more than 5 folders
    folder_paths = folder_paths[:5]

    for i, folder_path in enumerate(folder_paths):
        progress(i / len(folder_paths), desc=f"Processing folder {i+1}/{len(folder_paths)}")
        json_data, json_path, errors = process_folder(folder_path, progress)
        if json_data:
            all_results.append({
                "folder": os.path.basename(folder_path),
                "data": json_data
            })
        if json_path:
            all_json_paths.append(json_path)
        if errors and errors != "No errors":
            all_errors.append(f"Folder {os.path.basename(folder_path)}: {errors}")

    return all_results, all_json_paths, "\n".join(all_errors) if all_errors else "No errors"

def chunk_json_by_char_limit(data, char_limit=3500):
    """Split a list of page dicts into chunks whose serialized size stays under char_limit."""
    chunks = []
    current_chunk = []
    current_length = 0
    for entry in data:
        entry_str = json.dumps(entry, ensure_ascii=False)
        entry_length = len(entry_str)
        # Start a new chunk only when the current one is non-empty and would overflow
        if current_chunk and current_length + entry_length > char_limit:
            chunks.append(current_chunk)
            current_chunk = [entry]
            current_length = entry_length
        else:
            current_chunk.append(entry)
            current_length += entry_length
    if current_chunk:
        chunks.append(current_chunk)
    return chunks

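# Usage sketch: each chunk serializes to at most char_limit characters unless a single
# page entry is itself larger than the limit (all_pdfs_json as produced by process_folder).
#
#     chunks = chunk_json_by_char_limit(all_pdfs_json)
#     print([len(json.dumps(c, ensure_ascii=False)) for c in chunks])
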
def call_llm_api(api_key, json_file_paths, repeated_info, debug_mode):
    """Send each folder's JSON to the OpenRouter API in chunks and collect CSV output."""
    all_csv_data = {}
    all_debug_info = ""
    api_status = True

    # A single JSON path may arrive as a plain string; normalise to a list
    if isinstance(json_file_paths, str):
        json_file_paths = [json_file_paths]

    for json_path in json_file_paths:
        # Extract folder name from the JSON filename (format: foldername_processed_timestamp.json)
        folder_name = os.path.basename(json_path).split('_processed_')[0]
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                full_data = json.load(f)

            json_chunks = chunk_json_by_char_limit(full_data, char_limit=3500)
            all_csv_chunks = []
            header_preserved = False
            debug_info = f"Processing folder: {folder_name}\n"

            headers = {
                "Authorization": f"Bearer {api_key}",
                "Content-Type": "application/json"
            }

            for idx, chunk in enumerate(json_chunks):
                prompt = f"""
{repeated_info}

Below is a portion of the voter data in JSON format. Please extract all entries into a CSV format with the following columns:
विधानसभा, सेक्शन, मतदाता ID, मतदाता का नाम, अभिभावक का नाम, घर संख्या, आयु, लिंग, फोटो उपलब्ध?

Rules:
1. Use exactly these column headers in Hindi as shown above
2. Separate values with COMMAS (,)
3. For photo availability, use "हाँ" or "नहीं"
4. Do NOT include any extra explanation — only CSV

JSON Data:
{json.dumps(chunk, ensure_ascii=False)}

Respond with ONLY the CSV data (including header ONLY in the first chunk).
""".strip()

                payload = {
                    "model": "google/gemma-3n-e4b-it:free",
                    "messages": [
                        {"role": "user", "content": prompt}
                    ],
                    "temperature": 0.1,
                    "max_tokens": 2048
                }

                try:
                    response = requests.post(
                        "https://openrouter.ai/api/v1/chat/completions",
                        headers=headers,
                        json=payload,
                        timeout=120
                    )
                except Exception as e:
                    all_csv_data[folder_name] = pd.DataFrame({"Error": [f"Network error: {str(e)}"]})
                    debug_info += f"\nError in chunk {idx+1}: {str(e)}\n"
                    api_status = False
                    continue

                if debug_mode:
                    debug_info += f"\n--- Chunk {idx+1} ---\nStatus: {response.status_code}\n{response.text}\n"

                if response.status_code != 200:
                    all_csv_data[folder_name] = pd.DataFrame({"Error": [f"API Error on chunk {idx+1}: {response.text}"]})
                    debug_info += f"\nAPI Error in chunk {idx+1}: {response.text}\n"
                    api_status = False
                    continue

                chunk_csv = response.json()["choices"][0]["message"]["content"].strip()

                # Keep the header row from the first chunk only; drop it from later chunks
                lines = chunk_csv.splitlines()
                if not header_preserved:
                    all_csv_chunks.append(chunk_csv)
                    header_preserved = True
                else:
                    if len(lines) > 1:
                        all_csv_chunks.append("\n".join(lines[1:]))
                    else:
                        all_csv_chunks.append("")  # empty or malformed chunk

                time.sleep(1.5)  # brief pause between requests

            # Combine CSV results for this folder and save to disk
            combined_csv = "\n".join(all_csv_chunks)
            csv_filename = f"{folder_name}_output_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
            csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename)
            with open(csv_path, 'w', encoding='utf-8-sig', newline='') as f:
                f.write(combined_csv)

            # Attempt to parse CSV into a DataFrame
            try:
                df = pd.read_csv(io.StringIO(combined_csv))
                all_csv_data[folder_name] = df
            except Exception as e:
                all_csv_data[folder_name] = pd.DataFrame({"Error": [f"CSV Parsing Error: {str(e)}", combined_csv]})
                api_status = False

            if debug_mode:
                all_debug_info += debug_info + "\n"

        except Exception as e:
            all_csv_data[folder_name] = pd.DataFrame({"Error": [str(e)]})
            all_debug_info += f"\nError processing {folder_name}: {str(e)}\n"
            api_status = False

    # Prepare download files
    download_files = []
    for folder_name in all_csv_data:
        csv_filename = f"{folder_name}_output.csv"
        csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename)
        all_csv_data[folder_name].to_csv(csv_path, index=False, encoding='utf-8-sig')
        download_files.append(csv_path)

    # If only one folder, return its DataFrame directly, otherwise return a concatenated DataFrame
    if len(all_csv_data) == 1:
        df_output = list(all_csv_data.values())[0]
    else:
        df_output = pd.concat(all_csv_data.values(), keys=all_csv_data.keys())

    return (
        df_output,
        download_files[0] if len(download_files) == 1 else download_files,
        all_debug_info if debug_mode else "",
        api_status
    )

# Gradio interface
with gr.Blocks(title="Hindi PDF Folder Processor with LLM API") as demo:
    gr.Markdown("## 📄 Hindi PDF Folder Processor with LLM API")
    gr.Markdown("Process folders of PDFs to extract text and convert to structured CSV using LLM")

    with gr.Tab("PDF Processing"):
        with gr.Row():
            with gr.Column():
                folder_input = gr.File(
                    label="Upload Folder(s) (Select multiple)",
                    file_count="multiple",
                    file_types=[".pdf"]
                )
                pdf_submit = gr.Button("Process PDF Folders")
                gr.Markdown("Note: Please select multiple folders (up to 5) containing PDFs")
            with gr.Column():
                json_display = gr.JSON(label="Extracted JSON Data")
                pdf_errors = gr.Textbox(label="Processing Errors")
                json_download = gr.File(label="Download JSON Files", visible=False)

    with gr.Tab("LLM API Processing"):
        with gr.Row():
            with gr.Column():
                api_key = gr.Textbox(label="OpenRouter API Key", type="password")
                repeated_info = gr.Textbox(
                    label="Additional Instructions",
                    value="Extract voter information from the following text:"
                )
                debug_mode = gr.Checkbox(label="Enable Debug Mode")
                api_submit = gr.Button("Call LLM API")
            with gr.Column():
                dataframe_output = gr.Dataframe(label="CSV Output", wrap=True)
                csv_download = gr.File(label="Download CSV Files")
                api_debug = gr.Textbox(label="Debug Information", visible=False)
                api_status = gr.Textbox(label="API Status", visible=False)

    def process_selected_folders(files):
        # Keep only uploaded paths that are directories, limited to 5 folders
        if not files:
            return None, None, "No files uploaded"
        folder_paths = []
        for file_info in files:
            file_path = file_info.name
            if os.path.isdir(file_path):
                folder_paths.append(file_path)
            if len(folder_paths) >= 5:
                break
        if not folder_paths:
            return None, None, "No valid folders selected or found in the upload"
        return process_folders(folder_paths)

    # PDF Processing
    pdf_submit.click(
        process_selected_folders,
        inputs=[folder_input],
        outputs=[json_display, json_download, pdf_errors]
    )

    # API Processing
    api_submit.click(
        call_llm_api,
        inputs=[api_key, json_download, repeated_info, debug_mode],
        outputs=[dataframe_output, csv_download, api_debug, api_status]
    )

    # Show/hide debug based on checkbox
    debug_mode.change(
        lambda x: gr.update(visible=x),
        inputs=[debug_mode],
        outputs=[api_debug]
    )

    # Update API status visibility
    api_submit.click(
        lambda: gr.update(visible=True),
        inputs=None,
        outputs=[api_status]
    )

if __name__ == "__main__":
    demo.launch(share=True)
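
# Likely requirements.txt for this Space (an assumption inferred from the imports
# above; exact package names and pins are not specified in this file):
#
#     gradio
#     PyMuPDF
#     easyocr
#     opencv-python-headless
#     numpy
#     pandas
#     requests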