import gradio as gr import os import re import time import torch import torch.nn as nn from PIL import Image import pytesseract from playwright.sync_api import sync_playwright import asyncio from transformers import AutoTokenizer, BertTokenizerFast from torchvision import transforms from torchvision import models from torchvision.transforms import functional as F import pandas as pd from huggingface_hub import hf_hub_download import warnings warnings.filterwarnings("ignore") from pathlib import Path import subprocess import traceback # --- Setup --- # Device setup device = torch.device("cuda" if torch.cuda.is_available() else "cpu") print(f"Using device: {device}") # Load tokenizer with proper error handling try: # # Try to load from local tokenizer directory # tokenizer_path = '/app/tokenizers/indobert-base-p1' # if os.path.exists(tokenizer_path): # print(f"Loading tokenizer from local path: {tokenizer_path}") # tokenizer = AutoTokenizer.from_pretrained(tokenizer_path) # else: # # If local not available, try direct download with cache # print("Local tokenizer not found, downloading from Hugging Face...") # # tokenizer = AutoTokenizer.from_pretrained('indobenchmark/indobert-base-p1', # # use_fast=True, # # cache_dir='/app/tokenizers') tokenizer = BertTokenizerFast.from_pretrained("indobenchmark/indobert-base-p1") except Exception as e: print(f"Error loading tokenizer: {e}") # Fallback to default BERT tokenizer if needed print("Falling back to default BERT tokenizer") tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased') # Image transformation class ResizePadToSquare: def __init__(self, target_size=300): self.target_size = target_size def __call__(self, img): img = img.convert("RGB") img.thumbnail((self.target_size, self.target_size), Image.BILINEAR) delta_w = self.target_size - img.size[0] delta_h = self.target_size - img.size[1] padding = (delta_w // 2, delta_h // 2, delta_w - delta_w // 2, delta_h - delta_h // 2) img = F.pad(img, padding, fill=0, padding_mode='constant') return img transform = transforms.Compose([ ResizePadToSquare(300), transforms.ToTensor(), transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]), ]) # Jalankan ini sekali di awal startup aplikasi (misalnya di main file / sebelum model load) def ensure_playwright_chromium(): try: print("Checking and installing Playwright Chromium if not present...") subprocess.run(["playwright", "install", "chromium"], check=True) print("Playwright Chromium installation completed.") except Exception as e: print("Error during Playwright Chromium installation:", e) traceback.print_exc() # Pastikan dipanggil saat startup (di luar fungsi screenshot) ensure_playwright_chromium() # Screenshot folder SCREENSHOT_DIR = "screenshots" os.makedirs(SCREENSHOT_DIR, exist_ok=True) # Set Tesseract language pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract' # Path to tesseract in Docker print("Tesseract OCR initialized.") # --- Model --- class LateFusionModel(nn.Module): def __init__(self, image_model, text_model): super(LateFusionModel, self).__init__() self.image_model = image_model self.text_model = text_model self.image_weight = nn.Parameter(torch.tensor(0.5)) self.text_weight = nn.Parameter(torch.tensor(0.5)) def forward(self, images, input_ids, attention_mask): with torch.no_grad(): image_logits = self.image_model(images).squeeze(1) text_logits = self.text_model(input_ids=input_ids, attention_mask=attention_mask).logits.squeeze(1) weights = torch.softmax(torch.stack([self.image_weight, self.text_weight]), dim=0) fused_logits = weights[0] * image_logits + weights[1] * text_logits return fused_logits, image_logits, text_logits, weights # Load model model_path = "models/best_fusion_model.pt" if os.path.exists(model_path): fusion_model = torch.load(model_path, map_location=device, weights_only=False) else: model_path = hf_hub_download(repo_id="azzandr/gambling-fusion-model", filename="best_fusion_model.pt") fusion_model = torch.load(model_path, map_location=device, weights_only=False) fusion_model.to(device) fusion_model.eval() print("Fusion model loaded successfully!") # Load Image-Only Model # Load image model from state_dict image_model_path = "models/best_image_model_Adam_lr0.0001_bs32_state_dict.pt" if os.path.exists(image_model_path): image_only_model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT) num_features = image_only_model.classifier[1].in_features image_only_model.classifier = nn.Linear(num_features, 1) image_only_model.load_state_dict(torch.load(image_model_path, map_location=device)) image_only_model.to(device) image_only_model.eval() print("Image-only model loaded from state_dict successfully!") else: # Download from HuggingFace if local file doesn't exist image_model_path = hf_hub_download(repo_id="azzandr/gambling-image-model", filename="best_image_model_Adam_lr0.0001_bs32_state_dict.pt") image_only_model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT) num_features = image_only_model.classifier[1].in_features image_only_model.classifier = nn.Linear(num_features, 1) image_only_model.load_state_dict(torch.load(image_model_path, map_location=device)) image_only_model.to(device) image_only_model.eval() print("Image-only model loaded from HuggingFace successfully!") # --- Functions --- def clean_text(text): exceptions = { "di", "ke", "ya" } # ----- BASIC CLEANING ----- text = re.sub(r"http\S+", "", text) # Hapus URL text = re.sub(r"\n", " ", text) # Ganti newline dengan spasi text = re.sub(r"[^a-zA-Z']", " ", text) # Hanya sisakan huruf dan apostrof text = re.sub(r"\s{2,}", " ", text).strip().lower() # Hapus spasi ganda, ubah ke lowercase # ----- FILTERING ----- words = text.split() filtered_words = [ w for w in words if (len(w) > 2 or w in exceptions) # Simpan kata >2 huruf atau ada di exceptions ] text = ' '.join(filtered_words) # ----- REMOVE UNWANTED PATTERNS ----- text = re.sub(r'\b[aeiou]+\b', '', text) # Hapus kata semua vokal (panjang berapa pun) text = re.sub(r'\b[^aeiou\s]+\b', '', text) # Hapus kata semua konsonan (panjang berapa pun) text = re.sub(r'\b\w{20,}\b', '', text) # Hapus kata sangat panjang (≥20 huruf) text = re.sub(r'\s+', ' ', text).strip() # Bersihkan spasi ekstra # check words number if len(text.split()) < 5: print(f"Cleaned text too short ({len(text.split())} words). Ignoring text.") return "" # empty return to use image-only return text # Fungsi untuk mengambil screenshot viewport def take_screenshot(url): filename = url.replace('https://', '').replace('http://', '').replace('/', '_').replace('.', '_') + '.png' filepath = os.path.join(SCREENSHOT_DIR, filename) try: print(f"\n=== [START SCREENSHOT] URL: {url} ===") from playwright.sync_api import sync_playwright with sync_playwright() as p: print("Launching Playwright Chromium...") browser = p.chromium.launch() page = browser.new_page( viewport={"width": 1280, "height": 800}, user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36" ) page.set_default_timeout(60000) page.set_extra_http_headers({"Accept-Language": "en-US,en;q=0.9"}) print("Navigating to URL...") page.goto(url, wait_until="networkidle", timeout=60000) page.wait_for_timeout(3000) print("Taking screenshot (viewport only)...") page.screenshot(path=filepath) browser.close() print(f"Screenshot saved to {filepath}") print(f"=== [END SCREENSHOT] ===\n") return filepath except Exception as e: print(f"[ERROR] Failed to take screenshot for URL: {url}") print(f"Exception: {e}") traceback.print_exc() return None def resize_if_needed(image_path, max_mb=1, target_width=720): file_size = os.path.getsize(image_path) / (1024 * 1024) # dalam MB if file_size > max_mb: try: with Image.open(image_path) as img: width, height = img.size if width > target_width: ratio = target_width / float(width) new_height = int((float(height) * float(ratio))) img = img.resize((target_width, new_height), Image.Resampling.LANCZOS) img.save(image_path, optimize=True, quality=85) print(f"Image resized to {target_width}x{new_height}") except Exception as e: print(f"Resize error: {e}") def extract_text_from_image(image_path): try: resize_if_needed(image_path, max_mb=1, target_width=720) # Use Tesseract OCR with Indonesian language text = pytesseract.image_to_string(Image.open(image_path), lang='ind') print(f"OCR text extracted with Tesseract: {len(text)} characters") return text.strip() except Exception as e: print(f"Tesseract OCR error: {e}") return "" def prepare_data_for_model(image_path, text): image = Image.open(image_path) image_tensor = transform(image).unsqueeze(0).to(device) clean_text_data = clean_text(text) encoding = tokenizer.encode_plus( clean_text_data, add_special_tokens=True, max_length=128, padding='max_length', truncation=True, return_tensors='pt' ) input_ids = encoding['input_ids'].to(device) attention_mask = encoding['attention_mask'].to(device) return image_tensor, input_ids, attention_mask def predict_single_url(url): if not url.startswith(('http://', 'https://')): url = 'https://' + url screenshot_path = take_screenshot(url) if not screenshot_path: return f"Error: Failed to take screenshot for {url}", None text = extract_text_from_image(screenshot_path) if not text.strip(): # Jika text kosong print(f"No OCR text found for {url}. Using Image-Only Model.") image = Image.open(screenshot_path) image_tensor = transform(image).unsqueeze(0).to(device) with torch.no_grad(): image_logits = image_only_model(image_tensor).squeeze(1) image_probs = torch.sigmoid(image_logits) threshold = 0.6 is_gambling = image_probs[0] > threshold label = "Gambling" if is_gambling else "Non-Gambling" confidence = image_probs[0].item() if is_gambling else 1 - image_probs[0].item() print(f"[Image-Only] URL: {url}") print(f"Prediction: {label} | Confidence: {confidence:.2f}\n") return label, f"Confidence: {confidence:.2f}" else: image_tensor, input_ids, attention_mask = prepare_data_for_model(screenshot_path, text) with torch.no_grad(): fused_logits, image_logits, text_logits, weights = fusion_model(image_tensor, input_ids, attention_mask) fused_probs = torch.sigmoid(fused_logits) image_probs = torch.sigmoid(image_logits) text_probs = torch.sigmoid(text_logits) threshold = 0.6 is_gambling = fused_probs[0] > threshold label = "Gambling" if is_gambling else "Non-Gambling" confidence = fused_probs[0].item() if is_gambling else 1 - fused_probs[0].item() # ✨ Log detail print(f"[Fusion Model] URL: {url}") print(f"Image Model Prediction Probability: {image_probs[0]:.2f}") print(f"Text Model Prediction Probability: {text_probs[0]:.2f}") print(f"Fusion Final Prediction: {label} | Confidence: {confidence:.2f}\n") return label, f"Confidence: {confidence:.2f}" def predict_batch_urls(file_obj): results = [] content = file_obj.read().decode('utf-8') urls = [line.strip() for line in content.splitlines() if line.strip()] for url in urls: label, confidence = predict_single_url(url) results.append({"url": url, "label": label, "confidence": confidence}) df = pd.DataFrame(results) print(f"Batch prediction completed for {len(urls)} URLs.") return df # --- Gradio App --- with gr.Blocks() as app: gr.Markdown("# 🕵️ Gambling Website Detection (URL Based)") gr.Markdown("### Using Playwright & Tesseract OCR") with gr.Tab("Single URL"): url_input = gr.Textbox(label="Enter Website URL") predict_button = gr.Button("Predict") label_output = gr.Label() confidence_output = gr.Textbox(label="Confidence", interactive=False) predict_button.click(fn=predict_single_url, inputs=url_input, outputs=[label_output, confidence_output]) with gr.Tab("Batch URLs"): file_input = gr.File(label="Upload .txt file with URLs (one per line)") batch_predict_button = gr.Button("Batch Predict") batch_output = gr.DataFrame() batch_predict_button.click(fn=predict_batch_urls, inputs=file_input, outputs=batch_output) if __name__ == "__main__": app.launch(server_name="0.0.0.0", server_port=7860)