import asyncio
from pathlib import Path
import traceback
from typing import Dict, Literal
from fastapi.routing import APIRouter
import logging
import io
import zipfile
import os
from httpx import AsyncClient
from pydantic import BaseModel
import requests
import subprocess
import pandas as pd
import re
import tempfile
from lxml import etree
from bs4 import BeautifulSoup
from fastapi import Depends, HTTPException
from dependencies import get_http_client, get_llm_router
from fastapi.responses import StreamingResponse
from litellm.router import Router
from schemas import DataRequest, DataResponse, DocRequirements, DocDownloadRequest, MeetingsRequest, MeetingsResponse, ExtractRequirementsRequest, ExtractRequirementsResponse

# API router for requirement extraction from docs / doc list retrieval / download
router = APIRouter(tags=["document extraction"])

# ==================================================== Utilities =================================================================

NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}

# ================================== Conversion of files to .txt ====================================

def convert_file(contents: io.BytesIO, filename: str, input_ext: str, output_ext: str, filter: str | None = None) -> io.BytesIO:
    """
    Converts the given file bytes to the specified file type using LibreOffice headless.

    Args:
        contents: File contents
        filename: File base name WITHOUT THE EXTENSION
        input_ext: Input extension (WITHOUT THE DOT)
        output_ext: Output extension (WITHOUT THE DOT)
        filter: The LibreOffice conversion filter to use, if any.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        dir_path = Path(tmpdir)
        input_file_path = dir_path / f"{filename}.{input_ext}"
        output_file_path = dir_path / f"{filename}.{output_ext}"

        # Write the in-memory contents to the input file
        with open(input_file_path, "wb") as in_file:
            in_file.write(contents.read())

        out_bytes = io.BytesIO()

        # Convert using LibreOffice in headless mode
        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", f"{output_ext}:{filter}" if filter else output_ext,
            "--outdir", tmpdir,
            input_file_path
        ], check=True)

        with open(output_file_path, mode="rb") as out:
            out_bytes.write(out.read())
        out_bytes.seek(0)
        return out_bytes
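
# Usage sketch (hedged): converting an in-memory legacy .doc to .docx. Assumes
# LibreOffice is installed and on PATH; "example.doc" is a hypothetical file name.
#
#   with open("example.doc", "rb") as fh:
#       docx_bytes = convert_file(io.BytesIO(fh.read()), "example", "doc", "docx")
#   # docx_bytes is an io.BytesIO positioned at the start of the converted document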

def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Fetches the docx archive from the URL and returns it as a ZipFile object"""
    if not url.endswith("zip"):
        raise ValueError("URL must point to a ZIP file")

    doc_id = os.path.splitext(os.path.basename(url))[0]
    resp = requests.get(url, verify=False, headers={
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    })
    resp.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith(".docx"):
                docx_bytes = zf.read(file_name)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
            elif file_name.endswith(".doc"):
                # Legacy .doc files are converted to .docx first
                in_bytes = io.BytesIO(zf.read(file_name))
                docx_bytes = convert_file(in_bytes, doc_id, "doc", "docx")
                return zipfile.ZipFile(docx_bytes)

    raise ValueError("No docx/doc file found in the archive")

def apply_docx_revisions(docx_zip: zipfile.ZipFile) -> io.BytesIO:
    """
    Applies the tracked revisions of a .docx before returning its content
    """
    try:
        xml_bytes = docx_zip.read('word/document.xml')
    except KeyError:
        raise FileNotFoundError(
            "word/document.xml not found in the DOCX archive.")

    parser = etree.XMLParser(remove_blank_text=True)
    root = etree.fromstring(xml_bytes, parser=parser)

    # Remove <w:del> elements and their content (text marked as deleted)
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)

    # Unwrap <w:ins> elements so inserted text is kept in place
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        if parent is not None:
            index = parent.index(ins_elem)
            for child in ins_elem.iterchildren():
                parent.insert(index, child)
                index += 1
            parent.remove(ins_elem)

    # Strip comment markers
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)

    # Create a new docx with the modified XML
    output = io.BytesIO()
    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy all unmodified files
        for file_info in docx_zip.infolist():
            if file_info.filename != 'word/document.xml':
                new_zip.writestr(file_info, docx_zip.read(file_info.filename))
        # Add the modified document.xml
        xml_str = etree.tostring(
            root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)

    output.seek(0)
    return output
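
# Illustration (hedged, simplified WordprocessingML fragment, not taken from a real TDoc):
# a paragraph with tracked changes such as
#
#   <w:p>
#     <w:ins w:id="1"><w:r><w:t>new text</w:t></w:r></w:ins>
#     <w:del w:id="2"><w:r><w:delText>old text</w:delText></w:r></w:del>
#   </w:p>
#
# comes out of apply_docx_revisions() as
#
#   <w:p>
#     <w:r><w:t>new text</w:t></w:r>
#   </w:p>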

def docx_to_txt(doc_id: str, url: str) -> list[str]:
    """Downloads the TDoc archive, applies tracked revisions and returns the text as a list of non-empty lines."""
    docx_zip = get_docx_archive(url)
    modified_bytes = apply_docx_revisions(docx_zip)
    final_bytes = convert_file(modified_bytes, doc_id, "docx", "txt")
    final_bytes_text = str(final_bytes.read(), encoding="utf-8")
    txt_data = [line.strip()
                for line in final_bytes_text.splitlines() if line.strip()]
    return txt_data
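
# Usage sketch (hedged): the TDoc ID and URL below are hypothetical examples of the
# 3GPP FTP layout this module targets; the call blocks on network I/O and LibreOffice.
#
#   lines = docx_to_txt(
#       "S2-2400001",
#       "https://www.3gpp.org/ftp/tsg_sa/WG2_Arch/TSGS2_160_Example/docs/S2-2400001.zip",
#   )
#   text = "\n".join(lines)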

# ============================================= Doc routes =========================================================

async def get_meetings(req: MeetingsRequest, http_client: AsyncClient = Depends(get_http_client)):
    """Lists the meeting folders available on the 3GPP FTP server for the requested working group"""
    # Extract the TSG and WG number from the working group name
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)

    # Build the corresponding FTP URL
    logging.debug("TSG: %s, WG number: %s", tsg, wg_number)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.debug(url)

    ftp_request = await http_client.get(url)
    soup = BeautifulSoup(ftp_request.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None

    # Sanity check to ensure the requested working group is present in the FTP directories
    for folder in wg_folders:
        if "wg" + str(wg_number) in folder.lower():
            selected_folder = folder
            break
    if selected_folder is None:
        raise HTTPException(status_code=404, detail=f"No FTP folder found for working group {working_group}")

    url += "/" + selected_folder
    logging.debug(url)

    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    meeting_folders = [
        item.get_text() for item in soup.select("tr td a")
        if item.get_text().startswith("TSG") or (item.get_text().startswith("CT") and "-" in item.get_text())
    ]
    all_meetings = [
        working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ")
        if meeting.startswith('TSG') else meeting.replace("-", "#")
        for meeting in meeting_folders
    ]
    return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))
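
# Worked example (hedged; the meeting folder name is illustrative, not a confirmed listing):
#
#   working_group = "SA2"
#   re.sub(r"\d+", "", working_group)          # -> "SA"  (tsg)
#   re.search(r"\d", working_group).group(0)   # -> "2"   (wg_number)
#   # The listing is fetched from https://www.3gpp.org/ftp/tsg_SA, the folder containing "wg2"
#   # is selected, and a meeting folder "TSGS2_160_Example" is exposed as the key "SA2#160 Example".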

# ============================================================================================================================================

async def get_docs_df(req: DataRequest, http_client: AsyncClient = Depends(get_http_client)):
    """
    Downloads the document list dataframe for a given meeting
    """
    # Extract the TSG and WG number from the working group name
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg

    logging.info("Fetching TDocs dataframe")
    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]

    selected_folder = None
    for folder in wg_folders:
        if "wg" + str(wg_number) in folder.lower():
            selected_folder = folder
            break
    if selected_folder is None:
        raise HTTPException(status_code=404, detail=f"No FTP folder found for working group {working_group}")

    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = await http_client.get(url)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a")
             if item.get_text().endswith(".xlsx")]
    if not files:
        raise HTTPException(status_code=404, detail="No XLSX has been found")

    def gen_url(tdoc: str):
        return f"{url}/{tdoc}.zip"

    # The first XLSX in the docs folder is the TDoc list; '#' must be URL-encoded
    df = pd.read_excel(str(url + "/" + files[0]).replace("#", "%23"))
    filtered_df = df[~(df["Uploaded"].isna())][
        ["TDoc", "Title", "CR category", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]]
    filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url)
    df = filtered_df.fillna("")
    return DataResponse(data=df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records"))
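
# Shape of a single record returned in DataResponse.data (hedged sketch; the values are
# invented placeholders, only the keys come from the column selection above):
#
#   {
#       "TDoc": "S2-2400001",
#       "Title": "Example CR title",
#       "Type": "CR",
#       "TDoc Status": "agreed",
#       "Agenda item description": "Example agenda item",
#       "URL": "https://www.3gpp.org/ftp/tsg_SA/.../docs/S2-2400001.zip",
#   }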

# ==================================================================================================================================

def download_tdocs(req: DocDownloadRequest):
    """Downloads the specified TDocs and zips them into a single archive"""
    # Document IDs to download
    document_ids = [doc.document for doc in req.documents]
    logging.info(f"Downloading TDocs: {document_ids}")

    documents_content: Dict[str, bytes] = {}

    def _process_single_document(doc_id: str, doc_url: str) -> bytes:
        """Attempts to convert a document to text and returns its content, or an error message on failure."""
        try:
            text_lines = docx_to_txt(doc_id, doc_url)
            content_bytes = "\n".join(text_lines).encode("utf-8")
            return content_bytes
        except Exception as e:
            logging.warning(
                f"Failed to process document '{doc_id}' from URL '{doc_url}': {e}")
            error_message = f"Document '{doc_id}' text extraction failed: {e}".encode("utf-8")
            return error_message

    for doc in req.documents:
        content = _process_single_document(doc.document, doc.url)
        documents_content[doc.document] = content

    zip_buffer = io.BytesIO()
    with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file:
        for doc_id, content_bytes in documents_content.items():
            safe_filename = f"{doc_id}.txt"
            zip_file.writestr(safe_filename, content_bytes)
    zip_buffer.seek(0)

    return StreamingResponse(
        zip_buffer,
        media_type="application/zip",
        headers={"Content-Disposition": "attachment; filename=tdocs.zip"}
    )
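
# Client-side sketch (hedged): the response body is a ZIP archive with one "<doc_id>.txt"
# entry per requested document (failed conversions contain the error message instead of the
# extracted text). Assuming the raw response bytes are available as `response_bytes`:
#
#   archive = zipfile.ZipFile(io.BytesIO(response_bytes))
#   texts = {name: archive.read(name).decode("utf-8") for name in archive.namelist()}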

# ======================================================================================================================================================================================

class ProgressUpdate(BaseModel):
    """Defines the structure of a single SSE message."""
    status: Literal["progress", "complete"]
    data: dict
    total_docs: int
    processed_docs: int
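
# Example SSE frames built from this model (hedged; counts and payload are illustrative):
#
#   data: {"status": "progress", "data": {}, "total_docs": 3, "processed_docs": 1}
#
#   data: {"status": "complete", "data": {"requirements": [...]}, "total_docs": 3, "processed_docs": 3}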

async def gen_reqs(req: ExtractRequirementsRequest, llm_router: Router = Depends(get_llm_router)):
    """Extracts requirements from the specified CR docs using an LLM and streams SSE events reporting the progress of the ongoing operations"""
    documents = req.documents
    n_docs = len(documents)
    logging.info(
        "Generating requirements for documents: {}".format(req.documents))

    # Limit the concurrency of LLM requests to avoid piling up errors caused by small rate limits
    concurrency_sema = asyncio.Semaphore(4)

    def prompt(doc_id, full):
        return f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found. Remove the errors"

    async def _process_document(doc) -> list[DocRequirements]:
        doc_id = doc.document
        url = doc.url
        # Convert the docx to txt for use as LLM input
        try:
            full = "\n".join(docx_to_txt(doc_id, url))
        except Exception as e:
            fmt = "".join(traceback.format_exception(e))
            logging.error(f"Failed to process doc {doc_id} : {fmt}")
            return [DocRequirements(document=doc_id, context="Failed to process document", requirements=[])]
        try:
            await concurrency_sema.acquire()
            model_used = "gemini-v2"
            resp_ai = await llm_router.acompletion(
                model=model_used,
                messages=[
                    {"role": "user", "content": prompt(doc_id, full)}],
                response_format=ExtractRequirementsResponse
            )
            return ExtractRequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
        except Exception as e:
            logging.error(f"LLM extraction failed for doc {doc_id}: {e}")
            return [DocRequirements(document=doc_id, context="Error LLM", requirements=[])]
        finally:
            concurrency_sema.release()

    # Coroutines for all documents to process
    process_futures = [_process_document(doc) for doc in documents]

    # Helper to format a ProgressUpdate as an SSE event
    def progress_update(x): return f"data: {x.model_dump_json()}\n\n"

    # Async generator that yields the SSE progress events
    async def _stream_generator(docs):
        items = []
        n_processed = 0
        yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=0))
        for doc in asyncio.as_completed(docs):
            result = await doc
            items.extend(result)
            n_processed += 1
            yield progress_update(ProgressUpdate(status="progress", data={}, total_docs=n_docs, processed_docs=n_processed))
        final_response = ExtractRequirementsResponse(requirements=items)
        yield progress_update(ProgressUpdate(status="complete", data=final_response.model_dump(), total_docs=n_docs, processed_docs=n_processed))

    return StreamingResponse(_stream_generator(process_futures), media_type="text/event-stream")
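
# Expected shape of the structured LLM output parsed above (hedged sketch; the field names
# follow the DocRequirements / ExtractRequirementsResponse usage in this module, the values
# are invented, and representing each requirement as a plain string is an assumption):
#
#   {
#     "requirements": [
#       {
#         "document": "S2-2400001",
#         "context": "UE registration over non-3GPP access",
#         "requirements": ["The AMF shall ...", "The UE shall ..."]
#       }
#     ]
#   }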