Spaces:
Runtime error
Runtime error
# Character_Chat_Lib.py
# Description: Functions for character chat cards.
#
# Imports
| import json | |
| import logging | |
| import io | |
| import base64 | |
| import time | |
| from typing import Dict, Any, Optional, List, Tuple | |
| # | |
| # External Imports | |
| from PIL import Image | |
| # | |
| # Local imports | |
| from App_Function_Libraries.DB.DB_Manager import get_character_card_by_id, get_character_chat_by_id | |
| from App_Function_Libraries.Metrics.metrics_logger import log_counter, log_histogram | |
#
# Constants
####################################################################################################
#
# Functions
# Using https://github.com/malfoyslastname/character-card-spec-v2 as the standard for v2 character cards
#################################################################################
#
# Placeholder functions:
def replace_placeholders(text: str, char_name: str, user_name: str) -> str:
    """
    Replace placeholders in the given text with appropriate values.

    Supported placeholders are ``{{char}}``, ``{{user}}`` and ``{{random_user}}``
    (the latter is treated as an alias for the user name).

    Args:
        text (str): The text containing placeholders.
        char_name (str): The name of the character.
        user_name (str): The name of the user.

    Returns:
        str: The text with placeholders replaced. Empty or non-string ``text``
        (e.g. ``None``) is returned unchanged instead of raising.
    """
    # Nothing to substitute in an empty or non-string value; returning it
    # unchanged avoids an AttributeError on `.replace`.
    if not isinstance(text, str) or not text:
        return text
    replacements = {
        '{{char}}': char_name,
        '{{user}}': user_name,
        '{{random_user}}': user_name  # Assuming random_user is the same as user for simplicity
    }
    for placeholder, value in replacements.items():
        # A missing name leaves the placeholder visible rather than raising
        # TypeError inside str.replace.
        if value is None:
            continue
        text = text.replace(placeholder, str(value))
    return text
def replace_user_placeholder(history, user_name):
    """
    Replaces all instances of '{{user}}' in the chat history with the actual user name.

    Args:
        history (list): The current chat history as a list of
            (user_message, bot_message) pairs. ``None`` is treated as empty.
        user_name (str): The name entered by the user. Falls back to "User"
            when empty or ``None``.

    Returns:
        list: Updated chat history (list of tuples) with placeholders replaced.
        Messages that are not strings are passed through untouched.
    """
    if not user_name:
        user_name = "User"  # Default name if none provided
    # A chat that has not started yet may have no history at all.
    if not history:
        return []
    updated_history = []
    for user_msg, bot_msg in history:
        # Only strings can carry the placeholder; anything else (None or a
        # non-text payload) is kept as-is instead of raising on `.replace`.
        if isinstance(user_msg, str):
            user_msg = user_msg.replace("{{user}}", user_name)
        if isinstance(bot_msg, str):
            bot_msg = bot_msg.replace("{{user}}", user_name)
        updated_history.append((user_msg, bot_msg))
    return updated_history
#
# End of Placeholder functions
#################################################################################
#################################################################################
#
# Functions for character card processing:
def extract_character_id(choice: str) -> int:
    """
    Extract the character ID from the dropdown selection string.

    The selection is expected to end with ``(ID: <number>)``.

    Args:
        choice (str): The dropdown selection string.

    Returns:
        int: The parsed character ID.

    Raises:
        Exception: Whatever parsing error occurred (e.g. IndexError when the
            ``(ID: `` marker is missing, ValueError when the ID is not an
            integer). The error is counted and then re-raised unchanged.
    """
    log_counter("extract_character_id_attempt")
    try:
        # Split from the right: the ID suffix is the last "(ID: " in the
        # string, so a character name that itself contains "(ID: " cannot
        # shadow the real ID.
        id_part = choice.rsplit('(ID: ', 1)[1]
        character_id = int(id_part.strip().rstrip(')'))
        log_counter("extract_character_id_success")
        return character_id
    except Exception as e:
        log_counter("extract_character_id_error", labels={"error": str(e)})
        raise
