Spaces:
Running
Running
import logging | |
import traceback | |
import os | |
import json | |
from pathlib import Path | |
from typing import Dict, List, Any, Optional | |
import tiktoken | |
import pandas as pd | |
from datetime import datetime | |
from modules.config.ai_settings import ( | |
get_metadata_csv, | |
MAX_TOKENS, | |
CHUNK_SIZE, CHUNK_OVERLAP, | |
EXCLUDED_EMBED_METADATA_KEYS, | |
EXCLUDED_LLM_METADATA_KEYS, | |
SIMILARITY_TOP_K, HYBRID_SEARCH_MODE | |
) | |
from prompts import system_prompt_hybrid_chat | |
# Імпорт базових компонентів LlamaIndex | |
from llama_index.core import ( | |
VectorStoreIndex, | |
Document, | |
StorageContext, | |
load_index_from_storage, | |
Settings | |
) | |
from llama_index.vector_stores.faiss import FaissVectorStore | |
from llama_index.retrievers.bm25 import BM25Retriever | |
from llama_index.core.query_engine import RetrieverQueryEngine | |
from llama_index.core.retrievers import QueryFusionRetriever | |
from llama_index.core.llms import ChatMessage | |
# Імпорт уніфікованого менеджера індексів | |
import builtins | |
# Забезпечення бінарного формату для всіх операцій | |
Settings.persist_json_format = False | |
from modules.data_management.index_utils import count_tokens, initialize_embedding_model, check_index_integrity | |
os.environ["CUDA_VISIBLE_DEVICES"] = "" | |
os.environ["TORCH_DEVICE"] = "cpu" | |
logger = logging.getLogger(__name__) | |
class JiraHybridChat: | |
""" | |
Клас для роботи з гібридним чатом на основі даних Jira. | |
Використовує комбінацію BM25 та векторного пошуку для покращення релевантності. | |
""" | |
# Ліміт кешу екземплярів | |
MAX_CACHE_SIZE = 5 | |
# Глобальний кеш екземплярів чату | |
chat_instances_cache = {} | |
def __init__( | |
self, | |
indices_dir=None, | |
app=None, | |
api_key_openai=None, | |
api_key_gemini=None, | |
model_type="gemini", | |
model_name=None, | |
temperature=0.2, | |
): | |
""" | |
Args: | |
indices_dir (str): Шлях до директорії з індексами | |
app: будь-який об'єкт, звідки беремо current_data (DataFrame) | |
api_key_openai (str): ключ OpenAI | |
api_key_gemini (str): ключ Google Gemini | |
model_type (str): "gemini" або "openai" | |
model_name (str): назва моделі | |
temperature (float): температура LLM | |
""" | |
self.indices_dir = indices_dir | |
self.app = app | |
self.model_type = model_type.lower() | |
self.model_name = model_name | |
self.temperature = temperature | |
self.llm_initialized = False | |
self.indices_loaded = False | |
self.api_key_openai = api_key_openai or os.getenv("OPENAI_API_KEY", "") | |
self.api_key_gemini = api_key_gemini or os.getenv("GEMINI_API_KEY", "") | |
# Проставляємо змінні середовища | |
if self.api_key_openai: | |
os.environ["OPENAI_API_KEY"] = self.api_key_openai | |
if self.api_key_gemini: | |
os.environ["GEMINI_API_KEY"] = self.api_key_gemini | |
# Основні поля | |
self.llm = None | |
self.index = None | |
self.retriever_bm25 = None | |
self.retriever_vector = None | |
self.retriever_fusion = None | |
self.query_engine = None | |
self.df = None | |
self.jira_documents = [] | |
self.nodes = [] | |
# Отримуємо index_manager з глобальної змінної, якщо доступний | |
self.index_manager = None | |
if hasattr(builtins, 'index_manager'): | |
self.index_manager = builtins.index_manager | |
logger.info("Використовується глобальний index_manager") | |
# Додаткові параметри | |
self.similarity_top_k = SIMILARITY_TOP_K | |
self.hybrid_mode = HYBRID_SEARCH_MODE | |
# Ініціалізація в оптимізованому порядку | |
self._initialize() | |
def _initialize(self): | |
"""Ініціалізація в правильному порядку для уникнення дублювання.""" | |
# 1) Ініціалізуємо LLM | |
self._initialize_llm() | |
# 2) Перевіряємо кеш | |
if self.indices_dir and self.indices_dir in JiraHybridChat.chat_instances_cache: | |
cached_instance = JiraHybridChat.chat_instances_cache[self.indices_dir] | |
if cached_instance.index is not None: | |
self._load_from_cache(cached_instance) | |
self.indices_loaded = True | |
logger.info(f"Використано кешований екземпляр для {self.indices_dir}") | |
return | |
# 3) Спробуємо завантажити з вказаного шляху | |
if self.indices_dir and self.load_indices(self.indices_dir): | |
self.indices_loaded = True | |
return | |
# 4) Завантажуємо дані для створення нових індексів | |
df = self._get_dataframe() | |
if df is None: | |
return | |
# 5) Створюємо документи | |
self.df = df | |
self.jira_documents = self._create_documents_from_dataframe(df) | |
if not self.jira_documents: | |
return | |
# 6) Створюємо індекси в пам'яті | |
if self._create_indices_in_memory(): | |
self.indices_loaded = True | |
# 7) Зберігаємо на диск, якщо вказано indices_dir | |
if self.indices_dir: | |
self._persist_indices_to_disk(self.indices_dir) | |
# 8) Кешуємо екземпляр | |
self._add_to_cache() | |
def _initialize_llm(self): | |
"""Ініціалізація LLM залежно від model_type (gemini / openai).""" | |
try: | |
if self.model_type == "gemini" and self.api_key_gemini: | |
from llama_index.llms.gemini import Gemini | |
if not self.model_name: | |
self.model_name = "models/gemini-2.0-flash" | |
self.llm = Gemini( | |
model=self.model_name, | |
temperature=self.temperature, | |
max_tokens=MAX_TOKENS, | |
) | |
logger.info(f"Успішно ініціалізовано Gemini модель: {self.model_name}") | |
self.llm_initialized = True | |
elif self.model_type == "openai" and self.api_key_openai: | |
from llama_index.llms.openai import OpenAI | |
if not self.model_name: | |
self.model_name = "gpt-4o-mini" | |
self.llm = OpenAI( | |
model=self.model_name, | |
temperature=self.temperature, | |
max_tokens=MAX_TOKENS | |
) | |
logger.info(f"Успішно ініціалізовано OpenAI модель: {self.model_name}") | |
self.llm_initialized = True | |
else: | |
error_msg = f"Не вдалося ініціалізувати LLM {self.model_type}. Перевірте ключі." | |
logger.error(error_msg) | |
raise ValueError(error_msg) | |
except Exception as e: | |
logger.error(f"Помилка ініціалізації LLM: {e}") | |
logger.error(traceback.format_exc()) | |
def _load_from_cache(self, cached_instance): | |
"""Копіюємо дані з кешованого екземпляра.""" | |
self.index = cached_instance.index | |
self.retriever_bm25 = cached_instance.retriever_bm25 | |
self.retriever_vector = cached_instance.retriever_vector | |
self.retriever_fusion = cached_instance.retriever_fusion | |
self.query_engine = cached_instance.query_engine | |
self.jira_documents = cached_instance.jira_documents | |
self.nodes = cached_instance.nodes | |
self.df = cached_instance.df | |
def _add_to_cache(self): | |
"""Додаємо поточний екземпляр у кеш.""" | |
if not self.index or self.indices_dir is None: | |
return | |
# Якщо кеш переповнений, видаляємо найстаріший запис | |
if len(JiraHybridChat.chat_instances_cache) >= self.MAX_CACHE_SIZE: | |
oldest_key = next(iter(JiraHybridChat.chat_instances_cache)) | |
JiraHybridChat.chat_instances_cache.pop(oldest_key) | |
# Додаємо поточний екземпляр у кеш | |
JiraHybridChat.chat_instances_cache[self.indices_dir] = self | |
logger.info(f"Додано екземпляр у кеш для {self.indices_dir}") | |
def _get_dataframe(self): | |
"""Отримуємо DataFrame з app.current_data або з CSV файлу.""" | |
# Спочатку пробуємо отримати з app.current_data | |
if hasattr(self, "app") and hasattr(self.app, "current_data"): | |
if isinstance(self.app.current_data, pd.DataFrame) and not self.app.current_data.empty: | |
logger.info(f"Отримано DataFrame з app.current_data: {len(self.app.current_data)} рядків") | |
return self.app.current_data | |
# Пробуємо отримати з app.last_loaded_csv | |
if hasattr(self, "app") and hasattr(self.app, "last_loaded_csv"): | |
csv_path = self.app.last_loaded_csv | |
if csv_path and os.path.exists(csv_path): | |
try: | |
df = pd.read_csv(csv_path) | |
logger.info(f"Завантажено DataFrame з CSV: {len(df)} рядків") | |
return df | |
except Exception as e: | |
logger.warning(f"Помилка при читанні CSV: {e}") | |
logger.warning("Немає доступних даних для створення індексів.") | |
return None | |
def _create_documents_from_dataframe(self, df): | |
"""Конвертуємо DataFrame у список документів.""" | |
documents = [] | |
for idx, row in df.iterrows(): | |
# Основний текст документа | |
text = "" | |
if 'Description' in row and pd.notna(row['Description']): | |
text = str(row['Description']) | |
# Додаємо коментарі до тексту | |
for col in df.columns: | |
if col.startswith("Comment") and pd.notna(row[col]): | |
text += f"\n\nКоментар: {row[col]}" | |
# Метадані документа | |
metadata = get_metadata_csv(row, idx) | |
excluded_embed_metadata_keys = [] | |
excluded_llm_metadata_keys = [] | |
# Створюємо документ | |
doc = Document( | |
text=text, | |
metadata=metadata, | |
metadata_seperator="::", | |
excluded_embed_metadata_keys=excluded_embed_metadata_keys, | |
excluded_llm_metadata_keys=excluded_llm_metadata_keys, | |
text_template="Metadata: {metadata_str}\n-----\nContent: {content}", | |
) | |
documents.append(doc) | |
logger.info(f"Створено {len(documents)} документів для індексів") | |
return documents | |
def _create_indices_in_memory(self): | |
"""Створюємо індекси FAISS в пам'яті.""" | |
try: | |
if not self.jira_documents: | |
return False | |
# Ініціалізуємо модель ембедингів | |
try: | |
embed_model = initialize_embedding_model() | |
if embed_model is None: | |
logger.error("Не вдалося отримати модель ембедингів") | |
return False | |
# Встановлюємо модель ембедингів у глобальних налаштуваннях | |
Settings.embed_model = embed_model | |
# Перевіряємо, чи це Google Embeddings | |
if "CustomEmbedding" in str(type(embed_model)): | |
logger.info("Виявлено Google Embeddings API") | |
# Отримуємо розмірність ембедингів через тестовий запит | |
test_embedding = embed_model.get_text_embedding("Test") | |
embed_dim = len(test_embedding) | |
logger.info(f"Розмірність ембедингів Google: {embed_dim}") | |
else: | |
# Це HuggingFace або інший тип ембедингів | |
sample_embedding = embed_model.get_text_embedding("Test") | |
embed_dim = len(sample_embedding) | |
logger.info(f"Розмірність локальних ембедингів: {embed_dim}") | |
except Exception as embed_error: | |
logger.error(f"Помилка при ініціалізації моделі ембедингів: {embed_error}") | |
logger.error(traceback.format_exc()) | |
return False | |
# Створюємо та налаштовуємо індекс | |
from llama_index.core.node_parser import TokenTextSplitter | |
# Розділювач тексту для чанкінгу | |
text_splitter = TokenTextSplitter( | |
chunk_size=CHUNK_SIZE, | |
chunk_overlap=CHUNK_OVERLAP | |
) | |
# Створюємо FAISS індекс | |
try: | |
import faiss | |
faiss_index = faiss.IndexFlatL2(embed_dim) | |
vector_store = FaissVectorStore(faiss_index=faiss_index) | |
# Створюємо контекст зберігання | |
storage_context = StorageContext.from_defaults(vector_store=vector_store) | |
# Створюємо індекс | |
self.index = VectorStoreIndex.from_documents( | |
self.jira_documents, | |
storage_context=storage_context, | |
transformations=[text_splitter] | |
) | |
# Зберігаємо вузли для подальшого використання | |
self.nodes = list(self.index.storage_context.docstore.docs.values()) | |
# Налаштування retrievers | |
self._setup_retrievers() | |
logger.info("Індекси успішно створено в пам'яті") | |
return True | |
except Exception as index_error: | |
logger.error(f"Помилка при створенні FAISS індексу: {index_error}") | |
logger.error(traceback.format_exc()) | |
return False | |
except Exception as e: | |
logger.error(f"Загальна помилка при створенні індексів: {e}") | |
logger.error(traceback.format_exc()) | |
return False | |
def _setup_retrievers(self): | |
"""Налаштовуємо різні типи retrievers для пошуку.""" | |
docstore = self.index.storage_context.docstore | |
# BM25 retriever (пошук за ключовими словами) | |
self.retriever_bm25 = BM25Retriever.from_defaults( | |
docstore=docstore, | |
similarity_top_k=self.similarity_top_k | |
) | |
# Векторний retriever (семантичний пошук) | |
self.retriever_vector = self.index.as_retriever( | |
similarity_top_k=self.similarity_top_k | |
) | |
# Гібридний retriever (комбінація BM25 та векторного) | |
self.retriever_fusion = QueryFusionRetriever( | |
[self.retriever_bm25, self.retriever_vector], | |
mode=self.hybrid_mode, | |
similarity_top_k=self.similarity_top_k, | |
num_queries=1, | |
use_async=True | |
) | |
# Query engine для виконання запитів | |
self.query_engine = RetrieverQueryEngine(self.retriever_fusion) | |
def _persist_indices_to_disk(self, indices_dir): | |
"""Зберігаємо індекси на диск.""" | |
try: | |
# Створюємо директорію, якщо її не існує | |
Path(indices_dir).mkdir(parents=True, exist_ok=True) | |
# Якщо індекси вже створені в пам'яті, просто зберігаємо їх на диск | |
if self.index is not None: | |
# Забезпечуємо бінарний формат збереження | |
Settings.persist_json_format = False | |
# Очищаємо директорію перед збереженням | |
path_dir = Path(indices_dir) | |
if path_dir.exists(): | |
for item in path_dir.iterdir(): | |
if item.is_file(): | |
item.unlink() | |
elif item.is_dir(): | |
import shutil | |
shutil.rmtree(item) | |
# Зберігаємо індекси | |
self.index.storage_context.persist(persist_dir=indices_dir) | |
# Створюємо BM25 директорію і зберігаємо параметри | |
bm25_dir = Path(indices_dir) / "bm25" | |
bm25_dir.mkdir(exist_ok=True) | |
# Зберігаємо параметри BM25 | |
bm25_params = { | |
"similarity_top_k": self.retriever_bm25.similarity_top_k, | |
"index_creation_time": datetime.now().isoformat() | |
} | |
with open(bm25_dir / "params.json", "w", encoding="utf-8") as f: | |
json.dump(bm25_params, f, ensure_ascii=False, indent=2) | |
# Створюємо маркерний файл | |
with open(os.path.join(indices_dir, "indices.valid"), "w") as f: | |
f.write(f"Indices created at {datetime.now().isoformat()}") | |
# Зберігаємо метадані | |
metadata = { | |
"created_at": datetime.now().isoformat(), | |
"documents_count": len(self.jira_documents), | |
"storage_format": "binary" | |
} | |
with open(os.path.join(indices_dir, "metadata.json"), "w", encoding="utf-8") as f: | |
json.dump(metadata, f, ensure_ascii=False, indent=2) | |
# Оновлюємо шлях у глобальному index_manager, якщо він доступний | |
if self.index_manager and hasattr(self.index_manager, 'register_indices_path'): | |
session_id = getattr(self.app, 'current_session_id', None) | |
self.index_manager.register_indices_path(indices_dir, session_id) | |
self.indices_dir = indices_dir | |
logger.info(f"Індекси успішно збережено у: {indices_dir}") | |
return True | |
# Якщо індекси ще не створено, але є index_manager - використовуємо його | |
elif self.index_manager and self.df is not None: | |
# Генеруємо унікальний ідентифікатор сесії, якщо він відсутній | |
if not hasattr(self.app, 'current_session_id') or self.app.current_session_id is None: | |
import uuid | |
session_id = f"{uuid.uuid4()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}" | |
else: | |
session_id = self.app.current_session_id | |
# Реєструємо або створюємо нові індекси | |
if hasattr(self.index_manager, 'register_indices_path'): | |
self.index_manager.register_indices_path(indices_dir, session_id) | |
self.indices_dir = indices_dir | |
return True | |
else: | |
# Резервний варіант - створюємо нові індекси через manager | |
index_result = self.index_manager.get_or_create_indices(self.df, session_id) | |
if "indices_dir" in index_result: | |
self.indices_dir = index_result["indices_dir"] | |
logger.info(f"Індекси збережені через index_manager: {self.indices_dir}") | |
return True | |
else: | |
logger.error(f"Помилка при збереженні індексів через index_manager: {index_result.get('error', 'невідома помилка')}") | |
return False | |
else: | |
logger.error("Немає індексів для збереження") | |
return False | |
except Exception as e: | |
logger.error(f"Помилка при збереженні індексів: {e}") | |
logger.error(traceback.format_exc()) | |
return False | |
def load_indices(self, indices_dir): | |
"""Завантаження індексів з директорії, якщо вони існують.""" | |
try: | |
# Перевіряємо цілісність індексів | |
is_valid, error_msg = check_index_integrity(indices_dir) | |
if not is_valid: | |
logger.warning(f"Файл маркера не знайдено в {indices_dir}: {error_msg}") | |
return False | |
# Забезпечуємо бінарний формат для завантаження | |
Settings.persist_json_format = False | |
# Діагностика вмісту директорії | |
import glob | |
logger.info(f"Файли у директорії {indices_dir}: {glob.glob(os.path.join(indices_dir, '*'))}") | |
# Завантаження за приклад з LlamaIndex | |
try: | |
# Створюємо vector_store з директорії | |
vector_store = FaissVectorStore.from_persist_dir(indices_dir) | |
# Створюємо storage_context | |
storage_context = StorageContext.from_defaults( | |
vector_store=vector_store, | |
persist_dir=indices_dir | |
) | |
# Завантажуємо індекс | |
self.index = load_index_from_storage( | |
storage_context=storage_context, | |
index_cls=VectorStoreIndex | |
) | |
# Налаштовуємо retrievers | |
self._setup_retrievers() | |
logger.info(f"Індекси успішно завантажені з: {indices_dir}") | |
return True | |
except Exception as e: | |
logger.error(f"Проблема при завантаженні індексів: {e}") | |
logger.error(traceback.format_exc()) | |
return False | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні індексів: {e}") | |
logger.error(traceback.format_exc()) | |
return False | |
def chat_with_hybrid_search(self, question, chat_history=None): | |
"""Виконуємо гібридний пошук і отримуємо відповідь від LLM.""" | |
if not self.llm or not self.retriever_fusion: | |
error_msg = "Не вдалося виконати запит: LLM або індекси не ініціалізовано." | |
logger.error(error_msg) | |
return {"error": error_msg} | |
try: | |
logger.info(f"Обробка запиту: {question}") | |
question_tokens = count_tokens(question) | |
# Виконуємо пошук | |
logger.info("Виконання гібридного пошуку за запитом") | |
nodes = self.retriever_fusion.retrieve(question) | |
# Формуємо контекст | |
context = "ЗНАЙДЕНІ РЕЛЕВАНТНІ ДОКУМЕНТИ:\n\n" | |
relevant_docs = [] | |
for i, node in enumerate(nodes): | |
context += f"Документ {i+1} (релевантність: {node.score:.4f}):\n" | |
ticket_id = node.metadata.get("issue_key", f"TICKET-{i+1}") | |
summary = node.metadata.get("summary", "Без опису") | |
context += f"ТІКЕТ {ticket_id}: {summary}\n" | |
# Додаємо метадані | |
for k, v in node.metadata.items(): | |
if k in EXCLUDED_LLM_METADATA_KEYS or k in ["summary", "issue_key", "node_info"]: | |
continue | |
if v: | |
context += f"{k}: {v}\n" | |
# Додаємо текст документа | |
if node.text: | |
context += f"Опис: {node.text}\n" | |
context += "\n" + "-"*40 + "\n\n" | |
# Зберігаємо інформацію про документ | |
relevant_docs.append({ | |
"rank": i+1, | |
"relevance": node.score, | |
"ticket_id": ticket_id, | |
"summary": summary | |
}) | |
# Рахуємо токени в контексті | |
context_tokens = count_tokens(context) | |
# Формуємо повідомлення для LLM | |
messages = [] | |
messages.append(ChatMessage(role="system", content=system_prompt_hybrid_chat)) | |
# Додаємо історію чату, якщо вона є | |
if chat_history: | |
for h in chat_history: | |
role_ = h.get("role", "user") | |
content_ = h.get("content", "") | |
if role_ in ["user", "assistant", "system"]: | |
messages.append(ChatMessage(role=role_, content=content_)) | |
# Додаємо контекст і питання | |
messages.append(ChatMessage(role="system", content=f"Контекст:\n\n{context}")) | |
messages.append(ChatMessage(role="user", content=question)) | |
# Відправляємо запит до LLM | |
logger.info(f"Відправка запиту до LLM (токени: питання={question_tokens}, контекст={context_tokens})") | |
response = self.llm.chat(messages) | |
response_text = str(response) | |
# Рахуємо токени відповіді | |
response_tokens = count_tokens(response_text) | |
logger.info(f"Отримано відповідь від LLM (токени: відповідь={response_tokens})") | |
# Формуємо результат | |
return { | |
"answer": response_text, | |
"metadata": { | |
"question_tokens": question_tokens, | |
"context_tokens": context_tokens, | |
"response_tokens": response_tokens, | |
"total_tokens": question_tokens + context_tokens + response_tokens, | |
"relevant_documents": relevant_docs[:self.similarity_top_k] | |
} | |
} | |
except Exception as e: | |
error_msg = f"Помилка при виконанні запиту: {e}" | |
logger.error(error_msg) | |
logger.error(traceback.format_exc()) | |
return {"error": error_msg} | |
# --- Допоміжні методи для сумісності --- | |
def chat(self, question, history=None): | |
"""Скорочений метод, повертає лише текст відповіді.""" | |
result = self.chat_with_hybrid_search(question, history) | |
if "error" in result: | |
return f"Помилка: {result['error']}" | |
return result["answer"] | |
def run_qa(self, question, history=None): | |
"""Сумісність із jira_qa_assistant.py""" | |
return self.chat_with_hybrid_search(question, history) | |
def run_full_context_qa(self, question): | |
"""Запит без історії.""" | |
return self.chat_with_hybrid_search(question) | |
def load_data_from_dataframe(self, df): | |
"""Завантаження даних з DataFrame.""" | |
try: | |
self.df = df.copy() | |
self.jira_documents = self._create_documents_from_dataframe(df) | |
if self._create_indices_in_memory(): | |
self.indices_loaded = True | |
return True | |
return False | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні даних з DataFrame: {e}") | |
return False | |
def load_data_from_csv(self, file_path): | |
"""Завантаження даних з CSV файлу.""" | |
try: | |
df = pd.read_csv(file_path) | |
return self.load_data_from_dataframe(df) | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні даних з CSV: {e}") | |
return False | |
def save_indices(self, indices_dir=None): | |
"""Публічний метод для збереження індексів.""" | |
if indices_dir is None and self.indices_dir is not None: | |
indices_dir = self.indices_dir | |
if indices_dir: | |
return self._persist_indices_to_disk(indices_dir) | |
else: | |
logger.error("Не вказано директорію для збереження індексів") | |
return False |