"""Gradio app that cross-checks Hugging Face models via their safetensors files.

The main tab compares the Git LFS SHA256 object IDs (OIDs) of every
``.safetensors`` file of two models without downloading any weights. The
"Advanced" accordion downloads one file from each model and compares the two
copies byte by byte.
"""

import hashlib
import operator
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import gradio as gr
import requests
from gradio import Progress

# Number of bytes in one mebibyte; sizes are reported to the user in "MB".
_MIB = 1024 * 1024

# Upper bound on worker threads used for parallel HTTP requests.
_MAX_WORKERS = 8

# Placeholder used in comparison tables when a model lacks a given file.
_MISSING = "-"

INTRO_MARKDOWN = (
    "# 🤖 Hugging Face Model Cross-Checker\n"
    "\n"
    "Easily check if two Hugging Face models are **identical (copy-paste)**, "
    "**fine-tuned**, or **completely different**—without downloading any "
    "weights!\n"
    "\n"
    "- Enter two model IDs below (e.g. `deepseek-ai/DeepSeek-R1-0528` and "
    "`Parveshiiii/DeepSeek-R1-0528-MathX`).\n"
    "- Click **Compare** to see a clear verdict and detailed breakdown.\n"
)

ADVANCED_MARKDOWN = (
    "## Compare a specific file (shard) from both models, byte-by-byte\n"
    "\n"
    "- Enter the file name (e.g. `model-00001-of-00010.safetensors`).\n"
    "- The tool will download this file from both models and compare their "
    "contents.\n"
    "- Shows the percent of identical bytes (100% = exact copy).\n"
)


def get_lfs_oid(raw_url: str) -> str | None:
    """Return the SHA256 OID found in the LFS pointer file at *raw_url*.

    The URL is fetched and its text is scanned for a line starting with
    ``oid sha256:``.

    Args:
        raw_url: URL of the raw (pointer) version of a file.

    Returns:
        The hex digest following ``oid sha256:``, or ``None`` when the request
        fails or no such line is present.
    """
    try:
        response = requests.get(raw_url, timeout=10)
        response.raise_for_status()  # Turn 4xx/5xx responses into exceptions.
        for line in response.text.splitlines():
            if line.startswith("oid sha256:"):
                return line.split("sha256:")[1].strip()
        return None
    except requests.exceptions.RequestException as e:
        print(f"Error fetching OID from {raw_url}: {e}")
        return None


def get_model_safetensors_info(model_id: str) -> tuple[dict, str]:
    """Collect the OID of every ``.safetensors`` file of a model over HTTP.

    The file list comes from the Hub REST API (``/api/models/<model_id>``);
    each OID is then read from the file's raw pointer on the ``main`` branch.

    Args:
        model_id: Repository ID, e.g. ``"org/name"``.

    Returns:
        A ``(oids, error_message)`` tuple. ``oids`` maps file name to OID for
        every file whose OID could be read. ``error_message`` is empty on full
        success, otherwise it holds one newline-terminated line per problem.
        This function reports failures through ``error_message`` instead of
        raising.
    """
    safetensors_oids = {}
    error_message = ""
    try:
        api_url = f"https://huggingface.co/api/models/{model_id}"
        resp = requests.get(api_url, timeout=10)
        if resp.status_code != 200:
            error_message += (
                f"Could not fetch file list for {model_id}: "
                f"HTTP {resp.status_code}\n"
            )
            return safetensors_oids, error_message

        data = resp.json()
        # Entries without an "rfilename" key are skipped rather than aborting
        # the whole lookup with a KeyError.
        names = (
            sibling.get("rfilename", "")
            for sibling in data.get("siblings") or []
        )
        files = [name for name in names if name.endswith(".safetensors")]
        if not files:
            error_message += f"No .safetensors files found for {model_id}.\n"
            return safetensors_oids, error_message

        def fetch_oid(filename):
            raw_url = f"https://huggingface.co/{model_id}/raw/main/{filename}"
            return get_lfs_oid(raw_url)

        workers = min(_MAX_WORKERS, len(files))
        with ThreadPoolExecutor(max_workers=workers) as executor:
            # executor.map yields results in input order, so the resulting
            # dict and the error text are deterministic across runs.
            for filename, oid in zip(files, executor.map(fetch_oid, files)):
                if oid:
                    safetensors_oids[filename] = oid
                else:
                    error_message += (
                        f"Could not get OID for {filename} in {model_id}.\n"
                    )
    except Exception as e:
        # Deliberately broad: callers rely on errors being returned as text.
        error_message += f"Error fetching info for {model_id}: {e}\n"
    return safetensors_oids, error_message


