#!/usr/bin/env python
# app.py
"""Streamlit app combining image-to-PDF layout tools with Hugging Face AI helpers."""
import io
import os
import re
import base64
import glob
import html
import logging
import random
import shutil
import time
import zipfile
import json
import asyncio
from pathlib import Path
from datetime import datetime
from typing import Any, List, Dict, Optional

import pandas as pd
import pytz
import streamlit as st
import aiofiles
import requests
from PIL import Image, ImageDraw, UnidentifiedImageError
from reportlab.pdfgen import canvas
from reportlab.lib.utils import ImageReader
from reportlab.lib.pagesizes import letter
import fitz  # PyMuPDF
from huggingface_hub import InferenceClient
from huggingface_hub.utils import RepositoryNotFoundError, GatedRepoError

# Optional AI/ML imports
try:
    import torch
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        AutoProcessor,
        AutoModelForVision2Seq,
        pipeline,
    )
    _transformers_available = True
except ImportError:
    _transformers_available = False

try:
    from diffusers import StableDiffusionPipeline
    _diffusers_available = True
except ImportError:
    _diffusers_available = False

# --- Page Configuration ---
# The emoji/arrow literals below were mojibake (UTF-8 bytes decoded with a
# legacy code page); they are restored to the intended characters.
st.set_page_config(
    page_title="Vision & Layout Titans (HF) 🚀🖼️",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/docs',
        'About': "Combined App: Image→PDF Layout + HF AI Tools 🌌"
    }
)

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
log_records: List[logging.LogRecord] = []


class LogCaptureHandler(logging.Handler):
    """Logging handler that appends every record to the module-level ``log_records`` list."""

    def emit(self, record: logging.LogRecord) -> None:
        log_records.append(record)


# Streamlit re-executes this script on each interaction, while the logging
# module keeps its loggers for the life of the process; without this cleanup a
# new capture handler would be stacked on the logger on every rerun. A rerun
# also defines a fresh class object, so stale handlers are matched by class
# name instead of isinstance().
for _stale in [h for h in logger.handlers if type(h).__name__ == "LogCaptureHandler"]:
    logger.removeHandler(_stale)
logger.addHandler(LogCaptureHandler())

# --- Constants & Defaults ---
HF_TOKEN = os.getenv("HF_TOKEN")
DEFAULT_PROVIDER = "hf-inference"
FEATURED_MODELS_LIST = [
    "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-2-9b-it",
    "Qwen/Qwen2-7B-Instruct",
    "microsoft/Phi-3-mini-4k-instruct",
    "HuggingFaceH4/zephyr-7b-beta",
    "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
    "HuggingFaceTB/SmolLM-1.7B-Instruct"
]


# --- Session State Initialization ---
def _init_state(key: str, default: Any) -> None:
    """Store ``default`` under ``key`` in ``st.session_state`` unless the key already exists."""
    if key not in st.session_state:
        st.session_state[key] = default


for k, v in {
    'layout_snapshots': [],
    'layout_new_uploads': [],
    'layout_last_capture': None,
    'history': [],
    'processing': {},
    'asset_checkboxes': {},
    'downloaded_pdfs': {},
    'unique_counter': 0,
    'cam0_file': None,
    'cam1_file': None,
    'characters': [],
    'char_form_reset_key': 0,
    'gallery_size': 10,
    'hf_inference_client': None,
    'hf_provider': DEFAULT_PROVIDER,
    'hf_custom_key': "",
    'hf_selected_api_model': FEATURED_MODELS_LIST[0],
    'hf_custom_api_model': "",
    'local_models': {},
    'selected_local_model_path': None,
    'gen_max_tokens': 512,
    'gen_temperature': 0.7,
    'gen_top_p': 0.95,
    'gen_frequency_penalty': 0.0,
    'gen_seed': -1
}.items():
    _init_state(k, v)


# --- Utility Functions ---
def generate_filename(seq: str, ext: str = "png") -> str:
    """Return ``<sanitised seq>_<YYYYmmdd_HHMMSS>.<ext>``.

    Each run of characters in ``seq`` that are neither word characters nor
    ``-`` is collapsed to a single underscore; the timestamp comes from
    ``time.strftime`` (local time).
    """
    ts = time.strftime('%Y%m%d_%H%M%S')
    safe = re.sub(r'[^\w\-]+', '_', seq)
    return f"{safe}_{ts}.{ext}"


def clean_stem(fn: str) -> str:
    """Return the file stem of ``fn`` with ``-``/``_`` turned into spaces, title-cased."""
    return os.path.splitext(os.path.basename(fn))[0].replace('-', ' ').replace('_', ' ').title()


def get_download_link(path: str, mime: str, label: str = "Download") -> str:
    """Return an HTML anchor embedding the file at ``path`` as a base64 data URI.

    Args:
        path: File to embed.
        mime: MIME type placed in the data URI.
        label: Link text, inserted as-is.

    Returns:
        The ``<a>`` markup, or ``"<label> (not found)"`` when the file does
        not exist.
    """
    try:
        # Context manager so the handle is released even if the read fails.
        with open(path, 'rb') as fh:
            b64 = base64.b64encode(fh.read()).decode()
    except FileNotFoundError:
        return f"{label} (not found)"
    # The previous version computed the payload and then returned only the
    # label, so ``mime`` and the encoded data were never used.
    fname = html.escape(os.path.basename(path), quote=True)
    return f'<a href="data:{mime};base64,{b64}" download="{fname}">{label}</a>'


