#!/usr/bin/env python3
"""Gradio app: upload files and links, browse them in galleries, generate images.

Uploaded files are copied into the current working directory with a
type-specific prefix (``img_``, ``vid_``, ``doc_``, ``data_``); galleries and
the sidebar are populated by globbing that directory by extension.
"""
import asyncio
import base64
import glob
import logging
import os
import time
from dataclasses import dataclass
from io import BytesIO
from typing import Optional

import pandas as pd
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from diffusers import StableDiffusionPipeline
import fitz
from PIL import Image
import aiofiles
import gradio as gr

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)

# Every record emitted through `logger` is also kept here (unbounded list).
log_records = []


class LogCaptureHandler(logging.Handler):
    """Logging handler that appends each record to the module-level ``log_records``."""

    def emit(self, record):
        log_records.append(record)


logger.addHandler(LogCaptureHandler())


@dataclass
class ModelConfig:
    """Descriptive metadata for a causal language model."""

    name: str
    base_model: str
    size: str
    domain: Optional[str] = None
    model_type: str = "causal_lm"

    @property
    def model_path(self) -> str:
        """Relative directory for this model: ``models/<name>``."""
        return f"models/{self.name}"


@dataclass
class DiffusionConfig:
    """Descriptive metadata for a diffusion model."""

    name: str
    base_model: str
    size: str
    domain: Optional[str] = None

    @property
    def model_path(self) -> str:
        """Relative directory for this model: ``diffusion_models/<name>``."""
        return f"diffusion_models/{self.name}"


class ModelBuilder:
    """Loads and saves a causal LM together with its tokenizer."""

    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None

    def load_model(self, model_path: str, config: Optional[ModelConfig] = None):
        """Load model and tokenizer from ``model_path`` and move the model to a device.

        Args:
            model_path: Identifier or directory passed to ``from_pretrained``.
            config: Optional config to attach; an existing config is kept
                when this is falsy.

        Returns:
            ``self``, so calls can be chained.
        """
        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        # Fall back to the EOS token when the tokenizer defines no pad token.
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if config:
            self.config = config
        self.model.to("cuda" if torch.cuda.is_available() else "cpu")
        return self

    def save_model(self, path: str):
        """Save model and tokenizer under ``path``, creating parent directories."""
        # dirname() is "" for a bare name such as "my_model"; makedirs("")
        # raises, so fall back to the current directory.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)


class DiffusionBuilder:
    """Loads, saves and runs a Stable Diffusion pipeline on CPU."""

    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        """Load a float32 pipeline from ``model_path`` onto the CPU.

        Args:
            model_path: Identifier or directory passed to ``from_pretrained``.
            config: Optional config to attach; an existing config is kept
                when this is falsy.

        Returns:
            ``self``, so calls can be chained.
        """
        self.pipeline = StableDiffusionPipeline.from_pretrained(
            model_path, torch_dtype=torch.float32
        ).to("cpu")
        if config:
            self.config = config
        return self

    def save_model(self, path: str):
        """Save the pipeline under ``path``, creating parent directories."""
        # See ModelBuilder.save_model: guard against an empty dirname.
        os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
        self.pipeline.save_pretrained(path)

    def generate(self, prompt: str):
        """Run the pipeline for 20 inference steps and return the first image."""
        return self.pipeline(prompt, num_inference_steps=20).images[0]


def generate_filename(sequence, ext):
    """Return ``<sequence>_<ddmmYYYYHHMMSS>.<ext>`` using the local time."""
    timestamp = time.strftime("%d%m%Y%H%M%S")
    return f"{sequence}_{timestamp}.{ext}"


def get_gallery_files(file_types):
    """Return the sorted, de-duplicated files in the CWD matching any extension."""
    return sorted({f for ext in file_types for f in glob.glob(f"*.{ext}")})


async def process_image_gen(prompt, output_file, builder):
    """Generate an image for ``prompt``, save it to ``output_file`` and return it.

    Uses ``builder.pipeline`` when ``builder`` is a ``DiffusionBuilder`` with a
    loaded pipeline; otherwise loads ``OFA-Sys/small-stable-diffusion-v0``.
    """
    if builder and isinstance(builder, DiffusionBuilder) and builder.pipeline:
        pipeline = builder.pipeline
    else:
        # NOTE: the fallback pipeline is re-loaded on every call.
        pipeline = StableDiffusionPipeline.from_pretrained(
            "OFA-Sys/small-stable-diffusion-v0", torch_dtype=torch.float32
        ).to("cpu")
    gen_image = pipeline(prompt, num_inference_steps=20).images[0]
    gen_image.save(output_file)
    return gen_image


