|
|
|
|
|
import argparse |
|
import logging |
|
import os |
|
from pathlib import Path |
|
import re |
|
import shutil |
|
import tempfile |
|
from urllib.parse import urlparse |
|
import uuid |
|
import urllib |
|
import time |
|
|
|
import requests |
|
|
|
from project_settings import environment, project_path |
|
from toolbox.to_markdown.base_to_markdown import BaseToMarkdown |
|
|
|
from alibabacloud_docmind_api20220711.client import Client as docmind_api20220711Client |
|
from alibabacloud_tea_openapi import models as open_api_models |
|
from alibabacloud_docmind_api20220711 import models as docmind_api20220711_models |
|
from alibabacloud_tea_util import models as util_models |
|
from alibabacloud_credentials.client import Client as CredClient |
|
|
|
logger = logging.getLogger("toolbox") |
|
|
|
|
|
def get_args(argv=None):
    """
    Parse the command-line options of this script.

    :param argv: optional list of argument strings. When ``None`` (the
        default) argparse reads ``sys.argv[1:]``, exactly as before; passing
        a list makes the function usable from other code.
    :return: ``argparse.Namespace`` with a ``filename`` attribute holding the
        local path or URL of the document to convert.
    """
    parser = argparse.ArgumentParser(
        description="Convert a document to Markdown with Aliyun Document Mind.",
    )
    parser.add_argument(
        "--filename",
        default="https://aclanthology.org/2024.naacl-long.35.pdf",
        type=str,
        help="local path or URL of the document to convert",
    )
    return parser.parse_args(argv)
