"""Gradio app that classifies websites as gambling / non-gambling.

Pipeline for one URL: render the page with Playwright and save a screenshot,
OCR the screenshot with Tesseract, then score it with a late-fusion
(image + text) model.  When the OCR text is missing or too short after
cleaning, an image-only model is used instead.
"""

import asyncio
import os
import re
import subprocess
import time
import traceback
import warnings
from pathlib import Path

import gradio as gr
import pandas as pd
import pytesseract
import torch
import torch.nn as nn
from huggingface_hub import hf_hub_download
from PIL import Image
from playwright.sync_api import sync_playwright
from torchvision import models
from torchvision import transforms
from torchvision.transforms import functional as F
from transformers import AutoTokenizer, BertTokenizerFast

warnings.filterwarnings("ignore")

# =============================================
# CONFIGURATION
# =============================================
# Substrings matched (case-insensitively) against request URLs to block them.
BLOCK_PATTERNS = [
    "doubleclick", "adservice", "googlesyndication", "ads", "adserver",
    "cookie", "consent", "analytics", "tracker", "tracking", "stats",
    "metric", "telemetry", "social", "facebook", "twitter", "linkedin",
    "pinterest", "popup", "notification", "banner"
]
PAGE_TIMEOUT = 30000  # reduced to 30 seconds
WAIT_FOR_LOAD_TIMEOUT = 5000  # reduced to 5 seconds
CLOUDFLARE_CHECK_KEYWORDS = ["Checking your browser", "Just a moment", "Cloudflare"]
MAX_REDIRECTS = 5  # Maximum number of redirects to follow

# Probability above which a page is labelled "Gambling".
DECISION_THRESHOLD = 0.6

# Page URLs that show no site content; a screenshot of them is worthless.
_NON_CONTENT_URL_PREFIXES = ("about:blank", "chrome-error://")


# =============================================
# HELPER FUNCTIONS
# =============================================
def ensure_http(url):
    """Return *url* unchanged if it has an http(s) scheme, else prefix ``http://``."""
    if not url.startswith(('http://', 'https://')):
        return 'http://' + url
    return url


def sanitize_filename(url):
    """Replace every character that is not a word char, '-', '_', '.' or space with '_'."""
    return re.sub(r'[^\w\-_\. ]', '_', url)


def _is_blocked_url(url):
    """Return True when *url* contains any of the BLOCK_PATTERNS substrings."""
    lowered = url.lower()
    return any(pattern in lowered for pattern in BLOCK_PATTERNS)


def _is_main_frame_navigation(request):
    """Return True when *request* drives the navigation of the top-level frame.

    Any failure while inspecting the request is treated as "not a main-frame
    navigation", which keeps the plain pattern-based blocking behaviour.
    """
    try:
        return request.is_navigation_request() and request.frame.parent_frame is None
    except Exception:
        return False


def _should_block(request):
    """Decide whether *request* is an ad/tracker request that should be aborted.

    The patterns are plain substrings ("ads", "stats", "facebook", ...), so
    they can match the very page being visited (e.g. ``downloads.com``).
    Blocking that request would abort the navigation itself, so main-frame
    navigations are always let through.
    """
    return _is_blocked_url(request.url) and not _is_main_frame_navigation(request)


def block_ads_and_cookies(page):
    """Install a route handler on *page* that aborts ad/tracker requests."""
    def route_intercept(route):
        if _should_block(route.request):
            route.abort()
        else:
            route.continue_()

    page.route("**/*", route_intercept)


def wait_for_page_stable(page):
    """Best-effort wait until *page* has loaded; never raises."""
    try:
        # First wait for DOM content
        page.wait_for_load_state('domcontentloaded', timeout=PAGE_TIMEOUT)
        # Then wait for network to be idle
        try:
            page.wait_for_load_state('networkidle', timeout=WAIT_FOR_LOAD_TIMEOUT)
        except Exception:
            # Busy pages may never go idle; that is not an error here.
            print("Network not fully idle, continuing anyway...")
        # Small additional wait
        time.sleep(2)
    except Exception as e:
        print(f"⚠️ Page not fully stable: {e}")


