deepak191z's picture
Upload 9 files
53e18b4 verified
raw
history blame
11.4 kB
from curl_cffi import requests
from typing import Optional, Dict, Any, Generator, Literal
import json
from .pow import DeepSeekPOW
import pkg_resources
import sys
from pathlib import Path
import subprocess
import time
# String-literal type alias: the three accepted "thinking" mode names.
# NOTE(review): not referenced by any code visible in this file.
ThinkingMode = Literal['detailed', 'simple', 'disabled']
# String-literal type alias: the two accepted search mode names.
# NOTE(review): not referenced by any code visible in this file.
SearchMode = Literal['enabled', 'disabled']
class DeepSeekError(Exception):
    """Common ancestor of every exception this DeepSeek client raises.

    Catch this type to handle any client failure with one ``except`` clause.
    """
class AuthenticationError(DeepSeekError):
    """Signals that the auth token is missing, malformed, or was rejected."""
class RateLimitError(DeepSeekError):
    """Signals that the API refused the request because of rate limiting."""
class NetworkError(DeepSeekError):
    """Signals a transport-level failure while talking to the API."""
class CloudflareError(DeepSeekError):
    """Signals that Cloudflare blocked the request."""
class APIError(DeepSeekError):
    """Signals that the API answered with an error response.

    Attributes:
        status_code: HTTP status of the failing response, or ``None`` when
            the caller did not supply one.
    """

    def __init__(self, message: str, status_code: Optional[int] = None):
        # Record the status first, then let the base class store the message.
        self.status_code = status_code
        super().__init__(message)
