# coding:utf-8 import hashlib import json import random import time import uuid import requests from PIL import Image from io import BytesIO import base64 import os def ca_f_encrypt(frames, index, pid, use_proxy=False, proxies=None): url = "https://kiteyuan.233285.xyz/cafEncrypt" payload = json.dumps({ "frames": frames, "index": index, "pid": pid }) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", "Connection": "keep-alive", 'Content-Type': 'application/json', "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1" } try: # response = requests.request("POST", url, headers=headers, data=payload, proxies=proxies if use_proxy else None) response = requests.request("POST", url, headers=headers, data=payload, proxies=None) response.raise_for_status() if not response.text: print(f"API响应为空: {url}") return {"f": "", "ca": ["", "", "", ""]} # 解析响应以确保与 text.py 行为一致 result = json.loads(response.text) if "f" not in result or "ca" not in result: print(f"API响应缺少关键字段: {result}") return {"f": "", "ca": ["", "", "", ""]} return result except requests.exceptions.RequestException as e: print(f"API请求失败: {e}") if use_proxy: print(f"当前使用的代理: {proxies}") return {"f": "", "ca": ["", "", "", ""]} except json.JSONDecodeError as e: print(f"JSON解析错误: {e}, 响应内容: {response.text}") return {"f": "", "ca": ["", "", "", ""]} def image_parse(image, frames, use_proxy=False, proxies=None): url = "https://kiteyuan.233285.xyz/imageParse" payload = json.dumps({ "image": image, "frames": frames }) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", "Connection": "keep-alive", 'Content-Type': 'application/json', "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1" } try: # response = requests.request("POST", url, headers=headers, data=payload, proxies=proxies if use_proxy else None) response = requests.request("POST", url, headers=headers, data=payload, proxies=None) response.raise_for_status() # 检查HTTP状态码 if not response.text: print(f"API响应为空: {url}") return {"best_index": 0} # 返回一个默认值 # 解析响应以确保与 text.py 行为一致 result = json.loads(response.text) if "best_index" not in result: print(f"API响应缺少best_index字段: {result}") return {"best_index": 0} return result except requests.exceptions.RequestException as e: print(f"API请求失败: {e}") if use_proxy: print(f"当前使用的代理: {proxies}") return {"best_index": 0} # 返回一个默认值 except json.JSONDecodeError as e: print(f"JSON解析错误: {e}, 响应内容: {response.text}") return {"best_index": 0} # 返回一个默认值 def sign_encrypt(code, captcha_token, rtc_token, use_proxy=False, proxies=None): url = "https://kiteyuan.233285.xyz/signEncrypt" # 检查 code 是否为空或 None if not code: print("code 参数为空,无法进行加密") return {"request_id": "", "sign": ""} # 如果 code 是字符串而不是对象,则直接使用 if isinstance(code, str): payload_data = code else: try: payload_data = json.dumps(code) except (TypeError, ValueError) as e: print(f"code 参数序列化失败: {e}") return {"request_id": "", "sign": ""} try: payload = json.dumps({ "code": payload_data, "captcha_token": captcha_token, "rtc_token": rtc_token }) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", "Connection": "keep-alive", 'Content-Type': 'application/json', "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1" } # response = requests.request("POST", url, headers=headers, data=payload, proxies=proxies if use_proxy else None, timeout=30) response = requests.request("POST", url, headers=headers, data=payload, proxies=None, timeout=30) response.raise_for_status() if not response.text: print(f"API响应为空: {url}") return {"request_id": "", "sign": ""} # 解析响应以确保与 text.py 行为一致 result = json.loads(response.text) if "request_id" not in result or "sign" not in result: print(f"API响应缺少关键字段: {result}") return {"request_id": "", "sign": ""} return result except requests.exceptions.RequestException as e: print(f"API请求失败: {e}") if use_proxy: print(f"当前使用的代理: {proxies}") return {"request_id": "", "sign": ""} except json.JSONDecodeError as e: print(f"JSON解析错误: {e}, 响应内容: {response.text}") return {"request_id": "", "sign": ""} except Exception as e: print(f"未知错误: {e}") return {"request_id": "", "sign": ""} def d_encrypt(pid, device_id, f, use_proxy=False, proxies=None): url = "https://kiteyuan.233285.xyz/dEncrypt" payload = json.dumps({ "pid": pid, "device_id": device_id, "f": f }) headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7", "Connection": "keep-alive", 'Content-Type': 'application/json', "Upgrade-Insecure-Requests": "1", "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "none", "Sec-Fetch-User": "?1" } try: # response = requests.request("POST", url, headers=headers, data=payload, proxies=proxies if use_proxy else None) response = requests.request("POST", url, headers=headers, data=payload, proxies=None) response.raise_for_status() if not response.text: print(f"API响应为空: {url}") return "" return response.text except requests.exceptions.RequestException as e: print(f"API请求失败: {e}") if use_proxy: print(f"当前使用的代理: {proxies}") return "" # md5加密算法 def captcha_sign_encrypt(encrypt_string, salts): for salt in salts: encrypt_string = hashlib.md5((encrypt_string + salt["salt"]).encode("utf-8")).hexdigest() return encrypt_string def captcha_image_parse(pikpak, device_id): try: # 获取frames信息 frames_info = pikpak.gen() if not frames_info or not isinstance(frames_info, dict) or "pid" not in frames_info or "frames" not in frames_info: print("获取frames_info失败,返回内容:", frames_info) return {"response_data": {"result": "reject"}, "pid": "", "traceid": ""} if "traceid" not in frames_info: frames_info["traceid"] = "" # 下载验证码图片 captcha_image = image_download(device_id, frames_info["pid"], frames_info["traceid"], pikpak.use_proxy, pikpak.proxies) if not captcha_image: print("图片下载失败") return {"response_data": {"result": "reject"}, "pid": frames_info["pid"], "traceid": frames_info["traceid"]} # 读取图片数据并转换为 PIL.Image img = Image.open(BytesIO(captcha_image)) # 将图片转换为 Base64 编码 buffered = BytesIO() img.save(buffered, format="PNG") # 可根据图片格式调整 format base64_image = base64.b64encode(buffered.getvalue()).decode() # 获取最佳滑块位置 best_index = image_parse(base64_image, frames_info["frames"], pikpak.use_proxy, pikpak.proxies) if "best_index" not in best_index: print("图片分析失败, 返回内容:", best_index) return {"response_data": {"result": "reject"}, "pid": frames_info["pid"], "traceid": frames_info["traceid"]} # 滑块加密 json_data = ca_f_encrypt(frames_info["frames"], best_index["best_index"], frames_info["pid"], pikpak.use_proxy, pikpak.proxies) if "f" not in json_data or "ca" not in json_data: print("加密计算失败, 返回内容:", json_data) return {"response_data": {"result": "reject"}, "pid": frames_info["pid"], "traceid": frames_info["traceid"]} f = json_data['f'] npac = json_data['ca'] # d加密 d = d_encrypt(frames_info["pid"], device_id, f, pikpak.use_proxy, pikpak.proxies) if not d: print("d_encrypt失败") return {"response_data": {"result": "reject"}, "pid": frames_info["pid"], "traceid": frames_info["traceid"]} # 验证 verify2 = pikpak.image_verify(frames_info["pid"], frames_info["traceid"], f, npac[0], npac[1], npac[2], npac[3], d) return { "response_data": verify2, "pid": frames_info["pid"], "traceid": frames_info["traceid"], } except Exception as e: print(f"滑块验证过程中出错: {e}") import traceback traceback.print_exc() return {"response_data": {"result": "reject"}, "pid": "", "traceid": ""} def image_download(device_id, pid, traceid, use_proxy=False, proxies=None): url = f"https://user.mypikpak.com/pzzl/image?deviceid={device_id}&pid={pid}&traceid={traceid}" headers = { 'pragma': 'no-cache', 'priority': 'u=1, i' } try: response = requests.get(url, headers=headers, proxies=proxies if use_proxy else None) response.raise_for_status() if response.status_code == 200: return response.content # 直接返回图片的二进制数据 else: print(f"下载失败,状态码: {response.status_code}") return None except requests.exceptions.RequestException as e: print(f"图片下载失败: {e}") if use_proxy: print(f"当前使用的代理: {proxies}") return None def ramdom_version(): version_list = [ { "v": "1.42.6", "algorithms": [{"alg": "md5", "salt": "frupTFdxwcJ5mcL3R8"}, {"alg": "md5", "salt": "jB496fSFfbWLhWyqV"}, {"alg": "md5", "salt": "xYLtzn8LT5h3KbAalCjc/Wf"}, {"alg": "md5", "salt": "PSHSbm1SlxbvkwNk4mZrJhBZ1vsHCtEdm3tsRiy1IPUnqi1FNB5a2F"}, {"alg": "md5", "salt": "SX/WvPCRzgkLIp99gDnLaCs0jGn2+urx7vz/"}, {"alg": "md5", "salt": "OGdm+dgLk5EpK4O1nDB+Z4l"}, {"alg": "md5", "salt": "nwtOQpz2xFLIE3EmrDwMKe/Vlw2ubhRcnS2R23bwx9wMh+C3Sg"}, {"alg": "md5", "salt": "FI/9X9jbnTLa61RHprndT0GkVs18Chd"}] }, { "v": "1.47.1", "algorithms": [{'alg': 'md5', 'salt': 'Gez0T9ijiI9WCeTsKSg3SMlx'}, {'alg': 'md5', 'salt': 'zQdbalsolyb1R/'}, {'alg': 'md5', 'salt': 'ftOjr52zt51JD68C3s'}, {'alg': 'md5', 'salt': 'yeOBMH0JkbQdEFNNwQ0RI9T3wU/v'}, {'alg': 'md5', 'salt': 'BRJrQZiTQ65WtMvwO'}, {'alg': 'md5', 'salt': 'je8fqxKPdQVJiy1DM6Bc9Nb1'}, {'alg': 'md5', 'salt': 'niV'}, {'alg': 'md5', 'salt': '9hFCW2R1'}, {'alg': 'md5', 'salt': 'sHKHpe2i96'}, {'alg': 'md5', 'salt': 'p7c5E6AcXQ/IJUuAEC9W6'}, {'alg': 'md5', 'salt': ''}, {'alg': 'md5', 'salt': 'aRv9hjc9P+Pbn+u3krN6'}, {'alg': 'md5', 'salt': 'BzStcgE8qVdqjEH16l4'}, {'alg': 'md5', 'salt': 'SqgeZvL5j9zoHP95xWHt'}, {'alg': 'md5', 'salt': 'zVof5yaJkPe3VFpadPof'}] }, { "v": "1.48.3", "algorithms": [{'alg': 'md5', 'salt': 'aDhgaSE3MsjROCmpmsWqP1sJdFJ'}, {'alg': 'md5', 'salt': '+oaVkqdd8MJuKT+uMr2AYKcd9tdWge3XPEPR2hcePUknd'}, {'alg': 'md5', 'salt': 'u/sd2GgT2fTytRcKzGicHodhvIltMntA3xKw2SRv7S48OdnaQIS5mn'}, {'alg': 'md5', 'salt': '2WZiae2QuqTOxBKaaqCNHCW3olu2UImelkDzBn'}, {'alg': 'md5', 'salt': '/vJ3upic39lgmrkX855Qx'}, {'alg': 'md5', 'salt': 'yNc9ruCVMV7pGV7XvFeuLMOcy1'}, {'alg': 'md5', 'salt': '4FPq8mT3JQ1jzcVxMVfwFftLQm33M7i'}, {'alg': 'md5', 'salt': 'xozoy5e3Ea'}] }, { "v": "1.49.3", "algorithms": [{'alg': 'md5', 'salt': '7xOq4Z8s'}, {'alg': 'md5', 'salt': 'QE9/9+IQco'}, {'alg': 'md5', 'salt': 'WdX5J9CPLZp'}, {'alg': 'md5', 'salt': 'NmQ5qFAXqH3w984cYhMeC5TJR8j'}, {'alg': 'md5', 'salt': 'cc44M+l7GDhav'}, {'alg': 'md5', 'salt': 'KxGjo/wHB+Yx8Lf7kMP+/m9I+'}, {'alg': 'md5', 'salt': 'wla81BUVSmDkctHDpUT'}, {'alg': 'md5', 'salt': 'c6wMr1sm1WxiR3i8LDAm3W'}, {'alg': 'md5', 'salt': 'hRLrEQCFNYi0PFPV'}, {'alg': 'md5', 'salt': 'o1J41zIraDtJPNuhBu7Ifb/q3'}, {'alg': 'md5', 'salt': 'U'}, {'alg': 'md5', 'salt': 'RrbZvV0CTu3gaZJ56PVKki4IeP'}, {'alg': 'md5', 'salt': 'NNuRbLckJqUp1Do0YlrKCUP'}, {'alg': 'md5', 'salt': 'UUwnBbipMTvInA0U0E9'}, {'alg': 'md5', 'salt': 'VzGc'}] }, { "v": "1.51.2", "algorithms": [{'alg': 'md5', 'salt': 'vPjelkvqcWoCsQO1CnkVod8j2GbcE0yEHEwJ3PKSKW'}, {'alg': 'md5', 'salt': 'Rw5aO9MHuhY'}, {'alg': 'md5', 'salt': 'Gk111qdZkPw/xgj'}, {'alg': 'md5', 'salt': '/aaQ4/f8HNpyzPOtIF3rG/UEENiRRvpIXku3WDWZHuaIq+0EOF'}, {'alg': 'md5', 'salt': '6p1gxZhV0CNuKV2QO5vpibkR8IJeFURvqNIKXWOIyv1A'}, {'alg': 'md5', 'salt': 'gWR'}, {'alg': 'md5', 'salt': 'iPD'}, {'alg': 'md5', 'salt': 'ASEm+P75YfKzQRW6eRDNNTd'}, {'alg': 'md5', 'salt': '2fauuwVCxLCpL/FQ/iJ5NpOPb7gRZs0EWJwe/2YNPQr3ore+ZiIri6s/tYayG'}] } ] return version_list[0] # return random.choice(version_list) def random_rtc_token(): # 生成 8 组 16 进制数,每组 4 位,使用冒号分隔 ipv6_parts = ["{:04x}".format(random.randint(0, 0xFFFF)) for _ in range(8)] ipv6_address = ":".join(ipv6_parts) return ipv6_address class PikPak: def __init__(self, invite_code, client_id, device_id, version, algorithms, email, rtc_token, client_secret, package_name, use_proxy=False, proxy_http=None, proxy_https=None): # 初始化实例属性 self.invite_code = invite_code # 邀请码 self.client_id = client_id # 客户端ID self.device_id = device_id # 设备ID self.timestamp = 0 # 时间戳 self.algorithms = algorithms # 版本盐值 self.version = version # 版本 self.email = email # 邮箱 self.rtc_token = rtc_token # RTC Token self.captcha_token = "" # Captcha Token self.client_secret = client_secret # Client Secret self.user_id = "" # 用户ID self.access_token = "" # 登录令牌 self.refresh_token = "" # 刷新令牌 self.verification_token = "" # Verification Token self.captcha_sign = "" # Captcha Sign self.verification_id = "" # Verification ID self.package_name = package_name # 客户端包名 self.use_proxy = use_proxy # 是否使用代理 # 代理配置 if use_proxy: self.proxies = { "http": proxy_http or "http://127.0.0.1:7890", "https": proxy_https or "http://127.0.0.1:7890", } else: self.proxies = None def send_request(self, method, url, headers=None, params=None, json_data=None, data=None, use_proxy=None): headers = headers or {} # 如果未指定use_proxy,则使用类的全局设置 use_proxy = self.use_proxy if use_proxy is None else use_proxy # 确保当use_proxy为True时,有可用的代理配置 if use_proxy and not self.proxies: # 如果类的use_proxy为True但proxies未设置,使用默认代理 proxies = { "http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890" } else: proxies = self.proxies if use_proxy else None try: response = requests.request( method=method, url=url, headers=headers, params=params, json=json_data, data=data, proxies=proxies, timeout=30 # 添加超时设置 ) response.raise_for_status() # 检查HTTP状态码 print(response.text) try: return response.json() except json.JSONDecodeError: return response.text except requests.exceptions.RequestException as e: print(f"请求失败: {url}, 错误: {e}") if use_proxy: print(f"当前使用的代理: {proxies}") # 返回一个空的响应对象 return {} def gen(self): url = "https://user.mypikpak.com/pzzl/gen" params = {"deviceid": self.device_id, "traceid": ""} headers = {"Host": "user.mypikpak.com", "accept": "application/json, text/plain, */*"} response = self.send_request("GET", url, headers=headers, params=params) # 检查响应是否有效 if not response or not isinstance(response, dict) or "pid" not in response or "frames" not in response: print(f"gen请求返回无效响应: {response}") return response def image_verify(self, pid, trace_id, f, n, p, a, c, d): url = "https://user.mypikpak.com/pzzl/verify" params = {"pid": pid, "deviceid": self.device_id, "traceid": trace_id, "f": f, "n": n, "p": p, "a": a, "c": c, "d": d} headers = {"Host": "user.mypikpak.com", "accept": "application/json, text/plain, */*"} response = self.send_request("GET", url, headers=headers, params=params) # 检查响应是否有效 if not response or not isinstance(response, dict) or "result" not in response: print(f"image_verify请求返回无效响应: {response}") return {"result": "reject"} return response def executor(self): url = "https://api-drive.mypikpak.com/captcha-jsonp/v2/executor?callback=handleJsonpResult_" + str(int(time.time() * 1000)) headers = {'pragma': 'no-cache', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'} try: # 使用普通 requests 而不是 self.send_request 以获取原始响应 response = requests.get(url, headers=headers, proxies=self.proxies if self.use_proxy else None, timeout=30) response.raise_for_status() # 检查 HTTP 状态码 content = response.text print(f"executor 原始响应: {content}") # 如果内容为空,直接返回空字符串 if not content: print("executor 响应内容为空") return "" # 处理 JSONP 响应格式 if "handleJsonpResult" in content: # 提取 JSON 部分,JSONP 格式通常是 callback(json数据) start_index = content.find('(') end_index = content.rfind(')') if start_index != -1 and end_index != -1: json_str = content[start_index + 1:end_index] # 有时 JSONP 响应中包含反引号,需要去除 if json_str.startswith('`') and json_str.endswith('`'): json_str = json_str[1:-1] return json_str else: print(f"无法从JSONP响应中提取有效内容: {content}") return "" elif isinstance(content, str) and (content.startswith('{') or content.startswith('[')): # 可能是直接返回的 JSON 字符串 return content else: print(f"未知的响应格式: {content}") return "" except requests.exceptions.RequestException as e: print(f"执行 executor 请求失败: {e}") return "" except Exception as e: print(f"解析 executor 响应失败: {e}") return "" def report(self, request_id, sign, pid, trace_id): url = "https://user.mypikpak.com/credit/v1/report" params = { "deviceid": self.device_id, "captcha_token": self.captcha_token, "request_id": request_id, "sign": sign, "type": "pzzlSlider", "result": 0, "data": pid, "traceid": trace_id, "rtc_token": self.rtc_token } headers = {'pragma': 'no-cache', 'priority': 'u=1, i'} response = self.send_request("GET", url, params=params, headers=headers) # 检查响应是否有效 if not response or not isinstance(response, dict) or "captcha_token" not in response: print(f"report请求返回无效响应: {response}") else: self.captcha_token = response.get('captcha_token') return response def verification(self): url = 'https://user.mypikpak.com/v1/auth/verification' params = {"email": self.email, "target": "ANY", "usage": "REGISTER", "locale": "zh-CN", "client_id": self.client_id} headers = {'host': 'user.mypikpak.com', 'x-captcha-token': self.captcha_token, 'x-device-id': self.device_id, "x-client-id": self.client_id} response = self.send_request("POST", url, headers=headers, data=params) # 检查响应是否有效 if not response or not isinstance(response, dict) or "verification_id" not in response: print(f"verification请求返回无效响应: {response}") else: self.verification_id = response.get('verification_id') return response def verify_post(self, verification_code): url = "https://user.mypikpak.com/v1/auth/verification/verify" params = {"client_id": self.client_id} payload = {"client_id": self.client_id, "verification_id": self.verification_id, "verification_code": verification_code} headers = {"X-Device-Id": self.device_id} response = self.send_request("POST", url, headers=headers, json_data=payload, params=params) # 检查响应是否有效 if not response or not isinstance(response, dict) or "verification_token" not in response: print(f"verify_post请求返回无效响应: {response}") else: self.verification_token = response.get('verification_token') return response def init(self, action): self.refresh_captcha_sign() url = "https://user.mypikpak.com/v1/shield/captcha/init" params = {"client_id": self.client_id} payload = { "action": action, "captcha_token": self.captcha_token, "client_id": self.client_id, "device_id": self.device_id, "meta": { "captcha_sign": "1." + self.captcha_sign, "user_id": self.user_id, "package_name": self.package_name, "client_version": self.version, "email": self.email, "timestamp": self.timestamp } } headers = {"x-device-id": self.device_id} response = self.send_request("POST", url, headers=headers, json_data=payload, params=params) # 检查响应是否有效 if not response or not isinstance(response, dict) or "captcha_token" not in response: print(f"init请求返回无效响应: {response}") else: self.captcha_token = response.get('captcha_token') return response def signup(self, name, password, verification_code): url = "https://user.mypikpak.com/v1/auth/signup" params = {"client_id": self.client_id} payload = { "captcha_token": self.captcha_token, "client_id": self.client_id, "client_secret": self.client_secret, "email": self.email, "name": name, "password": password, "verification_code": verification_code, "verification_token": self.verification_token } headers = {"X-Device-Id": self.device_id} response = self.send_request("POST", url, headers=headers, json_data=payload, params=params) # 检查响应是否有效 if not response or not isinstance(response, dict): print(f"signup请求返回无效响应: {response}") else: self.access_token = response.get('access_token', '') self.refresh_token = response.get('refresh_token', '') self.user_id = response.get('sub', '') return response def activation_code(self): url = "https://api-drive.mypikpak.com/vip/v1/order/activation-code" payload = {"activation_code": self.invite_code, "data": {}} headers = { "Host": "api-drive.mypikpak.com", "authorization": "Bearer " + self.access_token, "x-captcha-token": self.captcha_token, "x-device-id": self.device_id, 'x-system-language': "ko", 'content-type': 'application/json' } response = self.send_request("POST", url, headers=headers, json_data=payload) # 检查响应是否有效 if not response or not isinstance(response, dict): print(f"activation_code请求返回无效响应: {response}") return response def files_task(self, task_link): url = "https://api-drive.mypikpak.com/drive/v1/files" payload = { "kind": "drive#file", "folder_type": "DOWNLOAD", "upload_type": "UPLOAD_TYPE_URL", "url": {"url": task_link}, "params": {"with_thumbnail": "true", "from": "manual"} } headers = { "Authorization": "Bearer " + self.access_token, "x-device-id": self.device_id, "x-captcha-token": self.captcha_token, "Content-Type": "application/json" } response = self.send_request("POST", url, headers=headers, json_data=payload) # 检查响应是否有效 if not response or not isinstance(response, dict): print(f"files_task请求返回无效响应: {response}") return response def refresh_captcha_sign(self): self.timestamp = str(int(time.time()) * 1000) encrypt_string = self.client_id + self.version + self.package_name + self.device_id + self.timestamp self.captcha_sign = captcha_sign_encrypt(encrypt_string, self.algorithms) def save_account_info(name, account_info): # 保证account目录存在 if not os.path.exists("./account"): os.makedirs("./account") with open("./account/" + name + ".json", "w", encoding="utf-8") as f: json.dump(account_info, f, ensure_ascii=False, indent=4) def test_proxy(proxy_url): """测试代理连接是否可用""" test_url = "https://mypikpak.com" # 改为 PikPak 的网站,更可能连通 proxies = { "http": proxy_url, "https": proxy_url } try: response = requests.get(test_url, proxies=proxies, timeout=10) # 增加超时时间 response.raise_for_status() print(f"代理连接测试成功: {proxy_url}") return True except Exception as e: print(f"代理连接测试失败: {proxy_url}, 错误: {e}") return False # 程序运行主函数 def main(): try: # 1、初始化参数 current_version = ramdom_version() version = current_version['v'] algorithms = current_version['algorithms'] client_id = "YNxT9w7GMdWvEOKa" client_secret = "dbw2OtmVEeuUvIptb1Coyg" package_name = "com.pikcloud.pikpak" device_id = str(uuid.uuid4()).replace("-", "") rtc_token = random_rtc_token() print(f"当前版本:{version} 设备号:{device_id} 令牌:{rtc_token}") # 询问用户是否使用代理 use_proxy_input = input('是否启用代理(y/n):').strip().lower() use_proxy = use_proxy_input == 'y' or use_proxy_input == 'yes' proxy_http = None proxy_https = None if use_proxy: # 询问用户是否使用默认代理 default_proxy = input('是否使用默认代理地址 http://127.0.0.1:7890 (y/n):').strip().lower() if default_proxy == 'y' or default_proxy == 'yes': proxy_url = "http://127.0.0.1:7890" print("已启用代理,使用默认地址:", proxy_url) # 测试默认代理连接 if not test_proxy(proxy_url): retry = input("默认代理连接测试失败,是否继续使用(y/n):").strip().lower() if retry != 'y' and retry != 'yes': print("已取消代理设置,将直接连接") use_proxy = False proxy_url = None if use_proxy: proxy_http = proxy_url proxy_https = proxy_url else: # 用户自定义代理地址和端口 proxy_host = input('请输入代理主机地址 (默认127.0.0.1): ').strip() proxy_host = proxy_host if proxy_host else '127.0.0.1' proxy_port = input('请输入代理端口 (默认7890): ').strip() proxy_port = proxy_port if proxy_port else '7890' proxy_protocol = input('请输入代理协议 (http/https/socks5,默认http): ').strip().lower() proxy_protocol = proxy_protocol if proxy_protocol in ['http', 'https', 'socks5'] else 'http' proxy_url = f"{proxy_protocol}://{proxy_host}:{proxy_port}" print(f"已设置代理地址: {proxy_url}") # 测试自定义代理连接 if not test_proxy(proxy_url): retry = input("自定义代理连接测试失败,是否继续使用(y/n):").strip().lower() if retry != 'y' and retry != 'yes': print("已取消代理设置,将直接连接") use_proxy = False proxy_url = None if use_proxy: proxy_http = proxy_url proxy_https = proxy_url else: print("未启用代理,直接连接") invite_code = input('请输入你的邀请码:') email = input("请输入注册用的邮箱:") # 2、实例化PikPak类,传入代理设置 pikpak = PikPak(invite_code, client_id, device_id, version, algorithms, email, rtc_token, client_secret, package_name, use_proxy=use_proxy, proxy_http=proxy_http, proxy_https=proxy_https) # 3、刷新timestamp,加密sign值。 init_result = pikpak.init("POST:/v1/auth/verification") if not init_result or not isinstance(init_result, dict) or "captcha_token" not in init_result: print("初始化失败,请检查网络连接或代理设置") input("按任意键退出程序") return # 4、图片滑块分析 max_attempts = 5 # 最大尝试次数 captcha_result = None for attempt in range(max_attempts): print(f"尝试滑块验证 ({attempt+1}/{max_attempts})...") try: captcha_result = captcha_image_parse(pikpak, device_id) print(captcha_result) if captcha_result and "response_data" in captcha_result and captcha_result['response_data'].get('result') == 'accept': print("滑块验证成功!") break else: print('滑块验证失败, 正在重新尝试...') time.sleep(2) # 延迟2秒再次尝试 except Exception as e: print(f"滑块验证过程出错: {e}") import traceback traceback.print_exc() time.sleep(2) # 出错后延迟2秒再次尝试 if not captcha_result or "response_data" not in captcha_result or captcha_result['response_data'].get('result') != 'accept': print("滑块验证失败,达到最大尝试次数") input("按任意键退出程序") return # 5、滑块验证加密 try: executor_info = pikpak.executor() if not executor_info: print("获取executor信息失败") input("按任意键退出程序") return sign_encrypt_info = sign_encrypt(executor_info, pikpak.captcha_token, rtc_token, pikpak.use_proxy, pikpak.proxies) if not sign_encrypt_info or "request_id" not in sign_encrypt_info or "sign" not in sign_encrypt_info: print("签名加密失败") print(f"executor_info: {executor_info}") print(f"captcha_token: {pikpak.captcha_token}") print(f"rtc_token: {rtc_token}") input("按任意键退出程序") return # 更新 captcha_token pikpak.report(sign_encrypt_info['request_id'], sign_encrypt_info['sign'], captcha_result['pid'], captcha_result['traceid']) # 发送邮箱验证码 verification_result = pikpak.verification() if not verification_result or not isinstance(verification_result, dict) or "verification_id" not in verification_result: print("请求验证码失败") input("按任意键退出程序") return except Exception as e: print(f"验证过程出错: {e}") import traceback traceback.print_exc() input("按任意键退出程序") return # 6、提交验证码 verification_code = input("请输入接收到的验证码:") pikpak.verify_post(verification_code) # 7、刷新timestamp,加密sign值 pikpak.init("POST:/v1/auth/signup") # 8、注册登录 name = email.split("@")[0] password = "zhiyuan233" pikpak.signup(name, password, verification_code) # 9、填写邀请码 pikpak.activation_code() # 准备账号信息 account_info = { "version": pikpak.version, "device_id": pikpak.device_id, "email": pikpak.email, "captcha_token": pikpak.captcha_token, "access_token": pikpak.access_token, "refresh_token": pikpak.refresh_token, "user_id": pikpak.user_id, "timestamp": pikpak.timestamp, "password": password, "name": name } print("请保存好账号信息备用:", json.dumps(account_info, indent=4, ensure_ascii=False)) # 确认是否保存账号信息 save_info = input("是否保存账号信息到文件(y/n):").strip().lower() if save_info == 'y' or save_info == 'yes': try: # 创建account目录(如果不存在) if not os.path.exists("./account"): os.makedirs("./account") save_account_info(name, account_info) print(f"账号信息已保存到 ./account/{name}.json") except Exception as e: print(f"保存账号信息失败: {e}") input("运行完成,回车结束程序:") except KeyboardInterrupt: print("\n程序被用户中断") except Exception as e: print(f"程序运行出错: {e}") import traceback traceback.print_exc() input("按任意键退出程序") if __name__ == "__main__": print("开发者声明:免费转载需标注出处:B站-纸鸢花的花语,此工具仅供交流学习和技术分析,严禁用于任何商业牟利行为。(包括但不限于倒卖、二改倒卖、引流、冒充作者、广告植入...)") main()