# gguf-my-repo / app.py
import os
import subprocess
import signal
import tempfile
from pathlib import Path
from textwrap import dedent
from typing import Optional, Tuple
from dataclasses import dataclass, field
os.environ["GRADIO_ANALYTICS_ENABLED"] = "False"
import gradio as gr
from huggingface_hub import HfApi, ModelCard, whoami
from gradio_huggingfacehub_search import HuggingfaceHubSearch
from apscheduler.schedulers.background import BackgroundScheduler
@dataclass
class QuantizationConfig:
"""Configuration for model quantization."""
method: str
use_imatrix: bool = False
imatrix_method: str = "IQ4_NL"
train_data: str = ""
quant_embedding: bool = False
embedding_tensor_method: str = "Q8_0"
leave_output: bool = False
quant_output: bool = False
output_tensor_method: str = "Q8_0"
# Generated values - These will be set during processing
fp16_model: str = field(default="", init=False)
quantized_gguf: str = field(default="", init=False)
imatrix_file: str = field(default="", init=False)
@dataclass
class SplitConfig:
"""Configuration for model splitting."""
enabled: bool = False
max_tensors: int = 256
max_size: Optional[str] = None
@dataclass
class OutputConfig:
"""Configuration for output settings."""
private_repo: bool = False
repo_name: str = ""
filename: str = ""
@dataclass
class ModelProcessingConfig:
"""Configuration for the entire model processing pipeline."""
token: str
model_id: str
model_name: str
outdir: str
quant_config: QuantizationConfig
split_config: SplitConfig
output_config: OutputConfig
# Generated values - These will be set during processing
new_repo_url: str = field(default="", init=False)
new_repo_id: str = field(default="", init=False)
class GGUFConverterError(Exception):
"""Custom exception for GGUF conversion errors."""
pass
class HuggingFaceModelProcessor:
"""Handles the processing of Hugging Face models to GGUF format."""
ERROR_LOGIN = "You must be logged in to use GGUF-my-repo."
DOWNLOAD_FOLDER = "./downloads"
OUTPUT_FOLDER = "./outputs"
CALIBRATION_FILE = "calibration_data_v5_rc.txt"
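# Subprocess timeouts in seconds: 24 hours for quantization and imatrix generation,
# 1 hour for HF-to-GGUF conversion and splitting; KILL_TIMEOUT is the grace period
# after SIGINT before the process is killed.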
QUANTIZE_TIMEOUT = 86400
HF_TO_GGUF_TIMEOUT = 3600
IMATRIX_TIMEOUT = 86400
SPLIT_TIMEOUT = 3600
KILL_TIMEOUT = 5
def __init__(self):
self.SPACE_ID = os.environ.get("SPACE_ID", "")
self.SPACE_URL = f"https://{self.SPACE_ID.replace('/', '-')}.hf.space/" if self.SPACE_ID else "http://localhost:7860/"
self.HF_TOKEN = os.environ.get("HF_TOKEN")
self.RUN_LOCALLY = os.environ.get("RUN_LOCALLY")
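# When RUN_LOCALLY=1, uploads to the Hub are skipped (see _upload_file) and outputs
# are written to a persistent folder instead of a temporary directory.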
# Create necessary folders
self._create_folder(self.DOWNLOAD_FOLDER)
self._create_folder(self.OUTPUT_FOLDER)
def _create_folder(self, folder_name: str) -> str:
"""Create a folder if it doesn't exist."""
if not os.path.exists(folder_name):
print(f"Creating folder: {folder_name}")
os.makedirs(folder_name)
return folder_name
def _validate_token(self, oauth_token: Optional[gr.OAuthToken]) -> str:
"""Validate the OAuth token and return the token string."""
if oauth_token is None or oauth_token.token is None:
raise GGUFConverterError(self.ERROR_LOGIN)
try:
whoami(oauth_token.token)
return oauth_token.token
except Exception as e:
raise GGUFConverterError(self.ERROR_LOGIN) from e
def _escape_html(self, s: str) -> str:
"""Escape HTML characters for safe display."""
replacements = [
("&", "&"),
("<", "&lt;"),
(">", "&gt;"),
('"', "&quot;"),
("\n", "<br/>")
]
for old, new in replacements:
s = s.replace(old, new)
return s
def _get_model_creator(self, model_id: str) -> str:
"""Extract model creator from model ID."""
return model_id.split('/')[0]
def _get_model_name(self, model_id: str) -> str:
"""Extract model name from model ID."""
return model_id.split('/')[-1]
def _upload_file(self, processing_config: ModelProcessingConfig, path_or_fileobj: str, path_in_repo: str) -> None:
"""Upload a file to Hugging Face repository."""
if self.RUN_LOCALLY == "1":
print("Skipping upload...")
return
api = HfApi(token=processing_config.token)
api.upload_file(
path_or_fileobj=path_or_fileobj,
path_in_repo=path_in_repo,
repo_id=processing_config.new_repo_id,
)
def _generate_importance_matrix(self, quant_config: QuantizationConfig) -> None:
"""Generate importance matrix for quantization."""
if not os.path.isfile(quant_config.fp16_model):
raise GGUFConverterError(f"Model file not found: {quant_config.fp16_model}")
if quant_config.train_data:
train_data_path = quant_config.train_data
else:
train_data_path = self.CALIBRATION_FILE
if not os.path.isfile(train_data_path):
raise GGUFConverterError(f"Training data file not found: {train_data_path}")
print(f"Training data file path: {train_data_path}")
print("Running imatrix command...")
imatrix_command = [
"llama-imatrix",
"-m", quant_config.fp16_model,
"-f", train_data_path,
"-ngl", "99",
"--output-frequency", "10",
"-o", quant_config.imatrix_file,
]
process = subprocess.Popen(imatrix_command, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.IMATRIX_TIMEOUT)
except subprocess.TimeoutExpired:
print("Imatrix computation timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Imatrix proc still didn't term. Forcefully terminating process...")
process.kill()
raise GGUFConverterError("Error generating imatrix: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error generating imatrix: code={process.returncode}.")
print(f"Importance matrix generation completed: {os.path.abspath(quant_config.imatrix_file)}")
def _split_and_upload_model(self, processing_config: ModelProcessingConfig) -> None:
"""Split large model files and upload shards."""
quant_config = processing_config.quant_config
split_config = processing_config.split_config
print(f"Model path: {quant_config.quantized_gguf}")
print(f"Output dir: {processing_config.outdir}")
split_cmd = ["llama-gguf-split", "--split"]
if split_config.max_size:
split_cmd.extend(["--split-max-size", split_config.max_size])
else:
split_cmd.extend(["--split-max-tensors", str(split_config.max_tensors)])
model_path_prefix = '.'.join(quant_config.quantized_gguf.split('.')[:-1])
split_cmd.extend([quant_config.quantized_gguf, model_path_prefix])
print(f"Split command: {split_cmd}")
process = subprocess.Popen(split_cmd, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.SPLIT_TIMEOUT)
except subprocess.TimeoutExpired:
print("Splitting timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Splitting timed out. Killing process...")
process.kill()
raise GGUFConverterError("Error splitting the model: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error splitting the model: code={process.returncode}")
print("Model split successfully!")
# Remove original model file
if os.path.exists(quant_config.quantized_gguf):
os.remove(quant_config.quantized_gguf)
model_file_prefix = model_path_prefix.split('/')[-1]
print(f"Model file name prefix: {model_file_prefix}")
sharded_model_files = [
f for f in os.listdir(processing_config.outdir)
if f.startswith(model_file_prefix) and f.endswith(".gguf")
]
if not sharded_model_files:
raise GGUFConverterError("No sharded files found.")
print(f"Sharded model files: {sharded_model_files}")
for file in sharded_model_files:
file_path = os.path.join(processing_config.outdir, file)
try:
print(f"Uploading file: {file_path}")
self._upload_file(processing_config, file_path, file)
except Exception as e:
raise GGUFConverterError(f"Error uploading file {file_path}: {e}")
print("Sharded model has been uploaded successfully!")
def _download_base_model(self, processing_config: ModelProcessingConfig) -> str:
"""Download and convert Hugging Face model to GGUF FP16 format."""
print(f"Downloading model {processing_config.model_name}")
if os.path.exists(processing_config.quant_config.fp16_model):
print("Skipping fp16 conversion...")
print(f"Converted model path: {os.path.abspath(processing_config.quant_config.fp16_model)}")
return processing_config.quant_config.fp16_model
with tempfile.TemporaryDirectory(dir=self.DOWNLOAD_FOLDER) as tmpdir:
local_dir = f"{Path(tmpdir)}/{processing_config.model_name}"
print(f"Local directory: {os.path.abspath(local_dir)}")
# Download model
api = HfApi(token=processing_config.token)
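# Prefer *.safetensors weights when the repo provides any; otherwise fall back to *.bin.
# Config, tokenizer and metadata files (*.md, *.json, *.model) are always downloaded.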
pattern = (
"*.safetensors"
if any(
file.path.endswith(".safetensors")
for file in api.list_repo_tree(
repo_id=processing_config.model_id,
recursive=True,
)
)
else "*.bin"
)
dl_pattern = ["*.md", "*.json", "*.model"]
dl_pattern += [pattern]
api.snapshot_download(repo_id=processing_config.model_id, local_dir=local_dir, allow_patterns=dl_pattern)
print("Model downloaded successfully!")
print(f"Model directory contents: {os.listdir(local_dir)}")
config_dir = os.path.join(local_dir, "config.json")
adapter_config_dir = os.path.join(local_dir, "adapter_config.json")
if os.path.exists(adapter_config_dir) and not os.path.exists(config_dir):
raise GGUFConverterError(
'adapter_config.json is present.<br/><br/>If you are converting a LoRA adapter to GGUF, '
'please use <a href="https://huggingface.co/spaces/ggml-org/gguf-my-lora" target="_blank" '
'style="text-decoration:underline">GGUF-my-lora</a>.'
)
# Convert HF to GGUF
print(f"Converting to GGUF FP16: {os.path.abspath(processing_config.quant_config.fp16_model)}")
convert_command = [
"python3", "/app/convert_hf_to_gguf.py", local_dir,
"--outtype", "f16", "--outfile", processing_config.quant_config.fp16_model
]
process = subprocess.Popen(convert_command, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.HF_TO_GGUF_TIMEOUT)
except subprocess.TimeoutExpired:
print("Conversion timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Conversion timed out. Killing process...")
process.kill()
raise GGUFConverterError("Error converting to fp16: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error converting to fp16: code={process.returncode}")
print("Model converted to fp16 successfully!")
print(f"Converted model path: {os.path.abspath(processing_config.quant_config.fp16_model)}")
return processing_config.quant_config.fp16_model
def _quantize_model(self, quant_config: QuantizationConfig) -> str:
"""Quantize the GGUF model."""
quantize_cmd = ["llama-quantize"]
if quant_config.quant_embedding:
quantize_cmd.extend(["--token-embedding-type", quant_config.embedding_tensor_method])
if quant_config.leave_output:
quantize_cmd.append("--leave-output-tensor")
else:
if quant_config.quant_output:
quantize_cmd.extend(["--output-tensor-type", quant_config.output_tensor_method])
# Set imatrix file path if needed
if quant_config.use_imatrix:
self._generate_importance_matrix(quant_config)
quantize_cmd.extend(["--imatrix", quant_config.imatrix_file])
else:
print("Not using imatrix quantization.")
quantize_cmd.append(quant_config.fp16_model)
quantize_cmd.append(quant_config.quantized_gguf)
if quant_config.use_imatrix:
quantize_cmd.append(quant_config.imatrix_method)
else:
quantize_cmd.append(quant_config.method)
print(f"Quantizing model with {quantize_cmd}")
# Use Popen for quantization
process = subprocess.Popen(quantize_cmd, shell=False, stderr=subprocess.STDOUT)
try:
process.wait(timeout=self.QUANTIZE_TIMEOUT)
except subprocess.TimeoutExpired:
print("Quantization timed out. Sending SIGINT to allow graceful termination...")
process.send_signal(signal.SIGINT)
try:
process.wait(timeout=self.KILL_TIMEOUT)
except subprocess.TimeoutExpired:
print("Quantization timed out. Killing process...")
process.kill()
raise GGUFConverterError("Error quantizing: Operation timed out.")
if process.returncode != 0:
raise GGUFConverterError(f"Error quantizing: code={process.returncode}")
print(f"Quantized successfully with {quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method} option!")
print(f"Quantized model path: {os.path.abspath(quant_config.quantized_gguf)}")
return quant_config.quantized_gguf
def _create_empty_repo(self, processing_config: ModelProcessingConfig):
api = HfApi(token=processing_config.token)
new_repo_url = api.create_repo(
repo_id=processing_config.output_config.repo_name,
exist_ok=True,
private=processing_config.output_config.private_repo
)
processing_config.new_repo_url = new_repo_url.url
processing_config.new_repo_id = new_repo_url.repo_id
print("Repo created successfully!", processing_config.new_repo_url)
return new_repo_url
def _generate_readme(self, processing_config: ModelProcessingConfig) -> str:
"""Generate README.md for the quantized model."""
creator = self._get_model_creator(processing_config.model_id)
username = whoami(processing_config.token)["name"]
try:
card = ModelCard.load(processing_config.model_id, token=processing_config.token)
except Exception:
card = ModelCard("")
if card.data.tags is None:
card.data.tags = []
card.data.tags.extend(["llama-cpp", "gguf-my-repo"])
card.data.base_model = processing_config.model_id
card.text = dedent(
f"""
# {processing_config.model_name}
**Model creator:** [{creator}](https://huggingface.co/{creator})<br/>
**Original model**: [{processing_config.model_id}](https://huggingface.co/{processing_config.model_id})<br/>
**GGUF quantization:** provided by [{username}](https://huggingface.co/{username}) using `llama.cpp`<br/>
## Special thanks
🙏 Special thanks to [Georgi Gerganov](https://github.com/ggerganov) and the whole team working on [llama.cpp](https://github.com/ggerganov/llama.cpp/) for making all of this possible.
## Use with Ollama
```bash
ollama run "hf.co/{processing_config.new_repo_id}:<quantization>"
```
## Use with LM Studio
```bash
lms load "{processing_config.new_repo_id}"
```
## Use with llama.cpp CLI
```bash
llama-cli --hf-repo "{processing_config.new_repo_id}" --hf-file "{processing_config.output_config.filename}" -p "The meaning to life and the universe is"
```
## Use with llama.cpp Server
```bash
llama-server --hf-repo "{processing_config.new_repo_id}" --hf-file "{processing_config.output_config.filename}" -c 4096
```
"""
)
readme_path = f"{processing_config.outdir}/README.md"
card.save(readme_path)
return readme_path
def process_model(self, processing_config: ModelProcessingConfig) -> Tuple[str, str]:
"""Main method to process a model through the entire pipeline."""
quant_config = processing_config.quant_config
split_config = processing_config.split_config
output_config = processing_config.output_config
print(f"Current working directory: {os.path.abspath(os.getcwd())}")
# Download and convert base model
self._download_base_model(processing_config)
# Quantize the model
self._quantize_model(quant_config)
# Create empty repo
self._create_empty_repo(processing_config)
# Upload model
if split_config.enabled:
print(f"Splitting quantized model: {os.path.abspath(quant_config.quantized_gguf)}")
self._split_and_upload_model(processing_config)
else:
try:
print(f"Uploading quantized model: {os.path.abspath(quant_config.quantized_gguf)}")
self._upload_file(processing_config, quant_config.quantized_gguf, output_config.filename)
except Exception as e:
raise GGUFConverterError(f"Error uploading quantized model: {e}")
# Upload imatrix if it exists
if quant_config.use_imatrix and os.path.isfile(quant_config.imatrix_file):
try:
print(f"Uploading imatrix.dat: {os.path.abspath(quant_config.imatrix_file)}")
self._upload_file(processing_config, quant_config.imatrix_file, f"{processing_config.model_name}-imatrix.gguf")
except Exception as e:
raise GGUFConverterError(f"Error uploading imatrix.dat: {e}")
# Upload README.md
readme_path = self._generate_readme(processing_config)
self._upload_file(processing_config, readme_path, "README.md")
print(f"Uploaded successfully with {quant_config.imatrix_method if quant_config.use_imatrix else quant_config.method} option!")
class GGUFConverterUI:
"""Gradio UI for the GGUF Converter."""
def __init__(self):
self.processor = HuggingFaceModelProcessor()
self.css = """/* Custom CSS to allow scrolling */
.gradio-container {overflow-y: auto;}
"""
# Initialize components
self._initialize_components()
self._setup_interface()
def _initialize_components(self):
"""Initialize all UI components."""
#####
# Base model section
#####
self.model_id = HuggingfaceHubSearch(
label="Hub Model ID",
placeholder="Search for model id on Huggingface",
search_type="model",
)
#####
# Quantization section
#####
self.use_imatrix = gr.Checkbox(
value=False,
label="Use Imatrix Quantization",
info="Use importance matrix for quantization."
)
self.q_method = gr.Dropdown(
choices=["Q2_K", "Q3_K_S", "Q3_K_M", "Q3_K_L", "Q4_0", "Q4_K_S", "Q4_K_M", "Q5_0", "Q5_K_S", "Q5_K_M", "Q6_K", "Q8_0", "F16", "BF16"],
label="Quantization Method",
info="GGML quantization type",
value="Q4_K_M",
filterable=False,
visible=True
)
self.imatrix_q_method = gr.Dropdown(
choices=["IQ3_M", "IQ3_XXS", "Q4_K_M", "Q4_K_S", "IQ4_NL", "IQ4_XS", "Q5_K_M", "Q5_K_S"],
label="Imatrix Quantization Method",
info="GGML imatrix quants type",
value="IQ4_NL",
filterable=False,
visible=False
)
self.train_data_file = gr.File(
label="Training Data File",
file_types=[".txt"],
visible=False
)
#####
# Advanced Options section
#####
self.split_model = gr.Checkbox(
value=False,
label="Split Model",
info="Shard the model using gguf-split."
)
self.split_max_tensors = gr.Number(
value=256,
label="Max Tensors per File",
info="Maximum number of tensors per file when splitting model.",
visible=False
)
self.split_max_size = gr.Textbox(
label="Max File Size",
info="Maximum file size when splitting model (--split-max-size). May leave empty to use the default. Accepted suffixes: M, G. Example: 256M, 5G",
visible=False
)
self.leave_output = gr.Checkbox(
value=False,
label="Leave output tensor",
info="Leaves output.weight un(re)quantized"
)
self.quant_embedding = gr.Checkbox(
value=False,
label="Quant embeddings tensor",
info="Quantize embeddings tensor separately"
)
self.embedding_tensor_method = gr.Dropdown(
choices=["Q2_K", "Q3_K", "Q4_K", "Q5_K", "Q6_K", "Q8_0"],
label="Embeddings Quantization Method",
info="use a specific quant type for the token embeddings tensor",
value="Q8_0",
filterable=False,
visible=False
)
self.quant_output = gr.Checkbox(
value=False,
label="Quant output tensor",
info="Quantize output tensor separately"
)
self.output_tensor_method = gr.Dropdown(
choices=["Q2_K", "Q3_K", "Q4_K", "Q5_K", "Q6_K", "Q8_0"],
label="Output Quantization Method",
info="use a specific quant type for the output.weight tensor",
value="Q8_0",
filterable=False,
visible=False
)
#####
# Output Settings section
#####
self.private_repo = gr.Checkbox(
value=False,
label="Private Repo",
info="Create a private repo under your username."
)
self.repo_name = gr.Textbox(
label="Output Repository Name",
info="Set your repository name",
max_lines=1
)
self.gguf_name = gr.Textbox(
label="Output File Name",
info="Set output file name",
max_lines=1
)
#####
# Buttons section
#####
self.clear_btn = gr.ClearButton(
value="Clear",
variant="secondary",
components=[
self.model_id,
self.q_method,
self.use_imatrix,
self.imatrix_q_method,
self.private_repo,
self.train_data_file,
self.leave_output,
self.quant_embedding,
self.embedding_tensor_method,
self.quant_output,
self.output_tensor_method,
self.split_model,
self.split_max_tensors,
self.split_max_size,
self.repo_name,
self.gguf_name,
]
)
self.submit_btn = gr.Button(
value="Submit",
variant="primary"
)
#####
# Outputs section
#####
self.output_label = gr.Markdown(label="output")
self.output_image = gr.Image(
show_label=False,
show_download_button=False,
interactive=False
)
@staticmethod
def _update_output_repo(model_id: str, oauth_token: Optional[gr.OAuthToken]) -> str:
"""Update output repository name based on model and user."""
if oauth_token is None or not oauth_token.token:
return ""
if not model_id:
return ""
try:
username = whoami(oauth_token.token)["name"]
model_name = model_id.split('/')[-1]
return f"{username}/{model_name}-GGUF"
except Exception:
return ""
@staticmethod
def _update_output_filename(model_id: str, use_imatrix: bool, q_method: str, imatrix_q_method: str) -> str:
"""Update output filename based on model and quantization settings."""
if not model_id:
return ""
model_name = model_id.split('/')[-1]
if use_imatrix:
return f"{model_name}-{imatrix_q_method.upper()}-imat.gguf"
return f"{model_name}-{q_method.upper()}.gguf"
def _setup_interface(self):
"""Set up the Gradio interface."""
with gr.Blocks(css=self.css) as self.demo:
#####
# Layout
#####
gr.Markdown(HuggingFaceModelProcessor.ERROR_LOGIN)
gr.LoginButton(min_width=250)
gr.HTML("<h1 style=\"text-align:center;\">Create your own GGUF Quants!</h1>")
gr.Markdown(f"The space takes an HF repo as an input, quantizes it and creates a Public repo containing the selected quant under your HF user namespace.<br/>Use via {self.processor.SPACE_URL}")
with gr.Row():
with gr.Column() as inputs:
gr.Markdown("### Model Configuration")
self.model_id.render()
with gr.Column():
self.use_imatrix.render()
self.q_method.render()
self.imatrix_q_method.render()
self.train_data_file.render()
gr.Markdown("### Advanced Options")
self.quant_embedding.render()
self.embedding_tensor_method.render()
self.leave_output.render()
self.quant_output.render()
self.output_tensor_method.render()
self.split_model.render()
with gr.Row() as split_options:
self.split_max_tensors.render()
self.split_max_size.render()
gr.Markdown("### Output Settings")
gr.Markdown("You can customize settings for your GGUF repo.")
self.private_repo.render()
with gr.Row():
self.repo_name.render()
self.gguf_name.render()
# Buttons
with gr.Row() as buttons:
self.clear_btn.render()
self.submit_btn.render()
with gr.Column() as outputs:
self.output_label.render()
self.output_image.render()
#####
# Event handlers
#####
self.submit_btn.click(
fn=self._process_model_wrapper,
inputs=[
self.model_id,
self.q_method,
self.use_imatrix,
self.imatrix_q_method,
self.private_repo,
self.train_data_file,
self.repo_name,
self.gguf_name,
self.quant_embedding,
self.embedding_tensor_method,
self.leave_output,
self.quant_output,
self.output_tensor_method,
self.split_model,
self.split_max_tensors,
self.split_max_size
],
outputs=[
self.output_label,
self.output_image,
],
)
#####
# OnChange handlers
#####
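# Show either the static quant dropdown or the imatrix dropdown plus the training-data
# file picker, depending on the "Use Imatrix Quantization" checkbox.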
self.use_imatrix.change(
fn=lambda use_imatrix: [gr.update(visible=not use_imatrix), gr.update(visible=use_imatrix), gr.update(visible=use_imatrix)],
inputs=self.use_imatrix,
outputs=[self.q_method, self.imatrix_q_method, self.train_data_file]
)
self.split_model.change(
fn=lambda split_model: [gr.update(visible=split_model), gr.update(visible=split_model)],
inputs=self.split_model,
outputs=[self.split_max_tensors, self.split_max_size]
)
self.quant_embedding.change(
fn=lambda quant_embedding: gr.update(visible=quant_embedding),
inputs=self.quant_embedding,
outputs=[self.embedding_tensor_method]
)
self.leave_output.change(
fn=lambda leave_output, quant_output: [gr.update(visible=not leave_output), gr.update(visible=not leave_output and quant_output)],
inputs=[self.leave_output, self.quant_output],
outputs=[self.quant_output, self.output_tensor_method]
)
self.quant_output.change(
fn=lambda quant_output: [gr.update(visible=not quant_output), gr.update(visible=quant_output)],
inputs=self.quant_output,
outputs=[self.leave_output, self.output_tensor_method]
)
self.model_id.change(
fn=self._update_output_repo,
inputs=[self.model_id],
outputs=[self.repo_name]
)
self.model_id.change(
fn=self._update_output_filename,
inputs=[self.model_id, self.use_imatrix, self.q_method, self.imatrix_q_method],
outputs=[self.gguf_name]
)
self.use_imatrix.change(
fn=self._update_output_filename,
inputs=[self.model_id, self.use_imatrix, self.q_method, self.imatrix_q_method],
outputs=[self.gguf_name]
)
self.q_method.change(
fn=self._update_output_filename,
inputs=[self.model_id, self.use_imatrix, self.q_method, self.imatrix_q_method],
outputs=[self.gguf_name]
)
self.imatrix_q_method.change(
fn=self._update_output_filename,
inputs=[self.model_id, self.use_imatrix, self.q_method, self.imatrix_q_method],
outputs=[self.gguf_name]
)
def _process_model_wrapper(self, model_id: str, q_method: str, use_imatrix: bool,
imatrix_q_method: str, private_repo: bool, train_data_file,
repo_name: str, gguf_name: str, quant_embedding: bool,
embedding_tensor_method: str, leave_output: bool,
quant_output: bool, output_tensor_method: str,
split_model: bool, split_max_tensors, split_max_size: str, oauth_token: Optional[gr.OAuthToken]) -> Tuple[str, str]:
"""Wrapper for the process_model method to handle the conversion using ModelProcessingConfig."""
try:
# Validate token and get token string
token = self.processor._validate_token(oauth_token)
# Create configuration objects
quant_config = QuantizationConfig(
method=q_method,
use_imatrix=use_imatrix,
imatrix_method=imatrix_q_method,
train_data=train_data_file.name if train_data_file else "",
quant_embedding=quant_embedding,
embedding_tensor_method=embedding_tensor_method,
leave_output=leave_output,
quant_output=quant_output,
output_tensor_method=output_tensor_method
)
split_config = SplitConfig(
enabled=split_model,
max_tensors=int(split_max_tensors) if split_max_tensors else 256,
max_size=split_max_size
)
output_config = OutputConfig(
private_repo=private_repo,
repo_name=repo_name,
filename=gguf_name
)
model_name = self.processor._get_model_name(model_id)
with tempfile.TemporaryDirectory(dir=self.processor.OUTPUT_FOLDER) as outDirObj:
outdir = (
self.processor._create_folder(os.path.join(self.processor.OUTPUT_FOLDER, model_name))
if self.processor.RUN_LOCALLY == "1"
else Path(outDirObj)
)
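# Per-run artifact paths inside the output directory.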
quant_config.fp16_model = f"{outdir}/{model_name}-fp16.gguf"
quant_config.imatrix_file = f"{outdir}/{model_name}-imatrix.gguf"
quant_config.quantized_gguf = f"{outdir}/{gguf_name}"
processing_config = ModelProcessingConfig(
token=token,
model_id=model_id,
model_name=model_name,
outdir=outdir,
quant_config=quant_config,
split_config=split_config,
output_config=output_config
)
# Call the processor's main method with the config object
self.processor.process_model(processing_config)
return (
f'<h1>✅ DONE</h1><br/>Find your repo here: <a href="{processing_config.new_repo_url}" target="_blank" style="text-decoration:underline">{processing_config.new_repo_id}</a>',
"llama.png",
)
except Exception as e:
print(f"Error processing model: {e}")
return (f'<h1>❌ ERROR</h1><br/><pre style="white-space:pre-wrap;">{self.processor._escape_html(str(e))}</pre>', "error.png")
def launch(self):
"""Launch the Gradio interface."""
# Set up space restart scheduler
def restart_space():
HfApi().restart_space(repo_id=self.processor.SPACE_ID, token=self.processor.HF_TOKEN, factory_reboot=True)
scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=21600)
scheduler.start()
# Launch the interface
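# Process one conversion at a time and queue at most 5 pending requests.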
self.demo.queue(default_concurrency_limit=1, max_size=5).launch(debug=True, show_api=False)
# Main execution
if __name__ == "__main__":
ui = GGUFConverterUI()
ui.launch()
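# Local usage sketch (assumption: the llama.cpp tools llama-quantize, llama-imatrix and
# llama-gguf-split are on PATH and /app/convert_hf_to_gguf.py exists, as in the Space image):
#   RUN_LOCALLY=1 HF_TOKEN=<your token> python3 app.py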