def compare_hf_models(model_id1: str, model_id2: str) -> str:
    """Compare two models by safetensors OIDs and return a text report.

    Args:
        model_id1: Repository ID of the first model.
        model_id2: Repository ID of the second model.

    Returns:
        A multi-line, human-readable report ending in a summary conclusion, or
        a short prompt when either ID is empty.
    """
    if not model_id1 or not model_id2:
        return "Please provide both model IDs."

    output = [f"--- Fetching info for Model 1: {model_id1} ---"]
    oids1, err1 = get_model_safetensors_info(model_id1)
    if err1:
        output.append(err1)
    output.append(f"Found {len(oids1)} .safetensors files for {model_id1}.")

    output.append(f"\n--- Fetching info for Model 2: {model_id2} ---")
    oids2, err2 = get_model_safetensors_info(model_id2)
    if err2:
        output.append(err2)
    output.append(f"Found {len(oids2)} .safetensors files for {model_id2}.")

    # 1. Compare Safetensors OIDs
    output.append("\n--- Safetensors Weight File Comparison (via OID) ---")
    weights_identical = False
    if not oids1 and not oids2:
        output.append(
            "No .safetensors files found for either model. "
            "Cannot compare weights."
        )
    elif not oids1:
        output.append(
            f"No .safetensors files found for {model_id1}. "
            "Cannot compare weights."
        )
    elif not oids2:
        output.append(
            f"No .safetensors files found for {model_id2}. "
            "Cannot compare weights."
        )
    else:
        files1_set = set(oids1)
        files2_set = set(oids2)
        if files1_set != files2_set:
            output.append(
                "The set of .safetensors files differs between models."
            )
            output.append(
                f"Files in {model_id1} but not {model_id2}: "
                f"{files1_set - files2_set}"
            )
            output.append(
                f"Files in {model_id2} but not {model_id1}: "
                f"{files2_set - files1_set}"
            )
        else:
            output.append("The models have the same set of .safetensors files.")
            # Sorted so the list of differing files is stable between runs.
            diff_files = [
                filename
                for filename in sorted(files1_set)
                if oids1[filename] != oids2[filename]
            ]
            if not diff_files:
                output.append(
                    "All corresponding .safetensors OIDs are IDENTICAL."
                )
                output.append(
                    f"This strongly suggests '{model_id1}' and '{model_id2}' "
                    "are 'copy-paste' models at the weight level."
                )
                weights_identical = True
            else:
                output.append(
                    "Some .safetensors OIDs DIFFER. "
                    f"Differing files: {', '.join(diff_files)}"
                )
                output.append(
                    "This indicates different weights. If file structure is "
                    f"identical, '{model_id2}' could be a 'fine-tuned' "
                    f"version of '{model_id1}' (or vice-versa, or both "
                    "fine-tuned from a common base)."
                )

    output.append("\n--- Summary ---")
    if weights_identical:
        output.append(
            f"Conclusion: Models '{model_id1}' and '{model_id2}' are "
            "IDENTICAL (copy-paste)."
        )
    else:
        output.append(
            f"Conclusion: Models '{model_id1}' and '{model_id2}' have "
            "different weights or file structures. They are distinct or "
            "fine-tuned models."
        )
    return "\n".join(output)


