pikpakauto / utils /pikpak.py
Leeflour's picture
Update utils/pikpak.py
62ee16c verified
# coding:utf-8
import hashlib
import json
import random
import time
import uuid
import requests
from PIL import Image
from io import BytesIO
import base64
import os
def ca_f_encrypt(frames, index, pid, use_proxy=False, proxies=None):
    """Request the slider-captcha ``f``/``ca`` values from the remote ``cafEncrypt`` service.

    Args:
        frames: Value sent as ``frames`` in the JSON request body.
        index: Value sent as ``index`` in the JSON request body.
        pid: Value sent as ``pid`` in the JSON request body.
        use_proxy: Only controls whether ``proxies`` is echoed in the failure
            log; the request itself is always sent with ``proxies=None``.
        proxies: Proxy mapping, used for logging only.

    Returns:
        The decoded JSON object when it is a dict containing both ``"f"`` and
        ``"ca"``; otherwise the placeholder ``{"f": "", "ca": ["", "", "", ""]}``.
    """
    def _empty_result():
        # Build a fresh placeholder per call so no two callers share one object.
        return {"f": "", "ca": ["", "", "", ""]}

    url = "https://kiteyuan.233285.xyz/cafEncrypt"
    payload = json.dumps({
        "frames": frames,
        "index": index,
        "pid": pid
    })
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        'Content-Type': 'application/json',
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1"
    }
    try:
        # NOTE: the proxy is intentionally not applied to this call (proxies=None).
        # The timeout keeps an unresponsive service from blocking forever; a
        # timeout surfaces as a RequestException and is handled below.
        response = requests.request("POST", url, headers=headers, data=payload, proxies=None, timeout=30)
        response.raise_for_status()
        if not response.text:
            print(f"API响应为空: {url}")
            return _empty_result()
        result = json.loads(response.text)
        # Valid JSON that is not an object (null, number, list) cannot carry the
        # required fields; treat it like a response with missing fields.
        if not isinstance(result, dict) or "f" not in result or "ca" not in result:
            print(f"API响应缺少关键字段: {result}")
            return _empty_result()
        return result
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return _empty_result()
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}, 响应内容: {response.text}")
        return _empty_result()
def image_parse(image, frames, use_proxy=False, proxies=None):
    """Ask the remote ``imageParse`` service which frame best solves the captcha image.

    Args:
        image: Value sent as ``image`` in the JSON request body (the caller in
            this file passes a Base64-encoded PNG).
        frames: Value sent as ``frames`` in the JSON request body.
        use_proxy: Only controls whether ``proxies`` is echoed in the failure
            log; the request itself is always sent with ``proxies=None``.
        proxies: Proxy mapping, used for logging only.

    Returns:
        The decoded JSON object when it is a dict containing ``"best_index"``;
        otherwise the default ``{"best_index": 0}``.
    """
    url = "https://kiteyuan.233285.xyz/imageParse"
    payload = json.dumps({
        "image": image,
        "frames": frames
    })
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        'Content-Type': 'application/json',
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1"
    }
    try:
        # NOTE: the proxy is intentionally not applied to this call (proxies=None).
        # The timeout keeps an unresponsive service from blocking forever; a
        # timeout surfaces as a RequestException and is handled below.
        response = requests.request("POST", url, headers=headers, data=payload, proxies=None, timeout=30)
        response.raise_for_status()  # turn HTTP error statuses into exceptions
        if not response.text:
            print(f"API响应为空: {url}")
            return {"best_index": 0}  # default value
        result = json.loads(response.text)
        # Valid JSON that is not an object (null, number, list) cannot carry
        # ``best_index``; treat it like a response with the field missing.
        if not isinstance(result, dict) or "best_index" not in result:
            print(f"API响应缺少best_index字段: {result}")
            return {"best_index": 0}
        return result
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return {"best_index": 0}  # default value
    except json.JSONDecodeError as e:
        print(f"JSON解析错误: {e}, 响应内容: {response.text}")
        return {"best_index": 0}  # default value
