# DocIndexer-v2 / classes.py
import shutil
import bm25s
from bm25s.hf import BM25HF
import threading, re, time, concurrent.futures, os, hashlib, traceback, io, zipfile, subprocess, tempfile, json
import requests, fitz  # fitz is PyMuPDF
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from datasets import load_dataset, Dataset
from datasets.data_files import EmptyDatasetError
from dotenv import load_dotenv
load_dotenv()
class TDocIndexer:
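"""Crawl the 3GPP FTP server and map each TDoc ID to the URL of its ZIP file."""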
def __init__(self, max_workers=33):
self.indexer_length = 0
self.dataset = "OrganizedProgrammers/3GPPTDocLocation"
self.indexer = self.load_indexer()
self.main_ftp_url = "https://3gpp.org/ftp"
self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
self.max_workers = max_workers
self.print_lock = threading.Lock()
self.indexer_lock = threading.Lock()
self.total_indexed = 0
self.processed_count = 0
self.total_count = 0
def load_indexer(self):
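"""Load the existing TDoc index from the Hugging Face dataset."""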
self.indexer_length = 0
all_docs = {}
tdoc_locations = load_dataset(self.dataset)
tdoc_locations = tdoc_locations["train"].to_list()
for doc in tdoc_locations:
self.indexer_length += 1
all_docs[doc["doc_id"]] = doc["url"]
return all_docs
def save_indexer(self):
"""Save the updated index"""
data = []
for doc_id, url in self.indexer.items():
data.append({"doc_id": doc_id, "url": url})
dataset = Dataset.from_list(data)
dataset.push_to_hub(self.dataset, token=os.environ["HF"])
self.indexer = self.load_indexer()
def get_docs_from_url(self, url):
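"""Return the entries listed on an FTP directory page (scraped from its HTML table)."""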
try:
response = requests.get(url, verify=False, timeout=10)
soup = BeautifulSoup(response.text, "html.parser")
return [item.get_text() for item in soup.select("tr td a")]
except Exception as e:
with self.print_lock:
print(f"Erreur lors de l'accès à {url}: {e}")
return []
def is_valid_document_pattern(self, filename):
return bool(self.valid_doc_pattern.match(filename))
def is_zip_file(self, filename):
return filename.lower().endswith('.zip')
def extract_doc_id(self, filename):
if self.is_valid_document_pattern(filename):
match = self.valid_doc_pattern.match(filename)
if match:
# Return the full document ID (e.g. S1-12345)
full_id = filename.split('.')[0]  # strip the file extension if present
return full_id.split('_')[0]  # strip any suffix after an underscore
return None
def process_zip_files(self, files_list, base_url, workshop=False):
"""Traiter une liste de fichiers pour trouver et indexer les ZIP valides"""
indexed_count = 0
for file in files_list:
if file in ['./', '../', 'ZIP/', 'zip/']:
continue
# Check that the entry is a ZIP file and matches the TDoc naming pattern
if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop):
file_url = f"{base_url}/{file}"
# Extract the document ID
doc_id = self.extract_doc_id(file)
if doc_id is None:
doc_id = file.split('.')[0]
if doc_id:
# Skip files that are already indexed at the same URL
with self.indexer_lock:
if doc_id in self.indexer and self.indexer[doc_id] == file_url:
continue
# Add or update the index entry
self.indexer[doc_id] = file_url
indexed_count += 1
self.total_indexed += 1
return indexed_count
def process_meeting(self, meeting, wg_url, workshop=False):
"""Traiter une réunion individuelle avec multithreading"""
try:
if meeting in ['./', '../']:
return 0
meeting_url = f"{wg_url}/{meeting}"
with self.print_lock:
print(f"Vérification du meeting: {meeting}")
# Vérifier le contenu de la réunion
meeting_contents = self.get_docs_from_url(meeting_url)
key = None
if "docs" in [x.lower() for x in meeting_contents]:
key = "docs"
elif "tdocs" in [x.lower() for x in meeting_contents]:
key = "tdocs"
elif "tdoc" in [x.lower() for x in meeting_contents]:
key = "tdoc"
if key is not None:
docs_url = f"{meeting_url}/{key}"
with self.print_lock:
print(f"Vérification des documents présent dans {docs_url}")
# Récupérer la liste des fichiers dans le dossier Docs
docs_files = self.get_docs_from_url(docs_url)
# 1. Index the ZIP files located directly in the Docs folder
docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop)
if docs_indexed_count > 0:
with self.print_lock:
print(f"{docs_indexed_count} fichiers trouvés")
# 2. Vérifier le sous-dossier ZIP s'il existe
if "zip" in [x.lower() for x in docs_files]:
zip_url = f"{docs_url}/zip"
with self.print_lock:
print(f"Vérification du dossier ./zip: {zip_url}")
# Récupérer les fichiers dans le sous-dossier ZIP
zip_files = self.get_docs_from_url(zip_url)
# Index the ZIP files found in the ZIP subfolder
zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop)
if zip_indexed_count > 0:
with self.print_lock:
print(f"{zip_indexed_count} fichiers trouvés")
# Mise à jour du compteur de progression
with self.indexer_lock:
self.processed_count += 1
# Report progress
with self.print_lock:
progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0
print(f"\rProgression: {self.processed_count}/{self.total_count} réunions traitées ({progress:.1f}%)")
return 1 # Réunion traitée avec succès
except Exception as e:
with self.print_lock:
print(f"\nErreur lors du traitement de la réunion {meeting}: {str(e)}")
return 0
def process_workgroup(self, wg, main_url):
"""Traiter un groupe de travail avec multithreading pour ses réunions"""
if wg in ['./', '../']:
return
wg_url = f"{main_url}/{wg}"
with self.print_lock:
print(f"Vérification du working group: {wg}")
# Récupérer les dossiers de réunion
meeting_folders = self.get_docs_from_url(wg_url)
# Add to the total meeting count
self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
# Process the meetings in parallel with a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(self.process_meeting, meeting, wg_url)
for meeting in meeting_folders if meeting not in ['./', '../']]
# Wait for all tasks to complete
concurrent.futures.wait(futures)
def index_all_tdocs(self):
"""Indexer tous les documents ZIP dans la structure FTP 3GPP avec multithreading"""
print("Démarrage de l'indexation des TDocs 3GPP complète")
start_time = time.time()
docs_count_before = self.indexer_length
# Main TSG groups
main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"]  # add others if needed
for main_tsg in main_groups:
print(f"Indexation de {main_tsg.upper()}...")
main_url = f"{self.main_ftp_url}/{main_tsg}"
# Retrieve the working groups
workgroups = self.get_docs_from_url(main_url)
# Process each working group sequentially
# (the meetings inside each group are processed in parallel)
for wg in workgroups:
self.process_workgroup(wg, main_url)
docs_count_after = len(self.indexer)
new_docs_count = docs_count_after - docs_count_before
print(f"Indexing finished in {time.time() - start_time:.2f} seconds")
print(f"New ZIP documents indexed: {new_docs_count}")
print(f"Total documents in the index: {docs_count_after}")
return self.indexer
def index_all_workshops(self):
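"""Index all ZIP documents under the 3GPP FTP 'workshop' folder with multithreading"""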
print("Démarrage de l'indexation des workshops ZIP 3GPP...")
start_time = time.time()
docs_count_before = len(self.indexer)
print("\nIndexation du dossier 'workshop'")
main_url = f"{self.main_ftp_url}/workshop"
# Retrieve the meeting folders
meeting_folders = self.get_docs_from_url(main_url)
# Add to the total meeting count
self.total_count += len([m for m in meeting_folders if m not in ['./', '../']])
# Process the meetings in parallel with a ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True)
for meeting in meeting_folders if meeting not in ['./', '../']]
concurrent.futures.wait(futures)
docs_count_after = len(self.indexer)
new_docs_count = docs_count_after - docs_count_before
print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes")
print(f"Nouveaux documents ZIP indexés: {new_docs_count}")
print(f"Total des documents dans l'index: {docs_count_after}")
return self.indexer
class Spec3GPPIndexer:
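"""Index 3GPP specifications: download the archives, convert the Word files to text with LibreOffice, split them into sections, and build BM25 indexes."""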
def __init__(self, max_workers=16):
self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
self.indexed_specifications = {}
self.specifications_passed = set()
self.processed_count = 0
self.total_count = 0
self.DICT_LOCK = threading.Lock()
self.DOCUMENT_LOCK = threading.Lock()
self.STOP_EVENT = threading.Event()
self.max_workers = max_workers
self.LIBREOFFICE_SEMAPHORE = threading.Semaphore(self.max_workers)
def _make_doc_index(self, specs):
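"""Group flat section rows into {doc_id: {"content": {section: text}, "hash": ...}}."""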
doc_index = {}
for section in specs:
if section["doc_id"] not in doc_index:
doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
else:
doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
return doc_index
@staticmethod
def version_to_code(version_str):
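"""Convert a dotted version string (e.g. '18.3.0') into the 3GPP archive version code."""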
chars = "0123456789abcdefghijklmnopqrstuvwxyz"
parts = version_str.split('.')
if len(parts) != 3:
return None
try:
x, y, z = [int(p) for p in parts]
except ValueError:
return None
if x < 36 and y < 36 and z < 36:
return f"{chars[x]}{chars[y]}{chars[z]}"
else:
return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}"
@staticmethod
def hasher(specification, version_code):
return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest()
@staticmethod
def get_scope(content):
for title, text in content.items():
if title.lower().endswith("scope"):
return text
return ""
def get_text(self, specification, version_code):
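"""Download the spec archive and convert its .doc/.docx files to plain text lines via LibreOffice."""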
if self.STOP_EVENT.is_set():
return []
doc_id = specification
series = doc_id.split(".")[0]
url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
try:
response = requests.get(url, verify=False)
if response.status_code != 200:
return []
zip_bytes = io.BytesIO(response.content)
with zipfile.ZipFile(zip_bytes) as zip_file:
# Keep only .doc and .docx files
docx_files = [f for f in zip_file.namelist() if f.lower().endswith(('.doc', '.docx'))]
if not docx_files:
return []
full_text = []
for doc_file in docx_files:
with tempfile.TemporaryDirectory() as tmpdir:
extracted_path = os.path.join(tmpdir, os.path.basename(doc_file))
with open(extracted_path, 'wb') as f:
f.write(zip_file.read(doc_file))
# Use a dedicated temporary LibreOffice profile so parallel conversions don't clash
profile_dir = tempfile.mkdtemp(prefix="libreoffice_profile_")
try:
with self.LIBREOFFICE_SEMAPHORE:
cmd = [
'soffice',
'--headless',
f'-env:UserInstallation=file://{profile_dir}',
'--convert-to', 'txt:Text',
'--outdir', tmpdir,
extracted_path
]
subprocess.run(cmd, check=True, timeout=60*5, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
txt_file = os.path.splitext(extracted_path)[0] + '.txt'
if os.path.exists(txt_file):
with open(txt_file, 'r', encoding='utf-8', errors='ignore') as ftxt:
full_text.extend(ftxt.readlines())
finally:
shutil.rmtree(profile_dir, ignore_errors=True)
return full_text
except Exception as e:
print(f"Error getting text for {specification} v{version_code}: {e}")
return []
def get_spec_content(self, specification, version_code):
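"""Split the extracted text into chapters keyed by their numbered headings."""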
if self.STOP_EVENT.is_set():
return {}
text = self.get_text(specification, version_code)
if not text:
return {}
chapters = []
chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$")
for i, line in enumerate(text):
if chapter_regex.fullmatch(line):
chapters.append((i, line))
document = {}
for i in range(len(chapters)):
start_index, chapter_title = chapters[i]
end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text)
content_lines = text[start_index + 1:end_index]
document[chapter_title.replace("\t", " ")] = "\n".join(content_lines)
return document
def fetch_spec_table(self):
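"""Scrape the 3GPP status report page and return one dict per specification row."""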
response = requests.get(
'https://www.3gpp.org/dynareport?code=status-report.htm',
headers={"User-Agent": 'Mozilla/5.0'},
verify=False
)
dfs = pd.read_html(io.StringIO(response.text))
for x in range(len(dfs)):
dfs[x] = dfs[x].replace({np.nan: None})
columns_needed = [0, 1, 2, 3, 4]
extracted_dfs = [df.iloc[:, columns_needed] for df in dfs]
columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns]
specifications = []
for df in extracted_dfs:
for index, row in df.iterrows():
doc = row.to_list()
doc_dict = dict(zip(columns, doc))
specifications.append(doc_dict)
return specifications
def process_specification(self, spec):
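"""Index a single specification, reusing the cached content when the version hash matches."""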
if self.STOP_EVENT.is_set():
return
try:
doc_id = str(spec['spec_num'])
version_code = self.version_to_code(str(spec['vers']))
if not version_code:
with self.DICT_LOCK:
self.processed_count += 1
return
document = None
already_indexed = False
with self.DOCUMENT_LOCK:
doc_in_cache = doc_id in self.documents_by_spec_num and \
self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version_code)
if doc_in_cache and doc_id not in self.specifications_passed:
document = self.documents_by_spec_num[doc_id]
self.specifications_passed.add(doc_id)
already_indexed = True
elif doc_id not in self.specifications_passed:
doc_content = self.get_spec_content(doc_id, version_code)
if doc_content:
document = {"content": doc_content, "hash": self.hasher(doc_id, version_code)}
with self.DOCUMENT_LOCK:
self.documents_by_spec_num[doc_id] = document
self.specifications_passed.add(doc_id)
already_indexed = False
if document:
url = f"https://www.3gpp.org/ftp/Specs/archive/{doc_id.split('.')[0]}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip"
metadata = {
"id": doc_id,
"title": spec.get("title", ""),
"type": spec.get("type", ""),
"version": str(spec.get("vers", "")),
"working_group": spec.get("WG", ""),
"url": url,
"scope": self.get_scope(document["content"])
}
key = f"{doc_id}+-+{spec.get('title', '')}+-+{spec.get('type', '')}+-+{spec.get('vers', '')}+-+{spec.get('WG', '')}"
with self.DICT_LOCK:
self.indexed_specifications[key] = metadata
with self.DICT_LOCK:
self.processed_count += 1
status = "already indexed" if already_indexed else "indexed now"
print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
except Exception as e:
traceback.print_exc()
print(f"Error processing spec {spec.get('spec_num')} v{spec.get('vers')}: {e}")
with self.DICT_LOCK:
self.processed_count += 1
print(f"Progress: {self.processed_count}/{self.total_count} specs processed")
def get_document(self, spec_id: str, spec_title: str):
text = [f"{spec_id} - {spec_title}\n"]
for section in self.spec_contents:
if spec_id == section["doc_id"]:
text.extend([f"{section['section']}\n\n{section['content']}"])
return text
def create_bm25_index(self):
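"""Build and push two BM25 indexes: one document per section, and one document per full specification."""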
dataset_metadata = self.indexed_specifications.values()
unique_specs = set()
corpus_json = []
for specification in dataset_metadata:
if specification['id'] in unique_specs: continue
unique_specs.add(specification['id'])
for section in self.spec_contents:
if specification['id'] == section['doc_id']:
corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
"id": specification['id'],
"title": specification['title'],
"section_title": section['section'],
"version": specification['version'],
"type": specification['type'],
"working_group": specification['working_group'],
"url": specification['url'],
"scope": specification['scope']
}})
corpus_text = [doc["text"] for doc in corpus_json]
corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
print("Indexing BM25")
retriever = BM25HF(corpus=corpus_json)
retriever.index(corpus_tokens)
retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSections", token=os.environ.get("HF"))
unique_specs = set()
corpus_json = []
for specification in dataset_metadata:
if specification['id'] in unique_specs: continue
text_list = self.get_document(specification['id'], specification['title'])
text = "\n".join(text_list)
if len(text_list) == 1: continue
corpus_json.append({"text": text, "metadata": specification})
unique_specs.add(specification['id'])
corpus_text = [doc["text"] for doc in corpus_json]
corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
print("Indexing BM25")
retriever = BM25HF(corpus=corpus_json)
retriever.index(corpus_tokens)
retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSingle", token=os.environ.get("HF"))
def run(self):
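"""Fetch the specification table and process every entry in a thread pool."""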
print("Fetching specification tables from 3GPP...")
specifications = self.fetch_spec_table()
self.total_count = len(specifications)
print(f"Processing {self.total_count} specs with {self.max_workers} threads...")
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(self.process_specification, spec) for spec in specifications]
for f in concurrent.futures.as_completed(futures):
if self.STOP_EVENT.is_set():
break
print("All specs processed.")
# Save step (same behaviour as the original script)
def save(self):
print("Saving indexed data...")
flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
flat_docs = []
print("Flatting doc contents")
for doc_id, data in self.documents_by_spec_num.items():
for title, content in data["content"].items():
flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
print("Creating datasets ...")
push_spec_content = Dataset.from_list(flat_docs)
push_spec_metadata = Dataset.from_list(flat_metadata)
# The HF token is assumed to be set in the environment
print("Pushing ...")
push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"])
push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"])
self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list()
self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
print("Save finished.")
class SpecETSIIndexer:
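"""Index ETSI TS/TR specifications from the ETSI portal (PDF based) and build BM25 indexes."""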
def __init__(self, max_workers=16):
self.session = requests.Session()
self.session.verify = False
self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
self.indexed_specifications = {}
self.specifications_passed = set()
self.processed_count = 0
self.total_count = 0
self.DICT_LOCK = threading.Lock()
self.DOCUMENT_LOCK = threading.Lock()
self.STOP_EVENT = threading.Event()
self.max_workers = max_workers
self.df = self._fetch_spec_table()
def _make_doc_index(self, specs):
doc_index = {}
for section in specs:
if section["doc_id"] not in doc_index:
doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]}
else:
doc_index[section["doc_id"]]["content"][section["section"]] = section["content"]
return doc_index
def _fetch_spec_table(self):
# Log in to the ETSI portal and fetch the TS/TR CSV exports
print("Logging in to the ETSI portal...")
self.session.post(
"https://portal.etsi.org/ETSIPages/LoginEOL.ashx",
verify=False,
headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."},
data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}),
)
print("Récupération des métadonnées TS/TR …")
url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1"
url_tr = url_ts.replace("stdType=TS", "stdType=TR")
data_ts = self.session.get(url_ts, verify=False).content
data_tr = self.session.get(url_tr, verify=False).content
df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False)
df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False)
backup_ts = df_ts["ETSI deliverable"]
backup_tr = df_tr["ETSI deliverable"]
df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)")
df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)")
version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)")
df_ts["Version"] = version1[0]
df_tr["Version"] = version2[0]
def ver_tuple(v):
return tuple(map(int, v.split(".")))
df_ts["temp"] = df_ts["Version"].apply(ver_tuple)
df_tr["temp"] = df_tr["Version"].apply(ver_tuple)
df_ts["Type"] = "TS"
df_tr["Type"] = "TR"
df = pd.concat([df_ts, df_tr])
unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()]
unique_df = unique_df.drop(columns="temp")
unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))]
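# NOTE: unique_df (latest version per deliverable) is computed but not returned; the full df is used below.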
df = df.drop(columns="temp")
df = df[(~df["title"].str.contains("3GPP", case=True, na=False))]
return df
@staticmethod
def hasher(specification: str, version: str):
return hashlib.md5(f"{specification}{version}".encode()).hexdigest()
@staticmethod
def get_scope(content):
for title, text in content.items():
if title.lower().endswith("scope"):
return text
return ""
def get_document(self, spec_id: str, spec_title: str):
text = [f"{spec_id} - {spec_title}\n"]
for section in self.spec_contents:
if spec_id == section["doc_id"]:
text.extend([f"{section['section']}\n\n{section['content']}"])
return text
def get_text(self, specification: str):
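"""Download the specification PDF and return the PyMuPDF document plus its table of contents."""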
if self.STOP_EVENT.is_set():
return None, []
print(f"\n[INFO] Tentative de récupération de la spécification {specification}", flush=True)
try:
# Find the table row containing the PDF link
row = self.df[self.df["ETSI deliverable"] == specification]
if row.empty:
print(f"[WARN] Spécification {specification} absente du tableau")
return None, []
pdf_link = row.iloc[0]["PDF link"]
response = self.session.get(
pdf_link,
headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...'}
)
if response.status_code != 200:
print(f"[ERREUR] Echec du téléchargement du PDF pour {specification}.")
return None, []
pdf = fitz.open(stream=response.content, filetype="pdf")
return pdf, pdf.get_toc()
except Exception as e:
print(f"[ERROR] Échec get_text pour {specification} : {e}", flush=True)
return None, []
def get_spec_content(self, specification: str):
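"""Extract the PDF text and split it into sections using the table-of-contents titles."""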
def extract_sections(text, titles):
sections = {}
sorted_titles = sorted(titles, key=lambda t: text.find(t))
for i, title in enumerate(sorted_titles):
start = text.find(title)
if i + 1 < len(sorted_titles):
end = text.find(sorted_titles[i + 1])
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip())
else:
sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip())
return sections
if self.STOP_EVENT.is_set():
return {}
print(f"[INFO] Extraction du contenu de {specification}", flush=True)
pdf, doc_toc = self.get_text(specification)
text = []
if not pdf or not doc_toc:
print("[ERREUR] Pas de texte ou table of contents trouvé !")
return {}
# Start from the first page actually referenced in the table of contents
first_page = doc_toc[0][2] - 1
for page in pdf[first_page:]:
text.append("\n".join([line.strip() for line in page.get_text().splitlines()]))
text = "\n".join(text)
if not text or not doc_toc or self.STOP_EVENT.is_set():
print("[ERREUR] Pas de texte/table of contents récupéré !")
return {}
titles = []
for level, title, page in doc_toc:
if self.STOP_EVENT.is_set():
return {}
if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text:
titles.append('\n'.join(title.strip().split(" ", 1)))
return extract_sections(text, titles)
def process_specification(self, spec):
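"""Index a single ETSI specification, reusing cached content when the version hash matches."""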
if self.STOP_EVENT.is_set():
return
try:
version = spec.get('Version')
if not version: return
doc_id = str(spec.get("ETSI deliverable"))
document = None
already_indexed = False
with self.DOCUMENT_LOCK:
if (doc_id in self.documents_by_spec_num
and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version)
and doc_id not in self.specifications_passed):
document = self.documents_by_spec_num[doc_id]
self.specifications_passed.add(doc_id)
already_indexed = True
elif doc_id in self.specifications_passed:
document = self.documents_by_spec_num[doc_id]
already_indexed = True
else:
document_content = self.get_spec_content(doc_id)
if document_content:
self.documents_by_spec_num[doc_id] = {"content": document_content, "hash": self.hasher(doc_id, version)}
document = {"content": document_content, "hash": self.hasher(doc_id, version)}
self.specifications_passed.add(doc_id)
already_indexed = False
if document:
string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}"
metadata = {
"id": str(doc_id),
"title": spec["title"],
"type": spec["Type"],
"version": version,
"url": spec["PDF link"],
"scope": "" if not document else self.get_scope(document["content"])
}
with self.DICT_LOCK:
self.indexed_specifications[string_key] = metadata
with self.DICT_LOCK:
self.processed_count += 1
status = "already indexed" if already_indexed else "indexed now"
print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}")
except Exception as e:
traceback.print_exc()
print(f"\n[ERREUR] Échec du traitement de {doc_id} {spec.get('Version')}: {e}", flush=True)
with self.DICT_LOCK:
self.processed_count += 1
print(f"Progress: {self.processed_count}/{self.total_count} specs processed")
def run(self):
print("Démarrage indexation ETSI…")
specifications = self.df.to_dict(orient="records")
self.total_count = len(specifications)
print(f"Traitement de {self.total_count} specs avec {self.max_workers} threads...\n")
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = [executor.submit(self.process_specification, spec) for spec in specifications]
for f in concurrent.futures.as_completed(futures):
if self.STOP_EVENT.is_set():
break
print(f"\nAll {self.processed_count}/{self.total_count} specs processed.")
def save(self):
print("\nSauvegarde en cours...", flush=True)
flat_metadata = [metadata for metadata in self.indexed_specifications.values()]
flat_docs = []
for doc_id, data in self.documents_by_spec_num.items():
for title, content in data["content"].items():
flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content})
push_spec_content = Dataset.from_list(flat_docs)
push_spec_metadata = Dataset.from_list(flat_metadata)
push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF"])
push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF"])
self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list()
self.documents_by_spec_num = self._make_doc_index(self.spec_contents)
print("Sauvegarde terminée.")
def create_bm25_index(self):
dataset_metadata = self.indexed_specifications.values()
unique_specs = set()
corpus_json = []
for specification in dataset_metadata:
if specification['id'] in unique_specs: continue
unique_specs.add(specification['id'])
for section in self.spec_contents:
if specification['id'] == section['doc_id']:
corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": {
"id": specification['id'],
"title": specification['title'],
"section_title": section['section'],
"version": specification['version'],
"type": specification['type'],
"url": specification['url'],
"scope": specification['scope']
}})
corpus_text = [doc["text"] for doc in corpus_json]
corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
print("Indexing BM25")
retriever = BM25HF(corpus=corpus_json)
retriever.index(corpus_tokens)
retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSections", token=os.environ.get("HF"))
unique_specs = set()
corpus_json = []
for specification in dataset_metadata:
if specification['id'] in unique_specs: continue
text_list = self.get_document(specification['id'], specification['title'])
text = "\n".join(text_list)
if len(text_list) == 1: continue
corpus_json.append({"text": text, "metadata": specification})
unique_specs.add(specification['id'])
corpus_text = [doc["text"] for doc in corpus_json]
corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en")
print("Indexing BM25")
retriever = BM25HF(corpus=corpus_json)
retriever.index(corpus_tokens)
retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSingle", token=os.environ.get("HF"))