faryal / app.py
faryalnimra's picture
Upload full backend project
85f4388
from flask import Flask, request, jsonify, render_template, url_for
from flask_cors import CORS
import torch
import torch.nn as nn
from torchvision import models, transforms
from PIL import Image
from huggingface_hub import hf_hub_download
import os
from mtcnn import MTCNN
import cv2
from flask_bcrypt import generate_password_hash, check_password_hash
from pymongo import MongoClient
import numpy as np
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import logging
import matplotlib.pyplot as plt
import seaborn as sns
from transformers import AutoImageProcessor, AutoModelForImageClassification # New imports
# Setup logging
logging.basicConfig(level=logging.INFO)
app = Flask(__name__, template_folder="templates", static_folder="static")
CORS(app)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
UPLOAD_FOLDER = "static/uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
# ------------------- Model Loading Functions -------------------
def load_model_from_hf(repo_id, filename, num_classes):
model_path = hf_hub_download(repo_id=repo_id, filename=filename)
model = models.convnext_tiny(weights=None)
in_features = model.classifier[2].in_features
model.classifier[2] = nn.Linear(in_features, num_classes)
model.load_state_dict(torch.load(model_path, map_location=device))
model.to(device)
model.eval()
return model
# Load the existing deepfake/cheapfake models
deepfake_model = load_model_from_hf("faryalnimra/DFDC-detection-model", "DFDC.pth", 2)
cheapfake_model = load_model_from_hf("faryalnimra/ORIG-TAMP", "ORIG-TAMP.pth", 1)
# ------------------- New Real/Fake Detector Model -------------------
# This model determines if the uploaded image is real (label 1) or fake (label 0)
model_name = "prithivMLmods/Deep-Fake-Detector-Model"
processor = AutoImageProcessor.from_pretrained(model_name, use_fast=False)
realfake_detector = AutoModelForImageClassification.from_pretrained(model_name)
realfake_detector.to(device)
realfake_detector.eval()
# ------------------- Image Preprocessing -------------------
transform = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])
# ------------------- Face Detector -------------------
face_detector = MTCNN()
def detect_face(image_path):
image = cv2.imread(image_path)
image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
faces = face_detector.detect_faces(image_rgb)
face_count = sum(1 for face in faces if face.get("confidence", 0) > 0.90 and face.get("box", [0, 0, 0, 0])[2] > 30)
return face_count
# ------------------- API Endpoint: /predict -------------------
@app.route("/predict", methods=["POST"])
def predict():
if "file" not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files["file"]
prediction_type = request.form.get("prediction_type", "real_vs_fake") # default
filename = os.path.join(UPLOAD_FOLDER, file.filename)
file.save(filename)
try:
image = Image.open(filename).convert("RGB")
image_tensor = transform(image).unsqueeze(0).to(device)
except Exception as e:
return jsonify({"error": "Error processing image", "details": str(e)}), 500
# --------- CASE 1: ONLY Real/Fake Prediction ----------
if prediction_type == "real_vs_fake":
with torch.no_grad():
inputs = processor(images=image, return_tensors="pt").to(device)
outputs_realfake = realfake_detector(**inputs)
pred_label = torch.argmax(outputs_realfake.logits, dim=1).item()
if pred_label == 1:
return jsonify({
"prediction": "Real",
"message": "Image is authentic. No further processing.",
"image_url": url_for("static", filename=f"uploads/{file.filename}")
})
else:
return jsonify({
"prediction": "Fake",
"message": "Image is fake, but type (Deepfake/Cheapfake) not determined in this mode.",
"image_url": url_for("static", filename=f"uploads/{file.filename}")
})
# --------- CASE 2: Deepfake vs Cheapfake Analysis ----------
elif prediction_type == "deepfake_vs_cheapfake":
with torch.no_grad():
deepfake_probs = torch.softmax(deepfake_model(image_tensor), dim=1)[0]
deepfake_confidence_before = deepfake_probs[1].item() * 100
cheapfake_confidence_before = torch.sigmoid(cheapfake_model(image_tensor)).item() * 100
face_count = detect_face(filename)
face_factor = min(face_count / 2, 1)
if deepfake_confidence_before <= cheapfake_confidence_before:
adjusted_deepfake_confidence = deepfake_confidence_before * (1 + 0.3 * face_factor)
adjusted_cheapfake_confidence = cheapfake_confidence_before * (1 - 0.3 * face_factor)
else:
adjusted_deepfake_confidence = deepfake_confidence_before
adjusted_cheapfake_confidence = cheapfake_confidence_before
fake_type = "Deepfake" if adjusted_deepfake_confidence > adjusted_cheapfake_confidence else "Cheapfake"
return jsonify({
"prediction": "Fake",
"fake_type": fake_type,
"deepfake_confidence_before": f"{deepfake_confidence_before:.2f}%",
"deepfake_confidence_adjusted": f"{adjusted_deepfake_confidence:.2f}%",
"cheapfake_confidence_before": f"{cheapfake_confidence_before:.2f}%",
"cheapfake_confidence_adjusted": f"{adjusted_cheapfake_confidence:.2f}%",
"faces_detected": face_count,
"image_url": url_for("static", filename=f"uploads/{file.filename}")
})
# --------- CASE 3: Invalid prediction_type ---------
else:
return jsonify({"error": "Invalid prediction_type. Use 'real_vs_fake' or 'deepfake_vs_cheapfake'"}), 400
# ------------------- Heatmap Generator and API -------------------
# Flask setup
UPLOAD_FOLDER = "static/uploads"
HEATMAP_FOLDER = "static/heatmaps"
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg"}
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(HEATMAP_FOLDER, exist_ok=True)
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
# Load your model
deepfake_model = load_model_from_hf("faryalnimra/DFDC-detection-model", "DFDC.pth", 2)
deepfake_model.eval()
# Choose the last Conv2D layer
target_layer = None
for name, module in deepfake_model.named_modules():
if isinstance(module, torch.nn.Conv2d):
target_layer = module
# Grad-CAM class
class GradCAM:
def __init__(self, model, target_layer):
self.model = model
self.target_layer = target_layer
self.gradients = None
self.activations = None
self._register_hooks()
def _register_hooks(self):
def forward_hook(module, input, output):
self.activations = output.detach()
def backward_hook(module, grad_in, grad_out):
self.gradients = grad_out[0].detach()
self.target_layer.register_forward_hook(forward_hook)
self.target_layer.register_backward_hook(backward_hook)
def generate(self, input_tensor, class_idx=None):
self.model.eval()
output = self.model(input_tensor)
if class_idx is None:
class_idx = torch.argmax(output, dim=1).item()
self.model.zero_grad()
loss = output[0, class_idx]
loss.backward()
gradients = self.gradients.cpu().numpy()[0]
activations = self.activations.cpu().numpy()[0]
weights = np.mean(gradients, axis=(1, 2))
cam = np.zeros(activations.shape[1:], dtype=np.float32)
for i, w in enumerate(weights):
cam += w * activations[i, :, :]
cam = np.maximum(cam, 0)
cam = cv2.resize(cam, (input_tensor.size(3), input_tensor.size(2)))
cam = cam - np.min(cam)
cam = cam / np.max(cam)
return cam, output
# Preprocessing
preprocess = transforms.Compose([
transforms.Resize((224, 224)),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
gradcam = GradCAM(deepfake_model, target_layer)
# Generate heatmap and prediction
def generate_heatmap(original_image_path, heatmap_save_path):
img = Image.open(original_image_path).convert("RGB")
input_tensor = preprocess(img).unsqueeze(0)
cam, output = gradcam.generate(input_tensor)
# Get prediction
probabilities = torch.nn.functional.softmax(output, dim=1)[0]
class_idx = torch.argmax(probabilities).item()
confidence = probabilities[class_idx].item()
label = "Fake" if class_idx == 1 else "Real"
# Generate heatmap
heatmap = cv2.applyColorMap(np.uint8(255 * cam), cv2.COLORMAP_JET)
heatmap = cv2.GaussianBlur(heatmap, (7, 7), 0)
heatmap = cv2.cvtColor(heatmap, cv2.COLOR_BGR2RGB)
img_np = np.array(img.resize((224, 224)))
superimposed_img = heatmap * 0.5 + img_np * 0.5
superimposed_img = np.uint8(superimposed_img)
Image.fromarray(superimposed_img).save(heatmap_save_path)
return label, confidence
# Flask route
@app.route("/generate_heatmap", methods=["POST"])
def generate_heatmap_api():
if "file" not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files["file"]
if file.filename == "" or not allowed_file(file.filename):
return jsonify({"error": "Invalid file type. Allowed types are .png, .jpg, .jpeg"}), 400
filename = secure_filename(file.filename)
original_image_path = os.path.join(UPLOAD_FOLDER, filename)
try:
file.save(original_image_path)
except Exception as e:
return jsonify({"error": "Failed to save the file"}), 500
heatmap_filename = f"heatmap_{filename}"
heatmap_path = os.path.join(HEATMAP_FOLDER, heatmap_filename)
label, confidence = generate_heatmap(original_image_path, heatmap_path)
return jsonify({
"original_image_url": url_for("static", filename=f"uploads/{filename}", _external=True),
"heatmap_image_url": url_for("static", filename=f"heatmaps/{heatmap_filename}", _external=True),
"prediction": label,
"confidence": f"{confidence:.2f}"
})
# To run:
# if __name__ == "__main__":
# app.run(debug=True)
#MongoDB Atlantis from flask import Flask, request, jsonify
# MongoDB connection
client = MongoClient('mongodb+srv://fakecatcherai:[email protected]/?retryWrites=true&w=majority&appName=Cluster0')
db = client['fakecatcherDB']
users_collection = db['users']
contacts_collection = db['contacts']
def is_valid_password(password):
if (len(password) < 8 or
not re.search(r'[A-Z]', password) or
not re.search(r'[a-z]', password) or
not re.search(r'[0-9]', password) or
not re.search(r'[!@#$%^&*(),.?":{}|<>]', password)):
return False
return True
@app.route('/Register', methods=['POST'])
def register():
data = request.get_json()
first_name = data.get('firstName')
last_name = data.get('lastName')
email = data.get('email')
password = data.get('password')
if users_collection.find_one({'email': email}):
logging.warning(f"Attempted register with existing email: {email}")
return jsonify({'message': 'Email already exists!'}), 400
# βœ… Password constraints check
if not is_valid_password(password):
return jsonify({'message': 'Password must be at least 8 characters long and include uppercase, lowercase, number, and special character.'}), 400
hashed_pw = generate_password_hash(password)
users_collection.insert_one({
'first_name': first_name,
'last_name': last_name,
'email': email,
'password': hashed_pw
})
logging.info(f"New user registered: {first_name} {last_name}, Email: {email}")
return jsonify({'message': 'Registration successful!'}), 201
# πŸ”΅ Login Route
@app.route('/Login', methods=['POST'])
def login():
data = request.get_json()
email = data.get('email')
password = data.get('password')
# Check if the user exists
user = users_collection.find_one({'email': email})
if not user or not check_password_hash(user['password'], password):
logging.warning(f"Failed login attempt for email: {email}")
return jsonify({'message': 'Invalid email or password!'}), 401
logging.info(f"User logged in successfully: {email}")
return jsonify({'message': 'Login successful!'}), 200
@app.route('/ForgotPassword', methods=['POST'])
def forgot_password():
data = request.get_json()
email = data.get('email')
new_password = data.get('newPassword')
confirm_password = data.get('confirmPassword')
# Check if passwords match
if new_password != confirm_password:
logging.warning(f"Password reset failed. Passwords do not match for email: {email}")
return jsonify({'message': 'Passwords do not match!'}), 400
# Check if the user exists
user = users_collection.find_one({'email': email})
if not user:
logging.warning(f"Password reset attempt for non-existent email: {email}")
return jsonify({'message': 'User not found!'}), 404
# Hash the new password and update it
hashed_pw = generate_password_hash(new_password)
users_collection.update_one({'email': email}, {'$set': {'password': hashed_pw}})
logging.info(f"Password successfully reset for email: {email}")
return jsonify({'message': 'Password updated successfully!'}), 200
# 🟣 Contact Form Route (React Page: Contact)
@app.route('/Contact', methods=['POST'])
def contact():
data = request.get_json()
email = data.get('email')
query = data.get('query')
message = data.get('message')
# Check if all fields are provided
if not email or not query or not message:
logging.warning(f"Incomplete contact form submission from email: {email}")
return jsonify({'message': 'All fields are required!'}), 400
# Insert the contact data
contact_data = {
'email': email,
'query': query,
'message': message
}
contacts_collection.insert_one(contact_data)
logging.info(f"Contact form submitted successfully from email: {email}")
return jsonify({'message': 'Your message has been sent successfully.'}), 200
if __name__ == '__main__':
app.run(debug=True)