import time
from datetime import datetime
import os, re, warnings, nltk, json, subprocess
import numpy as np
from nltk.stem import WordNetLemmatizer
from dotenv import load_dotenv
from sklearn.preprocessing import MinMaxScaler
from bs4 import BeautifulSoup
import requests
from urllib.parse import parse_qs, urlparse

warnings.filterwarnings('ignore')
nltk.download('wordnet')
load_dotenv()
os.environ['CURL_CA_BUNDLE'] = ""

from huggingface_hub import configure_http_backend

def backend_factory() -> requests.Session:
    session = requests.Session()
    session.verify = False
    return session

configure_http_backend(backend_factory=backend_factory)

from datasets import load_dataset
import bm25s
from bm25s.hf import BM25HF
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

from schemas import *
from classes import *

lemmatizer = WordNetLemmatizer()

spec_metadatas_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecMetadata")
spec_contents_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecContent")
tdoc_locations_3gpp = load_dataset("OrganizedProgrammers/3GPPTDocLocation")
spec_metadatas_etsi = load_dataset("OrganizedProgrammers/ETSISpecMetadata")
spec_contents_etsi = load_dataset("OrganizedProgrammers/ETSISpecContent")

spec_contents_3gpp = spec_contents_3gpp["train"].to_list()
spec_metadatas_3gpp = spec_metadatas_3gpp["train"].to_list()
spec_contents_etsi = spec_contents_etsi["train"].to_list()
spec_metadatas_etsi = spec_metadatas_etsi["train"].to_list()
tdoc_locations = tdoc_locations_3gpp["train"].to_list()

bm25_index_3gpp = BM25HF.load_from_hub("OrganizedProgrammers/3GPPBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])
bm25_index_etsi = BM25HF.load_from_hub("OrganizedProgrammers/ETSIBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])
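
# Parse a GlobalPlatform "javascript:<fn>(<media_id>, '<spec_type>')" href into its
# (media_id, spec_type) arguments; returns None when the href has another shape.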
def extract_args_and_map(href):
    if not href or not href.lower().startswith('javascript:'):
        return None
    js = href[len('javascript:'):].strip()
    m = re.match(r'\w+\((.*)\)', js)
    if not m:
        return None
    args_str = m.group(1).strip()
    parts = [part.strip() for part in args_str.split(',', 1)]
    if len(parts) != 2:
        return None
    try:
        media_id = int(parts[0])
    except ValueError:
        return None
    spec_type = parts[1].strip()
    if (spec_type.startswith("'") and spec_type.endswith("'")) or (spec_type.startswith('"') and spec_type.endswith('"')):
        spec_type = spec_type[1:-1]
    return media_id, spec_type
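
# Scrape the GlobalPlatform specification library once at startup and build
# gp_spec_locations: doc_id -> {title, file_id, committee, summary of the version history}.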
url = "https://globalplatform.org/wp-content/themes/globalplatform/ajax/specs-library.php" | |
headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"} | |
resp = requests.post(url, verify=False, headers=headers) | |
soup = BeautifulSoup(resp.text, 'html.parser') | |
panels = soup.find_all('div', class_='panel panel-default') | |
gp_spec_locations = {} | |
for panel in panels: | |
header = ''.join([t for t in panel.find('a').children if t.name is None]).strip() | |
try: | |
title, doc_id = header.split(' | ') | |
panel_body = panel.find('div', class_='panel-body') | |
download_btn_href = panel_body.find_all('a', href=lambda href: href and href.strip().lower().startswith('javascript:'))[0] | |
media_id, spec_type = extract_args_and_map(download_btn_href['href']) | |
changes_history = panel.find_all('div', class_="row") | |
paragraphs_ch = [version.find('p').text for version in changes_history][::-1] | |
document_commits = [] | |
for version in range(len(paragraphs_ch)): | |
document_commits.append(f"Version {version + 1} : {paragraphs_ch[version]}") | |
gp_spec_locations[doc_id] = {"title": title, "file_id": media_id, "committee": spec_type, "summary": "\n".join(document_commits)} | |
except: | |
continue | |

def get_docs_from_url(url):
    """Get list of documents/directories from a URL"""
    try:
        response = requests.get(url, verify=False, timeout=10)
        soup = BeautifulSoup(response.text, "html.parser")
        return [item.get_text() for item in soup.select("tr td a")]
    except Exception as e:
        print(f"Error accessing {url}: {e}")
        return []
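
# Look up a 3GPP TDoc's download URL in the pre-indexed dataset.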
def get_tdoc_url(doc_id):
    for tdoc in tdoc_locations:
        if tdoc["doc_id"] == doc_id:
            return tdoc["url"]
    return "Document not indexed (re-indexing documents ?)"
def get_spec_url(document):
    series = document.split(".")[0].zfill(2)
    url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{document}"
    versions = get_docs_from_url(url)
    return url + "/" + versions[-1] if versions != [] else f"Specification {document} not found"
def get_document(spec_id: str, spec_title: str, source: str):
    text = [f"{spec_id} - {spec_title}"]
    spec_contents = spec_contents_3gpp if source == "3GPP" else spec_contents_etsi if source == "ETSI" else spec_contents_3gpp + spec_contents_etsi
    for section in spec_contents:
        if not isinstance(section, str) and spec_id == section["doc_id"]:
            text.extend([section['section'], section['content']])
    return text
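
# Submit the GlobalPlatform download form for a scraped spec entry and pull the direct
# file URL out of the JavaScript redirect in the response.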
def get_gp_spec_url(data):
    file_id = data['file_id']
    spec_type = data['committee']
    url = "https://globalplatform.org/wp-content/themes/globalplatform/ajax/download-spec-submit.php"
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36"}
    resp = requests.post(url, verify=False, headers=headers, data={"first_name": "", "last_name": "", "company": "", "email": "", "media_id": file_id, "spec_type": spec_type, "agree": "true"})
    r = resp.text
    mat = re.search(r"window\.location\.href\s*=\s*'([^']+)'", r)
    if mat:
        full_url = mat.group(1)
        parsed_url = urlparse(full_url)
        query_params = parse_qs(parsed_url.query)
        return query_params.get('f')[0]

tags_metadata = [
    {
        "name": "Document Retrieval",
        "description": """
        Direct document lookup operations for retrieving specific documents by their unique identifiers.
        These endpoints provide fast access to document URLs, versions, and metadata without requiring keyword searches.
        Perfect for when you know the exact document ID you're looking for.
        """,
    },
    {
        "name": "Content Search",
        "description": """
        Advanced search operations for finding documents based on keywords and content matching.
        Includes both quick metadata-based searches and deep content analysis with flexible filtering options.
        Supports different search modes and logical operators for precise results.
        """,
    },
]

app = FastAPI(
    title="3GPP & ETSI Document Finder API",
    description=open('documentation.md').read(),
    openapi_tags=tags_metadata
)

app.mount("/static", StaticFiles(directory="static"), name="static")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
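
# The ETSI document/spec finders come from classes.py; the patterns below decide which
# resolver handles a given ID (e.g. "S2-2301234" -> 3GPP TDoc, "23.501" -> 3GPP spec,
# "SCP(23)000123" -> ETSI TDoc, "103 666-1" -> ETSI spec).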
etsi_doc_finder = ETSIDocFinder()
etsi_spec_finder = ETSISpecFinder()

valid_3gpp_doc_format = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
valid_3gpp_spec_format = re.compile(r'^\d{2}\.\d{3}(?:-\d+)?')
valid_etsi_doc_format = re.compile(r'^(?:SET|SCP|SETTEC|SETREQ|SCPTEC|SCPREQ)\(\d+\)\d+(?:r\d+)?', flags=re.IGNORECASE)
valid_etsi_spec_format = re.compile(r'^\d{3} \d{3}(?:-\d+)?')
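
# Serve the bundled single-page UI.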
def frontend():
    return FileResponse(os.path.join('templates', 'index.html'))
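
# Re-establish the ETSI session via etsi_doc_finder.connect(); returns its message on
# success, otherwise raises a 400 carrying that message.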
def reconnect():
    data = etsi_doc_finder.connect()
    # connect() reports success with a falsy 'error' flag alongside a 'message'
    if data.get('error') is not None and not data['error']:
        return data['message']
    raise HTTPException(status_code=400, detail=data['message'])
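
# Resolve a single document ID (3GPP TDoc or spec, ETSI TDoc or spec, or GlobalPlatform
# "GP..." spec) to a download URL, plus version and scope when they are known.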
def find_document(request: DocRequest):
    start_time = time.time()
    document = request.doc_id

    if valid_3gpp_doc_format.match(document):
        url = get_tdoc_url(document)
    elif valid_3gpp_spec_format.match(document):
        url = get_spec_url(document)
    elif valid_etsi_doc_format.match(document):
        url = etsi_doc_finder.search_document(document)
    elif valid_etsi_spec_format.match(document):
        url = etsi_spec_finder.search_document(document)
    elif document.startswith("GP"):
        url = f"Specification {document} not found"
        for sp in gp_spec_locations:
            if document.lower() in sp.lower():
                url = get_gp_spec_url(gp_spec_locations[sp])
                break
    else:
        url = "Document ID not supported"

    if "Specification" in url or "Document" in url:
        raise HTTPException(status_code=404, detail=url)

    version = None
    if valid_3gpp_spec_format.match(document):
        version = url.split("/")[-1].replace(".zip", "").split("-")[-1]

    scope = None
    spec_metadatas = spec_metadatas_3gpp if valid_3gpp_spec_format.match(document) else spec_metadatas_etsi
    for spec in spec_metadatas:
        if spec['id'] == document:
            scope = spec['scope']
            break

    return DocResponse(
        doc_id=document,
        version=version,
        url=url,
        search_time=time.time() - start_time,
        scope=scope
    )
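
# Batch variant of the single-document lookup: resolve each ID and split the input into
# found URLs and missing documents.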
def find_document_batch(request: BatchDocRequest):
    start_time = time.time()
    documents = request.doc_ids
    results = {}
    missing = []

    for document in documents:
        if valid_3gpp_doc_format.match(document):
            url = get_tdoc_url(document)
        elif valid_3gpp_spec_format.match(document):
            url = get_spec_url(document)
        elif valid_etsi_doc_format.match(document):
            url = etsi_doc_finder.search_document(document)
        elif valid_etsi_spec_format.match(document):
            url = etsi_spec_finder.search_document(document)
        elif document.startswith("GP"):
            url = f"Specification {document} not found"
            for sp in gp_spec_locations:
                if document.lower() in sp.lower():
                    url = get_gp_spec_url(gp_spec_locations[sp])
                    break
        else:
            url = "Document ID not supported"

        if "Specification" in url or "Document" in url:
            missing.append(document)
        else:
            results[document] = url

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time
    )
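
# Keyword search over specification metadata or, in "deep" mode, over full section
# content, with AND/OR matching, optional case sensitivity and spec-type filtering.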
def search_specifications(request: KeywordRequest):
    start_time = time.time()
    boolSensitiveCase = request.case_sensitive
    search_mode = request.search_mode
    source = request.source
    spec_metadatas = spec_metadatas_3gpp if source == "3GPP" else spec_metadatas_etsi if source == "ETSI" else spec_metadatas_3gpp + spec_metadatas_etsi
    spec_type = request.spec_type
    keywords = [string.lower() if not boolSensitiveCase else string for string in request.keywords.split(",")]
    print(keywords)
    unique_specs = set()
    results = []

    if keywords == [""] and search_mode == "deep":
        raise HTTPException(status_code=400, detail="You must enter keywords in deep search mode !")

    for spec in spec_metadatas:
        valid = False
        if spec['id'] in unique_specs:
            continue
        if spec.get('type', None) is None or (spec_type is not None and spec["type"] != spec_type):
            continue

        if search_mode == "deep":
            contents = []
            doc = get_document(spec["id"], spec["title"], source)
            docValid = len(doc) > 1

        if request.mode == "and":
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}"
            if all(keyword in (string.lower() if not boolSensitiveCase else string) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x+1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if all(keyword in (section_content.lower() if not boolSensitiveCase else section_content) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})
        elif request.mode == "or":
            string = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}"
            if any(keyword in (string.lower() if not boolSensitiveCase else string) for keyword in keywords):
                valid = True
            if search_mode == "deep":
                if docValid:
                    for x in range(1, len(doc) - 1, 2):
                        section_title = doc[x]
                        section_content = doc[x+1]
                        if "reference" not in section_title.lower() and "void" not in section_title.lower() and "annex" not in section_content.lower():
                            if any(keyword in (section_content.lower() if not boolSensitiveCase else section_content) for keyword in keywords):
                                valid = True
                                contents.append({section_title: section_content})

        if valid:
            spec_content = spec
            if search_mode == "deep":
                spec_content["contains"] = {k: v for d in contents for k, v in d.items()}
            results.append(spec_content)
        else:
            unique_specs.add(spec['id'])

    if len(results) > 0:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")
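
# BM25 search over the prebuilt Hugging Face indexes, with a boost for title/ID matches
# and a normalized-score threshold cutoff.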
def bm25_search_specification(request: BM25KeywordRequest):
    start_time = time.time()
    source = request.source
    spec_type = request.spec_type
    threshold = request.threshold
    query = request.keywords
    results_out = []
    query_tokens = bm25s.tokenize(query)

    if source == "3GPP":
        results, scores = bm25_index_3gpp.retrieve(query_tokens, k=len(bm25_index_3gpp.corpus))
    elif source == "ETSI":
        results, scores = bm25_index_etsi.retrieve(query_tokens, k=len(bm25_index_etsi.corpus))
    else:
        print(len(bm25_index_3gpp.corpus), len(bm25_index_etsi.corpus))
        results1, scores1 = bm25_index_3gpp.retrieve(query_tokens, k=len(bm25_index_3gpp.corpus))
        results2, scores2 = bm25_index_etsi.retrieve(query_tokens, k=len(bm25_index_etsi.corpus))
        results = np.concatenate([results1, results2], axis=1)
        scores = np.concatenate([scores1, scores2], axis=1)
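
    # Boost the raw BM25 score: +0.5 if the query mentions the spec ID, plus 0.5 for
    # every query word that also appears in the spec title.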
    def calculate_boosted_score(metadata, score, query):
        title = set(metadata['title'].lower().split())
        q = set(query.lower().split())
        spec_id_presence = 0.5 if metadata['id'].lower() in q else 0
        booster = len(q & title) * 0.5
        return score + spec_id_presence + booster

    spec_scores = {}
    spec_indices = {}
    spec_details = {}

    for i in range(results.shape[1]):
        doc = results[0, i]
        score = scores[0, i]
        spec = doc["metadata"]["id"]
        boosted_score = calculate_boosted_score(doc['metadata'], score, query)

        if spec not in spec_scores or boosted_score > spec_scores[spec]:
            spec_scores[spec] = boosted_score
            spec_indices[spec] = i
            spec_details[spec] = {
                'original_score': score,
                'boosted_score': boosted_score,
                'doc': doc
            }
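
    # Min-max scale the boosted scores to [0, 1] so the percentage threshold below is
    # comparable across queries.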
    def normalize_scores(scores_dict):
        if not scores_dict:
            return {}
        scores_array = np.array(list(scores_dict.values())).reshape(-1, 1)
        scaler = MinMaxScaler()
        normalized_scores = scaler.fit_transform(scores_array).flatten()
        normalized_dict = {}
        for i, spec in enumerate(scores_dict.keys()):
            normalized_dict[spec] = normalized_scores[i]
        return normalized_dict

    normalized_scores = normalize_scores(spec_scores)
    for spec in spec_details:
        spec_details[spec]["normalized_score"] = normalized_scores[spec]

    unique_specs = sorted(normalized_scores.keys(), key=lambda x: normalized_scores[x], reverse=True)

    for rank, spec in enumerate(unique_specs, 1):
        details = spec_details[spec]
        metadata = details['doc']['metadata']
        if metadata.get('type', None) is None or (spec_type is not None and metadata["type"] != spec_type):
            continue
        if details['normalized_score'] < threshold / 100:
            break
        results_out.append(metadata)

    if len(results_out) > 0:
        return KeywordResponse(
            results=results_out,
            search_time=time.time() - start_time
        )
    else:
        raise HTTPException(status_code=404, detail="Specifications not found")