Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| import os | |
| import json | |
| import logging | |
| logger = logging.getLogger(__name__) | |
| class DataExistsError(Exception): | |
| pass | |
| class DataNotFoundError(Exception): | |
| pass | |
| # FIXME: 😓这个东西写的比较拉跨,最好找个什么csv库替代掉... | |
| class BaseManager: | |
| def __init__(self, csv_file): | |
| self.csv_file = csv_file | |
| self.columns = ["id", "name", "desc", "params"] | |
| if not os.path.exists(csv_file): | |
| df = pd.DataFrame(columns=self.columns) | |
| df.to_csv(self.csv_file, index=False) | |
| def _load_data(self): | |
| return pd.read_csv(self.csv_file) | |
| def _save_data(self, df): | |
| df.to_csv(self.csv_file, index=False) | |
| def add_item(self, item_id, name, desc, params): | |
| df = self._load_data() | |
| if item_id in df["id"].values: | |
| raise DataExistsError(f"Item ID {item_id} already exists.") | |
| new_row = pd.DataFrame( | |
| [ | |
| { | |
| "id": item_id, | |
| "name": name, | |
| "desc": desc, | |
| "params": json.dumps(params, ensure_ascii=False), | |
| } | |
| ] | |
| ) | |
| df = pd.concat([df, new_row], ignore_index=True) | |
| self._save_data(df) | |
| def delete_item(self, item_id): | |
| df = self._load_data() | |
| if item_id not in df["id"].values: | |
| raise DataNotFoundError(f"Item ID {item_id} not found.") | |
| df = df[df["id"] != item_id] | |
| self._save_data(df) | |
| def update_item(self, item_id, name=None, desc=None, params=None): | |
| df = self._load_data() | |
| if item_id not in df["id"].values: | |
| raise DataNotFoundError(f"Item ID {item_id} not found.") | |
| if name: | |
| df.loc[df["id"] == item_id, "name"] = name | |
| if desc: | |
| df.loc[df["id"] == item_id, "desc"] = desc | |
| if params: | |
| df.loc[df["id"] == item_id, "params"] = params | |
| self._save_data(df) | |
| def get_item(self, item_id): | |
| df = self._load_data() | |
| if item_id not in df["id"].values: | |
| raise DataNotFoundError(f"Item ID {item_id} not found.") | |
| item = df[df["id"] == item_id].to_dict("records")[0] | |
| item["params"] = json.loads(item["params"]) | |
| return item | |
| def list_items(self): | |
| df = self._load_data() | |
| items = df.to_dict("records") | |
| for item in items: | |
| item["params"] = json.loads(item["params"]) | |
| return items | |
| def find_item_by_name(self, name): | |
| df = self._load_data() | |
| if name not in df["name"].values: | |
| raise DataNotFoundError(f"Name {name} not found.") | |
| item = df[df["name"] == name].to_dict("records")[0] | |
| item["params"] = json.loads(item["params"]) | |
| return item | |
| def find_params_by_name(self, name): | |
| try: | |
| return self.find_item_by_name(name)["params"] | |
| except Exception as e: | |
| logger.error(e) | |
| return {} | |
| def find_params_by_id(self, id): | |
| try: | |
| return self.get_item(id)["params"] | |
| except Exception as e: | |
| logger.error(e) | |
| return {} | |
| # Usage example | |
| if __name__ == "__main__": | |
| class SpeakerManager(BaseManager): | |
| def __init__(self, csv_file): | |
| super().__init__(csv_file) | |
| manager = SpeakerManager("speakers.test.csv") | |
| try: | |
| # Add speaker | |
| manager.add_item( | |
| 1, "Speaker1", "Description for speaker 1", '{"param1": "value1"}' | |
| ) | |
| except DataExistsError as e: | |
| print(e) | |
| # List all speakers | |
| speakers = manager.list_items() | |
| print(speakers) | |
| try: | |
| # Get specific speaker | |
| speaker = manager.get_item(1) | |
| print(speaker) | |
| except DataNotFoundError as e: | |
| print(e) | |
| try: | |
| # Update speaker | |
| manager.update_item( | |
| 1, name="Updated Speaker1", desc="Updated description for speaker 1" | |
| ) | |
| except DataNotFoundError as e: | |
| print(e) | |
| try: | |
| # Delete speaker | |
| manager.delete_item(1) | |
| except DataNotFoundError as e: | |
| print(e) | |
| try: | |
| # Find speaker by name | |
| speaker_by_name = manager.find_item_by_name("Updated Speaker1") | |
| print(speaker_by_name) | |
| except DataNotFoundError as e: | |
| print(e) | |
| # Find speakers by params | |
| speakers_by_params = manager.find_items_by_params('{"param1": "value1"}') | |
| print(speakers_by_params) | |