import os
import re
import json

import numpy as np
import pandas as pd
from PyPDF2 import PdfReader
from docx import Document as DocxDocument
from sklearn.metrics.pairwise import cosine_similarity
from llama_index.core.schema import Document
from llama_index.core.text_splitter import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding

from scripts.config import *


def extract_text_from_pdf(file_path):
    """Extract plain text from a PDF, page by page."""
    text = ""
    with open(file_path, 'rb') as file:
        pdf_reader = PdfReader(file)
        for page in pdf_reader.pages:
            # extract_text() may return None for image-only pages
            text += (page.extract_text() or "") + "\n"
    return text


def extract_text_from_docx(file_path):
    """Extract plain text from a .docx file, one paragraph per line."""
    doc = DocxDocument(file_path)
    text = ""
    for paragraph in doc.paragraphs:
        text += paragraph.text + "\n"
    return text


def extract_text_from_txt(file_path):
    """Read a text file, trying several encodings before a lossy UTF-8 fallback."""
    encodings = ['utf-8', 'windows-1251', 'cp1252', 'iso-8859-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except UnicodeDecodeError:
            continue
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
        return file.read()


def extract_text_from_csv(file_path):
    """Flatten a CSV into 'column: value value ...' lines."""
    df = pd.read_csv(file_path, encoding='utf-8')
    text = ""
    for column in df.columns:
        text += f"{column}: {' '.join(df[column].astype(str).tolist())}\n"
    return text


def extract_text_from_xlsx(file_path):
    """Flatten an Excel sheet into 'column: value value ...' lines."""
    df = pd.read_excel(file_path)
    text = ""
    for column in df.columns:
        text += f"{column}: {' '.join(df[column].astype(str).tolist())}\n"
    return text


def extract_text_from_json(file_path):
    """Flatten nested JSON into 'dotted.path[index]: value' lines."""
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    def flatten_json(obj, prefix=""):
        text = ""
        if isinstance(obj, dict):
            for key, value in obj.items():
                new_key = f"{prefix}.{key}" if prefix else key
                text += flatten_json(value, new_key)
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                new_key = f"{prefix}[{i}]" if prefix else f"[{i}]"
                text += flatten_json(item, new_key)
        else:
            text += f"{prefix}: {str(obj)}\n"
        return text

    return flatten_json(data)


def extract_text_from_file(file_path):
    """Dispatch to the extractor matching the file extension."""
    file_extension = os.path.splitext(file_path)[1].lower()
    extractors = {
        '.pdf': extract_text_from_pdf,
        '.docx': extract_text_from_docx,
        '.txt': extract_text_from_txt,
        '.csv': extract_text_from_csv,
        '.xlsx': extract_text_from_xlsx,
        '.xls': extract_text_from_xlsx,
        '.json': extract_text_from_json,
    }
    if file_extension in extractors:
        return extractors[file_extension](file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_extension}")


def preprocess_text(text):
    """Strip form-field placeholders, dot leaders and excess whitespace."""
    if pd.isna(text):
        return ""
    text = str(text)
    text = re.sub(r'(^\s*[\.\_]{3,}\s*$)', '', text, flags=re.MULTILINE)
    text = re.sub(r'(^\s*\d+\s*[\.\_]{3,}\s*$)', '', text, flags=re.MULTILINE)
    text = re.sub(r'[\.\_]{5,}', ' ', text)
    text = re.sub(r'№\s*[_\s]*от\s*«[_\s]*»\s*[_\s]*\.{0,}', '', text, flags=re.IGNORECASE)
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'—{2,}', '—', text)
    text = re.sub(r'_{2,}', '', text)
    text = text.strip()
    return text


def create_initial_chunks(text):
    """Split text into sentence-aware chunks using CHUNK_SIZE / CHUNK_OVERLAP."""
    sentence_splitter = SentenceSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    return sentence_splitter.split_text(text)


def get_chunk_embeddings(chunks):
    """Embed each chunk with the configured HuggingFace model."""
    embeddings_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)
    chunk_embeddings = []
    for chunk in chunks:
        embedding = embeddings_model.get_text_embedding(chunk)
        chunk_embeddings.append(embedding)
    return np.array(chunk_embeddings)


def merge_similar_chunks(initial_chunks, similarity_matrix):
    """Greedily merge chunks whose cosine similarity exceeds SIMILARITY_THRESHOLD,
    respecting the MIN_CHUNK_SIZE / MAX_CHUNK_SIZE limits."""
    merged_chunks = []
    used_indices = set()
    for i, chunk in enumerate(initial_chunks):
        if i in used_indices:
            continue
        current_chunk = chunk
        current_indices = [i]
        for j in range(i + 1, len(initial_chunks)):
            if j in used_indices:
                continue
            if similarity_matrix[i][j] > SIMILARITY_THRESHOLD:
                combined_text = current_chunk + " " + initial_chunks[j]
                if len(combined_text) <= MAX_CHUNK_SIZE:
                    current_chunk = combined_text
                    current_indices.append(j)
        if len(current_chunk) >= MIN_CHUNK_SIZE:
            merged_chunks.append(current_chunk)
        # Mark every chunk in this group as consumed, even when the merged
        # result is discarded for being shorter than MIN_CHUNK_SIZE.
        used_indices.update(current_indices)
    return merged_chunks


def extract_sections_from_chunk(chunk_text):
    """Heuristically pull a section / subsection heading out of a chunk using
    numbering patterns common in Russian-language documents."""
    section_patterns = [
        r'^(\d+(?:\.\d+)*)\s+([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
        r'^([А-Я][А-Я\s]+)\s*\n',
        r'^(\d+)\.\s*([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
        r'Статья\s+(\d+(?:\.\d+)?)\.\s*([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
        r'Пункт\s+(\d+(?:\.\d+)?)\.\s*([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
    ]
    current_section = ''
    current_subsection = ''
    for pattern in section_patterns:
        matches = re.findall(pattern, chunk_text, re.MULTILINE | re.IGNORECASE)
        for match in matches:
            # Only patterns with two capture groups yield (number, title) tuples.
            if isinstance(match, tuple) and len(match) == 2:
                section_num = match[0]
                section_title = match[1].strip()
                if '.' in section_num and len(section_num.split('.')) > 1:
                    current_subsection = f"{section_num} {section_title}"
                else:
                    current_section = f"{section_num} {section_title}"
                break
        if current_section or current_subsection:
            break
    return current_section, current_subsection


def process_single_document(file_path):
    """Extract, clean, chunk and semantically merge a single document.
    Returns a list of chunk dictionaries ready for indexing."""
    filename = os.path.basename(file_path)
    text = extract_text_from_file(file_path)
    text = preprocess_text(text)
    if not text or len(text.strip()) < 50:
        return []
    initial_chunks = create_initial_chunks(text)
    if len(initial_chunks) < 2:
        merged_chunks = initial_chunks
    else:
        try:
            chunk_embeddings = get_chunk_embeddings(initial_chunks)
            similarity_matrix = cosine_similarity(chunk_embeddings)
            merged_chunks = merge_similar_chunks(initial_chunks, similarity_matrix)
        except Exception as e:
            print(f"Error in similarity processing for {filename}: {str(e)}")
            merged_chunks = initial_chunks
    results = []
    for i, chunk_text in enumerate(merged_chunks):
        current_section, current_subsection = extract_sections_from_chunk(chunk_text)
        results.append({
            'document_id': filename,
            'section': current_section,
            'subsection': current_subsection,
            'chunk_text': chunk_text,
            'chunk_length': len(chunk_text),
            'chunk_id': f"{filename}_chunk_{i}",
            'txt_file_id': filename,
            'file_link': file_path
        })
    return results


def process_multiple_documents(file_paths):
    """Process a batch of files, skipping any that raise and logging progress."""
    all_results = []
    for file_path in file_paths:
        try:
            doc_results = process_single_document(file_path)
            all_results.extend(doc_results)
            print(f"Processed {file_path}: {len(doc_results)} chunks created")
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
    return all_results


def create_llama_documents(processed_chunks):
    """Wrap processed chunks in LlamaIndex Document objects with metadata."""
    documents = []
    for chunk_data in processed_chunks:
        metadata = {
            'chunk_id': chunk_data['chunk_id'],
            'document_id': chunk_data['document_id'],
            'section': chunk_data['section'] if chunk_data['section'] else '',
            'subsection': chunk_data['subsection'] if chunk_data['subsection'] else '',
            'chunk_length': chunk_data['chunk_length'],
            'txt_file_id': chunk_data.get('txt_file_id', chunk_data['document_id']),
            'file_link': chunk_data.get('file_link', '')
        }
        doc = Document(
            text=chunk_data['chunk_text'],
            metadata=metadata,
            id_=chunk_data['chunk_id']
        )
        documents.append(doc)
    return documents

def save_processed_chunks(processed_chunks, output_path='processed_data/processed_chunks.csv'):
    os.makedirs(os.path.dirname(output_path), exist_ok=True)
    df_chunks = pd.DataFrame(processed_chunks)
    df_chunks.to_csv(output_path, index=False, encoding='utf-8')
    return df_chunks


def load_processed_chunks(input_path='processed_data/processed_chunks.csv'):
    return pd.read_csv(input_path, encoding='utf-8')
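
# Example usage (a minimal sketch): the input paths below are hypothetical and
# the chunking / merge thresholds come from scripts.config; adjust both to your setup.
if __name__ == "__main__":
    sample_files = [
        'data/contract.pdf',      # hypothetical input files
        'data/regulations.docx',
    ]
    chunks = process_multiple_documents(sample_files)
    documents = create_llama_documents(chunks)
    save_processed_chunks(chunks)  # writes processed_data/processed_chunks.csv
    print(f"Created {len(documents)} LlamaIndex documents from {len(sample_files)} files")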