from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
import litellm
import pandas as pd
from pydantic import BaseModel, Field
from typing import Any, List, Dict
import re
import subprocess
import requests
import os
from lxml import etree
import zipfile
import io
from bs4 import BeautifulSoup

import warnings
# Suppress urllib3's InsecureRequestWarning triggered by the verify=False requests below
warnings.filterwarnings("ignore")

app = FastAPI(title="Requirements Extractor")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
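# CORS is wide open for development convenience; restrict allow_origins before deploying.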

class MeetingsRequest(BaseModel):
    working_group: str

class MeetingsResponse(BaseModel):
    meetings: Dict[str, str]

class DataRequest(BaseModel):
    working_group: str
    meeting: str

class DataResponse(BaseModel):
    data: List[Dict[str, Any]]

class DocRequirements(BaseModel):
    doc_id: str
    context: str
    requirements: List[str]

class DocInfo(BaseModel):
    document: str
    url: str

class RequirementsRequest(BaseModel):
    documents: List[DocInfo]

class RequirementsResponse(BaseModel):
    requirements: List[DocRequirements]

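# XML namespaces used in XPath queries over WordprocessingML (docx) documents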
NSMAP = {
    'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
    'v': 'urn:schemas-microsoft-com:vml'
}

def get_docx_archive(url: str) -> zipfile.ZipFile:
    """Fetch the zipped TDoc from the URL and return the embedded .docx as a ZipFile."""
    if not url.endswith("zip"):
        raise ValueError("URL must point to a ZIP file")

    resp = requests.get(url, verify=False, timeout=60, headers={
        "User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    })
    resp.raise_for_status()

    with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
        for file_name in zf.namelist():
            # Only .docx is a ZIP-based (OOXML) container; legacy .doc cannot be opened as one
            if file_name.endswith(".docx"):
                docx_bytes = zf.read(file_name)
                return zipfile.ZipFile(io.BytesIO(docx_bytes))

    raise ValueError("No .docx file found in the archive")

def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._Element:
    """Parse the main word/document.xml and return its root element."""
    xml_bytes = docx_zip.read('word/document.xml')
    parser = etree.XMLParser(remove_blank_text=True)
    return etree.fromstring(xml_bytes, parser=parser)

def clean_document_xml(root: etree._Element) -> None:
    """Clean the XML tree in place: apply tracked changes and drop comment markers."""
    # Remove <w:del> elements (deleted text) together with their content
    for del_elem in root.xpath('//w:del', namespaces=NSMAP):
        parent = del_elem.getparent()
        if parent is not None:
            parent.remove(del_elem)

    # Unwrap <w:ins> elements (inserted text), keeping their children.
    # Materialize the child list first: moving nodes while iterating
    # iterchildren() would skip elements.
    for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
        parent = ins_elem.getparent()
        index = parent.index(ins_elem)
        for child in list(ins_elem):
            parent.insert(index, child)
            index += 1
        parent.remove(ins_elem)

    # Strip comment range markers and references
    for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
        for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
            parent = elem.getparent()
            if parent is not None:
                parent.remove(elem)

def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
    """Crée un nouveau docx avec le XML modifié"""
    output = io.BytesIO()
    
    with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
        # Copy every entry except the document.xml we are replacing
        for file in original_zip.infolist():
            if file.filename != 'word/document.xml':
                new_zip.writestr(file, original_zip.read(file.filename))
        
        # Serialize and write the modified document.xml
        xml_str = etree.tostring(
            modified_root,
            xml_declaration=True,
            encoding='UTF-8',
            pretty_print=True
        )
        new_zip.writestr('word/document.xml', xml_str)
    
    output.seek(0)
    return output.getvalue()

def docx_to_txt(doc_id: str, url: str):
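    """Download a TDoc archive, strip tracked changes from its docx, convert it
    to plain text with headless LibreOffice, and return the non-empty lines."""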
    docx_zip = get_docx_archive(url)
    root = parse_document_xml(docx_zip)
    clean_document_xml(root)
    modified_bytes = create_modified_docx(docx_zip, root)

    input_path = f"/tmp/{doc_id}_cleaned.docx"
    output_path = f"/tmp/{doc_id}_cleaned.txt"
    with open(input_path, "wb") as f:
        f.write(modified_bytes)

    try:
        # LibreOffice writes "<input basename>.txt" into --outdir
        subprocess.run([
            "libreoffice",
            "--headless",
            "--convert-to", "txt",
            "--outdir", "/tmp",
            input_path
        ], check=True)

        with open(output_path, "r", encoding="utf-8") as f:
            txt_data = [line.strip() for line in f if line.strip()]
    finally:
        # Remove temp files even if the conversion fails
        for path in (input_path, output_path):
            if os.path.exists(path):
                os.remove(path)

    return txt_data

@app.get("/")
def render_page():
    return FileResponse("index.html")

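# Map human-readable meeting names (e.g. "SA1#112") to their folder names by
# scraping the 3GPP FTP site index for the requested working group.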
@app.post("/get_meetings", response_model=MeetingsResponse)
def get_meetings(req: MeetingsRequest):
    working_group = req.working_group
    # Split e.g. "SA1" into TSG ("SA") and working-group number ("1")
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d+", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    meeting_folders = []
    all_meetings = []
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    # Pick the first subfolder containing the working-group number, if any
    selected_folder = next((folder for folder in wg_folders if str(wg_number) in folder), None)

    if selected_folder:
        url += "/" + selected_folder
        resp = requests.get(url, verify=False)
        soup = BeautifulSoup(resp.text, "html.parser")
        meeting_folders = [item.get_text() for item in soup.select("tr td a") if item.get_text().startswith("TSG")]
        all_meetings = [working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ") for meeting in meeting_folders]

    return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))

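# Download the meeting's TDoc list spreadsheet and return the category B/C
# change requests and pCRs that were actually uploaded, with download URLs.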
@app.post("/get_dataframe", response_model=DataResponse)
def get_change_request_dataframe(req: DataRequest):
    working_group = req.working_group
    # Split e.g. "SA1" into TSG ("SA") and working-group number ("1")
    tsg = re.sub(r"\d+", "", working_group)
    wg_number = re.search(r"\d+", working_group).group(0)
    url = "https://www.3gpp.org/ftp/tsg_" + tsg
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    wg_folders = [item.get_text() for item in soup.select("tr td a")]
    selected_folder = next((folder for folder in wg_folders if str(wg_number) in folder), None)
    if selected_folder is None:
        return DataResponse(data=[])

    url += "/" + selected_folder + "/" + req.meeting + "/docs"
    resp = requests.get(url, verify=False)
    soup = BeautifulSoup(resp.text, "html.parser")
    files = [item.get_text() for item in soup.select("tr td a") if item.get_text().endswith(".xlsx")]
    if not files:
        return DataResponse(data=[])

    def gen_url(tdoc: str):
        return f"{url}/{tdoc}.zip"

    # "#" in the meeting folder name must be percent-encoded before pandas fetches the file
    df = pd.read_excel((url + "/" + files[0]).replace("#", "%23"))
    # Keep category B/C change requests and pCRs that were actually uploaded
    filtered_df = df[
        (((df["Type"] == "CR") & (df["CR category"].isin(["B", "C"]))) | (df["Type"] == "pCR"))
        & ~df["Uploaded"].isna()
    ][["TDoc", "Title", "CR category", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]]
    filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url)

    df = filtered_df.fillna("")
    return DataResponse(data=df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records"))

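# For each selected document: fetch and clean the docx, flatten it to text,
# then have the LLM return structured DocRequirements for it.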
@app.post("/generate_requirements", response_model=RequirementsResponse)
def gen_reqs(req: RequirementsRequest):
    documents = req.documents
    output = []
    for doc in documents:
        doc_id = doc.document
        url = doc.url

        full = "\n".join(docx_to_txt(doc_id, url))

        resp_ai = litellm.completion(
            model="gemini/gemini-2.0-flash",
            api_key=os.environ.get("GEMINI"),
            messages=[{
                "role": "user",
                "content": (
                    f"Here is document {doc_id}, which contains requirements:\n\n{full}\n\n"
                    "Extract every requirement and give the context that calls for it "
                    "(a full sentence, not just a section reference). If the document "
                    "covers several contexts, build one requirements list per context."
                )
            }],
            response_format=DocRequirements
        )

        reqs = DocRequirements.model_validate_json(resp_ai.choices[0].message.content)
        output.append(reqs)
    
    return RequirementsResponse(requirements=output)
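
# Local dev entry point: a minimal sketch, not part of the original service
# definition; it assumes uvicorn (FastAPI's usual ASGI server) is installed,
# and the host/port below are illustrative defaults.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)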