# Upload Functions
def _upload_name(file) -> str:
    """Return the source path of an upload (file-like with ``.name`` or a path)."""
    return getattr(file, "name", None) or str(file)


def _read_upload(file) -> bytes:
    """Return the content of an upload given as a file-like object or a path."""
    if hasattr(file, "read"):
        return file.read()
    with open(_upload_name(file), "rb") as src:
        return src.read()


def _store_uploads(files, history, selected_files, *, extensions, prefix, label, plural):
    """Copy uploads with an allowed extension into the CWD and record them.

    Each stored file is named ``<prefix>_<epoch seconds>_<basename>``, logged in
    ``history`` and registered (unselected) in ``selected_files``. Uploads with
    another extension are skipped silently.

    Returns:
        ``(status message, history, selected_files)``.
    """
    if not files:
        return "No files uploaded", history, selected_files
    uploaded = []
    for file in files:
        source_name = _upload_name(file)
        ext = source_name.rsplit(".", 1)[-1].lower()
        if ext not in extensions:
            continue
        output_path = f"{prefix}_{int(time.time())}_{os.path.basename(source_name)}"
        with open(output_path, "wb") as out:
            out.write(_read_upload(file))
        uploaded.append(output_path)
        history.append(f"Uploaded {label}: {output_path}")
        selected_files[output_path] = False
    return f"Uploaded {len(uploaded)} {plural}", history, selected_files


def upload_images(files, history, selected_files):
    """Store uploaded ``jpg``/``png`` files; see ``_store_uploads``."""
    return _store_uploads(
        files, history, selected_files,
        extensions=("jpg", "png"), prefix="img", label="Image", plural="images",
    )


def upload_videos(files, history, selected_files):
    """Store uploaded ``mp4`` files; see ``_store_uploads``."""
    return _store_uploads(
        files, history, selected_files,
        extensions=("mp4",), prefix="vid", label="Video", plural="videos",
    )


def upload_documents(files, history, selected_files):
    """Store uploaded ``md``/``pdf``/``docx`` files; see ``_store_uploads``."""
    return _store_uploads(
        files, history, selected_files,
        extensions=("md", "pdf", "docx"), prefix="doc", label="Document", plural="documents",
    )


def upload_datasets(files, history, selected_files):
    """Store uploaded ``csv``/``xlsx`` files; see ``_store_uploads``."""
    return _store_uploads(
        files, history, selected_files,
        extensions=("csv", "xlsx"), prefix="data", label="Dataset", plural="datasets",
    )


def upload_links(links_title, links_url, history, selected_files):
    """Register ``[title](url)`` entries from two newline-separated text blocks.

    Titles and URLs are paired line by line; surplus lines on either side and
    pairs with a blank title or URL are ignored.

    Returns:
        ``(status message, history, selected_files)``.
    """
    if not links_title or not links_url:
        return "No links provided", history, selected_files
    uploaded = []
    # splitlines() + strip() so "\r\n" input and whitespace-only lines do not
    # produce malformed or empty entries.
    for raw_title, raw_url in zip(links_title.splitlines(), links_url.splitlines()):
        title, url = raw_title.strip(), raw_url.strip()
        if title and url:
            link_entry = f"[{title}]({url})"
            uploaded.append(link_entry)
            history.append(f"Added Link: {link_entry}")
            selected_files[link_entry] = False
    return f"Added {len(uploaded)} links", history, selected_files


# Gallery Update
GALLERY_KEYS = ("images", "videos", "documents", "datasets", "links")


def _is_link_entry(entry: str) -> bool:
    """Return True for keys shaped like ``[title](url)``."""
    return entry.startswith("[") and "](" in entry and entry.endswith(")")


def _link_title(entry: str) -> str:
    """Return the text between ``[`` and the first ``]`` of a link entry."""
    return entry.split("]")[0][1:]


def _load_image(path):
    """Return an in-memory copy of the image at ``path``, closing the file."""
    with Image.open(path) as img:
        return img.copy()