class DeepSeekAPI:
    """Client for the DeepSeek web chat API.

    Requests go through ``curl_cffi`` with browser impersonation and carry a
    bearer token plus the cookies stored in ``cookies.json`` next to this
    module. When a Cloudflare interstitial page is detected, ``bypass.py``
    (also next to this module) is run to refresh the cookies and the request
    is retried.
    """

    BASE_URL = "https://chat.deepseek.com/api/v0"
    # curl-cffi release the constructor checks for; a mismatch only warns.
    REQUIRED_CURL_CFFI_VERSION = '0.8.1b9'

    def __init__(self, auth_token: str):
        """Create a client bound to *auth_token*.

        Args:
            auth_token: Bearer token sent in the ``authorization`` header.

        Raises:
            AuthenticationError: If *auth_token* is empty or not a string.
        """
        if not auth_token or not isinstance(auth_token, str):
            raise AuthenticationError("Invalid auth token provided")

        self._check_curl_cffi_version()

        self.auth_token = auth_token
        self.pow_solver = DeepSeekPOW()

        # Cookie loading is best-effort: a missing, unreadable or malformed
        # file downgrades to "no cookies" with a warning instead of failing.
        try:
            self.cookies = self._load_cookies()
        except (OSError, ValueError) as e:
            self._warn(f"Warning: Could not load cookies from {self._cookies_path()}: {e}")
            self.cookies = {}

    @staticmethod
    def _warn(message: str) -> None:
        """Print *message* to stderr wrapped in the ANSI yellow colour codes."""
        print(f"\033[93m{message}\033[0m", file=sys.stderr)

    @staticmethod
    def _cookies_path() -> Path:
        """Return the path of ``cookies.json`` located beside this module."""
        return Path(__file__).parent / 'cookies.json'

    def _check_curl_cffi_version(self) -> None:
        """Warn on stderr when curl-cffi is missing or not the expected version."""
        required = self.REQUIRED_CURL_CFFI_VERSION
        try:
            installed = pkg_resources.get_distribution('curl-cffi').version
        except pkg_resources.DistributionNotFound:
            self._warn(
                f"Warning: curl-cffi not found. Please install version {required}:\n"
                f"pip install curl-cffi=={required}"
            )
            return
        if installed != required:
            self._warn(
                f"Warning: DeepSeek API requires curl-cffi version {required}\n"
                f"Please install the correct version using: pip install curl-cffi=={required}"
            )

    def _load_cookies(self) -> Dict[str, Any]:
        """Read ``cookies.json`` and return the value stored under ``'cookies'``.

        Returns:
            The ``'cookies'`` entry of the file, or ``{}`` when that key is absent.

        Raises:
            OSError: If the file cannot be opened.
            ValueError: If the file is not valid JSON (``json.JSONDecodeError``
                is a ``ValueError``) or its top level is not a JSON object.
        """
        with open(self._cookies_path(), 'r', encoding='utf-8') as f:
            cookie_data = json.load(f)
        if not isinstance(cookie_data, dict):
            raise ValueError("top-level JSON value must be an object")
        return cookie_data.get('cookies', {})

    def _get_headers(self, pow_response: Optional[str] = None) -> Dict[str, str]:
        """Build the HTTP headers for a request.

        Args:
            pow_response: Solved proof-of-work value; when truthy it is sent
                as the ``x-ds-pow-response`` header.

        Returns:
            A fresh header dictionary including the bearer ``authorization``.
        """
        headers = {
            'accept': '*/*',
            'accept-language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3',
            'authorization': f'Bearer {self.auth_token}',
            'content-type': 'application/json',
            'origin': 'https://chat.deepseek.com',
            'referer': 'https://chat.deepseek.com/',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36',
            'x-app-version': '20241129.1',
            'x-client-locale': 'en_US',
            'x-client-platform': 'web',
            'x-client-version': '1.0.0-always',
        }
        if pow_response:
            headers['x-ds-pow-response'] = pow_response
        return headers

    def _refresh_cookies(self) -> None:
        """Run ``bypass.py`` and reload cookies; failures only emit a warning.

        On any failure ``self.cookies`` keeps its previous value.
        """
        try:
            script_path = Path(__file__).parent / 'bypass.py'
            subprocess.run([sys.executable, str(script_path)], check=True)
            # Kept from the original: brief pause before reading the file.
            # NOTE(review): subprocess.run already waits for the script to
            # exit; confirm whether bypass.py writes the file asynchronously.
            time.sleep(2)
            self.cookies = self._load_cookies()
        except Exception as e:
            # Deliberately broad: cookie refresh is best-effort.
            self._warn(f"Warning: Failed to refresh cookies: {e}")

    @staticmethod
    def _is_cloudflare_challenge(body: str) -> bool:
        """Return True when *body* looks like Cloudflare's interstitial HTML page."""
        return "<!DOCTYPE html>" in body and "Just a moment" in body

    def _make_request(self, method: str, endpoint: str, json_data: Dict[str, Any], pow_required: bool = False) -> Any:
        """Send a JSON request and return the decoded JSON response.

        Args:
            method: HTTP method name.
            endpoint: Path appended to ``BASE_URL``.
            json_data: Request body, sent as JSON.
            pow_required: When True, fetch and solve a proof-of-work challenge
                and attach the solution as a header.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            AuthenticationError: On HTTP 401.
            RateLimitError: On HTTP 429.
            NetworkError: When curl_cffi reports a request failure.
            APIError: On any other non-200 status, on an undecodable body, or
                when the Cloudflare page is still served after all attempts.
        """
        url = f"{self.BASE_URL}{endpoint}"
        max_retries = 2

        for attempt in range(max_retries):
            try:
                if pow_required:
                    challenge = self._get_pow_challenge()
                    headers = self._get_headers(self.pow_solver.solve_challenge(challenge))
                else:
                    headers = self._get_headers()

                response = requests.request(
                    method=method,
                    url=url,
                    headers=headers,
                    json=json_data,
                    cookies=self.cookies,
                    impersonate='chrome120',
                    timeout=None
                )

                if self._is_cloudflare_challenge(response.text):
                    self._warn("Warning: Cloudflare protection detected. Bypassing...")
                    # Refreshing is pointless on the final attempt: nothing
                    # would use the new cookies.
                    if attempt < max_retries - 1:
                        self._refresh_cookies()
                    continue

                status = response.status_code
                if status == 401:
                    raise AuthenticationError("Invalid or expired authentication token")
                if status == 429:
                    raise RateLimitError("API rate limit exceeded")
                if status >= 500:
                    raise APIError(f"Server error occurred: {response.text}", status)
                if status != 200:
                    raise APIError(f"API request failed: {response.text}", status)
                return response.json()
            except requests.exceptions.RequestException as e:
                raise NetworkError(f"Network error occurred: {str(e)}") from e
            except json.JSONDecodeError as e:
                raise APIError("Invalid JSON response from server") from e

        raise APIError("Failed to bypass Cloudflare protection after multiple attempts")

    def _get_pow_challenge(self) -> Dict[str, Any]:
        """Request a proof-of-work challenge for the completion endpoint.

        Returns:
            The value at ``data.biz_data.challenge`` of the response.

        Raises:
            APIError: If the response lacks that structure.
        """
        response = self._make_request(
            'POST',
            '/chat/create_pow_challenge',
            {'target_path': '/api/v0/chat/completion'}
        )
        try:
            return response['data']['biz_data']['challenge']
        except (KeyError, TypeError) as e:
            # TypeError covers a null or non-object level, e.g. {"data": null}.
            raise APIError("Invalid challenge response format from server") from e

    def create_chat_session(self) -> str:
        """Create a new chat session and return its ID.

        Raises:
            APIError: If the response lacks ``data.biz_data.id``.
        """
        response = self._make_request(
            'POST',
            '/chat_session/create',
            {'character_id': None}
        )
        try:
            return response['data']['biz_data']['id']
        except (KeyError, TypeError) as e:
            # TypeError covers a null or non-object level, e.g. {"data": null}.
            raise APIError("Invalid session creation response format from server") from e

    def chat_completion(self,
                        chat_session_id: str,
                        prompt: str,
                        parent_message_id: Optional[str] = None,
                        thinking_enabled: bool = True,
                        search_enabled: bool = False) -> Generator[Dict[str, Any], None, None]:
        """Send a message and stream the response.

        This is a generator: argument validation and the HTTP request happen
        on the first iteration, not when the method is called. The streamed
        response is closed when the generator finishes, fails, or is closed
        early by the consumer.

        Args:
            chat_session_id (str): The ID of the chat session
            prompt (str): The message to send
            parent_message_id (Optional[str]): ID of the parent message for threading
            thinking_enabled (bool): Whether to show the thinking process
            search_enabled (bool): Whether to enable web search for up-to-date information

        Yields:
            Dict[str, Any]: Chunks with ``content``, ``type`` and
            ``finish_reason`` keys; iteration stops after a chunk whose
            ``finish_reason`` is ``'stop'``.

        Raises:
            ValueError: If *prompt* or *chat_session_id* is empty or not a string
            AuthenticationError: If the authentication token is invalid
            RateLimitError: If the API rate limit is exceeded
            NetworkError: If a network error occurs
            APIError: If any other API error occurs
        """
        if not prompt or not isinstance(prompt, str):
            raise ValueError("Prompt must be a non-empty string")
        if not chat_session_id or not isinstance(chat_session_id, str):
            raise ValueError("Chat session ID must be a non-empty string")

        json_data = {
            'chat_session_id': chat_session_id,
            'parent_message_id': parent_message_id,
            'prompt': prompt,
            'ref_file_ids': [],
            'thinking_enabled': thinking_enabled,
            'search_enabled': search_enabled,
        }

        try:
            pow_response = self.pow_solver.solve_challenge(self._get_pow_challenge())
            response = requests.post(
                f"{self.BASE_URL}/chat/completion",
                headers=self._get_headers(pow_response=pow_response),
                json=json_data,
                cookies=self.cookies,
                impersonate='chrome120',
                stream=True,
                timeout=None
            )
        except requests.exceptions.RequestException as e:
            raise NetworkError(f"Network error occurred during streaming: {str(e)}") from e

        try:
            if response.status_code != 200:
                if response.status_code == 401:
                    raise AuthenticationError("Invalid or expired authentication token")
                if response.status_code == 429:
                    raise RateLimitError("API rate limit exceeded")
                # Only the first line of the body is used as error detail.
                error_text = next(response.iter_lines(), b'').decode('utf-8', 'ignore')
                raise APIError(f"API request failed: {error_text}", response.status_code)

            for chunk in response.iter_lines():
                # Guard only the parse step so that exceptions thrown into the
                # generator at the yield are not mislabelled as parse errors.
                try:
                    parsed = self._parse_chunk(chunk)
                except Exception as e:
                    raise APIError(f"Error parsing response chunk: {str(e)}") from e
                if parsed:
                    yield parsed
                    if parsed.get('finish_reason') == 'stop':
                        break
        except requests.exceptions.RequestException as e:
            raise NetworkError(f"Network error occurred during streaming: {str(e)}") from e
        finally:
            # Release the streaming connection on every exit path, including
            # early break, errors, and the consumer closing the generator.
            response.close()

    def _parse_chunk(self, chunk: bytes) -> Optional[Dict[str, Any]]:
        """Parse one server-sent-event line from the streaming response.

        Args:
            chunk: A single raw line of the response body.

        Returns:
            A dict with ``content``, ``type`` and ``finish_reason`` when the
            line is a ``data:`` event whose first choice has a ``delta``;
            ``None`` for empty lines, non-``data:`` lines, an empty payload, a
            literal ``[DONE]`` payload, or payloads without such a choice.

        Raises:
            APIError: If the payload is not valid JSON or cannot be processed.
        """
        if not chunk:
            return None
        try:
            if not chunk.startswith(b'data:'):
                return None
            # The space after "data:" is optional in the SSE wire format.
            payload = chunk[5:].strip()
            # NOTE(review): "[DONE]" is the end-of-stream sentinel used by some
            # SSE chat APIs; confirm whether this endpoint sends it. Treating
            # it as "no data" avoids a spurious JSON error either way.
            if not payload or payload == b'[DONE]':
                return None
            data = json.loads(payload)
            if 'choices' in data and data['choices']:
                choice = data['choices'][0]
                if 'delta' in choice:
                    delta = choice['delta']
                    return {
                        'content': delta.get('content', ''),
                        'type': delta.get('type', ''),
                        'finish_reason': choice.get('finish_reason')
                    }
        except json.JSONDecodeError as e:
            raise APIError("Invalid JSON in response chunk") from e
        except Exception as e:
            raise APIError(f"Error parsing chunk: {str(e)}") from e
        return None