Reqxtract-v2 / app.py
om4r932's picture
Add query requirements capacity + TODO : Categorization
040cfa1
raw
history blame
13.7 kB
import traceback
from fastapi import FastAPI, BackgroundTasks
from schemas import *
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from litellm.router import Router
from aiolimiter import AsyncLimiter
import pandas as pd
import asyncio
import re
import nltk
nltk.download('stopwords')
nltk.download('punkt_tab')
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import string
import subprocess
import requests
from dotenv import load_dotenv
load_dotenv()
import os
from lxml import etree
import zipfile
import io
import warnings
warnings.filterwarnings("ignore")
from bs4 import BeautifulSoup
app = FastAPI(title="Requirements Extractor")
app.add_middleware(CORSMiddleware, allow_credentials=True, allow_headers=["*"], allow_methods=["*"], allow_origins=["*"])
llm_router = Router(model_list=[{"model_name": "gemini-v1", "litellm_params": {"model": "gemini/gemini-2.0-flash", "api_key": os.environ.get("GEMINI"), "max_retries": 10, "rpm": 15}},
{"model_name": "gemini-v2", "litellm_params": {"model": "gemini/gemini-2.5-flash", "api_key": os.environ.get("GEMINI"), "max_retries": 10, "rpm": 10}}]
, fallbacks=[{"gemini-v2": ["gemini-v1"]}], num_retries=10)
limiter_mapping = {
model["model_name"]: AsyncLimiter(model["litellm_params"]["rpm"], 60)
for model in llm_router.model_list
}
lemmatizer = WordNetLemmatizer()
NSMAP = {
'w': 'http://schemas.openxmlformats.org/wordprocessingml/2006/main',
'v': 'urn:schemas-microsoft-com:vml'
}
def lemma(text: str):
stop_words = set(stopwords.words('english'))
txt = text.translate(str.maketrans('', '', string.punctuation)).strip()
tokens = [token for token in word_tokenize(txt.lower()) if token not in stop_words]
return [lemmatizer.lemmatize(token) for token in tokens]
def get_docx_archive(url: str) -> zipfile.ZipFile:
"""Récupère le docx depuis l'URL et le retourne comme objet ZipFile"""
if not url.endswith("zip"):
raise ValueError("URL doit pointer vers un fichier ZIP")
doc_id = os.path.splitext(os.path.basename(url))[0]
resp = requests.get(url, verify=False, headers={
"User-Agent": 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})
resp.raise_for_status()
with zipfile.ZipFile(io.BytesIO(resp.content)) as zf:
for file_name in zf.namelist():
if file_name.endswith(".docx"):
docx_bytes = zf.read(file_name)
return zipfile.ZipFile(io.BytesIO(docx_bytes))
elif file_name.endswith(".doc"):
input_path = f"/tmp/{doc_id}.doc"
output_path = f"/tmp/{doc_id}.docx"
docx_bytes = zf.read(file_name)
with open(input_path, "wb") as f:
f.write(docx_bytes)
subprocess.run([
"libreoffice",
"--headless",
"--convert-to", "docx",
"--outdir", "/tmp",
input_path
], check=True)
with open(output_path, "rb") as f:
docx_bytes = f.read()
os.remove(input_path)
os.remove(output_path)
return zipfile.ZipFile(io.BytesIO(docx_bytes))
raise ValueError("Aucun fichier docx/doc trouvé dans l'archive")
def parse_document_xml(docx_zip: zipfile.ZipFile) -> etree._ElementTree:
"""Parse le document.xml principal"""
xml_bytes = docx_zip.read('word/document.xml')
parser = etree.XMLParser(remove_blank_text=True)
return etree.fromstring(xml_bytes, parser=parser)
def clean_document_xml(root: etree._Element) -> None:
"""Nettoie le XML en modifiant l'arbre directement"""
# Suppression des balises <w:del> et leur contenu
for del_elem in root.xpath('//w:del', namespaces=NSMAP):
parent = del_elem.getparent()
if parent is not None:
parent.remove(del_elem)
# Désencapsulation des balises <w:ins>
for ins_elem in root.xpath('//w:ins', namespaces=NSMAP):
parent = ins_elem.getparent()
index = parent.index(ins_elem)
for child in ins_elem.iterchildren():
parent.insert(index, child)
index += 1
parent.remove(ins_elem)
# Nettoyage des commentaires
for tag in ['w:commentRangeStart', 'w:commentRangeEnd', 'w:commentReference']:
for elem in root.xpath(f'//{tag}', namespaces=NSMAP):
parent = elem.getparent()
if parent is not None:
parent.remove(elem)
def create_modified_docx(original_zip: zipfile.ZipFile, modified_root: etree._Element) -> bytes:
"""Crée un nouveau docx avec le XML modifié"""
output = io.BytesIO()
with zipfile.ZipFile(output, 'w', compression=zipfile.ZIP_DEFLATED) as new_zip:
# Copier tous les fichiers non modifiés
for file in original_zip.infolist():
if file.filename != 'word/document.xml':
new_zip.writestr(file, original_zip.read(file.filename))
# Ajouter le document.xml modifié
xml_str = etree.tostring(
modified_root,
xml_declaration=True,
encoding='UTF-8',
pretty_print=True
)
new_zip.writestr('word/document.xml', xml_str)
output.seek(0)
return output.getvalue()
def docx_to_txt(doc_id: str, url: str):
docx_zip = get_docx_archive(url)
root = parse_document_xml(docx_zip)
clean_document_xml(root)
modified_bytes = create_modified_docx(docx_zip, root)
input_path = f"/tmp/{doc_id}_cleaned.docx"
output_path = f"/tmp/{doc_id}_cleaned.txt"
with open(input_path, "wb") as f:
f.write(modified_bytes)
subprocess.run([
"libreoffice",
"--headless",
"--convert-to", "txt",
"--outdir", "/tmp",
input_path
], check=True)
with open(output_path, "r", encoding="utf-8") as f:
txt_data = [line.strip() for line in f if line.strip()]
os.remove(input_path)
os.remove(output_path)
return txt_data
@app.get("/")
def render_page():
return FileResponse("index.html")
@app.post("/get_meetings", response_model=MeetingsResponse)
def get_meetings(req: MeetingsRequest):
working_group = req.working_group
tsg = re.sub(r"\d+", "", working_group)
wg_number = re.search(r"\d", working_group).group(0)
url = "https://www.3gpp.org/ftp/tsg_" + tsg
resp = requests.get(url, verify=False)
soup = BeautifulSoup(resp.text, "html.parser")
meeting_folders = []
all_meetings = []
wg_folders = [item.get_text() for item in soup.select("tr td a")]
selected_folder = None
for folder in wg_folders:
if str(wg_number) in folder:
selected_folder = folder
break
url += "/" + selected_folder
if selected_folder:
resp = requests.get(url, verify=False)
soup = BeautifulSoup(resp.text, "html.parser")
meeting_folders = [item.get_text() for item in soup.select("tr td a") if item.get_text().startswith("TSG")]
all_meetings = [working_group + "#" + meeting.split("_", 1)[1].replace("_", " ").replace("-", " ") for meeting in meeting_folders]
return MeetingsResponse(meetings=dict(zip(all_meetings, meeting_folders)))
@app.post("/get_dataframe", response_model=DataResponse)
def get_change_request_dataframe(req: DataRequest):
working_group = req.working_group
tsg = re.sub(r"\d+", "", working_group)
wg_number = re.search(r"\d", working_group).group(0)
url = "https://www.3gpp.org/ftp/tsg_" + tsg
resp = requests.get(url, verify=False)
soup = BeautifulSoup(resp.text, "html.parser")
wg_folders = [item.get_text() for item in soup.select("tr td a")]
selected_folder = None
for folder in wg_folders:
if str(wg_number) in folder:
selected_folder = folder
break
url += "/" + selected_folder + "/" + req.meeting + "/docs"
resp = requests.get(url, verify=False)
soup = BeautifulSoup(resp.text, "html.parser")
files = [item.get_text() for item in soup.select("tr td a") if item.get_text().endswith(".xlsx")]
def gen_url(tdoc: str):
return f"{url}/{tdoc}.zip"
df = pd.read_excel(str(url + "/" + files[0]).replace("#", "%23"))
filtered_df = df[(((df["Type"] == "CR") & ((df["CR category"] == "B") | (df["CR category"] == "C"))) | (df["Type"] == "pCR")) & ~(df["Uploaded"].isna())][["TDoc", "Title", "CR category", "Source", "Type", "Agenda item", "Agenda item description", "TDoc Status"]]
filtered_df["URL"] = filtered_df["TDoc"].apply(gen_url)
df = filtered_df.fillna("")
return DataResponse(data=df[["TDoc", "Title", "Type", "TDoc Status", "Agenda item description", "URL"]].to_dict(orient="records"))
@app.post("/generate_requirements", response_model=RequirementsResponse)
async def gen_reqs(req: RequirementsRequest, background_tasks: BackgroundTasks):
documents = req.documents
n_docs = len(documents)
async def process_document(doc):
doc_id = doc.document
url = doc.url
try:
full = "\n".join(docx_to_txt(doc_id, url))
except Exception as e:
traceback.print_exception(e)
return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements
try:
model_used = "gemini-v2" # À adapter si fallback activé
async with limiter_mapping[model_used]:
resp_ai = await llm_router.acompletion(
model=model_used,
messages=[{"role":"user","content": f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found."}],
response_format=RequirementsResponse
)
return RequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
except Exception as e:
if "rate limit" in str(e).lower():
try:
model_used = "gemini-v2" # À adapter si fallback activé
async with limiter_mapping[model_used]:
resp_ai = await llm_router.acompletion(
model=model_used,
messages=[{"role":"user","content": f"Here's the document whose ID is {doc_id} : {full}\n\nExtract all requirements and group them by context, returning a list of objects where each object includes a document ID, a concise description of the context where the requirements apply (not a chapter title or copied text), and a list of associated requirements; always return the result as a list, even if only one context is found."}],
response_format=RequirementsResponse
)
return RequirementsResponse.model_validate_json(resp_ai.choices[0].message.content).requirements
except Exception as fallback_e:
traceback.print_exception(fallback_e)
return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements
else:
traceback.print_exception(e)
return RequirementsResponse(requirements=[DocRequirements(document=doc_id, context="Error LLM", requirements=[])]).requirements
async def process_batch(batch):
results = await asyncio.gather(*(process_document(doc) for doc in batch))
return [item for sublist in results for item in sublist]
all_requirements = []
if n_docs <= 30:
batch_results = await process_batch(documents)
all_requirements.extend(batch_results)
else:
batch_size = 30
batches = [documents[i:i + batch_size] for i in range(0, n_docs, batch_size)]
for i, batch in enumerate(batches):
batch_results = await process_batch(batch)
all_requirements.extend(batch_results)
if i < len(batches) - 1:
background_tasks.add_task(asyncio.sleep, 60)
return RequirementsResponse(requirements=all_requirements)
@app.post("/get_reqs_from_query", response_model=ReqSearchResponse)
def find_requirements_from_problem_description(req: ReqSearchRequest):
requirements = req.requirements
query = req.query
requirements_text = "\n".join([f"[Document: {r.document} | Context: {r.context} | Requirement: {r.requirement}]" for r in requirements])
print("Called the LLM")
resp_ai = llm_router.completion(
model="gemini-v2",
messages=[{"role":"user","content": f"Given all the requirements : \n {requirements_text} \n and the problem description \"{query}\", return a list of objects each with document ID, context, and requirement for the most relevant requirements that reference or best cover the problem."}],
response_format=ReqSearchResponse
)
print("Answered")
return ReqSearchResponse.model_validate_json(resp_ai.choices[0].message.content)