import json
import os
import re
import subprocess
import time
import warnings
from datetime import datetime

import nltk
import numpy as np
from dotenv import load_dotenv
from nltk.stem import WordNetLemmatizer
from sklearn.preprocessing import MinMaxScaler

os.environ['CURL_CA_BUNDLE'] = ""  # empty CA bundle: a common workaround to skip TLS verification on outbound requests
warnings.filterwarnings('ignore')
nltk.download('wordnet')  # WordNet data for the lemmatizer
load_dotenv()  # load HF_TOKEN (and any other secrets) from .env before touching the Hub

import bm25s
import requests
from bm25s.hf import BM25HF
from bs4 import BeautifulSoup
from datasets import load_dataset
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

from classes import *
from schemas import *

lemmatizer = WordNetLemmatizer()

# Pull the pre-built spec metadata, spec content, and TDoc location tables from the Hugging Face Hub.
spec_metadatas_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecMetadata", token=os.environ["HF_TOKEN"])
spec_contents_3gpp = load_dataset("OrganizedProgrammers/3GPPSpecContent", token=os.environ["HF_TOKEN"])
tdoc_locations_3gpp = load_dataset("OrganizedProgrammers/3GPPTDocLocation", token=os.environ["HF_TOKEN"])
spec_metadatas_etsi = load_dataset("OrganizedProgrammers/ETSISpecMetadata", token=os.environ["HF_TOKEN"])
spec_contents_etsi = load_dataset("OrganizedProgrammers/ETSISpecContent", token=os.environ["HF_TOKEN"])

# Materialize the "train" splits as plain lists of dicts for fast in-memory lookups.
spec_contents_3gpp = spec_contents_3gpp["train"].to_list()
spec_metadatas_3gpp = spec_metadatas_3gpp["train"].to_list()
spec_contents_etsi = spec_contents_etsi["train"].to_list()
spec_metadatas_etsi = spec_metadatas_etsi["train"].to_list()
tdoc_locations = tdoc_locations_3gpp["train"].to_list()

# Pre-built BM25 indexes (one corpus entry per spec section) for relevance search.
bm25_index_3gpp = BM25HF.load_from_hub("OrganizedProgrammers/3GPPBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])
bm25_index_etsi = BM25HF.load_from_hub("OrganizedProgrammers/ETSIBM25IndexSingle", load_corpus=True, token=os.environ["HF_TOKEN"])
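# Assumed record shapes, inferred from how the rows are consumed below (the datasets
# themselves are the source of truth):
#   spec_metadatas_*: {"id", "title", "type", "version", "scope", ...}
#   spec_contents_*:  {"doc_id", "section", "content"}
#   tdoc_locations:   {"doc_id", "url"}
#   BM25 corpus rows: {"metadata": {"id", "title", "type", ...}, ...}
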
def get_docs_from_url(url):
    """Get list of documents/directories from a URL"""
    try:
        response = requests.get(url, verify=False, timeout=10)
        soup = BeautifulSoup(response.text, "html.parser")
        return [item.get_text() for item in soup.select("tr td a")]
    except Exception as e:
        print(f"Error accessing {url}: {e}")
        return []

def get_tdoc_url(doc_id):
    """Look up the download URL of a 3GPP TDoc in the pre-indexed location table."""
    for tdoc in tdoc_locations:
        if tdoc["doc_id"] == doc_id:
            return tdoc["url"]
    return "Document not indexed (Re-index TDocs)"

def get_spec_url(document):
    """Build the 3GPP FTP archive URL of a specification and pick its latest version."""
    series = document.split(".")[0].zfill(2)
    url = f"https://www.3gpp.org/ftp/Specs/archive/{series}_series/{document}"
    versions = get_docs_from_url(url)
    return f"{url}/{versions[-1]}" if versions else f"Specification {document} not found"

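# Illustrative call (the exact archive file name depends on what the FTP listing returns):
#   get_spec_url("23.501") -> "https://www.3gpp.org/ftp/Specs/archive/23_series/23.501/<latest>.zip"
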
def get_document(spec_id: str, spec_title: str, source: str):
    """Return a flat list [header, section_1_title, section_1_content, ...] for one spec."""
    text = [f"{spec_id} - {spec_title}"]
    spec_contents = spec_contents_3gpp if source == "3GPP" else spec_contents_etsi if source == "ETSI" else spec_contents_3gpp + spec_contents_etsi
    for section in spec_contents:
        if not isinstance(section, str) and spec_id == section["doc_id"]:
            text.extend([section['section'], section['content']])
    return text

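# Illustrative shape of the returned list (titles/contents come from the SpecContent datasets):
#   get_document("23.501", "System architecture for the 5G System", "3GPP")
#     -> ["23.501 - System architecture for the 5G System", "<section title>", "<section content>", ...]
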
app = FastAPI(
    title="Document Finder Back-End",
    docs_url="/",
    description="Backend for DocFinder - Searching technical documents & specifications from 3GPP & ETSI",
)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

etsi_doc_finder = ETSIDocFinder()
etsi_spec_finder = ETSISpecFinder()

valid_3gpp_doc_format = re.compile(r'^(S[1-6P]|C[1-6P]|R[1-6P])-\d+', flags=re.IGNORECASE)
valid_3gpp_spec_format = re.compile(r'^\d{2}\.\d{3}(?:-\d+)?')
valid_etsi_doc_format = re.compile(r'^(?:SET|SCP|SETTEC|SETREQ|SCPTEC|SCPREQ)\(\d+\)\d+(?:r\d+)?', flags=re.IGNORECASE)
valid_etsi_spec_format = re.compile(r'^\d{3} \d{3}(?:-\d+)?')
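# Illustrative IDs accepted by each pattern (hypothetical values, shown for format only):
#   valid_3gpp_doc_format:  "S2-2401234"       valid_3gpp_spec_format: "23.501" or "38.331-1"
#   valid_etsi_doc_format:  "SET(24)000123r1"  valid_etsi_spec_format: "103 666-1"
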
def find_document(request: DocRequest):
    """Resolve one document/spec ID to its download URL, version, and scope."""
    start_time = time.time()
    document = request.doc_id
    source = request.source
    spec_metadatas = spec_metadatas_3gpp if source == "3GPP" else spec_metadatas_etsi if source == "ETSI" else spec_metadatas_3gpp + spec_metadatas_etsi

    is_3gpp = valid_3gpp_doc_format.match(document) or valid_3gpp_spec_format.match(document)
    # Route the ID to the matching resolver; unmatched IDs fall through to an error string.
    url = get_tdoc_url(document) if valid_3gpp_doc_format.match(document) else \
        get_spec_url(document) if valid_3gpp_spec_format.match(document) else \
        etsi_doc_finder.search_document(document) if valid_etsi_doc_format.match(document) else \
        etsi_spec_finder.search_document(document) if valid_etsi_spec_format.match(document) else \
        "Document ID not supported"

    # The resolvers signal failure with sentinel strings containing "Specification"/"Document".
    if "Specification" in url or "Document" in url:
        raise HTTPException(status_code=404, detail=url)

    version = None
    if is_3gpp:
        # 3GPP archive files are named like "23501-i40.zip"; the version is the trailing token.
        version = url.split("/")[-1].replace(".zip", "").split("-")[-1]

    scope = None
    for spec in spec_metadatas:
        if spec['id'] == document:
            scope = spec['scope']
            break

    return DocResponse(
        doc_id=document,
        version=version,
        url=url,
        search_time=time.time() - start_time,
        scope=scope,
    )

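# Illustrative exchange (field names follow DocRequest/DocResponse in schemas.py):
#   request  {"doc_id": "23.501", "source": "3GPP"}
#   response {"doc_id": "23.501", "version": "...", "url": ".../23_series/23.501/<file>.zip",
#             "search_time": 0.42, "scope": "..."}
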
def find_document_batch(request: BatchDocRequest):
    """Resolve a batch of document IDs; return found URLs plus the IDs that could not be resolved."""
    start_time = time.time()
    documents = request.doc_ids
    results = {}
    missing = []

    for document in documents:
        # Same resolver routing as find_document.
        url = get_tdoc_url(document) if valid_3gpp_doc_format.match(document) else \
            get_spec_url(document) if valid_3gpp_spec_format.match(document) else \
            etsi_doc_finder.search_document(document) if valid_etsi_doc_format.match(document) else \
            etsi_spec_finder.search_document(document) if valid_etsi_spec_format.match(document) else \
            "Document ID not supported"
        if "Specification" in url or "Document" in url:
            missing.append(document)
        else:
            results[document] = url

    return BatchDocResponse(
        results=results,
        missing=missing,
        search_time=time.time() - start_time,
    )

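# Illustrative exchange (field names follow BatchDocRequest/BatchDocResponse in schemas.py):
#   request  {"doc_ids": ["S2-2401234", "23.501", "not-an-id"]}
#   response {"results": {"S2-2401234": "<url>", "23.501": "<url>"},
#             "missing": ["not-an-id"], "search_time": 1.3}
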
def search_specifications(request: KeywordRequest):
    """Filter spec metadata (and, in deep mode, section contents) by comma-separated keywords."""
    start_time = time.time()
    case_sensitive = request.case_sensitive
    search_mode = request.search_mode
    source = request.source
    spec_metadatas = spec_metadatas_3gpp if source == "3GPP" else spec_metadatas_etsi if source == "ETSI" else spec_metadatas_3gpp + spec_metadatas_etsi
    spec_type = request.spec_type

    # Lowercase the keywords only when the search is case-insensitive.
    keywords = [kw if case_sensitive else kw.lower() for kw in request.keywords.split(",")]
    if keywords == [""] and search_mode == "deep":
        raise HTTPException(status_code=400, detail="You must enter keywords in deep search mode!")

    # "and" requires every keyword to appear; "or" accepts any single match.
    match_fn = all if request.mode == "and" else any
    unique_specs = set()
    results = []
    for spec in spec_metadatas:
        if spec['id'] in unique_specs:
            continue
        unique_specs.add(spec['id'])
        if spec.get('type') is None or (spec_type is not None and spec["type"] != spec_type):
            continue

        valid = False
        contents = []
        # Shallow match against the metadata line "id - title - type - version".
        haystack = f"{spec['id']}+-+{spec['title']}+-+{spec['type']}+-+{spec['version']}"
        if not case_sensitive:
            haystack = haystack.lower()
        if match_fn(keyword in haystack for keyword in keywords):
            valid = True

        if search_mode == "deep":
            # doc is [header, title_1, content_1, title_2, content_2, ...]
            doc = get_document(spec["id"], spec["title"], source)
            for x in range(1, len(doc) - 1, 2):
                section_title = doc[x]
                section_content = doc[x + 1]
                lowered_title = section_title.lower()
                if "reference" in lowered_title or "void" in lowered_title or "annex" in lowered_title:
                    continue  # skip reference lists, void placeholders, and annexes
                body = section_content if case_sensitive else section_content.lower()
                if match_fn(keyword in body for keyword in keywords):
                    valid = True
                    contents.append({section_title: section_content})

        if valid:
            spec_content = dict(spec)  # copy so the shared metadata list is not mutated
            if search_mode == "deep":
                spec_content["contains"] = {k: v for d in contents for k, v in d.items()}
            results.append(spec_content)
    if results:
        return KeywordResponse(
            results=results,
            search_time=time.time() - start_time,
        )
    raise HTTPException(status_code=404, detail="Specifications not found")

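# Illustrative deep-search request (field names follow KeywordRequest in schemas.py;
# the values shown here are hypothetical):
#   {"keywords": "network slicing", "source": "3GPP", "spec_type": "TS",
#    "search_mode": "deep", "mode": "and", "case_sensitive": false}
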
def bm25_search_specification(request: BM25KeywordRequest):
    """Rank specs against a free-text query with BM25, boost title/ID matches, and keep everything above the threshold."""
    start_time = time.time()
    source = request.source
    spec_type = request.spec_type
    threshold = request.threshold
    query = request.keywords
    results_out = []

    query_tokens = bm25s.tokenize(query)
    if source == "3GPP":
        results, scores = bm25_index_3gpp.retrieve(query_tokens, k=len(bm25_index_3gpp.corpus))
    elif source == "ETSI":
        results, scores = bm25_index_etsi.retrieve(query_tokens, k=len(bm25_index_etsi.corpus))
    else:
        # Query both indexes and merge the (1, k) result/score arrays side by side.
        results1, scores1 = bm25_index_3gpp.retrieve(query_tokens, k=len(bm25_index_3gpp.corpus))
        results2, scores2 = bm25_index_etsi.retrieve(query_tokens, k=len(bm25_index_etsi.corpus))
        results = np.concatenate([results1, results2], axis=1)
        scores = np.concatenate([scores1, scores2], axis=1)
    def calculate_boosted_score(metadata, score, query):
        """Boost the raw BM25 score when the query names the spec ID or overlaps the title words."""
        title = set(metadata['title'].lower().split())
        q = set(query.lower().split())
        spec_id_presence = 0.5 if metadata['id'].lower() in q else 0
        booster = len(q & title) * 0.5
        return score + spec_id_presence + booster

    # Keep only the best-scoring section per specification.
    spec_scores = {}
    spec_details = {}
    for i in range(results.shape[1]):
        doc = results[0, i]
        score = scores[0, i]
        spec = doc["metadata"]["id"]
        boosted_score = calculate_boosted_score(doc['metadata'], score, query)
        if spec not in spec_scores or boosted_score > spec_scores[spec]:
            spec_scores[spec] = boosted_score
            spec_details[spec] = {
                'original_score': score,
                'boosted_score': boosted_score,
                'doc': doc,
            }
    def normalize_scores(scores_dict):
        """Min-max scale the boosted scores to [0, 1] so the threshold is comparable across queries."""
        if not scores_dict:
            return {}
        scores_array = np.array(list(scores_dict.values())).reshape(-1, 1)
        normalized = MinMaxScaler().fit_transform(scores_array).flatten()
        return dict(zip(scores_dict.keys(), normalized))

    normalized_scores = normalize_scores(spec_scores)
    for spec in spec_details:
        spec_details[spec]["normalized_score"] = normalized_scores[spec]

    # Best specs first.
    unique_specs = sorted(normalized_scores.keys(), key=lambda x: normalized_scores[x], reverse=True)
    for spec in unique_specs:
        details = spec_details[spec]
        metadata = details['doc']['metadata']
        if metadata.get('type') is None or (spec_type is not None and metadata["type"] != spec_type):
            continue
        # threshold is given as a percentage; scores are sorted descending,
        # so nothing after the first miss can pass.
        if details['normalized_score'] < threshold / 100:
            break
        results_out.append(metadata)

    if results_out:
        return KeywordResponse(
            results=results_out,
            search_time=time.time() - start_time,
        )
    raise HTTPException(status_code=404, detail="Specifications not found")
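
# Minimal local-run sketch; assumes uvicorn is installed (Hugging Face Spaces conventionally
# serves on port 7860, but the Space may launch the app differently):
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)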