Spaces:
Paused
Paused
import os | |
import shutil | |
import fitz # PyMuPDF | |
import logging | |
import asyncio | |
from uuid import uuid4 | |
from fastapi import FastAPI, File, UploadFile, Form | |
from telegram import Bot | |
from telegram.constants import ParseMode | |
from starlette.middleware.cors import CORSMiddleware | |
# Logging Setup | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
# Telegram Bot Setup | |
TELEGRAM_BOT_TOKEN = "8046554458:AAGxVuq_xrTXihLmxE_vcopsdg11zPvv1_I" | |
TELEGRAM_CHAT_ID = "5173085859" | |
bot = Bot(token=TELEGRAM_BOT_TOKEN) | |
# Directories Setup | |
UPLOAD_DIR = "uploads" | |
IMAGES_DIR = "images" | |
os.makedirs(UPLOAD_DIR, exist_ok=True) | |
os.makedirs(IMAGES_DIR, exist_ok=True) | |
# FastAPI Setup | |
app = FastAPI() | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
def check_status(): | |
return {"status": "API is working"} | |
# ----- Original PDF Upload Endpoint (unchanged) ----- | |
async def upload_pdf(file: UploadFile = File(...)): | |
""" Upload PDF, convert to images, send to Telegram & cleanup """ | |
file_id = str(uuid4()) | |
file_path = os.path.join(UPLOAD_DIR, f"{file_id}.pdf") | |
try: | |
# Save uploaded PDF | |
with open(file_path, "wb") as buffer: | |
shutil.copyfileobj(file.file, buffer) | |
# Convert PDF to images & send to Telegram | |
image_links = await convert_pdf_to_images(file_path) | |
return {"status": "Images sent to Telegram", "images": image_links} | |
except Exception as e: | |
logger.error(f"Error processing PDF: {e}") | |
# Cleanup uploaded file if exists | |
if os.path.exists(file_path): | |
os.remove(file_path) | |
return {"status": "error", "message": str(e)} | |
async def convert_pdf_to_images(pdf_path): | |
""" Converts PDF pages to images & sends them to Telegram """ | |
# Use a thread executor for synchronous PDF operations | |
loop = asyncio.get_running_loop() | |
doc = await loop.run_in_executor(None, fitz.open, pdf_path) | |
image_links = [] | |
for i in range(len(doc)): | |
page = await loop.run_in_executor(None, doc.load_page, i) | |
image_path = os.path.join(IMAGES_DIR, f"{uuid4()}.png") | |
# Get pixmap and save image | |
pix = await loop.run_in_executor(None, page.get_pixmap) | |
await loop.run_in_executor(None, pix.save, image_path) | |
# Send image to Telegram with page number caption | |
caption = f"Page {i+1}" | |
telegram_url = await send_image_to_telegram(image_path, caption) | |
image_links.append(str(telegram_url)) | |
return image_links | |
# ------------------------------------------------------- | |
# ----- New Endpoint: Image Upload with Caption ----- | |
async def upload_image( | |
file: UploadFile = File(...), | |
caption: str = Form("Uploaded image") | |
): | |
""" | |
Upload an image along with a caption and send it to Telegram. | |
""" | |
file_id = str(uuid4()) | |
extension = file.filename.split(".")[-1] | |
image_path = os.path.join(IMAGES_DIR, f"{file_id}.{extension}") | |
try: | |
with open(image_path, "wb") as buffer: | |
shutil.copyfileobj(file.file, buffer) | |
telegram_response = await send_image_to_telegram(image_path, caption) | |
return {"status": "Image sent to Telegram", "telegram_response": str(telegram_response)} | |
except Exception as e: | |
logger.error(f"Error processing image: {e}") | |
if os.path.exists(image_path): | |
os.remove(image_path) | |
return {"status": "error", "message": str(e)} | |
# ------------------------------------------------------- | |
def send_photo_sync(chat_id, image_file, caption): | |
""" Synchronous helper to send a photo via Telegram """ | |
return bot.send_photo(chat_id=chat_id, photo=image_file, caption=caption, parse_mode=ParseMode.HTML) | |
async def send_image_to_telegram(image_path, caption): | |
""" Send an image to Telegram asynchronously """ | |
loop = asyncio.get_running_loop() | |
with open(image_path, "rb") as image_file: | |
response = await loop.run_in_executor(None, send_photo_sync, TELEGRAM_CHAT_ID, image_file, caption) | |
return response | |
if __name__ == "__main__": | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7860, log_level="info", reload=True) | |