# Spaces: Running
import gradio as gr | |
import requests | |
from concurrent.futures import ThreadPoolExecutor, as_completed | |
import tempfile | |
import os | |
import hashlib | |
from gradio import Progress | |
# Function to get OID from a raw Hugging Face LFS file URL
def get_lfs_oid(raw_url: str) -> str | None:
    """
    Return the SHA256 OID found in the text served at a raw Hugging Face LFS file URL.

    The URL is fetched with a 10 second timeout and the response text is scanned
    line by line for the first line starting with ``oid sha256:``.

    Args:
        raw_url: URL whose response body is expected to contain an ``oid sha256:`` line.

    Returns:
        The stripped text following ``sha256:`` on the first matching line, or
        ``None`` when no line matches or the request fails (the failure is printed).
    """
    try:
        reply = requests.get(raw_url, timeout=10)
        # Turn 4xx / 5xx answers into an HTTPError handled below.
        reply.raise_for_status()
        candidates = (
            row.split("sha256:")[1].strip()
            for row in reply.text.splitlines()
            if row.startswith("oid sha256:")
        )
        # Only the first matching line matters; None when nothing matched.
        return next(candidates, None)
    except requests.exceptions.RequestException as exc:
        print(f"Error fetching OID from {raw_url}: {exc}")
        return None
# Function to get .safetensors file info (file list and OIDs) using only HTTP requests
def get_model_safetensors_info(model_id: str) -> tuple[dict, str]:
    """
    Collect the OID of every .safetensors file listed for a Hugging Face model.

    The file list comes from the Hub REST API (``/api/models/{model_id}``); the OID
    of each listed .safetensors file is then looked up through ``get_lfs_oid`` on
    the file's ``raw/main`` URL, using up to 8 worker threads.

    Args:
        model_id: Model identifier interpolated into the Hub URLs.

    Returns:
        A ``(oids, errors)`` pair: ``oids`` maps filename to OID for every file
        whose OID was found, and ``errors`` is the concatenation of
        newline-terminated problem descriptions (empty when nothing went wrong).
    """
    oid_by_file = {}
    problems = []
    try:
        # Use Hugging Face Hub REST API to get file list
        listing = requests.get(f"https://huggingface.co/api/models/{model_id}", timeout=10)
        if listing.status_code != 200:
            problems.append(f"Could not fetch file list for {model_id}: HTTP {listing.status_code}\n")
            return oid_by_file, "".join(problems)

        shard_names = []
        for sibling in listing.json().get('siblings', []):
            path = sibling['rfilename']
            if path.endswith('.safetensors'):
                shard_names.append(path)
        if not shard_names:
            problems.append(f"No .safetensors files found for {model_id}.\n")
            return oid_by_file, "".join(problems)

        # Parallel OID fetch: one task per file, results handled as they finish.
        with ThreadPoolExecutor(max_workers=min(8, len(shard_names))) as pool:
            pending = {
                pool.submit(get_lfs_oid, f"https://huggingface.co/{model_id}/raw/main/{name}"): name
                for name in shard_names
            }
            for finished in as_completed(pending):
                name = pending[finished]
                oid = finished.result()
                if oid:
                    oid_by_file[name] = oid
                else:
                    problems.append(f"Could not get OID for {name} in {model_id}.\n")
    except Exception as exc:
        # Any failure is reported in the error text; OIDs gathered so far are still returned.
        problems.append(f"Error fetching info for {model_id}: {exc}\n")
    return oid_by_file, "".join(problems)