def detect_and_bypass_cloudflare(page):
    """Reload *page* after a pause when its HTML looks like a Cloudflare challenge."""
    try:
        content = page.content().lower()
        if any(keyword.lower() in content for keyword in CLOUDFLARE_CHECK_KEYWORDS):
            print("⚡ Detected Cloudflare challenge, waiting 5 seconds...")
            time.sleep(5)
            page.reload()
            wait_for_page_stable(page)
    except Exception as e:
        print(f"⚠️ Failed to bypass Cloudflare: {e}")


# --- Setup ---
# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load tokenizer with proper error handling
try:
    tokenizer = BertTokenizerFast.from_pretrained("indobenchmark/indobert-base-p1")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    # Fallback to default BERT tokenizer if needed
    print("Falling back to default BERT tokenizer")
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')


# Image transformation
class ResizePadToSquare:
    """Shrink an image to fit a square and pad it with black to exactly that size."""

    def __init__(self, target_size=300):
        self.target_size = target_size

    def __call__(self, img):
        """Return an RGB copy of *img*, at most target_size per side, zero-padded to square."""
        img = img.convert("RGB")
        # thumbnail() resizes in place and preserves the aspect ratio.
        img.thumbnail((self.target_size, self.target_size), Image.Resampling.BILINEAR)
        delta_w = self.target_size - img.size[0]
        delta_h = self.target_size - img.size[1]
        # (left, top, right, bottom); any odd pixel goes to the right/bottom.
        padding = (delta_w // 2, delta_h // 2,
                   delta_w - delta_w // 2, delta_h - delta_h // 2)
        return F.pad(img, padding, fill=0, padding_mode='constant')


transform = transforms.Compose([
    ResizePadToSquare(300),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])


# Run this once at application startup (before the models are loaded).
def ensure_playwright_chromium():
    """Run ``playwright install chromium``; log (but do not raise) on failure."""
    try:
        print("Checking and installing Playwright Chromium if not present...")
        subprocess.run(["playwright", "install", "chromium"], check=True)
        print("Playwright Chromium installation completed.")
    except Exception as e:
        print("Error during Playwright Chromium installation:", e)
        traceback.print_exc()


# Make sure this is called at startup (outside the screenshot function).
ensure_playwright_chromium()

# Screenshot folder
SCREENSHOT_DIR = "screenshots"
os.makedirs(SCREENSHOT_DIR, exist_ok=True)

# Set Tesseract binary location
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'  # Path to tesseract in Docker
print("Tesseract OCR initialized.")


# --- Model ---
class LateFusionModel(nn.Module):
    """Combine image and text logits with two learnable, softmax-normalised weights."""

    def __init__(self, image_model, text_model):
        super().__init__()
        self.image_model = image_model
        self.text_model = text_model
        self.image_weight = nn.Parameter(torch.tensor(0.5))
        self.text_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, images, input_ids, attention_mask):
        """Return ``(fused_logits, image_logits, text_logits, weights)``.

        ``squeeze(1)`` drops dimension 1 of each sub-model output - assumes
        both sub-models emit (batch, 1) logits; TODO confirm.
        """
        # NOTE(review): the original file's indentation was lost, so the exact
        # extent of the no_grad block is assumed to cover only the two
        # sub-model calls. Inference results are the same either way.
        with torch.no_grad():
            image_logits = self.image_model(images).squeeze(1)
            text_logits = self.text_model(
                input_ids=input_ids, attention_mask=attention_mask
            ).logits.squeeze(1)
        weights = torch.softmax(torch.stack([self.image_weight, self.text_weight]), dim=0)
        fused_logits = weights[0] * image_logits + weights[1] * text_logits
        return fused_logits, image_logits, text_logits, weights


# Load fusion model: prefer the local file, otherwise download from the Hub.
model_path = "models/best_fusion_model.pt"
if not os.path.exists(model_path):
    model_path = hf_hub_download(repo_id="azzandr/gambling-fusion-model",
                                 filename="best_fusion_model.pt")
# SECURITY NOTE: weights_only=False unpickles a full model object, which can
# execute arbitrary code. Only load checkpoints from a trusted source.
fusion_model = torch.load(model_path, map_location=device, weights_only=False)
fusion_model.to(device)
fusion_model.eval()
print("Fusion model loaded successfully!")

