Spaces:
Running
Running
import os | |
import logging | |
import tiktoken | |
import pandas as pd | |
from datetime import datetime | |
from typing import List, Dict, Any, Optional | |
# Налаштування логування | |
logger = logging.getLogger(__name__) | |
# Перевірка наявності LlamaIndex | |
try: | |
from llama_index.core import Document | |
from llama_index.core.llms import ChatMessage | |
LLAMA_INDEX_AVAILABLE = True | |
except ImportError: | |
logger.warning("Не вдалося імпортувати LlamaIndex. Встановіть необхідні залежності для використання AI Report.") | |
LLAMA_INDEX_AVAILABLE = False | |
# Імпорт промпта для звіту | |
from prompts import get_report_prompt | |
class JiraAIReport: | |
""" | |
Клас для генерації аналітичних звітів на основі даних Jira, | |
використовуючи повний контекст даних (аналогічно режиму Q/A). | |
""" | |
def __init__(self, api_key_openai=None, api_key_gemini=None, model_type="gemini", temperature=0.2): | |
""" | |
Ініціалізація AI генератора звітів. | |
Args: | |
api_key_openai (str): API ключ для OpenAI | |
api_key_gemini (str): API ключ для Google Gemini | |
model_type (str): Тип моделі ("openai" або "gemini") | |
temperature (float): Параметр температури для генерації відповідей | |
""" | |
self.model_type = model_type.lower() | |
self.temperature = temperature | |
self.api_key_openai = api_key_openai or os.getenv("OPENAI_API_KEY", "") | |
self.api_key_gemini = api_key_gemini or os.getenv("GEMINI_API_KEY", "") | |
# Перевірка наявності LlamaIndex | |
if not LLAMA_INDEX_AVAILABLE: | |
logger.error("LlamaIndex не доступний. Встановіть пакети: pip install llama-index-llms-gemini llama-index") | |
raise ImportError("LlamaIndex не встановлено. Необхідний для роботи генератора звітів.") | |
# Ініціалізація моделі LLM | |
self.llm = None | |
# Дані Jira | |
self.df = None | |
self.jira_documents = [] | |
# Ініціалізуємо модель LLM | |
self._initialize_llm() | |
def _initialize_llm(self): | |
"""Ініціалізує модель LLM відповідно до налаштувань.""" | |
try: | |
# Ініціалізація LLM моделі | |
if self.model_type == "gemini" and self.api_key_gemini: | |
os.environ["GEMINI_API_KEY"] = self.api_key_gemini | |
from llama_index.llms.gemini import Gemini | |
self.llm = Gemini( | |
model="models/gemini-2.0-flash", | |
temperature=self.temperature, | |
max_tokens=4096, | |
) | |
logger.info("Успішно ініціалізовано Gemini 2.0 Flash модель") | |
elif self.model_type == "openai" and self.api_key_openai: | |
os.environ["OPENAI_API_KEY"] = self.api_key_openai | |
from llama_index.llms.openai import OpenAI | |
self.llm = OpenAI( | |
model="gpt-4o-mini", | |
temperature=self.temperature, | |
max_tokens=4096 | |
) | |
logger.info("Успішно ініціалізовано OpenAI GPT-4o-mini модель") | |
else: | |
error_msg = f"Не вдалося ініціалізувати LLM модель типу {self.model_type}. Перевірте API ключі." | |
logger.error(error_msg) | |
raise ValueError(error_msg) | |
except Exception as e: | |
logger.error(f"Помилка ініціалізації моделі LLM: {e}") | |
raise | |
def load_documents_from_dataframe(self, df): | |
""" | |
Завантаження документів прямо з DataFrame без створення індексів. | |
Args: | |
df (pandas.DataFrame): DataFrame з даними Jira | |
Returns: | |
bool: True якщо дані успішно завантажено | |
""" | |
try: | |
logger.info("Завантаження даних з DataFrame для генерації звіту") | |
# Зберігаємо оригінальний DataFrame | |
self.df = df.copy() | |
# Конвертуємо дані в документи | |
self._convert_dataframe_to_documents() | |
return True | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні даних з DataFrame: {e}") | |
return False | |
def _convert_dataframe_to_documents(self): | |
""" | |
Перетворює дані DataFrame в об'єкти Document для роботи з моделлю LLM. | |
""" | |
import pandas as pd | |
if self.df is None: | |
logger.error("Не вдалося створити документи: відсутні дані DataFrame") | |
return | |
logger.info("Перетворення даних DataFrame в документи для звіту...") | |
self.jira_documents = [] | |
for idx, row in self.df.iterrows(): | |
# Основний текст - опис тікета | |
text = "" | |
if 'Description' in row and pd.notna(row['Description']): | |
text = str(row['Description']) | |
# Додавання коментарів, якщо вони є | |
for col in self.df.columns: | |
if col.startswith('Comment') and pd.notna(row[col]): | |
text += f"\n\nКоментар: {str(row[col])}" | |
# Метадані для документа | |
metadata = { | |
"issue_key": row['Issue key'] if 'Issue key' in row and pd.notna(row['Issue key']) else "", | |
"issue_type": row['Issue Type'] if 'Issue Type' in row and pd.notna(row['Issue Type']) else "", | |
"status": row['Status'] if 'Status' in row and pd.notna(row['Status']) else "", | |
"priority": row['Priority'] if 'Priority' in row and pd.notna(row['Priority']) else "", | |
"assignee": row['Assignee'] if 'Assignee' in row and pd.notna(row['Assignee']) else "", | |
"reporter": row['Reporter'] if 'Reporter' in row and pd.notna(row['Reporter']) else "", | |
"created": str(row['Created']) if 'Created' in row and pd.notna(row['Created']) else "", | |
"updated": str(row['Updated']) if 'Updated' in row and pd.notna(row['Updated']) else "", | |
"summary": row['Summary'] if 'Summary' in row and pd.notna(row['Summary']) else "", | |
"project": row['Project name'] if 'Project name' in row and pd.notna(row['Project name']) else "" | |
} | |
# Додатково перевіряємо поле зв'язків, якщо воно є | |
if 'Outward issue link (Relates)' in row and pd.notna(row['Outward issue link (Relates)']): | |
metadata["related_issues"] = row['Outward issue link (Relates)'] | |
# Додатково перевіряємо інші можливі поля зв'язків | |
for col in self.df.columns: | |
if col.startswith('Outward issue link') and col != 'Outward issue link (Relates)' and pd.notna(row[col]): | |
link_type = col.replace('Outward issue link ', '').strip('()') | |
if "links" not in metadata: | |
metadata["links"] = {} | |
metadata["links"][link_type] = str(row[col]) | |
# Створення документа | |
doc = Document( | |
text=text, | |
metadata=metadata | |
) | |
self.jira_documents.append(doc) | |
logger.info(f"Створено {len(self.jira_documents)} документів для генерації звіту") | |
def _count_tokens(self, text: str, model: str = "gpt-3.5-turbo") -> int: | |
""" | |
Підраховує приблизну кількість токенів для тексту. | |
Args: | |
text (str): Текст для підрахунку токенів | |
model (str): Назва моделі для вибору енкодера | |
Returns: | |
int: Кількість токенів | |
""" | |
try: | |
encoding = tiktoken.encoding_for_model(model) | |
tokens = encoding.encode(text) | |
return len(tokens) | |
except Exception as e: | |
logger.warning(f"Не вдалося підрахувати токени через tiktoken: {e}") | |
# Якщо не можемо використати tiktoken, робимо просту оцінку | |
# В середньому 1 токен ≈ 3 символи для змішаного тексту | |
return len(text) // 3 # Приблизна оцінка | |
def _prepare_context_data(self): | |
""" | |
Підготовка даних для контексту звіту. | |
Returns: | |
str: Підготовлений контекст з даних | |
""" | |
if not self.jira_documents: | |
logger.error("Відсутні документи для підготовки контексту") | |
return "" | |
# Статистика по тікетах | |
status_counts = {} | |
type_counts = {} | |
priority_counts = {} | |
assignee_counts = {} | |
for doc in self.jira_documents: | |
status = doc.metadata.get("status", "") | |
issue_type = doc.metadata.get("issue_type", "") | |
priority = doc.metadata.get("priority", "") | |
assignee = doc.metadata.get("assignee", "") | |
if status: | |
status_counts[status] = status_counts.get(status, 0) + 1 | |
if issue_type: | |
type_counts[issue_type] = type_counts.get(issue_type, 0) + 1 | |
if priority: | |
priority_counts[priority] = priority_counts.get(priority, 0) + 1 | |
if assignee: | |
assignee_counts[assignee] = assignee_counts.get(assignee, 0) + 1 | |
# Формуємо текстовий опис для LLM | |
data_summary = f"СТАТИСТИКА ПРОЕКТУ JIRA:\n\n" | |
data_summary += f"Загальна кількість тікетів: {len(self.jira_documents)}\n\n" | |
data_summary += "Розподіл за статусами:\n" | |
for status, count in sorted(status_counts.items(), key=lambda x: x[1], reverse=True): | |
percentage = (count / len(self.jira_documents) * 100) | |
data_summary += f"- {status}: {count} ({percentage:.1f}%)\n" | |
data_summary += "\nРозподіл за типами:\n" | |
for type_name, count in sorted(type_counts.items(), key=lambda x: x[1], reverse=True): | |
percentage = (count / len(self.jira_documents) * 100) | |
data_summary += f"- {type_name}: {count} ({percentage:.1f}%)\n" | |
data_summary += "\nРозподіл за пріоритетами:\n" | |
for priority, count in sorted(priority_counts.items(), key=lambda x: x[1], reverse=True): | |
percentage = (count / len(self.jira_documents) * 100) | |
data_summary += f"- {priority}: {count} ({percentage:.1f}%)\n" | |
# Топ-5 виконавців | |
if assignee_counts: | |
data_summary += "\nТоп виконавці завдань:\n" | |
for assignee, count in sorted(assignee_counts.items(), key=lambda x: x[1], reverse=True)[:5]: | |
data_summary += f"- {assignee}: {count} тікетів\n" | |
# Додаємо інформацію про важливі тікети (з високим пріоритетом) | |
high_priority_tickets = [] | |
for doc in self.jira_documents: | |
if doc.metadata.get("priority", "").lower() in ["high", "highest", "critical", "blocker", "високий", "критичний"]: | |
high_priority_tickets.append(doc) | |
if high_priority_tickets: | |
data_summary += "\nВажливі тікети (високий пріоритет):\n" | |
for doc in high_priority_tickets[:5]: # Обмежуємо кількість для економії токенів | |
issue_key = doc.metadata.get("issue_key", "") | |
summary = doc.metadata.get("summary", "") | |
status = doc.metadata.get("status", "") | |
data_summary += f"- {issue_key}: '{summary}' (Статус: {status})\n" | |
# Додаємо інформацію про останні оновлені тікети | |
try: | |
# Спочатку намагаємося отримати список тікетів з датами оновлення | |
tickets_with_dates = [] | |
for doc in self.jira_documents: | |
updated = doc.metadata.get("updated", "") | |
if updated: | |
try: | |
# Спроба парсингу дати | |
updated_date = pd.to_datetime(updated) | |
tickets_with_dates.append((doc, updated_date)) | |
except: | |
pass | |
# Сортуємо за датою оновлення (від найновіших до найстаріших) | |
tickets_with_dates.sort(key=lambda x: x[1], reverse=True) | |
# Додаємо інформацію про останні оновлені тікети | |
if tickets_with_dates: | |
data_summary += "\nОстанні оновлені тікети:\n" | |
for doc, date in tickets_with_dates[:5]: | |
issue_key = doc.metadata.get("issue_key", "") | |
summary = doc.metadata.get("summary", "") | |
status = doc.metadata.get("status", "") | |
data_summary += f"- {issue_key}: '{summary}' (Статус: {status}, Оновлено: {date.strftime('%Y-%m-%d')})\n" | |
except Exception as e: | |
logger.warning(f"Помилка при обробці дат оновлення: {e}") | |
return data_summary | |
def generate_report(self, format_type="markdown") -> Dict[str, Any]: | |
""" | |
Генерація аналітичного звіту на основі даних Jira. | |
Args: | |
format_type (str): Формат звіту ("markdown", "html") | |
Returns: | |
Dict[str, Any]: Словник з результатами, включаючи звіт та метадані | |
""" | |
try: | |
if not self.jira_documents or not self.llm: | |
error_msg = "Не вдалося виконати запит: відсутні документи або LLM" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
logger.info(f"Запуск генерації звіту у форматі {format_type}") | |
# Підготовка контексту з даних | |
data_summary = self._prepare_context_data() | |
# Підрахунок токенів для контексту | |
context_tokens = self._count_tokens(data_summary) | |
logger.info(f"Підготовлено контекст для звіту: {context_tokens} токенів") | |
# Отримуємо системний промпт відповідно до формату | |
system_prompt = get_report_prompt(format_type) | |
# Формуємо повідомлення для чату | |
messages = [ | |
ChatMessage(role="system", content=system_prompt), | |
ChatMessage(role="user", content=f"Ось дані для аналізу:\n\n{data_summary}") | |
] | |
# Отримуємо відповідь від LLM | |
logger.info("Генерація звіту...") | |
response = self.llm.chat(messages) | |
# Підрахунок токенів для відповіді | |
report_text = str(response) | |
response_tokens = self._count_tokens(report_text) | |
logger.info(f"Звіт успішно згенеровано, токенів: {response_tokens}") | |
return { | |
"report": report_text, | |
"metadata": { | |
"context_tokens": context_tokens, | |
"report_tokens": response_tokens, | |
"total_tokens": context_tokens + response_tokens, | |
"format": format_type, | |
"documents_used": len(self.jira_documents) | |
} | |
} | |
except Exception as e: | |
error_msg = f"Помилка при генерації звіту: {e}" | |
logger.error(error_msg) | |
return {"error": error_msg} | |
def get_statistics(self) -> Dict[str, Any]: | |
""" | |
Повертає загальну статистику за документами. | |
Returns: | |
Dict[str, Any]: Словник зі статистикою | |
""" | |
if not self.jira_documents: | |
return {"error": "Немає завантажених документів"} | |
# Статистика по тікетах | |
status_counts = {} | |
type_counts = {} | |
priority_counts = {} | |
assignee_counts = {} | |
for doc in self.jira_documents: | |
status = doc.metadata.get("status", "") | |
issue_type = doc.metadata.get("issue_type", "") | |
priority = doc.metadata.get("priority", "") | |
assignee = doc.metadata.get("assignee", "") | |
if status: | |
status_counts[status] = status_counts.get(status, 0) + 1 | |
if issue_type: | |
type_counts[issue_type] = type_counts.get(issue_type, 0) + 1 | |
if priority: | |
priority_counts[priority] = priority_counts.get(priority, 0) + 1 | |
if assignee: | |
assignee_counts[assignee] = assignee_counts.get(assignee, 0) + 1 | |
# Формуємо результат | |
return { | |
"document_count": len(self.jira_documents), | |
"status_counts": status_counts, | |
"type_counts": type_counts, | |
"priority_counts": priority_counts, | |
"top_assignees": dict(sorted(assignee_counts.items(), key=lambda x: x[1], reverse=True)[:5]) | |
} |