Spaces:
Running
Running
import os | |
from reportlab.lib.pagesizes import A4 | |
from reportlab.lib.units import inch | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
from datetime import datetime | |
import markdown2 | |
from mistralai import Mistral | |
from pathlib import Path | |
from urllib.parse import urlparse | |
import convertapi | |
import requests | |
from dotenv import load_dotenv | |
import re | |
# Load variables from a local .env file (if present) before reading any setting.
load_dotenv()

# ConvertAPI credentials are mandatory: importing this module fails fast
# when the token is missing or empty.
convertapi.api_credentials = os.environ.get("CONVERTAPI_TOKEN")
if not convertapi.api_credentials:
    raise ValueError("CONVERTAPI_TOKEN environment variable is required")

# Output formats accepted by convert_from_url().
SUPPORTED_FORMATS = ["pdf", "docx", "txt"]

# Size limit in bytes (defaults to 100 MiB); overridable via MAX_FILE_SIZE.
MAX_FILE_SIZE = int(os.environ.get("MAX_FILE_SIZE", 100 * 1024 * 1024))

# Directory that receives converted files; created eagerly at import time.
TEMP_DIR = os.environ.get("TEMP_DIR", "/tmp/scraper_temp")
os.makedirs(TEMP_DIR, exist_ok=True)
def upload_to_service(file_path: str) -> str:
    """
    Pretend to publish a local file and hand back its public address.

    Nothing is transferred: the function only checks that the file exists
    and derives a placeholder URL from its base name.

    Args:
        file_path (str): Location of the file that would be uploaded.
    Returns:
        str: A mock ``https://mock-cloud-service.com/<name>`` URL on success,
            otherwise a human-readable error message (never raises).
    """
    try:
        if os.path.exists(file_path):
            file_name = os.path.basename(file_path)
            outcome = f"https://mock-cloud-service.com/{file_name}"
        else:
            outcome = f"File not found: {file_path}"
    except Exception as exc:
        # Failures are reported through the return value, not raised.
        outcome = f"Error uploading file: {str(exc)}"
    return outcome
def convert_from_url(document_url: str, output_format: str) -> str:
    """
    Convert a document from a URL to a different format using ConvertAPI.

    The converted file is saved in TEMP_DIR and named after the last path
    component of the URL (``converted_file`` when the URL has none).

    Args:
        document_url (str): The http(s) URL of the input file.
        output_format (str): The format to convert the file to; must be one
            of SUPPORTED_FORMATS (matched case-insensitively).
    Returns:
        str: The path to the converted file, or an error message. Errors are
            always reported through the return value; nothing is raised.
    """
    try:
        if not document_url or not document_url.lower().startswith(("http://", "https://")):
            return "Invalid or unsupported URL format."

        # Accept "PDF" / "Docx" as well as the lowercase spellings.
        target_format = output_format.lower() if isinstance(output_format, str) else output_format
        if target_format not in SUPPORTED_FORMATS:
            return f"Unsupported output format: {output_format}"

        input_filename = Path(urlparse(document_url).path).stem or "converted_file"
        output_path = Path(TEMP_DIR) / f"{input_filename}.{target_format}"
        # Prepare the destination before the remote call so a local filesystem
        # problem does not cost a conversion. parents=True lets a nested
        # TEMP_DIR be (re)created even if it disappeared after import.
        output_path.parent.mkdir(parents=True, exist_ok=True)

        result = convertapi.convert(target_format, {"File": document_url})
        result.file.save(str(output_path))
        return str(output_path)
    except Exception as e:
        return f"Error converting file from URL: {str(e)}"
def _merge_failure(message: str) -> dict:
    """Build the failure payload returned by merge_md_to_pdf."""
    return {
        "success": False,
        "error": message,
        "output_pdf": None,
        "pages_merged": 0,
    }


def _paragraph_safe(text) -> str:
    """Escape &, < and > so ReportLab's Paragraph treats text as literal."""
    import html  # function-scope import: not part of the module's import header

    return html.escape(str(text), quote=False)


def _plain_text_lines(html_content: str) -> list:
    """Strip tags from rendered HTML and return its non-empty lines, escaped for Paragraph."""
    import html  # function-scope import: not part of the module's import header

    without_tags = re.sub(r'<[^>]+>', '', html_content)
    lines = []
    # Split on newlines BEFORE collapsing whitespace; collapsing first would
    # merge the whole document into a single line.
    for raw_line in without_tags.split('\n'):
        collapsed = re.sub(r'\s+', ' ', html.unescape(raw_line)).strip()
        if collapsed:
            lines.append(html.escape(collapsed, quote=False))
    return lines