def _pdf_thumbnail(path):
    """Return a half-scale RGB render of the first PDF page, or ``path`` on failure."""
    try:
        with fitz.open(path) as doc:
            pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5))
            # frombytes needs (width, height); Pixmap.size is a byte count.
            return Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
    except (RuntimeError, ValueError, IndexError) as err:
        # One unreadable or empty PDF should not break the whole gallery refresh.
        logger.warning("Could not render preview for %s: %s", path, err)
        return path


def update_galleries(history, selected_files):
    """Collect gallery items for every category from the CWD and link entries.

    Returns:
        ``(gallery_outputs, history, selected_files)`` where ``gallery_outputs``
        maps each key in ``GALLERY_KEYS`` to a list of ``(item, caption)`` tuples.
    """
    galleries = {
        "images": get_gallery_files(["jpg", "png"]),
        "videos": get_gallery_files(["mp4"]),
        "documents": get_gallery_files(["md", "pdf", "docx"]),
        "datasets": get_gallery_files(["csv", "xlsx"]),
        "links": [f for f in selected_files if _is_link_entry(f)],
    }
    # NOTE(review): non-image items (md/docx/csv/xlsx paths, link strings) are
    # passed to gr.Gallery as-is; confirm the installed Gradio renders them.
    gallery_outputs = {
        "images": [(_load_image(f), os.path.basename(f)) for f in galleries["images"]],
        "videos": [(f, os.path.basename(f)) for f in galleries["videos"]],  # File path as preview
        "documents": [
            (_pdf_thumbnail(f) if f.endswith(".pdf") else f, os.path.basename(f))
            for f in galleries["documents"]
        ],
        "datasets": [(f, os.path.basename(f)) for f in galleries["datasets"]],
        "links": [(f, _link_title(f)) for f in galleries["links"]],
    }
    history.append(f"Updated galleries: {sum(len(g) for g in galleries.values())} files")
    return gallery_outputs, history, selected_files


def refresh_galleries(history, selected_files):
    """Flatten ``update_galleries`` into one value per Gradio output component.

    Returns:
        The five gallery lists in ``GALLERY_KEYS`` order, then ``history`` and
        ``selected_files``.
    """
    outputs, history, selected_files = update_galleries(history, selected_files)
    return tuple(outputs[key] for key in GALLERY_KEYS) + (history, selected_files)


# Sidebar Update
def update_sidebar(history, selected_files):
    """Return the downloadable file paths for the sidebar, plus ``history``.

    Only files on disk are listed: the sidebar is a ``gr.Files`` component whose
    value is a list of paths, so link entries are left out.
    """
    file_list = get_gallery_files(["jpg", "png", "mp4", "md", "pdf", "docx", "csv", "xlsx"])
    return file_list, history


# Operations
def toggle_selection(file_list, selected_files):
    """Flip the selected flag of every entry in ``file_list``."""
    # The dropdown value may be None when nothing is chosen.
    for file in file_list or []:
        selected_files[file] = not selected_files.get(file, False)
    return selected_files


def selection_choices(selected_files):
    """Return a Gradio update that sets the dropdown choices to the known entries."""
    return gr.update(choices=list(selected_files))


def image_gen(prompt, builder, history, selected_files):
    """Generate an image from ``prompt`` if at least one image is selected.

    The selected images only gate the operation; the prompt alone is passed to
    the pipeline.

    Returns:
        ``(status message, image or None, history, selected_files)``.
    """
    selected = [f for f, sel in selected_files.items() if sel and f.endswith((".jpg", ".png"))]
    if not selected:
        return "No images selected", None, history, selected_files
    output_file = generate_filename("gen_output", "png")
    gen_image = asyncio.run(process_image_gen(prompt, output_file, builder))
    history.append(f"Image Gen: {prompt} -> {output_file}")
    selected_files[output_file] = True
    return f"Image saved to {output_file}", gen_image, history, selected_files


