Attaque2 / app.py
Docfile's picture
Update app.py
e1feafa verified
raw
history blame
24.3 kB
# -*- coding: utf-8 -*- # Bonne pratique d'ajouter l'encodage
# Importations nécessaires
from flask import Flask, request, render_template_string, jsonify, redirect, url_for
import requests
import threading
import uuid # Pour générer des identifiants uniques pour chaque tâche
import time
import copy # Pour copier le payload pour chaque requête
from typing import Dict, Any, List, Tuple, Optional # Pour les annotations de type
# --- Configuration ---
TARGET_URL: str = "https://hook.us1.make.com/zal5qn0ggbewmvtsbo2uenfno8tz3n56"
BASE_PAYLOAD: Dict[str, Any] = {
"name": "Testeur Auto ",
"email": "[email protected]", # Ajout de '+auto' pour distinguer
"company": "aragon Inc.",
"message": "Ceci est un test automatisé via Flask.",
"date": "2023-10-27T10:30:00Z", # Tu pourrais rendre cette date dynamique si besoin
"source": "http://simulateur-bsbs-flask.com"
}
# Constantes pour les statuts (évite les fautes de frappe)
STATUS_STARTING: str = 'starting'
STATUS_RUNNING: str = 'running'
STATUS_COMPLETED: str = 'completed'
STATUS_FAILED: str = 'failed'
# Optional: STATUS_COMPLETED_WITH_ERRORS: str = 'completed_with_errors'
# Structure pour stocker l'état des tâches (jobs) en mémoire
# Format: { 'job_id': {'status': str, 'total': int, 'completed_count': int, 'error_count': int, 'errors': List[Dict[str, Any]] } }
jobs: Dict[str, Dict[str, Any]] = {}
jobs_lock = threading.Lock() # Pour éviter les problèmes d'accès concurrents au dict jobs
app = Flask(__name__) # Correction de l'initialisation de Flask
# --- Templates HTML (inchangés, car ils semblent corrects) ---
# Page d'accueil pour démarrer les requêtes
HTML_INDEX = """
<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Lanceur de Requêtes</title>
<style>
body { font-family: sans-serif; margin: 20px; background-color: #f4f4f4; color: #333; }
h1, h2 { color: #555; }
label { display: block; margin-bottom: 5px; font-weight: bold; }
input[type=number] { width: 100px; padding: 8px; margin-bottom: 15px; border: 1px solid #ccc; border-radius: 4px; }
button { padding: 10px 15px; cursor: pointer; background-color: #007bff; color: white; border: none; border-radius: 4px; }
button:hover { background-color: #0056b3; }
.error { color: red; margin-top: 10px; font-weight: bold; }
ul { list-style: none; padding: 0; }
li { background-color: #fff; margin-bottom: 10px; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
li a { text-decoration: none; color: #007bff; }
li a:hover { text-decoration: underline; }
.job-info { font-size: 0.9em; color: #666; }
</style>
</head>
<body>
<h1>Envoyer des Requêtes POST en Masse</h1>
<form method="POST" action="{{ url_for('start_requests') }}">
<label for="num_requests">Nombre de requêtes à envoyer :</label>
<input type="number" id="num_requests" name="num_requests" min="1" required value="10">
<button type="submit">Lancer les requêtes</button>
</form>
{% if error %}
<p class="error">{{ error }}</p>
{% endif %}
<h2>Tâches récentes :</h2>
<ul>
{% for job_id, job_info in jobs_list.items() %}
<li>
<a href="{{ url_for('job_status', job_id=job_id) }}">Tâche {{ job_id }}</a>
<span class="job-info">
(Statut : {{ job_info.status }}, {{ job_info.completed_count }}/{{ job_info.total }} requêtes traitées, {{ job_info.error_count }} erreurs)
</span>
</li>
{% else %}
<li>Aucune tâche récente.</li>
{% endfor %}
</ul>
</body>
</html>
"""
# Page pour suivre la progression d'une tâche spécifique
HTML_STATUS = """
<!DOCTYPE html>
<html lang="fr">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Statut Tâche {{ job_id }}</title>
<style>
body { font-family: sans-serif; margin: 20px; background-color: #f4f4f4; color: #333; }
h1 { color: #555; }
#progress-bar-container { width: 100%; background-color: #e0e0e0; border-radius: 5px; margin-bottom: 10px; overflow: hidden; } /* Overflow hidden for border radius */
#progress-bar { width: 0%; height: 30px; background-color: #28a745; /* Green default */ text-align: center; line-height: 30px; color: white; border-radius: 0; /* Radius handled by container */ transition: width 0.5s ease-in-out, background-color 0.5s ease-in-out; }
.error-log { margin-top: 15px; max-height: 300px; overflow-y: auto; border: 1px solid #ccc; padding: 10px; background-color: #f9f9f9; border-radius: 4px;}
.error-log h3 { margin-top: 0; }
.error-log p { margin: 5px 0; font-size: 0.9em; color: #dc3545; } /* Red error text */
.status-message { font-weight: bold; margin-bottom: 15px; padding: 10px; background-color: #e9ecef; border-radius: 4px; }
a { color: #007bff; text-decoration: none; }
a:hover { text-decoration: underline; }
</style>
</head>
<body>
<h1>Statut de la Tâche : {{ job_id }}</h1>
<div id="status-message" class="status-message">Chargement des informations...</div>
<div id="progress-bar-container">
<div id="progress-bar">0%</div>
</div>
<p>Requêtes traitées : <span id="completed">0</span> / <span id="total">?</span></p>
<p>Erreurs rencontrées : <span id="errors">0</span></p>
<div id="error-details" class="error-log" style="display: none;">
<h3>Détails des erreurs :</h3>
<div id="error-list"></div>
</div>
<p><a href="{{ url_for('index') }}">Retour à l'accueil</a></p>
<script>
const jobId = "{{ job_id }}";
const statusMessageEl = document.getElementById('status-message');
const progressBarEl = document.getElementById('progress-bar');
const completedEl = document.getElementById('completed');
const totalEl = document.getElementById('total');
const errorsEl = document.getElementById('errors');
const errorDetailsEl = document.getElementById('error-details');
const errorListEl = document.getElementById('error-list');
let intervalId = null;
let lastStatus = ""; // Pour éviter des mises à jour inutiles si le statut n'a pas changé
function updateStatus() {
fetch(`/api/status/${jobId}`)
.then(response => {
if (!response.ok) {
// Si 404, la tâche n'existe pas (ou plus), arrêter les requêtes
if (response.status === 404) {
statusMessageEl.textContent = "Erreur: Tâche non trouvée.";
throw new Error('Job Not Found'); // Pour arrêter l'intervalle dans le catch
}
throw new Error(`Erreur HTTP: ${response.status}`);
}
return response.json();
})
.then(data => {
if (!data) { // Devrait être géré par le 404 mais double sécurité
statusMessageEl.textContent = "En attente des données...";
return;
}
// Comparer avec le statut précédent pour éviter de redessiner si rien n'a changé
const currentDataSignature = `${data.status}-${data.completed_count}-${data.error_count}`;
if (currentDataSignature === lastStatus) {
// console.log("Pas de changement de statut détecté.");
return; // Rien à faire
}
lastStatus = currentDataSignature;
// Mise à jour des éléments DOM
completedEl.textContent = data.completed_count;
totalEl.textContent = data.total;
errorsEl.textContent = data.error_count;
statusMessageEl.textContent = `Statut : ${data.status}`; // Utilise les constantes définies côté serveur
let percentage = 0;
if (data.total > 0) {
// On calcule le pourcentage sur le nombre total de requêtes à faire
percentage = Math.round((data.completed_count / data.total) * 100);
}
progressBarEl.style.width = percentage + '%';
progressBarEl.textContent = percentage + '%';
// Afficher les erreurs s'il y en a
if (data.error_count > 0 && data.errors && data.errors.length > 0) {
errorListEl.innerHTML = ''; // Vider les erreurs précédentes
data.errors.forEach(err => {
const p = document.createElement('p');
// Utilisation de textContent pour éviter les injections XSS potentielles si les messages d'erreur contenaient du HTML
p.textContent = `Req ${err.index}: ${err.error}`;
errorListEl.appendChild(p);
});
errorDetailsEl.style.display = 'block';
} else {
errorDetailsEl.style.display = 'none';
}
// Gérer la fin de la tâche
if (data.status === '{{ STATUS_COMPLETED }}' || data.status === '{{ STATUS_FAILED }}') {
if (intervalId) {
clearInterval(intervalId);
intervalId = null; // Arrête les mises à jour futures
app.logger.info(f"Mises à jour automatiques arrêtées pour la tâche {jobId} (statut final: {data.status}).");
}
// Mettre à jour la couleur de la barre de progression en fonction du résultat final
if (data.status === '{{ STATUS_COMPLETED }}' && data.error_count == 0) {
progressBarEl.style.backgroundColor = '#28a745'; // Vert succès
} else if (data.status === '{{ STATUS_COMPLETED }}' && data.error_count > 0) {
progressBarEl.style.backgroundColor = '#ffc107'; // Jaune/Orange pour succès avec erreurs
} else { // STATUS_FAILED
progressBarEl.style.backgroundColor = '#dc3545'; // Rouge échec
}
} else {
// Si la tâche est en cours, s'assurer que la couleur est celle par défaut (ou une couleur "en cours")
progressBarEl.style.backgroundColor = '#007bff'; // Bleu pour en cours
}
})
.catch(error => {
console.error("Erreur lors de la récupération du statut:", error);
if (error.message !== 'Job Not Found') { // Ne pas écraser le message "Tâche non trouvée"
statusMessageEl.textContent = "Erreur lors de la récupération du statut.";
}
// Arrêter les mises à jour en cas d'erreur persistante ou de tâche non trouvée
if (intervalId) {
clearInterval(intervalId);
intervalId = null;
app.logger.error(f"Arrêt des mises à jour pour la tâche {jobId} suite à une erreur: {error}");
}
// Optionnel: Changer la couleur de la barre en cas d'erreur de récupération
progressBarEl.style.backgroundColor = '#6c757d'; // Gris pour état indéterminé/erreur
});
}
// Démarrer la mise à jour : première mise à jour immédiate, puis toutes les 2 secondes
updateStatus();
if (!intervalId && statusMessageEl.textContent !== "Erreur: Tâche non trouvée.") { // Ne pas démarrer l'intervalle si la tâche n'est déjà pas trouvée
intervalId = setInterval(updateStatus, 2000);
}
</script>
</body>
</html>
"""
# --- Fonctions Logiques ---
def send_single_request(target_url: str, payload: Dict[str, Any], job_id: str, request_index: int) -> Tuple[bool, Optional[str]]:
"""
Envoie UNE requête POST unique à la cible.
Args:
target_url: L'URL à laquelle envoyer la requête.
payload: Le dictionnaire de base du payload.
job_id: L'ID de la tâche parente (pour logs/debug).
request_index: L'index de cette requête dans la tâche (0-based).
Returns:
Un tuple (succès: bool, message_erreur: Optional[str]).
"""
# Crée une copie profonde pour éviter de modifier l'original entre les threads
# et pour ajouter des informations spécifiques à cette requête.
current_payload = copy.deepcopy(payload)
# Personnalisation du message pour identifier la requête spécifique
current_payload['message'] += f" (Requête {request_index + 1} / Job {job_id})"
# Ajout d'un UUID unique à chaque requête pour un suivi fin si nécessaire côté serveur cible
current_payload['request_uuid'] = str(uuid.uuid4())
# Optionnel: Mettre à jour la date/heure au moment de l'envoi
# from datetime import datetime, timezone
# current_payload['date'] = datetime.now(timezone.utc).isoformat()
try:
response = requests.post(target_url, json=current_payload, timeout=30) # Timeout de 30 secondes
response.raise_for_status() # Lève une exception pour les codes d'erreur HTTP (4xx, 5xx)
# app.logger.debug(f"Job {job_id} - Requête {request_index + 1}: Succès (Status {response.status_code})")
return True, None # Succès
except requests.exceptions.Timeout:
error_msg = f"Timeout après 30s pour Req {request_index + 1}"
app.logger.warning(f"Job {job_id}: {error_msg}")
return False, error_msg
except requests.exceptions.HTTPError as e:
error_msg = f"Erreur HTTP {e.response.status_code} pour Req {request_index + 1}: {e.response.reason}"
app.logger.warning(f"Job {job_id}: {error_msg} - Réponse: {e.response.text[:200]}") # Log début de réponse
return False, error_msg
except requests.exceptions.RequestException as e:
# Capture les autres erreurs (DNS, connexion refusée, etc.)
error_msg = f"Erreur Réseau/Requête pour Req {request_index + 1}: {str(e)}"
app.logger.error(f"Job {job_id}: {error_msg}")
return False, error_msg # Échec avec message d'erreur
def background_task(job_id: str, num_requests: int, target_url: str, base_payload: Dict[str, Any]):
"""
Fonction exécutée dans un thread séparé pour envoyer toutes les requêtes d'une tâche.
Met à jour l'état de la tâche dans le dictionnaire partagé `jobs`.
"""
app.logger.info(f"Tâche {job_id}: Démarrage de {num_requests} requêtes vers {target_url}")
completed_count: int = 0
error_count: int = 0
error_messages: List[Dict[str, Any]] = []
# Note: L'initialisation de la tâche est maintenant faite dans /start avant le lancement du thread.
# La partie ci-dessous est redondante mais laissée commentée pour info.
# # Vérifier/Initialiser le statut (au cas où, bien que fait dans /start)
# with jobs_lock:
# if job_id not in jobs:
# app.logger.warning(f"Tâche {job_id} non trouvée dans jobs au démarrage du thread. Initialisation.")
# jobs[job_id] = {
# 'status': STATUS_RUNNING, # On la met directement en running
# 'total': num_requests,
# 'completed_count': 0,
# 'error_count': 0,
# 'errors': []
# }
# else:
# # Si elle existe déjà (normal), s'assurer qu'elle est en running
# jobs[job_id]['status'] = STATUS_RUNNING
# Mettre à jour le statut en 'running' une fois démarré
with jobs_lock:
if job_id in jobs:
jobs[job_id]['status'] = STATUS_RUNNING
else:
# Cas très improbable si /start n'a pas fini avant que le thread ne check
app.logger.error(f"Tâche {job_id} non trouvée au moment de passer en status RUNNING.")
return # Arrêter le thread si la tâche n'existe pas
for i in range(num_requests):
success, error_msg = send_single_request(target_url, base_payload, job_id, i)
# Mettre à jour la progression dans le dictionnaire partagé (section critique)
with jobs_lock:
# Vérifier si la tâche existe toujours (elle pourrait être supprimée?)
if job_id not in jobs:
app.logger.warning(f"Tâche {job_id} disparue pendant l'exécution. Arrêt.")
break # Sortir de la boucle si la tâche n'est plus suivie
# Incrémenter le compteur des requêtes traitées
jobs[job_id]['completed_count'] += 1
completed_count = jobs[job_id]['completed_count'] # Mettre à jour la variable locale aussi
if not success:
jobs[job_id]['error_count'] += 1
error_count = jobs[job_id]['error_count'] # Mettre à jour la variable locale
# Ajouter les détails de l'erreur (index basé sur 1 pour l'affichage)
error_detail = {'index': i + 1, 'error': error_msg or "Erreur inconnue"}
jobs[job_id]['errors'].append(error_detail)
# Gardons seulement les X dernières erreurs pour éviter de saturer la mémoire
max_errors_to_keep = 100
jobs[job_id]['errors'] = jobs[job_id]['errors'][-max_errors_to_keep:]
# Petite pause optionnelle pour ne pas surcharger la cible ou le réseau local
# time.sleep(0.05) # 50ms pause
# Marquer la tâche comme terminée une fois la boucle finie
with jobs_lock:
if job_id in jobs:
# Déterminer le statut final basé sur les erreurs
# Si toutes les requêtes ont échoué -> FAILED
# Sinon -> COMPLETED (le nombre d'erreurs indique si c'était parfait ou non)
final_status = STATUS_FAILED if error_count == num_requests and num_requests > 0 else STATUS_COMPLETED
jobs[job_id]['status'] = final_status
app.logger.info(f"Tâche {job_id}: Terminé. Statut final: {final_status}. {completed_count - error_count}/{num_requests} succès, {error_count} erreurs.")
else:
app.logger.warning(f"Tâche {job_id} non trouvée à la fin de l'exécution pour marquer comme terminée.")
# --- Routes Flask ---
@app.route('/', methods=['GET'])
def index():
"""Affiche la page d'accueil avec le formulaire et la liste des tâches."""
with jobs_lock:
# Trie les tâches par clé (ID), en ordre inverse (plus récent d'abord si UUID approxime le temps)
# Pour un tri chronologique fiable, il faudrait ajouter un timestamp lors de la création de la tâche.
sorted_jobs = dict(sorted(jobs.items(), reverse=True))
return render_template_string(HTML_INDEX, jobs_list=sorted_jobs)
@app.route('/start', methods=['POST'])
def start_requests():
"""
Reçoit le nombre de requêtes depuis le formulaire,
valide l'entrée, crée une nouvelle tâche et lance le thread d'arrière-plan.
Redirige vers la page de statut de la nouvelle tâche.
"""
num_requests_str = request.form.get('num_requests')
num_requests: int = 0
try:
num_requests = int(num_requests_str)
if num_requests <= 0:
raise ValueError("Le nombre de requêtes doit être un entier positif.")
if num_requests > 10000: # Limite de sécurité (optionnelle)
raise ValueError("Le nombre de requêtes est limité à 10000.")
except (TypeError, ValueError, AttributeError) as e:
app.logger.warning(f"Tentative de démarrage échouée - nombre invalide: '{num_requests_str}' - Erreur: {e}")
# Correction: Il faut re-passer la liste des jobs au template d'index en cas d'erreur
with jobs_lock:
sorted_jobs = dict(sorted(jobs.items(), reverse=True))
return render_template_string(HTML_INDEX, error=f"Nombre de requêtes invalide : {e}", jobs_list=sorted_jobs), 400
# Générer un ID de tâche unique (partie courte d'un UUID v4)
job_id: str = str(uuid.uuid4())[:8]
# Initialiser l'état de la tâche dans le dictionnaire partagé (section critique)
with jobs_lock:
jobs[job_id] = {
'status': STATUS_STARTING, # Statut initial avant que le thread ne démarre vraiment
'total': num_requests,
'completed_count': 0,
'error_count': 0,
'errors': []
# Optionnel: Ajouter un timestamp de création
# 'created_at': datetime.now(timezone.utc).isoformat()
}
# Créer et démarrer le thread pour exécuter la tâche en arrière-plan
thread = threading.Thread(
target=background_task,
args=(job_id, num_requests, TARGET_URL, BASE_PAYLOAD),
daemon=True # Permet au programme principal de quitter même si des threads sont encore en cours d'exécution
)
thread.start()
app.logger.info(f"Nouvelle tâche {job_id} démarrée pour {num_requests} requêtes.")
# Rediriger l'utilisateur vers la page de statut de cette nouvelle tâche
return redirect(url_for('job_status', job_id=job_id))
@app.route('/status/<job_id>', methods=['GET'])
def job_status(job_id: str):
"""Affiche la page HTML de suivi pour une tâche spécifique (identifiée par job_id)."""
with jobs_lock:
# Vérifier si la tâche existe juste pour éviter d'afficher une page pour un ID invalide
if job_id not in jobs:
app.logger.warning(f"Tentative d'accès à la page de statut pour une tâche inexistante: {job_id}")
return "Tâche non trouvée", 404
# La page HTML contient le JavaScript qui appellera l'API '/api/status/<job_id>'
# pour obtenir les données de progression dynamiquement.
# On passe les constantes de statut au template pour que le JS puisse les utiliser
return render_template_string(HTML_STATUS, job_id=job_id,
STATUS_COMPLETED=STATUS_COMPLETED,
STATUS_FAILED=STATUS_FAILED)
@app.route('/api/status/<job_id>', methods=['GET'])
def api_job_status(job_id: str):
"""
Fournit l'état actuel d'une tâche spécifique au format JSON.
Utilisé par le JavaScript de la page de statut pour les mises à jour.
"""
with jobs_lock:
# Obtenir les informations de la tâche. Utiliser .get() pour gérer le cas où l'ID n'existe pas.
job_info = jobs.get(job_id)
if job_info:
# Renvoyer une copie profonde pour éviter toute modification concurrente pendant la sérialisation JSON
# Bien que le lock aide, c'est une sécurité supplémentaire, surtout si les structures de données deviennent complexes.
return jsonify(copy.deepcopy(job_info))
else:
# Si la tâche n'est pas trouvée, renvoyer une réponse JSON avec une erreur 404.
app.logger.warning(f"API: Statut demandé pour tâche inexistante: {job_id}")
return jsonify({"error": "Tâche non trouvée", "job_id": job_id}), 404
# --- Démarrage de l'application ---
if __name__ == '__main__':
# Utiliser host='0.0.0.0' pour rendre l'application accessible
# depuis d'autres machines sur le même réseau local.
# ATTENTION : debug=True ne doit JAMAIS être utilisé dans un environnement de production.
# Il expose des vulnérabilités de sécurité et affecte les performances.
print("Démarrage du serveur Flask...")
print(f"Accéder à l'application via http://localhost:5000 ou http://<votre_ip_locale>:5000")
app.run(host='0.0.0.0', port=5000, debug=True) # Mettre debug=False pour la production