def _enhance_with_mistral(client, model: str, md_file: str, md_content: str) -> str:
    """Ask Mistral to polish one page; return the original text if that fails."""
    try:
        prompt = (
            "\n"
            "You are an expert content editor. Below is the content of a Markdown file. Please enhance the content by making it more detailed, well-structured, and polished while preserving the original meaning. Ensure the output is in plain text suitable for inclusion in a PDF. Avoid adding Markdown or HTML formatting in the response.\n"
            "If there are HTML tags like <p><strong>Agents-MCP-Hackathon (Agents-MCP-Hackathon)</strong></p>, convert them to plain text like Agents-MCP-Hackathon (Agents-MCP-Hackathon).\n"
            "Original content:\n"
            f"{md_content}\n"
            "Enhanced content:\n"
        )
        response = client.chat.complete(
            model=model,
            messages=[{"role": "user", "content": prompt}]
        )
        enhanced_content = response.choices[0].message.content.strip()
        # An empty reply would produce a blank page; keep the source text instead.
        return enhanced_content or md_content
    except Exception as e:
        # Best effort: one failed page must not abort the whole merge.
        print(f"Warning: Failed to process {md_file} with Mistral AI: {str(e)}. Using original content.")
        return md_content


def merge_md_to_pdf(output_dir, site_name, site_description="", site_category="General"):
    """
    Merge all Markdown files in the output directory into a single PDF using reportlab after processing with Mistral AI.

    The PDF is laid out as: cover page (name, description, category,
    timestamp), table of contents, then one section per Markdown file in
    sorted filename order. Each section title is the first line of the file
    with '#' characters stripped, or "Page <n>" when that line is empty.
    ``scraping_summary.md`` is skipped. Titles and site metadata are escaped,
    so they are rendered literally rather than parsed as Paragraph markup.

    Args:
        output_dir (str): Directory containing Markdown files; the PDF is
            written there as ``<site_name>_merged.pdf``.
        site_name (str): Name of the site for the PDF title.
        site_description (str): Description of the site.
        site_category (str): Category of the site.
    Returns:
        dict: On success ``success``, ``output_pdf``, ``pages_merged`` and
            ``message``; on failure ``success`` (False), ``error``,
            ``output_pdf`` (None) and ``pages_merged`` (0). Never raises.
    """
    try:
        api_key = os.getenv("MISTRAL_API_KEY")
        if not api_key:
            return _merge_failure("MISTRAL_API_KEY environment variable not set")
        if not os.path.exists(output_dir):
            return _merge_failure(f"Output directory {output_dir} does not exist")

        # Only *.md files are considered, so the summary file is the sole exclusion.
        md_files = sorted(
            name for name in os.listdir(output_dir)
            if name.endswith('.md') and name != 'scraping_summary.md'
        )
        if not md_files:
            return _merge_failure("No Markdown files found in the output directory")

        # Build the client only once there is something to process.
        client = Mistral(api_key=api_key)
        model = "mistral-large-latest"

        pdf_output_path = os.path.join(output_dir, f"{site_name}_merged.pdf")
        doc = SimpleDocTemplate(
            pdf_output_path,
            pagesize=A4,
            rightMargin=inch,
            leftMargin=inch,
            topMargin=inch,
            bottomMargin=inch
        )
        title_style = ParagraphStyle(name='Title', fontSize=24, leading=28, alignment=1, spaceAfter=20)
        heading_style = ParagraphStyle(name='Heading2', fontSize=18, leading=22, spaceAfter=15)
        body_style = ParagraphStyle(name='Body', fontSize=12, leading=14, spaceAfter=10)

        cover = [
            Paragraph(_paragraph_safe(site_name), title_style),
            Spacer(1, 0.2 * inch),
            Paragraph(f"Description: {_paragraph_safe(site_description)}", body_style),
            Paragraph(f"Category: {_paragraph_safe(site_category)}", body_style),
            Paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", body_style),
            PageBreak(),
        ]

        toc_entries = []
        sections = []
        for idx, md_file in enumerate(md_files, 1):
            file_path = os.path.join(output_dir, md_file)
            with open(file_path, 'r', encoding='utf-8') as f:
                md_content = f.read()
            raw_title = md_content.split('\n')[0].strip('#').strip() or f"Page {idx}"
            title = _paragraph_safe(raw_title)

            enhanced_content = _enhance_with_mistral(client, model, md_file, md_content)
            html_content = markdown2.markdown(enhanced_content, extras=['fenced-code-blocks', 'tables'])

            toc_entries.append(Paragraph(f"{idx}. {title}", body_style))
            sections.append(Paragraph(title, heading_style))
            sections.append(Spacer(1, 0.1 * inch))
            sections.extend(
                Paragraph(line, body_style) for line in _plain_text_lines(html_content)
            )
            sections.append(PageBreak())

        # Assemble in reading order so the entries sit under the
        # "Table of Contents" heading rather than above it.
        table_of_contents = [
            Paragraph("Table of Contents", heading_style),
            Spacer(1, 0.2 * inch),
            *toc_entries,
            PageBreak(),
        ]
        doc.build(cover + table_of_contents + sections)

        return {
            "success": True,
            "output_pdf": pdf_output_path,
            "pages_merged": len(md_files),
            "message": f"Successfully merged {len(md_files)} Markdown files into {pdf_output_path} after processing with Mistral AI"
        }
    except Exception as e:
        return _merge_failure(f"Failed to merge Markdown files into PDF: {str(e)}")