def sign_encrypt(code, captcha_token, rtc_token, use_proxy=False, proxies=None):
    """Obtain ``request_id`` and ``sign`` from the remote ``signEncrypt`` service.

    Args:
        code: Executor payload. A string is forwarded unchanged; any other
            value is serialized with ``json.dumps`` first.
        captcha_token: Value sent as ``captcha_token``.
        rtc_token: Value sent as ``rtc_token``.
        use_proxy: Only controls whether ``proxies`` is echoed in the failure
            log; the request itself is always sent with ``proxies=None``.
        proxies: Proxy mapping, used for logging only.

    Returns:
        The decoded JSON object when it contains both ``"request_id"`` and
        ``"sign"``; otherwise ``{"request_id": "", "sign": ""}``.
    """
    def _blank():
        return {"request_id": "", "sign": ""}

    endpoint = "https://kiteyuan.233285.xyz/signEncrypt"

    # Nothing to sign when code is empty or None.
    if not code:
        print("code 参数为空,无法进行加密")
        return _blank()

    # Strings are used as-is; everything else must be JSON-serializable.
    code_text = code
    if not isinstance(code, str):
        try:
            code_text = json.dumps(code)
        except (TypeError, ValueError) as err:
            print(f"code 参数序列化失败: {err}")
            return _blank()

    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        'Content-Type': 'application/json',
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1"
    }

    try:
        # Serialization stays inside the try so that a non-serializable token
        # is reported by the catch-all handler below.
        body = json.dumps({
            "code": code_text,
            "captcha_token": captcha_token,
            "rtc_token": rtc_token
        })
        # NOTE: the proxy is intentionally not applied to this call (proxies=None).
        response = requests.request("POST", endpoint, headers=request_headers, data=body, proxies=None, timeout=30)
        response.raise_for_status()
        if not response.text:
            print(f"API响应为空: {endpoint}")
            return _blank()
        result = json.loads(response.text)
        if "request_id" in result and "sign" in result:
            return result
        print(f"API响应缺少关键字段: {result}")
        return _blank()
    except requests.exceptions.RequestException as err:
        print(f"API请求失败: {err}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return _blank()
    except json.JSONDecodeError as err:
        print(f"JSON解析错误: {err}, 响应内容: {response.text}")
        return _blank()
    except Exception as err:
        print(f"未知错误: {err}")
        return _blank()
def d_encrypt(pid, device_id, f, use_proxy=False, proxies=None):
    """Request the ``d`` verification value from the remote ``dEncrypt`` service.

    Args:
        pid: Value sent as ``pid`` in the JSON request body.
        device_id: Value sent as ``device_id`` in the JSON request body.
        f: Value sent as ``f`` in the JSON request body.
        use_proxy: Only controls whether ``proxies`` is echoed in the failure
            log; the request itself is always sent with ``proxies=None``.
        proxies: Proxy mapping, used for logging only.

    Returns:
        The raw response text, or ``""`` when the body is empty or the request fails.
    """
    url = "https://kiteyuan.233285.xyz/dEncrypt"
    payload = json.dumps({
        "pid": pid,
        "device_id": device_id,
        "f": f
    })
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
        "Accept-Language": "zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7",
        "Connection": "keep-alive",
        'Content-Type': 'application/json',
        "Upgrade-Insecure-Requests": "1",
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "none",
        "Sec-Fetch-User": "?1"
    }
    try:
        # NOTE: the proxy is intentionally not applied to this call (proxies=None).
        # The timeout keeps an unresponsive service from blocking forever; a
        # timeout surfaces as a RequestException and is handled below.
        response = requests.request("POST", url, headers=headers, data=payload, proxies=None, timeout=30)
        response.raise_for_status()
        if not response.text:
            print(f"API响应为空: {url}")
            return ""
        return response.text
    except requests.exceptions.RequestException as e:
        print(f"API请求失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return ""
# Chained MD5 signing.
def captcha_sign_encrypt(encrypt_string, salts):
    """Fold every salt into ``encrypt_string`` with repeated MD5 hashing.

    For each entry of ``salts`` (dicts with a ``"salt"`` key), the current
    value is concatenated with that salt, UTF-8 encoded and replaced by the
    hex MD5 digest of the result.

    Returns:
        The final hex digest, or ``encrypt_string`` unchanged when ``salts`` is empty.
    """
    digest = encrypt_string
    for entry in salts:
        salted = digest + entry["salt"]
        digest = hashlib.md5(salted.encode("utf-8")).hexdigest()
    return digest
def captcha_image_parse(pikpak, device_id):
    """Run one slider-captcha attempt and return the verification outcome.

    Steps: fetch the puzzle via ``pikpak.gen()``, download the captcha image,
    re-encode it as Base64 PNG, ask ``image_parse`` for the best frame, derive
    the ``f``/``ca`` values with ``ca_f_encrypt``, derive ``d`` with
    ``d_encrypt`` and submit everything through ``pikpak.image_verify``.

    Args:
        pikpak: Object providing ``gen()``, ``image_verify(...)``,
            ``use_proxy`` and ``proxies``.
        device_id: Device id forwarded to the image download and ``d_encrypt``.

    Returns:
        A dict with ``"response_data"``, ``"pid"`` and ``"traceid"``. On any
        failure ``response_data`` is ``{"result": "reject"}``.
    """
    def _rejected(pid="", traceid=""):
        return {"response_data": {"result": "reject"}, "pid": pid, "traceid": traceid}

    try:
        # Fetch the puzzle description.
        frames_info = pikpak.gen()
        usable = (frames_info and isinstance(frames_info, dict)
                  and "pid" in frames_info and "frames" in frames_info)
        if not usable:
            print("获取frames_info失败,返回内容:", frames_info)
            return _rejected()
        frames_info.setdefault("traceid", "")
        pid = frames_info["pid"]
        traceid = frames_info["traceid"]
        frames = frames_info["frames"]

        # Download the captcha image.
        captcha_image = image_download(device_id, pid, traceid, pikpak.use_proxy, pikpak.proxies)
        if not captcha_image:
            print("图片下载失败")
            return _rejected(pid, traceid)

        # Decode with PIL and re-encode as PNG before Base64-encoding.
        png_buffer = BytesIO()
        Image.open(BytesIO(captcha_image)).save(png_buffer, format="PNG")
        base64_image = base64.b64encode(png_buffer.getvalue()).decode()

        # Find the best slider position.
        parse_result = image_parse(base64_image, frames, pikpak.use_proxy, pikpak.proxies)
        if "best_index" not in parse_result:
            print("图片分析失败, 返回内容:", parse_result)
            return _rejected(pid, traceid)

        # Slider encryption.
        encrypted = ca_f_encrypt(frames, parse_result["best_index"], pid, pikpak.use_proxy, pikpak.proxies)
        if not ("f" in encrypted and "ca" in encrypted):
            print("加密计算失败, 返回内容:", encrypted)
            return _rejected(pid, traceid)
        f_value = encrypted['f']
        ca_values = encrypted['ca']

        # "d" encryption.
        d_value = d_encrypt(pid, device_id, f_value, pikpak.use_proxy, pikpak.proxies)
        if not d_value:
            print("d_encrypt失败")
            return _rejected(pid, traceid)

        # Submit the answer; the four ``ca`` entries map to n, p, a, c.
        verdict = pikpak.image_verify(pid, traceid, f_value,
                                      ca_values[0], ca_values[1], ca_values[2], ca_values[3],
                                      d_value)
        return {
            "response_data": verdict,
            "pid": pid,
            "traceid": traceid,
        }
    except Exception as err:
        print(f"滑块验证过程中出错: {err}")
        import traceback
        traceback.print_exc()
        return _rejected()
def image_download(device_id, pid, traceid, use_proxy=False, proxies=None):
    """Download the captcha puzzle image.

    Args:
        device_id: Sent as the ``deviceid`` query parameter.
        pid: Sent as the ``pid`` query parameter.
        traceid: Sent as the ``traceid`` query parameter.
        use_proxy: When true, the request goes through ``proxies``.
        proxies: Proxy mapping in the format accepted by ``requests``.

    Returns:
        The image bytes on HTTP 200, otherwise ``None``.
    """
    url = "https://user.mypikpak.com/pzzl/image"
    # Pass the values through ``params`` so requests URL-encodes them instead
    # of splicing raw text into the query string.
    query = {"deviceid": device_id, "pid": pid, "traceid": traceid}
    headers = {
        'pragma': 'no-cache',
        'priority': 'u=1, i'
    }
    try:
        # The timeout keeps a stalled download from blocking forever; a timeout
        # surfaces as a RequestException and is handled below.
        response = requests.get(url, headers=headers, params=query,
                                proxies=proxies if use_proxy else None, timeout=30)
        response.raise_for_status()
        if response.status_code == 200:
            return response.content  # raw image bytes
        # Reached only for non-error statuses other than 200.
        print(f"下载失败,状态码: {response.status_code}")
        return None
    except requests.exceptions.RequestException as e:
        print(f"图片下载失败: {e}")
        if use_proxy:
            print(f"当前使用的代理: {proxies}")
        return None
def ramdom_version():
    """Return the client version descriptor used for captcha signing.

    Each descriptor is ``{"v": <version>, "algorithms": [{"alg": "md5",
    "salt": ...}, ...]}``. Despite the name, the first entry (1.42.6) is
    always returned; random selection is currently disabled.
    """
    def _md5_chain(*salts):
        # Expand a list of salts into the {"alg", "salt"} records.
        return [{"alg": "md5", "salt": salt} for salt in salts]

    version_list = [
        {
            "v": "1.42.6",
            "algorithms": _md5_chain(
                "frupTFdxwcJ5mcL3R8",
                "jB496fSFfbWLhWyqV",
                "xYLtzn8LT5h3KbAalCjc/Wf",
                "PSHSbm1SlxbvkwNk4mZrJhBZ1vsHCtEdm3tsRiy1IPUnqi1FNB5a2F",
                "SX/WvPCRzgkLIp99gDnLaCs0jGn2+urx7vz/",
                "OGdm+dgLk5EpK4O1nDB+Z4l",
                "nwtOQpz2xFLIE3EmrDwMKe/Vlw2ubhRcnS2R23bwx9wMh+C3Sg",
                "FI/9X9jbnTLa61RHprndT0GkVs18Chd",
            ),
        },
        {
            "v": "1.47.1",
            "algorithms": _md5_chain(
                "Gez0T9ijiI9WCeTsKSg3SMlx",
                "zQdbalsolyb1R/",
                "ftOjr52zt51JD68C3s",
                "yeOBMH0JkbQdEFNNwQ0RI9T3wU/v",
                "BRJrQZiTQ65WtMvwO",
                "je8fqxKPdQVJiy1DM6Bc9Nb1",
                "niV",
                "9hFCW2R1",
                "sHKHpe2i96",
                "p7c5E6AcXQ/IJUuAEC9W6",
                "",
                "aRv9hjc9P+Pbn+u3krN6",
                "BzStcgE8qVdqjEH16l4",
                "SqgeZvL5j9zoHP95xWHt",
                "zVof5yaJkPe3VFpadPof",
            ),
        },
        {
            "v": "1.48.3",
            "algorithms": _md5_chain(
                "aDhgaSE3MsjROCmpmsWqP1sJdFJ",
                "+oaVkqdd8MJuKT+uMr2AYKcd9tdWge3XPEPR2hcePUknd",
                "u/sd2GgT2fTytRcKzGicHodhvIltMntA3xKw2SRv7S48OdnaQIS5mn",
                "2WZiae2QuqTOxBKaaqCNHCW3olu2UImelkDzBn",
                "/vJ3upic39lgmrkX855Qx",
                "yNc9ruCVMV7pGV7XvFeuLMOcy1",
                "4FPq8mT3JQ1jzcVxMVfwFftLQm33M7i",
                "xozoy5e3Ea",
            ),
        },
        {
            "v": "1.49.3",
            "algorithms": _md5_chain(
                "7xOq4Z8s",
                "QE9/9+IQco",
                "WdX5J9CPLZp",
                "NmQ5qFAXqH3w984cYhMeC5TJR8j",
                "cc44M+l7GDhav",
                "KxGjo/wHB+Yx8Lf7kMP+/m9I+",
                "wla81BUVSmDkctHDpUT",
                "c6wMr1sm1WxiR3i8LDAm3W",
                "hRLrEQCFNYi0PFPV",
                "o1J41zIraDtJPNuhBu7Ifb/q3",
                "U",
                "RrbZvV0CTu3gaZJ56PVKki4IeP",
                "NNuRbLckJqUp1Do0YlrKCUP",
                "UUwnBbipMTvInA0U0E9",
                "VzGc",
            ),
        },
        {
            "v": "1.51.2",
            "algorithms": _md5_chain(
                "vPjelkvqcWoCsQO1CnkVod8j2GbcE0yEHEwJ3PKSKW",
                "Rw5aO9MHuhY",
                "Gk111qdZkPw/xgj",
                "/aaQ4/f8HNpyzPOtIF3rG/UEENiRRvpIXku3WDWZHuaIq+0EOF",
                "6p1gxZhV0CNuKV2QO5vpibkR8IJeFURvqNIKXWOIyv1A",
                "gWR",
                "iPD",
                "ASEm+P75YfKzQRW6eRDNNTd",
                "2fauuwVCxLCpL/FQ/iJ5NpOPb7gRZs0EWJwe/2YNPQr3ore+ZiIri6s/tYayG",
            ),
        },
    ]
    # Random selection (random.choice over version_list) is disabled; the
    # first entry is always used.
    return version_list[0]
def random_rtc_token():
    """Return a random IPv6-style string used as the RTC token.

    The result is eight colon-separated groups of four lowercase hex digits,
    each group drawn with ``random.randint(0, 0xFFFF)``.
    """
    return ":".join(f"{random.randint(0, 0xFFFF):04x}" for _ in range(8))
class PikPak:
    """Client for the PikPak registration flow used by this script.

    The instance stores the per-session identifiers and tokens; the endpoint
    methods update those attributes when a response contains the expected
    fields and otherwise only log the invalid response.
    """

    # Proxy address used when proxying is enabled without an explicit address.
    DEFAULT_PROXY = "http://127.0.0.1:7890"

    def __init__(self, invite_code, client_id, device_id, version, algorithms, email, rtc_token,
                 client_secret, package_name, use_proxy=False, proxy_http=None, proxy_https=None):
        """Store the session parameters and build the proxy mapping.

        When ``use_proxy`` is true, ``proxy_http`` / ``proxy_https`` fall back
        to ``DEFAULT_PROXY`` if not given; otherwise ``self.proxies`` is ``None``.
        """
        self.invite_code = invite_code  # invitation code
        self.client_id = client_id  # client id
        self.device_id = device_id  # device id
        self.timestamp = 0  # timestamp, refreshed by refresh_captcha_sign()
        self.algorithms = algorithms  # per-version salt list
        self.version = version  # client version
        self.email = email  # e-mail address
        self.rtc_token = rtc_token  # RTC token
        self.captcha_token = ""  # captcha token
        self.client_secret = client_secret  # client secret
        self.user_id = ""  # user id
        self.access_token = ""  # login token
        self.refresh_token = ""  # refresh token
        self.verification_token = ""  # verification token
        self.captcha_sign = ""  # captcha sign
        self.verification_id = ""  # verification id
        self.package_name = package_name  # client package name
        self.use_proxy = use_proxy  # whether requests go through the proxy
        if use_proxy:
            self.proxies = {
                "http": proxy_http or self.DEFAULT_PROXY,
                "https": proxy_https or self.DEFAULT_PROXY,
            }
        else:
            self.proxies = None

    @staticmethod
    def _has_fields(response, *required_keys):
        """Return True when ``response`` is a non-empty dict containing every required key."""
        return (bool(response) and isinstance(response, dict)
                and all(key in response for key in required_keys))

    def send_request(self, method, url, headers=None, params=None, json_data=None, data=None, use_proxy=None):
        """Send an HTTP request and return the decoded response.

        Args:
            method: HTTP method name.
            url: Target URL.
            headers: Optional request headers.
            params: Optional query parameters.
            json_data: Optional JSON body.
            data: Optional form body.
            use_proxy: Overrides the instance-wide ``use_proxy`` setting when not ``None``.

        Returns:
            The parsed JSON body; the raw response text when the body is not
            valid JSON; ``{}`` when the request fails or returns an HTTP error status.
        """
        headers = headers or {}
        # Fall back to the instance-wide setting when the caller did not choose.
        use_proxy = self.use_proxy if use_proxy is None else use_proxy
        if not use_proxy:
            proxies = None
        else:
            # A per-call override may enable the proxy on an instance that has
            # no proxy mapping; use the default address in that case.
            proxies = self.proxies or {"http": self.DEFAULT_PROXY, "https": self.DEFAULT_PROXY}
        try:
            response = requests.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json_data,
                data=data,
                proxies=proxies,
                timeout=30
            )
            response.raise_for_status()  # turn HTTP error statuses into exceptions
            print(response.text)
            try:
                return response.json()
            except ValueError:
                # Catch ValueError rather than json.JSONDecodeError: the decode
                # error raised by requests is not guaranteed to derive from the
                # stdlib class, and it is also a RequestException, so it would
                # otherwise be reported below as a failed request.
                return response.text
        except requests.exceptions.RequestException as e:
            print(f"请求失败: {url}, 错误: {e}")
            if use_proxy:
                print(f"当前使用的代理: {proxies}")
            # Empty mapping signals failure to the callers.
            return {}

    def gen(self):
        """Fetch a new captcha puzzle; the response is returned even when it is invalid."""
        url = "https://user.mypikpak.com/pzzl/gen"
        params = {"deviceid": self.device_id, "traceid": ""}
        headers = {"Host": "user.mypikpak.com", "accept": "application/json, text/plain, */*"}
        response = self.send_request("GET", url, headers=headers, params=params)
        if not self._has_fields(response, "pid", "frames"):
            print(f"gen请求返回无效响应: {response}")
        return response

    def image_verify(self, pid, trace_id, f, n, p, a, c, d):
        """Submit a captcha answer; return the response or ``{"result": "reject"}`` when invalid."""
        url = "https://user.mypikpak.com/pzzl/verify"
        params = {"pid": pid, "deviceid": self.device_id, "traceid": trace_id, "f": f, "n": n, "p": p, "a": a, "c": c,
                  "d": d}
        headers = {"Host": "user.mypikpak.com", "accept": "application/json, text/plain, */*"}
        response = self.send_request("GET", url, headers=headers, params=params)
        if not self._has_fields(response, "result"):
            print(f"image_verify请求返回无效响应: {response}")
            return {"result": "reject"}
        return response

    def executor(self):
        """Fetch the captcha executor payload.

        Returns:
            The text between the outermost parentheses of the JSONP response
            (with surrounding backticks removed), the body itself when it
            already looks like JSON, or ``""`` on any failure.
        """
        url = "https://api-drive.mypikpak.com/captcha-jsonp/v2/executor?callback=handleJsonpResult_" + str(int(time.time() * 1000))
        headers = {'pragma': 'no-cache', 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'}
        try:
            # Plain requests call instead of send_request: the raw body is needed.
            response = requests.get(url, headers=headers, proxies=self.proxies if self.use_proxy else None, timeout=30)
            response.raise_for_status()
            content = response.text
            print(f"executor 原始响应: {content}")
            if not content:
                print("executor 响应内容为空")
                return ""
            if "handleJsonpResult" in content:
                # JSONP has the form callback(<payload>); take the payload.
                start_index = content.find('(')
                end_index = content.rfind(')')
                if start_index != -1 and end_index != -1:
                    json_str = content[start_index + 1:end_index]
                    # The payload is sometimes wrapped in backticks.
                    if json_str.startswith('`') and json_str.endswith('`'):
                        json_str = json_str[1:-1]
                    return json_str
                print(f"无法从JSONP响应中提取有效内容: {content}")
                return ""
            if content.startswith(('{', '[')):
                # Possibly a plain JSON string.
                return content
            print(f"未知的响应格式: {content}")
            return ""
        except requests.exceptions.RequestException as e:
            print(f"执行 executor 请求失败: {e}")
            return ""
        except Exception as e:
            print(f"解析 executor 响应失败: {e}")
            return ""

    def report(self, request_id, sign, pid, trace_id):
        """Report the solved captcha; store the returned ``captcha_token`` when present."""
        url = "https://user.mypikpak.com/credit/v1/report"
        params = {
            "deviceid": self.device_id,
            "captcha_token": self.captcha_token,
            "request_id": request_id,
            "sign": sign,
            "type": "pzzlSlider",
            "result": 0,
            "data": pid,
            "traceid": trace_id,
            "rtc_token": self.rtc_token
        }
        headers = {'pragma': 'no-cache', 'priority': 'u=1, i'}
        response = self.send_request("GET", url, params=params, headers=headers)
        if not self._has_fields(response, "captcha_token"):
            print(f"report请求返回无效响应: {response}")
        else:
            self.captcha_token = response.get('captcha_token')
        return response

    def verification(self):
        """Request an e-mail verification code; store ``verification_id`` when present."""
        url = 'https://user.mypikpak.com/v1/auth/verification'
        params = {"email": self.email, "target": "ANY", "usage": "REGISTER", "locale": "zh-CN",
                  "client_id": self.client_id}
        headers = {'host': 'user.mypikpak.com', 'x-captcha-token': self.captcha_token, 'x-device-id': self.device_id,
                   "x-client-id": self.client_id}
        # The fields are sent as a form body (data=), not as JSON.
        response = self.send_request("POST", url, headers=headers, data=params)
        if not self._has_fields(response, "verification_id"):
            print(f"verification请求返回无效响应: {response}")
        else:
            self.verification_id = response.get('verification_id')
        return response

    def verify_post(self, verification_code):
        """Submit the e-mail verification code; store ``verification_token`` when present."""
        url = "https://user.mypikpak.com/v1/auth/verification/verify"
        params = {"client_id": self.client_id}
        payload = {"client_id": self.client_id, "verification_id": self.verification_id,
                   "verification_code": verification_code}
        headers = {"X-Device-Id": self.device_id}
        response = self.send_request("POST", url, headers=headers, json_data=payload, params=params)
        if not self._has_fields(response, "verification_token"):
            print(f"verify_post请求返回无效响应: {response}")
        else:
            self.verification_token = response.get('verification_token')
        return response

    def init(self, action):
        """Refresh the captcha sign and initialise a captcha session for ``action``.

        Stores the returned ``captcha_token`` when present and returns the response.
        """
        self.refresh_captcha_sign()
        url = "https://user.mypikpak.com/v1/shield/captcha/init"
        params = {"client_id": self.client_id}
        payload = {
            "action": action,
            "captcha_token": self.captcha_token,
            "client_id": self.client_id,
            "device_id": self.device_id,
            "meta": {
                "captcha_sign": "1." + self.captcha_sign,
                "user_id": self.user_id,
                "package_name": self.package_name,
                "client_version": self.version,
                "email": self.email,
                "timestamp": self.timestamp
            }
        }
        headers = {"x-device-id": self.device_id}
        response = self.send_request("POST", url, headers=headers, json_data=payload, params=params)
        if not self._has_fields(response, "captcha_token"):
            print(f"init请求返回无效响应: {response}")
        else:
            self.captcha_token = response.get('captcha_token')
        return response

    def signup(self, name, password, verification_code):
        """Register the account; store access token, refresh token and user id from the response."""
        url = "https://user.mypikpak.com/v1/auth/signup"
        params = {"client_id": self.client_id}
        payload = {
            "captcha_token": self.captcha_token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "email": self.email,
            "name": name,
            "password": password,
            "verification_code": verification_code,
            "verification_token": self.verification_token
        }
        headers = {"X-Device-Id": self.device_id}
        response = self.send_request("POST", url, headers=headers, json_data=payload, params=params)
        if not self._has_fields(response):
            print(f"signup请求返回无效响应: {response}")
        else:
            self.access_token = response.get('access_token', '')
            self.refresh_token = response.get('refresh_token', '')
            self.user_id = response.get('sub', '')
        return response

    def activation_code(self):
        """Submit ``self.invite_code`` as an activation code and return the response."""
        url = "https://api-drive.mypikpak.com/vip/v1/order/activation-code"
        payload = {"activation_code": self.invite_code, "data": {}}
        headers = {
            "Host": "api-drive.mypikpak.com",
            "authorization": "Bearer " + self.access_token,
            "x-captcha-token": self.captcha_token,
            "x-device-id": self.device_id,
            'x-system-language': "ko",
            'content-type': 'application/json'
        }
        response = self.send_request("POST", url, headers=headers, json_data=payload)
        if not self._has_fields(response):
            print(f"activation_code请求返回无效响应: {response}")
        return response

    def files_task(self, task_link):
        """Create a URL download task for ``task_link`` and return the response."""
        url = "https://api-drive.mypikpak.com/drive/v1/files"
        payload = {
            "kind": "drive#file",
            "folder_type": "DOWNLOAD",
            "upload_type": "UPLOAD_TYPE_URL",
            "url": {"url": task_link},
            "params": {"with_thumbnail": "true", "from": "manual"}
        }
        headers = {
            "Authorization": "Bearer " + self.access_token,
            "x-device-id": self.device_id,
            "x-captcha-token": self.captcha_token,
            "Content-Type": "application/json"
        }
        response = self.send_request("POST", url, headers=headers, json_data=payload)
        if not self._has_fields(response):
            print(f"files_task请求返回无效响应: {response}")
        return response

    def refresh_captcha_sign(self):
        """Refresh ``self.timestamp`` and recompute ``self.captcha_sign`` from it."""
        # Whole seconds scaled to milliseconds, so the value always ends in 000.
        self.timestamp = str(int(time.time()) * 1000)
        encrypt_string = self.client_id + self.version + self.package_name + self.device_id + self.timestamp
        self.captcha_sign = captcha_sign_encrypt(encrypt_string, self.algorithms)
def save_account_info(name, account_info):
    """Write ``account_info`` as pretty-printed UTF-8 JSON to ``./account/<name>.json``.

    Args:
        name: File name without extension.
        account_info: JSON-serializable object to store.

    The ``./account`` directory is created when missing; an existing file with
    the same name is overwritten.
    """
    # exist_ok avoids the race between a separate existence check and the
    # creation, and makes repeated calls safe.
    os.makedirs("./account", exist_ok=True)
    with open("./account/" + name + ".json", "w", encoding="utf-8") as f:
        json.dump(account_info, f, ensure_ascii=False, indent=4)
def test_proxy(proxy_url):
    """Check whether the proxy can be used to reach the PikPak website.

    Sends a GET request to ``https://mypikpak.com`` through ``proxy_url`` (for
    both http and https) with a 10-second timeout.

    Returns:
        ``True`` when the request succeeds with a non-error status, ``False``
        on any exception.
    """
    target = "https://mypikpak.com"
    proxy_map = dict.fromkeys(("http", "https"), proxy_url)
    reachable = False
    try:
        reply = requests.get(target, proxies=proxy_map, timeout=10)
        reply.raise_for_status()
        print(f"代理连接测试成功: {proxy_url}")
        reachable = True
    except Exception as err:
        print(f"代理连接测试失败: {proxy_url}, 错误: {err}")
    return reachable
# Interactive helpers used by main().
def _confirm(prompt):
    """Return True when the user answers ``y`` or ``yes`` (case-insensitive) to ``prompt``."""
    return input(prompt).strip().lower() in ("y", "yes")


def _prompt_proxy_url():
    """Interactively choose a proxy URL; return ``None`` for a direct connection."""
    if not _confirm('是否启用代理(y/n):'):
        print("未启用代理,直接连接")
        return None
    if _confirm('是否使用默认代理地址 http://127.0.0.1:7890 (y/n):'):
        proxy_url = "http://127.0.0.1:7890"
        print("已启用代理,使用默认地址:", proxy_url)
        failure_prompt = "默认代理连接测试失败,是否继续使用(y/n):"
    else:
        # Custom host, port and protocol; blank or unknown answers use the defaults.
        proxy_host = input('请输入代理主机地址 (默认127.0.0.1): ').strip() or '127.0.0.1'
        proxy_port = input('请输入代理端口 (默认7890): ').strip() or '7890'
        proxy_protocol = input('请输入代理协议 (http/https/socks5,默认http): ').strip().lower()
        if proxy_protocol not in ('http', 'https', 'socks5'):
            proxy_protocol = 'http'
        proxy_url = f"{proxy_protocol}://{proxy_host}:{proxy_port}"
        print(f"已设置代理地址: {proxy_url}")
        failure_prompt = "自定义代理连接测试失败,是否继续使用(y/n):"
    # A failing proxy is kept only when the user explicitly insists.
    if not test_proxy(proxy_url) and not _confirm(failure_prompt):
        print("已取消代理设置,将直接连接")
        return None
    return proxy_url


def _captcha_accepted(captcha_result):
    """Return True when ``captcha_result`` reports an accepted captcha."""
    return (bool(captcha_result) and "response_data" in captcha_result
            and captcha_result['response_data'].get('result') == 'accept')


def _solve_slider_captcha(pikpak, device_id, max_attempts=5):
    """Try the slider captcha up to ``max_attempts`` times.

    Returns the accepted result dict, or ``None`` when every attempt failed.
    """
    for attempt in range(max_attempts):
        print(f"尝试滑块验证 ({attempt+1}/{max_attempts})...")
        try:
            captcha_result = captcha_image_parse(pikpak, device_id)
            print(captcha_result)
            if _captcha_accepted(captcha_result):
                print("滑块验证成功!")
                return captcha_result
            print('滑块验证失败, 正在重新尝试...')
        except Exception as e:
            print(f"滑块验证过程出错: {e}")
            import traceback
            traceback.print_exc()
        time.sleep(2)  # wait two seconds before the next attempt
    return None


# Program entry point.
def main():
    """Run the interactive registration flow.

    Steps: pick parameters, optionally configure a proxy, solve the slider
    captcha, request and submit the e-mail verification code, sign up, submit
    the invitation code, then print the account information and optionally
    save it. Every failure is reported on the console and ends the flow.
    """
    try:
        # 1. Initialise parameters.
        current_version = ramdom_version()
        version = current_version['v']
        algorithms = current_version['algorithms']
        client_id = "YNxT9w7GMdWvEOKa"
        client_secret = "dbw2OtmVEeuUvIptb1Coyg"
        package_name = "com.pikcloud.pikpak"
        device_id = str(uuid.uuid4()).replace("-", "")
        rtc_token = random_rtc_token()
        print(f"当前版本:{version} 设备号:{device_id} 令牌:{rtc_token}")

        # The same proxy URL is used for http and https; None means direct.
        proxy_url = _prompt_proxy_url()
        use_proxy = proxy_url is not None

        invite_code = input('请输入你的邀请码:')
        email = input("请输入注册用的邮箱:")

        # 2. Create the client with the proxy settings.
        pikpak = PikPak(invite_code, client_id, device_id, version, algorithms, email, rtc_token, client_secret,
                        package_name, use_proxy=use_proxy, proxy_http=proxy_url, proxy_https=proxy_url)

        # 3. Refresh the timestamp and compute the captcha sign.
        init_result = pikpak.init("POST:/v1/auth/verification")
        if not init_result or not isinstance(init_result, dict) or "captcha_token" not in init_result:
            print("初始化失败,请检查网络连接或代理设置")
            input("按任意键退出程序")
            return

        # 4. Solve the slider captcha.
        captcha_result = _solve_slider_captcha(pikpak, device_id, max_attempts=5)
        if captcha_result is None:
            print("滑块验证失败,达到最大尝试次数")
            input("按任意键退出程序")
            return

        # 5. Sign the captcha result and request the e-mail verification code.
        try:
            executor_info = pikpak.executor()
            if not executor_info:
                print("获取executor信息失败")
                input("按任意键退出程序")
                return
            sign_encrypt_info = sign_encrypt(executor_info, pikpak.captcha_token, rtc_token, pikpak.use_proxy, pikpak.proxies)
            if not sign_encrypt_info or "request_id" not in sign_encrypt_info or "sign" not in sign_encrypt_info:
                print("签名加密失败")
                print(f"executor_info: {executor_info}")
                print(f"captcha_token: {pikpak.captcha_token}")
                print(f"rtc_token: {rtc_token}")
                input("按任意键退出程序")
                return
            # Updates pikpak.captcha_token on success.
            pikpak.report(sign_encrypt_info['request_id'], sign_encrypt_info['sign'], captcha_result['pid'],
                          captcha_result['traceid'])
            # Send the verification e-mail.
            verification_result = pikpak.verification()
            if not verification_result or not isinstance(verification_result, dict) or "verification_id" not in verification_result:
                print("请求验证码失败")
                input("按任意键退出程序")
                return
        except Exception as e:
            print(f"验证过程出错: {e}")
            import traceback
            traceback.print_exc()
            input("按任意键退出程序")
            return

        # 6. Submit the verification code.
        verification_code = input("请输入接收到的验证码:")
        pikpak.verify_post(verification_code)

        # 7. Refresh the timestamp and compute the captcha sign again.
        pikpak.init("POST:/v1/auth/signup")

        # 8. Sign up.
        name = email.split("@")[0]
        password = "zhiyuan233"
        pikpak.signup(name, password, verification_code)
        # Without an access token there is no account to activate or save;
        # stop instead of presenting empty credentials as a result.
        if not pikpak.access_token:
            print("注册失败,未获取到access_token")
            input("按任意键退出程序")
            return

        # 9. Submit the invitation code.
        pikpak.activation_code()

        account_info = {
            "version": pikpak.version,
            "device_id": pikpak.device_id,
            "email": pikpak.email,
            "captcha_token": pikpak.captcha_token,
            "access_token": pikpak.access_token,
            "refresh_token": pikpak.refresh_token,
            "user_id": pikpak.user_id,
            "timestamp": pikpak.timestamp,
            "password": password,
            "name": name
        }
        print("请保存好账号信息备用:", json.dumps(account_info, indent=4, ensure_ascii=False))

        if _confirm("是否保存账号信息到文件(y/n):"):
            try:
                # save_account_info creates the ./account directory itself.
                save_account_info(name, account_info)
                print(f"账号信息已保存到 ./account/{name}.json")
            except Exception as e:
                print(f"保存账号信息失败: {e}")
        input("运行完成,回车结束程序:")
    except KeyboardInterrupt:
        print("\n程序被用户中断")
    except Exception as e:
        print(f"程序运行出错: {e}")
        import traceback
        traceback.print_exc()
        input("按任意键退出程序")
if __name__ == "__main__":
    # Show the author's usage notice, then start the interactive flow.
    _developer_notice = "开发者声明:免费转载需标注出处:B站-纸鸢花的花语,此工具仅供交流学习和技术分析,严禁用于任何商业牟利行为。(包括但不限于倒卖、二改倒卖、引流、冒充作者、广告植入...)"
    print(_developer_notice)
    main()