# Load Image-Only Model from a state_dict (local file first, Hub otherwise).
image_model_path = "models/best_image_model_Adam_lr0.0001_bs32_state_dict.pt"
if os.path.exists(image_model_path):
    _image_model_source = "state_dict"
else:
    image_model_path = hf_hub_download(
        repo_id="azzandr/gambling-image-model",
        filename="best_image_model_Adam_lr0.0001_bs32_state_dict.pt",
    )
    _image_model_source = "HuggingFace"
# weights=None: the strict load_state_dict below overwrites every parameter and
# buffer, so fetching ImageNet weights first would be a wasted download.
image_only_model = models.efficientnet_b3(weights=None)
num_features = image_only_model.classifier[1].in_features
image_only_model.classifier = nn.Linear(num_features, 1)
image_only_model.load_state_dict(torch.load(image_model_path, map_location=device))
image_only_model.to(device)
image_only_model.eval()
print(f"Image-only model loaded from {_image_model_source} successfully!")


# --- Functions ---
def clean_text(text):
    """Normalise OCR text for the text model.

    Returns the cleaned, lower-cased text, or ``""`` when fewer than five
    words survive - the signal to fall back to the image-only model.
    """
    exceptions = {"di", "ke", "ya"}

    # ----- BASIC CLEANING -----
    text = re.sub(r"http\S+", "", text)  # remove URLs
    text = re.sub(r"\n", " ", text)  # newlines become spaces
    text = re.sub(r"[^a-zA-Z']", " ", text)  # keep only letters and apostrophes
    text = re.sub(r"\s{2,}", " ", text).strip().lower()  # collapse spaces, lowercase

    # ----- FILTERING -----
    # Keep words longer than two characters, plus the short-word exceptions.
    text = ' '.join(w for w in text.split() if len(w) > 2 or w in exceptions)

    # ----- REMOVE UNWANTED PATTERNS -----
    text = re.sub(r'\b[aeiou]+\b', '', text)  # words made only of vowels
    text = re.sub(r'\b[^aeiou\s]+\b', '', text)  # words with no vowel at all
    text = re.sub(r'\b\w{20,}\b', '', text)  # very long words (>= 20 chars)
    text = re.sub(r'\s+', ' ', text).strip()  # tidy leftover whitespace

    # check words number
    word_count = len(text.split())
    if word_count < 5:
        print(f"Cleaned text too short ({word_count} words). Ignoring text.")
        return ""  # empty return to use image-only
    return text


def create_browser_context(playwright):
    """Launch Chromium and return a new browser context with a desktop profile."""
    browser = playwright.chromium.launch(
        args=[
            '--disable-features=IsolateOrigins,site-per-process',
            '--disable-web-security',
            '--disable-site-isolation-trials',
            '--disable-setuid-sandbox',
            '--no-sandbox',
            '--disable-gpu',
            '--disable-dev-shm-usage',
            '--disable-extensions',
            '--disable-plugins',
            '--disable-background-timer-throttling',
            '--disable-backgrounding-occluded-windows',
            '--disable-renderer-backgrounding',
            '--no-first-run',
            '--no-default-browser-check',
            '--disable-translate',
            '--disable-ipc-flooding-protection'
        ]
    )
    return browser.new_context(
        viewport={"width": 1280, "height": 800},
        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36",
        ignore_https_errors=True,
        java_script_enabled=True,
        bypass_csp=True,
        extra_http_headers={
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Connection": "keep-alive",
            "DNT": "1",
            "Cache-Control": "no-cache"
        }
    )


def setup_request_interception(page):
    """Install ad/tracker blocking plus redirect bookkeeping on *page*."""
    redirect_urls = set()

    def handle_request(route):
        request = route.request
        url = request.url

        # Block known ad/tracking patterns
        if _should_block(request):
            print(f"Blocking request to: {url}")
            route.abort()
            return

        # Track potential redirects by monitoring navigation requests
        # NOTE(review): a document request is aborted only when its URL was
        # already recorded AND more than MAX_REDIRECTS distinct URLs have been
        # recorded - confirm this is the intended redirect-loop rule.
        if request.resource_type == "document":
            if url in redirect_urls and len(redirect_urls) > MAX_REDIRECTS:
                print(f"Too many redirects (>{MAX_REDIRECTS}), aborting request")
                route.abort()
                return
            redirect_urls.add(url)

        # Continue with the request
        route.continue_()

    # Listen for response events to detect redirects
    def handle_response(response):
        if 300 <= response.status <= 399:
            redirect_urls.add(response.url)

    page.on("response", handle_response)
    page.route("**/*", handle_request)


