Electrol_roll / app.py
shivam0109's picture
added few changes and folder
065043f
import gradio as gr
import fitz # PyMuPDF
import easyocr
import os
import tempfile
import numpy as np
import json
import cv2
import re
import csv
import io
import time
import gc
import requests
from datetime import datetime
import pandas as pd
from pathlib import Path
# Configuration
JSON_SAVE_FOLDER = "processed_json"
os.makedirs(JSON_SAVE_FOLDER, exist_ok=True)
# Initialize EasyOCR reader with CPU only
def init_ocr():
return easyocr.Reader(['hi', 'en'], gpu=False) # Force CPU usage
reader = init_ocr()
def process_page_safely(page, page_num, attempt=1):
try:
pix = page.get_pixmap(dpi=200)
img_data = np.frombuffer(pix.samples, dtype=np.uint8).reshape(pix.height, pix.width, pix.n)
if pix.n == 4:
img_data = cv2.cvtColor(img_data, cv2.COLOR_RGBA2RGB)
max_pixels = 2000 * 2000
if img_data.shape[0] * img_data.shape[1] > max_pixels:
half = img_data.shape[0] // 2
top_part = img_data[:half, :]
bottom_part = img_data[half:, :]
results_top = reader.readtext(top_part, detail=1, batch_size=1)
results_bottom = reader.readtext(bottom_part, detail=1, batch_size=1)
results = results_top + results_bottom
else:
results = reader.readtext(img_data, detail=1, batch_size=1)
full_text = []
confidence_scores = []
for (bbox, text, confidence) in results:
cleaned_text = re.sub(r'[oO]', '0', text)
cleaned_text = re.sub(r'[lL]', '1', cleaned_text)
full_text.append(cleaned_text)
confidence_scores.append(confidence)
avg_confidence = sum(confidence_scores)/len(confidence_scores) if confidence_scores else 0
return {
"page": page_num,
"text": "\n".join(full_text),
"confidence": avg_confidence,
"dimensions": {"width": pix.width, "height": pix.height}
}
except Exception as e:
if attempt <= 3:
time.sleep(1)
gc.collect()
return process_page_safely(page, page_num, attempt+1)
return {"error": f"Page {page_num} error after {attempt} attempts: {str(e)}"}
def process_pdf(pdf_path, progress=gr.Progress()):
all_json = []
errors = []
try:
with fitz.open(pdf_path) as doc:
total_pages = len(doc)
for i in range(total_pages):
progress(i/total_pages, desc=f"Processing page {i+1}/{total_pages}")
page = doc.load_page(i)
page_result = process_page_safely(page, i+1)
if "error" in page_result:
errors.append(page_result["error"])
continue
all_json.append(page_result)
time.sleep(0.5)
gc.collect()
return all_json, errors
except Exception as e:
return None, [f"Processing error: {str(e)}"]
def process_folder(folder_path, progress=gr.Progress()):
folder_name = os.path.basename(folder_path)
all_pdfs_json = []
all_errors = []
# Get all PDF files in the folder
pdf_files = [f for f in os.listdir(folder_path) if f.lower().endswith('.pdf')]
if not pdf_files:
return None, None, f"No PDF files found in folder: {folder_name}"
# Process each PDF in the folder
for i, pdf_file in enumerate(pdf_files):
progress(i/len(pdf_files), desc=f"Processing {pdf_file} in {folder_name}")
pdf_path = os.path.join(folder_path, pdf_file)
# Create temp file (needed for fitz)
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tf:
with open(pdf_path, 'rb') as f:
tf.write(f.read())
temp_pdf_path = tf.name
try:
pdf_json, errors = process_pdf(temp_pdf_path, progress)
if pdf_json:
all_pdfs_json.extend(pdf_json)
if errors:
all_errors.extend(errors)
finally:
try:
if os.path.exists(temp_pdf_path):
os.unlink(temp_pdf_path)
except:
pass
if not all_pdfs_json:
return None, None, "\n".join(all_errors) if all_errors else "No data extracted from any PDF"
# Save combined JSON for the folder
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
json_filename = f"{folder_name}_processed_{timestamp}.json"
json_path = os.path.join(JSON_SAVE_FOLDER, json_filename)
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(all_pdfs_json, f, indent=2, ensure_ascii=False)
return all_pdfs_json, json_path, "\n".join(all_errors) if all_errors else "No errors"
def process_folders(folder_paths, progress=gr.Progress()):
all_results = []
all_json_paths = []
all_errors = []
# Ensure we don't process more than 5 folders
folder_paths = folder_paths[:5]
for i, folder_path in enumerate(folder_paths):
progress(i/len(folder_paths), desc=f"Processing folder {i+1}/{len(folder_paths)}")
json_data, json_path, errors = process_folder(folder_path, progress)
if json_data:
all_results.append({
"folder": os.path.basename(folder_path),
"data": json_data
})
if json_path:
all_json_paths.append(json_path)
if errors and errors != "No errors":
all_errors.append(f"Folder {os.path.basename(folder_path)}: {errors}")
return all_results, all_json_paths, "\n".join(all_errors) if all_errors else "No errors"
def chunk_json_by_char_limit(data, char_limit=3500):
chunks = []
current_chunk = []
current_length = 0
for entry in data:
entry_str = json.dumps(entry, ensure_ascii=False)
entry_length = len(entry_str)
if current_length + entry_length > char_limit:
chunks.append(current_chunk)
current_chunk = [entry]
current_length = entry_length
else:
current_chunk.append(entry)
current_length += entry_length
if current_chunk:
chunks.append(current_chunk)
return chunks
def call_llm_api(api_key, json_file_paths, repeated_info, debug_mode):
all_csv_data = {}
all_debug_info = ""
api_status = True
for json_path in json_file_paths:
try:
with open(json_path, 'r', encoding='utf-8') as f:
full_data = json.load(f)
# Extract folder name from the JSON filename (format: foldername_processed_timestamp.json)
folder_name = os.path.basename(json_path).split('_processed_')[0]
json_chunks = chunk_json_by_char_limit(full_data, char_limit=3500)
all_csv_chunks = []
header_preserved = False
debug_info = f"Processing folder: {folder_name}\n"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
for idx, chunk in enumerate(json_chunks):
prompt = f"""
{repeated_info}
Below is a portion of the voter data in JSON format. Please extract all entries into a CSV format with the following columns:
विधानसभा, सेक्शन, मतदाता ID, मतदाता का नाम, अभिभावक का नाम, घर संख्या, आयु, लिंग, फोटो उपलब्ध?
Rules:
1. Use exactly these column headers in Hindi as shown above
2. Separate values with COMMAS (,)
3. For photo availability, use "हाँ" or "नहीं"
4. Do NOT include any extra explanation — only CSV
JSON Data:
{json.dumps(chunk, ensure_ascii=False)}
Respond with ONLY the CSV data (including header ONLY in the first chunk).
""".strip()
payload = {
"model": "google/gemma-3n-e4b-it:free",
"messages": [
{"role": "user", "content": prompt}
],
"temperature": 0.1,
"max_tokens": 2048
}
try:
response = requests.post(
"https://openrouter.ai/api/v1/chat/completions",
headers=headers,
json=payload,
timeout=120
)
except Exception as e:
all_csv_data[folder_name] = pd.DataFrame({"Error": [f"Network error: {str(e)}"]})
debug_info += f"\nError in chunk {idx+1}: {str(e)}\n"
api_status = False
continue
if debug_mode:
debug_info += f"\n--- Chunk {idx+1} ---\nStatus: {response.status_code}\n{response.text}\n"
if response.status_code != 200:
all_csv_data[folder_name] = pd.DataFrame({"Error": [f"API Error on chunk {idx+1}: {response.text}"]})
debug_info += f"\nAPI Error in chunk {idx+1}: {response.text}\n"
api_status = False
continue
chunk_csv = response.json()["choices"][0]["message"]["content"].strip()
# Keep header for first chunk only
lines = chunk_csv.splitlines()
if not header_preserved:
all_csv_chunks.append(chunk_csv)
header_preserved = True
else:
if len(lines) > 1:
all_csv_chunks.append("\n".join(lines[1:]))
else:
all_csv_chunks.append("") # if empty or malformed
time.sleep(1.5)
# Combine CSV results for this folder
combined_csv = "\n".join(all_csv_chunks)
csv_filename = f"{folder_name}_output_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename)
with open(csv_path, 'w', encoding='utf-8-sig', newline='') as f:
f.write(combined_csv)
# Attempt to parse CSV into DataFrame
try:
df = pd.read_csv(io.StringIO(combined_csv))
all_csv_data[folder_name] = df
except Exception as e:
all_csv_data[folder_name] = pd.DataFrame({"Error": [f"CSV Parsing Error: {str(e)}", combined_csv]})
api_status = False
if debug_mode:
all_debug_info += debug_info + "\n"
except Exception as e:
all_csv_data[folder_name] = pd.DataFrame({"Error": [str(e)]})
all_debug_info += f"\nError processing {folder_name}: {str(e)}\n"
api_status = False
# Prepare download files
download_files = []
for folder_name in all_csv_data:
csv_filename = f"{folder_name}_output.csv"
csv_path = os.path.join(JSON_SAVE_FOLDER, csv_filename)
all_csv_data[folder_name].to_csv(csv_path, index=False, encoding='utf-8-sig')
download_files.append(csv_path)
# If only one folder, return its DataFrame directly, otherwise return a dict of DataFrames
if len(all_csv_data) == 1:
df_output = list(all_csv_data.values())[0]
else:
df_output = pd.concat(all_csv_data.values(), keys=all_csv_data.keys())
return (
df_output,
download_files[0] if len(download_files) == 1 else download_files,
all_debug_info if debug_mode else "",
api_status
)
# Gradio interface
with gr.Blocks(title="Hindi PDF Folder Processor with LLM API") as demo:
gr.Markdown("## 📄 Hindi PDF Folder Processor with LLM API")
gr.Markdown("Process folders of PDFs to extract text and convert to structured CSV using LLM")
with gr.Tab("PDF Processing"):
with gr.Row():
with gr.Column():
folder_input = gr.File(
label="Upload Folder(s) (Select multiple)",
file_count="multiple",
file_types=[".pdf"]
)
pdf_submit = gr.Button("Process PDF Folders")
gr.Markdown("Note: Please select multiple folders (up to 5) containing PDFs")
with gr.Column():
json_display = gr.JSON(label="Extracted JSON Data")
pdf_errors = gr.Textbox(label="Processing Errors")
json_download = gr.File(label="Download JSON Files", visible=False)
with gr.Tab("LLM API Processing"):
with gr.Row():
with gr.Column():
api_key = gr.Textbox(label="OpenRouter API Key", type="password")
repeated_info = gr.Textbox(
label="Additional Instructions",
value="Extract voter information from the following text:"
)
debug_mode = gr.Checkbox(label="Enable Debug Mode")
api_submit = gr.Button("Call LLM API")
with gr.Column():
dataframe_output = gr.Dataframe(label="CSV Output", wrap=True)
csv_download = gr.File(label="Download CSV Files")
api_debug = gr.Textbox(label="Debug Information", visible=False)
api_status = gr.Textbox(label="API Status", visible=False)
def process_selected_folders(files):
# Filter out non-directory files and limit to 5 folders
folder_paths = []
for file_info in files:
file_path = file_info.name
if os.path.isdir(file_path):
folder_paths.append(file_path)
if len(folder_paths) >= 5:
break
if not folder_paths:
return None, None, "No valid folders selected or found in the upload"
return process_folders(folder_paths)
# PDF Processing
pdf_submit.click(
process_selected_folders,
inputs=[folder_input],
outputs=[json_display, json_download, pdf_errors]
)
# API Processing
api_submit.click(
call_llm_api,
inputs=[api_key, json_download, repeated_info, debug_mode],
outputs=[dataframe_output, csv_download, api_debug, api_status]
)
# Show/hide debug based on checkbox
debug_mode.change(
lambda x: gr.update(visible=x),
inputs=[debug_mode],
outputs=[api_debug]
)
# Update API status visibility
api_submit.click(
lambda: gr.update(visible=True),
inputs=None,
outputs=[api_status]
)
if __name__ == "__main__":
demo.launch(share=True)