#!/usr/bin/env python3
import os
import glob
import time
import csv
import re
import logging
from dataclasses import dataclass
from typing import Optional

import requests
import fitz  # PyMuPDF
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModelForCausalLM, AutoTokenizer
from diffusers import StableDiffusionPipeline
from PIL import Image
import gradio as gr

logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
log_records = []


class LogCaptureHandler(logging.Handler):
    """Keeps log records in memory so the UI can surface them later."""

    def emit(self, record):
        log_records.append(record)


logger.addHandler(LogCaptureHandler())


# Data Classes and Models
@dataclass
class ModelConfig:
    name: str
    base_model: str
    size: str
    domain: Optional[str] = None
    model_type: str = "causal_lm"

    @property
    def model_path(self):
        return f"models/{self.name}"


@dataclass
class DiffusionConfig:
    name: str
    base_model: str
    size: str
    domain: Optional[str] = None

    @property
    def model_path(self):
        return f"diffusion_models/{self.name}"


class SFTDataset(Dataset):
    """Prompt/response pairs tokenized for supervised fine-tuning."""

    def __init__(self, data, tokenizer, max_length=128):
        self.data = data
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        prompt = self.data[idx]["prompt"]
        response = self.data[idx]["response"]
        full_text = f"{prompt} {response}"
        full_encoding = self.tokenizer(full_text, max_length=self.max_length,
                                       padding="max_length", truncation=True, return_tensors="pt")
        prompt_encoding = self.tokenizer(prompt, max_length=self.max_length,
                                         padding=False, truncation=True, return_tensors="pt")
        input_ids = full_encoding["input_ids"].squeeze()
        attention_mask = full_encoding["attention_mask"].squeeze()
        labels = input_ids.clone()
        prompt_len = prompt_encoding["input_ids"].shape[1]
        if prompt_len < self.max_length:
            labels[:prompt_len] = -100  # compute loss on the response tokens only
        labels[attention_mask == 0] = -100  # also ignore padding positions
        return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}


class TinyUNet(nn.Module):
    """Minimal UNet-style noise predictor for the toy diffusion model."""

    def __init__(self, in_channels=3, out_channels=3):
        super().__init__()
        self.down1 = nn.Conv2d(in_channels, 32, 3, padding=1)
        self.down2 = nn.Conv2d(32, 64, 3, padding=1, stride=2)
        self.mid = nn.Conv2d(64, 128, 3, padding=1)
        self.up1 = nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1)
        self.up2 = nn.Conv2d(64 + 32, 32, 3, padding=1)
        self.out = nn.Conv2d(32, out_channels, 3, padding=1)
        # 128 features so the embedding broadcasts against the mid block's 128 channels.
        self.time_embed = nn.Linear(1, 128)

    def forward(self, x, t):
        t_embed = F.relu(self.time_embed(t.unsqueeze(-1)))
        t_embed = t_embed.view(t_embed.size(0), t_embed.size(1), 1, 1)
        x1 = F.relu(self.down1(x))
        x2 = F.relu(self.down2(x1))
        x_mid = F.relu(self.mid(x2)) + t_embed
        x_up1 = F.relu(self.up1(x_mid))
        x_up2 = F.relu(self.up2(torch.cat([x_up1, x1], dim=1)))
        return self.out(x_up2)
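# Quick shape sanity check for TinyUNet (illustrative sketch only, not executed at import time):
#
#   unet = TinyUNet()
#   x = torch.randn(2, 3, 64, 64)        # two 64x64 RGB images
#   t = torch.tensor([10.0, 50.0])       # one timestep per sample
#   assert unet(x, t).shape == x.shape   # the network predicts noise with the input's shape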
class TinyDiffusion:
    """Toy DDPM wrapper around TinyUNet with a linear beta schedule."""

    def __init__(self, model, timesteps=100):
        self.model = model
        self.timesteps = timesteps
        self.beta = torch.linspace(0.0001, 0.02, timesteps)
        self.alpha = 1 - self.beta
        self.alpha_cumprod = torch.cumprod(self.alpha, dim=0)

    def train(self, images, epochs=50):
        dataset = TinyDiffusionDataset(images)
        dataloader = DataLoader(dataset, batch_size=1, shuffle=True)
        optimizer = torch.optim.Adam(self.model.parameters(), lr=1e-4)
        device = torch.device("cpu")
        self.model.to(device)
        for epoch in range(epochs):
            total_loss = 0
            for x in dataloader:
                x = x.to(device)
                t = torch.randint(0, self.timesteps, (x.size(0),), device=device).float()
                noise = torch.randn_like(x)
                alpha_t = self.alpha_cumprod[t.long()].view(-1, 1, 1, 1)
                # Forward process: noise the clean image to timestep t in closed form.
                x_noisy = torch.sqrt(alpha_t) * x + torch.sqrt(1 - alpha_t) * noise
                pred_noise = self.model(x_noisy, t)
                loss = F.mse_loss(pred_noise, noise)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            logger.info(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(dataloader):.4f}")
        return self

    def generate(self, size=(64, 64), steps=100):
        device = torch.device("cpu")
        x = torch.randn(1, 3, size[0], size[1], device=device)
        for t in reversed(range(steps)):
            t_tensor = torch.full((1,), t, device=device, dtype=torch.float32)
            alpha_t = self.alpha_cumprod[t].view(-1, 1, 1, 1)
            pred_noise = self.model(x, t_tensor)
            # Reverse process: subtract the predicted noise, re-injecting noise except at t=0.
            x = (x - (1 - self.alpha[t]) / torch.sqrt(1 - alpha_t) * pred_noise) / torch.sqrt(self.alpha[t])
            if t > 0:
                x += torch.sqrt(self.beta[t]) * torch.randn_like(x)
        # Training data is scaled to [0, 1], so map samples back to 8-bit pixels.
        x = torch.clamp(x * 255, 0, 255).byte()
        return Image.fromarray(x.squeeze(0).permute(1, 2, 0).cpu().numpy())


class TinyDiffusionDataset(Dataset):
    def __init__(self, images):
        self.images = [torch.tensor(np.array(img.convert("RGB")).transpose(2, 0, 1),
                                    dtype=torch.float32) / 255.0 for img in images]

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        return self.images[idx]
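# Illustrative training/sampling loop for the toy DDPM above (not wired into the UI;
# the PNG paths are hypothetical and any 64x64-ish RGB images would do):
#
#   images = [Image.open(p).resize((64, 64)) for p in glob.glob("*.png")]
#   ddpm = TinyDiffusion(TinyUNet()).train(images, epochs=5)
#   sample = ddpm.generate(size=(64, 64), steps=100)
#   sample.save(generate_filename("tiny_diffusion"))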
class ModelBuilder:
    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None
        self.sft_data = None

    def load_model(self, model_path: str, config: Optional[ModelConfig] = None):
        self.model = AutoModelForCausalLM.from_pretrained(model_path)
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        if self.tokenizer.pad_token is None:
            self.tokenizer.pad_token = self.tokenizer.eos_token
        if config:
            self.config = config
        self.model.to("cuda" if torch.cuda.is_available() else "cpu")
        return self

    def fine_tune_sft(self, csv_path: str, epochs: int = 3, batch_size: int = 4):
        self.sft_data = []
        with open(csv_path, "r") as f:
            reader = csv.DictReader(f)
            for row in reader:
                self.sft_data.append({"prompt": row["prompt"], "response": row["response"]})
        dataset = SFTDataset(self.sft_data, self.tokenizer)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=2e-5)
        self.model.train()
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model.to(device)
        for epoch in range(epochs):
            total_loss = 0
            for batch in dataloader:
                optimizer.zero_grad()
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)
                outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)
                loss = outputs.loss
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            logger.info(f"Epoch {epoch + 1} completed. Average loss: {total_loss / len(dataloader):.4f}")
        return self

    def save_model(self, path: str):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)

    def evaluate(self, prompt: str):
        self.model.eval()
        with torch.no_grad():
            inputs = self.tokenizer(prompt, return_tensors="pt", max_length=128,
                                    truncation=True).to(self.model.device)
            outputs = self.model.generate(**inputs, max_new_tokens=50, do_sample=True,
                                          top_p=0.95, temperature=0.7)
        return self.tokenizer.decode(outputs[0], skip_special_tokens=True)


class DiffusionBuilder:
    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu")
        if config:
            self.config = config
        return self

    def save_model(self, path: str):
        # build_model() calls save_model() on either builder type, so DiffusionBuilder needs one too.
        os.makedirs(path, exist_ok=True)
        self.pipeline.save_pretrained(path)

    def generate(self, prompt: str):
        return self.pipeline(prompt, num_inference_steps=20).images[0]


# Utility Functions
def generate_filename(sequence, ext="png"):
    timestamp = time.strftime("%d%m%Y%H%M%S")
    return f"{sequence}_{timestamp}.{ext}"


def pdf_url_to_filename(url):
    safe_name = re.sub(r'[<>:"/\\|?*]', '_', url)
    return f"{safe_name}.pdf"


def get_gallery_files(file_types=("png", "pdf")):
    # Deduplicate, then sort for a stable listing.
    return sorted(set(f for ext in file_types for f in glob.glob(f"*.{ext}")))


def download_pdf(url, output_path):
    try:
        response = requests.get(url, stream=True, timeout=10)
        if response.status_code == 200:
            with open(output_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return True
    except requests.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
    return False


def process_pdf_snapshot(pdf_path, mode="single"):
    doc = fitz.open(pdf_path)
    output_files = []
    if mode == "single":
        page = doc[0]
        pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0))  # render the first page at 2x zoom
        output_file = generate_filename("single", "png")
        pix.save(output_file)
        output_files.append(output_file)
    doc.close()
    return output_files


# Gradio Interface Functions
def update_gallery(history):
    all_files = get_gallery_files()
    gallery_content = "\n".join(f"- {f}" for f in all_files[:5])
    history.append(f"Gallery updated: {len(all_files)} files")
    return gallery_content, history


def camera_snap(image, history):
    if image is not None:
        filename = generate_filename("cam")
        image.save(filename)
        history.append(f"Snapshot saved: {filename}")
        return f"Image saved as {filename}", history
    return "No image captured", history


def download_pdfs(urls, history):
    urls = urls.strip().split("\n")
    downloaded = []
    for url in urls:
        if url:
            output_path = pdf_url_to_filename(url)
            if download_pdf(url, output_path):
                downloaded.append(output_path)
                history.append(f"Downloaded PDF: {output_path}")
    return f"Downloaded {len(downloaded)} PDFs", history


def build_model(model_type, base_model, model_name, domain, history):
    config_cls = ModelConfig if model_type == "Causal LM" else DiffusionConfig
    config = config_cls(name=model_name, base_model=base_model, size="small", domain=domain)
    builder = ModelBuilder() if model_type == "Causal LM" else DiffusionBuilder()
    builder.load_model(base_model, config)
    builder.save_model(config.model_path)
    history.append(f"Built {model_type} model: {model_name}")
    return builder, f"Model saved to {config.model_path}", history


def test_model(builder, prompt, history):
    if builder is None:
        return "No model loaded", history
    if isinstance(builder, ModelBuilder):
        result = builder.evaluate(prompt)
        history.append(f"Tested Causal LM: {prompt} -> {result}")
        return result, history
    if isinstance(builder, DiffusionBuilder):
        image = builder.generate(prompt)
        output_file = generate_filename("diffusion_test")
        image.save(output_file)
        history.append(f"Tested Diffusion: {prompt} -> {output_file}")
        return output_file, history
    return "Unsupported builder type", history
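# End-to-end sketch of the builder API outside the UI (illustrative; the file names and
# model choice are hypothetical). fine_tune_sft() expects a CSV with "prompt" and
# "response" columns:
#
#   # sft_data.csv:
#   #   prompt,response
#   #   What is AI?,Artificial intelligence is ...
#   builder = ModelBuilder().load_model("HuggingFaceTB/SmolLM-135M")
#   builder.fine_tune_sft("sft_data.csv", epochs=1, batch_size=2)
#   builder.save_model("models/my-tiny-titan")
#   print(builder.evaluate("What is AI?"))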
# Gradio UI
with gr.Blocks(title="AI Vision & SFT Titans 🚀") as demo:
    gr.Markdown("# AI Vision & SFT Titans 🚀")
    history = gr.State(value=[])
    builder = gr.State(value=None)
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("## Captured Files 📜")
            gallery_output = gr.Textbox(label="Gallery", lines=5)
            gr.Button("Update Gallery").click(update_gallery, inputs=[history], outputs=[gallery_output, history])
        with gr.Column(scale=3):
            with gr.Tabs():
                with gr.TabItem("Camera Snap 📷"):
                    camera_input = gr.Image(type="pil", label="Take a Picture")
                    snap_output = gr.Textbox(label="Status")
                    gr.Button("Capture").click(camera_snap, inputs=[camera_input, history], outputs=[snap_output, history])
                with gr.TabItem("Download PDFs 📥"):
                    url_input = gr.Textbox(label="Enter PDF URLs (one per line)", lines=5)
                    pdf_output = gr.Textbox(label="Status")
                    gr.Button("Download").click(download_pdfs, inputs=[url_input, history], outputs=[pdf_output, history])
                with gr.TabItem("Build Titan 🌱"):
                    model_type = gr.Dropdown(["Causal LM", "Diffusion"], label="Model Type", value="Causal LM")
                    causal_models = ["HuggingFaceTB/SmolLM-135M", "Qwen/Qwen1.5-0.5B-Chat"]
                    diffusion_models = ["OFA-Sys/small-stable-diffusion-v0", "stabilityai/stable-diffusion-2-base"]
                    # Dropdown choices cannot depend on model_type.value at build time (it is
                    # evaluated once), so a change handler swaps them dynamically.
                    base_model = gr.Dropdown(choices=causal_models, value=causal_models[0], label="Base Model")

                    def swap_base_models(choice):
                        models = causal_models if choice == "Causal LM" else diffusion_models
                        return gr.update(choices=models, value=models[0])

                    model_type.change(swap_base_models, inputs=[model_type], outputs=[base_model])
                    model_name = gr.Textbox(label="Model Name", value=f"tiny-titan-{int(time.time())}")
                    domain = gr.Textbox(label="Domain", value="general")
                    build_output = gr.Textbox(label="Status")
                    gr.Button("Build").click(build_model, inputs=[model_type, base_model, model_name, domain, history], outputs=[builder, build_output, history])
                with gr.TabItem("Test Titan 🧪"):
                    test_prompt = gr.Textbox(label="Test Prompt", value="What is AI?")
                    test_output = gr.Textbox(label="Result")
                    gr.Button("Test").click(test_model, inputs=[builder, test_prompt, history], outputs=[test_output, history])
    with gr.Row():
        gr.Markdown("## History 📜")
        history_output = gr.Textbox(label="History", lines=5, interactive=False)
        # State is only a snapshot at build time, so render it on demand.
        gr.Button("Refresh History").click(lambda h: "\n".join(h), inputs=[history], outputs=[history_output])

demo.launch()