def load_character_wrapper(character_id: int, user_name: str) -> Tuple[Dict[str, Any], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
    """
    Load a character and its image for an already-extracted character ID.

    Delegates to ``load_character_and_image`` and records attempt, success,
    error and duration metrics around the call.

    Args:
        character_id (int): The ID of the character to load.
        user_name (str): The name of the user, forwarded to the loader.

    Returns:
        The (character data, chat history, image) triple produced by
        ``load_character_and_image``.

    Raises:
        Exception: Any error raised during loading is counted and re-raised.
    """
    log_counter("load_character_wrapper_attempt")
    started_at = time.time()
    try:
        char_data, chat_history, img = load_character_and_image(character_id, user_name)
        log_histogram("load_character_wrapper_duration", time.time() - started_at)
        log_counter("load_character_wrapper_success")
    except Exception as exc:
        log_counter("load_character_wrapper_error", labels={"error": str(exc)})
        raise
    else:
        return char_data, chat_history, img
def parse_character_book(book_data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Parse the character book data from a V2 character card.

    Book-level fields are all optional and fall back to defaults. For each
    entry, 'keys', 'content', 'enabled' and 'insertion_order' are read with
    direct indexing, so a missing one raises KeyError; the remaining entry
    fields are optional.

    Args:
        book_data (Dict[str, Any]): The raw character book data from the character card.

    Returns:
        Dict[str, Any]: The parsed and structured character book data.

    Raises:
        KeyError: If an entry lacks 'keys', 'content', 'enabled' or 'insertion_order'.
    """
    parsed_book = {
        'name': book_data.get('name', ''),
        'description': book_data.get('description', ''),
        'scan_depth': book_data.get('scan_depth'),
        'token_budget': book_data.get('token_budget'),
        'recursive_scanning': book_data.get('recursive_scanning', False),
        'extensions': book_data.get('extensions', {}),
        'entries': []
    }
    # `entries` can be present but null in the JSON; `.get('entries', [])`
    # would then return None and fail on iteration, so fall back explicitly.
    for entry in book_data.get('entries') or []:
        parsed_entry = {
            'keys': entry['keys'],
            'content': entry['content'],
            'extensions': entry.get('extensions', {}),
            'enabled': entry['enabled'],
            'insertion_order': entry['insertion_order'],
            'case_sensitive': entry.get('case_sensitive', False),
            'name': entry.get('name', ''),
            'priority': entry.get('priority'),
            'id': entry.get('id'),
            'comment': entry.get('comment', ''),
            'selective': entry.get('selective', False),
            'secondary_keys': entry.get('secondary_keys', []),
            'constant': entry.get('constant', False),
            'position': entry.get('position')
        }
        parsed_book['entries'].append(parsed_entry)
    return parsed_book
def load_character_and_image(character_id: int, user_name: str) -> Tuple[Optional[Dict[str, Any]], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
    """
    Load a character and its associated image based on the character ID.

    Placeholders in the character's template fields are substituted in place
    on the loaded character dict. The chat history is seeded with the
    character's first message when there is one.

    Args:
        character_id (int): The ID of the character to load.
        user_name (str): The name of the user, used for placeholder replacement.

    Returns:
        Tuple[Optional[Dict[str, Any]], List[Tuple[Optional[str], str]], Optional[Image.Image]]:
        A tuple containing the character data, chat history, and character image (if available).
        ``(None, [], None)`` is returned when the character is not found or any
        error occurs; errors are logged, not raised.
    """
    log_counter("load_character_and_image_attempt")
    start_time = time.time()
    try:
        char_data = get_character_card_by_id(character_id)
        if not char_data:
            log_counter("load_character_and_image_no_data")
            logging.warning(f"No character data found for ID: {character_id}")
            return None, [], None

        char_name = char_data['name']

        # Replace placeholders in character data. Only string values are
        # processed: a field stored as None would otherwise raise here and
        # make the whole load fail.
        for field in ['first_mes', 'mes_example', 'scenario', 'description', 'personality']:
            value = char_data.get(field)
            if isinstance(value, str):
                char_data[field] = replace_placeholders(value, char_name, user_name)

        # 'first_mes' was already substituted by the loop above; the default
        # greeting contains no placeholders, so no second pass is needed.
        first_mes = char_data.get('first_mes', "Hello! I'm ready to chat.")
        chat_history = [(None, first_mes)] if first_mes else []

        img = None
        if char_data.get('image'):
            # Image problems are non-fatal: log them and continue without an image.
            try:
                image_data = base64.b64decode(char_data['image'])
                img = Image.open(io.BytesIO(image_data)).convert("RGBA")
                log_counter("load_character_image_success")
            except Exception as e:
                log_counter("load_character_image_error", labels={"error": str(e)})
                logging.error(f"Error processing image for character '{char_data['name']}': {e}")

        load_duration = time.time() - start_time
        log_histogram("load_character_and_image_duration", load_duration)
        log_counter("load_character_and_image_success")
        return char_data, chat_history, img
    except Exception as e:
        log_counter("load_character_and_image_error", labels={"error": str(e)})
        logging.error(f"Error in load_character_and_image: {e}")
        return None, [], None
def load_chat_and_character(chat_id: int, user_name: str) -> Tuple[Optional[Dict[str, Any]], List[Tuple[str, str]], Optional[Image.Image]]:
    """
    Load a chat and its associated character, including the character image and process templates.

    Args:
        chat_id (int): The ID of the chat to load.
        user_name (str): The name of the user.

    Returns:
        Tuple[Optional[Dict[str, Any]], List[Tuple[str, str]], Optional[Image.Image]]:
        A tuple containing the character data, processed chat history, and character image (if available).
        When the chat is missing or an error occurs, ``(None, [], None)`` is returned.
        When the chat exists but its character does not, the unprocessed chat
        history is returned with ``None`` for character and image.
    """
    log_counter("load_chat_and_character_attempt")
    start_time = time.time()
    try:
        # Load the chat
        chat = get_character_chat_by_id(chat_id)
        if not chat:
            log_counter("load_chat_and_character_no_chat")
            logging.warning(f"No chat found with ID: {chat_id}")
            return None, [], None

        # Load the associated character
        character_id = chat['character_id']
        char_data = get_character_card_by_id(character_id)
        if not char_data:
            log_counter("load_chat_and_character_no_character")
            logging.warning(f"No character found for chat ID: {chat_id}")
            return None, chat['chat_history'], None

        # Process the chat history
        processed_history = process_chat_history(chat['chat_history'], char_data['name'], user_name)

        # Load the character image; failures are logged and leave img as None.
        img = None
        if char_data.get('image'):
            try:
                image_data = base64.b64decode(char_data['image'])
                img = Image.open(io.BytesIO(image_data)).convert("RGBA")
                log_counter("load_chat_character_image_success")
            except Exception as e:
                log_counter("load_chat_character_image_error", labels={"error": str(e)})
                logging.error(f"Error processing image for character '{char_data['name']}': {e}")

        # Process character data templates. Only string values are processed:
        # a field stored as None would otherwise raise here and discard the
        # chat that was already loaded successfully.
        for field in ['first_mes', 'mes_example', 'scenario', 'description', 'personality']:
            value = char_data.get(field)
            if isinstance(value, str):
                char_data[field] = replace_placeholders(value, char_data['name'], user_name)

        load_duration = time.time() - start_time
        log_histogram("load_chat_and_character_duration", load_duration)
        log_counter("load_chat_and_character_success")
        return char_data, processed_history, img
    except Exception as e:
        log_counter("load_chat_and_character_error", labels={"error": str(e)})
        logging.error(f"Error in load_chat_and_character: {e}")
        return None, [], None
def extract_json_from_image(image_file):
    """
    Extract embedded character-card JSON text from an image.

    Two strategies are tried in order:
      1. Base64-decode the 'chara' entry of the image metadata.
      2. Re-encode the image as PNG and look for the outermost ``{ ... }``
         span in the raw bytes, accepting it only if it parses as JSON.

    Args:
        image_file: Anything ``PIL.Image.open`` accepts. A ``name`` attribute
            is used for logging when present but is not required.

    Returns:
        Optional[str]: The JSON text, or ``None`` if nothing usable was found
        or an error occurred (errors are logged, not raised).
    """
    # Not every accepted input (e.g. an in-memory stream) has a `.name`;
    # reading it unguarded, outside the try block, would crash the call.
    image_name = getattr(image_file, 'name', image_file)
    logging.debug(f"Attempting to extract JSON from image: {image_name}")
    log_counter("extract_json_from_image_attempt")
    start_time = time.time()
    try:
        with Image.open(image_file) as img:
            logging.debug("Image opened successfully")
            metadata = img.info
            if 'chara' in metadata:
                logging.debug("Found 'chara' in image metadata")
                chara_content = metadata['chara']
                logging.debug(f"Content of 'chara' metadata (first 100 chars): {chara_content[:100]}...")
                try:
                    decoded_content = base64.b64decode(chara_content).decode('utf-8')
                    logging.debug(f"Decoded content (first 100 chars): {decoded_content[:100]}...")
                    log_counter("extract_json_from_image_metadata_success")
                    return decoded_content
                except Exception as e:
                    # Decoding failed; fall through to the byte-scan fallback.
                    logging.error(f"Error decoding base64 content: {e}")
                    log_counter("extract_json_from_image_decode_error", labels={"error": str(e)})
            else:
                # Only warn about a missing key when it is actually missing,
                # not when it was present but undecodable.
                logging.warning("'chara' not found in metadata, attempting to find JSON data in image bytes")

            # Alternative method to extract embedded JSON from image bytes if metadata is not available
            img_byte_arr = io.BytesIO()
            img.save(img_byte_arr, format='PNG')
            img_bytes = img_byte_arr.getvalue()
            # latin1 maps every byte to one character, so decoding cannot fail.
            img_str = img_bytes.decode('latin1')

            # Search for JSON-like structures in the image bytes
            json_start = img_str.find('{')
            json_end = img_str.rfind('}')
            if json_start != -1 and json_end != -1 and json_end > json_start:
                possible_json = img_str[json_start:json_end+1]
                try:
                    json.loads(possible_json)
                    logging.debug("Found JSON data in image bytes")
                    log_counter("extract_json_from_image_bytes_success")
                    return possible_json
                except json.JSONDecodeError:
                    logging.debug("No valid JSON found in image bytes")
                    log_counter("extract_json_from_image_invalid_json")

        logging.warning("No JSON data found in the image")
        log_counter("extract_json_from_image_no_json_found")
    except Exception as e:
        log_counter("extract_json_from_image_error", labels={"error": str(e)})
        logging.error(f"Error extracting JSON from image: {e}")
    finally:
        # Record the duration on every exit path, including the successful
        # early returns above.
        extract_duration = time.time() - start_time
        log_histogram("extract_json_from_image_duration", extract_duration)
    return None
def load_chat_history(file):
    """
    Load a chat history and character name from an uploaded JSON file.

    The history is read from the 'history' key (falling back to 'messages')
    and the character name from 'character' (falling back to 'character_name').

    Args:
        file: A file-like object whose ``read()`` returns the JSON document as
            bytes (decoded as UTF-8) or as text.

    Returns:
        Tuple: ``(history, character_name)`` on success, or ``(None, None)``
        when either value is missing/empty or the file cannot be read or
        parsed (errors are logged, not raised).
    """
    log_counter("load_chat_history_attempt")
    start_time = time.time()
    try:
        raw = file.read()
        if isinstance(raw, (bytes, bytearray)):
            # 'utf-8-sig' also strips a leading BOM, which json.loads rejects.
            content = raw.decode('utf-8-sig')
        else:
            # Text-mode handles already yield str; drop a BOM if one survived.
            content = raw.lstrip('\ufeff')
        chat_data = json.loads(content)

        # Extract history and character name from the loaded data
        history = chat_data.get('history') or chat_data.get('messages')
        character_name = chat_data.get('character') or chat_data.get('character_name')
        if not history or not character_name:
            log_counter("load_chat_history_incomplete_data")
            logging.error("Chat history or character name missing in the imported file.")
            return None, None

        load_duration = time.time() - start_time
        log_histogram("load_chat_history_duration", load_duration)
        log_counter("load_chat_history_success")
        return history, character_name
    except Exception as e:
        log_counter("load_chat_history_error", labels={"error": str(e)})
        logging.error(f"Error loading chat history: {e}")
        return None, None
def process_chat_history(chat_history: List[Tuple[str, str]], char_name: str, user_name: str) -> List[Tuple[str, str]]:
    """
    Substitute placeholders in every message of a chat history.

    Each (user message, character message) pair is rebuilt with placeholders
    replaced; falsy messages (``None`` or empty) are carried over unchanged.

    Args:
        chat_history (List[Tuple[str, str]]): The chat history.
        char_name (str): The name of the character.
        user_name (str): The name of the user.

    Returns:
        List[Tuple[str, str]]: The processed chat history.

    Raises:
        Exception: Any error during processing is counted, logged and re-raised.
    """
    log_counter("process_chat_history_attempt")
    began = time.time()

    def _fill(message):
        # Falsy messages pass through untouched.
        if not message:
            return message
        return replace_placeholders(message, char_name, user_name)

    try:
        result = [(_fill(from_user), _fill(from_char)) for from_user, from_char in chat_history]
        elapsed = time.time() - began
        log_histogram("process_chat_history_duration", elapsed)
        log_counter("process_chat_history_success", labels={"message_count": len(chat_history)})
        return result
    except Exception as exc:
        log_counter("process_chat_history_error", labels={"error": str(exc)})
        logging.error(f"Error processing chat history: {exc}")
        raise
def validate_character_book(book_data):
    """
    Validate the 'character_book' field in the character card.

    Checks the types of the optional book-level fields, requires 'entries' to
    be a list, and validates each entry via ``validate_character_book_entry``
    (which also enforces unique entry ids across the book).

    Args:
        book_data (dict): The character book data.

    Returns:
        Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
    """
    validation_messages = []

    # Anything other than a dict cannot be inspected field-by-field; report it
    # instead of crashing on `in` / indexing (e.g. None or a string).
    if not isinstance(book_data, dict):
        validation_messages.append("Field 'character_book' must be a dictionary.")
        return False, validation_messages

    # Optional fields with expected types
    optional_fields = {
        'name': str,
        'description': str,
        'scan_depth': (int, float),
        'token_budget': (int, float),
        'recursive_scanning': bool,
        'extensions': dict,
        'entries': list
    }
    for field, expected_type in optional_fields.items():
        if field in book_data:
            value = book_data[field]
            # bool is a subclass of int, so True/False would otherwise be
            # accepted for the numeric fields.
            bool_misuse = isinstance(value, bool) and expected_type is not bool
            if bool_misuse or not isinstance(value, expected_type):
                validation_messages.append(f"Field 'character_book.{field}' must be of type '{expected_type}'.")

    # 'entries' is required
    if 'entries' not in book_data or not isinstance(book_data['entries'], list):
        validation_messages.append("Field 'character_book.entries' is required and must be a list.")
        return False, validation_messages

    # Validate each entry in 'entries'
    entries = book_data.get('entries', [])
    entry_ids = set()  # shared across entries so duplicate ids are detected
    for idx, entry in enumerate(entries):
        is_valid_entry, entry_messages = validate_character_book_entry(entry, idx, entry_ids)
        if not is_valid_entry:
            validation_messages.extend(entry_messages)

    is_valid = len(validation_messages) == 0
    return is_valid, validation_messages
def validate_character_book_entry(entry, idx, entry_ids):
    """
    Validate an entry in the 'character_book.entries' list.

    Checks required and optional field types, non-empty 'content' and 'keys',
    allowed 'position' values, 'secondary_keys' when 'selective' is set,
    string elements in key lists, 'id' uniqueness and extension key namespacing.

    Args:
        entry (dict): The entry data.
        idx (int): The index of the entry in the list.
        entry_ids (set): A set of existing entry IDs for uniqueness checking.
            A previously unseen id is added to this set.

    Returns:
        Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
    """
    validation_messages = []

    # A non-dict entry cannot be inspected field-by-field; report it instead
    # of crashing on `.get` / indexing.
    if not isinstance(entry, dict):
        validation_messages.append(f"Entry {idx}: Entry must be a dictionary.")
        return False, validation_messages

    def _wrong_type(value, expected_type):
        # bool is a subclass of int, so True/False would otherwise be accepted
        # for the numeric fields.
        if isinstance(value, bool) and expected_type is not bool:
            return True
        return not isinstance(value, expected_type)

    required_fields = {
        'keys': list,
        'content': str,
        'extensions': dict,
        'enabled': bool,
        'insertion_order': (int, float)
    }
    for field, expected_type in required_fields.items():
        if field not in entry:
            validation_messages.append(f"Entry {idx}: Missing required field '{field}'.")
        elif _wrong_type(entry[field], expected_type):
            validation_messages.append(f"Entry {idx}: Field '{field}' must be of type '{expected_type}'.")
        elif field == 'content' and not entry[field].strip():
            validation_messages.append(f"Entry {idx}: Field 'content' cannot be empty.")
        elif field == 'keys' and not entry[field]:
            validation_messages.append(f"Entry {idx}: Field 'keys' cannot be empty.")

    # Optional fields
    optional_fields = {
        'case_sensitive': bool,
        'name': str,
        'priority': (int, float),
        'id': (int, float),
        'comment': str,
        'selective': bool,
        'secondary_keys': list,
        'constant': bool,
        'position': str  # Should be 'before_char' or 'after_char'
    }
    for field, expected_type in optional_fields.items():
        if field in entry and _wrong_type(entry[field], expected_type):
            validation_messages.append(f"Entry {idx}: Field '{field}' must be of type '{expected_type}'.")

    # Validate 'position' value if present
    if 'position' in entry:
        if entry['position'] not in ['before_char', 'after_char']:
            validation_messages.append(f"Entry {idx}: Field 'position' must be 'before_char' or 'after_char'.")

    # Validate 'secondary_keys' if 'selective' is True
    if entry.get('selective', False):
        if 'secondary_keys' not in entry or not isinstance(entry['secondary_keys'], list):
            validation_messages.append(f"Entry {idx}: 'secondary_keys' must be a list when 'selective' is True.")
        elif not entry['secondary_keys']:
            validation_messages.append(f"Entry {idx}: 'secondary_keys' cannot be empty when 'selective' is True.")

    # Validate 'keys' list elements
    if 'keys' in entry and isinstance(entry['keys'], list):
        for i, key in enumerate(entry['keys']):
            if not isinstance(key, str) or not key.strip():
                validation_messages.append(f"Entry {idx}: Element {i} in 'keys' must be a non-empty string.")

    # Validate 'secondary_keys' list elements
    if 'secondary_keys' in entry and isinstance(entry['secondary_keys'], list):
        for i, key in enumerate(entry['secondary_keys']):
            if not isinstance(key, str) or not key.strip():
                validation_messages.append(f"Entry {idx}: Element {i} in 'secondary_keys' must be a non-empty string.")

    # Validate 'id' uniqueness
    if 'id' in entry:
        entry_id = entry['id']
        try:
            is_duplicate = entry_id in entry_ids
            if not is_duplicate:
                entry_ids.add(entry_id)
        except TypeError:
            # Unhashable id (list/dict): the type check above already flagged
            # it, and it cannot take part in set-based uniqueness tracking.
            is_duplicate = False
        if is_duplicate:
            validation_messages.append(
                f"Entry {idx}: Duplicate 'id' value '{entry_id}'. Each entry 'id' must be unique.")

    # Validate 'extensions' keys are namespaced
    if 'extensions' in entry and isinstance(entry['extensions'], dict):
        for key in entry['extensions'].keys():
            # A key counts as namespaced if it contains '/' or '_'.
            if '/' not in key and '_' not in key:
                validation_messages.append(
                    f"Entry {idx}: Extension key '{key}' in 'extensions' should be namespaced to prevent conflicts.")

    is_valid = len(validation_messages) == 0
    return is_valid, validation_messages
def validate_v2_card(card_data):
    """
    Validate a character card according to the V2 specification.

    Checks 'spec', 'spec_version' and 'data' at the top level, then the
    required string fields, optional field types, list element types and
    extension key namespacing inside 'data'. A dict-valued 'character_book'
    is validated via ``validate_character_book``.

    Args:
        card_data (dict): The parsed character card data.

    Returns:
        Tuple[bool, List[str]]: A tuple containing a boolean indicating validity and a list of validation messages.
    """
    validation_messages = []

    # A non-dict card cannot be inspected at all; report rather than crash.
    if not isinstance(card_data, dict):
        validation_messages.append("Character card must be a dictionary.")
        return False, validation_messages

    # Check top-level fields
    if 'spec' not in card_data:
        validation_messages.append("Missing 'spec' field.")
    elif card_data['spec'] != 'chara_card_v2':
        validation_messages.append(f"Invalid 'spec' value: {card_data['spec']}. Expected 'chara_card_v2'.")

    if 'spec_version' not in card_data:
        validation_messages.append("Missing 'spec_version' field.")
    else:
        # Ensure 'spec_version' is '2.0' or higher
        try:
            spec_version = float(card_data['spec_version'])
            if spec_version < 2.0:
                validation_messages.append(
                    f"'spec_version' must be '2.0' or higher. Found '{card_data['spec_version']}'.")
        except (TypeError, ValueError):
            # float() raises TypeError (not ValueError) for None, lists, dicts.
            validation_messages.append(
                f"Invalid 'spec_version' format: {card_data['spec_version']}. Must be a number as a string.")

    if 'data' not in card_data:
        validation_messages.append("Missing 'data' field.")
        return False, validation_messages  # Cannot proceed without 'data' field

    data = card_data['data']
    if not isinstance(data, dict):
        validation_messages.append("Field 'data' must be a dictionary.")
        return False, validation_messages  # Field checks below need a dict

    # Required fields in 'data'
    required_fields = ['name', 'description', 'personality', 'scenario', 'first_mes', 'mes_example']
    for field in required_fields:
        if field not in data:
            validation_messages.append(f"Missing required field in 'data': '{field}'.")
        elif not isinstance(data[field], str):
            validation_messages.append(f"Field '{field}' must be a string.")
        elif not data[field].strip():
            validation_messages.append(f"Field '{field}' cannot be empty.")

    # Optional fields with expected types
    optional_fields = {
        'creator_notes': str,
        'system_prompt': str,
        'post_history_instructions': str,
        'alternate_greetings': list,
        'tags': list,
        'creator': str,
        'character_version': str,
        'extensions': dict,
        'character_book': dict  # If present, should be a dict
    }
    for field, expected_type in optional_fields.items():
        if field in data:
            if not isinstance(data[field], expected_type):
                # This also covers a non-dict 'extensions', so no separate
                # check (and no duplicate message) is needed for it.
                validation_messages.append(f"Field '{field}' must be of type '{expected_type.__name__}'.")
            elif field == 'extensions':
                # Validate that extensions keys are properly namespaced
                for key in data[field].keys():
                    if '/' not in key and '_' not in key:
                        validation_messages.append(
                            f"Extension key '{key}' in 'extensions' should be namespaced to prevent conflicts.")

    # If 'alternate_greetings' is present, check that it's a list of non-empty strings
    if 'alternate_greetings' in data and isinstance(data['alternate_greetings'], list):
        for idx, greeting in enumerate(data['alternate_greetings']):
            if not isinstance(greeting, str) or not greeting.strip():
                validation_messages.append(f"Element {idx} in 'alternate_greetings' must be a non-empty string.")

    # If 'tags' is present, check that it's a list of non-empty strings
    if 'tags' in data and isinstance(data['tags'], list):
        for idx, tag in enumerate(data['tags']):
            if not isinstance(tag, str) or not tag.strip():
                validation_messages.append(f"Element {idx} in 'tags' must be a non-empty string.")

    # Validate 'character_book' if present. A non-dict value was already
    # reported by the type check above and cannot be validated further.
    if isinstance(data.get('character_book'), dict):
        is_valid_book, book_messages = validate_character_book(data['character_book'])
        if not is_valid_book:
            validation_messages.extend(book_messages)

    is_valid = len(validation_messages) == 0
    return is_valid, validation_messages
#
# End of File
####################################################################################################