from curl_cffi import requests from typing import Optional, Dict, Any, Generator, Literal import json from .pow import DeepSeekPOW import pkg_resources import sys from pathlib import Path import subprocess import time ThinkingMode = Literal['detailed', 'simple', 'disabled'] SearchMode = Literal['enabled', 'disabled'] class DeepSeekError(Exception): """Base exception for all DeepSeek API errors""" pass class AuthenticationError(DeepSeekError): """Raised when authentication fails""" pass class RateLimitError(DeepSeekError): """Raised when API rate limit is exceeded""" pass class NetworkError(DeepSeekError): """Raised when network communication fails""" pass class CloudflareError(DeepSeekError): """Raised when Cloudflare blocks the request""" pass class APIError(DeepSeekError): """Raised when API returns an error response""" def __init__(self, message: str, status_code: Optional[int] = None): super().__init__(message) self.status_code = status_code class DeepSeekAPI: BASE_URL = "https://chat.deepseek.com/api/v0" def __init__(self, auth_token: str): if not auth_token or not isinstance(auth_token, str): raise AuthenticationError("Invalid auth token provided") try: curl_cffi_version = pkg_resources.get_distribution('curl-cffi').version if curl_cffi_version != '0.8.1b9': print("\033[93mWarning: DeepSeek API requires curl-cffi version 0.8.1b9", file=sys.stderr) print("Please install the correct version using: pip install curl-cffi==0.8.1b9\033[0m", file=sys.stderr) except pkg_resources.DistributionNotFound: print("\033[93mWarning: curl-cffi not found. Please install version 0.8.1b9:", file=sys.stderr) print("pip install curl-cffi==0.8.1b9\033[0m", file=sys.stderr) self.auth_token = auth_token self.pow_solver = DeepSeekPOW() # Load cookies from JSON file cookies_path = Path(__file__).parent / 'cookies.json' try: with open(cookies_path, 'r') as f: cookie_data = json.load(f) self.cookies = cookie_data.get('cookies', {}) except (FileNotFoundError, json.JSONDecodeError) as e: print(f"\033[93mWarning: Could not load cookies from {cookies_path}: {e}\033[0m", file=sys.stderr) self.cookies = {} def _get_headers(self, pow_response: Optional[str] = None) -> Dict[str, str]: headers = { 'accept': '*/*', 'accept-language': 'en,fr-FR;q=0.9,fr;q=0.8,es-ES;q=0.7,es;q=0.6,en-US;q=0.5,am;q=0.4,de;q=0.3', 'authorization': f'Bearer {self.auth_token}', 'content-type': 'application/json', 'origin': 'https://chat.deepseek.com', 'referer': 'https://chat.deepseek.com/', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36', 'x-app-version': '20241129.1', 'x-client-locale': 'en_US', 'x-client-platform': 'web', 'x-client-version': '1.0.0-always', } if pow_response: headers['x-ds-pow-response'] = pow_response return headers def _refresh_cookies(self) -> None: """Run the cookie refresh script and reload cookies""" try: # Get path to bypass.py script_path = Path(__file__).parent / 'bypass.py' # Run the script subprocess.run([sys.executable, script_path], check=True) # Wait briefly for cookies file to be written time.sleep(2) # Reload cookies cookies_path = Path(__file__).parent / 'cookies.json' with open(cookies_path, 'r') as f: cookie_data = json.load(f) self.cookies = cookie_data.get('cookies', {}) except Exception as e: print(f"\033[93mWarning: Failed to refresh cookies: {e}\033[0m", file=sys.stderr) def _make_request(self, method: str, endpoint: str, json_data: Dict[str, Any], pow_required: bool = False) -> Any: url = f"{self.BASE_URL}{endpoint}" retry_count = 0 max_retries = 2 while retry_count < max_retries: try: headers = self._get_headers() if pow_required: challenge = self._get_pow_challenge() pow_response = self.pow_solver.solve_challenge(challenge) headers = self._get_headers(pow_response) response = requests.request( method=method, url=url, headers=headers, json=json_data, cookies=self.cookies, impersonate='chrome120', timeout=None ) # Check if we hit Cloudflare protection if "" in response.text and "Just a moment" in response.text: print("\033[93mWarning: Cloudflare protection detected. Bypassing...\033[0m", file=sys.stderr) if retry_count < max_retries - 1: self._refresh_cookies() # Refresh cookies retry_count += 1 continue # Handle other response codes if response.status_code == 401: raise AuthenticationError("Invalid or expired authentication token") elif response.status_code == 429: raise RateLimitError("API rate limit exceeded") elif response.status_code >= 500: raise APIError(f"Server error occurred: {response.text}", response.status_code) elif response.status_code != 200: raise APIError(f"API request failed: {response.text}", response.status_code) return response.json() except requests.exceptions.RequestException as e: raise NetworkError(f"Network error occurred: {str(e)}") except json.JSONDecodeError: raise APIError("Invalid JSON response from server") raise APIError("Failed to bypass Cloudflare protection after multiple attempts") def _get_pow_challenge(self) -> Dict[str, Any]: try: response = self._make_request( 'POST', '/chat/create_pow_challenge', {'target_path': '/api/v0/chat/completion'} ) return response['data']['biz_data']['challenge'] except KeyError: raise APIError("Invalid challenge response format from server") def create_chat_session(self) -> str: """Creates a new chat session and returns the session ID""" try: response = self._make_request( 'POST', '/chat_session/create', {'character_id': None} ) return response['data']['biz_data']['id'] except KeyError: raise APIError("Invalid session creation response format from server") def chat_completion(self, chat_session_id: str, prompt: str, parent_message_id: Optional[str] = None, thinking_enabled: bool = True, search_enabled: bool = False) -> Generator[Dict[str, Any], None, None]: """ Send a message and get streaming response Args: chat_session_id (str): The ID of the chat session prompt (str): The message to send parent_message_id (Optional[str]): ID of the parent message for threading thinking_enabled (bool): Whether to show the thinking process search_enabled (bool): Whether to enable web search for up-to-date information Returns: Generator[Dict[str, Any], None, None]: Yields message chunks with content and type Raises: AuthenticationError: If the authentication token is invalid RateLimitError: If the API rate limit is exceeded NetworkError: If a network error occurs APIError: If any other API error occurs """ if not prompt or not isinstance(prompt, str): raise ValueError("Prompt must be a non-empty string") if not chat_session_id or not isinstance(chat_session_id, str): raise ValueError("Chat session ID must be a non-empty string") json_data = { 'chat_session_id': chat_session_id, 'parent_message_id': parent_message_id, 'prompt': prompt, 'ref_file_ids': [], 'thinking_enabled': thinking_enabled, 'search_enabled': search_enabled, } try: headers = self._get_headers( pow_response=self.pow_solver.solve_challenge( self._get_pow_challenge() ) ) response = requests.post( f"{self.BASE_URL}/chat/completion", headers=headers, json=json_data, cookies=self.cookies, # Add cookies impersonate='chrome120', stream=True, timeout=None ) if response.status_code != 200: error_text = next(response.iter_lines(), b'').decode('utf-8', 'ignore') if response.status_code == 401: raise AuthenticationError("Invalid or expired authentication token") elif response.status_code == 429: raise RateLimitError("API rate limit exceeded") else: raise APIError(f"API request failed: {error_text}", response.status_code) for chunk in response.iter_lines(): try: parsed = self._parse_chunk(chunk) if parsed: yield parsed if parsed.get('finish_reason') == 'stop': break except Exception as e: raise APIError(f"Error parsing response chunk: {str(e)}") except requests.exceptions.RequestException as e: raise NetworkError(f"Network error occurred during streaming: {str(e)}") def _parse_chunk(self, chunk: bytes) -> Optional[Dict[str, Any]]: """Parse a SSE chunk from the API response""" if not chunk: return None try: if chunk.startswith(b'data: '): data = json.loads(chunk[6:]) if 'choices' in data and data['choices']: choice = data['choices'][0] if 'delta' in choice: delta = choice['delta'] return { 'content': delta.get('content', ''), 'type': delta.get('type', ''), 'finish_reason': choice.get('finish_reason') } except json.JSONDecodeError: raise APIError("Invalid JSON in response chunk") except Exception as e: raise APIError(f"Error parsing chunk: {str(e)}") return None