from flask import Flask, request, jsonify, send_from_directory
import requests
import threading
import time
import uuid
import json
import os
from urllib.parse import urlparse
import socket

app = Flask(__name__, static_folder='.', static_url_path='')

# --- Configuration ---
DATA_FILE = "data.json"
PING_INTERVAL_SECONDS = 60  # Backend pings every 60 seconds
HISTORY_DURATION_SECONDS = 60 * 60  # Store history for 1 hour

# --- Data Store ---
# Structure: { "id": "uuid", "url": "string", "status": "pending/ok/error/checking",
#   "ip": "string", "responseTime": float_ms, "lastChecked": "iso_string_utc",
#   "history": [{"timestamp": float_unix_ts_seconds, "status": "ok/error"}],
#   "_thread": threading.Thread_object, "_stop_event": threading.Event_object }
monitored_urls_store = {}  # In-memory store: id -> url_data
lock = threading.Lock()    # To protect access to monitored_urls_store


# --- Helper Functions ---
def save_data_to_json():
    # This function must be called with 'lock' acquired
    serializable_data = {}
    for url_id, data in monitored_urls_store.items():
        s_data = data.copy()
        s_data.pop("_thread", None)
        s_data.pop("_stop_event", None)
        serializable_data[url_id] = s_data
    try:
        with open(DATA_FILE, 'w') as f:
            json.dump(serializable_data, f, indent=2)
    except IOError as e:
        print(f"Error saving data to {DATA_FILE}: {e}")


def load_data_from_json():
    global monitored_urls_store
    if os.path.exists(DATA_FILE):
        try:
            with open(DATA_FILE, 'r') as f:
                loaded_json_data = json.load(f)
            temp_store = {}
            for url_id_key, data_item in loaded_json_data.items():
                # Ensure essential fields and use 'id' from data if present, else key
                data_item.setdefault('id', url_id_key)
                current_id = data_item['id']
                data_item.setdefault('status', 'pending')
                data_item.setdefault('ip', 'N/A')
                data_item.setdefault('responseTime', None)
                data_item.setdefault('lastChecked', None)
                data_item.setdefault('history', [])
                temp_store[current_id] = data_item
            with lock:  # Lock before modifying global monitored_urls_store
                monitored_urls_store = temp_store
        except json.JSONDecodeError:
            print(f"Warning: Could not decode {DATA_FILE}. Starting with an empty list.")
            with lock:
                monitored_urls_store = {}
        except Exception as e:
            print(f"Error loading data from {DATA_FILE}: {e}. Starting fresh.")