def get_gallery_files(types: Optional[List[str]] = None) -> List[str]:
    """Return the sorted, de-duplicated files in the working directory matching ``types``.

    Args:
        types: Extensions without the dot. Both the given spelling and its
            upper-case form are globbed. Defaults to
            ``png, jpg, jpeg, pdf, md, txt``.
    """
    # None sentinel instead of a mutable list default shared across calls.
    if types is None:
        types = ['png', 'jpg', 'jpeg', 'pdf', 'md', 'txt']
    files = set()
    for ext in types:
        files.update(glob.glob(f"*.{ext}"))
        files.update(glob.glob(f"*.{ext.upper()}"))
    return sorted(files)


# Delete with rerun
def delete_asset(path: str):
    """Delete ``path`` from disk, drop it from session bookkeeping, then request a rerun.

    On ``OSError`` an error is shown instead of the toast; the rerun is
    requested in both cases.
    """
    try:
        os.remove(path)
        st.session_state['asset_checkboxes'].pop(path, None)
        if path in st.session_state['layout_snapshots']:
            st.session_state['layout_snapshots'].remove(path)
        # The icon literal was mojibake; restored to the intended check mark.
        st.toast(f"Deleted {os.path.basename(path)}", icon="✅")
    except OSError as e:
        st.error(f"Delete failed: {e}")
    # NOTE(review): Streamlit documents st.rerun() as a no-op inside widget
    # callbacks, so this presumably only matters for direct calls - confirm.
    st.rerun()


# Sidebar
# gallery updater
def update_gallery():
    """Render the sidebar asset gallery.

    Lists the files returned by ``get_gallery_files()``, capped at
    ``st.session_state['gallery_size']`` entries. Each entry gets a preview
    expander (image, first PDF page at 0.5x scale, or the first 200 characters
    of text), a selection checkbox mirrored into
    ``st.session_state['asset_checkboxes']``, a download button and a delete
    button wired to ``delete_asset``.
    """
    # Emoji literals in this function were mojibake; restored to the intended characters.
    st.sidebar.markdown("### Asset Gallery 📸📖")
    files = get_gallery_files()
    if not files:
        st.sidebar.info("No assets.")
        return
    st.sidebar.caption(f"Found {len(files)} assets.")

    # Loop-invariant lookup table, built once instead of per file.
    mime_by_ext = {
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'pdf': 'application/pdf',
        'md': 'text/markdown',
        'txt': 'text/plain',
    }

    for f in files[:st.session_state['gallery_size']]:
        name = os.path.basename(f)
        ext = os.path.splitext(f)[1].lower()
        st.sidebar.markdown(f"**{name}**")
        with st.sidebar.expander("Preview", expanded=False):
            try:
                if ext in ('.png', '.jpg', '.jpeg'):
                    st.image(Image.open(f), use_container_width=True)
                elif ext == '.pdf':
                    doc = fitz.open(f)
                    try:
                        if doc.page_count:
                            pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5))
                            img = Image.frombytes('RGB', (pix.width, pix.height), pix.samples)
                            st.image(img, use_container_width=True)
                    finally:
                        # Close the document even when rendering fails.
                        doc.close()
                else:
                    txt = Path(f).read_text(errors='ignore')
                    st.code(txt[:200] + '…')
            except Exception:
                # Was a bare ``except:``, which also swallowed
                # KeyboardInterrupt/SystemExit and hid the cause.
                logger.warning("Preview failed for %s", f, exc_info=True)
                st.warning("Preview error")
        c1, c2, c3 = st.sidebar.columns(3)
        sel = st.session_state['asset_checkboxes'].get(f, False)
        c1.checkbox("Select", value=sel, key=f"cb_{f}")
        st.session_state['asset_checkboxes'][f] = st.session_state.get(f"cb_{f}")
        mime = mime_by_ext.get(ext[1:], 'application/octet-stream')
        with open(f, 'rb') as fp:
            c2.download_button("📥", data=fp, file_name=name, mime=mime, key=f"dl_{f}")
        c3.button("🗑️", key=f"del_{f}", on_click=delete_asset, args=(f,))
        st.sidebar.markdown("---")


