from typing import Any, Optional

from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
from core.memory.token_buffer_memory import TokenBufferMemory
from core.model_manager import ModelInstance
from core.model_runtime.entities.message_entities import PromptMessage
from core.model_runtime.entities.model_entities import ModelPropertyKey
from core.prompt.entities.advanced_prompt_entities import MemoryConfig


class PromptTransform:
    def _append_chat_histories(
        self,
        memory: TokenBufferMemory,
        memory_config: MemoryConfig,
        prompt_messages: list[PromptMessage],
        model_config: ModelConfigWithCredentialsEntity,
    ) -> list[PromptMessage]:
        """Append conversation history from memory to the given prompt messages.

        The history is truncated to the token budget that remains after the
        current prompt messages and the model's max_tokens are accounted for.
        Note that prompt_messages is extended in place and also returned.
        """
        rest_tokens = self._calculate_rest_token(prompt_messages, model_config)
        histories = self._get_history_messages_list_from_memory(memory, memory_config, rest_tokens)
        prompt_messages.extend(histories)

        return prompt_messages

    def _calculate_rest_token(
        self, prompt_messages: list[PromptMessage], model_config: ModelConfigWithCredentialsEntity
    ) -> int:
        """Calculate the token budget left for chat history.

        Falls back to 2000 tokens when the model's context size is unknown.
        """
        rest_tokens = 2000

        model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
        if model_context_tokens:
            model_instance = ModelInstance(
                provider_model_bundle=model_config.provider_model_bundle, model=model_config.model
            )

            curr_message_tokens = model_instance.get_llm_num_tokens(prompt_messages)

            # Find the configured completion budget, whether the parameter is
            # named "max_tokens" directly or mapped to it via a template.
            max_tokens = 0
            for parameter_rule in model_config.model_schema.parameter_rules:
                if parameter_rule.name == "max_tokens" or (
                    parameter_rule.use_template and parameter_rule.use_template == "max_tokens"
                ):
                    max_tokens = (
                        model_config.parameters.get(parameter_rule.name)
                        or model_config.parameters.get(parameter_rule.use_template)
                    ) or 0

            # Whatever remains after the current prompt and the completion
            # budget can be spent on history; never go below zero.
            rest_tokens = model_context_tokens - max_tokens - curr_message_tokens
            rest_tokens = max(rest_tokens, 0)

        return rest_tokens
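
    # Worked example with illustrative numbers (not taken from any real model
    # config): an 8192-token context window, max_tokens=1024, and a current
    # prompt of 500 tokens leave 8192 - 1024 - 500 = 6668 tokens for history.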

    def _get_history_messages_from_memory(
        self,
        memory: TokenBufferMemory,
        memory_config: MemoryConfig,
        max_token_limit: int,
        human_prefix: Optional[str] = None,
        ai_prefix: Optional[str] = None,
    ) -> str:
        """Get memory messages rendered as a single prompt text."""
        kwargs: dict[str, Any] = {"max_token_limit": max_token_limit}

        if human_prefix:
            kwargs["human_prefix"] = human_prefix

        if ai_prefix:
            kwargs["ai_prefix"] = ai_prefix

        if memory_config.window.enabled and memory_config.window.size is not None and memory_config.window.size > 0:
            kwargs["message_limit"] = memory_config.window.size

        return memory.get_history_prompt_text(**kwargs)
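
    # Illustrative shape of the returned text, assuming default-style prefixes
    # such as "Human"/"Assistant" (both are overridable via the optional
    # human_prefix/ai_prefix arguments above):
    #
    #   Human: What's the capital of France?
    #   Assistant: Paris.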

    def _get_history_messages_list_from_memory(
        self, memory: TokenBufferMemory, memory_config: MemoryConfig, max_token_limit: int
    ) -> list[PromptMessage]:
        """Get memory messages as a list of prompt messages."""
        return memory.get_history_prompt_messages(
            max_token_limit=max_token_limit,
            message_limit=memory_config.window.size
            if (
                memory_config.window.enabled
                and memory_config.window.size is not None
                and memory_config.window.size > 0
            )
            else None,
        )
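

# Minimal usage sketch (hypothetical subclass; the memory, memory_config and
# model_config entities must come from a real app invocation, and
# UserPromptMessage would be imported from core.model_runtime.entities.message_entities):
#
#   class MyPromptTransform(PromptTransform):
#       def get_prompt(self, memory, memory_config, model_config, query):
#           prompt_messages: list[PromptMessage] = []
#           prompt_messages = self._append_chat_histories(
#               memory, memory_config, prompt_messages, model_config
#           )
#           prompt_messages.append(UserPromptMessage(content=query))
#           return prompt_messages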