from bs4 import BeautifulSoup
import warnings
import io
import zipfile
from lxml import etree
import os
from dotenv import load_dotenv
import requests
import subprocess
import string
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from concurrent.futures import ThreadPoolExecutor, as_completed
import json
import traceback
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.staticfiles import StaticFiles
from schemas import *
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse, StreamingResponse
from litellm.router import Router
from aiolimiter import AsyncLimiter
import pandas as pd
import asyncio
import logging
import re
import nltk

load_dotenv()

logging.basicConfig(
    level=logging.INFO,
    format='[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d]: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

nltk.download('stopwords')
nltk.download('punkt_tab')
nltk.download('wordnet')

warnings.filterwarnings("ignore")

app = FastAPI(title="Requirements Extractor")
app.mount("/static", StaticFiles(directory="static"), name="static")
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)

llm_router = Router(
    model_list=[
        {
            "model_name": "gemini-v1",
            "litellm_params": {
                "model": "gemini/gemini-2.0-flash",
                "api_key": os.environ.get("GEMINI"),
                "max_retries": 10,
                "rpm": 15,
                "allowed_fails": 1,
                "cooldown": 30,
            },
        },
        {
            "model_name": "gemini-v2",
            "litellm_params": {
                "model": "gemini/gemini-2.5-flash",
                "api_key": os.environ.get("GEMINI"),
                "max_retries": 10,
                "rpm": 10,
                "allowed_fails": 1,
                "cooldown": 30,
            },
        },
    ],
    fallbacks=[{"gemini-v2": ["gemini-v1"]}],
    num_retries=10,
    retry_after=30,
)

# One client-side rate limiter per model, seeded from the router's rpm settings.
limiter_mapping = {
    model["model_name"]: AsyncLimiter(model["litellm_params"]["rpm"], 60)
    for model in llm_router.model_list
}

lemmatizer = WordNetLemmatizer()

NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml',
}


def lemma(text: str):
    """Lowercase, strip punctuation and stop words, then lemmatize the remaining tokens."""
    stop_words = set(stopwords.words('english'))
    txt = text.translate(str.maketrans('', '', string.punctuation)).strip()
    tokens = [token for token in word_tokenize(txt.lower()) if token not in stop_words]
    return [lemmatizer.lemmatize(token) for token in tokens]


