formiq / app.py
chandinisaisri's picture
Update app.py
11bdeb7 verified
import streamlit as st
import torch
from transformers import LayoutLMv3Processor, LayoutLMv3ForTokenClassification, TrOCRProcessor, VisionEncoderDecoderModel
from PIL import Image
import io
import json
import pandas as pd
import plotly.express as px
import numpy as np
from typing import Dict, Any
import logging
import pytesseract
import re
from openai import OpenAI
import os
from pdf2image import convert_from_bytes
from dotenv import load_dotenv
from chatbot_utils import ask_receipt_chatbot
import time
from tensorboard.backend.event_processing import event_accumulator
from torch.utils.tensorboard import SummaryWriter
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib
import boto3
from decimal import Decimal
import uuid
from paddleocr import PaddleOCR
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Load environment variables
load_dotenv()
# Initialize OpenAI client for Perplexity
api_key = os.getenv('PERPLEXITY_API_KEY')
if not api_key:
st.error("""
⚠️ Perplexity API key not found! Please add your API key to the Space's secrets:
1. Go to Space Settings
2. Click on 'Repository secrets'
3. Add a new secret with name 'PERPLEXITY_API_KEY'
4. Add your Perplexity API key as the value
""")
st.stop()
client = OpenAI(
api_key=api_key,
base_url="https://api.perplexity.ai"
)
# Initialize LayoutLM model
@st.cache_resource
def load_model():
model_name = "microsoft/layoutlmv3-base"
processor = LayoutLMv3Processor.from_pretrained(model_name)
model = LayoutLMv3ForTokenClassification.from_pretrained(model_name)
return processor, model
def extract_json_from_llm_output(llm_result):
# Try to extract JSON from a code block first (```json ... ``` or ``` ... ```)
code_block_match = re.search(r"```(?:json)?\s*({[\s\S]*?})\s*```", llm_result, re.IGNORECASE)
if code_block_match:
return code_block_match.group(1)
# Fallback: extract first {...} block
match = re.search(r'\{[\s\S]*\}', llm_result)
if match:
return match.group(0)
return None
def extract_fields(image_path):
text = pytesseract.image_to_string(Image.open(image_path))
st.subheader("Raw OCR Output")
st.code(text)
# Improved Regex patterns for fields
patterns = {
"name": r"Mrs\s+\w+\s+\w+",
"date": r"Date[:\s]+([\d/]+)",
"product": r"\d+\s+\w+.*Style\s+\d+",
"amount_paid": r"Total Paid\s+\$?([\d.,]+)",
"receipt_no": r"Receipt No\.?\s*:?\s*(\d+)"
}
results = {}
for field, pattern in patterns.items():
match = re.search(pattern, text, re.IGNORECASE)
if match:
results[field] = match.group(1) if match.groups() else match.group(0)
else:
results[field] = None
# Extract all products
results["products"] = extract_products(text)
return results
def extract_products(text):
# Pattern to match product lines with quantity, name, and price
# Example: "2 PISTACHIO 14.49" or "1076903 PISTACHIO 14.49"
product_pattern = r"(?:(\d+)\s+)?([A-Z0-9 ]+)\s+(\d+\.\d{2})"
matches = re.findall(product_pattern, text)
products = []
for match in matches:
quantity, name, price = match
product = {
"name": name.strip(),
"price": float(price),
"quantity": int(quantity) if quantity else 1,
"total": float(price) * (int(quantity) if quantity else 1)
}
products.append(product)
return products
def extract_with_perplexity_llm(ocr_text):
prompt = f"""
You are an expert at extracting structured data from receipts.
From the following OCR text, extract these fields and return them as a JSON object with exactly these keys:
- name (customer name)
- date (date of purchase)
- amount_paid (total amount paid)
- receipt_no (receipt number)
- products (a list of all products, each with name, price, and quantity if available)
Example output:
{{
"name": "Mrs. Genevieve Lopez",
"date": "12/13/2024",
"amount_paid": 29.69,
"receipt_no": "042085",
"products": [
{{"name": "Orange Juice", "price": 2.15, "quantity": 1}},
{{"name": "Apples", "price": 3.50, "quantity": 1}}
]
}}
Text:
\"\"\"{ocr_text}\"\"\"
"""
messages = [
{
"role": "system",
"content": "You are an AI assistant that extracts structured information from text."
},
{
"role": "user",
"content": prompt
}
]
response = client.chat.completions.create(
model="sonar-pro",
messages=messages
)
return response.choices[0].message.content
def convert_floats_to_decimal(obj):
if isinstance(obj, float):
return Decimal(str(obj))
elif isinstance(obj, dict):
return {k: convert_floats_to_decimal(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_floats_to_decimal(i) for i in obj]
else:
return obj
def save_to_dynamodb(data, table_name="Receipts"):
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table(table_name)
# Calculate total amount if not provided
if "products" in data and not data.get("amount_paid"):
total = sum(product["total"] for product in data["products"])
data["amount_paid"] = total
# Convert all float values to Decimal for DynamoDB
data = convert_floats_to_decimal(data)
# Generate receipt number if not present
if not data.get("receipt_no"):
data["receipt_no"] = str(uuid.uuid4())
table.put_item(Item=data)
def merge_extractions(regex_fields, llm_fields):
merged = {}
for key in ["name", "date", "amount_paid", "receipt_no"]:
merged[key] = llm_fields.get(key) or regex_fields.get(key)
merged["products"] = llm_fields.get("products") or regex_fields.get("products")
return merged
def extract_handwritten_text(image):
processor = TrOCRProcessor.from_pretrained('microsoft/trocr-base-handwritten')
model = VisionEncoderDecoderModel.from_pretrained('microsoft/trocr-base-handwritten')
pixel_values = processor(images=image, return_tensors="pt").pixel_values
generated_ids = model.generate(pixel_values)
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
return generated_text
@st.cache_resource
def get_paddle_ocr():
return PaddleOCR(use_angle_cls=True, lang='en', show_log=False)
def extract_handwritten_text_paddle(image):
ocr = get_paddle_ocr()
# Save PIL image to a temporary file
temp_path = 'temp_uploaded_image_paddle.jpg'
image.save(temp_path)
result = ocr.ocr(temp_path, cls=True)
lines = [line[1][0] for line in result[0]]
return '\n'.join(lines)
def main():
st.set_page_config(
page_title="FormIQ - Intelligent Receipt Parser",
page_icon="📄",
layout="wide"
)
st.title("FormIQ: Intelligent Receipt Parser")
st.markdown("""
Upload your documents to extract and validate information using advanced AI models.
""")
# Sidebar
with st.sidebar:
st.header("Settings")
document_type = st.selectbox(
"Document Type",
options=["invoice", "receipt", "form"],
index=0
)
confidence_threshold = st.slider(
"Confidence Threshold",
min_value=0.0,
max_value=1.0,
value=0.5,
step=0.05
)
st.markdown("---")
st.markdown("### About")
st.markdown("""
FormIQ uses LayoutLMv3 and Perplexity AI to extract and validate information from documents.
""")
# Receipt Chatbot in sidebar
st.markdown("---")
st.header("💬 Receipt Chatbot")
st.write("Ask questions about your receipts stored in DynamoDB.")
user_question = st.text_input("Enter your question:", "What is the total amount paid?")
if st.button("Ask Chatbot", key="sidebar_chatbot"):
with st.spinner("Getting answer from Perplexity LLM..."):
answer = ask_receipt_chatbot(user_question)
st.success(answer)
# Main content
uploaded_file = st.file_uploader(
"Upload Document",
type=["png", "jpg", "jpeg", "pdf"],
help="Upload a document image to process"
)
if uploaded_file is not None:
image = Image.open(uploaded_file).convert("RGB")
st.image(image, caption="Uploaded Document", width=600)
handwritten_text = None
# Option to extract handwritten text with PaddleOCR
if st.checkbox("Extract handwritten text (PaddleOCR)?"):
with st.spinner("Extracting handwritten text with PaddleOCR..."):
handwritten_text = extract_handwritten_text_paddle(image)
st.subheader("Handwritten Text Extracted (PaddleOCR)")
st.write(handwritten_text)
# Process button
if st.button("Process Document"):
with st.spinner("Processing document..."):
try:
temp_path = "temp_uploaded_image.jpg"
image.save(temp_path)
# Use handwritten text if available, else fallback to pytesseract
if handwritten_text:
llm_input_text = handwritten_text
else:
llm_input_text = pytesseract.image_to_string(Image.open(temp_path))
llm_result = extract_with_perplexity_llm(llm_input_text)
llm_json = extract_json_from_llm_output(llm_result)
st.subheader("Structured Data (Perplexity LLM)")
if llm_json:
try:
llm_data = json.loads(llm_json)
st.json(llm_data)
save_to_dynamodb(llm_data)
st.success("Saved to DynamoDB!")
except Exception as e:
st.error(f"Failed to parse LLM output as JSON: {e}")
else:
st.warning("No valid JSON found in LLM output.")
except Exception as e:
logger.error(f"Error processing document: {str(e)}")
st.error(f"Error processing document: {str(e)}")
st.header("Model Training & Evaluation Demo")
if st.button("Start Training"):
epochs = 10
num_classes = 3 # Example: 3 classes for confusion matrix
losses = []
val_losses = []
accuracies = []
progress = st.progress(0)
chart = st.line_chart({"Loss": [], "Val Loss": [], "Accuracy": []})
writer = SummaryWriter("logs")
for epoch in range(epochs):
# Simulate training
loss = np.exp(-epoch/5) + np.random.rand() * 0.05
val_loss = loss + np.random.rand() * 0.02
acc = 1 - loss + np.random.rand() * 0.02
losses.append(loss)
val_losses.append(val_loss)
accuracies.append(acc)
chart.add_rows({"Loss": [loss], "Val Loss": [val_loss], "Accuracy": [acc]})
progress.progress((epoch+1)/epochs)
st.write(f"Epoch {epoch+1}: Loss={loss:.4f}, Val Loss={val_loss:.4f}, Accuracy={acc:.4f}")
# Log to TensorBoard
writer.add_scalar("loss", loss, epoch)
writer.add_scalar("val_loss", val_loss, epoch)
writer.add_scalar("accuracy", acc, epoch)
# Simulate predictions and labels for confusion matrix
y_true = np.random.randint(0, num_classes, 100)
y_pred = y_true.copy()
y_pred[np.random.choice(100, 10, replace=False)] = np.random.randint(0, num_classes, 10)
cm = confusion_matrix(y_true, y_pred, labels=range(num_classes))
# Only log confusion matrix in the last epoch
if epoch == epochs - 1:
fig, ax = plt.subplots()
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[f"Class {i}" for i in range(num_classes)])
disp.plot(ax=ax)
plt.close(fig)
writer.add_figure("confusion_matrix", fig, epoch)
writer.close()
st.success("Training complete!")
# Show last confusion matrix in Streamlit
if 'cm' in locals():
st.subheader("Confusion Matrix (Last Epoch)")
fig, ax = plt.subplots()
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=[f"Class {i}" for i in range(num_classes)])
disp.plot(ax=ax)
st.pyplot(fig)
else:
st.info("Confusion matrix not found.")
if __name__ == "__main__":
main()