Spaces:
Sleeping
Sleeping
| from typing import Optional, Any, Mapping, Iterator, AsyncIterator, Union, Dict, TypedDict | |
| try: | |
| from enum import StrEnum | |
| except ImportError: | |
| from strenum import StrEnum | |
| try: | |
| from http import HTTPMethod | |
| except ImportError: | |
| class HTTPMethod(StrEnum): | |
| GET = "GET" | |
| POST = "POST" | |
| PUT = "PUT" | |
| DELETE = "DELETE" | |
| import httpx | |
| # noinspection PyProtectedMember | |
| import httpx._types as types | |
| from pydantic import BaseModel | |
| from dify_client_python.dify_client import errors, models | |
| _httpx_client = httpx.Client() | |
| _async_httpx_client = httpx.AsyncClient() | |
| IGNORED_STREAM_EVENTS = (models.StreamEvent.PING.value,) | |
| # feedback | |
| ENDPOINT_FEEDBACKS = "/messages/{message_id}/feedbacks" | |
| # suggest | |
| ENDPOINT_SUGGESTED = "/messages/{message_id}/suggested" | |
| # files upload | |
| ENDPOINT_FILES_UPLOAD = "/files/upload" | |
| # completion | |
| ENDPOINT_COMPLETION_MESSAGES = "/completion-messages" | |
| ENDPOINT_STOP_COMPLETION_MESSAGES = "/completion-messages/{task_id}/stop" | |
| # chat | |
| ENDPOINT_CHAT_MESSAGES = "/chat-messages" | |
| ENDPOINT_STOP_CHAT_MESSAGES = "/chat-messages/{task_id}/stop" | |
| # workflow | |
| ENDPOINT_RUN_WORKFLOWS = "/workflows/run" | |
| ENDPOINT_STOP_WORKFLOWS = "/workflows/{task_id}/stop" | |
| # audio <-> text | |
| ENDPOINT_TEXT_TO_AUDIO = "/text-to-audio" | |
| ENDPOINT_AUDIO_TO_TEXT = "/audio-to-text" | |
| class ServerSentEvent(TypedDict): | |
| event: Optional[str] | |
| data: str | |
| id: Optional[str] | |
| retry: Optional[int] | |
| class Client(BaseModel): | |
| api_key: str | |
| api_base: Optional[str] = "https://api.dify.ai/v1" | |
| def request(self, endpoint: str, method: str, | |
| content: Optional[types.RequestContent] = None, | |
| data: Optional[types.RequestData] = None, | |
| files: Optional[types.RequestFiles] = None, | |
| json: Optional[Any] = None, | |
| params: Optional[types.QueryParamTypes] = None, | |
| headers: Optional[Mapping[str, str]] = None, | |
| **kwargs: object, | |
| ) -> httpx.Response: | |
| """ | |
| Sends a synchronous HTTP request to the specified endpoint. | |
| Args: | |
| endpoint: The API endpoint to send the request to. | |
| method: The HTTP method to use (e.g., 'GET', 'POST'). | |
| content: Raw content to include in the request body. | |
| data: Form data to include in the request body. | |
| files: Files to include in the request body. | |
| json: JSON data to include in the request body. | |
| params: Query parameters to include in the request URL. | |
| headers: Additional headers to include in the request. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| A `httpx.Response` object containing the HTTP response. | |
| Raises: | |
| Various DifyAPIError exceptions if the response contains an error. | |
| """ | |
| merged_headers = {} | |
| if headers: | |
| merged_headers.update(headers) | |
| self._prepare_auth_headers(merged_headers) | |
| response = _httpx_client.request(method, endpoint, content=content, data=data, files=files, json=json, | |
| params=params, headers=merged_headers, **kwargs) | |
| errors.raise_for_status(response) | |
| return response | |
| def request_stream(self, endpoint: str, method: str, | |
| content: Optional[types.RequestContent] = None, | |
| data: Optional[types.RequestData] = None, | |
| files: Optional[types.RequestFiles] = None, | |
| json: Optional[Any] = None, | |
| params: Optional[types.QueryParamTypes] = None, | |
| headers: Optional[Mapping[str, str]] = None, | |
| **kwargs, | |
| ) -> Iterator[ServerSentEvent]: | |
| """ | |
| Opens a server-sent events (SSE) stream to the specified endpoint. | |
| Args: | |
| endpoint: The API endpoint to send the request to. | |
| method: The HTTP method to use (e.g., 'GET', 'POST'). | |
| content: Raw content to include in the request body. | |
| data: Form data to include in the request body. | |
| files: Files to include in the request body. | |
| json: JSON data to include in the request body. | |
| params: Query parameters to include in the request URL. | |
| headers: Additional headers to include in the request. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| An iterator of `ServerSentEvent` objects representing the stream of events. | |
| Raises: | |
| Various DifyAPIError exceptions if an error event is received in the stream. | |
| """ | |
| merged_headers = { | |
| 'Accept': 'text/event-stream', | |
| 'Cache-Control': 'no-cache', | |
| 'Connection': 'keep-alive' | |
| } | |
| if headers: | |
| merged_headers.update(headers) | |
| self._prepare_auth_headers(merged_headers) | |
| response = _httpx_client.stream( | |
| method, | |
| endpoint, | |
| headers=merged_headers, | |
| content=content, | |
| data=data, | |
| files=files, | |
| json=json, | |
| params=params, | |
| **kwargs | |
| ) | |
| with response as event_source: | |
| if not _check_stream_content_type(event_source): | |
| event_source.read() | |
| errors.raise_for_status(event_source) | |
| for line in event_source.iter_lines(): | |
| if not line: | |
| continue | |
| if line.startswith(b'data: '): | |
| data = line[6:].decode('utf-8') | |
| try: | |
| json_data = json.loads(data) | |
| event = { | |
| 'event': json_data.get('event'), | |
| 'data': data, | |
| 'id': None, | |
| 'retry': None | |
| } | |
| if event['event'] in IGNORED_STREAM_EVENTS: | |
| continue | |
| yield event | |
| except json.JSONDecodeError: | |
| continue | |
| def feedback_messages(self, message_id: str, req: models.FeedbackRequest, **kwargs) -> models.FeedbackResponse: | |
| """ | |
| Submits feedback for a specific message. | |
| Args: | |
| message_id: The identifier of the message to submit feedback for. | |
| req: A `FeedbackRequest` object containing the feedback details, such as the rating. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| A `FeedbackResponse` object containing the result of the feedback submission. | |
| """ | |
| response = self.request( | |
| self._prepare_url(ENDPOINT_FEEDBACKS, message_id=message_id), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.FeedbackResponse(**response.json()) | |
| def suggest_messages(self, message_id: str, req: models.ChatSuggestRequest, **kwargs) -> models.ChatSuggestResponse: | |
| """ | |
| Retrieves suggested messages based on a specific message. | |
| Args: | |
| message_id: The identifier of the message to get suggestions for. | |
| req: A `ChatSuggestRequest` object containing the request details. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| A `ChatSuggestResponse` object containing suggested messages. | |
| """ | |
| response = self.request( | |
| self._prepare_url(ENDPOINT_SUGGESTED, message_id=message_id), | |
| HTTPMethod.GET, | |
| params=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.ChatSuggestResponse(**response.json()) | |
| def upload_files(self, file: types.FileTypes, req: models.UploadFileRequest, | |
| **kwargs) -> models.UploadFileResponse: | |
| """ | |
| Uploads a file to be used in subsequent requests. | |
| Args: | |
| file: The file to upload. This can be a file-like object, or a tuple of | |
| (`filename`, file-like object, mime_type). | |
| req: An `UploadFileRequest` object containing the upload details, such as the user who is uploading. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| An `UploadFileResponse` object containing details about the uploaded file, such as its identifier and URL. | |
| """ | |
| response = self.request( | |
| self._prepare_url(ENDPOINT_FILES_UPLOAD), | |
| HTTPMethod.POST, | |
| data=req.model_dump(), | |
| files=[("file", file)], | |
| **kwargs, | |
| ) | |
| return models.UploadFileResponse(**response.json()) | |
| def completion_messages(self, req: models.CompletionRequest, **kwargs) \ | |
| -> Union[models.CompletionResponse, Iterator[models.CompletionStreamResponse]]: | |
| """ | |
| Sends a request to generate a completion or a series of completions based on the provided input. | |
| Returns: | |
| If the response mode is blocking, it returns a `CompletionResponse` object containing the generated message. | |
| If the response mode is streaming, it returns an iterator of `CompletionStreamResponse` objects containing | |
| the stream of generated events. | |
| """ | |
| if req.response_mode == models.ResponseMode.BLOCKING: | |
| return self._completion_messages(req, **kwargs) | |
| if req.response_mode == models.ResponseMode.STREAMING: | |
| return self._completion_messages_stream(req, **kwargs) | |
| raise ValueError(f"Invalid request_mode: {req.response_mode}") | |
| def _completion_messages(self, req: models.CompletionRequest, **kwargs) -> models.CompletionResponse: | |
| response = self.request( | |
| self._prepare_url(ENDPOINT_COMPLETION_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.CompletionResponse(**response.json()) | |
| def _completion_messages_stream(self, req: models.CompletionRequest, **kwargs) \ | |
| -> Iterator[models.CompletionStreamResponse]: | |
| event_source = self.request_stream( | |
| self._prepare_url(ENDPOINT_COMPLETION_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| for sse in event_source: | |
| yield models.build_completion_stream_response(sse.json()) | |
| def stop_completion_messages(self, task_id: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| """ | |
| Sends a request to stop a streaming completion task. | |
| Returns: | |
| A `StopResponse` object indicating the success of the operation. | |
| """ | |
| return self._stop_stream(self._prepare_url(ENDPOINT_STOP_COMPLETION_MESSAGES, task_id=task_id), req, **kwargs) | |
| def chat_messages(self, req: models.ChatRequest, **kwargs) \ | |
| -> Union[models.ChatResponse, Iterator[models.ChatStreamResponse]]: | |
| """ | |
| Sends a request to generate a chat message or a series of chat messages based on the provided input. | |
| Returns: | |
| If the response mode is blocking, it returns a `ChatResponse` object containing the generated chat message. | |
| If the response mode is streaming, it returns an iterator of `ChatStreamResponse` objects containing the | |
| stream of chat events. | |
| """ | |
| if req.response_mode == models.ResponseMode.BLOCKING: | |
| return self._chat_messages(req, **kwargs) | |
| if req.response_mode == models.ResponseMode.STREAMING: | |
| return self._chat_messages_stream(req, **kwargs) | |
| raise ValueError(f"Invalid request_mode: {req.response_mode}") | |
| def _chat_messages(self, req: models.ChatRequest, **kwargs) -> models.ChatResponse: | |
| response = self.request( | |
| self._prepare_url(ENDPOINT_CHAT_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.ChatResponse(**response.json()) | |
| def _chat_messages_stream(self, req: models.ChatRequest, **kwargs) -> Iterator[models.ChatStreamResponse]: | |
| event_source = self.request_stream( | |
| self._prepare_url(ENDPOINT_CHAT_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| for sse in event_source: | |
| yield models.build_chat_stream_response(sse.json()) | |
| def stop_chat_messages(self, task_id: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| """ | |
| Sends a request to stop a streaming chat task. | |
| Returns: | |
| A `StopResponse` object indicating the success of the operation. | |
| """ | |
| return self._stop_stream(self._prepare_url(ENDPOINT_STOP_CHAT_MESSAGES, task_id=task_id), req, **kwargs) | |
| def run_workflows(self, req: models.WorkflowsRunRequest, **kwargs) \ | |
| -> Union[models.WorkflowsRunResponse, Iterator[models.WorkflowsRunStreamResponse]]: | |
| """ | |
| Initiates the execution of a workflow, which can consist of multiple steps and actions. | |
| Returns: | |
| If the response mode is blocking, it returns a `WorkflowsRunResponse` object containing the results of the | |
| completed workflow. | |
| If the response mode is streaming, it returns an iterator of `WorkflowsRunStreamResponse` objects | |
| containing the stream of workflow events. | |
| """ | |
| if req.response_mode == models.ResponseMode.BLOCKING: | |
| return self._run_workflows(req, **kwargs) | |
| if req.response_mode == models.ResponseMode.STREAMING: | |
| return self._run_workflows_stream(req, **kwargs) | |
| raise ValueError(f"Invalid request_mode: {req.response_mode}") | |
| def _run_workflows(self, req: models.WorkflowsRunRequest, **kwargs) -> models.WorkflowsRunResponse: | |
| response = self.request( | |
| self._prepare_url(ENDPOINT_RUN_WORKFLOWS), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.WorkflowsRunResponse(**response.json()) | |
| def _run_workflows_stream(self, req: models.WorkflowsRunRequest, **kwargs) \ | |
| -> Iterator[models.WorkflowsRunStreamResponse]: | |
| event_source = self.request_stream( | |
| self._prepare_url(ENDPOINT_RUN_WORKFLOWS), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| for sse in event_source: | |
| yield models.build_workflows_stream_response(sse.json()) | |
| def stop_workflows(self, task_id: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| """ | |
| Sends a request to stop a streaming workflow task. | |
| Returns: | |
| A `StopResponse` object indicating the success of the operation. | |
| """ | |
| return self._stop_stream(self._prepare_url(ENDPOINT_STOP_WORKFLOWS, task_id=task_id), req, **kwargs) | |
| def _stop_stream(self, endpoint: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| response = self.request( | |
| endpoint, | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.StopResponse(**response.json()) | |
| def _prepare_url(self, endpoint: str, **kwargs) -> str: | |
| return self.api_base + endpoint.format(**kwargs) | |
| def _prepare_auth_headers(self, headers: Dict[str, str]): | |
| if "authorization" not in (key.lower() for key in headers.keys()): | |
| headers["Authorization"] = f"Bearer {self.api_key}" | |
| class AsyncClient(BaseModel): | |
| api_key: str | |
| api_base: Optional[str] = "https://api.dify.ai/v1" | |
| async def arequest(self, endpoint: str, method: str, | |
| content: Optional[types.RequestContent] = None, | |
| data: Optional[types.RequestData] = None, | |
| files: Optional[types.RequestFiles] = None, | |
| json: Optional[Any] = None, | |
| params: Optional[types.QueryParamTypes] = None, | |
| headers: Optional[Mapping[str, str]] = None, | |
| **kwargs, | |
| ) -> httpx.Response: | |
| """ | |
| Asynchronously sends a request to the specified Dify API endpoint. | |
| Args: | |
| endpoint: The endpoint URL to which the request is sent. | |
| method: The HTTP method to be used for the request (e.g., 'GET', 'POST'). | |
| content: Raw content to include in the request body, if any. | |
| data: Form data to be sent in the request body. | |
| files: Files to be uploaded with the request. | |
| json: JSON data to be sent in the request body. | |
| params: Query parameters to be included in the request URL. | |
| headers: Additional headers to be sent with the request. | |
| **kwargs: Extra keyword arguments to be passed to the underlying HTTPX request function. | |
| Returns: | |
| A httpx.Response object containing the server's response to the HTTP request. | |
| Raises: | |
| Various DifyAPIError exceptions if the response contains an error. | |
| """ | |
| merged_headers = {} | |
| if headers: | |
| merged_headers.update(headers) | |
| self._prepare_auth_headers(merged_headers) | |
| response = await _async_httpx_client.request(method, endpoint, content=content, data=data, files=files, | |
| json=json, params=params, headers=merged_headers, **kwargs) | |
| errors.raise_for_status(response) | |
| return response | |
| async def arequest_stream(self, endpoint: str, method: str, | |
| content: Optional[types.RequestContent] = None, | |
| data: Optional[types.RequestData] = None, | |
| files: Optional[types.RequestFiles] = None, | |
| json: Optional[Any] = None, | |
| params: Optional[types.QueryParamTypes] = None, | |
| headers: Optional[Mapping[str, str]] = None, | |
| **kwargs, | |
| ) -> AsyncIterator[ServerSentEvent]: | |
| """ | |
| Asynchronously establishes a streaming connection to the specified Dify API endpoint. | |
| Args: | |
| endpoint: The endpoint URL to which the request is sent. | |
| method: The HTTP method to be used for the request (e.g., 'GET', 'POST'). | |
| content: Raw content to include in the request body, if any. | |
| data: Form data to be sent in the request body. | |
| files: Files to be uploaded with the request. | |
| json: JSON data to be sent in the request body. | |
| params: Query parameters to be included in the request URL. | |
| headers: Additional headers to be sent with the request. | |
| **kwargs: Extra keyword arguments to be passed to the underlying HTTPX request function. | |
| Yields: | |
| ServerSentEvent objects representing the events received from the server. | |
| Raises: | |
| Various DifyAPIError exceptions if an error event is received in the stream. | |
| """ | |
| merged_headers = {} | |
| if headers: | |
| merged_headers.update(headers) | |
| self._prepare_auth_headers(merged_headers) | |
| async with aconnect_sse(_async_httpx_client, method, endpoint, headers=merged_headers, | |
| content=content, data=data, files=files, json=json, params=params, | |
| **kwargs) as event_source: | |
| if not _check_stream_content_type(event_source.response): | |
| await event_source.response.aread() | |
| errors.raise_for_status(event_source.response) | |
| async for sse in event_source.aiter_sse(): | |
| errors.raise_for_status(sse) | |
| if sse.event in IGNORED_STREAM_EVENTS or sse.data in IGNORED_STREAM_EVENTS: | |
| continue | |
| yield sse | |
| async def afeedback_messages(self, message_id: str, req: models.FeedbackRequest, **kwargs) \ | |
| -> models.FeedbackResponse: | |
| """ | |
| Submits feedback for a specific message. | |
| Args: | |
| message_id: The identifier of the message to submit feedback for. | |
| req: A `FeedbackRequest` object containing the feedback details, such as the rating. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| A `FeedbackResponse` object containing the result of the feedback submission. | |
| """ | |
| response = await self.arequest( | |
| self._prepare_url(ENDPOINT_FEEDBACKS, message_id=message_id), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.FeedbackResponse(**response.json()) | |
| async def asuggest_messages(self, message_id: str, req: models.ChatSuggestRequest, **kwargs) \ | |
| -> models.ChatSuggestResponse: | |
| """ | |
| Retrieves suggested messages based on a specific message. | |
| Args: | |
| message_id: The identifier of the message to get suggestions for. | |
| req: A `ChatSuggestRequest` object containing the request details. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| A `ChatSuggestResponse` object containing suggested messages. | |
| """ | |
| response = await self.arequest( | |
| self._prepare_url(ENDPOINT_SUGGESTED, message_id=message_id), | |
| HTTPMethod.GET, | |
| params=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.ChatSuggestResponse(**response.json()) | |
| async def aupload_files(self, file: types.FileTypes, req: models.UploadFileRequest, **kwargs) \ | |
| -> models.UploadFileResponse: | |
| """ | |
| Uploads a file to be used in subsequent requests. | |
| Args: | |
| file: The file to upload. This can be a file-like object, or a tuple of | |
| (`filename`, file-like object, mime_type). | |
| req: An `UploadFileRequest` object containing the upload details, such as the user who is uploading. | |
| **kwargs: Extra keyword arguments to pass to the request function. | |
| Returns: | |
| An `UploadFileResponse` object containing details about the uploaded file, such as its identifier and URL. | |
| """ | |
| response = await self.arequest( | |
| self._prepare_url(ENDPOINT_FILES_UPLOAD), | |
| HTTPMethod.POST, | |
| data=req.model_dump(), | |
| files=[("file", file)], | |
| **kwargs, | |
| ) | |
| return models.UploadFileResponse(**response.json()) | |
| async def acompletion_messages(self, req: models.CompletionRequest, **kwargs) \ | |
| -> Union[models.CompletionResponse, AsyncIterator[models.CompletionStreamResponse]]: | |
| """ | |
| Sends a request to generate a completion or a series of completions based on the provided input. | |
| Returns: | |
| If the response mode is blocking, it returns a `CompletionResponse` object containing the generated message. | |
| If the response mode is streaming, it returns an iterator of `CompletionStreamResponse` objects containing | |
| the stream of generated events. | |
| """ | |
| if req.response_mode == models.ResponseMode.BLOCKING: | |
| return await self._acompletion_messages(req, **kwargs) | |
| if req.response_mode == models.ResponseMode.STREAMING: | |
| return self._acompletion_messages_stream(req, **kwargs) | |
| raise ValueError(f"Invalid request_mode: {req.response_mode}") | |
| async def _acompletion_messages(self, req: models.CompletionRequest, **kwargs) -> models.CompletionResponse: | |
| response = await self.arequest( | |
| self._prepare_url(ENDPOINT_COMPLETION_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.CompletionResponse(**response.json()) | |
| async def _acompletion_messages_stream(self, req: models.CompletionRequest, **kwargs) \ | |
| -> AsyncIterator[models.CompletionStreamResponse]: | |
| async for sse in self.arequest_stream( | |
| self._prepare_url(ENDPOINT_COMPLETION_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs): | |
| yield models.build_completion_stream_response(sse.json()) | |
| async def astop_completion_messages(self, task_id: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| """ | |
| Sends a request to stop a streaming completion task. | |
| Returns: | |
| A `StopResponse` object indicating the success of the operation. | |
| """ | |
| return await self._astop_stream( | |
| self._prepare_url(ENDPOINT_STOP_COMPLETION_MESSAGES, task_id=task_id), req, **kwargs) | |
| async def achat_messages(self, req: models.ChatRequest, **kwargs) \ | |
| -> Union[models.ChatResponse, AsyncIterator[models.ChatStreamResponse]]: | |
| """ | |
| Sends a request to generate a chat message or a series of chat messages based on the provided input. | |
| Returns: | |
| If the response mode is blocking, it returns a `ChatResponse` object containing the generated chat message. | |
| If the response mode is streaming, it returns an iterator of `ChatStreamResponse` objects containing the | |
| stream of chat events. | |
| """ | |
| if req.response_mode == models.ResponseMode.BLOCKING: | |
| return await self._achat_messages(req, **kwargs) | |
| if req.response_mode == models.ResponseMode.STREAMING: | |
| return self._achat_messages_stream(req, **kwargs) | |
| raise ValueError(f"Invalid request_mode: {req.response_mode}") | |
| async def _achat_messages(self, req: models.ChatRequest, **kwargs) -> models.ChatResponse: | |
| response = await self.arequest( | |
| self._prepare_url(ENDPOINT_CHAT_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.ChatResponse(**response.json()) | |
| async def _achat_messages_stream(self, req: models.ChatRequest, **kwargs) \ | |
| -> AsyncIterator[models.ChatStreamResponse]: | |
| async for sse in self.arequest_stream( | |
| self._prepare_url(ENDPOINT_CHAT_MESSAGES), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs): | |
| yield models.build_chat_stream_response(sse.json()) | |
| async def astop_chat_messages(self, task_id: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| """ | |
| Sends a request to stop a streaming chat task. | |
| Returns: | |
| A `StopResponse` object indicating the success of the operation. | |
| """ | |
| return await self._astop_stream(self._prepare_url(ENDPOINT_STOP_CHAT_MESSAGES, task_id=task_id), req, **kwargs) | |
| async def arun_workflows(self, req: models.WorkflowsRunRequest, **kwargs) \ | |
| -> Union[models.WorkflowsRunResponse, AsyncIterator[models.WorkflowsStreamResponse]]: | |
| """ | |
| Initiates the execution of a workflow, which can consist of multiple steps and actions. | |
| Returns: | |
| If the response mode is blocking, it returns a `WorkflowsRunResponse` object containing the results of the | |
| completed workflow. | |
| If the response mode is streaming, it returns an iterator of `WorkflowsRunStreamResponse` objects | |
| containing the stream of workflow events. | |
| """ | |
| if req.response_mode == models.ResponseMode.BLOCKING: | |
| return await self._arun_workflows(req, **kwargs) | |
| if req.response_mode == models.ResponseMode.STREAMING: | |
| return self._arun_workflows_stream(req, **kwargs) | |
| raise ValueError(f"Invalid request_mode: {req.response_mode}") | |
| async def _arun_workflows(self, req: models.WorkflowsRunRequest, **kwargs) -> models.WorkflowsRunResponse: | |
| response = await self.arequest( | |
| self._prepare_url(ENDPOINT_RUN_WORKFLOWS), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.WorkflowsRunResponse(**response.json()) | |
| async def _arun_workflows_stream(self, req: models.WorkflowsRunRequest, **kwargs) \ | |
| -> AsyncIterator[models.WorkflowsRunStreamResponse]: | |
| async for sse in self.arequest_stream( | |
| self._prepare_url(ENDPOINT_RUN_WORKFLOWS), | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs): | |
| yield models.build_workflows_stream_response(sse.json()) | |
| async def astop_workflows(self, task_id: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| """ | |
| Sends a request to stop a streaming workflow task. | |
| Returns: | |
| A `StopResponse` object indicating the success of the operation. | |
| """ | |
| return await self._astop_stream(self._prepare_url(ENDPOINT_STOP_WORKFLOWS, task_id=task_id), req, **kwargs) | |
| async def _astop_stream(self, endpoint: str, req: models.StopRequest, **kwargs) -> models.StopResponse: | |
| response = await self.arequest( | |
| endpoint, | |
| HTTPMethod.POST, | |
| json=req.model_dump(), | |
| **kwargs, | |
| ) | |
| return models.StopResponse(**response.json()) | |
| def _prepare_url(self, endpoint: str, **kwargs) -> str: | |
| return self.api_base + endpoint.format(**kwargs) | |
| def _prepare_auth_headers(self, headers: Dict[str, str]): | |
| if "authorization" not in (key.lower() for key in headers.keys()): | |
| headers["Authorization"] = f"Bearer {self.api_key}" | |
| def _get_content_type(headers: httpx.Headers) -> str: | |
| return headers.get("content-type", "").partition(";")[0] | |
| def _check_stream_content_type(response: httpx.Response) -> bool: | |
| content_type = _get_content_type(response.headers) | |
| return response.is_success and "text/event-stream" in content_type | |