| """ | |
| s3_bulk_download.py | |
| authors: Matt Bierbaum and Colin Clement | |
| date: 2019-02-27 | |
This module uses AWS to request a presigned URL with which to download files
from the ArXiv S3 bucket. It then unpacks the tar files and converts the PDFs
into text.
Note that, at the time of writing, the ArXiv manifest lists 1.15 TB of PDFs,
which would cost $103 to download from AWS S3.
see: https://arxiv.org/help/bulk_data_s3

Usage
-----
Set DIR_FULLTEXT as the directory where the text parsed from pdfs should be placed.
Set DIR_PDFTARS as the directory where the raw pdf tars should be placed.

```
import arxiv_public_data.s3_bulk_download as s3

# Download manifest file (or load if already downloaded)
manifest = s3.get_manifest()

# Download tar files and convert pdf to text
# Costs money! Will only download if it does not find files
s3.process_manifest_files(manifest)

# If you just want to download the PDFs and not convert to text use
s3.download_check_tarfiles(manifest)
```
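
If some PDF-to-text conversions fail or are interrupted, the already
downloaded tars can be indexed and only the missing conversions re-run
(a sketch; the process count is illustrative):

```
s3.process_missing(manifest, processes=4)
```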
| """ | |
import os
import re
import gzip
import json
import glob
import shlex
import shutil
import tarfile
import boto3
import hashlib
import requests
import subprocess

from functools import partial
from multiprocessing import Pool
from collections import defaultdict

import xml.etree.ElementTree as ET

from arxiv_public_data import fulltext
from arxiv_public_data.config import DIR_FULLTEXT, DIR_PDFTARS, LOGGER

logger = LOGGER.getChild('s3')

CHUNK_SIZE = 2**20  # 1MB
BUCKET_NAME = 'arxiv'
S3_PDF_MANIFEST = 'pdf/arXiv_pdf_manifest.xml'
S3_TEX_MANIFEST = 'src/arXiv_src_manifest.xml'
HEADERS = {'x-amz-request-payer': 'requester'}
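
# Downloads from the arxiv bucket are "requester pays": the AWS account making
# these requests is billed for the data transfer (see the bulk_data_s3 help
# page linked above).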
s3 = boto3.client('s3', region_name='us-east-1')

def download_file(filename, outfile, chunk_size=CHUNK_SIZE, redownload=False,
                  dryrun=False):
    """
    Downloads filename from the ArXiv AWS S3 bucket, and returns the streaming
    md5 sum of the content

    Parameters
    ----------
    filename : str
        KEY corresponding to AWS bucket file
    outfile : str
        name and path of local file in which downloaded file will be stored
    (optional)
    chunk_size : int
        requests byte streaming size (so 500MB are not stored in memory
        prior to processing)
    redownload : bool
        Look to see if file is already downloaded, and simply return md5sum
        if it exists, unless redownload is True
    dryrun : bool
        If True, only log activity

    Returns
    -------
    md5sum : str
        md5 checksum of the contents of filename
    """
    if os.path.exists(outfile) and not redownload:
        md5 = hashlib.md5()
        md5.update(gzip.open(outfile, 'rb').read())
        return md5.hexdigest()
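
    # Otherwise request a presigned, requester-pays URL from S3 and stream the
    # response into a gzipped copy on disk, hashing the raw bytes as they
    # arrive so the checksum can be compared with the manifest md5sum.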
    md5 = hashlib.md5()
    url = s3.generate_presigned_url(
        "get_object",
        Params={
            "Bucket": BUCKET_NAME, "Key": filename, "RequestPayer": 'requester'
        }
    )

    if not dryrun:
        logger.info('Requesting "{}" (costs money!)'.format(filename))
        request = requests.get(url, stream=True)
        response_iter = request.iter_content(chunk_size=chunk_size)

        logger.info("\t Writing {}".format(outfile))
        with gzip.open(outfile, 'wb') as fout:
            for i, chunk in enumerate(response_iter):
                fout.write(chunk)
                md5.update(chunk)
    else:
        logger.info('Requesting "{}" (free!)'.format(filename))
        logger.info("\t Writing {}".format(outfile))
    return md5.hexdigest()

def default_manifest_filename():
    return os.path.join(DIR_PDFTARS, 'arxiv-manifest.xml.gz')

def get_manifest(filename=None, redownload=False):
    """
    Get the file manifest for the ArXiv

    Parameters
    ----------
    filename : str
        path at which to store the manifest; defaults to
        default_manifest_filename()
    redownload : bool
        If true, forces redownload of manifest even if it exists

    Returns
    -------
    file_information : list of dicts
        each dict contains the file metadata
    """
    manifest_file = filename or default_manifest_filename()
    md5 = download_file(
        S3_PDF_MANIFEST, manifest_file, redownload=redownload, dryrun=False
    )
    manifest = gzip.open(manifest_file, 'rb').read()
    return parse_manifest(manifest)

def parse_manifest(manifest):
    """
    Parse the XML of the ArXiv manifest file.

    Parameters
    ----------
    manifest : str
        xml string from the ArXiv manifest file

    Returns
    -------
    file_information : list of dicts
        One dict for each file, containing the filename, size, md5sum,
        and other metadata
    """
    root = ET.fromstring(manifest)
    # Element.getchildren() was removed in Python 3.9; iterate the element directly
    return [
        {c.tag: f.find(c.tag).text for c in f}
        for f in root.findall('file')
    ]
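
# Each manifest entry is a plain dict mapping XML child tags to their text.
# The keys used elsewhere in this module are 'filename', 'md5sum',
# 'first_item', and 'last_item'; the manifest carries additional fields too.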

def _tar_to_filename(filename):
    return os.path.join(DIR_PDFTARS, os.path.basename(filename)) + '.gz'

def download_check_tarfile(filename, md5_expected, dryrun=False, redownload=False):
    """ Download filename, check its md5sum, and form the output path """
    outname = _tar_to_filename(filename)
    md5_downloaded = download_file(
        filename, outname, dryrun=dryrun, redownload=redownload
    )

    if not dryrun:
        if md5_expected != md5_downloaded:
            msg = "MD5 '{}' does not match expected '{}' for file '{}'".format(
                md5_downloaded, md5_expected, filename
            )
            raise AssertionError(msg)

    return outname

def download_check_tarfiles(list_of_fileinfo, dryrun=False):
    """
    Download tar files from the ArXiv manifest and check that their MD5sums
    match

    Parameters
    ----------
    list_of_fileinfo : list
        Some elements of results of get_manifest
    (optional)
    dryrun : bool
        If True, only log activity
    """
    for fileinfo in list_of_fileinfo:
        download_check_tarfile(fileinfo['filename'], fileinfo['md5sum'],
                               dryrun=dryrun)
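
# For example, to fetch only a couple of tar files for testing (each tar
# downloaded still costs money), pass a slice of the manifest:
#
#     download_check_tarfiles(get_manifest()[:2])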

def _call(cmd, dryrun=False, debug=False):
    """ Spawn a subprocess and execute the string in cmd """
    if dryrun:
        logger.info(cmd)
        return 0
    else:
        return subprocess.check_call(
            shlex.split(cmd), stderr=None if debug else open(os.devnull, 'w')
        )

def _make_pathname(filename):
    """
    Make filename path for text document, sorted like on arXiv servers.

    Parameters
    ----------
    filename : str
        string filename of arXiv article
    (optional)

    Returns
    -------
    pathname : str
        pathname in which to store the article following
        * Old ArXiv IDs: e.g. hep-ph0001001.txt returns
            DIR_FULLTEXT/hep-ph/0001/hep-ph0001001.txt
        * New ArXiv IDs: e.g. 1501.13851.txt returns
            DIR_FULLTEXT/arxiv/1501/1501.13851.txt
    """
    basename = os.path.basename(filename)
    fname = os.path.splitext(basename)[0]
    if '.' in fname:  # new style ArXiv ID
        yearmonth = fname.split('.')[0]
        return os.path.join(DIR_FULLTEXT, 'arxiv', yearmonth, basename)
    # old style ArXiv ID
    cat, aid = re.split(r'(\d+)', fname)[:2]
    yearmonth = aid[:4]
    return os.path.join(DIR_FULLTEXT, cat, yearmonth, basename)

def process_tarfile_inner(filename, pdfnames=None, processes=1, dryrun=False,
                          timelimit=fulltext.TIMELIMIT):
    """ Unpack a downloaded tar file and convert the PDFs inside it to text """
    outname = _tar_to_filename(filename)

    if not os.path.exists(outname):
        msg = 'Tarfile from manifest not found {}, skipping...'.format(outname)
        logger.error(msg)
        return

    # unpack tar file
    if pdfnames:
        namelist = ' '.join(pdfnames)
        cmd = 'tar --one-top-level -C {} -xf {} {}'
        cmd = cmd.format(DIR_PDFTARS, outname, namelist)
    else:
        cmd = 'tar --one-top-level -C {} -xf {}'.format(DIR_PDFTARS, outname)
    _call(cmd, dryrun)
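
    # `tar --one-top-level` unpacks into a directory named after the archive;
    # e.g. (illustrative name) arXiv_pdf_1901_001.tar.gz becomes
    # DIR_PDFTARS/arXiv_pdf_1901_001/<yearmonth>/*.pdf, the pdfdir used below.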
    basename = os.path.splitext(os.path.basename(filename))[0]
    pdfdir = os.path.join(DIR_PDFTARS, basename, basename.split('_')[2])

    # Run fulltext to convert pdfs in tardir into *.txt
    converts = fulltext.convert_directory_parallel(
        pdfdir, processes=processes, timelimit=timelimit
    )

    # move txt into final file structure
    txtfiles = glob.glob('{}/*.txt'.format(pdfdir))
    for tf in txtfiles:
        mvfn = _make_pathname(tf)
        dirname = os.path.dirname(mvfn)
        if not os.path.exists(dirname):
            _call('mkdir -p {}'.format(dirname), dryrun)

        if not dryrun:
            shutil.move(tf, mvfn)

    # clean up pdfs
    _call('rm -rf {}'.format(os.path.join(DIR_PDFTARS, basename)), dryrun)

def process_tarfile(fileinfo, pdfnames=None, dryrun=False, debug=False, processes=1):
    """
    Download and process one of the tar files from the ArXiv manifest:
    download, unpack, and convert the contained PDFs to text. It will only
    try to download the tar file if it does not already exist, and it skips
    tars whose first and last articles already appear to be converted.

    The tar file will be stored as DIR_PDFTARS/<basename of fileinfo['filename']>.gz
    and the resulting arXiv articles will be stored in the subdirectory
    DIR_FULLTEXT/arxiv/<yearmonth>/<aid>.txt for new style arXiv IDs and
    DIR_FULLTEXT/<category>/<yearmonth>/<aid>.txt for old style arXiv IDs.

    Parameters
    ----------
    fileinfo : dict
        dictionary of file information from parse_manifest
    (optional)
    pdfnames : list of str
        if given, only these PDFs are extracted and converted
    dryrun : bool
        If True, only log activity
    debug : bool
        Silence stderr of _call if debug is False
    """
    filename = fileinfo['filename']
    md5sum = fileinfo['md5sum']

    if check_if_any_processed(fileinfo):
        logger.info('Tar file appears processed, skipping {}...'.format(filename))
        return

    # fetch the tar if not already on disk and verify it against the manifest md5sum
    download_check_tarfile(filename, md5sum, dryrun=dryrun)

    logger.info('Processing tar "{}" ...'.format(filename))
    process_tarfile_inner(filename, pdfnames=pdfnames, processes=processes,
                          dryrun=dryrun)

def process_manifest_files(list_of_fileinfo, processes=1, dryrun=False):
    """
    Download PDFs from the ArXiv AWS S3 bucket and convert each pdf to text.
    If files are already downloaded, it will only process them.

    Parameters
    ----------
    list_of_fileinfo : list
        Some elements of results of get_manifest
    (optional)
    processes : int
        number of parallel workers to spawn (roughly as many CPUs as you have)
    dryrun : bool
        If True, only log activity
    """
    for fileinfo in list_of_fileinfo:
        process_tarfile(fileinfo, dryrun=dryrun, processes=processes)

def check_if_any_processed(fileinfo):
    """
    Spot check a tarfile to see if the pdfs have been converted to text,
    given an element of the s3 manifest
    """
    first = _make_pathname(fileinfo['first_item'] + '.txt')
    last = _make_pathname(fileinfo['last_item'] + '.txt')
    return os.path.exists(first) and os.path.exists(last)

def generate_tarfile_indices(manifest):
    """
    Go through the manifest and for every tarfile, get a list of the PDFs
    that should be contained within it. This is a separate function because
    even checking the tars is rather slow.

    Returns
    -------
    index : dictionary
        keys: tarfile, values: list of pdfs
    """
    index = {}

    for fileinfo in manifest:
        name = fileinfo['filename']
        logger.info("Indexing {}...".format(name))

        tarname = os.path.join(DIR_PDFTARS, os.path.basename(name)) + '.gz'
        files = [i for i in tarfile.open(tarname).getnames() if i.endswith('.pdf')]

        index[name] = files
    return index

def check_missing_txt_files(index):
    """
    Use the index file from `generate_tarfile_indices` to check which pdf->txt
    conversions are outstanding.
    """
    missing = defaultdict(list)

    for tar, pdflist in index.items():
        logger.info("Checking {}...".format(tar))

        for pdf in pdflist:
            txt = _make_pathname(pdf).replace('.pdf', '.txt')

            if not os.path.exists(txt):
                missing[tar].append(pdf)

    return missing

def rerun_missing(missing, processes=1):
    """
    Use the output of `check_missing_txt_files` to re-attempt conversion of
    the PDFs whose text files are missing. Conversions can fail for various
    reasons, so the retry uses a longer time limit.
    """
    # start with the tars that have the most outstanding conversions
    sort = sorted(missing.items(), key=lambda x: len(x[1]), reverse=True)

    for tar, names in sort:
        logger.info("Running {} ({} to do)...".format(tar, len(names)))
        process_tarfile_inner(
            tar, pdfnames=names, processes=processes,
            timelimit=5 * fulltext.TIMELIMIT
        )

def process_missing(manifest, processes=1):
    """
    Do the full process of figuring out which pdf -> text conversions are
    missing and re-running them.
    """
    indexfile = os.path.join(DIR_PDFTARS, 'manifest-index.json')

    if not os.path.exists(indexfile):
        index = generate_tarfile_indices(manifest)
        json.dump(index, open(indexfile, 'w'))

    index = json.load(open(indexfile))
    missing = check_missing_txt_files(index)
    rerun_missing(missing, processes=processes)