Starting fresh.") with lock: monitored_urls_store = {} else: with lock: monitored_urls_store = {} url_ids_to_start_monitoring = [] with lock: url_ids_to_start_monitoring = list(monitored_urls_store.keys()) for url_id in url_ids_to_start_monitoring: start_url_monitoring_thread(url_id) def get_host_ip_address(hostname_str): try: # Check if hostname_str is already a valid IP address socket.inet_aton(hostname_str) # Throws an OSError if not a valid IPv4 string return hostname_str except OSError: # It's not an IP, so try to resolve it as a hostname try: ip_address = socket.gethostbyname(hostname_str) return ip_address except socket.gaierror: print(f"Could not resolve hostname: {hostname_str}") return 'N/A' except Exception as e: print(f"Error processing hostname/IP for {hostname_str}: {e}") return 'N/A' def prune_url_history(url_data_entry): # Assumes 'lock' is acquired or called from the thread managing this entry cutoff_time = time.time() - HISTORY_DURATION_SECONDS url_data_entry['history'] = [ entry for entry in url_data_entry.get('history', []) if entry['timestamp'] >= cutoff_time ] def execute_url_check(url_id_to_check): url_config_snapshot = None with lock: if url_id_to_check not in monitored_urls_store: return current_url_data = monitored_urls_store[url_id_to_check] if current_url_data.get('_stop_event') and current_url_data['_stop_event'].is_set(): return print(f"Checking {current_url_data['url']} (ID: {url_id_to_check})...") current_url_data['status'] = 'checking' url_config_snapshot = current_url_data.copy() # Snapshot for use outside lock if not url_config_snapshot: return check_start_time = time.perf_counter() final_check_status = 'error' http_response_time_ms = None # Identify your bot to website owners http_headers = {'User-Agent': 'URLPinger/1.0 (HuggingFace Space Bot)'} try: # Attempt HEAD request first try: head_response = requests.head(url_config_snapshot['url'], timeout=10, allow_redirects=True, headers=http_headers) if 200 <= head_response.status_code < 400: # OK or Redirect final_check_status = 'ok' else: print(f"HEAD for {url_config_snapshot['url']} returned {head_response.status_code}. Trying GET.") except requests.exceptions.Timeout: print(f"HEAD timeout for {url_config_snapshot['url']}. Trying GET...") except requests.RequestException as e_head: print(f"HEAD failed for {url_config_snapshot['url']}: {e_head}. 
Trying GET...") # If HEAD was not conclusive, try GET if final_check_status != 'ok': try: get_response = requests.get(url_config_snapshot['url'], timeout=15, allow_redirects=True, headers=http_headers) if get_response.ok: # Only 2xx status codes final_check_status = 'ok' else: print(f"GET for {url_config_snapshot['url']} status: {get_response.status_code}") final_check_status = 'error' except requests.exceptions.Timeout: print(f"GET timeout for {url_config_snapshot['url']}") final_check_status = 'error' except requests.RequestException as e_get: print(f"GET failed for {url_config_snapshot['url']}: {e_get}") final_check_status = 'error' if final_check_status == 'ok': http_response_time_ms = (time.perf_counter() - check_start_time) * 1000 except Exception as e: print(f"Outer check exception for {url_config_snapshot['url']}: {e}") final_check_status = 'error' with lock: if url_id_to_check not in monitored_urls_store: return # URL might have been removed during check live_url_data = monitored_urls_store[url_id_to_check] live_url_data['status'] = final_check_status live_url_data['responseTime'] = round(http_response_time_ms) if http_response_time_ms is not None else None live_url_data['lastChecked'] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()) # ISO 8601 UTC current_history_list = live_url_data.get('history', []) current_history_list.append({'timestamp': time.time(), 'status': final_check_status}) # timestamp in seconds live_url_data['history'] = current_history_list prune_url_history(live_url_data) save_data_to_json() print(f"Finished check for {live_url_data['url']}: {final_check_status}, {http_response_time_ms} ms") def pinger_thread_function(url_id_param, stop_event_param): while not stop_event_param.is_set(): execute_url_check(url_id_param) # Sleep for PING_INTERVAL_SECONDS, but check stop_event periodically for _ in range(PING_INTERVAL_SECONDS): if stop_event_param.is_set(): break time.sleep(1) print(f"PingerThread for {url_id_param} stopped.") def start_url_monitoring_thread(target_url_id): with lock: if target_url_id not in monitored_urls_store: print(f"Cannot start monitoring: URL ID {target_url_id} not found.") return url_data_entry = monitored_urls_store[target_url_id] # Stop existing thread if it's alive if "_thread" in url_data_entry and url_data_entry["_thread"].is_alive(): print(f"Monitor for URL ID {target_url_id} already running. 
Attempting to restart.") if "_stop_event" in url_data_entry and url_data_entry["_stop_event"]: # Check if _stop_event exists url_data_entry["_stop_event"].set() url_data_entry["_thread"].join(timeout=3) # Wait for thread to stop new_stop_event = threading.Event() # daemon=True allows main program to exit even if threads are running new_thread = threading.Thread(target=pinger_thread_function, args=(target_url_id, new_stop_event), daemon=True) url_data_entry["_thread"] = new_thread url_data_entry["_stop_event"] = new_stop_event new_thread.start() print(f"Started/Restarted monitoring for URL ID {target_url_id}: {url_data_entry['url']}") def stop_url_monitoring_thread(target_url_id): # This function must be called with 'lock' acquired if target_url_id in monitored_urls_store: url_data_entry = monitored_urls_store[target_url_id] if "_thread" in url_data_entry and url_data_entry["_thread"].is_alive(): print(f"Signaling stop for monitor thread of URL ID {target_url_id}") if "_stop_event" in url_data_entry and url_data_entry["_stop_event"]: # Check if _stop_event exists url_data_entry["_stop_event"].set() # Not joining here to keep API responsive, daemon thread will exit. url_data_entry.pop("_thread", None) url_data_entry.pop("_stop_event", None) # --- API Endpoints --- @app.route('/') def serve_index(): return send_from_directory(app.static_folder, 'index.html') @app.route('/api/urls', methods=['GET']) def get_all_urls(): with lock: # Prepare data for sending: list of url data, no thread objects response_list = [] for data_item in monitored_urls_store.values(): display_item = data_item.copy() display_item.pop("_thread", None) display_item.pop("_stop_event", None) response_list.append(display_item) return jsonify(response_list) @app.route('/api/urls', methods=['POST']) def add_new_url(): request_data = request.get_json() if not request_data or 'url' not in request_data: return jsonify({"error": "URL is required"}), 400 input_url = request_data['url'].strip() if not input_url.startswith('http://') and not input_url.startswith('https://'): input_url = 'https://' + input_url # Default to https try: parsed_input_url = urlparse(input_url) if not parsed_input_url.scheme or not parsed_input_url.netloc: raise ValueError("Invalid URL structure") url_hostname = parsed_input_url.hostname except ValueError: return jsonify({"error": "Invalid URL format"}), 400 with lock: # Check for duplicates (case-insensitive, ignoring trailing slashes) normalized_new_url = input_url.rstrip('/').lower() for existing_url_id in list(monitored_urls_store.keys()): # Iterate over keys to avoid issues if store is modified existing_url_data = monitored_urls_store.get(existing_url_id) if existing_url_data and existing_url_data['url'].rstrip('/').lower() == normalized_new_url: return jsonify({"error": "URL already monitored"}), 409 # Conflict new_url_id = str(uuid.uuid4()) resolved_ip = get_host_ip_address(url_hostname) if url_hostname else 'N/A' url_entry_to_add = { "id": new_url_id, "url": input_url, "status": 'pending', "ip": resolved_ip, "responseTime": None, "lastChecked": None, "history": [] } # Make a copy of the entry for the response *before* it's potentially modified # by start_url_monitoring_thread with non-serializable objects. 
        response_payload = url_entry_to_add.copy()

        monitored_urls_store[new_url_id] = url_entry_to_add  # Will be modified by start_url_monitoring_thread
        save_data_to_json()

    # Called after releasing 'lock': start_url_monitoring_thread acquires the lock itself.
    # This adds _thread and _stop_event to monitored_urls_store[new_url_id].
    start_url_monitoring_thread(new_url_id)

    # Return the clean response_payload, which does not have _thread or _stop_event
    return jsonify(response_payload), 201


