import shutil import bm25s from bm25s.hf import BM25HF import threading, re, time, concurrent.futures, requests, os, hashlib, traceback, io, zipfile, subprocess, tempfile, json, fitz import pandas as pd import numpy as np from bs4 import BeautifulSoup from datasets import load_dataset, Dataset from datasets.data_files import EmptyDatasetError from dotenv import load_dotenv load_dotenv() class TDocIndexer: def __init__(self, max_workers=33): self.indexer_length = 0 self.dataset = "OrganizedProgrammers/3GPPTDocLocation" self.indexer = self.load_indexer() self.main_ftp_url = "https://3gpp.org/ftp" self.valid_doc_pattern = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE) self.max_workers = max_workers self.print_lock = threading.Lock() self.indexer_lock = threading.Lock() self.total_indexed = 0 self.processed_count = 0 self.total_count = 0 def load_indexer(self): self.indexer_length = 0 all_docs = {} tdoc_locations = load_dataset(self.dataset) tdoc_locations = tdoc_locations["train"].to_list() for doc in tdoc_locations: self.indexer_length += 1 all_docs[doc["doc_id"]] = doc["url"] return all_docs def save_indexer(self): """Save the updated index""" data = [] for doc_id, url in self.indexer.items(): data.append({"doc_id": doc_id, "url": url}) dataset = Dataset.from_list(data) dataset.push_to_hub(self.dataset, token=os.environ["HF"]) self.indexer = self.load_indexer() def get_docs_from_url(self, url): try: response = requests.get(url, verify=False, timeout=10) soup = BeautifulSoup(response.text, "html.parser") return [item.get_text() for item in soup.select("tr td a")] except Exception as e: with self.print_lock: print(f"Erreur lors de l'accès à {url}: {e}") return [] def is_valid_document_pattern(self, filename): return bool(self.valid_doc_pattern.match(filename)) def is_zip_file(self, filename): return filename.lower().endswith('.zip') def extract_doc_id(self, filename): if self.is_valid_document_pattern(filename): match = self.valid_doc_pattern.match(filename) if match: # Retourner le motif complet (comme S1-12345) full_id = filename.split('.')[0] # Enlever l'extension si présente return full_id.split('_')[0] # Enlever les suffixes après underscore si présents return None def process_zip_files(self, files_list, base_url, workshop=False): """Traiter une liste de fichiers pour trouver et indexer les ZIP valides""" indexed_count = 0 for file in files_list: if file in ['./', '../', 'ZIP/', 'zip/']: continue # Vérifier si c'est un fichier ZIP et s'il correspond au motif if self.is_zip_file(file) and (self.is_valid_document_pattern(file) or workshop): file_url = f"{base_url}/{file}" # Extraire l'ID du document doc_id = self.extract_doc_id(file) if doc_id is None: doc_id = file.split('.')[0] if doc_id: # Vérifier si ce fichier est déjà indexé with self.indexer_lock: if doc_id in self.indexer and self.indexer[doc_id] == file_url: continue # Ajouter ou mettre à jour l'index self.indexer[doc_id] = file_url indexed_count += 1 self.total_indexed += 1 return indexed_count def process_meeting(self, meeting, wg_url, workshop=False): """Traiter une réunion individuelle avec multithreading""" try: if meeting in ['./', '../']: return 0 meeting_url = f"{wg_url}/{meeting}" with self.print_lock: print(f"Vérification du meeting: {meeting}") # Vérifier le contenu de la réunion meeting_contents = self.get_docs_from_url(meeting_url) key = None if "docs" in [x.lower() for x in meeting_contents]: key = "docs" elif "tdocs" in [x.lower() for x in meeting_contents]: key = "tdocs" elif "tdoc" in [x.lower() for x in meeting_contents]: key = "tdoc" if key is not None: docs_url = f"{meeting_url}/{key}" with self.print_lock: print(f"Vérification des documents présent dans {docs_url}") # Récupérer la liste des fichiers dans le dossier Docs docs_files = self.get_docs_from_url(docs_url) # 1. Indexer les fichiers ZIP directement dans le dossier Docs docs_indexed_count = self.process_zip_files(docs_files, docs_url, workshop) if docs_indexed_count > 0: with self.print_lock: print(f"{docs_indexed_count} fichiers trouvés") # 2. Vérifier le sous-dossier ZIP s'il existe if "zip" in [x.lower() for x in docs_files]: zip_url = f"{docs_url}/zip" with self.print_lock: print(f"Vérification du dossier ./zip: {zip_url}") # Récupérer les fichiers dans le sous-dossier ZIP zip_files = self.get_docs_from_url(zip_url) # Indexer les fichiers ZIP dans le sous-dossier ZIP zip_indexed_count = self.process_zip_files(zip_files, zip_url, workshop) if zip_indexed_count > 0: with self.print_lock: print(f"{zip_indexed_count} fichiers trouvés") # Mise à jour du compteur de progression with self.indexer_lock: self.processed_count += 1 # Affichage de la progression with self.print_lock: progress = (self.processed_count / self.total_count) * 100 if self.total_count > 0 else 0 print(f"\rProgression: {self.processed_count}/{self.total_count} réunions traitées ({progress:.1f}%)") return 1 # Réunion traitée avec succès except Exception as e: with self.print_lock: print(f"\nErreur lors du traitement de la réunion {meeting}: {str(e)}") return 0 def process_workgroup(self, wg, main_url): """Traiter un groupe de travail avec multithreading pour ses réunions""" if wg in ['./', '../']: return wg_url = f"{main_url}/{wg}" with self.print_lock: print(f"Vérification du working group: {wg}") # Récupérer les dossiers de réunion meeting_folders = self.get_docs_from_url(wg_url) # Ajouter au compteur total self.total_count += len([m for m in meeting_folders if m not in ['./', '../']]) # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(self.process_meeting, meeting, wg_url) for meeting in meeting_folders if meeting not in ['./', '../']] # Attendre que toutes les tâches soient terminées concurrent.futures.wait(futures) def index_all_tdocs(self): """Indexer tous les documents ZIP dans la structure FTP 3GPP avec multithreading""" print("Démarrage de l'indexation des TDocs 3GPP complète") start_time = time.time() docs_count_before = self.indexer_length # Principaux groupes TSG main_groups = ["tsg_sa", "tsg_ct", "tsg_ran"] # Ajouter d'autres si nécessaire for main_tsg in main_groups: print(f"Indexation de {main_tsg.upper()}...") main_url = f"{self.main_ftp_url}/{main_tsg}" # Récupérer les groupes de travail workgroups = self.get_docs_from_url(main_url) # Traiter chaque groupe de travail séquentiellement # (mais les réunions à l'intérieur seront traitées en parallèle) for wg in workgroups: self.process_workgroup(wg, main_url) docs_count_after = len(self.indexer) new_docs_count = abs(docs_count_after - docs_count_before) print(f"Indexation terminée en {time.time() - start_time:.2f} secondes") print(f"Nouveaux documents ZIP indexés: {new_docs_count}") print(f"Total des documents dans l'index: {docs_count_after}") return self.indexer def index_all_workshops(self): print("Démarrage de l'indexation des workshops ZIP 3GPP...") start_time = time.time() docs_count_before = len(self.indexer) print("\nIndexation du dossier 'workshop'") main_url = f"{self.main_ftp_url}/workshop" # Récupérer les dossiers de réunion meeting_folders = self.get_docs_from_url(main_url) # Ajouter au compteur total self.total_count += len([m for m in meeting_folders if m not in ['./', '../']]) # Utiliser ThreadPoolExecutor pour traiter les réunions en parallèle with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(self.process_meeting, meeting, main_url, workshop=True) for meeting in meeting_folders if meeting not in ['./', '../']] concurrent.futures.wait(futures) docs_count_after = len(self.indexer) new_docs_count = docs_count_after - docs_count_before print(f"\nIndexation terminée en {time.time() - start_time:.2f} secondes") print(f"Nouveaux documents ZIP indexés: {new_docs_count}") print(f"Total des documents dans l'index: {docs_count_after}") return self.indexer class Spec3GPPIndexer: def __init__(self, max_workers=16): self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list() self.documents_by_spec_num = self._make_doc_index(self.spec_contents) self.indexed_specifications = {} self.specifications_passed = set() self.processed_count = 0 self.total_count = 0 self.DICT_LOCK = threading.Lock() self.DOCUMENT_LOCK = threading.Lock() self.STOP_EVENT = threading.Event() self.max_workers = max_workers self.LIBREOFFICE_SEMAPHORE = threading.Semaphore(self.max_workers) def _make_doc_index(self, specs): doc_index = {} for section in specs: if section["doc_id"] not in doc_index: doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]} else: doc_index[section["doc_id"]]["content"][section["section"]] = section["content"] return doc_index @staticmethod def version_to_code(version_str): chars = "0123456789abcdefghijklmnopqrstuvwxyz" parts = version_str.split('.') if len(parts) != 3: return None try: x, y, z = [int(p) for p in parts] except ValueError: return None if x < 36 and y < 36 and z < 36: return f"{chars[x]}{chars[y]}{chars[z]}" else: return f"{str(x).zfill(2)}{str(y).zfill(2)}{str(z).zfill(2)}" @staticmethod def hasher(specification, version_code): return hashlib.md5(f"{specification}{version_code}".encode()).hexdigest() @staticmethod def get_scope(content): for title, text in content.items(): if title.lower().endswith("scope"): return text return "" def get_text(self, specification, version_code): if self.STOP_EVENT.is_set(): return [] doc_id = specification series = doc_id.split(".")[0] url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip" try: response = requests.get(url, verify=False) if response.status_code != 200: return [] zip_bytes = io.BytesIO(response.content) with zipfile.ZipFile(zip_bytes) as zip_file: # Filtrer uniquement fichiers .doc et .docx docx_files = [f for f in zip_file.namelist() if f.lower().endswith(('.doc', '.docx'))] if not docx_files: return [] full_text = [] for doc_file in docx_files: with tempfile.TemporaryDirectory() as tmpdir: extracted_path = os.path.join(tmpdir, os.path.basename(doc_file)) with open(extracted_path, 'wb') as f: f.write(zip_file.read(doc_file)) # Profil libreoffice temp dédié profile_dir = tempfile.mkdtemp(prefix="libreoffice_profile_") try: with self.LIBREOFFICE_SEMAPHORE: cmd = [ 'soffice', '--headless', f'-env:UserInstallation=file://{profile_dir}', '--convert-to', 'txt:Text', '--outdir', tmpdir, extracted_path ] subprocess.run(cmd, check=True, timeout=60*5, stdout=subprocess.PIPE, stderr=subprocess.PIPE) txt_file = os.path.splitext(extracted_path)[0] + '.txt' if os.path.exists(txt_file): with open(txt_file, 'r', encoding='utf-8', errors='ignore') as ftxt: full_text.extend(ftxt.readlines()) finally: shutil.rmtree(profile_dir, ignore_errors=True) return full_text except Exception as e: print(f"Error getting text for {specification} v{version_code}: {e}") return [] def get_spec_content(self, specification, version_code): if self.STOP_EVENT.is_set(): return {} text = self.get_text(specification, version_code) if not text: return {} chapters = [] chapter_regex = re.compile(r"^(\d+[a-z]?(?:\.\d+)*)\t[A-Z0-9][\ \S]+[^\.]$") for i, line in enumerate(text): if chapter_regex.fullmatch(line): chapters.append((i, line)) document = {} for i in range(len(chapters)): start_index, chapter_title = chapters[i] end_index = chapters[i+1][0] if i+1 < len(chapters) else len(text) content_lines = text[start_index + 1:end_index] document[chapter_title.replace("\t", " ")] = "\n".join(content_lines) return document def fetch_spec_table(self): response = requests.get( 'https://www.3gpp.org/dynareport?code=status-report.htm', headers={"User-Agent": 'Mozilla/5.0'}, verify=False ) dfs = pd.read_html(io.StringIO(response.text)) for x in range(len(dfs)): dfs[x] = dfs[x].replace({np.nan: None}) columns_needed = [0, 1, 2, 3, 4] extracted_dfs = [df.iloc[:, columns_needed] for df in dfs] columns = [x.replace("\xa0", "_") for x in extracted_dfs[0].columns] specifications = [] for df in extracted_dfs: for index, row in df.iterrows(): doc = row.to_list() doc_dict = dict(zip(columns, doc)) specifications.append(doc_dict) return specifications def process_specification(self, spec): if self.STOP_EVENT.is_set(): return try: doc_id = str(spec['spec_num']) version_code = self.version_to_code(str(spec['vers'])) if not version_code: with self.DICT_LOCK: self.processed_count += 1 return document = None already_indexed = False with self.DOCUMENT_LOCK: doc_in_cache = doc_id in self.documents_by_spec_num and \ self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version_code) if doc_in_cache and doc_id not in self.specifications_passed: document = self.documents_by_spec_num[doc_id] self.specifications_passed.add(doc_id) already_indexed = True elif doc_id not in self.specifications_passed: doc_content = self.get_spec_content(doc_id, version_code) if doc_content: document = {"content": doc_content, "hash": self.hasher(doc_id, version_code)} with self.DOCUMENT_LOCK: self.documents_by_spec_num[doc_id] = document self.specifications_passed.add(doc_id) already_indexed = False if document: url = f"https://www.3gpp.org/ftp/Specs/archive/{doc_id.split('.')[0]}_series/{doc_id}/{doc_id.replace('.', '')}-{version_code}.zip" metadata = { "id": doc_id, "title": spec.get("title", ""), "type": spec.get("type", ""), "version": str(spec.get("vers", "")), "working_group": spec.get("WG", ""), "url": url, "scope": self.get_scope(document["content"]) } key = f"{doc_id}+-+{spec.get('title', '')}+-+{spec.get('type', '')}+-+{spec.get('vers', '')}+-+{spec.get('WG', '')}" with self.DICT_LOCK: self.indexed_specifications[key] = metadata with self.DICT_LOCK: self.processed_count += 1 status = "already indexed" if already_indexed else "indexed now" print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}") except Exception as e: traceback.print_exc() print(f"Error processing spec {spec.get('spec_num')} v{spec.get('vers')}: {e}") with self.DICT_LOCK: self.processed_count += 1 print(f"Progress: {self.processed_count}/{self.total_count} specs processed") def get_document(self, spec_id: str, spec_title: str): text = [f"{spec_id} - {spec_title}\n"] for section in self.spec_contents: if spec_id == section["doc_id"]: text.extend([f"{section['section']}\n\n{section['content']}"]) return text def create_bm25_index(self): dataset_metadata = self.indexed_specifications.values() unique_specs = set() corpus_json = [] for specification in dataset_metadata: if specification['id'] in unique_specs: continue for section in self.spec_contents: if specification['id'] == section['doc_id']: corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": { "id": specification['id'], "title": specification['title'], "section_title": section['section'], "version": specification['version'], "type": specification['type'], "working_group": specification['working_group'], "url": specification['url'], "scope": specification['scope'] }}) corpus_text = [doc["text"] for doc in corpus_json] corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en") print("Indexing BM25") retriever = BM25HF(corpus=corpus_json) retriever.index(corpus_tokens) retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSections", token=os.environ.get("HF")) unique_specs = set() corpus_json = [] for specification in dataset_metadata: if specification['id'] in unique_specs: continue text_list = self.get_document(specification['id'], specification['title']) text = "\n".join(text_list) if len(text_list) == 1: continue corpus_json.append({"text": text, "metadata": specification}) unique_specs.add(specification['id']) corpus_text = [doc["text"] for doc in corpus_json] corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en") print("Indexing BM25") retriever = BM25HF(corpus=corpus_json) retriever.index(corpus_tokens) retriever.save_to_hub("OrganizedProgrammers/3GPPBM25IndexSingle", token=os.environ.get("HF")) def run(self): print("Fetching specification tables from 3GPP...") specifications = self.fetch_spec_table() self.total_count = len(specifications) print(f"Processing {self.total_count} specs with {self.max_workers} threads...") with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(self.process_specification, spec) for spec in specifications] for f in concurrent.futures.as_completed(futures): if self.STOP_EVENT.is_set(): break print("All specs processed.") # Sauvegarde (identique au script original) def save(self): print("Saving indexed data...") flat_metadata = [metadata for metadata in self.indexed_specifications.values()] flat_docs = [] print("Flatting doc contents") for doc_id, data in self.documents_by_spec_num.items(): for title, content in data["content"].items(): flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content}) print("Creating datasets ...") push_spec_content = Dataset.from_list(flat_docs) push_spec_metadata = Dataset.from_list(flat_metadata) # Token handling assumed set in environment print("Pushing ...") push_spec_content.push_to_hub("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF"]) push_spec_metadata.push_to_hub("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF"]) self.spec_contents = load_dataset("OrganizedProgrammers/3GPPSpecContent")["train"].to_list() self.documents_by_spec_num = self._make_doc_index(self.spec_contents) print("Save finished.") class SpecETSIIndexer: def __init__(self, max_workers=16): self.session = requests.Session() self.session.verify = False self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list() self.documents_by_spec_num = self._make_doc_index(self.spec_contents) self.indexed_specifications = {} self.specifications_passed = set() self.processed_count = 0 self.total_count = 0 self.DICT_LOCK = threading.Lock() self.DOCUMENT_LOCK = threading.Lock() self.STOP_EVENT = threading.Event() self.max_workers = max_workers self.df = self._fetch_spec_table() def _make_doc_index(self, specs): doc_index = {} for section in specs: if section["doc_id"] not in doc_index: doc_index[section["doc_id"]] = {"content": {section["section"]: section["content"]}, "hash": section["hash"]} else: doc_index[section["doc_id"]]["content"][section["section"]] = section["content"] return doc_index def _fetch_spec_table(self): # Connexion login et récupération CSV TS/TR print("Connexion login ETSI...") self.session.post( "https://portal.etsi.org/ETSIPages/LoginEOL.ashx", verify=False, headers={"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) ..."}, data=json.dumps({"username": os.environ.get("EOL_USER"), "password": os.environ.get("EOL_PASSWORD")}), ) print("Récupération des métadonnées TS/TR …") url_ts = "https://www.etsi.org/?option=com_standardssearch&view=data&format=csv&includeScope=1&page=1&search=&title=1&etsiNumber=1&content=0&version=0&onApproval=0&published=1&withdrawn=0&historical=0&isCurrent=1&superseded=0&harmonized=0&keyword=&TB=&stdType=TS&frequency=&mandate=&collection=&sort=1" url_tr = url_ts.replace("stdType=TS", "stdType=TR") data_ts = self.session.get(url_ts, verify=False).content data_tr = self.session.get(url_tr, verify=False).content df_ts = pd.read_csv(io.StringIO(data_ts.decode('utf-8')), sep=";", skiprows=1, index_col=False) df_tr = pd.read_csv(io.StringIO(data_tr.decode('utf-8')), sep=";", skiprows=1, index_col=False) backup_ts = df_ts["ETSI deliverable"] backup_tr = df_tr["ETSI deliverable"] df_ts["ETSI deliverable"] = df_ts["ETSI deliverable"].str.extract(r"\s*ETSI TS (\d+ \d+(?:-\d+(?:-\d+)?)?)") df_tr["ETSI deliverable"] = df_tr["ETSI deliverable"].str.extract(r"\s*ETSI TR (\d+ \d+(?:-\d+(?:-\d+)?)?)") version1 = backup_ts.str.extract(r"\s*ETSI TS \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)") version2 = backup_tr.str.extract(r"\s*ETSI TR \d+ \d+(?:-\d+(?:-\d+)?)? V(\d+\.\d+\.\d+)") df_ts["Version"] = version1[0] df_tr["Version"] = version2[0] def ver_tuple(v): return tuple(map(int, v.split("."))) df_ts["temp"] = df_ts["Version"].apply(ver_tuple) df_tr["temp"] = df_tr["Version"].apply(ver_tuple) df_ts["Type"] = "TS" df_tr["Type"] = "TR" df = pd.concat([df_ts, df_tr]) unique_df = df.loc[df.groupby("ETSI deliverable")["temp"].idxmax()] unique_df = unique_df.drop(columns="temp") unique_df = unique_df[(~unique_df["title"].str.contains("3GPP", case=True, na=False))] df = df.drop(columns="temp") df = df[(~df["title"].str.contains("3GPP", case=True, na=False))] return df @staticmethod def hasher(specification: str, version: str): return hashlib.md5(f"{specification}{version}".encode()).hexdigest() @staticmethod def get_scope(content): for title, text in content.items(): if title.lower().endswith("scope"): return text return "" def get_document(self, spec_id: str, spec_title: str): text = [f"{spec_id} - {spec_title}\n"] for section in self.spec_contents: if spec_id == section["doc_id"]: text.extend([f"{section['section']}\n\n{section['content']}"]) return text def get_text(self, specification: str): if self.STOP_EVENT.is_set(): return None, [] print(f"\n[INFO] Tentative de récupération de la spécification {specification}", flush=True) try: # Récupérer la ligne avec le bon lien PDF row = self.df[self.df["ETSI deliverable"] == specification] if row.empty: print(f"[WARN] Spécification {specification} absente du tableau") return None, [] pdf_link = row.iloc[0]["PDF link"] response = self.session.get( pdf_link, headers={"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) ...'} ) if response.status_code != 200: print(f"[ERREUR] Echec du téléchargement du PDF pour {specification}.") return None, [] pdf = fitz.open(stream=response.content, filetype="pdf") return pdf, pdf.get_toc() except Exception as e: print(f"[ERROR] Échec get_text pour {specification} : {e}", flush=True) return None, [] def get_spec_content(self, specification: str): def extract_sections(text, titles): sections = {} sorted_titles = sorted(titles, key=lambda t: text.find(t)) for i, title in enumerate(sorted_titles): start = text.find(title) if i + 1 < len(sorted_titles): end = text.find(sorted_titles[i + 1]) sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:end].replace(title, "").strip().rstrip()) else: sections[re.sub(r"\s+", " ", title)] = re.sub(r"\s+", " ", text[start:].replace(title, "").strip().rstrip()) return sections if self.STOP_EVENT.is_set(): return {} print(f"[INFO] Extraction du contenu de {specification}", flush=True) pdf, doc_toc = self.get_text(specification) text = [] if not pdf or not doc_toc: print("[ERREUR] Pas de texte ou table of contents trouvé !") return {} # On prend à partir de la première réelle page référencée first_page = 0 for level, title, page in doc_toc: first_page = page - 1 break for page in pdf[first_page:]: text.append("\n".join([line.strip() for line in page.get_text().splitlines()])) text = "\n".join(text) if not text or not doc_toc or self.STOP_EVENT.is_set(): print("[ERREUR] Pas de texte/table of contents récupéré !") return {} titles = [] for level, title, page in doc_toc: if self.STOP_EVENT.is_set(): return {} if title and title[0].isnumeric() and '\n'.join(title.strip().split(" ", 1)) in text: titles.append('\n'.join(title.strip().split(" ", 1))) return extract_sections(text, titles) def process_specification(self, spec): if self.STOP_EVENT.is_set(): return try: version = spec.get('Version') if not version: return doc_id = str(spec.get("ETSI deliverable")) document = None already_indexed = False with self.DOCUMENT_LOCK: if (doc_id in self.documents_by_spec_num and self.documents_by_spec_num[doc_id]["hash"] == self.hasher(doc_id, version) and doc_id not in self.specifications_passed): document = self.documents_by_spec_num[doc_id] self.specifications_passed.add(doc_id) already_indexed = True elif doc_id in self.specifications_passed: document = self.documents_by_spec_num[doc_id] already_indexed = True else: document_content = self.get_spec_content(doc_id) if document_content: self.documents_by_spec_num[doc_id] = {"content": document_content, "hash": self.hasher(doc_id, version)} document = {"content": document_content, "hash": self.hasher(doc_id, version)} self.specifications_passed.add(doc_id) already_indexed = False if document: string_key = f"{doc_id}+-+{spec['title']}+-+{spec['Type']}+-+{spec['Version']}" metadata = { "id": str(doc_id), "title": spec["title"], "type": spec["Type"], "version": version, "url": spec["PDF link"], "scope": "" if not document else self.get_scope(document["content"]) } with self.DICT_LOCK: self.indexed_specifications[string_key] = metadata with self.DICT_LOCK: self.processed_count += 1 status = "already indexed" if already_indexed else "indexed now" print(f"Spec {doc_id} ({spec.get('title', '')}): {status} - Progress {self.processed_count}/{self.total_count}") except Exception as e: traceback.print_exc() print(f"\n[ERREUR] Échec du traitement de {doc_id} {spec.get('Version')}: {e}", flush=True) with self.DICT_LOCK: self.processed_count += 1 print(f"Progress: {self.processed_count}/{self.total_count} specs processed") def run(self): print("Démarrage indexation ETSI…") specifications = self.df.to_dict(orient="records") self.total_count = len(specifications) print(f"Traitement de {self.total_count} specs avec {self.max_workers} threads...\n") with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [executor.submit(self.process_specification, spec) for spec in specifications] for f in concurrent.futures.as_completed(futures): if self.STOP_EVENT.is_set(): break print(f"\nAll {self.processed_count}/{self.total_count} specs processed.") def save(self): print("\nSauvegarde en cours...", flush=True) flat_metadata = [metadata for metadata in self.indexed_specifications.values()] flat_docs = [] for doc_id, data in self.documents_by_spec_num.items(): for title, content in data["content"].items(): flat_docs.append({"hash": data["hash"], "doc_id": doc_id, "section": title, "content": content}) push_spec_content = Dataset.from_list(flat_docs) push_spec_metadata = Dataset.from_list(flat_metadata) push_spec_content.push_to_hub("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF"]) push_spec_metadata.push_to_hub("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF"]) self.spec_contents = load_dataset("OrganizedProgrammers/ETSISpecContent")["train"].to_list() self.documents_by_spec_num = self._make_doc_index(self.spec_contents) print("Sauvegarde terminée.") def create_bm25_index(self): dataset_metadata = self.indexed_specifications.values() unique_specs = set() corpus_json = [] for specification in dataset_metadata: if specification['id'] in unique_specs: continue for section in self.spec_contents: if specification['id'] == section['doc_id']: corpus_json.append({"text": f"{section['section']}\n{section['content']}", "metadata": { "id": specification['id'], "title": specification['title'], "section_title": section['section'], "version": specification['version'], "type": specification['type'], "url": specification['url'], "scope": specification['scope'] }}) corpus_text = [doc["text"] for doc in corpus_json] corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en") print("Indexing BM25") retriever = BM25HF(corpus=corpus_json) retriever.index(corpus_tokens) retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSections", token=os.environ.get("HF")) unique_specs = set() corpus_json = [] for specification in dataset_metadata: if specification['id'] in unique_specs: continue text_list = self.get_document(specification['id'], specification['title']) text = "\n".join(text_list) if len(text_list) == 1: continue corpus_json.append({"text": text, "metadata": specification}) unique_specs.add(specification['id']) corpus_text = [doc["text"] for doc in corpus_json] corpus_tokens = bm25s.tokenize(corpus_text, stopwords="en") print("Indexing BM25") retriever = BM25HF(corpus=corpus_json) retriever.index(corpus_tokens) retriever.save_to_hub("OrganizedProgrammers/ETSIBM25IndexSingle", token=os.environ.get("HF"))