import gradio as gr
import os
import re
import time
import torch
import torch.nn as nn
from PIL import Image
import pytesseract
from playwright.sync_api import sync_playwright
import asyncio
from transformers import AutoTokenizer, BertTokenizerFast
from torchvision import transforms
from torchvision import models
from torchvision.transforms import functional as F
import pandas as pd
from huggingface_hub import hf_hub_download
import warnings
warnings.filterwarnings("ignore")
from pathlib import Path
import subprocess
import traceback
# --- Setup ---
# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# Load tokenizer with proper error handling
try:
    tokenizer = BertTokenizerFast.from_pretrained("indobenchmark/indobert-base-p1")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    # Fall back to the default BERT tokenizer if IndoBERT cannot be loaded
    print("Falling back to default BERT tokenizer")
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
# Image transformation
class ResizePadToSquare:
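    """Resize an image so its longer side fits target_size, then zero-pad it to a square."""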
def __init__(self, target_size=300):
self.target_size = target_size
def __call__(self, img):
img = img.convert("RGB")
img.thumbnail((self.target_size, self.target_size), Image.BILINEAR)
delta_w = self.target_size - img.size[0]
delta_h = self.target_size - img.size[1]
padding = (delta_w // 2, delta_h // 2, delta_w - delta_w // 2, delta_h - delta_h // 2)
img = F.pad(img, padding, fill=0, padding_mode='constant')
return img
transform = transforms.Compose([
ResizePadToSquare(300),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]),
])
# Run this once at application startup (e.g., in the main file, before loading the models)
def ensure_playwright_chromium():
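    """Install the Playwright Chromium browser if it is not already present."""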
try:
print("Checking and installing Playwright Chromium if not present...")
subprocess.run(["playwright", "install", "chromium"], check=True)
print("Playwright Chromium installation completed.")
except Exception as e:
print("Error during Playwright Chromium installation:", e)
traceback.print_exc()
# Make sure this is called at startup (outside the screenshot function)
ensure_playwright_chromium()
# Screenshot folder
SCREENSHOT_DIR = "screenshots"
os.makedirs(SCREENSHOT_DIR, exist_ok=True)
# Set Tesseract binary path
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract' # Path to tesseract in Docker
print("Tesseract OCR initialized.")
# --- Model ---
class LateFusionModel(nn.Module):
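    """Late-fusion classifier that combines image and text logits using learnable, softmax-normalized weights."""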
def __init__(self, image_model, text_model):
super(LateFusionModel, self).__init__()
self.image_model = image_model
self.text_model = text_model
self.image_weight = nn.Parameter(torch.tensor(0.5))
self.text_weight = nn.Parameter(torch.tensor(0.5))
def forward(self, images, input_ids, attention_mask):
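        # Run the pretrained image and text branches without tracking gradients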
with torch.no_grad():
image_logits = self.image_model(images).squeeze(1)
text_logits = self.text_model(input_ids=input_ids, attention_mask=attention_mask).logits.squeeze(1)
weights = torch.softmax(torch.stack([self.image_weight, self.text_weight]), dim=0)
fused_logits = weights[0] * image_logits + weights[1] * text_logits
return fused_logits, image_logits, text_logits, weights
# Load model
model_path = "models/best_fusion_model.pt"
if os.path.exists(model_path):
fusion_model = torch.load(model_path, map_location=device, weights_only=False)
else:
model_path = hf_hub_download(repo_id="azzandr/gambling-fusion-model", filename="best_fusion_model.pt")
fusion_model = torch.load(model_path, map_location=device, weights_only=False)
fusion_model.to(device)
fusion_model.eval()
print("Fusion model loaded successfully!")
# Load image-only model from state_dict
image_model_path = "models/best_image_model_Adam_lr0.0001_bs32_state_dict.pt"
if os.path.exists(image_model_path):
image_only_model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
num_features = image_only_model.classifier[1].in_features
image_only_model.classifier = nn.Linear(num_features, 1)
image_only_model.load_state_dict(torch.load(image_model_path, map_location=device))
image_only_model.to(device)
image_only_model.eval()
print("Image-only model loaded from state_dict successfully!")
else:
# Download from HuggingFace if local file doesn't exist
image_model_path = hf_hub_download(repo_id="azzandr/gambling-image-model",
filename="best_image_model_Adam_lr0.0001_bs32_state_dict.pt")
image_only_model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
num_features = image_only_model.classifier[1].in_features
image_only_model.classifier = nn.Linear(num_features, 1)
image_only_model.load_state_dict(torch.load(image_model_path, map_location=device))
image_only_model.to(device)
image_only_model.eval()
print("Image-only model loaded from HuggingFace successfully!")
# --- Functions ---
def clean_text(text):
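    """Clean OCR text: strip URLs and non-letters, drop noise words, and return "" if too little text remains."""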
exceptions = {
"di", "ke", "ya"
}
# ----- BASIC CLEANING -----
text = re.sub(r"http\S+", "", text) # Hapus URL
text = re.sub(r"\n", " ", text) # Ganti newline dengan spasi
text = re.sub(r"[^a-zA-Z']", " ", text) # Hanya sisakan huruf dan apostrof
text = re.sub(r"\s{2,}", " ", text).strip().lower() # Hapus spasi ganda, ubah ke lowercase
# ----- FILTERING -----
words = text.split()
filtered_words = [
w for w in words
        if (len(w) > 2 or w in exceptions)  # Keep words longer than 2 letters or listed in exceptions
]
text = ' '.join(filtered_words)
# ----- REMOVE UNWANTED PATTERNS -----
    text = re.sub(r'\b[aeiou]+\b', '', text)     # Remove all-vowel words (any length)
    text = re.sub(r'\b[^aeiou\s]+\b', '', text)  # Remove all-consonant words (any length)
    text = re.sub(r'\b\w{20,}\b', '', text)      # Remove very long words (20+ letters)
    text = re.sub(r'\s+', ' ', text).strip()     # Clean up extra whitespace
    # Word-count check
    if len(text.split()) < 5:
        print(f"Cleaned text too short ({len(text.split())} words). Ignoring text.")
        return ""  # Empty string signals the caller to fall back to the image-only model
return text
# Take a screenshot of the page viewport
def take_screenshot(url):
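    """Open the URL in headless Chromium and save a viewport screenshot; return the file path, or None on failure."""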
filename = url.replace('https://', '').replace('http://', '').replace('/', '_').replace('.', '_') + '.png'
filepath = os.path.join(SCREENSHOT_DIR, filename)
try:
print(f"\n=== [START SCREENSHOT] URL: {url} ===")
with sync_playwright() as p:
print("Launching Playwright Chromium...")
browser = p.chromium.launch()
page = browser.new_page(
viewport={"width": 1280, "height": 800},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
)
page.set_default_timeout(60000)
page.set_extra_http_headers({"Accept-Language": "en-US,en;q=0.9"})
print("Navigating to URL...")
page.goto(url, wait_until="networkidle", timeout=60000)
page.wait_for_timeout(3000)
print("Taking screenshot (viewport only)...")
page.screenshot(path=filepath)
browser.close()
print(f"Screenshot saved to {filepath}")
print(f"=== [END SCREENSHOT] ===\n")
return filepath
except Exception as e:
print(f"[ERROR] Failed to take screenshot for URL: {url}")
print(f"Exception: {e}")
traceback.print_exc()
return None
def resize_if_needed(image_path, max_mb=1, target_width=720):
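    """Downscale the image in place to target_width if the file is larger than max_mb megabytes."""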
    file_size = os.path.getsize(image_path) / (1024 * 1024)  # size in MB
if file_size > max_mb:
try:
with Image.open(image_path) as img:
width, height = img.size
if width > target_width:
ratio = target_width / float(width)
new_height = int((float(height) * float(ratio)))
img = img.resize((target_width, new_height), Image.Resampling.LANCZOS)
img.save(image_path, optimize=True, quality=85)
print(f"Image resized to {target_width}x{new_height}")
except Exception as e:
print(f"Resize error: {e}")
def extract_text_from_image(image_path):
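    """Run Tesseract OCR (Indonesian language pack) on the screenshot and return the extracted text."""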
try:
resize_if_needed(image_path, max_mb=1, target_width=720)
# Use Tesseract OCR with Indonesian language
text = pytesseract.image_to_string(Image.open(image_path), lang='ind')
print(f"OCR text extracted with Tesseract: {len(text)} characters")
return text.strip()
except Exception as e:
print(f"Tesseract OCR error: {e}")
return ""
def prepare_data_for_model(image_path, text):
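    """Transform the screenshot and tokenize the cleaned OCR text into model-ready tensors."""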
image = Image.open(image_path)
image_tensor = transform(image).unsqueeze(0).to(device)
clean_text_data = clean_text(text)
encoding = tokenizer.encode_plus(
clean_text_data,
add_special_tokens=True,
max_length=128,
padding='max_length',
truncation=True,
return_tensors='pt'
)
input_ids = encoding['input_ids'].to(device)
attention_mask = encoding['attention_mask'].to(device)
return image_tensor, input_ids, attention_mask
def predict_single_url(url):
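    """Classify one URL: screenshot it, OCR the text, then use the fusion model (or the image-only model if no usable text is found)."""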
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
screenshot_path = take_screenshot(url)
if not screenshot_path:
return f"Error: Failed to take screenshot for {url}", None
text = extract_text_from_image(screenshot_path)
    if not text.strip():  # No usable OCR text
print(f"No OCR text found for {url}. Using Image-Only Model.")
image = Image.open(screenshot_path)
image_tensor = transform(image).unsqueeze(0).to(device)
with torch.no_grad():
image_logits = image_only_model(image_tensor).squeeze(1)
image_probs = torch.sigmoid(image_logits)
threshold = 0.6
is_gambling = image_probs[0] > threshold
label = "Gambling" if is_gambling else "Non-Gambling"
confidence = image_probs[0].item() if is_gambling else 1 - image_probs[0].item()
print(f"[Image-Only] URL: {url}")
print(f"Prediction: {label} | Confidence: {confidence:.2f}\n")
return label, f"Confidence: {confidence:.2f}"
else:
image_tensor, input_ids, attention_mask = prepare_data_for_model(screenshot_path, text)
with torch.no_grad():
fused_logits, image_logits, text_logits, weights = fusion_model(image_tensor, input_ids, attention_mask)
fused_probs = torch.sigmoid(fused_logits)
image_probs = torch.sigmoid(image_logits)
text_probs = torch.sigmoid(text_logits)
threshold = 0.6
is_gambling = fused_probs[0] > threshold
label = "Gambling" if is_gambling else "Non-Gambling"
confidence = fused_probs[0].item() if is_gambling else 1 - fused_probs[0].item()
        # Log prediction details
print(f"[Fusion Model] URL: {url}")
print(f"Image Model Prediction Probability: {image_probs[0]:.2f}")
print(f"Text Model Prediction Probability: {text_probs[0]:.2f}")
print(f"Fusion Final Prediction: {label} | Confidence: {confidence:.2f}\n")
return label, f"Confidence: {confidence:.2f}"
def predict_batch_urls(file_obj):
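    """Classify every URL (one per line) in the uploaded text file and return a DataFrame of results."""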
results = []
    # gr.File may provide a filepath string or a file-like object depending on the Gradio version
    if isinstance(file_obj, str):
        with open(file_obj, "r", encoding="utf-8") as f:
            content = f.read()
    else:
        content = file_obj.read()
        if isinstance(content, bytes):
            content = content.decode("utf-8")
urls = [line.strip() for line in content.splitlines() if line.strip()]
for url in urls:
label, confidence = predict_single_url(url)
results.append({"url": url, "label": label, "confidence": confidence})
df = pd.DataFrame(results)
print(f"Batch prediction completed for {len(urls)} URLs.")
return df
# --- Gradio App ---
with gr.Blocks() as app:
gr.Markdown("# 🕵️ Gambling Website Detection (URL Based)")
gr.Markdown("### Using Playwright & Tesseract OCR")
with gr.Tab("Single URL"):
url_input = gr.Textbox(label="Enter Website URL")
predict_button = gr.Button("Predict")
label_output = gr.Label()
confidence_output = gr.Textbox(label="Confidence", interactive=False)
predict_button.click(fn=predict_single_url, inputs=url_input, outputs=[label_output, confidence_output])
with gr.Tab("Batch URLs"):
file_input = gr.File(label="Upload .txt file with URLs (one per line)")
batch_predict_button = gr.Button("Batch Predict")
batch_output = gr.DataFrame()
batch_predict_button.click(fn=predict_batch_urls, inputs=file_input, outputs=batch_output)
if __name__ == "__main__":
app.launch(server_name="0.0.0.0", server_port=7860)