Link2Doc / merge_md.py
n0v33n
Create required file for this space
ff3a25c
import os
from reportlab.lib.pagesizes import A4
from reportlab.lib.units import inch
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from datetime import datetime
import markdown2
from mistralai import Mistral
from pathlib import Path
from urllib.parse import urlparse
import convertapi
import requests
from dotenv import load_dotenv
import re
load_dotenv()
convertapi.api_credentials = os.getenv("CONVERTAPI_TOKEN")
if not convertapi.api_credentials:
raise ValueError("CONVERTAPI_TOKEN environment variable is required")
SUPPORTED_FORMATS = ["pdf", "docx", "txt"]
MAX_FILE_SIZE = int(os.getenv("MAX_FILE_SIZE", 100 * 1024 * 1024))
# TEMP_DIR = os.getenv("TEMP_DIR", "temp")
# In merge_md.py, update temp directory handling
TEMP_DIR = os.getenv("TEMP_DIR", "/tmp/scraper_temp")
# Ensure temp directory exists
os.makedirs(TEMP_DIR, exist_ok=True)
def upload_to_service(file_path: str) -> str:
"""
Mock function to simulate uploading a file to a cloud service.
Args:
file_path (str): Path to the file to upload.
Returns:
str: Mock public URL or error message.
"""
try:
if not os.path.exists(file_path):
return f"File not found: {file_path}"
return f"https://mock-cloud-service.com/{os.path.basename(file_path)}"
except Exception as e:
return f"Error uploading file: {str(e)}"
def convert_from_url(document_url: str, output_format: str) -> str:
"""
Convert a document from a URL to a different format using ConvertAPI.
Args:
document_url (str): The URL of the input file.
output_format (str): The format to convert the file to.
Returns:
str: The path to the converted file or an error message.
"""
try:
if not document_url or not document_url.lower().startswith(("http://", "https://")):
return "Invalid or unsupported URL format."
if output_format not in SUPPORTED_FORMATS:
return f"Unsupported output format: {output_format}"
result = convertapi.convert(output_format, {"File": document_url})
input_filename = Path(urlparse(document_url).path).stem or "converted_file"
output_filename = f"{input_filename}.{output_format}"
output_path = Path(TEMP_DIR) / output_filename
output_path.parent.mkdir(exist_ok=True)
result.file.save(str(output_path))
return str(output_path)
except Exception as e:
return f"Error converting file from URL: {str(e)}"
def merge_md_to_pdf(output_dir, site_name, site_description="", site_category="General"):
"""
Merge all Markdown files in the output directory into a single PDF using reportlab after processing with Mistral AI.
Args:
output_dir (str): Directory containing Markdown files.
site_name (str): Name of the site for the PDF title.
site_description (str): Description of the site.
site_category (str): Category of the site.
Returns:
dict: Result containing success status, output PDF path, and message.
"""
try:
api_key = os.getenv("MISTRAL_API_KEY")
if not api_key:
return {
"success": False,
"error": "MISTRAL_API_KEY environment variable not set",
"output_pdf": None,
"pages_merged": 0
}
client = Mistral(api_key=api_key)
model = "mistral-large-latest"
if not os.path.exists(output_dir):
return {
"success": False,
"error": f"Output directory {output_dir} does not exist",
"output_pdf": None,
"pages_merged": 0
}
md_files = [
f for f in os.listdir(output_dir)
if f.endswith('.md') and f not in ['scraping_summary.md', 'scraping_log.txt']
]
if not md_files:
return {
"success": False,
"error": "No Markdown files found in the output directory",
"output_pdf": None,
"pages_merged": 0
}
pdf_output_path = os.path.join(output_dir, f"{site_name}_merged.pdf")
doc = SimpleDocTemplate(
pdf_output_path,
pagesize=A4,
rightMargin=inch,
leftMargin=inch,
topMargin=inch,
bottomMargin=inch
)
styles = getSampleStyleSheet()
title_style = ParagraphStyle(name='Title', fontSize=24, leading=28, alignment=1, spaceAfter=20)
heading_style = ParagraphStyle(name='Heading2', fontSize=18, leading=22, spaceAfter=15)
body_style = ParagraphStyle(name='Body', fontSize=12, leading=14, spaceAfter=10)
story = [
Paragraph(f"{site_name}", title_style),
Spacer(1, 0.2 * inch),
Paragraph(f"Description: {site_description}", body_style),
Paragraph(f"Category: {site_category}", body_style),
Paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", body_style),
PageBreak(),
Paragraph("Table of Contents", heading_style),
Spacer(1, 0.2 * inch)
]
toc_entries = []
for idx, md_file in enumerate(sorted(md_files), 1):
file_path = os.path.join(output_dir, md_file)
with open(file_path, 'r', encoding='utf-8') as f:
md_content = f.read()
title = md_content.split('\n')[0].strip('#').strip() or f"Page {idx}"
try:
prompt = f"""
You are an expert content editor. Below is the content of a Markdown file. Please enhance the content by making it more detailed, well-structured, and polished while preserving the original meaning. Ensure the output is in plain text suitable for inclusion in a PDF. Avoid adding Markdown or HTML formatting in the response.
If there are HTML tags like <p><strong>Agents-MCP-Hackathon (Agents-MCP-Hackathon)</strong></p>, convert them to plain text like Agents-MCP-Hackathon (Agents-MCP-Hackathon).
Original content:
{md_content}
Enhanced content:
"""
response = client.chat.complete(
model=model,
messages=[{"role": "user", "content": prompt}]
)
enhanced_content = response.choices[0].message.content.strip()
except Exception as e:
print(f"Warning: Failed to process {md_file} with Mistral AI: {str(e)}. Using original content.")
enhanced_content = md_content
html_content = markdown2.markdown(enhanced_content, extras=['fenced-code-blocks', 'tables'])
text_content = re.sub(r'<[^>]+>', '', html_content)
text_content = re.sub(r'\s+', ' ', text_content).strip()
lines = text_content.split('\n')
toc_entries.append(Paragraph(f"{idx}. {title}", body_style))
story.append(Paragraph(title, heading_style))
story.append(Spacer(1, 0.1 * inch))
for line in lines:
if line.strip():
story.append(Paragraph(line.strip(), body_style))
story.append(PageBreak())
story[6:6] = toc_entries + [PageBreak()]
doc.build(story)
return {
"success": True,
"output_pdf": pdf_output_path,
"pages_merged": len(md_files),
"message": f"Successfully merged {len(md_files)} Markdown files into {pdf_output_path} after processing with Mistral AI"
}
except Exception as e:
return {
"success": False,
"error": f"Failed to merge Markdown files into PDF: {str(e)}",
"output_pdf": None,
"pages_merged": 0
}
def merge_md_to_pdf_and_convert_to_url(output_dir, site_name, site_description="", site_category="General", output_format="pdf"):
"""
Merge Markdown files into a PDF, upload it to a service, and optionally convert to another format.
Args:
output_dir (str): Directory containing Markdown files.
site_name (str): Name of the site for the PDF title.
site_description (str): Description of the site.
site_category (str): Category of the site.
output_format (str): Optional format to convert the PDF to (e.g., 'docx', 'txt').
Returns:
dict: Result containing success status, output URL, and message.
"""
try:
merge_result = merge_md_to_pdf(output_dir, site_name, site_description, site_category)
if not merge_result["success"]:
return {
"success": False,
"error": merge_result["error"],
"output_url": None,
"converted_path": None
}
pdf_path = merge_result["output_pdf"]
if not pdf_path or not os.path.exists(pdf_path):
return {
"success": False,
"error": "Generated PDF not found",
"output_url": None,
"converted_path": None
}
pdf_url = upload_to_service(pdf_path)
if not pdf_url.startswith("http"):
return {
"success": False,
"error": f"Failed to obtain URL: {pdf_url}",
"output_url": None,
"converted_path": None
}
converted_path = pdf_path
if output_format != "pdf":
converted_path = convert_from_url(pdf_url, output_format)
if not converted_path.startswith(TEMP_DIR):
return {
"success": False,
"error": f"Conversion failed: {converted_path}",
"output_url": pdf_url,
"converted_path": None
}
return {
"success": True,
"output_url": pdf_url,
"converted_path": converted_path,
"message": f"Successfully merged {merge_result['pages_merged']} Markdown files into PDF and uploaded to {pdf_url}",
"pages_merged": merge_result["pages_merged"]
}
except Exception as e:
return {
"success": False,
"error": f"Error in merging or uploading: {str(e)}",
"output_url": None,
"converted_path": None
}