import json import os import time import uuid import argparse # 导入参数解析库 import random import string import logging import requests from flask import Flask, render_template, request, jsonify, send_from_directory from flask_cors import CORS # 导入 pikpak.py 中的函数 from utils.pk_email import connect_imap from utils.pikpak import ( sign_encrypt, captcha_image_parse, ramdom_version, random_rtc_token, PikPak, save_account_info, test_proxy, ) # 导入 email_client from utils.email_client import EmailClient # 重试参数 max_retries = 3 retry_delay = 1.0 # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 定义一个retry函数,用于重试指定的函数 def retry_function(func, *args, max_retries=3, delay=1, **kwargs): """ 对指定函数进行重试 Args: func: 要重试的函数 *args: 传递给函数的位置参数 max_retries: 最大重试次数,默认为3 delay: 每次重试之间的延迟(秒),默认为1 **kwargs: 传递给函数的关键字参数 Returns: 函数的返回值,如果所有重试都失败则返回None """ retries = 0 result = None while retries < max_retries: if retries > 0: logger.info(f"第 {retries} 次重试函数 {func.__name__}...") result = func(*args, **kwargs) # 如果函数返回非None结果,视为成功 if result is not None: if retries > 0: logger.info(f"在第 {retries} 次重试后成功") return result # 如果达到最大重试次数,返回最后一次结果 if retries >= max_retries - 1: logger.warning(f"函数 {func.__name__} 在 {max_retries} 次尝试后失败") break # 等待指定的延迟时间 time.sleep(delay) retries += 1 return result # 解析命令行参数 parser = argparse.ArgumentParser(description="PikPak 自动邀请注册系统") args = parser.parse_args() app = Flask(__name__, static_url_path='/assets') # cors CORS(app, resources={r"/*": {"origins": "*"}}) app.secret_key = os.urandom(24) # 全局字典用于存储用户处理过程中的数据,以 email 为键 user_process_data = {} @app.route("/api/health") def health_check(): in_huggingface = ( os.environ.get("SPACE_ID") is not None or os.environ.get("SYSTEM") == "spaces" ) return jsonify( { "status": "OK", } ) @app.route("/api/initialize", methods=["POST"]) def initialize(): # 获取用户表单输入 use_proxy = request.form.get("use_proxy") == "true" proxy_url = request.form.get("proxy_url", "") invite_code = request.form.get("invite_code", "") email = request.form.get("email", "") # 初始化参数 current_version = ramdom_version() version = current_version["v"] algorithms = current_version["algorithms"] client_id = "YNxT9w7GMdWvEOKa" client_secret = "dbw2OtmVEeuUvIptb1Coyg" package_name = "com.pikcloud.pikpak" device_id = str(uuid.uuid4()).replace("-", "") rtc_token = random_rtc_token() # 将这些参数存储到会话中以便后续使用 # session["use_proxy"] = use_proxy # session["proxy_url"] = proxy_url # session["invite_code"] = invite_code # session["email"] = email # session["version"] = version # session["algorithms"] = algorithms # session["client_id"] = client_id # session["client_secret"] = client_secret # session["package_name"] = package_name # session["device_id"] = device_id # session["rtc_token"] = rtc_token # 如果启用代理,则测试代理 proxy_status = None if use_proxy: proxy_status = test_proxy(proxy_url) # 创建PikPak实例 pikpak = PikPak( invite_code, client_id, device_id, version, algorithms, email, rtc_token, client_secret, package_name, use_proxy=use_proxy, proxy_http=proxy_url, proxy_https=proxy_url, ) # 初始化验证码 init_result = pikpak.init("POST:/v1/auth/verification") if ( not init_result or not isinstance(init_result, dict) or "captcha_token" not in init_result ): return jsonify( {"status": "error", "message": "初始化失败,请检查网络连接或代理设置"} ) # 将用户数据存储在全局字典中 user_data = { "use_proxy": use_proxy, "proxy_url": proxy_url, "invite_code": invite_code, "email": email, "version": version, "algorithms": algorithms, "client_id": client_id, "client_secret": client_secret, "package_name": package_name, "device_id": device_id, "rtc_token": rtc_token, "captcha_token": pikpak.captcha_token, # Store captcha_token here } user_process_data[email] = user_data # 将验证码令牌保存到会话中 - REMOVED # session["captcha_token"] = pikpak.captcha_token return jsonify( { "status": "success", "message": "初始化成功,请进行滑块验证", "email": email, # Return email to client "proxy_status": proxy_status, "version": version, "device_id": device_id, "rtc_token": rtc_token, } ) @app.route("/api/verify_captcha", methods=["POST"]) def verify_captcha(): # 尝试从表单或JSON获取email email = request.form.get('email') if not email and request.is_json: data = request.get_json() email = data.get('email') if not email: return jsonify({"status": "error", "message": "请求中未提供Email"}) # 从全局字典获取用户数据 user_data = user_process_data.get(email) if not user_data: return jsonify({"status": "error", "message": "会话数据不存在或已过期,请重新初始化"}) # 从 user_data 中获取存储的数据 device_id = user_data.get("device_id") # email = user_data.get("email") # Email is now the key, already have it invite_code = user_data.get("invite_code") client_id = user_data.get("client_id") version = user_data.get("version") algorithms = user_data.get("algorithms") rtc_token = user_data.get("rtc_token") client_secret = user_data.get("client_secret") package_name = user_data.get("package_name") use_proxy = user_data.get("use_proxy") proxy_url = user_data.get("proxy_url") captcha_token = user_data.get("captcha_token", "") # Use get with default # Check if essential data is present (device_id is checked as example) if not device_id: return jsonify({"status": "error", "message": "必要的会话数据丢失,请重新初始化"}) # 创建PikPak实例 (使用从 user_data 获取的数据) pikpak = PikPak( invite_code, client_id, device_id, version, algorithms, email, rtc_token, client_secret, package_name, use_proxy=use_proxy, proxy_http=proxy_url, proxy_https=proxy_url, ) # 从 user_data 设置验证码令牌 pikpak.captcha_token = captcha_token # 尝试验证码验证 max_attempts = 5 captcha_result = None for attempt in range(max_attempts): try: captcha_result = captcha_image_parse(pikpak, device_id) if ( captcha_result and "response_data" in captcha_result and captcha_result["response_data"].get("result") == "accept" ): break time.sleep(2) except Exception as e: time.sleep(2) if ( not captcha_result or "response_data" not in captcha_result or captcha_result["response_data"].get("result") != "accept" ): return jsonify({"status": "error", "message": "滑块验证失败,请重试"}) # 滑块验证加密 try: executor_info = pikpak.executor() if not executor_info: return jsonify({"status": "error", "message": "获取executor信息失败"}) sign_encrypt_info = sign_encrypt( executor_info, pikpak.captcha_token, rtc_token, pikpak.use_proxy, pikpak.proxies, ) if ( not sign_encrypt_info or "request_id" not in sign_encrypt_info or "sign" not in sign_encrypt_info ): return jsonify({"status": "error", "message": "签名加密失败"}) # 更新验证码令牌 report_result = pikpak.report( sign_encrypt_info["request_id"], sign_encrypt_info["sign"], captcha_result["pid"], captcha_result["traceid"], ) # 请求邮箱验证码 verification_result = pikpak.verification() if ( not verification_result or not isinstance(verification_result, dict) or "verification_id" not in verification_result ): return jsonify({"status": "error", "message": "请求验证码失败"}) # 将更新的数据保存到 user_data 中 user_data["captcha_token"] = pikpak.captcha_token user_data["verification_id"] = pikpak.verification_id return jsonify({"status": "success", "message": "验证码已发送到邮箱,请查收"}) except Exception as e: import traceback error_trace = traceback.format_exc() return jsonify( { "status": "error", "message": f"验证过程出错: {str(e)}", "trace": error_trace, } ) def gen_password(): # 生成12位密码 return "".join(random.choices(string.ascii_letters + string.digits, k=12)) @app.route("/api/register", methods=["POST"]) def register(): # 从表单获取验证码和email verification_code = request.form.get("verification_code") email = request.form.get('email') # Get email from form if not email: return jsonify({"status": "error", "message": "请求中未提供Email"}) if not verification_code: return jsonify({"status": "error", "message": "验证码不能为空"}) # 从全局字典获取用户数据 user_data = user_process_data.get(email) if not user_data: return jsonify({"status": "error", "message": "会话数据不存在或已过期,请重新初始化"}) # 从 user_data 中获取存储的数据 device_id = user_data.get("device_id") # email = user_data.get("email") # Already have email invite_code = user_data.get("invite_code") client_id = user_data.get("client_id") version = user_data.get("version") algorithms = user_data.get("algorithms") rtc_token = user_data.get("rtc_token") client_secret = user_data.get("client_secret") package_name = user_data.get("package_name") use_proxy = user_data.get("use_proxy") proxy_url = user_data.get("proxy_url") verification_id = user_data.get("verification_id") captcha_token = user_data.get("captcha_token", "") # Check if essential data is present if not device_id or not verification_id: return jsonify({"status": "error", "message": "必要的会话数据丢失,请重新初始化"}) # 创建PikPak实例 pikpak = PikPak( invite_code, client_id, device_id, version, algorithms, email, rtc_token, client_secret, package_name, use_proxy=use_proxy, proxy_http=proxy_url, proxy_https=proxy_url, ) # 从 user_data 中设置验证码令牌和验证ID pikpak.captcha_token = captcha_token pikpak.verification_id = verification_id # 验证验证码 pikpak.verify_post(verification_code) # 刷新时间戳并加密签名值 pikpak.init("POST:/v1/auth/signup") # 注册并登录 name = email.split("@")[0] password = gen_password() # 默认密码 signup_result = pikpak.signup(name, password, verification_code) # 填写邀请码 pikpak.activation_code() if ( not signup_result or not isinstance(signup_result, dict) or "access_token" not in signup_result ): return jsonify({"status": "error", "message": "注册失败,请检查验证码或重试"}) # 保存账号信息到JSON文件 account_info = { "captcha_token": pikpak.captcha_token, "timestamp": pikpak.timestamp, "name": name, "email": email, "password": password, "device_id": device_id, "version": version, "user_id": signup_result.get("sub", ""), "access_token": signup_result.get("access_token", ""), "refresh_token": signup_result.get("refresh_token", ""), "invite_code": invite_code, } # 保存账号信息 account_file = save_account_info(name, account_info) return jsonify( { "status": "success", "message": "注册成功!账号已保存。", "account_info": account_info, } ) @app.route("/api/test_proxy", methods=["POST"]) def test_proxy_route(): proxy_url = request.form.get("proxy_url", "http://127.0.0.1:7890") result = test_proxy(proxy_url) return jsonify( { "status": "success" if result else "error", "message": "代理连接测试成功" if result else "代理连接测试失败", } ) @app.route("/api/get_verification", methods=["POST"]) def get_verification(): """ 处理获取验证码的请求 """ email_user = request.form["email"] email_password = request.form["password"] # 先尝试从收件箱获取验证码 result = connect_imap(email_user, email_password, "INBOX") # 如果收件箱没有找到验证码,则尝试从垃圾邮件中查找 if result["code"] == 0: result = connect_imap(email_user, email_password, "Junk") return jsonify(result) @app.route("/api/fetch_accounts", methods=["GET"]) def fetch_accounts(): # 获取account文件夹中的所有JSON文件 account_files = [] for file in os.listdir("account"): if file.endswith(".json"): file_path = os.path.join("account", file) try: with open(file_path, "r", encoding="utf-8") as f: account_data = json.load(f) if isinstance(account_data, dict): # 添加文件名属性,用于后续操作 account_data["filename"] = file account_files.append(account_data) except Exception as e: logger.error(f"Error reading {file}: {str(e)}") if not account_files: return jsonify( {"status": "info", "message": "没有找到保存的账号", "accounts": []} ) account_files.sort(key=lambda x: x.get("timestamp", ""), reverse=True) return jsonify( { "status": "success", "message": f"找到 {len(account_files)} 个账号", "accounts": account_files, } ) @app.route("/api/update_account", methods=["POST"]) def update_account(): data = request.json if not data or "filename" not in data or "account_data" not in data: return jsonify({"status": "error", "message": "请求数据不完整"}) filename = data.get("filename") account_data = data.get("account_data") # 安全检查文件名 if not filename or ".." in filename or not filename.endswith(".json"): return jsonify({"status": "error", "message": "无效的文件名"}) file_path = os.path.join("account", filename) try: with open(file_path, "w", encoding="utf-8") as f: json.dump(account_data, f, indent=4, ensure_ascii=False) return jsonify({"status": "success", "message": "账号已成功更新"}) except Exception as e: return jsonify({"status": "error", "message": f"更新账号时出错: {str(e)}"}) @app.route("/api/delete_account", methods=["POST"]) def delete_account(): # 检查是否是单个文件名还是文件名列表 if 'filenames' in request.form: # 批量删除模式 filenames = request.form.getlist('filenames') if not filenames: return jsonify({"status": "error", "message": "未提供文件名"}) results = { "success": [], "failed": [] } for filename in filenames: # 安全检查文件名 if ".." in filename or not filename.endswith(".json"): results["failed"].append({"filename": filename, "reason": "无效的文件名"}) continue file_path = os.path.join("account", filename) try: # 检查文件是否存在 if not os.path.exists(file_path): results["failed"].append({"filename": filename, "reason": "账号文件不存在"}) continue # 删除文件 os.remove(file_path) results["success"].append(filename) except Exception as e: results["failed"].append({"filename": filename, "reason": str(e)}) # 返回批量删除结果 if len(results["success"]) > 0: if len(results["failed"]) > 0: message = f"成功删除 {len(results['success'])} 个账号,{len(results['failed'])} 个账号删除失败" status = "partial" else: message = f"成功删除 {len(results['success'])} 个账号" status = "success" else: message = "所有账号删除失败" status = "error" return jsonify({ "status": status, "message": message, "results": results }) else: # 保持向后兼容 - 单个文件删除模式 filename = request.form.get("filename") if not filename: return jsonify({"status": "error", "message": "未提供文件名"}) # 安全检查文件名 if ".." in filename or not filename.endswith(".json"): return jsonify({"status": "error", "message": "无效的文件名"}) file_path = os.path.join("account", filename) try: # 检查文件是否存在 if not os.path.exists(file_path): return jsonify({"status": "error", "message": "账号文件不存在"}) # 删除文件 os.remove(file_path) return jsonify({"status": "success", "message": "账号已成功删除"}) except Exception as e: return jsonify({"status": "error", "message": f"删除账号时出错: {str(e)}"}) @app.route("/api/activate_account_with_names", methods=["POST"]) def activate_account_with_names(): try: data = request.json key = data.get("key") names = data.get("names", []) # 获取指定的账户名称列表 all_accounts = data.get("all", False) # 获取是否处理所有账户的标志 if not key: return jsonify({"status": "error", "message": "密钥不能为空"}) if not all_accounts and (not names or not isinstance(names, list)): return jsonify({"status": "error", "message": "请提供有效的账户名称列表或设置 all=true"}) # 存储账号数据及其文件路径 accounts_with_paths = [] for file in os.listdir("account"): if file.endswith(".json"): # 如果 all=true 或者文件名在指定的names列表中,则处理该文件 file_name_without_ext = os.path.splitext(file)[0] if all_accounts or file_name_without_ext in names or file in names: file_path = os.path.join("account", file) with open(file_path, "r", encoding="utf-8") as f: account_data = json.load(f) # 保存文件路径以便后续更新 accounts_with_paths.append( {"path": file_path, "data": account_data} ) if not accounts_with_paths: return jsonify({"status": "error", "message": "未找到指定的账号数据"}) # 使用多线程处理每个账号 import threading import queue # 创建结果队列 result_queue = queue.Queue() # 定义线程处理函数 def process_account(account_with_path, account_key, result_q): try: file_path = account_with_path["path"] single_account = account_with_path["data"] response = requests.post( headers={ "Content-Type": "application/json", "referer": "https://inject.kiteyuan.info/", "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36 Edg/134.0.0.0", }, url="https://inject.kiteyuan.info/infoInject", json={"info": single_account, "key": account_key}, timeout=30, ) # 将结果放入队列 if response.status_code == 200: api_result = response.json() # 检查API是否返回了正确的数据格式 if isinstance(api_result, dict) and api_result.get("code") == 200 and "data" in api_result: # 获取返回的数据对象 account_data = api_result.get("data", {}) if account_data and isinstance(account_data, dict): # 更新账号信息 updated_account = single_account.copy() # 更新令牌信息 (从data子对象中提取) for key in ["access_token", "refresh_token", "captcha_token", "timestamp", "device_id", "user_id"]: if key in account_data: updated_account[key] = account_data[key] # 保存更新后的账号数据 with open(file_path, "w", encoding="utf-8") as f: json.dump(updated_account, f, indent=4, ensure_ascii=False) # 将更新后的数据放入结果队列 result_q.put( { "status": "success", "account": single_account.get("email", "未知邮箱"), "result": account_data, "updated": True, } ) else: # 返回的data不是字典类型 result_q.put( { "status": "error", "account": single_account.get("email", "未知邮箱"), "message": "返回的数据格式不符合预期", "result": api_result, } ) else: # API返回错误码或格式不符合预期 error_msg = api_result.get("msg", "未知错误") result_q.put( { "status": "error", "account": single_account.get("email", "未知邮箱"), "message": f"激活失败: {error_msg}", "result": api_result, } ) else: result_q.put( { "status": "error", "account": single_account.get("email", "未知邮箱"), "message": f"激活失败: HTTP {response.status_code}-{response.json().get('detail', '未知错误')}", "result": response.text, } ) except Exception as e: result_q.put( { "status": "error", "account": single_account.get("email", "未知邮箱"), "message": f"处理失败: {str(e)}", } ) # 创建并启动线程 threads = [] for account_with_path in accounts_with_paths: thread = threading.Thread( target=process_account, args=(account_with_path, key, result_queue) ) threads.append(thread) thread.start() # 等待所有线程完成 for thread in threads: thread.join() # 收集所有结果 results = [] while not result_queue.empty(): results.append(result_queue.get()) # 统计成功和失败的数量 success_count = sum(1 for r in results if r["status"] == "success") updated_count = sum( 1 for r in results if r.get("status") == "success" and r.get("updated", False) ) return jsonify( { "status": "success", "message": f"账号激活完成: {success_count}/{len(accounts_with_paths)}个成功, {updated_count}个已更新数据", "results": results, } ) except Exception as e: return jsonify({"status": "error", "message": f"操作失败: {str(e)}"}) @app.route("/api/check_email_inventory", methods=["GET"]) def check_email_inventory(): try: # 发送请求到库存API response = requests.get( url="https://zizhu.shanyouxiang.com/kucun", headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" }, timeout=10, ) if response.status_code == 200: return jsonify({"status": "success", "inventory": response.json()}) else: return jsonify( { "status": "error", "message": f"获取库存失败: HTTP {response.status_code}", } ) except Exception as e: return jsonify({"status": "error", "message": f"获取库存时出错: {str(e)}"}) @app.route("/api/check_balance", methods=["GET"]) def check_balance(): try: # 从请求参数中获取卡号 card = request.args.get("card") if not card: return jsonify({"status": "error", "message": "未提供卡号参数"}) # 发送请求到余额查询API response = requests.get( url="https://zizhu.shanyouxiang.com/yue", params={"card": card}, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" }, timeout=10, ) if response.status_code == 200: return jsonify({"status": "success", "balance": response.json()}) else: return jsonify( { "status": "error", "message": f"查询余额失败: HTTP {response.status_code}", } ) except Exception as e: return jsonify({"status": "error", "message": f"查询余额时出错: {str(e)}"}) @app.route("/api/extract_emails", methods=["GET"]) def extract_emails(): try: # 从请求参数中获取必需的参数 card = request.args.get("card") shuliang = request.args.get("shuliang") leixing = request.args.get("leixing") # 获取前端传递的重试次数计数器,如果没有则初始化为0 frontend_retry_count = int(request.args.get("retry_count", "0")) # 验证必需的参数 if not card: return jsonify({"status": "error", "message": "未提供卡号参数"}) if not shuliang: return jsonify({"status": "error", "message": "未提供提取数量参数"}) if not leixing or leixing not in ["outlook", "hotmail"]: return jsonify( { "status": "error", "message": "提取类型参数无效,必须为 outlook 或 hotmail", } ) # 尝试将数量转换为整数 try: shuliang_int = int(shuliang) if shuliang_int < 1 or shuliang_int > 2000: return jsonify( {"status": "error", "message": "提取数量必须在1到2000之间"} ) except ValueError: return jsonify({"status": "error", "message": "提取数量必须为整数"}) # 后端重试计数器 retry_count = 0 max_retries = 20 # 单次后端请求的最大重试次数 retry_delay = 0 # 每次重试间隔秒数 # 记录总的前端+后端重试次数,用于展示给用户 total_retry_count = frontend_retry_count while retry_count < max_retries: # 发送请求到邮箱提取API response = requests.get( url="https://zizhu.shanyouxiang.com/huoqu", params={"card": card, "shuliang": shuliang, "leixing": leixing}, headers={ "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" }, timeout=10, # 降低单次请求的超时时间,以便更快地进行重试 ) if response.status_code == 200: # 检查响应是否为JSON格式,如果是,通常表示没有库存 try: json_response = response.json() if isinstance(json_response, dict) and "msg" in json_response: # 没有库存,需要重试 retry_count += 1 total_retry_count += 1 # 如果达到后端最大重试次数,返回特殊状态让前端继续重试 if retry_count >= max_retries: return jsonify( { "status": "retry", "message": f"暂无库存: {json_response['msg']},已重试{total_retry_count}次,继续尝试中...", "retry_count": total_retry_count, } ) # 等待一段时间后重试 time.sleep(retry_delay) continue except ValueError: # 不是JSON格式,可能是成功的文本列表响应 pass # 处理文本响应 response_text = response.text.strip() # 解析响应文本为邮箱列表 emails = [] if response_text: for line in response_text.split("\n"): if line.strip(): emails.append(line.strip()) # 如果没有实际提取到邮箱(可能是空文本响应),继续重试 if not emails: retry_count += 1 total_retry_count += 1 if retry_count >= max_retries: return jsonify( { "status": "retry", "message": f"未能获取到邮箱,已重试{total_retry_count}次,继续尝试中...", "retry_count": total_retry_count, } ) time.sleep(retry_delay) continue # 成功获取到邮箱,返回结果 return jsonify( { "status": "success", "emails": emails, "count": len(emails), "retries": total_retry_count, "message": f"成功获取{len(emails)}个邮箱,总共重试{total_retry_count}次", } ) else: # 请求失败,返回错误 return jsonify( { "status": "error", "message": f"提取邮箱失败: HTTP {response.status_code}", "response": response.text, } ) # 如果执行到这里,说明超过了最大重试次数 return jsonify( { "status": "retry", "message": f"暂无邮箱库存,已重试{total_retry_count}次,继续尝试中...", "retry_count": total_retry_count, } ) except Exception as e: return jsonify({"status": "error", "message": f"提取邮箱时出错: {str(e)}"}) # --- 新增API:通过EmailClient获取验证码 --- @app.route('/api/get_email_verification_code', methods=['POST']) def get_email_verification_code_api(): """ 通过 EmailClient (通常是基于HTTP API的邮件服务) 获取验证码。 接收 JSON 或 Form data。 必需参数: email, token, client_id 可选参数: password, api_base_url, mailbox, code_regex, max_retries, retry_delay 如果EmailClient方法失败,将尝试使用connect_imap作为备用方法。 如果用户之前已配置代理,也会使用相同的代理设置。 """ global max_retries, retry_delay if request.is_json: data = request.get_json() else: data = request.form email = data.get('email') token = data.get('token') # 对应 EmailClient 的 refresh_token client_id = data.get('client_id') if not all([email, token, client_id]): return jsonify({"status": "error", "message": "缺少必需参数: email, token, client_id"}), 400 # 获取可选参数 password = data.get('password') api_base_url = data.get('api_base_url') # 如果提供,将覆盖 EmailClient 的默认设置 mailbox = data.get('mailbox', "INBOX") code_regex = data.get('code_regex', r'\b\d{6}\b') # 默认匹配6位数字 # 检查是否在用户处理数据中有该邮箱,并提取代理设置 use_proxy = False proxy_url = None if email in user_process_data: user_data = user_process_data.get(email, {}) use_proxy = user_data.get("use_proxy", False) proxy_url = user_data.get("proxy_url", "") if use_proxy else None logger.info(f"为邮箱 {email} 使用代理设置: {use_proxy}, {proxy_url}") try: # 实例化 EmailClient,传入代理设置 email_client = EmailClient(api_base_url=api_base_url) # 设置代理(如果 EmailClient 类支持代理配置) if use_proxy and proxy_url and hasattr(email_client, 'set_proxy'): email_client.set_proxy(proxy_url) elif use_proxy and proxy_url: logger.warning("EmailClient 类不支持设置代理") # 使用重试机制调用获取验证码的方法 verification_code = retry_function( email_client.get_verification_code, token=token, client_id=client_id, email=email, password=password, mailbox=mailbox, code_regex=code_regex, max_retries=max_retries, delay=retry_delay ) if verification_code: return jsonify({"status": "success", "verification_code": verification_code}) else: # EmailClient 失败,尝试使用connect_imap作为备用方法 logger.info(f"EmailClient在{max_retries}次尝试后未能找到验证码,尝试使用connect_imap作为备用方法") # 检查是否有password参数 if not password: return jsonify({"status": "error", "msg": "EmailClient失败,且未提供password参数,无法使用备用方法"}), 200 # 先尝试从收件箱获取验证码,传入代理设置 result = connect_imap(email, password, "INBOX", use_proxy=use_proxy, proxy_url=proxy_url) # 如果收件箱没有找到验证码,则尝试从垃圾邮件中查找 if result["code"] == 0: result = connect_imap(email, password, "Junk", use_proxy=use_proxy, proxy_url=proxy_url) logger.info(f"catch 当前Oauth登录失败,IMAP结果如下:{result['msg']}") result["msg"] = f"当前Oauth登录失败,IMAP结果如下:{result['msg']}" if result["code"] == 0: return jsonify({"status": "error", "msg": "收件箱和垃圾邮件中均未找到验证码"}), 200 elif result["code"] == 200: return jsonify({"status": "success", "verification_code": result["verification_code"], "msg": result["msg"]}) else: return jsonify({"status": "error", "msg": result["msg"]}), 200 except Exception as e: # 捕获实例化或调用过程中的其他潜在错误 logger.error(f"处理 /api/get_email_verification_code 时出错: {str(e)}") import traceback logger.error(traceback.format_exc()) # 如果有password参数,尝试使用connect_imap作为备用方法 if password: logger.info(f"EmailClient出现异常,尝试使用connect_imap作为备用方法") try: # 先尝试从收件箱获取验证码,传入代理设置 result = connect_imap(email, password, "INBOX", use_proxy=use_proxy, proxy_url=proxy_url) # 如果收件箱没有找到验证码,则尝试从垃圾邮件中查找 if result["code"] == 0: result = connect_imap(email, password, "Junk", use_proxy=use_proxy, proxy_url=proxy_url) logger.info(f"catch 当前Oauth登录失败,IMAP结果如下:{result['msg']}") result["msg"] = f"当前Oauth登录失败,IMAP结果如下:{result['msg']}" if result["code"] == 0: return jsonify({"status": "error", "msg": "收件箱和垃圾邮件中均未找到验证码"}), 200 elif result["code"] == 200: return jsonify({"status": "success", "verification_code": result["verification_code"], "msg": result["msg"]}) else: return jsonify({"status": "error", "msg": result["msg"]}), 200 except Exception as backup_error: logger.error(f"备用方法connect_imap也失败: {str(backup_error)}") return jsonify({"status": "error", "message": f"主要和备用验证码获取方法均出现错误"}), 500 return jsonify({"status": "error", "message": f"处理请求时发生内部错误"}), 500 # 处理所有前端路由 @app.route('/', defaults={'path': ''}) @app.route('/') def serve(path): #favicon vite.svg if path == 'favicon.ico' or path == 'vite.svg': return send_from_directory("static", path) # 对于所有其他请求 - 返回index.html (SPA入口点) return render_template('index.html') if __name__ == "__main__": app.run(debug=False, host="0.0.0.0", port=5000)