Spaces:
Sleeping
Sleeping
| # coding: utf-8 | |
| """ | |
| oss.py | |
| Utility class for OSS (Object Storage Service) operations. | |
| Provides simple methods for data operations: upload, read, delete, update. | |
| """ | |
| import os | |
| import json | |
| import tempfile | |
| from typing import Optional, Dict, List, Any, Tuple, Union, BinaryIO, TextIO, IO, AnyStr | |
| from aworld.utils import import_package | |
| from aworld.logs.util import logger | |
class OSSClient:
    """
    A utility class for OSS (Object Storage Service) operations.

    Provides methods for data operations (upload, read, delete, update),
    local-file transfer, listing, copying and signed-URL generation.

    Every public operation is best-effort: failures are logged through the
    module-level ``logger`` and reported via the return value (``None``,
    ``False`` or an empty list) instead of being raised to the caller.
    """

    def __init__(self,
                 access_key_id: Optional[str] = None,
                 access_key_secret: Optional[str] = None,
                 endpoint: Optional[str] = None,
                 bucket_name: Optional[str] = None,
                 enable_export: Optional[bool] = None):
        """
        Store credentials and settings; no connection is made here.

        Args:
            access_key_id: OSS access key ID. If falsy, read from the
                environment variable OSS_ACCESS_KEY_ID.
            access_key_secret: OSS access key secret. If falsy, read from the
                environment variable OSS_ACCESS_KEY_SECRET.
            endpoint: OSS endpoint. If falsy, read from the environment
                variable OSS_ENDPOINT.
            bucket_name: OSS bucket name. If falsy, read from the environment
                variable OSS_BUCKET_NAME.
            enable_export: Whether OSS access is enabled. If None, it is
                enabled only when the environment variable
                EXPORT_REPLAY_TRACE_TO_OSS equals "true" (case-insensitive).
        """
        # `or` means an empty string also falls back to the environment.
        self.access_key_id = access_key_id or os.getenv('OSS_ACCESS_KEY_ID')
        self.access_key_secret = access_key_secret or os.getenv('OSS_ACCESS_KEY_SECRET')
        self.endpoint = endpoint or os.getenv('OSS_ENDPOINT')
        self.bucket_name = bucket_name or os.getenv('OSS_BUCKET_NAME')
        if enable_export is None:
            enable_export = os.getenv("EXPORT_REPLAY_TRACE_TO_OSS", "false").lower() == "true"
        self.enable_export = enable_export
        # Created lazily by initialize().
        self.bucket = None
        self._initialized = False

    def initialize(self) -> bool:
        """
        Create the underlying oss2 bucket handle if it does not exist yet.

        Returns:
            bool: True if the client is ready (now or from an earlier call),
            False if export is disabled, a credential is missing, oss2 cannot
            be imported, or building the bucket handle raised.
        """
        if self._initialized:
            return True
        if not self.enable_export:
            logger.info("OSS export is disabled. Set EXPORT_REPLAY_TRACE_TO_OSS=true to enable.")
            return False
        if not all([self.access_key_id, self.access_key_secret, self.endpoint, self.bucket_name]):
            logger.warn(
                "Missing required OSS credentials. Please provide all required parameters or set environment variables.")
            return False
        try:
            import_package("oss2")
            import oss2
            auth = oss2.Auth(self.access_key_id, self.access_key_secret)
            self.bucket = oss2.Bucket(auth, self.endpoint, self.bucket_name)
            self._initialized = True
            return True
        except ImportError:
            logger.warn("Failed to import oss2 module. Please install it with 'pip install oss2'.")
            return False
        except Exception as e:
            logger.warn(f"Failed to initialize OSS client. Error: {str(e)}")
            return False

    # ---- Basic Data Operation Methods ----
    def upload_data(self, data: Union[IO[AnyStr], str, bytes, dict], oss_key: str) -> Optional[str]:
        """
        Upload data to OSS. Supported inputs, checked in this order:

        - Objects with a ``read`` method: the result of ``read()`` is
          uploaded (text is encoded as UTF-8)
        - Dictionaries: serialized to UTF-8 JSON
        - Strings: if the string is the path of an existing local file, that
          file is uploaded; otherwise the string itself is uploaded as UTF-8
        - Anything else (e.g. bytes) is handed to the bucket unchanged

        Args:
            data: Data to upload.
            oss_key: The key (path) in OSS where the data will be stored.

        Returns:
            str: The OSS key if successful, None otherwise.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None
        try:
            # Handle file objects
            if hasattr(data, 'read'):
                content = data.read()
                if isinstance(content, str):
                    content = content.encode('utf-8')
                self.bucket.put_object(oss_key, content)
                logger.info(f"Successfully uploaded memory file to OSS: {oss_key}")
                return oss_key
            # Handle dictionaries
            if isinstance(data, dict):
                content = json.dumps(data, ensure_ascii=False).encode('utf-8')
                self.bucket.put_object(oss_key, content)
                return oss_key
            # Handle strings
            if isinstance(data, str):
                # A string naming an existing file is treated as a path.
                if os.path.isfile(data):
                    self.bucket.put_object_from_file(oss_key, data)
                    logger.info(f"Successfully uploaded file {data} to OSS: {oss_key}")
                    return oss_key
                # Otherwise treat as string content
                content = data.encode('utf-8')
                self.bucket.put_object(oss_key, content)
                return oss_key
            # Handle bytes (and any other payload the bucket accepts)
            self.bucket.put_object(oss_key, data)
            logger.info(f"Successfully uploaded data to OSS: {oss_key}")
            return oss_key
        except Exception as e:
            logger.warn(f"Failed to upload data to OSS: {str(e)}")
            return None

    def read_data(self, oss_key: str, as_json: bool = False) -> Union[bytes, dict, str, None]:
        """
        Read data from OSS.

        Args:
            oss_key: The key (path) in OSS of the data to read.
            as_json: If True, parse the payload with ``json.loads`` and return
                the parsed value.

        Returns:
            The raw payload, the parsed JSON value (if as_json=True), or None
            if the client is unavailable, the read failed or parsing failed.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None
        try:
            result = self.bucket.get_object(oss_key)
            data = result.read()
            if as_json:
                return json.loads(data)
            return data
        except Exception as e:
            logger.warn(f"Failed to read data from OSS: {str(e)}")
            return None

    def read_text(self, oss_key: str) -> Optional[str]:
        """
        Read an object from OSS and decode it as UTF-8 text.

        Args:
            oss_key: The key (path) in OSS of the text to read.

        Returns:
            str: The decoded text, or None if reading or decoding failed.
        """
        data = self.read_data(oss_key)
        if data is None:
            return None
        try:
            return data.decode('utf-8')
        except Exception as e:
            logger.warn(f"Failed to decode data as UTF-8: {str(e)}")
            return None

    def delete_data(self, oss_key: str) -> bool:
        """
        Delete data from OSS.

        Args:
            oss_key: The key (path) in OSS of the data to delete.

        Returns:
            bool: True if the delete call completed, False otherwise.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False
        try:
            self.bucket.delete_object(oss_key)
            logger.info(f"Successfully deleted data from OSS: {oss_key}")
            return True
        except Exception as e:
            logger.warn(f"Failed to delete data from OSS: {str(e)}")
            return False

    def update_data(self, oss_key: str, data: Union[IO[AnyStr], str, bytes, dict]) -> Optional[str]:
        """
        Replace the data stored under a key.

        This simply delegates to upload_data with the arguments swapped into
        its order; no separate delete is issued.

        Args:
            oss_key: The key (path) in OSS of the data to update.
            data: New data to upload; same accepted types as upload_data.

        Returns:
            str: The OSS key if successful, None otherwise.
        """
        return self.upload_data(data, oss_key)

    def update_json(self, oss_key: str, update_dict: dict) -> Optional[str]:
        """
        Merge ``update_dict`` into the JSON object stored under ``oss_key``.

        If the key does not exist yet, ``update_dict`` is stored on its own.
        If the key exists but cannot be read or parsed, or does not hold a
        JSON object, nothing is written, so a transient read failure can
        never wipe out the stored document.

        Args:
            oss_key: The key (path) in OSS of the JSON data to update.
            update_dict: Top-level fields to set or overwrite (shallow merge
                via ``dict.update``).

        Returns:
            str: The OSS key if successful, None otherwise.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None
        try:
            # Only a key that is confirmed missing may start from an empty
            # document; any error while checking or reading aborts the update.
            if self.bucket.object_exists(oss_key):
                existing_data = json.loads(self.bucket.get_object(oss_key).read())
            else:
                existing_data = {}
            if not isinstance(existing_data, dict):
                logger.warn(f"Existing data is not a dictionary: {oss_key}")
                return None
            existing_data.update(update_dict)
            return self.upload_data(existing_data, oss_key)
        except Exception as e:
            logger.warn(f"Failed to update JSON data in OSS: {str(e)}")
            return None

    # ---- File Operation Methods ----
    def upload_file(self, local_file: str, oss_key: Optional[str] = None) -> Optional[str]:
        """
        Upload a local file to OSS.

        Args:
            local_file: Path to the local file.
            oss_key: The key (path) in OSS where the file will be stored.
                If None, ``uploads/<basename of local_file>`` is used.

        Returns:
            str: The OSS key if successful, None otherwise.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None
        try:
            # isfile (not exists) so that a directory is rejected up front.
            if not os.path.isfile(local_file):
                logger.warn(f"Local file {local_file} does not exist or is not a file")
                return None
            if oss_key is None:
                oss_key = f"uploads/{os.path.basename(local_file)}"
            self.bucket.put_object_from_file(oss_key, local_file)
            logger.info(f"Successfully uploaded {local_file} to OSS: {oss_key}")
            return oss_key
        except Exception as e:
            logger.warn(f"Failed to upload {local_file} to OSS: {str(e)}")
            return None

    def download_file(self, oss_key: str, local_file: str) -> bool:
        """
        Download an object from OSS to a local path.

        Missing parent directories of ``local_file`` are created first.

        Args:
            oss_key: The key (path) in OSS of the file to download.
            local_file: Path where the downloaded file will be saved.

        Returns:
            bool: True if successful, False otherwise.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False
        try:
            # abspath guarantees a non-empty dirname for bare file names.
            os.makedirs(os.path.dirname(os.path.abspath(local_file)), exist_ok=True)
            self.bucket.get_object_to_file(oss_key, local_file)
            logger.info(f"Successfully downloaded {oss_key} to {local_file}")
            return True
        except Exception as e:
            logger.warn(f"Failed to download {oss_key} from OSS: {str(e)}")
            return False

    def list_objects(self, prefix: str = "", delimiter: str = "") -> List[Dict[str, Any]]:
        """
        List all objects in the bucket whose key starts with ``prefix``.

        The bucket returns listings one page at a time; this method follows
        the continuation marker until the listing is complete.

        Args:
            prefix: Prefix to filter objects.
            delimiter: Delimiter for hierarchical listing.

        Returns:
            A list of dicts with the keys ``key``, ``size`` and
            ``last_modified``; an empty list if the client is unavailable or
            any page request fails.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return []
        try:
            result = []
            marker = ""
            while True:
                page = self.bucket.list_objects(prefix=prefix, delimiter=delimiter, marker=marker)
                result.extend(
                    {
                        'key': obj.key,
                        'size': obj.size,
                        'last_modified': obj.last_modified,
                    }
                    for obj in page.object_list
                )
                # Stop on the last page; the empty-marker guard prevents an
                # endless loop should a truncated page carry no marker.
                if not page.is_truncated or not page.next_marker:
                    break
                marker = page.next_marker
            return result
        except Exception as e:
            logger.warn(f"Failed to list objects with prefix {prefix}: {str(e)}")
            return []

    # ---- Advanced Operation Methods ----
    def exists(self, oss_key: str) -> bool:
        """
        Check if an object exists in OSS.

        Args:
            oss_key: The key (path) in OSS to check.

        Returns:
            bool: True if a HEAD request for the key succeeds, False if it
            raises for any reason or the client is unavailable.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False
        try:
            self.bucket.head_object(oss_key)
            return True
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            return False

    def copy_object(self, source_key: str, target_key: str) -> bool:
        """
        Copy an object within the same bucket.

        Args:
            source_key: The source object key.
            target_key: The target object key.

        Returns:
            bool: True if successful, False otherwise.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False
        try:
            # Source bucket is this client's own bucket.
            self.bucket.copy_object(self.bucket_name, source_key, target_key)
            logger.info(f"Successfully copied {source_key} to {target_key}")
            return True
        except Exception as e:
            logger.warn(f"Failed to copy {source_key} to {target_key}: {str(e)}")
            return False

    def get_object_url(self, oss_key: str, expires: int = 3600) -> Optional[str]:
        """
        Generate a signed GET URL for an object.

        Args:
            oss_key: The key (path) in OSS of the object.
            expires: URL expiration time in seconds.

        Returns:
            str: The signed URL, or None if failed.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return None
        try:
            return self.bucket.sign_url('GET', oss_key, expires)
        except Exception as e:
            logger.warn(f"Failed to generate URL for {oss_key}: {str(e)}")
            return None

    def upload_directory(self, local_dir: str, oss_prefix: str = "") -> Tuple[bool, List[str]]:
        """
        Upload every file under a local directory (recursively) to OSS.

        Each file's key is ``oss_prefix`` joined with the file's path relative
        to ``local_dir``, using ``/`` as the separator.

        Args:
            local_dir: Path to the local directory.
            oss_prefix: Prefix to prepend to all uploaded files.

        Returns:
            Tuple of (success, uploaded keys). ``success`` is False if the
            client is unavailable, ``local_dir`` is not a directory, or at
            least one file failed; keys uploaded before a failure are still
            returned.
        """
        if not self.initialize():
            logger.warn("OSS client not initialized or export is disabled")
            return False, []
        if not os.path.isdir(local_dir):
            logger.warn(f"Local directory {local_dir} does not exist or is not a directory")
            return False, []
        uploaded_files = []
        errors = []
        for root, _, files in os.walk(local_dir):
            for file in files:
                local_file = os.path.join(root, file)
                rel_path = os.path.relpath(local_file, local_dir)
                # Convert only the platform separator, so a literal backslash
                # in a POSIX file name is preserved in the key.
                oss_key = os.path.join(oss_prefix, rel_path).replace(os.sep, "/")
                result = self.upload_file(local_file, oss_key)
                if result:
                    uploaded_files.append(result)
                else:
                    errors.append(local_file)
        if errors:
            logger.warn(f"Failed to upload {len(errors)} files")
            return False, uploaded_files
        return True, uploaded_files