def try_navigation_strategies(page, url):
    """Try ``page.goto`` with several wait conditions; return the first response.

    Raises the last strategy's own exception, except when that exception is a
    redirect-loop error, in which case a generic ``Exception`` is raised.
    """
    strategies = [
        {"wait_until": "commit", "timeout": 15000},
        {"wait_until": "domcontentloaded", "timeout": 10000},
        {"wait_until": "load", "timeout": 20000},
        {"wait_until": "networkidle", "timeout": 30000}
    ]
    last_index = len(strategies) - 1

    for i, strategy in enumerate(strategies):
        try:
            print(f"Trying navigation strategy {i+1}: {strategy}")
            response = page.goto(url, **strategy)
            print(f"Navigation successful with strategy {i+1}")
            return response
        except Exception as e:
            print(f"Strategy {i+1} failed: {e}")
            if "ERR_TOO_MANY_REDIRECTS" in str(e):
                print(f"Redirect error detected, trying next strategy...")
            elif i == last_index:  # Last strategy
                raise

    raise Exception("All navigation strategies failed")


def _navigate_for_attempt(page, url, attempt):
    """Navigate *page* to *url* using the approach assigned to this retry attempt."""
    if attempt == 0:
        # First attempt: aggressive but safe
        return try_navigation_strategies(page, url)
    if attempt == 1:
        # Second attempt: minimal approach
        print("Trying minimal navigation approach...")
        return page.goto(url, wait_until="commit", timeout=10000)
    # Third attempt: just try to load anything
    print("Trying basic navigation...")
    return page.goto(url, timeout=15000)


def _capture_partial_page(page, filepath):
    """Try to screenshot whatever *page* shows after a failed navigation.

    Returns True when a screenshot was written.  Blank pages and the browser's
    own error page are skipped: classifying them would produce a verdict that
    has nothing to do with the target site.
    """
    try:
        if page.url.startswith(_NON_CONTENT_URL_PREFIXES):
            return False
        print("Taking screenshot of partial page...")
        page.screenshot(path=filepath)
        if os.path.exists(filepath):
            print(f"Partial screenshot saved to {filepath}")
            return True
    except Exception as screenshot_error:
        print(f"Failed to take partial screenshot: {screenshot_error}")
    return False


def _close_context(context):
    """Close a browser context, logging instead of raising on failure."""
    try:
        context.close()
    except Exception as close_error:
        print(f"⚠️ Failed to close browser context: {close_error}")


def take_screenshot(url):
    """Render *url* and save a PNG screenshot under SCREENSHOT_DIR.

    Up to three attempts are made, each with a simpler navigation approach.
    Returns the screenshot path, or ``None`` when every attempt failed.
    """
    url = ensure_http(url)
    filepath = os.path.join(SCREENSHOT_DIR, sanitize_filename(url) + '.png')
    max_retries = 3

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            print(f"\n=== [SCREENSHOT ATTEMPT {attempt + 1}/{max_retries}] URL: {url} ===")
            with sync_playwright() as p:
                print("Launching browser with aggressive configuration...")
                context = create_browser_context(p)
                try:
                    page = context.new_page()

                    # Only set up basic request blocking for the first attempt
                    if attempt == 0:
                        print("Setting up basic request interception...")
                        block_ads_and_cookies(page)

                    try:
                        response = _navigate_for_attempt(page, url, attempt)
                        if response:
                            print(f"Response status: {response.status}")

                        # Try to wait for some content
                        try:
                            page.wait_for_timeout(3000)  # Just wait 3 seconds
                            if attempt == 0:
                                wait_for_page_stable(page)
                        except Exception as e:
                            print(f"Page stability warning: {e}")

                        # Take screenshot
                        print("Taking screenshot...")
                        page.screenshot(path=filepath)
                        print(f"Screenshot saved successfully to {filepath}")
                        return filepath
                    except Exception as nav_error:
                        print(f"Navigation error on attempt {attempt + 1}: {nav_error}")
                        # Try to take screenshot of whatever we have
                        if _capture_partial_page(page, filepath):
                            return filepath
                        # If this is the last attempt, let the outer handler report it
                        if is_last_attempt:
                            raise
                        print(f"Retrying with different approach...")
                        time.sleep(2)  # Wait before retry
                finally:
                    # Single cleanup point for every exit path above.
                    _close_context(context)
        except Exception as e:
            print(f"[ERROR] Attempt {attempt + 1} failed: {e}")
            if is_last_attempt:
                print(f"All {max_retries} attempts failed for URL: {url}")
                traceback.print_exc()
                return None
            print("Waiting before next attempt...")
            time.sleep(3)

    return None