def multi_compare_hf_models(model_ids: list) -> tuple:
    """Compare the safetensors OIDs of two or more models.

    Args:
        model_ids: Repository IDs to compare; at least two are required.

    Returns:
        ``(summary, table, safetensors_data)``. ``summary`` is a text report,
        ``table`` is a list of rows (header row first) with one OID column per
        model plus a ``"Match"``/``"Unmatch"`` column, and
        ``safetensors_data`` maps each model ID to its ``{file: oid}`` dict.
        With fewer than two IDs the result is ``(message, None, None)``.
    """
    if not model_ids or len(model_ids) < 2:
        return "Please provide at least two model IDs.", None, None

    safetensors_data = {}
    errors = {}
    # Fetch all model info in parallel.
    workers = min(_MAX_WORKERS, len(model_ids))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_model = {
            executor.submit(get_model_safetensors_info, mid): mid
            for mid in model_ids
        }
        for future in as_completed(future_to_model):
            mid = future_to_model[future]
            safetensors_data[mid], errors[mid] = future.result()

    all_files = sorted(
        {name for oids in safetensors_data.values() for name in oids}
    )

    table = [["File"] + model_ids + ["Match"]]
    for name in all_files:
        oids_for_file = [
            safetensors_data.get(mid, {}).get(name) or _MISSING
            for mid in model_ids
        ]
        # A row matches only when at least two models have the file and all
        # OIDs that are present agree; missing entries are ignored.
        present_oids = [oid for oid in oids_for_file if oid != _MISSING]
        is_match = len(present_oids) > 1 and len(set(present_oids)) == 1
        table.append(
            [name, *oids_for_file, "Match" if is_match else "Unmatch"]
        )

    summary = []
    # Per-model details.
    for mid in model_ids:
        oids = safetensors_data.get(mid, {})
        summary.append(f"{mid}: {len(oids)} .safetensors files.")
        if errors[mid]:
            summary.append(f"Errors for {mid}: {errors[mid]}")
    # File presence summary.
    for name in all_files:
        present = [
            mid for mid in model_ids if name in safetensors_data.get(mid, {})
        ]
        if len(present) != len(model_ids):
            summary.append(
                f"File '{name}' missing in: {set(model_ids) - set(present)}"
            )
    return "\n".join(summary), table, safetensors_data