# Main comparison function (no config, only file structure and OIDs)
def compare_hf_models(model_id1: str, model_id2: str) -> str:
    """
    Build a plain-text report comparing two models by their safetensors OIDs.

    Args:
        model_id1: Identifier of the first model.
        model_id2: Identifier of the second model.

    Returns:
        A newline-joined report: per-model fetch results, the file/OID comparison
        and a final conclusion. When either identifier is empty, a short prompt
        asking for both IDs is returned instead.
    """
    if not (model_id1 and model_id2):
        return "Please provide both model IDs."

    report = [f"--- Fetching info for Model 1: {model_id1} ---"]
    oids1, err1 = get_model_safetensors_info(model_id1)
    if err1:
        report.append(err1)
    report.append(f"Found {len(oids1)} .safetensors files for {model_id1}.")

    report.append(f"\n--- Fetching info for Model 2: {model_id2} ---")
    oids2, err2 = get_model_safetensors_info(model_id2)
    if err2:
        report.append(err2)
    report.append(f"Found {len(oids2)} .safetensors files for {model_id2}.")

    # 1. Compare Safetensors OIDs
    report.append("\n--- Safetensors Weight File Comparison (via OID) ---")
    # Only the "same files, same OIDs" branch below flips this to True.
    weights_identical = False
    if not oids1 and not oids2:
        report.append("No .safetensors files found for either model. Cannot compare weights.")
    elif not oids1:
        report.append(f"No .safetensors files found for {model_id1}. Cannot compare weights.")
    elif not oids2:
        report.append(f"No .safetensors files found for {model_id2}. Cannot compare weights.")
    else:
        names1 = set(oids1.keys())
        names2 = set(oids2.keys())
        if names1 != names2:
            report.append("The set of .safetensors files differs between models.")
            report.append(f"Files in {model_id1} but not {model_id2}: {names1 - names2}")
            report.append(f"Files in {model_id2} but not {model_id1}: {names2 - names1}")
        else:
            report.append("The models have the same set of .safetensors files.")
            diff_files = [name for name in names1 if oids1[name] != oids2[name]]
            if not diff_files:
                report.append("All corresponding .safetensors OIDs are IDENTICAL.")
                report.append(f"This strongly suggests '{model_id1}' and '{model_id2}' are 'copy-paste' models at the weight level.")
                weights_identical = True
            else:
                report.append(f"Some .safetensors OIDs DIFFER. Differing files: {', '.join(diff_files)}")
                report.append(f"This indicates different weights. If file structure is identical, '{model_id2}' could be a 'fine-tuned' version of '{model_id1}' (or vice-versa, or both fine-tuned from a common base).")

    report.append("\n--- Summary ---")
    if weights_identical:
        report.append(f"Conclusion: Models '{model_id1}' and '{model_id2}' are IDENTICAL (copy-paste).")
    else:
        report.append(f"Conclusion: Models '{model_id1}' and '{model_id2}' have different weights or file structures. They are distinct or fine-tuned models.")
    return "\n".join(report)
def multi_compare_hf_models(model_ids: list) -> tuple:
    """
    Compare the safetensors OIDs of several models at once.

    Args:
        model_ids: List of model identifiers; at least two are required.

    Returns:
        A ``(summary, table, data)`` triple. ``summary`` is newline-joined text with
        per-model file counts, errors and files missing from some models. ``table``
        is a list of rows whose first row is the header
        ``["File", *model_ids, "Match"]``; every other row holds a filename, one
        OID (or "-") per model and "Match"/"Unmatch". ``data`` maps each model id
        to its ``{filename: oid}`` dict. With fewer than two ids the result is
        ``("Please provide at least two model IDs.", None, None)``.
    """
    if not model_ids or len(model_ids) < 2:
        return "Please provide at least two model IDs.", None, None

    safetensors_data = {}
    errors = {}
    # Fetch all model info in parallel
    with ThreadPoolExecutor(max_workers=min(8, len(model_ids))) as pool:
        pending = {pool.submit(get_model_safetensors_info, model): model for model in model_ids}
        for finished in as_completed(pending):
            model = pending[finished]
            safetensors_data[model], errors[model] = finished.result()

    # Every filename seen in any model, in sorted order.
    filenames = sorted({name for oids in safetensors_data.values() for name in oids.keys()})

    # Table header followed by one row per file
    table = [["File"] + model_ids + ["Match"]]
    for name in filenames:
        cells = [safetensors_data.get(model, {}).get(name, "-") or "-" for model in model_ids]
        # A file matches only when at least two models have it and all present OIDs agree.
        found = [cell for cell in cells if cell != "-"]
        agrees = len(found) > 1 and len(set(found)) == 1
        table.append([name, *cells, "Match" if agrees else "Unmatch"])

    summary = []
    # Per-model details
    for model in model_ids:
        summary.append(f"{model}: {len(safetensors_data.get(model, {}))} .safetensors files.")
        if errors[model]:
            summary.append(f"Errors for {model}: {errors[model]}")
    # File presence summary
    for name in filenames:
        holders = [model for model in model_ids if name in safetensors_data.get(model, {})]
        if len(holders) != len(model_ids):
            summary.append(f"File '{name}' missing in: {set(model_ids) - set(holders)}")
    return "\n".join(summary), table, safetensors_data
