import pandas as pd
import numpy as np
import re
import json
import os
from io import StringIO
from PyPDF2 import PdfReader
from docx import Document as DocxDocument
from llama_index.core.text_splitter import SentenceSplitter
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from sklearn.metrics.pairwise import cosine_similarity
from llama_index.core.schema import Document
from scripts.config import *
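
# This module implements a document-ingestion pipeline: extract raw text from
# several file formats, clean it, split it into sentence-based chunks, merge
# semantically similar chunks via embedding cosine similarity, and wrap the
# results as LlamaIndex Documents. CHUNK_SIZE, CHUNK_OVERLAP, EMBEDDING_MODEL,
# SIMILARITY_THRESHOLD, MIN_CHUNK_SIZE and MAX_CHUNK_SIZE are assumed to be
# provided by the star import from scripts.config.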

def extract_text_from_pdf(file_path):
    text = ""
    with open(file_path, 'rb') as file:
        pdf_reader = PdfReader(file)
        for page in pdf_reader.pages:
            # guard against pages that yield no extractable text
            text += (page.extract_text() or "") + "\n"
    return text

def extract_text_from_docx(file_path):
    doc = DocxDocument(file_path)
    text = ""
    for paragraph in doc.paragraphs:
        text += paragraph.text + "\n"
    return text

def extract_text_from_txt(file_path):
    encodings = ['utf-8', 'windows-1251', 'cp1252', 'iso-8859-1']
    for encoding in encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except UnicodeDecodeError:
            continue
    with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
        return file.read()

def extract_text_from_csv(file_path):
    df = pd.read_csv(file_path, encoding='utf-8')
    text = ""
    for column in df.columns:
        text += f"{column}: {' '.join(df[column].astype(str).tolist())}\n"
    return text

def extract_text_from_xlsx(file_path):
    df = pd.read_excel(file_path)
    text = ""
    for column in df.columns:
        text += f"{column}: {' '.join(df[column].astype(str).tolist())}\n"
    return text

def extract_text_from_json(file_path):
    with open(file_path, 'r', encoding='utf-8') as file:
        data = json.load(file)

    def flatten_json(obj, prefix=""):
        text = ""
        if isinstance(obj, dict):
            for key, value in obj.items():
                new_key = f"{prefix}.{key}" if prefix else key
                text += flatten_json(value, new_key)
        elif isinstance(obj, list):
            for i, item in enumerate(obj):
                new_key = f"{prefix}[{i}]" if prefix else f"[{i}]"
                text += flatten_json(item, new_key)
        else:
            text += f"{prefix}: {str(obj)}\n"
        return text

    return flatten_json(data)

def extract_text_from_file(file_path):
    file_extension = os.path.splitext(file_path)[1].lower()
    extractors = {
        '.pdf': extract_text_from_pdf,
        '.docx': extract_text_from_docx,
        '.txt': extract_text_from_txt,
        '.csv': extract_text_from_csv,
        '.xlsx': extract_text_from_xlsx,
        '.xls': extract_text_from_xlsx,
        '.json': extract_text_from_json
    }
    if file_extension in extractors:
        return extractors[file_extension](file_path)
    else:
        raise ValueError(f"Unsupported file format: {file_extension}")
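
# Illustrative usage ("contract.pdf" is a hypothetical path):
#   raw_text = extract_text_from_file("contract.pdf")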

def preprocess_text(text):
    if pd.isna(text):
        return ""
    text = str(text)
    # drop lines made only of dot/underscore filler (form fields, leaders)
    text = re.sub(r'(^\s*[\.\_]{3,}\s*$)', '', text, flags=re.MULTILINE)
    # drop lines that are just a number followed by dot/underscore filler
    text = re.sub(r'(^\s*\d+\s*[\.\_]{3,}\s*$)', '', text, flags=re.MULTILINE)
    # collapse long inline runs of dots/underscores into a single space
    text = re.sub(r'[\.\_]{5,}', ' ', text)
    # remove blank document-number/date templates such as "№ ___ от «___» ___"
    text = re.sub(r'№\s*[_\s]*от\s*«[_\s]*»\s*[_\s]*\.{0,}', '', text, flags=re.IGNORECASE)
    # normalise whitespace and repeated punctuation
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'—{2,}', '—', text)
    text = re.sub(r'_{2,}', '', text)
    text = text.strip()
    return text

def create_initial_chunks(text):
    sentence_splitter = SentenceSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=CHUNK_OVERLAP)
    return sentence_splitter.split_text(text)
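
# SentenceSplitter prefers complete sentences while targeting chunks of
# roughly CHUNK_SIZE with CHUNK_OVERLAP of overlap (both limits are measured
# in tokens by default in LlamaIndex).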

def get_chunk_embeddings(chunks):
    embeddings_model = HuggingFaceEmbedding(model_name=EMBEDDING_MODEL)
    chunk_embeddings = []
    for chunk in chunks:
        embedding = embeddings_model.get_text_embedding(chunk)
        chunk_embeddings.append(embedding)
    return np.array(chunk_embeddings)
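
# Note: the HuggingFace embedding model is re-instantiated on every call;
# callers processing many documents may want to load it once and reuse it.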

def merge_similar_chunks(initial_chunks, similarity_matrix):
    merged_chunks = []
    used_indices = set()
    for i, chunk in enumerate(initial_chunks):
        if i in used_indices:
            continue
        current_chunk = chunk
        current_indices = [i]
        for j in range(i + 1, len(initial_chunks)):
            if j in used_indices:
                continue
            if similarity_matrix[i][j] > SIMILARITY_THRESHOLD:
                combined_text = current_chunk + " " + initial_chunks[j]
                if len(combined_text) <= MAX_CHUNK_SIZE:
                    current_chunk = combined_text
                    current_indices.append(j)
        if len(current_chunk) >= MIN_CHUNK_SIZE:
            merged_chunks.append(current_chunk)
            used_indices.update(current_indices)
    return merged_chunks
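
# The merge is greedy: each not-yet-used chunk absorbs later chunks whose
# cosine similarity to it exceeds SIMILARITY_THRESHOLD, as long as the merged
# text stays within MAX_CHUNK_SIZE; results shorter than MIN_CHUNK_SIZE are
# discarded.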

def extract_sections_from_chunk(chunk_text):
    section_patterns = [
        r'^(\d+(?:\.\d+)*)\s+([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
        r'^([А-Я][А-Я\s]+)\s*\n',
        r'^(\d+)\.\s*([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
        r'Статья\s+(\d+(?:\.\d+)?)\.\s*([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
        r'Пункт\s+(\d+(?:\.\d+)?)\.\s*([А-Яа-я][А-Яа-я\s,\-\(\)\"\']+)',
    ]
    current_section = ''
    current_subsection = ''
    for pattern in section_patterns:
        matches = re.findall(pattern, chunk_text, re.MULTILINE | re.IGNORECASE)
        for match in matches:
            # single-group patterns yield plain strings; only two-group
            # (number, title) matches are usable here
            if isinstance(match, tuple) and len(match) == 2:
                section_num = match[0]
                section_title = match[1].strip()
                if '.' in section_num:
                    current_subsection = f"{section_num} {section_title}"
                else:
                    current_section = f"{section_num} {section_title}"
                break
        if current_section or current_subsection:
            break
    return current_section, current_subsection
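
# The patterns above target Russian-style headings (numbered sections,
# ALL-CAPS titles, "Статья N." and "Пункт N." clauses); the first pattern
# that produces a usable match wins.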

def process_single_document(file_path):
    filename = os.path.basename(file_path)
    text = extract_text_from_file(file_path)
    text = preprocess_text(text)
    if not text or len(text.strip()) < 50:
        return []
    initial_chunks = create_initial_chunks(text)
    if len(initial_chunks) < 2:
        merged_chunks = initial_chunks
    else:
        try:
            chunk_embeddings = get_chunk_embeddings(initial_chunks)
            similarity_matrix = cosine_similarity(chunk_embeddings)
            merged_chunks = merge_similar_chunks(initial_chunks, similarity_matrix)
        except Exception as e:
            print(f"Error in similarity processing for {filename}: {str(e)}")
            merged_chunks = initial_chunks
    results = []
    for i, chunk_text in enumerate(merged_chunks):
        current_section, current_subsection = extract_sections_from_chunk(chunk_text)
        results.append({
            'document_id': filename,
            'section': current_section,
            'subsection': current_subsection,
            'chunk_text': chunk_text,
            'chunk_length': len(chunk_text),
            'chunk_id': f"{filename}_chunk_{i}",
            'txt_file_id': filename,
            'file_link': file_path
        })
    return results
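
# Illustrative usage ("docs/policy.docx" is a hypothetical path); each
# returned record holds the chunk text plus its document/section metadata:
#   chunk_records = process_single_document("docs/policy.docx")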

def process_multiple_documents(file_paths):
    all_results = []
    for file_path in file_paths:
        try:
            doc_results = process_single_document(file_path)
            all_results.extend(doc_results)
            print(f"Processed {file_path}: {len(doc_results)} chunks created")
        except Exception as e:
            print(f"Error processing {file_path}: {str(e)}")
    return all_results

def create_llama_documents(processed_chunks):
    documents = []
    for chunk_data in processed_chunks:
        metadata = {
            'chunk_id': chunk_data['chunk_id'],
            'document_id': chunk_data['document_id'],
            'section': chunk_data['section'] if chunk_data['section'] else '',
            'subsection': chunk_data['subsection'] if chunk_data['subsection'] else '',
            'chunk_length': chunk_data['chunk_length'],
            'txt_file_id': chunk_data.get('txt_file_id', chunk_data['document_id']),
            'file_link': chunk_data.get('file_link', '')
        }
        doc = Document(
            text=chunk_data['chunk_text'],
            metadata=metadata,
            id_=chunk_data['chunk_id']
        )
        documents.append(doc)
    return documents
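
# Illustrative follow-up: the resulting Document objects can be passed to a
# LlamaIndex index builder, e.g. VectorStoreIndex.from_documents(documents).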

def save_processed_chunks(processed_chunks, output_path='processed_data/processed_chunks.csv'):
    # os.path.dirname() is empty for bare filenames, so fall back to "."
    os.makedirs(os.path.dirname(output_path) or '.', exist_ok=True)
    df_chunks = pd.DataFrame(processed_chunks)
    df_chunks.to_csv(output_path, index=False, encoding='utf-8')
    return df_chunks

def load_processed_chunks(input_path='processed_data/processed_chunks.csv'):
    return pd.read_csv(input_path, encoding='utf-8')
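
# A minimal end-to-end sketch; the "documents" directory and the extension
# filter below are assumptions for illustration, not part of the pipeline.
if __name__ == "__main__":
    input_dir = "documents"
    supported = ('.pdf', '.docx', '.txt', '.csv', '.xlsx', '.xls', '.json')
    file_paths = [
        os.path.join(input_dir, name)
        for name in os.listdir(input_dir)
        if os.path.splitext(name)[1].lower() in supported
    ]
    chunks = process_multiple_documents(file_paths)
    save_processed_chunks(chunks)
    llama_documents = create_llama_documents(chunks)
    print(f"Created {len(llama_documents)} LlamaIndex documents from {len(file_paths)} files")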