from fastapi import FastAPI, Request, Depends, HTTPException from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.responses import StreamingResponse, HTMLResponse, RedirectResponse from fastapi.background import BackgroundTasks from contextlib import asynccontextmanager import requests from curl_cffi import requests as cffi_requests import uuid import json import time from typing import Optional import asyncio import base64 import tempfile import os import re import threading import logging from dotenv import load_dotenv from playwright.sync_api import sync_playwright from datetime import datetime, timezone, timedelta from concurrent.futures import ThreadPoolExecutor import random # 加载环境变量 load_dotenv(override=True) # 配置日志 logging.basicConfig( level=logging.INFO, # 改为 INFO 级别 format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 修改全局数据存储 global_data = { "cookie": None, "cookies": None, "last_update": 0, "cookie_expires": 0, # 添加 cookie 过期时间 "is_refreshing": False # 添加刷新状态标志 } @asynccontextmanager async def lifespan(app: FastAPI): # 启动时获取 cookie logger.info("Starting FastAPI application, initializing cookie fetcher...") # 创建并启动线程 cookie_thread = threading.Thread(target=get_cookie_with_retry) cookie_thread.daemon = True # 设置为守护线程 cookie_thread.start() # 创建并启动自动刷新线程 refresh_thread = threading.Thread(target=auto_refresh_cookie) refresh_thread.daemon = True refresh_thread.start() logger.info("Cookie fetcher and auto-refresh threads started") yield # 关闭时清理资源 logger.info("Shutting down FastAPI application") global_data["cookie"] = None global_data["cookies"] = None global_data["last_update"] = 0 global_data["is_refreshing"] = False def get_cookie_with_retry(max_retries=3, retry_delay=5): """带重试机制的获取 cookie 函数""" retries = 0 while retries < max_retries: logger.info(f"Cookie fetching attempt {retries + 1}/{max_retries}") cookie = get_cookie() if cookie: logger.info("Successfully retrieved cookie") return cookie retries += 1 if retries < max_retries: logger.info(f"Retrying cookie fetch in {retry_delay} seconds...") time.sleep(retry_delay) logger.error(f"Failed to fetch cookie after {max_retries} attempts") return None app = FastAPI(lifespan=lifespan) security = HTTPBearer() # OpenAI API Key 配置,可以通过环境变量覆盖 OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", None) logger.info(f"OPENAI_API_KEY is set: {OPENAI_API_KEY is not None}") # logger.info(f"OPENAI_API_KEY value: {OPENAI_API_KEY}") def get_random_browser_fingerprint(): """生成随机的浏览器指纹""" # 随机选择浏览器版本 chrome_versions = ["120", "121", "122", "123", "124", "125"] edge_versions = ["120", "121", "122", "123", "124", "125"] selected_version = random.choice(chrome_versions) edge_version = random.choice(edge_versions) # 随机选择操作系统 os_versions = [ "Windows NT 10.0; Win64; x64", "Macintosh; Intel Mac OS X 10_15_7", "Macintosh; Intel Mac OS X 11_0_1", "Macintosh; Intel Mac OS X 12_0_1" ] selected_os = random.choice(os_versions) # 随机选择语言偏好 languages = [ "en-US,en;q=0.9", "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7", "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "en-GB,en;q=0.9,en-US;q=0.8" ] selected_language = random.choice(languages) # 随机选择视口大小 viewport_sizes = [ (1920, 1080), (1366, 768), (1440, 900), (1536, 864), (1680, 1050) ] selected_viewport = random.choice(viewport_sizes) # 构建用户代理字符串 user_agent = f"Mozilla/5.0 ({selected_os}) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{selected_version}.0.0.0 Safari/537.36 Edg/{edge_version}.0.0.0" # 构建请求头 headers = { "accept": "*/*", "accept-language": selected_language, "content-type": "application/json", "origin": "https://chat.akash.network", "referer": "https://chat.akash.network/", "sec-ch-ua": f'"Microsoft Edge";v="{edge_version}", "Not-A.Brand";v="8", "Chromium";v="{selected_version}"', "sec-ch-ua-mobile": "?0", "sec-ch-ua-platform": '"macOS"', "sec-fetch-dest": "empty", "sec-fetch-mode": "cors", "sec-fetch-site": "same-origin", "user-agent": user_agent } return { "headers": headers, "viewport": selected_viewport, "user_agent": user_agent } def get_cookie(): """获取 cookie 的函数""" browser = None context = None page = None try: logger.info("Starting cookie retrieval process...") # 获取随机浏览器指纹 fingerprint = get_random_browser_fingerprint() logger.info(f"Using browser fingerprint: {fingerprint['user_agent']}") with sync_playwright() as p: try: # 启动浏览器 logger.info("Launching browser...") browser = p.chromium.launch( headless=True, args=[ '--no-sandbox', '--disable-dev-shm-usage', '--disable-gpu', '--disable-software-rasterizer', '--disable-extensions', '--disable-setuid-sandbox', '--no-first-run', '--no-zygote', '--single-process', f'--window-size={fingerprint["viewport"][0]},{fingerprint["viewport"][1]}', '--disable-blink-features=AutomationControlled', '--disable-features=IsolateOrigins,site-per-process' ] ) logger.info("Browser launched successfully") # 创建上下文,使用随机指纹 logger.info("Creating browser context...") context = browser.new_context( viewport={'width': fingerprint["viewport"][0], 'height': fingerprint["viewport"][1]}, user_agent=fingerprint["user_agent"], locale='en-US', timezone_id='America/New_York', permissions=['geolocation'], extra_http_headers=fingerprint["headers"] ) # 添加脚本以覆盖 navigator.webdriver context.add_init_script(""" Object.defineProperty(navigator, 'webdriver', { get: () => false, }); // 更多指纹伪装 Object.defineProperty(navigator, 'plugins', { get: () => [1, 2, 3, 4, 5], }); """) logger.info("Browser context created successfully") # 创建页面 logger.info("Creating new page...") page = context.new_page() logger.info("Page created successfully") # 设置页面超时 page.set_default_timeout(60000) # 访问目标网站,添加重试机制 max_retries = 3 retry_delay = 5 for attempt in range(max_retries): try: logger.info(f"Navigating to target website (attempt {attempt + 1}/{max_retries})...") page.goto("https://chat.akash.network/", timeout=50000) break except Exception as e: if attempt == max_retries - 1: raise logger.warning(f"Navigation attempt {attempt + 1} failed: {e}") time.sleep(retry_delay) # 等待页面加载 logger.info("Waiting for page load...") try: # 首先等待 DOM 加载完成 page.wait_for_load_state("domcontentloaded", timeout=30000) logger.info("DOM content loaded") # 等待一段时间,让 Cloudflare 检查完成 logger.info("Waiting for Cloudflare check...") time.sleep(5) # 尝试点击页面,模拟用户行为 try: page.mouse.move(100, 100) page.mouse.click(100, 100) logger.info("Simulated user interaction") # 随机滚动页面 page.mouse.wheel(0, 100) time.sleep(0.5) page.mouse.wheel(0, -50) logger.info("Simulated scrolling") except Exception as e: logger.warning(f"Failed to simulate user interaction: {e}") # 再次等待一段时间 time.sleep(5) except Exception as e: logger.warning(f"Timeout waiting for load state: {e}") # 等待更长时间确保页面完全加载 try: page.wait_for_load_state("networkidle", timeout=10000) logger.info("Network idle reached") except Exception as e: logger.warning(f"Timeout waiting for network idle: {e}") # 获取 cookies logger.info("Getting cookies...") cookies = context.cookies() if not cookies: logger.error("No cookies found") return None # 记录所有 cookie 名称以进行调试 cookie_names = [cookie['name'] for cookie in cookies] logger.info(f"Retrieved cookies: {cookie_names}") # 检查是否有 cf_clearance cookie cf_cookie = next((cookie for cookie in cookies if cookie['name'] == 'cf_clearance'), None) if not cf_cookie: logger.error("cf_clearance cookie not found") return None # 检查是否有 session_token cookie session_cookie = next((cookie for cookie in cookies if cookie['name'] == 'session_token'), None) if not session_cookie: logger.error("session_token cookie not found") # 继续执行,因为某些情况下可能不需要 session_token # 构建 cookie 字符串 cookie_str = '; '.join([f"{cookie['name']}={cookie['value']}" for cookie in cookies]) logger.info(f"Cookie string length: {len(cookie_str)}") global_data["cookie"] = cookie_str global_data["cookies"] = cookies # 保存完整的 cookies 列表 global_data["last_update"] = time.time() # 设置 cookie 过期时间 if session_cookie and 'expires' in session_cookie and session_cookie['expires'] > 0: global_data["cookie_expires"] = session_cookie['expires'] logger.info(f"Session token expires at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(session_cookie['expires']))}") else: # 如果没有明确的过期时间,默认设置为30分钟后过期 global_data["cookie_expires"] = time.time() + 1800 # 30 分钟 logger.info("No explicit expiration in session_token cookie, setting default 30 minute expiration") logger.info("Successfully retrieved cookies") return cookie_str except Exception as e: logger.error(f"Error in browser operations: {e}") logger.error(f"Error type: {type(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return None finally: # 确保资源被正确关闭 try: if page: logger.info("Closing page...") try: page.close() logger.info("Page closed successfully") except Exception as e: logger.error(f"Error closing page: {e}") except Exception as e: logger.error(f"Error in page cleanup: {e}") try: if context: logger.info("Closing context...") try: context.close() logger.info("Context closed successfully") except Exception as e: logger.error(f"Error closing context: {e}") except Exception as e: logger.error(f"Error in context cleanup: {e}") try: if browser: logger.info("Closing browser...") try: browser.close() logger.info("Browser closed successfully") except Exception as e: logger.error(f"Error closing browser: {e}") except Exception as e: logger.error(f"Error in browser cleanup: {e}") # 确保所有资源都被清理 page = None context = None browser = None # 主动触发垃圾回收 import gc gc.collect() logger.info("Resource cleanup completed") except Exception as e: logger.error(f"Error fetching cookie: {str(e)}") logger.error(f"Error type: {type(e)}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") # 最后再次确保资源被清理 if page: try: page.close() except: pass if context: try: context.close() except: pass if browser: try: browser.close() except: pass # 主动触发垃圾回收 import gc gc.collect() return None # 添加刷新 cookie 的函数 async def refresh_cookie(): """刷新 cookie 的函数,用于401错误触发""" logger.info("Refreshing cookie due to 401 error") # 如果已经在刷新中,等待一段时间 if global_data["is_refreshing"]: logger.info("Cookie refresh already in progress, waiting...") # 等待最多10秒 for _ in range(10): await asyncio.sleep(1) if not global_data["is_refreshing"]: break # 如果仍然在刷新中,强制刷新 if global_data["is_refreshing"]: logger.info("Forcing cookie refresh due to 401 error") global_data["is_refreshing"] = False try: global_data["is_refreshing"] = True # 标记 cookie 为过期 global_data["cookie_expires"] = 0 # 调用同步函数进行cookie获取,使用线程池不阻塞事件循环 executor = ThreadPoolExecutor(max_workers=1) loop = asyncio.get_event_loop() new_cookie = await loop.run_in_executor(executor, get_cookie_with_retry) return new_cookie finally: global_data["is_refreshing"] = False async def background_refresh_cookie(): """后台刷新 cookie 的函数,不影响接口调用""" if global_data["is_refreshing"]: logger.info("Cookie refresh already in progress, skipping") return try: global_data["is_refreshing"] = True logger.info("Starting background cookie refresh") # 使用线程池执行同步函数 executor = ThreadPoolExecutor(max_workers=1) loop = asyncio.get_event_loop() new_cookie = await loop.run_in_executor(executor, get_cookie) if new_cookie: logger.info("Background cookie refresh successful") # 更新 cookie 和过期时间 global_data["cookie"] = new_cookie global_data["last_update"] = time.time() # 查找 session_token cookie 的过期时间 session_cookie = next((cookie for cookie in global_data["cookies"] if cookie['name'] == 'session_token'), None) if session_cookie and 'expires' in session_cookie and session_cookie['expires'] > 0: global_data["cookie_expires"] = session_cookie['expires'] logger.info(f"Session token expires at: {time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(session_cookie['expires']))}") else: # 如果没有明确的过期时间,默认设置为30分钟后过期 global_data["cookie_expires"] = time.time() + 1800 logger.info("No explicit expiration in session_token cookie, setting default 30 minute expiration") else: logger.error("Background cookie refresh failed") except Exception as e: logger.error(f"Error in background cookie refresh: {e}") finally: global_data["is_refreshing"] = False async def check_and_update_cookie(): """检查并更新 cookie""" try: current_time = time.time() # 只在 cookie 不存在或已过期时刷新 if not global_data["cookie"] or current_time >= global_data["cookie_expires"]: logger.info("Cookie expired or not available, starting refresh") try: # 使用线程池执行同步的 get_cookie 函数 loop = asyncio.get_event_loop() with ThreadPoolExecutor() as executor: new_cookie = await loop.run_in_executor(executor, get_cookie) if new_cookie: logger.info("Cookie refresh successful") else: logger.error("Cookie refresh failed") except Exception as e: logger.error(f"Error during cookie refresh: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") else: logger.info("Using existing cookie") except Exception as e: logger.error(f"Error in check_and_update_cookie: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") async def get_api_key(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials # logger.info(f"Received token: {token}") # 如果设置了 OPENAI_API_KEY,则需要验证 if OPENAI_API_KEY is not None: # 去掉 Bearer 前缀后再比较 clean_token = token.replace("Bearer ", "") if token.startswith("Bearer ") else token # logger.info(f"Clean token: {clean_token}") if clean_token != OPENAI_API_KEY: logger.error(f"Token mismatch. Expected: {OPENAI_API_KEY}, Got: {clean_token}") raise HTTPException( status_code=401, detail="Invalid API key" ) logger.info("API key validation passed") return True async def validate_cookie(background_tasks: BackgroundTasks): # 检查并更新 cookie(如果需要) await check_and_update_cookie() # 等待 cookie 初始化完成 max_wait = 30 # 最大等待时间(秒) start_time = time.time() while not global_data["cookie"] and time.time() - start_time < max_wait: await asyncio.sleep(1) logger.info("Waiting for cookie initialization...") # 检查是否有有效的 cookie if not global_data["cookie"]: logger.error("Cookie not available after waiting") raise HTTPException( status_code=503, detail="Service temporarily unavailable - Cookie not available" ) logger.info("Cookie validation passed") return global_data["cookie"] async def check_image_status(session: requests.Session, job_id: str, headers: dict) -> Optional[str]: """检查图片生成状态并获取生成的图片""" max_retries = 30 for attempt in range(max_retries): try: print(f"\nAttempt {attempt + 1}/{max_retries} for job {job_id}") response = session.get( f'https://chat.akash.network/api/image-status?ids={job_id}', headers=headers ) print(f"Status response code: {response.status_code}") status_data = response.json() if status_data and isinstance(status_data, list) and len(status_data) > 0: job_info = status_data[0] status = job_info.get('status') print(f"Job status: {status}") # 只有当状态为 completed 时才处理结果 if status == "completed": result = job_info.get("result") if result and not result.startswith("Failed"): print("Got valid result, attempting upload...") image_url = await upload_to_xinyew(result, job_id) if image_url: print(f"Successfully uploaded image: {image_url}") return image_url print("Image upload failed") return None print("Invalid result received") return None elif status == "failed": print(f"Job {job_id} failed") return None # 如果状态是其他(如 pending),继续等待 await asyncio.sleep(1) continue except Exception as e: print(f"Error checking status: {e}") return None print(f"Timeout waiting for job {job_id}") return None @app.get("/", response_class=HTMLResponse) async def health_check(): """健康检查端点,返回服务状态""" # 检查 cookie 状态 cookie_status = "ok" if global_data["cookie"] else "error" status_color = "green" if cookie_status == "ok" else "red" status_text = "正常" if cookie_status == "ok" else "异常" # 获取当前时间(北京时间) current_time = datetime.now(timezone(timedelta(hours=8))) # 格式化 cookie 过期时间(北京时间) if global_data["cookie_expires"]: expires_time = datetime.fromtimestamp(global_data["cookie_expires"], timezone(timedelta(hours=8))) expires_str = expires_time.strftime("%Y-%m-%d %H:%M:%S") # 计算剩余时间 time_left = global_data["cookie_expires"] - time.time() hours_left = int(time_left // 3600) minutes_left = int((time_left % 3600) // 60) if hours_left > 0: time_left_str = f"{hours_left}小时{minutes_left}分钟" else: time_left_str = f"{minutes_left}分钟" else: expires_str = "未知" time_left_str = "未知" # 格式化最后更新时间(北京时间) if global_data["last_update"]: last_update_time = datetime.fromtimestamp(global_data["last_update"], timezone(timedelta(hours=8))) last_update_str = last_update_time.strftime("%Y-%m-%d %H:%M:%S") # 计算多久前更新 time_since_update = time.time() - global_data["last_update"] if time_since_update < 60: update_ago = f"{int(time_since_update)}秒前" elif time_since_update < 3600: update_ago = f"{int(time_since_update // 60)}分钟前" else: update_ago = f"{int(time_since_update // 3600)}小时前" else: last_update_str = "从未更新" update_ago = "未知" status = { "status": "ok", "cookie_status": { "status": cookie_status, "status_text": status_text, "status_color": status_color, "expires": expires_str, "time_left": time_left_str, "available": bool(global_data["cookie"]), "last_update": last_update_str, "update_ago": update_ago } } # 返回 HTML 响应 return HTMLResponse(content=f"""