# update from https://github.com/ArneBinder/argumentation-structure-identification/pull/529 (commit d868d2e)
import json
import os
import uuid
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Optional

import requests
from bs4 import BeautifulSoup

from .grobid_client import GrobidClient
from .grobid_util import extract_paper_metadata_from_grobid_xml, parse_bibliography
from .s2orc_paper import Paper
from .utils import (
    _clean_empty_and_duplicate_authors_from_grobid_parse,
    check_if_citations_are_bracket_style,
    extract_abstract_from_tei_xml,
    extract_back_matter_from_tei_xml,
    extract_body_text_from_tei_xml,
    extract_figures_and_tables_from_tei_xml,
    normalize_grobid_id,
    sub_all_note_tags,
)

BASE_TEMP_DIR = "./grobid/temp"
BASE_OUTPUT_DIR = "./grobid/output"
BASE_LOG_DIR = "./grobid/log"

def convert_tei_xml_soup_to_s2orc_json(soup: BeautifulSoup, paper_id: str, pdf_hash: str) -> Paper:
    """
    Convert Grobid TEI XML to S2ORC JSON format
    :param soup: BeautifulSoup of XML file content
    :param paper_id: name of file
    :param pdf_hash: hash of PDF
    :return: a Paper object holding metadata, abstract, body text, back matter,
        bibliography entries and reference (figure/table) entries
    """
    # extract metadata
    metadata = extract_paper_metadata_from_grobid_xml(soup.fileDesc)
    # clean metadata authors (remove duplicates etc.)
    metadata["authors"] = _clean_empty_and_duplicate_authors_from_grobid_parse(metadata["authors"])

    # parse bibliography entries (removes empty bib entries)
    biblio_entries = parse_bibliography(soup)
    bibkey_map = {normalize_grobid_id(bib["ref_id"]): bib for bib in biblio_entries}

    # # process formulas and replace with text
    # extract_formulas_from_tei_xml(soup)

    # extract figure and table captions
    refkey_map = extract_figures_and_tables_from_tei_xml(soup)

    # detect whether citations use bracket style, e.g. "[1]", rather than "(Author, 2020)"
    is_bracket_style = check_if_citations_are_bracket_style(soup)

    # substitute all note tags with p tags
    soup = sub_all_note_tags(soup)

    # process abstract if possible
    abstract_entries = extract_abstract_from_tei_xml(soup, bibkey_map, refkey_map, is_bracket_style)

    # process body text
    body_entries = extract_body_text_from_tei_xml(soup, bibkey_map, refkey_map, is_bracket_style)

    # parse back matter (acknowledgments, author statements, competing interests, abbreviations etc.)
    back_matter = extract_back_matter_from_tei_xml(soup, bibkey_map, refkey_map, is_bracket_style)

    # form final paper entry
    return Paper(
        paper_id=paper_id,
        pdf_hash=pdf_hash,
        metadata=metadata,
        abstract=abstract_entries,
        body_text=body_entries,
        back_matter=back_matter,
        bib_entries=bibkey_map,
        ref_entries=refkey_map,
    )

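# Example usage (a minimal sketch; assumes an existing Grobid TEI file "paper.tei.xml"):
#
#   with open("paper.tei.xml", "rb") as f:
#       soup = BeautifulSoup(f.read(), "xml")
#   paper = convert_tei_xml_soup_to_s2orc_json(soup, paper_id="paper", pdf_hash="")
#   paper_json = paper.release_json("pdf")
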
def convert_tei_xml_file_to_s2orc_json(tei_file: str, pdf_hash: str = "") -> Paper:
    """
    Convert a TEI XML file to S2ORC JSON
    :param tei_file: path to a TEI XML file produced by Grobid
    :param pdf_hash: hash of the source PDF (optional)
    :return: a Paper object
    """
    if not os.path.exists(tei_file):
        raise FileNotFoundError("Input TEI XML file doesn't exist")
    # derive the paper id from the file name (everything before the first ".")
    paper_id = os.path.basename(tei_file).split(".")[0]
    with open(tei_file, "rb") as f:
        soup = BeautifulSoup(f.read(), "xml")
    paper = convert_tei_xml_soup_to_s2orc_json(soup, paper_id, pdf_hash)
    return paper

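# Example usage (a minimal sketch; "output/paper.tei.xml" is a hypothetical path to a
# TEI XML file previously produced by Grobid):
#
#   paper = convert_tei_xml_file_to_s2orc_json("output/paper.tei.xml")
#   paper_json = paper.release_json()
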
def process_pdf_stream(
    input_file: str, sha: str, input_stream: bytes, grobid_config: Optional[Dict] = None
) -> Dict:
    """
    Process a PDF stream
    :param input_file: name of the input file (used as the paper id)
    :param sha: hash of the PDF
    :param input_stream: raw bytes of the PDF
    :param grobid_config: optional Grobid client configuration
    :return: S2ORC JSON dict for the paper
    """
    # process PDF through Grobid -> TEI.XML
    client = GrobidClient(grobid_config)
    tei_text = client.process_pdf_stream(input_file, input_stream, "temp", "processFulltextDocument")

    # make soup
    soup = BeautifulSoup(tei_text, "xml")

    # get paper
    paper = convert_tei_xml_soup_to_s2orc_json(soup, input_file, sha)

    return paper.release_json("pdf")

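# Example usage (a minimal sketch; assumes a Grobid server reachable with the default
# GrobidClient configuration and a local file "paper.pdf"):
#
#   with open("paper.pdf", "rb") as f:
#       pdf_bytes = f.read()
#   paper_json = process_pdf_stream("paper.pdf", sha="", input_stream=pdf_bytes)
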
def process_pdf_file(
    input_file: str,
    temp_dir: str = BASE_TEMP_DIR,
    output_dir: str = BASE_OUTPUT_DIR,
    grobid_config: Optional[Dict] = None,
    verbose: bool = True,
) -> str:
    """
    Process a PDF file and get its JSON representation
    :param input_file: path to the input PDF
    :param temp_dir: directory for the intermediate TEI XML file
    :param output_dir: directory for the output JSON file
    :param grobid_config: optional Grobid client configuration
    :param verbose: whether to print status messages
    :return: path to the output JSON file
    """
    os.makedirs(temp_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)

    # get paper id as the name of the file
    paper_id = os.path.splitext(os.path.basename(input_file))[0]
    tei_file = os.path.join(temp_dir, f"{paper_id}.tei.xml")
    output_file = os.path.join(output_dir, f"{paper_id}.json")

    # check that the input file exists and the output file doesn't
    if not os.path.exists(input_file):
        raise FileNotFoundError(f"{input_file} doesn't exist")
    if os.path.exists(output_file):
        if verbose:
            print(f"{output_file} already exists!")
        return output_file

    # process PDF through Grobid -> TEI.XML
    client = GrobidClient(grobid_config)
    # TODO: compute PDF hash
    # TODO: add grobid version number to output
    client.process_pdf(input_file, temp_dir, "processFulltextDocument")

    # process TEI.XML -> JSON; the assert fails (and is caught by callers such as
    # GrobidFulltextExtractor) if Grobid did not produce a TEI file
    assert os.path.exists(tei_file)
    paper = convert_tei_xml_file_to_s2orc_json(tei_file)

    # write to file
    with open(output_file, "w") as outf:
        json.dump(paper.release_json(), outf, indent=4, sort_keys=False)

    return output_file

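# Example usage (a minimal sketch; assumes a running Grobid server and a local file
# "paper.pdf"; the intermediate TEI XML and the output JSON land in the default dirs):
#
#   output_file = process_pdf_file("paper.pdf")
#   with open(output_file) as f:
#       paper_json = json.load(f)
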
UUID_NAMESPACE = uuid.UUID("bab08d37-ac12-40c4-847a-20ca337742fd")


def paper_url_to_uuid(paper_url: str) -> "uuid.UUID":
    """Derive a deterministic UUID (version 5) for a paper from its URL."""
    return uuid.uuid5(UUID_NAMESPACE, paper_url)

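# `uuid5` is deterministic: the same URL always maps to the same UUID, so the result
# can serve as a stable paper identifier across runs. For example:
#
#   pid = paper_url_to_uuid("https://arxiv.org/pdf/1706.03762")
#   assert pid == paper_url_to_uuid("https://arxiv.org/pdf/1706.03762")
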
@dataclass
class PDFDownloader:
    verbose: bool = True

    def download(self, url: str, opath: str | Path) -> Path:
        """Download a PDF file from a URL and save it locally.

        Skip the download if there is a file at `opath` already.

        Parameters
        ----------
        url : str
            URL of the target PDF file
        opath : str | Path
            Path to save the downloaded PDF data to.
        """
        if os.path.exists(opath):
            return Path(opath)
        odir = os.path.dirname(str(opath))
        if odir:
            os.makedirs(odir, exist_ok=True)

        if self.verbose:
            print(f"Downloading {url} into {opath}")
        # fetch the data before opening the output file so that a failed request
        # doesn't leave an empty file behind
        res = requests.get(url)
        res.raise_for_status()
        with open(opath, "wb") as f:
            f.write(res.content)
        return Path(opath)

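# Example usage (a minimal sketch; URL and output path are placeholders):
#
#   downloader = PDFDownloader()
#   pdf_path = downloader.download("https://arxiv.org/pdf/1706.03762", "./pdfs/attention.pdf")
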
class FulltextExtractor:
    def __call__(self, pdf_file_path: Path | str) -> tuple[str, dict] | None:
        """Extract plain text from a PDF file."""
        raise NotImplementedError

@dataclass
class GrobidFulltextExtractor(FulltextExtractor):
    tmp_dir: str = "./tmp/grobid"
    grobid_config: Optional[Dict] = None
    section_separator: str = "\n\n"
    paragraph_separator: str = "\n"
    verbose: bool = True

    def construct_plain_text(self, extraction_result: dict) -> str:
        section_strings = []

        # add the title, if available (consider it as the first section)
        title = extraction_result.get("title")
        if title and title.strip():
            section_strings.append(title.strip())

        section_paragraphs: dict[str, list[str]] = extraction_result["sections"]
        section_strings.extend(
            self.paragraph_separator.join(
                # consider the section title as the first paragraph and
                # remove empty paragraphs
                filter(lambda s: len(s) > 0, map(lambda s: s.strip(), [section_name] + paragraphs))
            )
            for section_name, paragraphs in section_paragraphs.items()
        )

        return self.section_separator.join(section_strings)
    def postprocess_extraction_result(self, extraction_result: dict) -> dict:
        # group the body text paragraphs by their section name, preserving document order
        sections: dict[str, list[str]] = {}
        for body_text in extraction_result["pdf_parse"]["body_text"]:
            section_name = body_text["section"]
            if section_name not in sections:
                sections[section_name] = []
            sections[section_name] += [body_text["text"]]
        extraction_result = {**extraction_result, "sections": sections}
        return extraction_result

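    # Shape sketch: `postprocess_extraction_result` groups the S2ORC "body_text" entries
    # by section name, and `construct_plain_text` then flattens them with the configured
    # separators (values are illustrative):
    #
    #   {"sections": {"Introduction": ["First paragraph.", "Second paragraph."]}}
    #   -> "Introduction\nFirst paragraph.\nSecond paragraph."
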
    def __call__(self, pdf_file_path: Path | str) -> tuple[str, dict] | None:
        """Extract plain text from a PDF file."""
        try:
            extraction_fpath = process_pdf_file(
                str(pdf_file_path),
                temp_dir=self.tmp_dir,
                output_dir=self.tmp_dir,
                grobid_config=self.grobid_config,
                verbose=self.verbose,
            )
            with open(extraction_fpath, "r") as f:
                extraction_result = json.load(f)

            processed_extraction_result = self.postprocess_extraction_result(extraction_result)
            plain_text = self.construct_plain_text(processed_extraction_result)

            return plain_text, extraction_result
        except AssertionError:
            print("Grobid failed to parse this document.")
            return None

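# End-to-end sketch (assumes a running Grobid server; URL and paths are placeholders):
if __name__ == "__main__":
    url = "https://arxiv.org/pdf/1706.03762"
    pdf_path = PDFDownloader().download(url, f"./pdfs/{paper_url_to_uuid(url)}.pdf")
    extraction = GrobidFulltextExtractor()(pdf_path)
    if extraction is not None:
        plain_text, _ = extraction
        print(plain_text[:500])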