chatbot4nct_test2 / helpers.py
quoc-khanh's picture
Update helpers.py
1cf0c77 verified
raw
history blame
16.8 kB
from docx import Document
import json
import datetime
import tempfile
from pathlib import Path
from unidecode import unidecode
from langchain_community.document_loaders import JSONLoader, UnstructuredWordDocumentLoader, WebBaseLoader, AsyncHtmlLoader
from langchain_community.document_transformers import Html2TextTransformer
from langchain_text_splitters import RecursiveCharacterTextSplitter, RecursiveJsonSplitter
from langchain_community.vectorstores import FAISS
from langchain_google_genai import GoogleGenerativeAIEmbeddings, ChatGoogleGenerativeAI
import google.generativeai as genai
from tqdm import tqdm
from pathlib import Path
import shutil
import requests
from bs4 import BeautifulSoup
import os
from langchain_docling import DoclingLoader#, ExportType
from langchain_docling.loader import ExportType
import logging
# logging.getLogger("langchain").setLevel(logging.ERROR)
logging.getLogger().setLevel(logging.ERROR)
import re
import ast
from langchain.schema import Document
# from file_loader import get_vectorstore
key = os.getenv("GOOGLE_API_KEY")
# import asyncio từ
# from urllib.parse import urljoin
# from playwright.async_api import async_playwright
# from langchain_community.document_loaders import AsyncHtmlLoader
# from langchain_community.document_transformers import Html2TextTransformer
# from tqdm.asyncio import tqdm
# async def _fetch_urls(base_url):
# """Extract all links from a JavaScript-rendered webpage."""
# async with async_playwright() as p:
# try:
# browser = await p.chromium.launch(headless=True)
# page = await browser.new_page()
# await page.goto(base_url)
# await page.wait_for_load_state("networkidle")
# urls = set()
# links = await page.locator("a").all()
# for link in links:
# href = await link.get_attribute("href")
# if href and "#" not in href:
# full_url = urljoin(base_url, href)
# if full_url.startswith(base_url):
# urls.add(full_url)
# await browser.close()
# except Exception as e:
# print(f"⚠️ Không thể truy cập {base_url}: {e}")
# return [] # Trả về danh sách rỗng nếu gặp lỗi
# return list(urls)
# async def _fetch_web_content(urls):
# """Fetch HTML content and convert it to text, with a progress bar."""
# docs = []
# progress_bar = tqdm(total=len(urls), desc="Scraping Pages", unit="page")
# for page_url in urls:
# try:
# loader = AsyncHtmlLoader(page_url)
# html2text = Html2TextTransformer()
# html = await loader.aload()
# doc = html2text.transform_documents(html)
# docs.extend(doc)
# except Exception as e:
# print(f"Error loading {page_url}: {e}")
# progress_bar.update(1) # Update progress bar
# progress_bar.close()
# return docs
# def scrape_website(base_urls):
# """
# Scrapes a list of base URLs and extracts their content.
# Includes a progress bar for tracking.
# """
# async def _main():
# all_urls = []
# for base_url in base_urls:
# urls = await _fetch_urls(base_url)
# all_urls.extend(urls)
# docs = await _fetch_web_content(all_urls)
# return docs
# return asyncio.run(_main)
# class ChunkerWrapper:
# def __init__(self, splitter):
# self.splitter = splitter
# def chunk(self, text):
# # Use the 'split_text' method of the splitter to divide the text
# return self.splitter.split_text(text)
# def get_web_documents(base_urls=['https://nct.neu.edu.vn/']):
# """Tải nội dung từ danh sách URL với thanh tiến trình"""
# docs = []
# text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=100)
# chunker = ChunkerWrapper(text_splitter)
# for page_url in tqdm(base_urls, desc="Đang tải trang", unit="url"):
# try:
# # loader = WebBaseLoader(page_url)
# loader = DoclingLoader(file_path=page_url,chunker=chunker # This will break your doc into manageable pieces.
# )
# html = loader.load()
# doc = html
# docs.extend(doc)
# except Exception as e:
# print(f"Lỗi khi tải {page_url}: {e}")
# print(f"Tải thành công {len(docs)} trang.")
# return docs
# def load_text_data(file_path):
# """Tải nội dung văn bản từ file DOCX (đã loại bảng)."""
# # cleaned_file = Document(file_path) #remove_tables_from_docx(file_path)
# text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=100)
# chunker = ChunkerWrapper(text_splitter)
# return DoclingLoader(file_path=file_path, chunker=chunker # This will break your doc into manageable pieces.
# ).load()
# def extract_metadata(input_string):
# # Use regex to find the content inside curly braces
# match = re.search(r'\{.*?\}', input_string)
# if match:
# metadata_str = match.group() # This returns the substring with the braces
# try:
# # Safely evaluate the string to a dictionary
# new_metadata = ast.literal_eval(metadata_str)
# except Exception as e:
# print(f"Error evaluating metadata: {e}")
# new_metadata = {}
# else:
# new_metadata = None
# return new_metadata
def extract_metadata(response):
if not isinstance(response, str):
response = str(response) # Chuyển về string nếu cần
# Tìm phần "text" trước
text_match = re.search(r'"text":\s*"([^"]+)"', response, re.DOTALL)
if not text_match:
return None # Không tìm thấy phần text nào
text_content = text_match.group(1) # Lấy nội dung trong "text"
# Tìm tất cả dictionary trong text_content
matches = re.findall(r'\{.*?\}', text_content, re.DOTALL)
if not matches:
return None # Không tìm thấy dict nào
smallest_dict = None
min_length = float("inf")
for match in matches:
try:
parsed_dict = ast.literal_eval(match) # Chuyển đổi string thành dictionary
if isinstance(parsed_dict, dict):
dict_length = len(str(parsed_dict)) # Độ dài chuỗi của dict
if dict_length < min_length:
smallest_dict = parsed_dict
min_length = dict_length
except Exception:
continue # Bỏ qua nếu không phải dictionary hợp lệ
return smallest_dict
# # 🛠 **Ví dụ sử dụng**
# response = None # Hoặc một object không phải string
# smallest_dict = extract_smallest_dict(response)
# print(smallest_dict) # Output: None
# # Example usage:
# input_str = "Some random text before and then {'a': 'abc', 'b': 'bcd'} and some random text after."
# metadata = extract_metadata(input_str)
# print(metadata)
def define_metadata(input_text):
condition1 = 'Chương trình'
condition2 = 'Đề án'
condition3 = 'Đề cương'
condition4 = ['Trí tuệ nhân tạo',
'Toán kinh tế',
'Thống kê kinh tế',
'Phân tích dữ liệu trong Kinh tế',
'Kỹ thuật phần mềm',
'Khoa học máy tính',
'Khoa học dữ liệu',
'Hệ thống thông tin quản lý',
'Hệ thống thông tin',
'Định phí bảo hiểm và Quản trị rủi ro',
'Công nghệ thông tin',
'An toàn thông tin']
#cond1 cond2 la str, con3 la list ten cac nganh
result = {}
if condition3 in input_text:
result['Tai lieu ve'] = 'Đề cương'
elif condition1 in input_text:
result['Tai lieu ve'] = 'Chương trình đào tạo'
elif condition2 in input_text:
result['Tai lieu ve'] = 'Đề án'
for cond in condition4:
if cond in input_text:
if cond in ['An toàn thông tin', 'Công nghệ thông tin', 'Khoa học máy tính', 'Kỹ thuật phần mềm']:
result['Khoa'] = 'Công nghệ thông tin (FIT)'
if cond in ['Toán kinh tế', 'Phân tích dữ liệu trong Kinh tế', 'Định phí bảo hiểm và Quản trị rủi ro']:
result['Khoa'] = 'Toán Kinh tế (MFE)'
if cond == 'Toán kinh tế':
cond += ' (TOKT)'
if cond == 'Phân tích dữ liệu trong Kinh tế':
cond += ' (DSEB)'
if cond == 'Định phí bảo hiểm và Quản trị rủi ro':
cond += ' (Actuary)'
if cond in ['Khoa học dữ liệu', 'Trí tuệ nhân tạo']:
result['Khoa'] = 'Khoa học dữ liệu và Trí tuệ nhân tạo (FDA)'
if cond == 'Thống kê kinh tế':
result['Khoa'] = 'Thống kê'
if cond in ['Hệ thống thông tin', 'Hệ thống thông tin quản lý']:
result['Khoa'] = 'Hệ thống thông tin quản lý (MIS)'
result['Nganh'] = cond
return result
def update_documents_metadata(documents, new_metadata):
updated_documents = []
for doc in documents:
# Preserve the original 'source'
original_source = doc.metadata.get("source")
# Update metadata with new key-value pairs
doc.metadata.update(new_metadata)
# Ensure the 'source' remains unchanged
if original_source:
doc.metadata["source"] = original_source
updated_documents.append(doc)
return updated_documents
def get_web_documents(base_urls=['https://nct.neu.edu.vn/']):
"""Fetch content from a list of URLs with a progress bar."""
docs = []
for page_url in tqdm(base_urls, desc="Loading page", unit="url"):
try:
loader = DoclingLoader(
file_path=page_url,
export_type=ExportType.DOC_CHUNKS # Enable internal chunking
)
doc = loader.load()
docs.extend(doc)
except Exception as e:
print(f"Error loading {page_url}: {e}")
print(f"Successfully loaded {len(docs)} documents.")
return docs
def load_text_data(file_path):
"""Load text content from a DOCX file (tables removed)."""
text_splitter = RecursiveCharacterTextSplitter(chunk_size=10000, chunk_overlap=2500)
loader = DoclingLoader(
file_path=file_path,
export_type=ExportType.MARKDOWN, # Enable internal chunking,
chunker = text_splitter,
# convert_kwargs={"input_format": "docx"} # Specify the input format
)
docs = loader.load()
chunks = text_splitter.split_documents(docs)
# You can wrap each chunk back into a Document if needed.
return chunks
def log_message(messages, filename="chat_log.txt"):
"""Ghi lịch sử tin nhắn vào file log"""
with open(filename, "a", encoding="utf-8") as f:
log_entry = {
"timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"conversation": messages
}
f.write(json.dumps(log_entry, ensure_ascii=False) + "\n")
def remove_tables_from_docx(file_path):
"""Tạo bản sao của file DOCX nhưng loại bỏ tất cả bảng bên trong."""
doc = Document(file_path)
new_doc = Document()
for para in doc.paragraphs:
new_doc.add_paragraph(para.text)
# 📌 Lưu vào file tạm, đảm bảo đóng đúng cách
with tempfile.NamedTemporaryFile(delete=False, suffix=".docx") as temp_file:
temp_path = temp_file.name
new_doc.save(temp_path)
return temp_path # ✅ Trả về đường dẫn file mới, không làm hỏng file gốc
def extract_tables_from_docx(file_path):
doc = Document(file_path)
tables = []
all_paragraphs = [p.text.strip() for p in doc.paragraphs if p.text.strip()] # Lấy tất cả đoạn văn bản không rỗng
table_index = 0
para_index = 0
table_positions = []
# Xác định vị trí của bảng trong tài liệu
for element in doc.element.body:
if element.tag.endswith("tbl"):
table_positions.append((table_index, para_index))
table_index += 1
elif element.tag.endswith("p"):
para_index += 1
for idx, (table_idx, para_idx) in enumerate(table_positions):
data = []
for row in doc.tables[table_idx].rows:
data.append([cell.text.strip() for cell in row.cells])
if len(data) > 1: # Chỉ lấy bảng có dữ liệu
# Lấy 5 dòng trước và sau bảng
related_start = max(0, para_idx - 5)
related_end = min(len(all_paragraphs), para_idx + 5)
related_text = all_paragraphs[related_start:related_end]
tables.append({"table": idx + 1, "content": data, "related": related_text})
return tables
def convert_to_json(tables):
structured_data = {}
for table in tables:
headers = [unidecode(h) for h in table["content"][0]] # Bỏ dấu ở headers
rows = [[unidecode(cell) for cell in row] for row in table["content"][1:]] # Bỏ dấu ở dữ liệu
json_table = [dict(zip(headers, row)) for row in rows if len(row) == len(headers)]
related_text = [unidecode(text) for text in table["related"]] # Bỏ dấu ở văn bản liên quan
structured_data[table["table"]] = {
"content": json_table,
"related": related_text
}
return json.dumps(structured_data, indent=4, ensure_ascii=False)
def save_json_to_file(json_data, output_path):
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(json.loads(json_data), f, ensure_ascii=False, indent=4)
# def load_json_with_langchain(json_path):
# loader = JSONLoader(file_path=json_path, jq_schema='.. | .content?', text_content=False)
# data = loader.load()
# # # Kiểm tra xem dữ liệu có bị lỗi không
# # print("Sample Data:", data[:2]) # In thử 2 dòng đầu
# return data
def load_json_manually(json_path):
with open(json_path, 'r', encoding='utf-8') as f:
data = json.load(f)
return data
def load_table_data(file_path, output_json_path):
tables = extract_tables_from_docx(file_path)
json_output = convert_to_json(tables)
save_json_to_file(json_output, output_json_path)
table_data = load_json_manually(output_json_path)
return table_data
def get_splits(file_path, output_json_path):
# table_data = load_table_data(file_path, output_json_path)
text_data = load_text_data(file_path)
# Chia nhỏ văn bản
# json_splitter = RecursiveJsonSplitter(max_chunk_size = 1000)
# text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=250)
# table_splits = json_splitter.create_documents(texts=[table_data])
# text_splits = text_splitter.split_documents(text_data)
# all_splits = table_splits + text_splits DoclingLoader
return text_data #text_splits
def get_json_splits_only(file_path):
table_data = load_json_manually(file_path)
def remove_accents(obj): #xoa dau tieng viet
if isinstance(obj, str):
return unidecode(obj)
elif isinstance(obj, list):
return [remove_accents(item) for item in obj]
elif isinstance(obj, dict):
return {remove_accents(k): remove_accents(v) for k, v in obj.items()}
return obj
cleaned_data = remove_accents(table_data)
wrapped_data = {"data": cleaned_data} if isinstance(cleaned_data, list) else cleaned_data
json_splitter = RecursiveJsonSplitter(max_chunk_size = 2000)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=100)
table_splits = json_splitter.create_documents(texts=[wrapped_data])
table_splits = text_splitter.split_documents(table_splits)
return table_splits
def list_docx_files(folder_path):
"""List all DOCX and DOC files in the given folder (including subfolders)."""
return [str(file) for file in Path(folder_path).rglob("*.docx")] + \
[str(file) for file in Path(folder_path).rglob("*.doc")]
def prompt_order(queries):
text = 'Dưới đây là các câu hỏi của người dùng theo thời gian, hãy đưa ra đáp án tốt nhất:\n'
i = 0
for q in queries:
i += 1
text += f'Câu hỏi {i}: {str(q)}\n'
return text