import json
import os
import re
import time

from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.core.driver_cache import DriverCacheManager
from webdriver_manager.core.os_manager import ChromeType

# Absolute http(s) URLs that contain ".m3u8"; a match stops at the first
# quote or whitespace character on either side of the extension.
_M3U8_PATTERN = re.compile(r'https?://[^"\'\s]+\.m3u8[^"\'\s]*')

# DevTools events whose params carry a request or response URL.
_NETWORK_EVENTS = frozenset(
    {"Network.requestWillBeSent", "Network.responseReceived"}
)

# Collects every string-valued property of `window` that mentions ".m3u8".
_WINDOW_STRINGS_SCRIPT = """
    var allVars = {};
    for (var key in window) {
        try {
            if (typeof window[key] === 'string' && window[key].includes('.m3u8')) {
                allVars[key] = window[key];
            }
        } catch(e) {}
    }
    return allVars;
"""


def _build_chrome_options(headers):
    """Return headless Chrome options with performance logging enabled."""
    chrome_options = Options()
    chrome_options.add_argument("--headless")
    chrome_options.add_argument("--disable-gpu")
    # Chrome expects "width,height"; the "1920x1080" form is not parsed.
    chrome_options.add_argument("--window-size=1920,1080")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")

    # The user agent is the only header Chrome accepts as a command-line
    # switch; every other header is applied later through DevTools.
    for key, value in (headers or {}).items():
        if key.lower() == "user-agent":
            chrome_options.add_argument(f"--user-agent={value}")

    # The performance log is where the network events are read from.
    chrome_options.set_capability("goog:loggingPrefs", {"performance": "ALL"})
    return chrome_options


def _m3u8_urls_from_performance_logs(log_entries):
    """Return the set of ".m3u8" URLs found in DevTools network log entries."""
    found = set()
    for entry in log_entries:
        try:
            message = json.loads(entry["message"])["message"]
            method = message["method"]
            params = message["params"]
        except (KeyError, TypeError, ValueError):
            # Malformed or unrelated entry: skip it, keep scanning the rest.
            continue
        if method not in _NETWORK_EVENTS or not isinstance(params, dict):
            continue

        # Prefer the request URL; fall back to the response URL only when
        # the event carries no request URL.
        for section in ("request", "response"):
            payload = params.get(section)
            if isinstance(payload, dict) and "url" in payload:
                candidate = payload["url"]
                if isinstance(candidate, str) and ".m3u8" in candidate:
                    found.add(candidate)
                break
    return found


def _m3u8_urls_from_text(text):
    """Return the set of ".m3u8" URLs matched in *text* (None yields an empty set)."""
    if not text:
        return set()
    return {match.group(0) for match in _M3U8_PATTERN.finditer(text)}


def extract_m3u8_urls(url, wait_time=10, headers=None):
    """
    Capture the m3u8 URLs exposed by a web page and return them.

    The page is loaded in headless Chrome. URLs are then collected from the
    network performance log, the page HTML, the contents of <script>
    elements, and string-valued properties of ``window``.

    Args:
        url (str): URL of the page to analyse.
        wait_time (int): Seconds to wait after navigation before collecting.
        headers (dict): Optional custom request headers. ``User-Agent`` is
            set through the browser's ``--user-agent`` switch; every other
            header is sent with all requests, including the initial
            navigation, via the DevTools ``Network.setExtraHTTPHeaders``
            command.

    Returns:
        list: Sorted list of the unique m3u8 URLs found.
    """
    chrome_options = _build_chrome_options(headers)

    # Keep downloaded drivers in a per-user cache directory.
    cache_dir = os.path.join(os.path.expanduser("~"), ".wdm_cache")
    os.makedirs(cache_dir, exist_ok=True)

    # webdriver_manager 4.x dropped the ``path`` keyword; a custom cache
    # location is supplied through a DriverCacheManager instead.
    driver_path = ChromeDriverManager(
        cache_manager=DriverCacheManager(root_dir=cache_dir)
    ).install()
    driver = webdriver.Chrome(service=Service(driver_path), options=chrome_options)

    try:
        extra_headers = {
            key: str(value)
            for key, value in (headers or {}).items()
            if key.lower() != "user-agent"
        }
        if extra_headers:
            # Must be issued before navigation so the first request already
            # carries the headers. Passing them as data (not as generated
            # JavaScript) means quotes in a value cannot break anything.
            driver.execute_cdp_cmd("Network.enable", {})
            driver.execute_cdp_cmd(
                "Network.setExtraHTTPHeaders", {"headers": extra_headers}
            )

        driver.get(url)

        # Give the page time to issue its media requests.
        time.sleep(wait_time)

        # 1. Network requests and responses.
        m3u8_urls = _m3u8_urls_from_performance_logs(driver.get_log("performance"))

        # 2. Page HTML and the contents of script elements.
        m3u8_urls |= _m3u8_urls_from_text(driver.page_source)
        for script in driver.find_elements("tag name", "script"):
            m3u8_urls |= _m3u8_urls_from_text(script.get_attribute("innerHTML"))

        # 3. String-valued global JavaScript variables.
        js_variables = driver.execute_script(_WINDOW_STRINGS_SCRIPT) or {}
        for value in js_variables.values():
            if isinstance(value, str):
                m3u8_urls |= _m3u8_urls_from_text(value)

        # Sorted so repeated runs return the URLs in a stable order.
        return sorted(m3u8_urls)
    finally:
        # Always shut the browser down, even if collection failed.
        driver.quit()