def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Fetch the docx from the URL and return it as a ZipFile object."""
    if not url.endswith("zip"):
        raise ValueError("URL must point to a ZIP file")
    doc_id = os.path.splitext(os.path.basename(url))[0]
    resp = requests.get(
        url,
        verify=False,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                          "(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
        },
    )
    resp.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith(".docx"):
                docx_bytes = zf.read(file_name)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
            elif file_name.endswith(".doc"):
                # Legacy .doc: convert to .docx with LibreOffice before returning it.
                input_path = f"/tmp/{doc_id}.doc"
                output_path = f"/tmp/{doc_id}.docx"
                docx_bytes = zf.read(file_name)
                with open(input_path, "wb") as f:
                    f.write(docx_bytes)
                subprocess.run(
                    ["libreoffice", "--headless", "--convert-to", "docx",
                     "--outdir", "/tmp", input_path],
                    check=True,
                )
                with open(output_path, "rb") as f:
                    docx_bytes = f.read()
                os.remove(input_path)
                os.remove(output_path)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
    raise ValueError("No .docx or .doc file found in the archive")
def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._Element:
    """Parse the main word/document.xml and return its root element."""
    xml_bytes = docx_zip.read('word/document.xml')
    parser = etree.XMLParser(remove_blank_text=True)
    return etree.fromstring(xml_bytes, parser=parser)


def clean_document_xml(root: etree._Element) -> None:
    """Clean the XML by modifying the tree in place."""
    # Remove <w:del> elements and their content (tracked deletions).
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)
    # Unwrap <w:ins> elements: keep the inserted content, drop the wrapper.
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        index = parent.index(ins_elem)
        for child in ins_elem.iterchildren():
            parent.insert(index, child)
            index += 1
        parent.remove(ins_elem)
    # Strip comment ranges and references.
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)


def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
    """Create a new docx containing the modified XML."""
    output = io.BytesIO()
    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy every file that was not modified.
        for file in original_zip.infolist():
            if file.filename != 'word/document.xml':
                new_zip.writestr(file, original_zip.read(file.filename))
        # Add the modified document.xml.
        xml_str = etree.tostring(
            modified_root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True,
        )
        new_zip.writestr('word/document.xml', xml_str)
    output.seek(0)
    return output.getvalue()


def docx_to_txt(doc_id: str, url: str):
    """Download a TDoc, strip tracked changes and comments, and convert it to plain-text lines."""
    docx_zip = get_docx_archive(url)
    root = parse_document_xml(docx_zip)
    clean_document_xml(root)
    modified_bytes = create_modified_docx(docx_zip, root)

    input_path = f"/tmp/{doc_id}_cleaned.docx"
    output_path = f"/tmp/{doc_id}_cleaned.txt"
    with open(input_path, "wb") as f:
        f.write(modified_bytes)
    subprocess.run(
        ["libreoffice", "--headless", "--convert-to", "txt",
         "--outdir", "/tmp", input_path],
        check=True,
    )
    with open(output_path, "r", encoding="utf-8") as f:
        txt_data = [line.strip() for line in f if line.strip()]
    os.remove(input_path)
    os.remove(output_path)
    return txt_data


@app.get("/")
def render_page():
    return FileResponse("index.html")


@app.post("/get_meetings", response_model=MeetingsResponse)
def get_meetings(req: MeetingsRequest):
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)
    logging.debug("TSG: %s, WG number: %s", tsg, wg_number)

    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    logging.debug(url)
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")

    meeting_folders = []
    all_meetings = []
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if "wg" + str(wg_number) in folder.lower():
            selected_folder = folder
            break

    if selected_folder:
        url += "/" + selected_folder
        logging.debug(url)
        resp = requests.get(url, verify=False)
        soup = BeautifulSoup(resp.text, "html.parser")
        meeting_folders = [
            item.get_text() for item in soup.select("tr td a")
            if item.get_text().startswith("TSG")
            or (item.get_text().startswith("CT") and "-" in item.get_text())
        ]
        all_meetings = [
            working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ")
            if meeting.startswith('TSG')
            else meeting.replace("-", "#")
            for meeting in meeting_folders
        ]
    return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))
@app.post("/get_dataframe", response_model=DataResponse) def get_change_request_dataframe(req: DataRequest): working_group = req.working_group tsg = re.sub(r"\d+", "", working_group) wg_number = re.search(r"\d", working_group).group(0) url = "https://www.3gpp.org/ftp/tsg_" + tsg logging.info("Fetching TDocs dataframe") resp = requests.get(url, verify=False) soup = BeautifulSoup(resp.text, "html.parser") wg_folders = [item.get_text() for item in soup.select("tr td a")] selected_folder = None for folder in wg_folders: if str(wg_number) in folder: selected_folder = folder break url += "/" + selected_folder + "/" + req.meeting + "/docs" resp = requests.get(url, verify=False) soup = BeautifulSoup(resp.text, "html.parser") files = [item.get_text() for item in soup.select("tr td a") if item.get_text().endswith(".xlsx")] def gen_url(tdoc: str): return f"{url}/{tdoc}.zip" df = pd.read_excel(str(url + "/" + files[0]).replace("#", "%23")) filtered_df = df[(((df["Type"] == "CR") & ((df["CR category"] == "B") | (df["CR category"] == "C"))) | (df["Type"] == "pCR")) & ~( df["Uploaded"].isna())][["TDoc", "Title", "CR category", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]] filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url) df = filtered_df.fillna("") return DataResponse(data=df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records")) @app.post("/download_tdocs") def download_tdocs(req: DownloadRequest): """Download the specified TDocs and zips them in a single archive""" documents = req.documents logging.info(f"Downloading TDocs: {documents}") def process_document(doc: str): doc_id = doc url = requests.post( 'https://organizedprogrammers-3gppdocfinder.hf.space/find', headers={"Content-Type": "application/json"}, data=json.dumps({"doc_id": doc_id}), verify=False ) print(url.status_code) url = url.json()['url'] print(url) try: txt = "\n".join(docx_to_txt(doc_id, url)) except Exception as e: txt = f"Document {doc_id} text extraction failed: {e}" return doc_id, txt.encode("utf-8") def process_batch(batch): results = {} for doc in batch: try: doc_id, file_bytes = process_document(doc) results[doc_id] = file_bytes except Exception as e: traceback.print_exception(e) results[doc] = b"Erreur" return results documents_bytes = process_batch(documents) zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zip_file: for doc_id, txt_data in documents_bytes.items(): zip_file.writestr(f'{doc_id}.txt', txt_data) zip_buffer.seek(0) return StreamingResponse( zip_buffer, media_type="application/zip" ) @app.post("/generate_requirements", response_model=RequirementsResponse) async def gen_reqs(req: RequirementsRequest, background_tasks: BackgroundTasks): """Extract requirements from the specified TDocs using a LLM""" documents = req.documents n_docs = len(documents) logging.info("Generating requirements for documents: {}".format( [doc.document for doc in documents])) def prompt(doc_id, full): return f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found. 
Remove the errors" async def process_document(doc): doc_id = doc.document url = doc.url try: full = "\n".join(docx_to_txt(doc_id, url)) except Exception as e: traceback.print_exception(e) return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements try: model_used = "gemini-v2" # À adapter si fallback activé async with limiter_mapping[model_used]: resp_ai = await llm_router.acompletion( model=model_used, messages=[ {"role": "user", "content": prompt(doc_id, full)}], response_format=RequirementsResponse ) return RequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements except Exception as e: if "rate limit" in str(e).lower(): try: model_used = "gemini-v2" # À adapter si fallback activé async with limiter_mapping[model_used]: resp_ai = await llm_router.acompletion( model=model_used, messages=[ {"role": "user", "content": prompt(doc_id, full)}], response_format=RequirementsResponse ) return RequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements except Exception as fallback_e: traceback.print_exception(fallback_e) return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements else: traceback.print_exception(e) return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements async def process_batch(batch): results = await asyncio.gather(*(process_document(doc) for doc in batch)) return [item for sublist in results for item in sublist] all_requirements = [] if n_docs <= 30: batch_results = await process_batch(documents) all_requirements.extend(batch_results) else: batch_size = 30 batches = [documents[i:i + batch_size] for i in range(0, n_docs, batch_size)] for i, batch in enumerate(batches): batch_results = await process_batch(batch) all_requirements.extend(batch_results) if i < len(batches) - 1: background_tasks.add_task(asyncio.sleep, 60) return RequirementsResponse(requirements=all_requirements) @app.post("/get_reqs_from_query", response_model=ReqSearchResponse) def find_requirements_from_problem_description(req: ReqSearchRequest): requirements = req.requirements query = req.query requirements_text = "\n".join( [f"[Selection ID: {r.req_id} | Document: {r.document} | Context: {r.context} | Requirement: {r.requirement}]" for r in requirements]) print("Called the LLM") resp_ai = llm_router.completion( model="gemini-v2", messages=[{"role": "user", "content": f"Given all the requirements : \n {requirements_text} \n and the problem description \"{query}\", return a list of 'Selection ID' for the most relevant corresponding requirements that reference or best cover the problem. If none of the requirements covers the problem, simply return an empty list"}], response_format=ReqSearchLLMResponse ) print("Answered") print(resp_ai.choices[0].message.content) out_llm = ReqSearchLLMResponse.model_validate_json( resp_ai.choices[0].message.content).selected if max(out_llm) > len(requirements) - 1: raise HTTPException( status_code=500, detail="LLM error : Generated a wrong index, please try again.") return ReqSearchResponse(requirements=[requirements[i] for i in out_llm])