def download_file(url, dest):
    """Stream *url* into the file *dest*.

    Args:
        url: URL to download.
        dest: Path of the file to write.

    Returns:
        ``(True, "")`` on success, ``(False, error_text)`` on any failure.
    """
    try:
        # The context manager releases the streamed connection even when
        # writing fails part-way through.
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(dest, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        return True, ""
    except Exception as e:
        # Deliberately broad: the contract is to report errors, not raise.
        return False, str(e)


def _format_speed(bytes_per_second: float) -> str:
    """Format a transfer rate as ``MB/s`` (>= 1 MiB/s) or ``KB/s``."""
    mib_per_second = bytes_per_second / _MIB
    if mib_per_second >= 1:
        return f"{mib_per_second:.1f}MB/s"
    return f"{bytes_per_second / 1024:.0f}KB/s"


def _format_eta(seconds: float) -> str:
    """Format a remaining duration as seconds (< 60 s) or minutes."""
    if seconds < 60:
        return f"{seconds:.0f}s"
    return f"{seconds / 60:.1f}min"


def download_file_with_progress(
    url,
    dest,
    progress: Progress | None = None,
    progress_offset=0,
    progress_scale=1,
):
    """Stream *url* into *dest*, reporting progress through *progress*.

    Progress updates are sent at most once per second and only when the
    server reports a ``content-length``.

    Args:
        url: URL to download.
        dest: Path of the file to write.
        progress: Optional callable invoked as ``progress(fraction, desc=...)``.
        progress_offset: Overall progress value reported when this download
            starts.
        progress_scale: Share of the overall progress bar this download
            covers; completion is reported as ``progress_offset +
            progress_scale``.

    Returns:
        ``(True, "")`` on success, ``(False, error_text)`` on any failure.
    """
    # ``is not None`` is used instead of truthiness so the check does not
    # depend on how the progress object implements ``__len__``/``__bool__``.
    report = progress is not None
    try:
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            total = int(r.headers.get("content-length", 0))
            downloaded = 0
            start_time = time.time()
            last_update_time = start_time
            update_interval = 1.0  # Seconds between progress updates.

            if report and total:
                progress(
                    progress_offset,
                    desc=(
                        f"🎯 Starting: {os.path.basename(dest)} "
                        f"({total // _MIB}MB)"
                    ),
                )

            with open(dest, "wb") as f:
                for chunk in r.iter_content(chunk_size=65536):  # 64KB chunks
                    if not chunk:
                        continue
                    f.write(chunk)
                    downloaded += len(chunk)
                    if not (report and total):
                        continue
                    current_time = time.time()
                    if current_time - last_update_time < update_interval:
                        continue

                    # Clamped in case more bytes arrive than content-length
                    # announced.
                    file_progress = min(downloaded / total, 1.0)
                    overall_progress = (
                        progress_offset + file_progress * progress_scale
                    )

                    elapsed_time = current_time - start_time
                    speed_bps = (
                        downloaded / elapsed_time if elapsed_time > 0 else 0.0
                    )
                    if speed_bps > 0:
                        speed_str = _format_speed(speed_bps)
                        remaining_bytes = max(total - downloaded, 0)
                        eta_str = _format_eta(remaining_bytes / speed_bps)
                    else:
                        speed_str = eta_str = "calculating..."

                    progress(
                        overall_progress,
                        desc=(
                            f"⬇️ {downloaded // _MIB}/{total // _MIB}MB "
                            f"({file_progress*100:.0f}%) • {speed_str} "
                            f"• ETA: {eta_str}"
                        ),
                    )
                    last_update_time = current_time

        if report:
            total_time = time.time() - start_time
            avg_speed = (downloaded / total_time) / _MIB if total_time > 0 else 0
            # Report the bytes actually received so the message is also right
            # when the server sent no content-length.
            progress(
                progress_offset + progress_scale,
                desc=(
                    f"✅ Complete: {downloaded // _MIB}MB downloaded "
                    f"(avg {avg_speed:.1f}MB/s)"
                ),
            )
        return True, ""
    except Exception as e:
        # Deliberately broad: the contract is to report errors, not raise.
        if report:
            progress(
                progress_offset + progress_scale,
                desc=f"❌ Download failed: {str(e)[:50]}...",
            )
        return False, str(e)


def file_similarity(file1, file2, chunk_size=1024 * 1024):
    """Compare two files position by position.

    Args:
        file1: Path of the first file.
        file2: Path of the second file.
        chunk_size: Number of bytes read from each file per iteration.

    Returns:
        ``(percent, error)``. ``percent`` is the share of byte positions whose
        values are equal (0-100) and ``error`` is ``None``. When the sizes
        differ the result is ``(0.0, message)``. Two empty files count as
        100% identical.
    """
    size1 = os.path.getsize(file1)
    size2 = os.path.getsize(file2)
    if size1 != size2:
        return 0.0, f"File sizes differ: {size1} vs {size2} bytes."
    total = size1
    if total == 0:
        return 100.0, None

    same = 0
    with open(file1, "rb") as f1, open(file2, "rb") as f2:
        while True:
            b1 = f1.read(chunk_size)
            b2 = f2.read(chunk_size)
            if not b1:
                break
            if b1 == b2:
                # Fast path: identical chunks need no per-byte work.
                same += len(b1)
            else:
                # map() stops at the shorter chunk, like the zip() it replaces.
                same += sum(map(operator.eq, b1, b2))
    return (same / total) * 100, None


def crosscheck_ui(m1, m2):
    """Gradio handler: compare two models and build the three UI outputs.

    Args:
        m1: First model ID as typed by the user.
        m2: Second model ID as typed by the user.

    Returns:
        ``(verdict, table, summary)``: a one-element list of
        ``(text, label)`` for the highlighted verdict, table rows
        ``[file, oid1, oid2, "Match"|"Unmatch"]``, and a summary string.
    """
    m1 = (m1 or "").strip()
    m2 = (m2 or "").strip()
    if not m1 or not m2:
        return [("Error: Please provide both model IDs.", "Error")], [], ""

    oids1, err1 = get_model_safetensors_info(m1)
    oids2, err2 = get_model_safetensors_info(m2)
    if err1 or err2:
        return [(f"Error: {err1 or ''} {err2 or ''}", "Error")], [], ""

    files = sorted(set(oids1) | set(oids2))
    table = []
    diff_count = 0  # Files present in both models whose OIDs differ.
    for name in files:
        oid1 = oids1.get(name, _MISSING)
        oid2 = oids2.get(name, _MISSING)
        both_present = oid1 != _MISSING and oid2 != _MISSING
        if both_present and oid1 == oid2:
            match = "Match"
        else:
            match = "Unmatch"
            if both_present:
                diff_count += 1
        table.append([name, oid1, oid2, match])

    missing1 = [name for name in files if name not in oids1]
    missing2 = [name for name in files if name not in oids2]
    all_present = not missing1 and not missing2
    matching = sum(1 for row in table if row[3] == "Match")
    unmatched = len(table) - matching

    # Verdict logic
    if unmatched == 0 and all_present and files:
        verdict_text = [
            (
                "Copy-Paste: Models are identical at the safetensors level!",
                "Copy-Paste",
            )
        ]
    elif all_present and diff_count > 0:
        verdict_text = [
            (
                "Fine-Tuned: Same file structure, but weights differ.",
                "Fine-Tuned",
            )
        ]
    else:
        verdict_text = [
            ("Different: File structure or weights are different.", "Different")
        ]

    summary_lines = [
        f"Model 1: {m1} ({len(oids1)} .safetensors files)",
        f"Model 2: {m2} ({len(oids2)} .safetensors files)",
        f"Files compared: {len(files)}",
        f"Matching files: {matching}",
        f"Unmatched files: {unmatched}",
    ]
    if missing1:
        summary_lines.append(f"Files missing in Model 1: {', '.join(missing1)}")
    if missing2:
        summary_lines.append(f"Files missing in Model 2: {', '.join(missing2)}")
    return verdict_text, table, "\n".join(summary_lines)


# The gr.Progress() default is how Gradio injects a progress tracker into an
# event handler; it is intentionally created in the signature.
def adv_compare(m1, m2, fname, progress=gr.Progress()):
    """Gradio handler: download one file from both models and compare bytes.

    Args:
        m1: First model ID.
        m2: Second model ID.
        fname: File path inside both repositories.
        progress: Progress tracker supplied by Gradio.

    Returns:
        A result string with the similarity verdict, or an error message.
    """
    m1, m2, fname = ((value or "").strip() for value in (m1, m2, fname))
    if not m1 or not m2 or not fname:
        return "Please provide both model IDs and the file name."

    progress(0.0, desc="🚀 Initializing comparison...")
    url1 = f"https://huggingface.co/{m1}/resolve/main/{fname}?download=true"
    url2 = f"https://huggingface.co/{m2}/resolve/main/{fname}?download=true"
    # Only the final path component is used locally: a name such as
    # "unet/model.safetensors" would otherwise point into a sub-directory
    # that does not exist inside the temporary directory.
    local_name = os.path.basename(fname)

    with tempfile.TemporaryDirectory() as tmp:
        f1 = os.path.join(tmp, f"model1_{local_name}")
        f2 = os.path.join(tmp, f"model2_{local_name}")

        # Download first file (5% to 47.5%)
        progress(0.05, desc=f"📡 Connecting to {m1.split('/')[-1]}...")
        ok1, err1 = download_file_with_progress(
            url1, f1, progress, progress_offset=0.05, progress_scale=0.425
        )
        if not ok1:
            return f"❌ Download failed from {m1}: {err1}"

        # Download second file (50% to 92.5%)
        progress(0.5, desc=f"📡 Connecting to {m2.split('/')[-1]}...")
        ok2, err2 = download_file_with_progress(
            url2, f2, progress, progress_offset=0.5, progress_scale=0.425
        )
        if not ok2:
            return f"❌ Download failed from {m2}: {err2}"

        # Compare files (95% to 100%)
        progress(0.95, desc="🔍 Analyzing files byte-by-byte...")
        percent, err = file_similarity(f1, f2)
        if err:
            return f"❌ Comparison error: {err}"
        progress(1.0, desc="✅ Analysis complete!")

        # Sizes are equal here, otherwise file_similarity reported an error.
        size_mb = os.path.getsize(f1) // _MIB

    if percent == 100:
        result_icon, result_text = "🟢", "IDENTICAL"
    elif percent >= 99:
        result_icon, result_text = "🟡", "NEARLY IDENTICAL"
    elif percent >= 90:
        result_icon, result_text = "🟠", "SIMILAR"
    else:
        result_icon, result_text = "🔴", "DIFFERENT"

    return (
        f"{result_icon} **{result_text}** ({percent:.3f}% similarity)\n"
        f"📁 File size: {size_mb}MB\n"
        f"🔗 Models: {m1.split('/')[-1]} vs {m2.split('/')[-1]}"
    )


# Gradio Interface
with gr.Blocks(theme="soft") as demo:
    gr.Markdown(INTRO_MARKDOWN)
    with gr.Row():
        model1 = gr.Textbox(
            label="Model ID 1",
            placeholder="e.g. deepseek-ai/DeepSeek-R1-0528",
        )
        model2 = gr.Textbox(
            label="Model ID 2",
            placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX",
        )
    compare_btn = gr.Button("Compare")
    verdict = gr.HighlightedText(
        label="Result Verdict",
        color_map={
            "Copy-Paste": "green",
            "Fine-Tuned": "orange",
            "Different": "red",
            "Error": "gray",
        },
    )
    details = gr.Dataframe(
        headers=["File", "Model 1 OID", "Model 2 OID", "Match"],
        label="File-by-File Comparison",
        interactive=False,
    )
    summary = gr.Textbox(label="Summary Details", lines=8, interactive=False)

    compare_btn.click(
        fn=crosscheck_ui,
        inputs=[model1, model2],
        outputs=[verdict, details, summary],
    )

    with gr.Accordion("Advanced: Compare File Shards Bitwise", open=False):
        gr.Markdown(ADVANCED_MARKDOWN)
        adv_model1 = gr.Textbox(
            label="Model ID 1",
            placeholder="e.g. deepseek-ai/DeepSeek-R1-0528",
        )
        adv_model2 = gr.Textbox(
            label="Model ID 2",
            placeholder="e.g. Parveshiiii/DeepSeek-R1-0528-MathX",
        )
        adv_filename = gr.Textbox(
            label="File Name",
            placeholder="e.g. model-00001-of-00010.safetensors",
        )
        adv_btn = gr.Button("Download & Compare File")
        adv_result = gr.Textbox(
            label="Bitwise Comparison Result", lines=3, interactive=False
        )

        adv_btn.click(
            fn=adv_compare,
            inputs=[adv_model1, adv_model2, adv_filename],
            outputs=[adv_result],
        )

demo.launch()