# -*- coding: utf-8 -*- # Bonne pratique d'ajouter l'encodage # Importations nécessaires from flask import Flask, request, render_template_string, jsonify, redirect, url_for import requests import threading import uuid # Pour générer des identifiants uniques pour chaque tâche import time import copy # Pour copier le payload pour chaque requête from typing import Dict, Any, List, Tuple, Optional # Pour les annotations de type # --- Configuration --- TARGET_URL: str = "https://hook.us1.make.com/zal5qn0ggbewmvtsbo2uenfno8tz3n56" BASE_PAYLOAD: Dict[str, Any] = { "name": "Testeur Auto ", "email": "yoo+auto@example.com", # Ajout de '+auto' pour distinguer "company": "aragon Inc.", "message": "Ceci est un test automatisé via Flask.", "date": "2023-10-27T10:30:00Z", # Tu pourrais rendre cette date dynamique si besoin "source": "http://simulateur-bsbs-flask.com" } # Constantes pour les statuts (évite les fautes de frappe) STATUS_STARTING: str = 'starting' STATUS_RUNNING: str = 'running' STATUS_COMPLETED: str = 'completed' STATUS_FAILED: str = 'failed' # Optional: STATUS_COMPLETED_WITH_ERRORS: str = 'completed_with_errors' # Structure pour stocker l'état des tâches (jobs) en mémoire # Format: { 'job_id': {'status': str, 'total': int, 'completed_count': int, 'error_count': int, 'errors': List[Dict[str, Any]] } } jobs: Dict[str, Dict[str, Any]] = {} jobs_lock = threading.Lock() # Pour éviter les problèmes d'accès concurrents au dict jobs app = Flask(__name__) # Correction de l'initialisation de Flask # --- Templates HTML (inchangés, car ils semblent corrects) --- # Page d'accueil pour démarrer les requêtes HTML_INDEX = """ Lanceur de Requêtes

Envoyer des Requêtes POST en Masse

{% if error %}

{{ error }}

{% endif %}

Tâches récentes :

""" # Page pour suivre la progression d'une tâche spécifique HTML_STATUS = """ Statut Tâche {{ job_id }}

Statut de la Tâche : {{ job_id }}

Chargement des informations...
0%

Requêtes traitées : 0 / ?

Erreurs rencontrées : 0

Retour à l'accueil