def get_oss_client(access_key_id: Optional[str] = None,
                   access_key_secret: Optional[str] = None,
                   endpoint: Optional[str] = None,
                   bucket_name: Optional[str] = None,
                   enable_export: Optional[bool] = None) -> OSSClient:
    """
    Build an OSSClient and attempt to initialize it.

    All arguments are forwarded unchanged to the OSSClient constructor.
    The result of ``initialize()`` is not checked here, so the client is
    returned even when initialization did not succeed.

    Args:
        access_key_id: OSS access key ID
        access_key_secret: OSS access key secret
        endpoint: OSS endpoint
        bucket_name: OSS bucket name
        enable_export: Whether to enable OSS export

    Returns:
        OSSClient: The newly created client instance
    """
    oss = OSSClient(access_key_id, access_key_secret, endpoint, bucket_name, enable_export)
    # Eager attempt only; the return value is deliberately ignored.
    oss.initialize()
    return oss
# ---- Test Cases ----
if __name__ == "__main__":
    # Manual smoke test for the OSS tool class. It talks to a real bucket.
    #
    # Note: Before running the tests, you need to set the following
    # environment variables, or provide the parameters directly below:
    # - OSS_ACCESS_KEY_ID
    # - OSS_ACCESS_KEY_SECRET
    # - OSS_ENDPOINT
    # - OSS_BUCKET_NAME
    # - EXPORT_REPLAY_TRACE_TO_OSS=true
    import io
    import time

    # Test configuration: a per-run timestamp keeps separate runs from
    # colliding on the same keys.
    TEST_PREFIX = f"test/oss_utils_{int(time.time())}"

    # Initialize client
    # Method 1: Using environment variables
    # oss_client = get_oss_client(enable_export=True)
    # Method 2: Provide parameters directly (empty strings fall back to the
    # corresponding environment variables)
    oss_client = get_oss_client(
        access_key_id="",  # Replace with your actual access key ID
        access_key_secret="",  # Replace with your actual access key secret
        endpoint="",  # Replace with your actual OSS endpoint
        bucket_name="",  # Replace with your actual bucket name
        enable_export=True
    )

    # Test 1: Upload string data
    print("\nTest 1: Upload string data")
    text_key = f"{TEST_PREFIX}/text.txt"
    result = oss_client.upload_data("This is a test text", text_key)
    print(f"Upload string data: {'Success: ' + result if result else 'Failed'}")

    # Test 2: Upload dictionary data (automatically converted to JSON)
    print("\nTest 2: Upload dictionary data")
    json_key = f"{TEST_PREFIX}/data.json"
    data = {
        "name": "Test data",
        "values": [1, 2, 3],
        "nested": {
            "key": "value"
        }
    }
    result = oss_client.upload_data(data, json_key)
    print(f"Upload dictionary data: {'Success: ' + result if result else 'Failed'}")

    # Test 3: Upload in-memory binary file object
    print("\nTest 3: Upload in-memory binary file object")
    binary_key = f"{TEST_PREFIX}/binary.dat"
    binary_data = io.BytesIO(b"\x00\x01\x02\x03\x04")
    result = oss_client.upload_data(binary_data, binary_key)
    print(f"Upload binary file object: {'Success: ' + result if result else 'Failed'}")

    # Test 4: Upload in-memory text file object
    print("\nTest 4: Upload in-memory text file object")
    text_file_key = f"{TEST_PREFIX}/text_file.txt"
    text_file = io.StringIO("This is the content of an in-memory text file")
    result = oss_client.upload_data(text_file, text_file_key)
    print(f"Upload text file object: {'Success: ' + result if result else 'Failed'}")

    # Test 5: Create and upload temporary file
    print("\nTest 5: Create and upload temporary file")
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp.write(b"This is the content of a temporary file")
        tmp_path = tmp.name
    try:
        file_key = f"{TEST_PREFIX}/temp_file.txt"
        result = oss_client.upload_file(tmp_path, file_key)
        print(f"Upload temporary file: {'Success: ' + result if result else 'Failed'}")
    finally:
        os.unlink(tmp_path)  # Delete temporary file even if the upload raised

    # Test 6: Read text data
    print("\nTest 6: Read text data")
    content = oss_client.read_text(text_key)
    print(f"Read text data: {content}")

    # Test 7: Read JSON data
    print("\nTest 7: Read JSON data")
    json_content = oss_client.read_data(json_key, as_json=True)
    print(f"Read JSON data: {json_content}")

    # Test 8: Update JSON data (merge method)
    print("\nTest 8: Update JSON data")
    update_data = {"updated": True, "timestamp": time.time()}
    result = oss_client.update_json(json_key, update_data)
    print(f"Update JSON data: {'Success: ' + result if result else 'Failed'}")
    # View updated JSON data
    updated_json = oss_client.read_data(json_key, as_json=True)
    print(f"Updated JSON data: {updated_json}")

    # Test 9: Overwrite existing data
    print("\nTest 9: Overwrite existing data")
    result = oss_client.upload_data("This is the overwritten text", text_key)
    print(f"Overwrite existing data: {'Success: ' + result if result else 'Failed'}")
    # View overwritten data
    new_content = oss_client.read_text(text_key)
    print(f"Overwritten text data: {new_content}")

    # Test 10: List objects
    print("\nTest 10: List objects")
    objects = oss_client.list_objects(prefix=TEST_PREFIX)
    print(f"Found {len(objects)} objects:")
    for obj in objects:
        print(f" - {obj['key']} (Size: {obj['size']} bytes, Modified: {obj['last_modified']})")

    # Test 11: Generate temporary URL
    print("\nTest 11: Generate temporary URL")
    url = oss_client.get_object_url(text_key, expires=300)  # 5 minutes expiration
    print(f"Temporary URL: {url}")

    # Test 12: Copy object
    print("\nTest 12: Copy object")
    copy_key = f"{TEST_PREFIX}/copy_of_text.txt"
    result = oss_client.copy_object(text_key, copy_key)
    print(f"Copy object: {'Success: ' + copy_key if result else 'Failed'}")

    # Test 13: Check if object exists
    print("\nTest 13: Check if object exists")
    exists = oss_client.exists(text_key)
    print(f"Object {text_key} exists: {exists}")
    non_existent_key = f"{TEST_PREFIX}/non_existent.txt"
    exists = oss_client.exists(non_existent_key)
    print(f"Object {non_existent_key} exists: {exists}")

    # Test 14: Delete objects
    print("\nTest 14: Delete objects")
    for obj in objects:
        success = oss_client.delete_data(obj['key'])
        print(f"Delete object {obj['key']}: {'Success' if success else 'Failed'}")
    # Cleanup: the copy was created after the listing in Test 10, so it is
    # not in `objects` and has to be deleted separately.
    oss_client.delete_data(copy_key)
    print("\nTests completed!")