from flask import Flask, request, jsonify, render_template, url_for
from flask_cors import CORS
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image
from huggingface_hub import hf_hub_download
import os
import re  # used by the password validator below
from mtcnn import MTCNN
import cv2
from pymongo import MongoClient
import numpy as np
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import logging
from transformers import AutoImageProcessor, AutoModelForImageClassification  # real/fake classifier

# Setup logging
logging.basicConfig(level=logging.INFO)
app = Flask(__name__, template_folder="templates", static_folder="static")
CORS(app)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

UPLOAD_FOLDER = "static/uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# ------------------- Model Loading Functions -------------------
def load_model_from_hf(repo_id, filename, num_classes):
    # Download the checkpoint from the Hugging Face Hub and rebuild the
    # ConvNeXt-Tiny head to match the trained number of classes
    model_path = hf_hub_download(repo_id=repo_id, filename=filename)
    model = models.convnext_tiny(weights=None)
    in_features = model.classifier[2].in_features
    model.classifier[2] = nn.Linear(in_features, num_classes)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    return model

# Load the existing deepfake/cheapfake models
deepfake_model = load_model_from_hf("faryalnimra/DFDC-detection-model", "DFDC.pth", 2)  # two logits
cheapfake_model = load_model_from_hf("faryalnimra/ORIG-TAMP", "ORIG-TAMP.pth", 1)  # single sigmoid logit
# ------------------- New Real/Fake Detector Model -------------------
# This model determines if the uploaded image is real (label 1) or fake (label 0)
model_name = "prithivMLmods/Deep-Fake-Detector-Model"
processor = AutoImageProcessor.from_pretrained(model_name, use_fast=False)
realfake_detector = AutoModelForImageClassification.from_pretrained(model_name)
realfake_detector.to(device)
realfake_detector.eval()
# ------------------- Image Preprocessing -------------------
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
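# Note: there is no Resize step here; torchvision's ConvNeXt pools adaptively,
# so the deepfake/cheapfake models accept images at their native resolution.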
# ------------------- Face Detector -------------------
face_detector = MTCNN()

def detect_face(image_path):
    image = cv2.imread(image_path)
    if image is None:  # unreadable file: treat as no faces
        return 0
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    faces = face_detector.detect_faces(image_rgb)
    # Count only confident detections (>0.90) with a box wider than 30 px
    face_count = sum(
        1 for face in faces
        if face.get("confidence", 0) > 0.90 and face.get("box", [0, 0, 0, 0])[2] > 30
    )
    return face_count
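# Example (hypothetical path): detect_face("static/uploads/sample.jpg") returns
# the number of faces that pass both thresholds.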
# ------------------- API Endpoint: /predict -------------------
@app.route("/predict", methods=["POST"])
def predict():
    if "file" not in request.files:
        return jsonify({"error": "No file uploaded"}), 400

    file = request.files["file"]
    prediction_type = request.form.get("prediction_type", "real_vs_fake")  # default mode

    # Sanitize the client-supplied filename before writing to disk
    filename_safe = secure_filename(file.filename)
    filepath = os.path.join(UPLOAD_FOLDER, filename_safe)
    file.save(filepath)

    try:
        image = Image.open(filepath).convert("RGB")
        image_tensor = transform(image).unsqueeze(0).to(device)
    except Exception as e:
        return jsonify({"error": "Error processing image", "details": str(e)}), 500
    # --------- CASE 1: ONLY Real/Fake Prediction ----------
    if prediction_type == "real_vs_fake":
        with torch.no_grad():
            inputs = processor(images=image, return_tensors="pt").to(device)
            outputs_realfake = realfake_detector(**inputs)
            pred_label = torch.argmax(outputs_realfake.logits, dim=1).item()
        if pred_label == 1:
            return jsonify({
                "prediction": "Real",
                "message": "Image is authentic. No further processing.",
                "image_url": url_for("static", filename=f"uploads/{filename_safe}")
            })
        else:
            return jsonify({
                "prediction": "Fake",
                "message": "Image is fake, but type (Deepfake/Cheapfake) not determined in this mode.",
                "image_url": url_for("static", filename=f"uploads/{filename_safe}")
            })
    # --------- CASE 2: Deepfake vs Cheapfake Analysis ----------
    elif prediction_type == "deepfake_vs_cheapfake":
        with torch.no_grad():
            deepfake_probs = torch.softmax(deepfake_model(image_tensor), dim=1)[0]
            deepfake_confidence_before = deepfake_probs[1].item() * 100
            cheapfake_confidence_before = torch.sigmoid(cheapfake_model(image_tensor)).item() * 100

        # Heuristic: detected faces nudge the verdict toward Deepfake,
        # since deepfake manipulation typically targets faces
        face_count = detect_face(filepath)
        face_factor = min(face_count / 2, 1)
        if deepfake_confidence_before <= cheapfake_confidence_before:
            adjusted_deepfake_confidence = deepfake_confidence_before * (1 + 0.3 * face_factor)
            adjusted_cheapfake_confidence = cheapfake_confidence_before * (1 - 0.3 * face_factor)
        else:
            adjusted_deepfake_confidence = deepfake_confidence_before
            adjusted_cheapfake_confidence = cheapfake_confidence_before

        fake_type = "Deepfake" if adjusted_deepfake_confidence > adjusted_cheapfake_confidence else "Cheapfake"
        return jsonify({
            "prediction": "Fake",
            "fake_type": fake_type,
            "deepfake_confidence_before": f"{deepfake_confidence_before:.2f}%",
            "deepfake_confidence_adjusted": f"{adjusted_deepfake_confidence:.2f}%",
            "cheapfake_confidence_before": f"{cheapfake_confidence_before:.2f}%",
            "cheapfake_confidence_adjusted": f"{adjusted_cheapfake_confidence:.2f}%",
            "faces_detected": face_count,
            "image_url": url_for("static", filename=f"uploads/{filename_safe}")
        })
    # --------- CASE 3: Invalid prediction_type ---------
    else:
        return jsonify({"error": "Invalid prediction_type. Use 'real_vs_fake' or 'deepfake_vs_cheapfake'"}), 400
# ------------------- Heatmap Generator and API -------------------
HEATMAP_FOLDER = "static/heatmaps"
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg"}
os.makedirs(HEATMAP_FOLDER, exist_ok=True)

def allowed_file(filename):
    return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS

# The deepfake model loaded above is reused for Grad-CAM (already in eval mode)
# Choose the last Conv2D layer
target_layer = None
for name, module in deepfake_model.named_modules():
    if isinstance(module, torch.nn.Conv2d):
        target_layer = module
# Grad-CAM class
class GradCAM:
    def __init__(self, model, target_layer):
        self.model = model
        self.target_layer = target_layer
        self.gradients = None
        self.activations = None
        self._register_hooks()

    def _register_hooks(self):
        def forward_hook(module, input, output):
            self.activations = output.detach()

        def backward_hook(module, grad_in, grad_out):
            self.gradients = grad_out[0].detach()

        self.target_layer.register_forward_hook(forward_hook)
        # register_backward_hook is deprecated; the full variant has the same semantics here
        self.target_layer.register_full_backward_hook(backward_hook)
    def generate(self, input_tensor, class_idx=None):
        self.model.eval()
        output = self.model(input_tensor)
        if class_idx is None:
            class_idx = torch.argmax(output, dim=1).item()
        self.model.zero_grad()
        loss = output[0, class_idx]
        loss.backward()
        gradients = self.gradients.cpu().numpy()[0]
        activations = self.activations.cpu().numpy()[0]
        # Grad-CAM: weight each activation map by its average gradient, sum, then ReLU
        weights = np.mean(gradients, axis=(1, 2))
        cam = np.zeros(activations.shape[1:], dtype=np.float32)
        for i, w in enumerate(weights):
            cam += w * activations[i, :, :]
        cam = np.maximum(cam, 0)
        cam = cv2.resize(cam, (input_tensor.size(3), input_tensor.size(2)))
        cam = cam - np.min(cam)
        cam = cam / (np.max(cam) + 1e-8)  # avoid division by zero for flat maps
        return cam, output
# Preprocessing
preprocess = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

gradcam = GradCAM(deepfake_model, target_layer)
# Generate heatmap and prediction
def generate_heatmap(original_image_path, heatmap_save_path):
    img = Image.open(original_image_path).convert("RGB")
    input_tensor = preprocess(img).unsqueeze(0).to(device)  # keep tensor on the model's device

    cam, output = gradcam.generate(input_tensor)

    # Get prediction
    probabilities = torch.nn.functional.softmax(output, dim=1)[0]
    class_idx = torch.argmax(probabilities).item()
    confidence = probabilities[class_idx].item()
    label = "Fake" if class_idx == 1 else "Real"

    # Generate heatmap overlay
    heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
    heatmap = cv2.GaussianBlur(heatmap, (7, 7), 0)
    heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
    img_np = np.array(img.resize((224, 224)))
    superimposed_img = heatmap * 0.5 + img_np * 0.5
    superimposed_img = np.uint8(superimposed_img)
    Image.fromarray(superimposed_img).save(heatmap_save_path)
    return label, confidence
# Flask route (path assumed; adjust to match the frontend)
@app.route("/generate-heatmap", methods=["POST"])
def generate_heatmap_api():
    if "file" not in request.files:
        return jsonify({"error": "No file uploaded"}), 400

    file = request.files["file"]
    if file.filename == "" or not allowed_file(file.filename):
        return jsonify({"error": "Invalid file type. Allowed types are .png, .jpg, .jpeg"}), 400

    filename = secure_filename(file.filename)
    original_image_path = os.path.join(UPLOAD_FOLDER, filename)
    try:
        file.save(original_image_path)
    except Exception:
        return jsonify({"error": "Failed to save the file"}), 500

    heatmap_filename = f"heatmap_{filename}"
    heatmap_path = os.path.join(HEATMAP_FOLDER, heatmap_filename)
    label, confidence = generate_heatmap(original_image_path, heatmap_path)

    return jsonify({
        "original_image_url": url_for("static", filename=f"uploads/{filename}", _external=True),
        "heatmap_image_url": url_for("static", filename=f"heatmaps/{heatmap_filename}", _external=True),
        "prediction": label,
        "confidence": f"{confidence:.2f}"
    })
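# Example client call (illustrative; the route path above is an assumption):
# curl -X POST http://localhost:5000/generate-heatmap -F "file=@sample.jpg"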
# ------------------- MongoDB Atlas Connection and Auth Routes -------------------
# MongoDB connection
# NOTE: the URI below contains redacted/placeholder credentials; in production,
# load the real connection string from an environment variable.
client = MongoClient('mongodb+srv://fakecatcherai:[email protected]/?retryWrites=true&w=majority&appName=Cluster0')
db = client['fakecatcherDB']
users_collection = db['users']
contacts_collection = db['contacts']
def is_valid_password(password):
    # Require length >= 8 plus uppercase, lowercase, digit, and special character
    if (len(password) < 8 or
        not re.search(r'[A-Z]', password) or
        not re.search(r'[a-z]', password) or
        not re.search(r'[0-9]', password) or
        not re.search(r'[!@#$%^&*(),.?":{}|<>]', password)):
        return False
    return True
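# e.g. is_valid_password("Str0ng!Pass") -> True; is_valid_password("weakpass") -> False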
# Registration route (path assumed; adjust to match the frontend)
@app.route('/register', methods=['POST'])
def register():
    data = request.get_json()
    first_name = data.get('firstName')
    last_name = data.get('lastName')
    email = data.get('email')
    password = data.get('password')

    if users_collection.find_one({'email': email}):
        logging.warning(f"Attempted register with existing email: {email}")
        return jsonify({'message': 'Email already exists!'}), 400

    # Password constraints check
    if not is_valid_password(password):
        return jsonify({'message': 'Password must be at least 8 characters long and include uppercase, lowercase, number, and special character.'}), 400

    hashed_pw = generate_password_hash(password)
    users_collection.insert_one({
        'first_name': first_name,
        'last_name': last_name,
        'email': email,
        'password': hashed_pw
    })
    logging.info(f"New user registered: {first_name} {last_name}, Email: {email}")
    return jsonify({'message': 'Registration successful!'}), 201
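# Example request body (illustrative values):
# POST /register {"firstName": "Ada", "lastName": "Lovelace",
#                 "email": "[email protected]", "password": "Str0ng!Pass"}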
# Login route (path assumed; adjust to match the frontend)
@app.route('/login', methods=['POST'])
def login():
    data = request.get_json()
    email = data.get('email')
    password = data.get('password')

    # Check if the user exists and the password matches
    user = users_collection.find_one({'email': email})
    if not user or not check_password_hash(user['password'], password):
        logging.warning(f"Failed login attempt for email: {email}")
        return jsonify({'message': 'Invalid email or password!'}), 401

    logging.info(f"User logged in successfully: {email}")
    return jsonify({'message': 'Login successful!'}), 200
# Password reset route (path assumed; adjust to match the frontend)
@app.route('/forgot-password', methods=['POST'])
def forgot_password():
    data = request.get_json()
    email = data.get('email')
    new_password = data.get('newPassword')
    confirm_password = data.get('confirmPassword')

    # Check if passwords match
    if new_password != confirm_password:
        logging.warning(f"Password reset failed. Passwords do not match for email: {email}")
        return jsonify({'message': 'Passwords do not match!'}), 400

    # Check if the user exists
    user = users_collection.find_one({'email': email})
    if not user:
        logging.warning(f"Password reset attempt for non-existent email: {email}")
        return jsonify({'message': 'User not found!'}), 404

    # Hash the new password and update it
    hashed_pw = generate_password_hash(new_password)
    users_collection.update_one({'email': email}, {'$set': {'password': hashed_pw}})
    logging.info(f"Password successfully reset for email: {email}")
    return jsonify({'message': 'Password updated successfully!'}), 200
# Contact Form Route (React Page: Contact; path assumed)
@app.route('/contact', methods=['POST'])
def contact():
    data = request.get_json()
    email = data.get('email')
    query = data.get('query')
    message = data.get('message')

    # Check if all fields are provided
    if not email or not query or not message:
        logging.warning(f"Incomplete contact form submission from email: {email}")
        return jsonify({'message': 'All fields are required!'}), 400

    # Insert the contact data
    contact_data = {
        'email': email,
        'query': query,
        'message': message
    }
    contacts_collection.insert_one(contact_data)
    logging.info(f"Contact form submitted successfully from email: {email}")
    return jsonify({'message': 'Your message has been sent successfully.'}), 200
if __name__ == '__main__':
    app.run(debug=True)