# --- PDF Snapshot & Generation ---
async def process_pdf_snapshot(path: str, mode: str = 'single', resF: float = 2.0) -> List[str]:
    """Render pages of the PDF at ``path`` to PNG files in the working directory.

    Args:
        path: PDF file to open.
        mode: ``'single'`` (1 page), ``'twopage'`` (2 pages) or ``'allpages'``;
            any other value behaves like ``'single'``.
        resF: Scale factor applied on both axes when rasterising.

    Returns:
        The PNG file names written. On any error the files written so far are
        removed and an empty list is returned.
    """
    status = st.empty()
    status.text("Snapshot start...")
    out_files: List[str] = []
    doc = None
    try:
        doc = fitz.open(path)
        mat = fitz.Matrix(resF, resF)
        cnt = {'single': 1, 'twopage': 2, 'allpages': len(doc)}.get(mode, 1)
        base = os.path.splitext(os.path.basename(path))[0]  # loop-invariant
        for i in range(min(cnt, len(doc))):
            s = time.time()
            pix = doc[i].get_pixmap(matrix=mat)
            fname = generate_filename(f"{base}_pg{i+1}_{mode}", "png")
            # Run the blocking file write in a worker thread.
            await asyncio.to_thread(pix.save, fname)
            out_files.append(fname)
            status.text(f"Saved {fname} ({int(time.time()-s)}s)")
        status.success(f"Snapshot done: {len(out_files)} files")
    except Exception as e:
        status.error(f"Snapshot error: {e}")
        for f in out_files:
            try:
                os.remove(f)
            except OSError:
                # Best-effort cleanup; a failed delete must not mask the original error.
                pass
        out_files = []
    finally:
        # Previously the document stayed open whenever an exception was raised.
        if doc is not None:
            doc.close()
    return out_files


from reportlab.lib.pagesizes import letter


def make_image_sized_pdf(sources: List[Any]) -> Optional[bytes]:
    """Build a PDF with one page per image, each page sized to its image.

    Every page is the image's pixel size plus a 30-unit strip at the bottom
    holding a centred caption (``clean_stem`` of the source name) and the
    page index at the right.

    Args:
        sources: File paths (``str``) or file-like objects exposing ``name``.
            Entries are de-duplicated by path/name; entries without a usable
            name are dropped.

    Returns:
        The PDF bytes, or ``None`` when there is nothing to render or no
        page could be produced.
    """
    # dedupe
    seen, uniq = set(), []
    for s in sources:
        key = s if isinstance(s, str) else getattr(s, 'name', None)
        if key and key not in seen:
            seen.add(key)
            uniq.append(s)
    if not uniq:
        st.warning("No images for PDF")
        return None

    buf = io.BytesIO()
    c = canvas.Canvas(buf, pagesize=letter)
    status = st.empty()
    cap = 30  # height of the caption strip below each image
    pages_added = 0
    for idx, s in enumerate(uniq, 1):
        try:
            # File-like sources may already have been read elsewhere; rewind
            # so PIL sees the data from the start.
            if not isinstance(s, str) and hasattr(s, 'seek'):
                s.seek(0)
            img = Image.open(s)
            w, h = img.size
            c.setPageSize((w, h + cap))
            c.drawImage(ImageReader(img), 0, cap, w, h, mask='auto')
            cap_txt = clean_stem(s if isinstance(s, str) else s.name)
            c.setFont('Helvetica', 12)
            c.drawCentredString(w / 2, cap / 2, cap_txt)
            c.setFont('Helvetica', 8)
            c.drawRightString(w - 10, 10, str(idx))
            c.showPage()
            pages_added += 1
            status.text(f"Page {idx}/{len(uniq)} added")
        except Exception as e:
            status.error(f"Error page {idx}: {e}")
    if not pages_added:
        # Every source failed; do not hand back a PDF with no image pages.
        return None
    c.save()
    buf.seek(0)
    return buf.getvalue()


# --- HF Inference Client ---
def get_hf_client() -> Optional[InferenceClient]:
    """Return an ``InferenceClient`` for the provider/token selected in session state.

    The token is the stripped ``hf_custom_key`` if non-empty, else
    ``HF_TOKEN``. The client is cached in
    ``st.session_state['hf_inference_client']`` and rebuilt when the provider
    or token differs from the one it was created with.

    Returns:
        The client, or ``None`` (after showing an error) when a provider other
        than ``'hf-inference'`` is selected without a token.
    """
    provider = st.session_state['hf_provider']
    token = st.session_state['hf_custom_key'].strip() or HF_TOKEN
    if provider != 'hf-inference' and not token:
        st.error(f"Provider {provider} needs token")
        return None
    settings = (provider, token)
    client = st.session_state['hf_inference_client']
    # Previously the first client was reused forever, so later provider/key
    # changes in the UI were silently ignored.
    if not client or st.session_state.get('hf_client_settings') != settings:
        client = InferenceClient(token=token, provider=provider)
        st.session_state['hf_inference_client'] = client
        st.session_state['hf_client_settings'] = settings
    return client


# --- HF Processing --- def process_text_hf(text: str, prompt: str, use_api: bool) -> str: stp = st.empty(); stp.text("Processing...") msgs = [{"role":"system","content":"You are an assistant."}, {"role":"user","content":f"{prompt}\n\n{text}"}] out = "" if
use_api: client = get_hf_client() if not client: return "Client error" model = st.session_state['hf_custom_api_model'] or st.session_state['hf_selected_api_model'] try: resp = client.chat_completion( model=model, messages=msgs, max_tokens=st.session_state['gen_max_tokens'], temperature=st.session ]}]}