"""Gradio app for detecting gambling websites from a URL.

The app screenshots the page with Playwright, extracts (Indonesian) text with
Tesseract OCR, and classifies the site with an image-only EfficientNet-B3 model
or a late fusion of the image model and a BERT-based text model.
"""

import gradio as gr
import os
import re
import time
import torch
import torch.nn as nn
from PIL import Image
import pytesseract
from playwright.sync_api import sync_playwright
import asyncio
from transformers import AutoTokenizer, BertTokenizerFast
from torchvision import transforms
from torchvision import models
from torchvision.transforms import functional as F
import pandas as pd
from huggingface_hub import hf_hub_download
import warnings
warnings.filterwarnings("ignore")
from pathlib import Path
import subprocess
import traceback

|
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

|
# Load the IndoBERT tokenizer; fall back to the default English BERT tokenizer on failure.
try:
    tokenizer = BertTokenizerFast.from_pretrained("indobenchmark/indobert-base-p1")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    print("Falling back to default BERT tokenizer")
    tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
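# Note: bert-base-uncased is an English tokenizer, so the fallback only keeps the app
# running; Indonesian OCR text will likely be tokenized less accurately than with IndoBERT.
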
|
class ResizePadToSquare:
    """Resize an image to fit within target_size x target_size, then zero-pad it to a square."""

    def __init__(self, target_size=300):
        self.target_size = target_size

    def __call__(self, img):
        img = img.convert("RGB")
        img.thumbnail((self.target_size, self.target_size), Image.BILINEAR)
        delta_w = self.target_size - img.size[0]
        delta_h = self.target_size - img.size[1]
        padding = (delta_w // 2, delta_h // 2, delta_w - delta_w // 2, delta_h - delta_h // 2)
        img = F.pad(img, padding, fill=0, padding_mode='constant')
        return img

|
transform = transforms.Compose([
    ResizePadToSquare(300),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])
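# The composed transform yields a normalized 3x300x300 tensor; the mean/std above are
# the standard ImageNet statistics used for the pretrained EfficientNet-B3 backbone.
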
|
def ensure_playwright_chromium():
    """Install the Playwright Chromium browser if it is not already present."""
    try:
        print("Checking and installing Playwright Chromium if not present...")
        subprocess.run(["playwright", "install", "chromium"], check=True)
        print("Playwright Chromium installation completed.")
    except Exception as e:
        print("Error during Playwright Chromium installation:", e)
        traceback.print_exc()


ensure_playwright_chromium()

|
# Directory for captured screenshots.
SCREENSHOT_DIR = "screenshots"
os.makedirs(SCREENSHOT_DIR, exist_ok=True)

# Point pytesseract at the system Tesseract binary.
pytesseract.pytesseract.tesseract_cmd = '/usr/bin/tesseract'
print("Tesseract OCR initialized.")

|
class LateFusionModel(nn.Module):
    """Late-fusion classifier that combines image and text logits with learnable weights."""

    def __init__(self, image_model, text_model):
        super(LateFusionModel, self).__init__()
        self.image_model = image_model
        self.text_model = text_model
        self.image_weight = nn.Parameter(torch.tensor(0.5))
        self.text_weight = nn.Parameter(torch.tensor(0.5))

    def forward(self, images, input_ids, attention_mask):
        # Run both backbones without gradient tracking; each produces one logit per sample.
        with torch.no_grad():
            image_logits = self.image_model(images).squeeze(1)
            text_logits = self.text_model(input_ids=input_ids, attention_mask=attention_mask).logits.squeeze(1)

        # Normalize the two fusion weights to sum to 1 and take a weighted sum of the logits.
        weights = torch.softmax(torch.stack([self.image_weight, self.text_weight]), dim=0)
        fused_logits = weights[0] * image_logits + weights[1] * text_logits

        return fused_logits, image_logits, text_logits, weights
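# Worked example: with image_weight == text_weight (both initialized to 0.5), the softmax
# yields weights [0.5, 0.5], so the fused logit is simply the average of the two logits.
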
|
# Load the fusion model, preferring a local checkpoint and falling back to the Hugging Face Hub.
model_path = "models/best_fusion_model.pt"
if not os.path.exists(model_path):
    model_path = hf_hub_download(repo_id="azzandr/gambling-fusion-model", filename="best_fusion_model.pt")
fusion_model = torch.load(model_path, map_location=device, weights_only=False)

fusion_model.to(device)
fusion_model.eval()
print("Fusion model loaded successfully!")

|
# Load the image-only EfficientNet-B3 model (state_dict), again preferring a local
# checkpoint and falling back to the Hugging Face Hub.
image_model_path = "models/best_image_model_Adam_lr0.0001_bs32_state_dict.pt"
if not os.path.exists(image_model_path):
    image_model_path = hf_hub_download(repo_id="azzandr/gambling-image-model",
                                       filename="best_image_model_Adam_lr0.0001_bs32_state_dict.pt")

image_only_model = models.efficientnet_b3(weights=models.EfficientNet_B3_Weights.DEFAULT)
num_features = image_only_model.classifier[1].in_features
image_only_model.classifier = nn.Linear(num_features, 1)  # single-logit binary head
image_only_model.load_state_dict(torch.load(image_model_path, map_location=device))
image_only_model.to(device)
image_only_model.eval()
print("Image-only model loaded from state_dict successfully!")

|
def clean_text(text):
    # Short Indonesian words to keep even though they are under three characters.
    exceptions = {
        "di", "ke", "ya"
    }

    text = re.sub(r"http\S+", "", text)        # drop URLs
    text = re.sub(r"\n", " ", text)            # newlines -> spaces
    text = re.sub(r"[^a-zA-Z']", " ", text)    # keep only letters and apostrophes
    text = re.sub(r"\s{2,}", " ", text).strip().lower()

    # Drop very short tokens unless they are whitelisted above.
    words = text.split()
    filtered_words = [
        w for w in words
        if (len(w) > 2 or w in exceptions)
    ]
    text = ' '.join(filtered_words)

    # Remove vowel-only words, consonant-only words, and overly long OCR artifacts.
    text = re.sub(r'\b[aeiou]+\b', '', text)
    text = re.sub(r'\b[^aeiou\s]+\b', '', text)
    text = re.sub(r'\b\w{20,}\b', '', text)
    text = re.sub(r'\s+', ' ', text).strip()

    # Treat fewer than five remaining words as "no usable text".
    if len(text.split()) < 5:
        print(f"Cleaned text too short ({len(text.split())} words). Ignoring text.")
        return ""
    return text
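# Illustrative example (hypothetical OCR snippet):
#   clean_text("Daftar sekarang di situs judi online terpercaya! http://example.com")
#   -> "daftar sekarang di situs judi online terpercaya"
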
|
def take_screenshot(url):
    # Derive a filesystem-safe filename from the URL.
    filename = url.replace('https://', '').replace('http://', '').replace('/', '_').replace('.', '_') + '.png'
    filepath = os.path.join(SCREENSHOT_DIR, filename)

    try:
        print(f"\n=== [START SCREENSHOT] URL: {url} ===")

        with sync_playwright() as p:
            print("Launching Playwright Chromium...")
            browser = p.chromium.launch()
            page = browser.new_page(
                viewport={"width": 1280, "height": 800},
                user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/113.0.0.0 Safari/537.36"
            )
            page.set_default_timeout(60000)
            page.set_extra_http_headers({"Accept-Language": "en-US,en;q=0.9"})

            print("Navigating to URL...")
            page.goto(url, wait_until="networkidle", timeout=60000)
            page.wait_for_timeout(3000)

            print("Taking screenshot (viewport only)...")
            page.screenshot(path=filepath)
            browser.close()
            print(f"Screenshot saved to {filepath}")

        print("=== [END SCREENSHOT] ===\n")
        return filepath

    except Exception as e:
        print(f"[ERROR] Failed to take screenshot for URL: {url}")
        print(f"Exception: {e}")
        traceback.print_exc()
        return None

|
def resize_if_needed(image_path, max_mb=1, target_width=720):
    """Downscale the screenshot in place if it is larger than max_mb megabytes."""
    file_size = os.path.getsize(image_path) / (1024 * 1024)  # size in MB
    if file_size > max_mb:
        try:
            with Image.open(image_path) as img:
                width, height = img.size
                if width > target_width:
                    ratio = target_width / float(width)
                    new_height = int(float(height) * ratio)
                    img = img.resize((target_width, new_height), Image.Resampling.LANCZOS)
                    img.save(image_path, optimize=True, quality=85)
                    print(f"Image resized to {target_width}x{new_height}")
        except Exception as e:
            print(f"Resize error: {e}")

|
def extract_text_from_image(image_path):
    try:
        resize_if_needed(image_path, max_mb=1, target_width=720)

        # Run Tesseract with the Indonesian language pack.
        text = pytesseract.image_to_string(Image.open(image_path), lang='ind')
        print(f"OCR text extracted with Tesseract: {len(text)} characters")

        return text.strip()
    except Exception as e:
        print(f"Tesseract OCR error: {e}")
        return ""

|
def prepare_data_for_model(image_path, text):
    image = Image.open(image_path)
    image_tensor = transform(image).unsqueeze(0).to(device)

    clean_text_data = clean_text(text)
    encoding = tokenizer.encode_plus(
        clean_text_data,
        add_special_tokens=True,
        max_length=128,
        padding='max_length',
        truncation=True,
        return_tensors='pt'
    )

    input_ids = encoding['input_ids'].to(device)
    attention_mask = encoding['attention_mask'].to(device)

    return image_tensor, input_ids, attention_mask

|
def predict_single_url(url):
    if not url.startswith(('http://', 'https://')):
        url = 'https://' + url

    screenshot_path = take_screenshot(url)
    if not screenshot_path:
        return f"Error: Failed to take screenshot for {url}", None

    text = extract_text_from_image(screenshot_path)

    if not text.strip():
        # No usable OCR text: fall back to the image-only model.
        print(f"No OCR text found for {url}. Using Image-Only Model.")
        image = Image.open(screenshot_path)
        image_tensor = transform(image).unsqueeze(0).to(device)

        with torch.no_grad():
            image_logits = image_only_model(image_tensor).squeeze(1)
            image_probs = torch.sigmoid(image_logits)

        threshold = 0.6
        is_gambling = image_probs[0] > threshold

        label = "Gambling" if is_gambling else "Non-Gambling"
        confidence = image_probs[0].item() if is_gambling else 1 - image_probs[0].item()
        print(f"[Image-Only] URL: {url}")
        print(f"Prediction: {label} | Confidence: {confidence:.2f}\n")
        return label, f"Confidence: {confidence:.2f}"

    else:
        # OCR text available: use the image + text fusion model.
        image_tensor, input_ids, attention_mask = prepare_data_for_model(screenshot_path, text)

        with torch.no_grad():
            fused_logits, image_logits, text_logits, weights = fusion_model(image_tensor, input_ids, attention_mask)
            fused_probs = torch.sigmoid(fused_logits)
            image_probs = torch.sigmoid(image_logits)
            text_probs = torch.sigmoid(text_logits)

        threshold = 0.6
        is_gambling = fused_probs[0] > threshold

        label = "Gambling" if is_gambling else "Non-Gambling"
        confidence = fused_probs[0].item() if is_gambling else 1 - fused_probs[0].item()

        print(f"[Fusion Model] URL: {url}")
        print(f"Image Model Prediction Probability: {image_probs[0]:.2f}")
        print(f"Text Model Prediction Probability: {text_probs[0]:.2f}")
        print(f"Fusion Final Prediction: {label} | Confidence: {confidence:.2f}\n")

        return label, f"Confidence: {confidence:.2f}"

|
def predict_batch_urls(file_obj):
    results = []
    # gr.File may pass a file path (str) or a file-like object depending on the Gradio version.
    file_path = file_obj.name if hasattr(file_obj, "name") else file_obj
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()
    urls = [line.strip() for line in content.splitlines() if line.strip()]
    for url in urls:
        label, confidence = predict_single_url(url)
        results.append({"url": url, "label": label, "confidence": confidence})

    df = pd.DataFrame(results)
    print(f"Batch prediction completed for {len(urls)} URLs.")
    return df

|
with gr.Blocks() as app:
    gr.Markdown("# 🕵️ Gambling Website Detection (URL Based)")
    gr.Markdown("### Using Playwright & Tesseract OCR")

    with gr.Tab("Single URL"):
        url_input = gr.Textbox(label="Enter Website URL")
        predict_button = gr.Button("Predict")
        label_output = gr.Label()
        confidence_output = gr.Textbox(label="Confidence", interactive=False)

        predict_button.click(fn=predict_single_url, inputs=url_input, outputs=[label_output, confidence_output])

    with gr.Tab("Batch URLs"):
        file_input = gr.File(label="Upload .txt file with URLs (one per line)")
        batch_predict_button = gr.Button("Batch Predict")
        batch_output = gr.DataFrame()

        batch_predict_button.click(fn=predict_batch_urls, inputs=file_input, outputs=batch_output)

if __name__ == "__main__":
    app.launch(server_name="0.0.0.0", server_port=7860)