from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
import litellm
import pandas as pd
from pydantic import BaseModel, Field
from typing import Any, List, Dict, Optional
import re
import subprocess
import requests
import os
from lxml import etree
import zipfile
import io
import warnings

warnings.filterwarnings("ignore")

from bs4 import BeautifulSoup

app = FastAPI(title="Requirements Extractor")
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
)


class MeetingsRequest(BaseModel):
    working_group: str


class MeetingsResponse(BaseModel):
    meetings: Dict[str, str]


class DataRequest(BaseModel):
    working_group: str
    meeting: str


class DataResponse(BaseModel):
    data: List[Dict[Any, Any]]


class DocRequirements(BaseModel):
    doc_id: str
    context: str
    requirements: List[str]


class DocInfo(BaseModel):
    document: str
    url: str


class RequirementsRequest(BaseModel):
    documents: List[DocInfo]


class RequirementsResponse(BaseModel):
    requirements: List[DocRequirements]


NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}


def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Fetch the zipped docx from the URL and return it as a ZipFile object."""
    if not url.endswith("zip"):
        raise ValueError("URL must point to a ZIP file")
    resp = requests.get(
        url,
        verify=False,
        headers={
            "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        },
    )
    resp.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_name in zf.namelist():
            if file_name.endswith((".docx", ".doc")):
                docx_bytes = zf.read(file_name)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))
    raise ValueError("No docx/doc file found in the archive")


def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._Element:
    """Parse the main word/document.xml part."""
    xml_bytes = docx_zip.read('word/document.xml')
    parser = etree.XMLParser(remove_blank_text=True)
    return etree.fromstring(xml_bytes, parser=parser)


def clean_document_xml(root: etree._Element) -> None:
    """Clean the XML by modifying the tree in place."""
    # Remove <w:del> elements (tracked deletions) together with their content
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)

    # Unwrap <w:ins> elements (tracked insertions), keeping their children
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        index = parent.index(ins_elem)
        # materialize the children first, since insert() reparents them
        for child in list(ins_elem.iterchildren()):
            parent.insert(index, child)
            index += 1
        parent.remove(ins_elem)

    # Strip comment markers
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)


def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
    """Create a new docx containing the modified XML."""
    output = io.BytesIO()
    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy every part except the one we modified
        for file in original_zip.infolist():
            if file.filename != 'word/document.xml':
                new_zip.writestr(file, original_zip.read(file.filename))
        # Add the modified document.xml
        xml_str = etree.tostring(
            modified_root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)
    output.seek(0)
    return output.getvalue()


def docx_to_txt(doc_id: str, url: str):
    """Download, clean, and convert a TDoc to a list of plain-text lines via LibreOffice."""
    docx_zip = get_docx_archive(url)
    root = parse_document_xml(docx_zip)
    clean_document_xml(root)
    modified_bytes = create_modified_docx(docx_zip, root)

    input_path = f"/tmp/{doc_id}_cleaned.docx"
    output_path = f"/tmp/{doc_id}_cleaned.txt"
    with open(input_path, "wb") as f:
        f.write(modified_bytes)

    subprocess.run(
        ["libreoffice", "--headless", "--convert-to", "txt", "--outdir", "/tmp", input_path],
        check=True,
    )

    with open(output_path, "r", encoding="utf-8") as f:
        txt_data = [line.strip() for line in f if line.strip()]

    os.remove(input_path)
    os.remove(output_path)
    return txt_data


@app.get("/")
def render_page():
    return FileResponse("index.html")


@app.post("/get_meetings", response_model=MeetingsResponse)
def get_meetings(req: MeetingsRequest):
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)

    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")

    meeting_folders = []
    all_meetings = []
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if str(wg_number) in folder:
            selected_folder = folder
            break

    if selected_folder:
        url += "/" + selected_folder
        resp = requests.get(url, verify=False)
        soup = BeautifulSoup(resp.text, "html.parser")
        meeting_folders = [
            item.get_text()
            for item in soup.select("tr td a")
            if item.get_text().startswith("TSG")
        ]
        all_meetings = [
            working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ")
            for meeting in meeting_folders
        ]

    return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))


@app.post("/get_dataframe", response_model=DataResponse)
def get_change_request_dataframe(req: DataRequest):
    working_group = req.working_group
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d", working_group).group(0)

    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")

    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = None
    for folder in wg_folders:
        if str(wg_number) in folder:
            selected_folder = folder
            break

    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [
        item.get_text()
        for item in soup.select("tr td a")
        if item.get_text().endswith(".xlsx")
    ]

    def gen_url(tdoc: str):
        return f"{url}/{tdoc}.zip"

    df = pd.read_excel((url + "/" + files[0]).replace("#", "%23"))
    filtered_df = df[
        (
            ((df["Type"] == "CR") & ((df["CR category"] == "B") | (df["CR category"] == "C")))
            | (df["Type"] == "pCR")
        )
        & ~(df["Uploaded"].isna())
    ][["TDoc", "Title", "CR category", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]]
    filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url)
    df = filtered_df.fillna("")
    return DataResponse(
        data=df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records")
    )


@app.post("/generate_requirements", response_model=RequirementsResponse)
def gen_reqs(req: RequirementsRequest):
    documents = req.documents
    output = []
    for doc in documents:
        doc_id = doc.document
        url = doc.url
        full = "\n".join(docx_to_txt(doc_id, url))
        resp_ai = litellm.completion(
            model="gemini/gemini-2.0-flash",
            api_key=os.environ.get("GEMINI"),
            messages=[{
                "role": "user",
                "content": (
                    f"Here's the document whose ID is {doc_id} with requirements:\n{full}\n\n"
                    "I want you to extract all the requirements and give me the context that "
                    "calls for them (not a section reference; a full sentence is needed). "
                    "If several distinct contexts are covered, produce one requirements list "
                    "per context."
                ),
            }],
            response_format=DocRequirements,
        )
        reqs = DocRequirements.model_validate_json(resp_ai.choices[0].message.content)
        output.append(reqs)
    return RequirementsResponse(requirements=output)
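

# ---------------------------------------------------------------------------
# Example usage (an illustrative sketch, not part of the original service):
# assuming this file is saved as app.py and uvicorn is installed, the API can
# be started and exercised roughly as follows. The working-group value and the
# meeting folder name below are hypothetical placeholders.
#
#   uvicorn app:app --host 0.0.0.0 --port 8000
#
#   curl -X POST http://localhost:8000/get_meetings \
#        -H "Content-Type: application/json" \
#        -d '{"working_group": "SA1"}'
#
#   # /get_dataframe expects one of the folder names returned by /get_meetings
#   curl -X POST http://localhost:8000/get_dataframe \
#        -H "Content-Type: application/json" \
#        -d '{"working_group": "SA1", "meeting": "<meeting folder name>"}'
# ---------------------------------------------------------------------------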