def resize_if_needed(image_path, max_mb=1, target_width=720):
    """Downscale the image at *image_path* in place when it is large.

    The file is rewritten only when it is bigger than *max_mb* megabytes and
    wider than *target_width* pixels; the aspect ratio is preserved.
    """
    file_size = os.path.getsize(image_path) / (1024 * 1024)  # in MB
    if file_size <= max_mb:
        return
    try:
        with Image.open(image_path) as img:
            width, height = img.size
            if width <= target_width:
                return
            ratio = target_width / float(width)
            new_height = int(float(height) * float(ratio))
            resized = img.resize((target_width, new_height), Image.Resampling.LANCZOS)
            resized.save(image_path, optimize=True, quality=85)
            print(f"Image resized to {target_width}x{new_height}")
    except Exception as e:
        print(f"Resize error: {e}")


def extract_text_from_image(image_path):
    """OCR the image with Tesseract (Indonesian); return stripped text or ``""`` on error."""
    try:
        resize_if_needed(image_path, max_mb=1, target_width=720)
        # Use Tesseract OCR with Indonesian language
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image, lang='ind')
        print(f"OCR text extracted with Tesseract: {len(text)} characters")
        return text.strip()
    except Exception as e:
        print(f"Tesseract OCR error: {e}")
        return ""


def _load_image_tensor(image_path):
    """Open the image, apply ``transform`` and return a batch-of-one tensor on ``device``."""
    with Image.open(image_path) as image:
        return transform(image).unsqueeze(0).to(device)


def prepare_data_for_model(image_path, text):
    """Build ``(image_tensor, input_ids, attention_mask)`` for the fusion model.

    *text* is the raw OCR text; it is cleaned with ``clean_text`` here.
    """
    image_tensor = _load_image_tensor(image_path)
    clean_text_data = clean_text(text)

    encoding = tokenizer(
        clean_text_data,
        add_special_tokens=True,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    )
    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)
    return image_tensor, input_ids, attention_mask


def _label_and_confidence(probability, threshold=DECISION_THRESHOLD):
    """Map a scalar probability tensor to ``(label, confidence)``.

    Confidence is the probability of the chosen label.
    """
    is_gambling = bool(probability > threshold)
    score = probability.item()
    if is_gambling:
        return "Gambling", score
    return "Non-Gambling", 1 - score