|
|
|
|
|
@BaseToMarkdown.register("aliyun")
class AliyunToMarkdown(BaseToMarkdown):
    """
    Convert a document to Markdown with the Aliyun Document Mind parser.

    A local file or a URL is submitted as a parsing job, the job status is
    polled until it finishes, the layouts are fetched in batches and their
    Markdown fragments are concatenated. ``save_to_zip`` additionally
    downloads the images referenced by the Markdown and packs everything
    into a zip archive.

    API reference:
    https://help.aliyun.com/zh/document-mind/developer-reference/document-parsing-large-model-version
    """

    # Seconds to wait for an image download before giving up.
    _IMAGE_TIMEOUT = 60

    # Layout types whose Markdown is always kept, whatever the sub type.
    _ALWAYS_KEPT_TYPES = frozenset({"title", "text", "corner_note"})

    # (type, subType) pairs whose Markdown is always kept.
    _KEPT_LAYOUTS = frozenset({
        ("contents_title", "cate_title"),
        ("contents_title", "none"),
        ("contents", "cate"),
        ("multicolumn", "none"),
    })

    # (type, subType) pairs that are dropped from the output.
    _SKIPPED_LAYOUTS = frozenset({
        ("stamp", "none"),
        ("side", "sidebar"),
        ("side", "none"),
        ("head_image", "none"),
        ("foot_image", "none"),
        ("embedded", "none"),
    })

    # (type, subType) pairs kept only when ``with_images`` is true.
    _IMAGE_LAYOUTS = frozenset({
        ("figure", "picture"),
        ("figure", "logo"),
        ("figure", "none"),
        ("figure_name", "none"),
        ("figure_name", "pic_title"),
    })

    # (type, subType) pairs kept only when ``with_formula`` is true.
    _FORMULA_LAYOUTS = frozenset({
        ("formula", "formula"),
        ("formula", "none"),
    })

    # (type, subType) pairs kept only when ``with_table`` is true.
    _TABLE_LAYOUTS = frozenset({
        ("table", "none"),
        ("table_name", "none"),
    })

    def __init__(self,
                 filename: str,
                 endpoint: str = "docmind-api.cn-hangzhou.aliyuncs.com",
                 access_key_id: str = None,
                 access_key_secret: str = None,
                 ):
        """
        :param filename: local path or URL of the document to convert.
        :param endpoint: Document Mind API endpoint.
        :param access_key_id: access key id; when either key is ``None`` both
            are loaded through ``get_access_key``.
        :param access_key_secret: access key secret, see ``access_key_id``.
        """
        super().__init__(filename)
        self.filename_or_url = self.filename
        self.endpoint = endpoint

        if access_key_id is None or access_key_secret is None:
            self.access_key_id, self.access_key_secret = self.get_access_key()
        else:
            self.access_key_id = access_key_id
            self.access_key_secret = access_key_secret

        self.client = self.get_client()

        # State of the most recent job submitted by ``save_to_zip``.
        self.doc_mind_id: str = None
        self.status: str = None
        self.layouts: list = None

        self.image_count = 0

    @staticmethod
    def get_access_key():
        """
        Load the access key pair through the alibabacloud_credentials client.

        :return: tuple ``(access_key_id, access_key_secret)``.
        """
        cred = CredClient().get_credential()
        return cred.get_access_key_id(), cred.get_access_key_secret()

    def get_client(self):
        """Build a Document Mind client from the stored keys and endpoint."""
        config = open_api_models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
        )
        config.endpoint = self.endpoint
        return docmind_api20220711Client(config)

    def submit_url(self, url: str, filename_extension: str):
        """
        Submit a remote document for parsing.

        :param url: URL of the document.
        :param filename_extension: extension without the dot, e.g. ``pdf``.
        :return: id of the created parsing job.
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.SubmitDocParserJobRequest(
            file_url=url,
            file_name_extension=filename_extension,
        )

        try:
            response = self.client.submit_doc_parser_job(request)
            return response.body.data.id
        except Exception as error:
            logger.error("submit url failed. type: %s, text: %s", type(error), error)
            raise

    def submit_file(self, filename: str, filename_extension: str):
        """
        Submit a local document for parsing.

        :param filename: path of the local file.
        :param filename_extension: extension without the dot, e.g. ``pdf``.
        :return: id of the created parsing job.
        :raises OSError: when the file cannot be opened.
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        runtime = util_models.RuntimeOptions()

        # The request holds the open stream, so the SDK call has to happen
        # inside the ``with`` block; the handle is closed afterwards instead
        # of being leaked.
        with open(filename, "rb") as stream:
            request = docmind_api20220711_models.SubmitDocParserJobAdvanceRequest(
                file_url_object=stream,
                file_name_extension=filename_extension,
            )
            try:
                response = self.client.submit_doc_parser_job_advance(request, runtime)
                return response.body.data.id
            except Exception as error:
                logger.error("submit file failed. type: %s, text: %s", type(error), error)
                raise

    def query(self, doc_mind_id: str):
        """
        Query the processing status of a parsing job.

        :param doc_mind_id: id returned by ``submit_url`` / ``submit_file``.
        :return: the ``data`` object of the response; its ``status`` attribute
            is what the callers in this class read.
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.QueryDocParserStatusRequest(
            id=doc_mind_id,
        )

        try:
            response = self.client.query_doc_parser_status(request)
            return response.body.data
        except Exception as error:
            logger.error("query failed. type: %s, text: %s", type(error), error)
            raise

    def query_result(self, doc_mind_id: str, layout_num: int = 0, layout_step_size: int = 10):
        """
        Fetch one batch of layouts of a parsing job.

        :param doc_mind_id: id of the parsing job.
        :param layout_num: offset of the first layout to return.
        :param layout_step_size: number of layouts requested per call.
        :return: the ``data`` object of the response; ``get_layouts`` reads
            its ``"layouts"`` entry.
        :raises Exception: whatever the SDK raises; it is logged and re-raised.
        """
        request = docmind_api20220711_models.GetDocParserResultRequest(
            id=doc_mind_id,
            layout_num=layout_num,
            layout_step_size=layout_step_size,
        )

        try:
            response = self.client.get_doc_parser_result(request)
            return response.body.data
        except Exception as error:
            logger.error("query result failed. type: %s, text: %s", type(error), error)
            raise

    def get_layouts(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    ):
        """
        Collect all layouts of a parsing job, batch by batch.

        :param doc_mind_id: id of the parsing job; defaults to the job stored
            on the instance.
        :param layout_step_size: batch size, must be positive.
        :return: list with every layout of the job.
        :raises ValueError: on a non-positive batch size or a missing job id.
        :raises AssertionError: when the job status is ``"failed"``.
        """
        if doc_mind_id is None and self.layouts is not None:
            return self.layouts

        if layout_step_size <= 0:
            # A non-positive step would never advance the offset below.
            raise ValueError(f"layout_step_size must be positive, got {layout_step_size}")

        doc_mind_id = doc_mind_id or self.doc_mind_id
        if doc_mind_id is None:
            raise ValueError("doc_mind_id is required; no job has been submitted yet.")

        if self.status is None:
            self.status = self.query(doc_mind_id).status
        # Checked unconditionally so a freshly queried failure is caught too.
        if self.status == "failed":
            raise AssertionError("status: failed. ")

        layout_num = 0
        layouts_list = list()
        while True:
            js = self.query_result(
                doc_mind_id=doc_mind_id,
                layout_num=layout_num,
                layout_step_size=layout_step_size,
            )
            layouts = js["layouts"]
            if not layouts:
                break
            layouts_list.extend(layouts)
            layout_num += layout_step_size

        # Cache only for the instance's own job so the shortcut at the top
        # never returns the layouts of a different job.
        if doc_mind_id == self.doc_mind_id:
            self.layouts = layouts_list
        return layouts_list

    def get_md_text(self,
                    doc_mind_id: str = None,
                    layout_step_size: int = 10,
                    with_images: bool = True,
                    with_formula: bool = True,
                    with_table: bool = True,
                    ):
        """
        Concatenate the Markdown fragments of the selected layouts.

        :param doc_mind_id: id of the parsing job, forwarded to ``get_layouts``.
        :param layout_step_size: batch size, forwarded to ``get_layouts``.
        :param with_images: keep figures and figure captions.
        :param with_formula: keep formulas.
        :param with_table: keep tables and table captions.
        :return: the Markdown text. Layouts of an unknown kind are logged and
            kept.
        """
        layouts = self.get_layouts(doc_mind_id, layout_step_size)

        optional_groups = (
            (self._IMAGE_LAYOUTS, with_images),
            (self._FORMULA_LAYOUTS, with_formula),
            (self._TABLE_LAYOUTS, with_table),
        )

        parts = []
        for layout in layouts:
            type_ = layout["type"]
            sub_type_ = layout["subType"]
            markdown_content_ = layout["markdownContent"]
            key = (type_, sub_type_)

            if type_ in self._ALWAYS_KEPT_TYPES or key in self._KEPT_LAYOUTS:
                parts.append(markdown_content_)
                continue
            if key in self._SKIPPED_LAYOUTS:
                continue

            for group, enabled in optional_groups:
                if key in group:
                    if enabled:
                        parts.append(markdown_content_)
                    break
            else:
                # Unknown layout kind: report it and keep the content rather
                # than silently dropping text.
                logger.warning(
                    "unexpected layout. type: %s, subType: %s, layout: %s",
                    type_, sub_type_, layout,
                )
                parts.append(markdown_content_)

        return "".join(parts)

    def _wait_until_parsed(self, doc_mind_id: str, poll_interval: float = 1.0, timeout: float = None):
        """
        Poll the job status until it reaches ``"success"``.

        :param doc_mind_id: id of the parsing job.
        :param poll_interval: seconds to sleep between two status queries.
        :param timeout: maximum number of seconds to wait; ``None`` waits
            without limit.
        :return: the final status string.
        :raises AssertionError: when the job failed or reports an unknown status.
        :raises TimeoutError: when ``timeout`` elapsed before completion.
        """
        deadline = None if timeout is None else time.monotonic() + timeout
        while True:
            status = self.query(doc_mind_id=doc_mind_id).status
            if status == "success":
                return status
            if status == "failed":
                raise AssertionError("failed. ")
            if status not in (None, "init", "processing"):
                raise AssertionError(f"unexpected status: {status}")
            if deadline is not None and time.monotonic() >= deadline:
                raise TimeoutError(f"doc_mind_id: {doc_mind_id} not finished after {timeout} seconds")
            time.sleep(poll_interval)

    def save_to_zip(self, output_dir: str, poll_interval: float = 1.0, timeout: float = None):
        """
        Convert the document and pack the Markdown plus its images into a zip.

        :param output_dir: directory that receives the zip archive.
        :param poll_interval: seconds between two status queries.
        :param timeout: maximum seconds to wait for the job; ``None`` (the
            default) waits without limit, as before.
        :return: path of the created zip archive.
        :raises AssertionError: when the job failed or reports an unknown status.
        :raises TimeoutError: when ``timeout`` elapsed before completion.
        """
        is_url = self.is_url(self.filename_or_url)
        filename_extension = self.get_extension_name(self.filename_or_url, is_url=is_url)

        # Submit the document.
        if is_url:
            doc_mind_id = self.submit_url(url=self.filename_or_url, filename_extension=filename_extension)
        else:
            doc_mind_id = self.submit_file(filename=self.filename_or_url, filename_extension=filename_extension)
        logger.info("doc_mind_id: %s, filename: %s", doc_mind_id, self.filename_or_url)

        # Remember the job on the instance and drop state of a previous run.
        self.doc_mind_id = doc_mind_id
        self.status = None
        self.layouts = None

        # Wait for the job, then build the Markdown text.
        self.status = self._wait_until_parsed(doc_mind_id, poll_interval=poll_interval, timeout=timeout)
        md_text = self.get_md_text(
            doc_mind_id=doc_mind_id,
        )

        basename = str(uuid.uuid4())
        temp_dir = Path(tempfile.gettempdir()) / basename
        temp_dir.mkdir(parents=True, exist_ok=False)
        try:
            # Download the images and point the Markdown at the local copies.
            md_text = self.convert_image_to_local(
                markdown_text=md_text,
                data_dir=temp_dir.as_posix(),
                image_folder="media",
            )

            md_file = temp_dir / f"{basename}.md"
            with open(md_file.as_posix(), "w", encoding="utf-8") as f:
                f.write(md_text)

            output_zip_file = os.path.join(output_dir, f"{basename}.zip")
            self.zip_directory(temp_dir, output_zip_file)
        finally:
            # Remove the working directory even when a step above failed.
            shutil.rmtree(temp_dir, ignore_errors=True)
        return output_zip_file

    def save_image(self,
                   image_url: str,
                   data_dir: str = "media",
                   image_folder: str = "media",
                   ):
        """
        Download one image into ``data_dir/image_folder``.

        :param image_url: URL of the image.
        :param data_dir: base directory.
        :param image_folder: sub directory of ``data_dir`` for the images.
        :return: ``Path`` of the written file.
        :raises requests.RequestException: on a network error, a timeout or a
            non-success HTTP status.
        """
        parse_result = urlparse(image_url)
        # A URL without a path component has no usable name; fall back to a
        # random one so the target is never the folder itself.
        image_name = Path(parse_result.path).name or str(uuid.uuid4())

        resp = requests.get(image_url, timeout=self._IMAGE_TIMEOUT)
        # Do not store an HTTP error page as if it were the image.
        resp.raise_for_status()

        filename = Path(data_dir) / image_folder / image_name
        filename.parent.mkdir(parents=True, exist_ok=True)
        with open(filename.as_posix(), "wb") as f:
            f.write(resp.content)

        return filename

    def convert_image_to_local(self,
                               markdown_text: str,
                               data_dir: str,
                               image_folder: str = "media",
                               ):
        """
        Download every image referenced in the Markdown and rewrite the links.

        :param markdown_text: Markdown containing ``![alt](url)`` references.
        :param data_dir: base directory the rewritten links are relative to.
        :param image_folder: sub directory of ``data_dir`` for the images.
        :return: Markdown whose image links point at the local copies. A
            reference whose download fails is logged and left unchanged.
        """
        pattern1 = r'\!\[(?:.*?)\]\((.+?)\)'

        def replace(match):
            image_url = match.group(1)
            try:
                filename = self.save_image(image_url, data_dir, image_folder)
            except requests.RequestException as error:
                logger.warning("download image failed, keep remote link. url: %s, text: %s", image_url, error)
                return match.group(0)
            relative_path = Path(filename).relative_to(data_dir)
            image_name = relative_path.name
            # NOTE(review): this replacement string was truncated in the
            # source and has been reconstructed - confirm the alt text.
            return f"![{image_name}]({relative_path.as_posix()})"

        return re.sub(pattern1, replace, markdown_text)

    @staticmethod
    def is_url(string: str):
        """Return True when ``string`` parses with both a scheme and a host."""
        try:
            result = urlparse(string)
        except ValueError:
            return False
        return bool(result.scheme and result.netloc)

    def get_extension_name(self, filename_or_url: str, is_url: bool = False):
        """
        Return the lower-cased file extension without the leading dot.

        :param filename_or_url: local path or URL.
        :param is_url: when true only the path component of the URL is used,
            so a query string or fragment does not end up in the extension.
        :return: the extension, or an empty string when there is none.
        """
        path = urlparse(filename_or_url).path if is_url else filename_or_url
        _, filename_extension = os.path.splitext(path)
        return filename_extension[1:].lower()
|
|
|
|
|
def main():
    """Convert the document named on the command line and print the zip path."""
    options = get_args()
    converter = AliyunToMarkdown(filename=options.filename)
    print(converter.save_to_zip(output_dir="."))


if __name__ == "__main__":
    main()
|
|