@app.route('/api/urls/<target_url_id>', methods=['DELETE'])
def delete_existing_url(target_url_id):
    with lock:
        if target_url_id in monitored_urls_store:
            stop_url_monitoring_thread(target_url_id)
            removed_url_entry = monitored_urls_store.pop(target_url_id)
            save_data_to_json()

            # Prepare data for response (without thread objects)
            response_data = removed_url_entry.copy()  # Copy in case stop_url_monitoring_thread didn't pop everything
            response_data.pop("_thread", None)
            response_data.pop("_stop_event", None)

            print(f"Deleted URL ID {target_url_id}")
            return jsonify({"message": "URL removed", "url": response_data}), 200
        else:
            return jsonify({"error": "URL not found"}), 404


# --- Main Execution / Gunicorn Entry Point ---
# Load data once when the application module is initialized.
# This handles both `flask run` and gunicorn scenarios.
if os.environ.get('WERKZEUG_RUN_MAIN') != 'true':  # Avoids double load in Flask debug mode
    load_data_from_json()

if __name__ == '__main__':
    # This block is for local development (e.g., `python app.py`).
    # `load_data_from_json()` is called above unless the Werkzeug reloader is active.
    # With Flask's reloader, this module is imported twice: once by the main process
    # and once by the reloader's child process. The WERKZEUG_RUN_MAIN check ensures
    # the data is loaded in the process that actually serves requests.
    if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
        # Ensure data is loaded in the reloaded (serving) process too
        load_data_from_json()
    app.run(debug=True, host='0.0.0.0', port=7860)

# When run with Gunicorn, Gunicorn imports `app` from this `app.py` file.
# `load_data_from_json()` will have been called during that import
# (due to the WERKZEUG_RUN_MAIN check).