def predict_single_url(url):
    """Classify one URL.

    Returns ``(label, confidence_text, screenshot_path, raw_text, cleaned_text)``.
    On failure the first element is an ``"Error: ..."`` message and the rest
    are ``None``.
    """
    url = (url or "").strip()
    if not url:
        # Avoid three slow browser attempts on an empty input.
        return "Error: No URL provided", None, None, None, None
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    screenshot_path = take_screenshot(url)
    if not screenshot_path:
        return f"Error: Failed to take screenshot for {url}", None, None, None, None

    raw_text = extract_text_from_image(screenshot_path)
    cleaned_text = clean_text(raw_text) if raw_text.strip() else ""

    # clean_text() returns "" when too little text survives, which is its
    # signal to use the image-only model - so branch on the cleaned text, not
    # on the raw OCR output.
    if not cleaned_text:
        if not raw_text.strip():
            print(f"No OCR text found for {url}. Using Image-Only Model.")
        else:
            print(f"Cleaned OCR text too short for {url}. Using Image-Only Model.")
        image_tensor = _load_image_tensor(screenshot_path)

        with torch.no_grad():
            image_logits = image_only_model(image_tensor).squeeze(1)
            image_probs = torch.sigmoid(image_logits)

        label, confidence = _label_and_confidence(image_probs[0])
        print(f"[Image-Only] URL: {url}")
        print(f"Prediction: {label} | Confidence: {confidence:.2f}\n")
        return label, f"Confidence: {confidence:.2f}", screenshot_path, raw_text, cleaned_text

    image_tensor, input_ids, attention_mask = prepare_data_for_model(screenshot_path, raw_text)

    with torch.no_grad():
        fused_logits, image_logits, text_logits, weights = fusion_model(
            image_tensor, input_ids, attention_mask
        )
        fused_probs = torch.sigmoid(fused_logits)
        image_probs = torch.sigmoid(image_logits)
        text_probs = torch.sigmoid(text_logits)

    label, confidence = _label_and_confidence(fused_probs[0])

    # Log detail
    print(f"[Fusion Model] URL: {url}")
    print(f"Image Model Prediction Probability: {image_probs[0].item():.2f}")
    print(f"Text Model Prediction Probability: {text_probs[0].item():.2f}")
    print(f"Fusion Final Prediction: {label} | Confidence: {confidence:.2f}\n")
    return label, f"Confidence: {confidence:.2f}", screenshot_path, raw_text, cleaned_text


def _read_uploaded_text(file_obj):
    """Return the text of an uploaded file.

    Accepts ``None``, a filesystem path, an object with ``.read()`` (bytes or
    str), or an object exposing the path as ``.name``.  Bytes are decoded as
    UTF-8, tolerating a leading BOM.
    """
    if file_obj is None:
        return ""
    if isinstance(file_obj, (str, os.PathLike)):
        with open(file_obj, "rb") as handle:
            data = handle.read()
    elif hasattr(file_obj, "read"):
        data = file_obj.read()
    else:
        with open(file_obj.name, "rb") as handle:
            data = handle.read()
    if isinstance(data, bytes):
        return data.decode('utf-8-sig')
    return data


def predict_batch_urls(file_obj):
    """Classify every non-blank line of an uploaded text file; return a DataFrame."""
    content = _read_uploaded_text(file_obj)
    urls = [line.strip() for line in content.splitlines() if line.strip()]

    results = []
    for url in urls:
        label, confidence, screenshot_path, raw_text, cleaned_text = predict_single_url(url)
        results.append({
            "url": url,
            "label": label,
            "confidence": confidence,
            "screenshot_path": screenshot_path,
            "raw_text": raw_text,
            "cleaned_text": cleaned_text,
        })

    df = pd.DataFrame(results)
    print(f"Batch prediction completed for {len(urls)} URLs.")
    return df


# --- Gradio App ---
with gr.Blocks() as app:
    gr.Markdown("# 🕵️ Gambling Website Detection (URL Based)")
    gr.Markdown("### Using Playwright & Tesseract OCR")

    with gr.Tab("Single URL"):
        url_input = gr.Textbox(label="Enter Website URL")
        predict_button = gr.Button("Predict")

        with gr.Row():
            with gr.Column():
                label_output = gr.Label()
                confidence_output = gr.Textbox(label="Confidence", interactive=False)
            with gr.Column():
                screenshot_output = gr.Image(label="Screenshot", type="filepath")

        with gr.Row():
            with gr.Column():
                raw_text_output = gr.Textbox(label="Raw OCR Text", lines=5)
            with gr.Column():
                cleaned_text_output = gr.Textbox(label="Cleaned Text", lines=5)

        predict_button.click(
            fn=predict_single_url,
            inputs=url_input,
            outputs=[label_output, confidence_output, screenshot_output,
                     raw_text_output, cleaned_text_output]
        )

    with gr.Tab("Batch URLs"):
        file_input = gr.File(label="Upload .txt file with URLs (one per line)")
        batch_predict_button = gr.Button("Batch Predict")
        batch_output = gr.DataFrame()
        batch_predict_button.click(fn=predict_batch_urls, inputs=file_input, outputs=batch_output)

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)