def download_file(url, dest):
    """
    Stream the content at ``url`` into the file ``dest``.

    Args:
        url: Address to download (fetched with a 30 second timeout).
        dest: Path of the file to (over)write with the downloaded bytes.

    Returns:
        ``(True, "")`` on success, or ``(False, message)`` where ``message`` is the
        text of whatever exception interrupted the download.
    """
    try:
        # The context manager closes the streamed response even when the download
        # is interrupted, so the connection is always released.
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(dest, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
        return True, ""
    except Exception as e:
        # Best-effort helper: callers receive the failure as text instead of an exception.
        return False, str(e)
def download_file_with_progress(url, dest, progress: Progress = None):
    """
    Stream the content at ``url`` into ``dest`` while reporting download progress.

    Args:
        url: Address to download (fetched with a 30 second timeout).
        dest: Path of the file to (over)write with the downloaded bytes.
        progress: Optional callable invoked as ``progress(fraction, desc=...)`` after
            each written chunk. It is only called when the response announces a
            non-zero ``content-length``.

    Returns:
        ``(True, "")`` on success, or ``(False, message)`` where ``message`` is the
        text of whatever exception interrupted the download.
    """
    try:
        # The context manager closes the streamed response even when the download
        # is interrupted, so the connection is always released.
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            # 0 means "size unknown": progress reporting is skipped in that case.
            total = int(r.headers.get('content-length', 0))
            downloaded = 0
            with open(dest, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:  # skip empty keep-alive chunks
                        f.write(chunk)
                        downloaded += len(chunk)
                        if progress and total:
                            progress((downloaded / total), desc=f"Downloading {os.path.basename(dest)}: {downloaded//1024}KB/{total//1024}KB")
        return True, ""
    except Exception as e:
        # Best-effort helper: callers receive the failure as text instead of an exception.
        return False, str(e)
def file_similarity(file1, file2, chunk_size=1024*1024):
    """
    Compare two files position by position and report the share of identical bytes.

    Args:
        file1: Path of the first file.
        file2: Path of the second file.
        chunk_size: Number of bytes read from each file per iteration.

    Returns:
        A ``(percent, message)`` pair. When the sizes differ the result is
        ``(0.0, "File sizes differ: ...")`` and no content is read. Otherwise
        ``percent`` is the percentage of positions holding the same byte in both
        files and ``message`` is ``None``. Two empty files count as 100% identical.
    """
    size1 = os.path.getsize(file1)
    size2 = os.path.getsize(file2)
    if size1 != size2:
        return 0.0, f"File sizes differ: {size1} vs {size2} bytes."
    if size1 == 0:
        # Two empty files have no differing byte: they are identical, not 0% similar.
        return 100.0, None

    same = 0
    with open(file1, 'rb') as f1, open(file2, 'rb') as f2:
        while b1 := f1.read(chunk_size):
            b2 = f2.read(chunk_size)
            if b1 == b2:
                # Fast path: equal chunks need no per-byte Python loop.
                same += len(b1)
            else:
                same += sum(x == y for x, y in zip(b1, b2))
    return (same / size1) * 100, None
# Gradio Interface
# NOTE(review): the indentation of this section was lost in SOURCE. The nesting below
# (the Row holding only the two model textboxes, the advanced widgets inside the
# Accordion, launch() after the Blocks context) is assumed — confirm against the app.
with gr.Blocks(theme="soft") as demo:
    gr.Markdown(
        """
# 🤖 Hugging Face Model Cross-Checker
Easily check if two Hugging Face models are **identical (copy-paste)**, **fine-tuned**, or **completely different**—without downloading any weights!
- Enter two model IDs below (e.g. `deepseek-ai/DeepSeek-R1-0528` and `Parveshiiii/DeepSeek-R1-0528-MathX`).
- Click **Compare** to see a clear verdict and detailed breakdown.
"""
    )
    with gr.Row():
        model1 = gr.Textbox(label="Model ID 1", placeholder="e.g. deepseek-ai/DeepSeek-R1-0528")
        model2 = gr.Textbox(label="Model ID 2", placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX")
    compare_btn = gr.Button("Compare")
    verdict = gr.HighlightedText(label="Result Verdict", color_map={"Copy-Paste":"green","Fine-Tuned":"orange","Different":"red","Error":"gray"})
    details = gr.Dataframe(headers=["File","Model 1 OID","Model 2 OID","Match"], label="File-by-File Comparison", interactive=False)
    summary = gr.Textbox(label="Summary Details", lines=8, interactive=False)

    def crosscheck_ui(m1, m2):
        """
        Compare two models by safetensors OID for the main Compare button.

        Returns the values for the three outputs wired below: a list with one
        (text, label) pair for the verdict, the table rows
        [file, oid1, oid2, "Match"/"Unmatch"], and the summary text.
        """
        if not (m1 and m2):
            return [("Error: Please provide both model IDs.", "Error")], [], ""
        oids1, err1 = get_model_safetensors_info(m1)
        oids2, err2 = get_model_safetensors_info(m2)
        if err1 or err2:
            return [(f"Error: {err1 or ''} {err2 or ''}", "Error")], [], ""

        files = sorted(set(oids1.keys()) | set(oids2.keys()))
        rows = []
        differing = 0        # files present in both models whose OIDs differ
        any_missing = False  # True once a file is absent from one of the models
        for name in files:
            left = oids1.get(name, "-")
            right = oids2.get(name, "-")
            one_absent = "-" in (left, right)
            if left == right and not one_absent:
                label = "Match"
            else:
                label = "Unmatch"
                if not one_absent:
                    differing += 1
            any_missing = any_missing or one_absent
            rows.append([name, left, right, label])

        labels = [row[3] for row in rows]
        everything_matches = "Unmatch" not in labels

        # Verdict logic
        if everything_matches and not any_missing and files:
            verdict_text = [("Copy-Paste: Models are identical at the safetensors level!", "Copy-Paste")]
        elif not any_missing and differing > 0:
            verdict_text = [("Fine-Tuned: Same file structure, but weights differ.", "Fine-Tuned")]
        else:
            verdict_text = [("Different: File structure or weights are different.", "Different")]

        # Summary
        summary_lines = [
            f"Model 1: {m1} ({len(oids1)} .safetensors files)",
            f"Model 2: {m2} ({len(oids2)} .safetensors files)",
            f"Files compared: {len(files)}",
            f"Matching files: {labels.count('Match')}",
            f"Unmatched files: {labels.count('Unmatch')}",
        ]
        missing1 = [name for name in files if oids1.get(name) is None]
        missing2 = [name for name in files if oids2.get(name) is None]
        if missing1:
            summary_lines.append(f"Files missing in Model 1: {', '.join(missing1)}")
        if missing2:
            summary_lines.append(f"Files missing in Model 2: {', '.join(missing2)}")
        return verdict_text, rows, "\n".join(summary_lines)

    compare_btn.click(
        fn=crosscheck_ui,
        inputs=[model1, model2],
        outputs=[verdict, details, summary]
    )

    with gr.Accordion("Advanced: Compare File Shards Bitwise", open=False):
        gr.Markdown("""
## Compare a specific file (shard) from both models, byte-by-byte
- Enter the file name (e.g. `model-00001-of-00010.safetensors`).
- The tool will download this file from both models and compare their contents.
- Shows the percent of identical bytes (100% = exact copy).
""")
        adv_model1 = gr.Textbox(label="Model ID 1", placeholder="e.g. deepseek-ai/DeepSeek-R1-0528")
        adv_model2 = gr.Textbox(label="Model ID 2", placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX")
        adv_filename = gr.Textbox(label="File Name", placeholder="e.g. model-00001-of-00010.safetensors")
        adv_btn = gr.Button("Download & Compare File")
        adv_result = gr.Textbox(label="Bitwise Comparison Result", lines=3, interactive=False)

        def adv_compare(m1, m2, fname, progress=gr.Progress(track_tqdm=True)):
            """
            Download one file from both models into a temporary directory and
            return a one-line description of their byte-level similarity.
            """
            if not all((m1, m2, fname)):
                return "Please provide both model IDs and the file name."
            with tempfile.TemporaryDirectory() as tmp:
                f1 = os.path.join(tmp, "f1.safetensors")
                f2 = os.path.join(tmp, "f2.safetensors")
                # Both downloads are attempted, even when the first one fails.
                (ok1, err1), (ok2, err2) = [
                    download_file_with_progress(
                        f"https://huggingface.co/{repo}/resolve/main/{fname}?download=true",
                        target,
                        progress,
                    )
                    for repo, target in ((m1, f1), (m2, f2))
                ]
                if not (ok1 and ok2):
                    return f"Download error: {err1 or ''} {err2 or ''}"
                percent, err = file_similarity(f1, f2)
                if err:
                    return f"Comparison error: {err}"
                outcome = 'identical' if percent == 100 else 'different'
                return f"Similarity: {percent:.2f}% ({outcome})"

        adv_btn.click(
            fn=adv_compare,
            inputs=[adv_model1, adv_model2, adv_filename],
            outputs=[adv_result]
        )

demo.launch()