import asyncio
import io
import json
import logging
import os
import re
import string
import subprocess
import traceback
import zipfile
from typing import Dict, List, Literal, Tuple

import pandas as pd
import requests
from bs4 import BeautifulSoup
from fastapi import BackgroundTasks, Depends, HTTPException, Request
from fastapi.responses import StreamingResponse
from fastapi.routing import APIRouter
from httpx import AsyncClient
from litellm.router import Router
from lxml import etree
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from pydantic import BaseModel

from dependencies import DOC_FINDER_BASE_URL, get_http_client, get_llm_router
from schemas import (
    DataRequest,
    DataResponse,
    DocDownloadRequest,
    DocRequirements,
    ExtractRequirementsRequest,
    ExtractRequirementsResponse,
    MeetingsRequest,
    MeetingsResponse,
)

router = APIRouter(tags=["document extraction"])


lemmatizer = WordNetLemmatizer()

# XML namespaces used by the XPath queries below: `w` is the main WordprocessingML
# namespace, `v` the legacy VML namespace.
NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}


def lemma(text: str):
    """Lowercases, strips punctuation and stop words, then lemmatizes the remaining tokens."""
    stop_words = set(stopwords.words('english'))
    txt = text.translate(str.maketrans('', '', string.punctuation)).strip()
    tokens = [token for token in word_tokenize(txt.lower())
              if token not in stop_words]
    return [lemmatizer.lemmatize(token) for token in tokens]
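
# Illustrative output of lemma() (exact tokens depend on the installed NLTK corpora):
#   lemma("The requirements are specified in clause 5.2")
#   -> ['requirement', 'specified', 'clause', '52']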


def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Fetches the zipped TDoc from the URL and returns the contained .docx as a ZipFile object."""
    if not url.endswith(".zip"):
        raise ValueError("URL must point to a ZIP file")
    doc_id = os.path.splitext(os.path.basename(url))[0]
    resp = requests.get(url, verify=False, headers={
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    })
    resp.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith(".docx"):
                docx_bytes = zf.read(file_name)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
            elif file_name.endswith(".doc"):
                # Legacy .doc files are converted to .docx through a headless LibreOffice run.
                input_path = f"/tmp/{doc_id}.doc"
                output_path = f"/tmp/{doc_id}.docx"
                docx_bytes = zf.read(file_name)

                with open(input_path, "wb") as f:
                    f.write(docx_bytes)

                subprocess.run([
                    "libreoffice",
                    "--headless",
                    "--convert-to", "docx",
                    "--outdir", "/tmp",
                    input_path
                ], check=True)

                with open(output_path, "rb") as f:
                    docx_bytes = f.read()

                os.remove(input_path)
                os.remove(output_path)

                return zipfile.ZipFile(io.BytesIO(docx_bytes))

    raise ValueError("No .docx/.doc file found in the archive")
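
# Usage sketch (hypothetical TDoc URL; real URLs are built by the endpoints below or
# returned by the document finder service):
#   archive = get_docx_archive("https://www.3gpp.org/ftp/tsg_sa/WG2_Arch/TSGS2_160_Chicago/docs/S2-2401234.zip")
#   print(archive.namelist())   # e.g. ['word/document.xml', ...]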


def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._Element:
    """Parses the main word/document.xml and returns its root element."""
    xml_bytes = docx_zip.read('word/document.xml')
    parser = etree.XMLParser(remove_blank_text=True)
    return etree.fromstring(xml_bytes, parser=parser)


def clean_document_xml(root: etree._Element) -> None:
    """Cleans the XML tree in place, resolving tracked changes and stripping comment markup."""

    # Tracked deletions (w:del) are dropped entirely.
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)

    # Tracked insertions (w:ins) are accepted: their children are promoted to the parent
    # at the same position, then the wrapper is removed. The children are copied into a
    # list first, since moving them mutates the element while it is being iterated.
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        index = parent.index(ins_elem)
        for child in list(ins_elem):
            parent.insert(index, child)
            index += 1
        parent.remove(ins_elem)

    # Comment anchors and references are removed.
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)


def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
    """Builds a new docx archive containing the modified document.xml."""
    output = io.BytesIO()

    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy every part except the main document, which is rewritten below.
        for file in original_zip.infolist():
            if file.filename != 'word/document.xml':
                new_zip.writestr(file, original_zip.read(file.filename))

        xml_str = etree.tostring(
            modified_root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)

    output.seek(0)
    return output.getvalue()


def docx_to_txt(doc_id: str, url: str):
    """Downloads the TDoc, resolves tracked changes and returns the non-empty text lines."""
    docx_zip = get_docx_archive(url)
    root = parse_document_xml(docx_zip)
    clean_document_xml(root)
    modified_bytes = create_modified_docx(docx_zip, root)

    input_path = f"/tmp/{doc_id}_cleaned.docx"
    output_path = f"/tmp/{doc_id}_cleaned.txt"
    with open(input_path, "wb") as f:
        f.write(modified_bytes)

    subprocess.run([
        "libreoffice",
        "--headless",
        "--convert-to", "txt",
        "--outdir", "/tmp",
        input_path
    ], check=True)

    with open(output_path, "r", encoding="utf-8") as f:
        txt_data = [line.strip() for line in f if line.strip()]

    os.remove(input_path)
    os.remove(output_path)
    return txt_data
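
# End-to-end sketch of the text-extraction pipeline (hypothetical document ID and URL;
# requires a `libreoffice` binary on PATH and write access to /tmp):
#   lines = docx_to_txt("S2-2401234", "https://www.3gpp.org/ftp/tsg_sa/WG2_Arch/TSGS2_160_Chicago/docs/S2-2401234.zip")
#   full_text = "\n".join(lines)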


@router.post("/get_meetings", response_model=MeetingsResponse)
async def get_meetings(req: MeetingsRequest, http_client: AsyncClient = Depends(get_http_client)):
    """Lists the meetings of a working group by browsing the 3GPP FTP server."""

    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)

    logging.debug("TSG: %s, WG number: %s", tsg, wg_number)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.debug(url)

    ftp_request = await http_client.get(url)
    soup = BeautifulSoup(ftp_request.text, "html.parser")

    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None

    for folder in wg_folders:
        if "wg" + str(wg_number) in folder.lower():
            selected_folder = folder
            break

    if selected_folder is None:
        raise HTTPException(
            status_code=404, detail=f"No folder found for working group {working_group}")

    url += "/" + selected_folder
    logging.debug(url)

    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    meeting_folders = [
        item.get_text() for item in soup.select("tr td a")
        if item.get_text().startswith("TSG")
        or (item.get_text().startswith("CT") and "-" in item.get_text())
    ]
    # Build display names such as "SA2#160 ..." from the FTP folder names.
    all_meetings = [
        working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ")
        if meeting.startswith('TSG') else meeting.replace("-", "#")
        for meeting in meeting_folders
    ]

    return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))
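
# Example exchange (illustrative values only; actual folder names follow the 3GPP FTP layout,
# and the path depends on how this router is mounted):
#   POST /get_meetings   {"working_group": "SA2"}
#   -> {"meetings": {"SA2#160 Chicago": "TSGS2_160_Chicago", ...}}   (display name -> FTP folder)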


@router.post("/get_dataframe", response_model=DataResponse)
async def get_docs_df(req: DataRequest, http_client: AsyncClient = Depends(get_http_client)):
    """Downloads the document list dataframe for a given meeting."""

    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.info("Fetching TDocs dataframe")

    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if "wg" + str(wg_number) in folder.lower():
            selected_folder = folder
            break

    if selected_folder is None:
        raise HTTPException(
            status_code=404, detail=f"No folder found for working group {working_group}")

    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a")
             if item.get_text().endswith(".xlsx")]

    if not files:
        raise HTTPException(status_code=404, detail="No XLSX has been found")

    def gen_url(tdoc: str):
        return f"{url}/{tdoc}.zip"

    df = pd.read_excel(str(url + "/" + files[0]).replace("#", "%23"))
    filtered_df = df[~(df["Uploaded"].isna())][
        ["TDoc", "Title", "CR category", "Source", "Type",
         "Agenda item", "Agenda item description", "TDoc Status"]]
    filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url)

    df = filtered_df.fillna("")
    return DataResponse(data=df[["TDoc", "Title", "Type", "TDoc Status",
                                 "Agenda item description", "URL"]].to_dict(orient="records"))
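
# Shape of a returned record (illustrative values):
#   {"TDoc": "S2-2401234", "Title": "...", "Type": "CR", "TDoc Status": "agreed",
#    "Agenda item description": "...", "URL": "https://www.3gpp.org/ftp/.../S2-2401234.zip"}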


@router.post("/download_tdocs")
def download_tdocs(req: DocDownloadRequest):
    """Downloads the specified TDocs, converts them to text and zips them into a single archive."""

    document_ids = [doc.document for doc in req.documents]

    logging.info(f"Downloading TDocs: {document_ids}")

    # Resolve the TDoc IDs to download URLs through the document finder service.
    doc_urls_req = requests.post(DOC_FINDER_BASE_URL + "find/batch",
                                 headers={
                                     "Content-Type": "application/json"
                                 },
                                 data=json.dumps({
                                     "doc_ids": document_ids
                                 }),
                                 verify=False)

    doc_urls_req.raise_for_status()
    doc_urls = doc_urls_req.json()

    if len(doc_urls["results"]) == 0:
        logging.warning(
            f"Got no URL results for docs {document_ids}. 3GPP index may not be up to date")

        raise HTTPException(
            status_code=501,
            detail=f"Got no URL results for docs {document_ids}. 3GPP index may not be up to date")

    documents_content: Dict[str, bytes] = {}
    failed_documents: List[str] = []

    def _process_single_document(doc_id: str, doc_url: str) -> Tuple[bool, bytes]:
        """Attempts to convert a document to text and returns success status and content."""
        try:
            text_lines = docx_to_txt(doc_id, doc_url)
            content_bytes = "\n".join(text_lines).encode("utf-8")
            return True, content_bytes
        except Exception as e:
            logging.warning(
                f"Failed to process document '{doc_id}' from URL '{doc_url}': {e}")
            error_message = f"Document '{doc_id}' text extraction failed: {e}".encode("utf-8")
            return False, error_message

    for doc_id, doc_url in doc_urls["results"].items():
        success, content = _process_single_document(doc_id, doc_url)
        documents_content[doc_id] = content
        if not success:
            failed_documents.append(doc_id)

    # Requested documents with no URL result still get a placeholder entry in the archive.
    for requested_doc_id in document_ids:
        if requested_doc_id not in documents_content:
            error_msg = (
                f"Failed to retrieve or process document '{requested_doc_id}'. "
                "The 3GPP index may not be up to date, or the document might be unavailable."
            ).encode("utf-8")

            documents_content[requested_doc_id] = error_msg
            logging.warning(
                f"Document '{requested_doc_id}' was requested but not found or processed.")
            if requested_doc_id not in failed_documents:
                failed_documents.append(requested_doc_id)

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
        for doc_id, content_bytes in documents_content.items():
            safe_filename = f"{doc_id}.txt"
            zip_file.writestr(safe_filename, content_bytes)

    zip_buffer.seek(0)

    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=tdocs.zip"}
    )


class ProgressUpdate(BaseModel):
    """Defines the structure of a single SSE message."""
    status: Literal["progress", "complete"]
    data: dict
    total_docs: int
    processed_docs: int
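
# Each update is serialized as a standard SSE frame by `progress_update` below, e.g.
# (illustrative): data: {"status":"progress","data":{},"total_docs":3,"processed_docs":1}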


@router.post("/generate_requirements/sse")
async def gen_reqs(req: ExtractRequirementsRequest, llm_router: Router = Depends(get_llm_router)):
    """Extracts requirements from the specified xxxxCR docs using an LLM and streams SSE events reporting the progress of the ongoing extraction."""

    documents = req.documents
    n_docs = len(documents)

    logging.info("Generating requirements for documents: {}".format(req.documents))

    # Limit how many documents are sent to the LLM concurrently.
    concurrency_sema = asyncio.Semaphore(4)

    def prompt(doc_id, full):
        return f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found. Remove the errors"

    async def _process_document(doc) -> list[DocRequirements]:
        doc_id = doc.document
        url = doc.url

        try:
            full = "\n".join(docx_to_txt(doc_id, url))
        except Exception as e:
            logging.error(
                f"Failed to process document {doc_id}: {e}", exc_info=True)
            return [DocRequirements(document=doc_id, context="Error LLM", requirements=[])]

        try:
            async with concurrency_sema:
                model_used = "gemini-v2"
                resp_ai = await llm_router.acompletion(
                    model=model_used,
                    messages=[
                        {"role": "user", "content": prompt(doc_id, full)}],
                    response_format=ExtractRequirementsResponse
                )
                return ExtractRequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
        except Exception as e:
            logging.error(
                f"LLM extraction failed for document {doc_id}: {e}", exc_info=True)
            return [DocRequirements(document=doc_id, context="Error LLM", requirements=[])]

    process_futures = [_process_document(doc) for doc in documents]

    def progress_update(x: ProgressUpdate) -> str:
        return f"data: {x.model_dump_json()}\n\n"

    async def _stream_generator(docs):
        # `docs` is the list of per-document coroutines created above.
        items = []
        n_processed = 0

        yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=0))

        for doc in asyncio.as_completed(docs):
            result = await doc
            items.extend(result)
            n_processed += 1
            yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=n_processed))

        final_response = ExtractRequirementsResponse(requirements=items)

        yield progress_update(ProgressUpdate(status="complete", data=final_response.model_dump(), total_docs=n_docs, processed_docs=n_processed))

    return StreamingResponse(_stream_generator(process_futures), media_type="text/event-stream")
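
# Client-side sketch for consuming the SSE stream above (illustrative only; the exact path
# depends on how this router is mounted, and `payload` must match ExtractRequirementsRequest):
#
#   async with AsyncClient(base_url="http://localhost:8000") as client:
#       async with client.stream("POST", "/generate_requirements/sse", json=payload) as resp:
#           async for line in resp.aiter_lines():
#               if line.startswith("data: "):
#                   update = ProgressUpdate.model_validate_json(line[len("data: "):])
#                   print(update.processed_docs, "/", update.total_docs)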