""" # --- Fonctions Logiques --- def send_single_request(target_url: str, payload: Dict[str, Any], job_id: str, request_index: int) -> Tuple[bool, Optional[str]]: """ Envoie UNE requête POST unique à la cible. Args: target_url: L'URL à laquelle envoyer la requête. payload: Le dictionnaire de base du payload. job_id: L'ID de la tâche parente (pour logs/debug). request_index: L'index de cette requête dans la tâche (0-based). Returns: Un tuple (succès: bool, message_erreur: Optional[str]). """ # Crée une copie profonde pour éviter de modifier l'original entre les threads # et pour ajouter des informations spécifiques à cette requête. current_payload = copy.deepcopy(payload) # Personnalisation du message pour identifier la requête spécifique current_payload['message'] += f" (Requête {request_index + 1} / Job {job_id})" # Ajout d'un UUID unique à chaque requête pour un suivi fin si nécessaire côté serveur cible current_payload['request_uuid'] = str(uuid.uuid4()) # Optionnel: Mettre à jour la date/heure au moment de l'envoi # from datetime import datetime, timezone # current_payload['date'] = datetime.now(timezone.utc).isoformat() try: response = requests.post(target_url, json=current_payload, timeout=30) # Timeout de 30 secondes response.raise_for_status() # Lève une exception pour les codes d'erreur HTTP (4xx, 5xx) # app.logger.debug(f"Job {job_id} - Requête {request_index + 1}: Succès (Status {response.status_code})") return True, None # Succès except requests.exceptions.Timeout: error_msg = f"Timeout après 30s pour Req {request_index + 1}" app.logger.warning(f"Job {job_id}: {error_msg}") return False, error_msg except requests.exceptions.HTTPError as e: error_msg = f"Erreur HTTP {e.response.status_code} pour Req {request_index + 1}: {e.response.reason}" app.logger.warning(f"Job {job_id}: {error_msg} - Réponse: {e.response.text[:200]}") # Log début de réponse return False, error_msg except requests.exceptions.RequestException as e: # Capture les autres erreurs (DNS, connexion refusée, etc.) error_msg = f"Erreur Réseau/Requête pour Req {request_index + 1}: {str(e)}" app.logger.error(f"Job {job_id}: {error_msg}") return False, error_msg # Échec avec message d'erreur def background_task(job_id: str, num_requests: int, target_url: str, base_payload: Dict[str, Any]): """ Fonction exécutée dans un thread séparé pour envoyer toutes les requêtes d'une tâche. Met à jour l'état de la tâche dans le dictionnaire partagé `jobs`. """ app.logger.info(f"Tâche {job_id}: Démarrage de {num_requests} requêtes vers {target_url}") completed_count: int = 0 error_count: int = 0 error_messages: List[Dict[str, Any]] = [] # Note: L'initialisation de la tâche est maintenant faite dans /start avant le lancement du thread. # La partie ci-dessous est redondante mais laissée commentée pour info. # # Vérifier/Initialiser le statut (au cas où, bien que fait dans /start) # with jobs_lock: # if job_id not in jobs: # app.logger.warning(f"Tâche {job_id} non trouvée dans jobs au démarrage du thread. Initialisation.") # jobs[job_id] = { # 'status': STATUS_RUNNING, # On la met directement en running # 'total': num_requests, # 'completed_count': 0, # 'error_count': 0, # 'errors': [] # } # else: # # Si elle existe déjà (normal), s'assurer qu'elle est en running # jobs[job_id]['status'] = STATUS_RUNNING # Mettre à jour le statut en 'running' une fois démarré with jobs_lock: if job_id in jobs: jobs[job_id]['status'] = STATUS_RUNNING else: # Cas très improbable si /start n'a pas fini avant que le thread ne check app.logger.error(f"Tâche {job_id} non trouvée au moment de passer en status RUNNING.") return # Arrêter le thread si la tâche n'existe pas for i in range(num_requests): success, error_msg = send_single_request(target_url, base_payload, job_id, i) # Mettre à jour la progression dans le dictionnaire partagé (section critique) with jobs_lock: # Vérifier si la tâche existe toujours (elle pourrait être supprimée?) if job_id not in jobs: app.logger.warning(f"Tâche {job_id} disparue pendant l'exécution. Arrêt.") break # Sortir de la boucle si la tâche n'est plus suivie # Incrémenter le compteur des requêtes traitées jobs[job_id]['completed_count'] += 1 completed_count = jobs[job_id]['completed_count'] # Mettre à jour la variable locale aussi if not success: jobs[job_id]['error_count'] += 1 error_count = jobs[job_id]['error_count'] # Mettre à jour la variable locale # Ajouter les détails de l'erreur (index basé sur 1 pour l'affichage) error_detail = {'index': i + 1, 'error': error_msg or "Erreur inconnue"} jobs[job_id]['errors'].append(error_detail) # Gardons seulement les X dernières erreurs pour éviter de saturer la mémoire max_errors_to_keep = 100 jobs[job_id]['errors'] = jobs[job_id]['errors'][-max_errors_to_keep:] # Petite pause optionnelle pour ne pas surcharger la cible ou le réseau local # time.sleep(0.05) # 50ms pause # Marquer la tâche comme terminée une fois la boucle finie with jobs_lock: if job_id in jobs: # Déterminer le statut final basé sur les erreurs # Si toutes les requêtes ont échoué -> FAILED # Sinon -> COMPLETED (le nombre d'erreurs indique si c'était parfait ou non) final_status = STATUS_FAILED if error_count == num_requests and num_requests > 0 else STATUS_COMPLETED jobs[job_id]['status'] = final_status app.logger.info(f"Tâche {job_id}: Terminé. Statut final: {final_status}. {completed_count - error_count}/{num_requests} succès, {error_count} erreurs.") else: app.logger.warning(f"Tâche {job_id} non trouvée à la fin de l'exécution pour marquer comme terminée.") # --- Routes Flask --- @app.route('/', methods=['GET']) def index(): """Affiche la page d'accueil avec le formulaire et la liste des tâches.""" with jobs_lock: # Trie les tâches par clé (ID), en ordre inverse (plus récent d'abord si UUID approxime le temps) # Pour un tri chronologique fiable, il faudrait ajouter un timestamp lors de la création de la tâche. sorted_jobs = dict(sorted(jobs.items(), reverse=True)) return render_template_string(HTML_INDEX, jobs_list=sorted_jobs) @app.route('/start', methods=['POST']) def start_requests(): """ Reçoit le nombre de requêtes depuis le formulaire, valide l'entrée, crée une nouvelle tâche et lance le thread d'arrière-plan. Redirige vers la page de statut de la nouvelle tâche. """ num_requests_str = request.form.get('num_requests') num_requests: int = 0 try: num_requests = int(num_requests_str) if num_requests <= 0: raise ValueError("Le nombre de requêtes doit être un entier positif.") if num_requests > 10000: # Limite de sécurité (optionnelle) raise ValueError("Le nombre de requêtes est limité à 10000.") except (TypeError, ValueError, AttributeError) as e: app.logger.warning(f"Tentative de démarrage échouée - nombre invalide: '{num_requests_str}' - Erreur: {e}") # Correction: Il faut re-passer la liste des jobs au template d'index en cas d'erreur with jobs_lock: sorted_jobs = dict(sorted(jobs.items(), reverse=True)) return render_template_string(HTML_INDEX, error=f"Nombre de requêtes invalide : {e}", jobs_list=sorted_jobs), 400 # Générer un ID de tâche unique (partie courte d'un UUID v4) job_id: str = str(uuid.uuid4())[:8] # Initialiser l'état de la tâche dans le dictionnaire partagé (section critique) with jobs_lock: jobs[job_id] = { 'status': STATUS_STARTING, # Statut initial avant que le thread ne démarre vraiment 'total': num_requests, 'completed_count': 0, 'error_count': 0, 'errors': [] # Optionnel: Ajouter un timestamp de création # 'created_at': datetime.now(timezone.utc).isoformat() } # Créer et démarrer le thread pour exécuter la tâche en arrière-plan thread = threading.Thread( target=background_task, args=(job_id, num_requests, TARGET_URL, BASE_PAYLOAD), daemon=True # Permet au programme principal de quitter même si des threads sont encore en cours d'exécution ) thread.start() app.logger.info(f"Nouvelle tâche {job_id} démarrée pour {num_requests} requêtes.") # Rediriger l'utilisateur vers la page de statut de cette nouvelle tâche return redirect(url_for('job_status', job_id=job_id)) @app.route('/status/', methods=['GET']) def job_status(job_id: str): """Affiche la page HTML de suivi pour une tâche spécifique (identifiée par job_id).""" with jobs_lock: # Vérifier si la tâche existe juste pour éviter d'afficher une page pour un ID invalide if job_id not in jobs: app.logger.warning(f"Tentative d'accès à la page de statut pour une tâche inexistante: {job_id}") return "Tâche non trouvée", 404 # La page HTML contient le JavaScript qui appellera l'API '/api/status/' # pour obtenir les données de progression dynamiquement. # On passe les constantes de statut au template pour que le JS puisse les utiliser return render_template_string(HTML_STATUS, job_id=job_id, STATUS_COMPLETED=STATUS_COMPLETED, STATUS_FAILED=STATUS_FAILED) @app.route('/api/status/', methods=['GET']) def api_job_status(job_id: str): """ Fournit l'état actuel d'une tâche spécifique au format JSON. Utilisé par le JavaScript de la page de statut pour les mises à jour. """ with jobs_lock: # Obtenir les informations de la tâche. Utiliser .get() pour gérer le cas où l'ID n'existe pas. job_info = jobs.get(job_id) if job_info: # Renvoyer une copie profonde pour éviter toute modification concurrente pendant la sérialisation JSON # Bien que le lock aide, c'est une sécurité supplémentaire, surtout si les structures de données deviennent complexes. return jsonify(copy.deepcopy(job_info)) else: # Si la tâche n'est pas trouvée, renvoyer une réponse JSON avec une erreur 404. app.logger.warning(f"API: Statut demandé pour tâche inexistante: {job_id}") return jsonify({"error": "Tâche non trouvée", "job_id": job_id}), 404 # --- Démarrage de l'application --- if __name__ == '__main__': # Utiliser host='0.0.0.0' pour rendre l'application accessible # depuis d'autres machines sur le même réseau local. # ATTENTION : debug=True ne doit JAMAIS être utilisé dans un environnement de production. # Il expose des vulnérabilités de sécurité et affecte les performances. print("Démarrage du serveur Flask...") print(f"Accéder à l'application via http://localhost:5000 ou http://:5000") app.run(host='0.0.0.0', port=5000, debug=True) # Mettre debug=False pour la production