Spaces:
Runtime error
Runtime error
| import pandas as pd | |
| from datetime import datetime | |
| import logging | |
| import os | |
| from pathlib import Path | |
| logger = logging.getLogger(__name__) | |
| class JiraCsvImporter: | |
| """ | |
| Клас для імпорту даних з CSV-файлів Jira | |
| """ | |
| def __init__(self, file_path): | |
| """ | |
| Ініціалізація імпортера CSV. | |
| Args: | |
| file_path (str): Шлях до CSV-файлу | |
| """ | |
| self.file_path = file_path | |
| self.df = None | |
| def load_data(self): | |
| """ | |
| Завантаження даних з CSV-файлу. | |
| Returns: | |
| pandas.DataFrame: Завантажені дані або None у випадку помилки | |
| """ | |
| try: | |
| logger.info(f"Завантаження CSV-файлу: {self.file_path}") | |
| print(f"Завантаження CSV-файлу: {self.file_path}") # Додаткове логування в консоль | |
| # Перевірка існування файлу | |
| if not os.path.exists(self.file_path): | |
| logger.error(f"Файл не знайдено: {self.file_path}") | |
| print(f"Файл не знайдено: {self.file_path}") | |
| return None | |
| # Спробуємо різні кодування | |
| try: | |
| self.df = pd.read_csv(self.file_path, encoding='utf-8') | |
| print(f"Файл успішно прочитано з кодуванням utf-8") | |
| except UnicodeDecodeError: | |
| try: | |
| logger.warning("Помилка декодування UTF-8, спроба з latin1") | |
| print("Помилка декодування UTF-8, спроба з latin1") | |
| self.df = pd.read_csv(self.file_path, encoding='latin1') | |
| print(f"Файл успішно прочитано з кодуванням latin1") | |
| except Exception as e: | |
| logger.error(f"Помилка при спробі прочитати з latin1: {e}") | |
| print(f"Помилка при спробі прочитати з latin1: {e}") | |
| # Спробуємо читати без вказання кодування | |
| self.df = pd.read_csv(self.file_path) | |
| print(f"Файл успішно прочитано зі стандартним кодуванням") | |
| # Тимчасово вимкнемо перевірку колонок для діагностики | |
| # required_columns = self._check_required_columns() | |
| # if not required_columns: | |
| # logger.warning("CSV-файл не містить необхідних колонок") | |
| # print("CSV-файл не містить необхідних колонок") | |
| # return None | |
| # Відображення наявних колонок для діагностики | |
| print(f"Наявні колонки: {self.df.columns.tolist()}") | |
| print(f"Кількість рядків: {len(self.df)}") | |
| # Обробка дат | |
| self._process_dates() | |
| # Очищення та підготовка даних | |
| self._clean_data() | |
| logger.info(f"Успішно завантажено {len(self.df)} записів") | |
| print(f"Успішно завантажено {len(self.df)} записів") | |
| return self.df | |
| except Exception as e: | |
| logger.error(f"Помилка при завантаженні CSV-файлу: {e}") | |
| import traceback | |
| error_details = traceback.format_exc() | |
| print(f"Помилка при завантаженні CSV-файлу: {e}") | |
| print(f"Деталі помилки: {error_details}") | |
| return None | |
| def _check_required_columns(self): | |
| """ | |
| Перевірка наявності необхідних колонок у CSV-файлі. | |
| Returns: | |
| bool: True, якщо всі необхідні колонки присутні | |
| """ | |
| # Основні колонки, які очікуються у файлі Jira | |
| basic_columns = ['Summary', 'Issue key', 'Status', 'Issue Type', 'Priority', 'Created', 'Updated'] | |
| # Альтернативні назви колонок | |
| alternative_columns = { | |
| 'Summary': ['Summary', 'Короткий опис'], | |
| 'Issue key': ['Issue key', 'Key', 'Ключ'], | |
| 'Status': ['Status', 'Статус'], | |
| 'Issue Type': ['Issue Type', 'Type', 'Тип'], | |
| 'Priority': ['Priority', 'Пріоритет'], | |
| 'Created': ['Created', 'Створено'], | |
| 'Updated': ['Updated', 'Оновлено'] | |
| } | |
| # Перевірка наявності колонок | |
| missing_columns = [] | |
| for col in basic_columns: | |
| found = False | |
| # Перевірка основної назви | |
| if col in self.df.columns: | |
| found = True | |
| else: | |
| # Перевірка альтернативних назв | |
| for alt_col in alternative_columns.get(col, []): | |
| if alt_col in self.df.columns: | |
| # Перейменування колонки до стандартного імені | |
| self.df.rename(columns={alt_col: col}, inplace=True) | |
| found = True | |
| break | |
| if not found: | |
| missing_columns.append(col) | |
| if missing_columns: | |
| logger.warning(f"Відсутні колонки: {', '.join(missing_columns)}") | |
| print(f"Відсутні колонки: {', '.join(missing_columns)}") | |
| return False | |
| return True | |
| def _process_dates(self): | |
| """ | |
| Обробка дат у DataFrame. | |
| """ | |
| try: | |
| # Перетворення колонок з датами | |
| date_columns = ['Created', 'Updated', 'Resolved', 'Due Date'] | |
| for col in date_columns: | |
| if col in self.df.columns: | |
| try: | |
| self.df[col] = pd.to_datetime(self.df[col], errors='coerce') | |
| print(f"Колонку {col} успішно конвертовано до datetime") | |
| except Exception as e: | |
| logger.warning(f"Не вдалося конвертувати колонку {col} до datetime: {e}") | |
| print(f"Не вдалося конвертувати колонку {col} до datetime: {e}") | |
| except Exception as e: | |
| logger.error(f"Помилка при обробці дат: {e}") | |
| print(f"Помилка при обробці дат: {e}") | |
| def _clean_data(self): | |
| """ | |
| Очищення та підготовка даних. | |
| """ | |
| try: | |
| # Видалення порожніх рядків | |
| if 'Issue key' in self.df.columns: | |
| self.df.dropna(subset=['Issue key'], inplace=True) | |
| print(f"Видалено порожні рядки за колонкою 'Issue key'") | |
| # Додаткова обробка даних | |
| if 'Status' in self.df.columns: | |
| self.df['Status'] = self.df['Status'].fillna('Unknown') | |
| print(f"Заповнено відсутні значення в колонці 'Status'") | |
| if 'Priority' in self.df.columns: | |
| self.df['Priority'] = self.df['Priority'].fillna('Not set') | |
| print(f"Заповнено відсутні значення в колонці 'Priority'") | |
| # Створення додаткових колонок для аналізу | |
| if 'Created' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Created']): | |
| self.df['Created_Date'] = self.df['Created'].dt.date | |
| self.df['Created_Month'] = self.df['Created'].dt.to_period('M') | |
| print(f"Створено додаткові колонки для дат створення") | |
| if 'Updated' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Updated']): | |
| self.df['Updated_Date'] = self.df['Updated'].dt.date | |
| self.df['Days_Since_Update'] = (datetime.now() - self.df['Updated']).dt.days | |
| print(f"Створено додаткові колонки для дат оновлення") | |
| except Exception as e: | |
| logger.error(f"Помилка при очищенні даних: {e}") | |
| print(f"Помилка при очищенні даних: {e}") | |
| def export_to_csv(self, output_path=None): | |
| """ | |
| Експорт оброблених даних у CSV-файл. | |
| Args: | |
| output_path (str): Шлях для збереження файлу. Якщо None, створюється автоматично. | |
| Returns: | |
| str: Шлях до збереженого файлу або None у випадку помилки | |
| """ | |
| if self.df is None: | |
| logger.error("Немає даних для експорту") | |
| return None | |
| try: | |
| if output_path is None: | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| output_dir = Path("exported_data") | |
| output_dir.mkdir(exist_ok=True) | |
| output_path = output_dir / f"jira_data_{timestamp}.csv" | |
| self.df.to_csv(output_path, index=False, encoding='utf-8') | |
| logger.info(f"Дані успішно експортовано у {output_path}") | |
| return str(output_path) | |
| except Exception as e: | |
| logger.error(f"Помилка при експорті даних: {e}") | |
| return None |