# Gradio UI
with gr.Blocks(title="AI Vision & SFT Titans 🚀") as demo:
    gr.Markdown("# AI Vision & SFT Titans 🚀")
    history = gr.State(value=[])
    builder = gr.State(value=None)
    selected_files = gr.State(value={})

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## 📁 Files")
            sidebar_files = gr.Files(label="Downloads", height=300)
        with gr.Column(scale=3):
            with gr.Row():
                gr.Markdown("## 🛠️ Toolbar")
                select_btn = gr.Button("✅ Select")
                gen_btn = gr.Button("🎨 Generate")
            with gr.Tabs():
                with gr.TabItem("📤 Upload"):
                    with gr.Row():
                        img_upload = gr.File(
                            label="🖼️ Images (jpg/png)",
                            file_count="multiple",
                            file_types=[".jpg", ".png"],
                        )
                        vid_upload = gr.File(
                            label="🎥 Videos (mp4)",
                            file_count="multiple",
                            file_types=[".mp4"],
                        )
                    with gr.Row():
                        doc_upload = gr.File(
                            label="📜 Docs (md/pdf/docx)",
                            file_count="multiple",
                            file_types=[".md", ".pdf", ".docx"],
                        )
                        data_upload = gr.File(
                            label="📊 Data (csv/xlsx)",
                            file_count="multiple",
                            file_types=[".csv", ".xlsx"],
                        )
                    with gr.Row():
                        links_title = gr.Textbox(label="🔗 Link Titles", lines=3)
                        links_url = gr.Textbox(label="🔗 Link URLs", lines=3)
                    upload_status = gr.Textbox(label="Status")
                    upload_img_btn = gr.Button("📤 Upload Images")
                    upload_vid_btn = gr.Button("📤 Upload Videos")
                    upload_doc_btn = gr.Button("📤 Upload Docs")
                    upload_data_btn = gr.Button("📤 Upload Data")
                    upload_links_btn = gr.Button("📤 Upload Links")
                with gr.TabItem("🖼️ Gallery"):
                    img_gallery = gr.Gallery(label="🖼️ Images (jpg/png)", columns=4, height="auto")
                    vid_gallery = gr.Gallery(label="🎥 Videos (mp4)", columns=4, height="auto")
                    doc_gallery = gr.Gallery(label="📜 Docs (md/pdf/docx)", columns=4, height="auto")
                    data_gallery = gr.Gallery(label="📊 Data (csv/xlsx)", columns=4, height="auto")
                    link_gallery = gr.Gallery(label="🔗 Links", columns=4, height="auto")
                    refresh_btn = gr.Button("🔄 Refresh")
                with gr.TabItem("🔍 Operations"):
                    prompt = gr.Textbox(label="Image Gen Prompt", value="Generate a neon version")
                    op_status = gr.Textbox(label="Status")
                    op_output = gr.Image(label="Output")
                    select_files = gr.Dropdown(choices=[], multiselect=True, label="Select Files")

    # Events are wired only after every component exists, so all chains update
    # the galleries shown in the Gallery tab rather than throwaway components.
    gallery_components = [img_gallery, vid_gallery, doc_gallery, data_gallery, link_gallery]
    state_io = [history, selected_files]

    def _refresh_after(event):
        """Chain gallery, sidebar and dropdown refreshes after ``event``."""
        return (
            event.then(refresh_galleries, inputs=state_io, outputs=[*gallery_components, *state_io])
            .then(update_sidebar, inputs=state_io, outputs=[sidebar_files, history])
            .then(selection_choices, inputs=[selected_files], outputs=[select_files])
        )

    upload_outputs = [upload_status, history, selected_files]
    _refresh_after(upload_img_btn.click(
        upload_images, inputs=[img_upload, *state_io], outputs=upload_outputs))
    _refresh_after(upload_vid_btn.click(
        upload_videos, inputs=[vid_upload, *state_io], outputs=upload_outputs))
    _refresh_after(upload_doc_btn.click(
        upload_documents, inputs=[doc_upload, *state_io], outputs=upload_outputs))
    _refresh_after(upload_data_btn.click(
        upload_datasets, inputs=[data_upload, *state_io], outputs=upload_outputs))
    _refresh_after(upload_links_btn.click(
        upload_links, inputs=[links_title, links_url, *state_io], outputs=upload_outputs))

    _refresh_after(refresh_btn.click(
        refresh_galleries, inputs=state_io, outputs=[*gallery_components, *state_io]))

    select_btn.click(
        toggle_selection, inputs=[select_files, selected_files], outputs=[selected_files]
    ).then(update_sidebar, inputs=state_io, outputs=[sidebar_files, history])

    _refresh_after(gen_btn.click(
        image_gen,
        inputs=[prompt, builder, *state_io],
        outputs=[op_status, op_output, *state_io],
    ))

if __name__ == "__main__":
    demo.launch()