def _pipeline_failure(message: str, output_url=None) -> dict:
    """Build the failure payload returned by merge_md_to_pdf_and_convert_to_url."""
    return {
        "success": False,
        "error": message,
        "output_url": output_url,
        "converted_path": None,
    }


def merge_md_to_pdf_and_convert_to_url(output_dir, site_name, site_description="", site_category="General", output_format="pdf"):
    """
    Merge Markdown files into a PDF, upload it to a service, and optionally convert to another format.

    Steps: merge_md_to_pdf() builds the PDF, upload_to_service() yields its
    URL, and when ``output_format`` is not "pdf" the URL is passed to
    convert_from_url().

    NOTE(review): upload_to_service is documented as a mock, so the URL it
    returns is presumably not fetchable by ConvertAPI; non-PDF output is
    expected to fail until a real upload is wired in - confirm.

    Args:
        output_dir (str): Directory containing Markdown files.
        site_name (str): Name of the site for the PDF title.
        site_description (str): Description of the site.
        site_category (str): Category of the site.
        output_format (str): Optional format to convert the PDF to (e.g., 'docx', 'txt');
            matched case-insensitively, "pdf" skips conversion.
    Returns:
        dict: On success ``success``, ``output_url``, ``converted_path``,
            ``message`` and ``pages_merged``; on failure ``success`` (False),
            ``error``, ``output_url`` and ``converted_path`` (None).
            ``output_url`` is still populated when only the conversion step
            failed. Never raises.
    """
    try:
        merge_result = merge_md_to_pdf(output_dir, site_name, site_description, site_category)
        if not merge_result["success"]:
            return _pipeline_failure(merge_result["error"])

        pdf_path = merge_result["output_pdf"]
        if not pdf_path or not os.path.exists(pdf_path):
            return _pipeline_failure("Generated PDF not found")

        # upload_to_service reports problems as plain text instead of raising.
        pdf_url = upload_to_service(pdf_path)
        if not pdf_url.startswith("http"):
            return _pipeline_failure(f"Failed to obtain URL: {pdf_url}")

        target_format = output_format.lower() if isinstance(output_format, str) else output_format
        converted_path = pdf_path
        if target_format != "pdf":
            converted_path = convert_from_url(pdf_url, target_format)
            # convert_from_url returns either a saved file path or an error
            # message. Check the file itself rather than comparing against the
            # TEMP_DIR prefix: Path normalisation ("./temp" -> "temp") makes a
            # prefix comparison reject successful conversions.
            if not os.path.isfile(converted_path):
                return _pipeline_failure(f"Conversion failed: {converted_path}", output_url=pdf_url)

        return {
            "success": True,
            "output_url": pdf_url,
            "converted_path": converted_path,
            "message": f"Successfully merged {merge_result['pages_merged']} Markdown files into PDF and uploaded to {pdf_url}",
            "pages_merged": merge_result["pages_merged"]
        }
    except Exception as e:
        return _pipeline_failure(f"Error in merging or uploading: {str(e)}")