# Spaces: Running  (status banner captured from the hosting page; not part of the module)
import datetime
import json
import uuid
from typing import Any, Dict, Optional, Tuple

from huggingface_hub import HfApi, ModelCard

from .utils import calculate_completeness_score
class AIBOMGenerator:
    """Build a CycloneDX 1.6 AI bill of materials (AIBOM) for a Hugging Face model.

    The generator collects structured metadata from the Hub (model info and
    model-card front matter), optionally augments it with fields scraped from
    the model-card text, assembles the CycloneDX document, and attaches
    completeness-score properties to it.
    """

    # Metadata keys that are represented elsewhere in the document and are
    # therefore not repeated as generic metadata properties.
    _NON_PROPERTY_KEYS = ("name", "author", "license")

    def __init__(
        self,
        hf_token: Optional[str] = None,
        inference_model_url: Optional[str] = None,
        use_inference: bool = True,
        cache_dir: Optional[str] = None,
    ):
        """Store configuration and create the Hub API client.

        Args:
            hf_token: Token passed to ``HfApi``; ``None`` for anonymous access.
            inference_model_url: Enhancement is only attempted when this is
                set (the URL itself is not contacted by the code in this class).
            use_inference: Default for whether enhancement is attempted.
            cache_dir: Stored on the instance; not used by this class itself.
        """
        self.hf_api = HfApi(token=hf_token)
        self.inference_model_url = inference_model_url
        self.use_inference = use_inference
        self.cache_dir = cache_dir

    def generate_aibom(
        self,
        model_id: str,
        output_file: Optional[str] = None,
        include_inference: Optional[bool] = None,
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Generate the AIBOM for ``model_id``.

        Args:
            model_id: Hub repository id, e.g. ``"org/model"``.
            output_file: When given, the AIBOM is also written there as JSON.
            include_inference: Per-call override of ``self.use_inference``;
                ``None`` means "use the instance default".

        Returns:
            A ``(aibom, enhancement_report)`` tuple. ``aibom`` is the CycloneDX
            document; ``enhancement_report`` holds the original and final
            completeness scores and whether enhancement was applied.
        """
        use_inference = self.use_inference if include_inference is None else include_inference

        model_info = self._fetch_model_info(model_id)
        model_card = self._fetch_model_card(model_id)

        # Score the un-enhanced document first so the improvement can be reported.
        original_metadata = self._extract_structured_metadata(model_id, model_info, model_card)
        original_aibom = self._create_aibom_structure(model_id, original_metadata)
        original_score = calculate_completeness_score(original_aibom)

        final_metadata = original_metadata.copy()
        ai_enhanced = False
        ai_model_name = None
        if use_inference and self.inference_model_url:
            try:
                enhanced_metadata = self._extract_unstructured_metadata(model_card, model_id)
                if enhanced_metadata:
                    ai_enhanced = True
                    # Placeholder label: the extraction below is rule-based,
                    # no model is actually invoked yet.
                    ai_model_name = "BERT-base-uncased"
                    # Enhanced values only FILL GAPS: a key is taken from the
                    # enhancement when the original has no (or an empty) value.
                    for key, value in enhanced_metadata.items():
                        if value is not None and not final_metadata.get(key):
                            final_metadata[key] = value
            except Exception as e:
                # Best effort: fall back to the original metadata.
                print(f"Error during AI enhancement: {e}")

        aibom = self._create_aibom_structure(model_id, final_metadata)
        final_score = calculate_completeness_score(aibom)

        improvement = 0
        if ai_enhanced:
            improvement = round(final_score["total_score"] - original_score["total_score"], 2)

        if "metadata" in aibom:
            properties = aibom["metadata"].setdefault("properties", [])
            properties.append({"name": "aibom:quality-score", "value": str(final_score["total_score"])})
            properties.append({"name": "aibom:quality-breakdown", "value": json.dumps(final_score["section_scores"])})
            properties.append({"name": "aibom:max-scores", "value": json.dumps(final_score["max_scores"])})
            if ai_enhanced:
                properties.append({"name": "aibom:ai-enhanced", "value": "true"})
                properties.append({"name": "aibom:ai-model", "value": ai_model_name})
                properties.append({"name": "aibom:original-score", "value": str(original_score["total_score"])})
                properties.append({"name": "aibom:score-improvement", "value": str(improvement)})

        if output_file:
            # default=str keeps the write from failing on a stray
            # non-JSON value (e.g. a datetime) after all the work is done.
            with open(output_file, "w", encoding="utf-8") as f:
                json.dump(aibom, f, indent=2, default=str)

        # Summary consumed by the UI.
        enhancement_report = {
            "ai_enhanced": ai_enhanced,
            "ai_model": ai_model_name if ai_enhanced else None,
            "original_score": original_score,
            "final_score": final_score,
            "improvement": improvement,
        }
        return aibom, enhancement_report

    def _fetch_model_info(self, model_id: str) -> Any:
        """Return the Hub's model-info object, or ``{}`` if the lookup fails."""
        try:
            return self.hf_api.model_info(model_id)
        except Exception as e:
            print(f"Error fetching model info for {model_id}: {e}")
            return {}

    def _fetch_model_card(self, model_id: str) -> Optional["ModelCard"]:
        """Return the model card for ``model_id``, or ``None`` if loading fails."""
        try:
            return ModelCard.load(model_id)
        except Exception as e:
            print(f"Error fetching model card for {model_id}: {e}")
            return None

    @staticmethod
    def _bom_ref(model_id: str) -> str:
        """Return the bom-ref used for the model everywhere in the document."""
        return f"pkg:generic/{model_id.replace('/', '%2F')}"

    @staticmethod
    def _to_plain(value: Any) -> Any:
        """Return a JSON-friendlier form of ``value``.

        Dicts are returned as-is; objects with an instance ``__dict__`` are
        converted to a dict of their attributes; anything else is unchanged.
        """
        if isinstance(value, dict):
            return value
        if hasattr(value, "__dict__"):
            return dict(vars(value))
        return value

    @staticmethod
    def _extract_section(card_text: str, headings) -> Optional[str]:
        """Return the text under the first of ``headings`` that has content.

        The section runs from the heading up to the next ``##`` marker.
        Returns ``None`` when no heading is present or all matches are empty.
        """
        for heading in headings:
            if heading in card_text:
                section = card_text.partition(heading)[2].partition("##")[0].strip()
                if section:
                    return section
        return None

    def _create_aibom_structure(
        self,
        model_id: str,
        metadata: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Assemble the top-level CycloneDX document for ``model_id``."""
        return {
            "bomFormat": "CycloneDX",
            "specVersion": "1.6",
            "serialNumber": f"urn:uuid:{str(uuid.uuid4())}",
            "version": 1,
            "metadata": self._create_metadata_section(model_id, metadata),
            "components": [self._create_component_section(model_id, metadata)],
            "dependencies": [
                {
                    "ref": self._bom_ref(model_id),
                    # NOTE(review): this literal looks mangled by e-mail
                    # obfuscation (probably "<package>@<version>"); restore
                    # the intended purl from the upstream source.
                    "dependsOn": ["pkg:pypi/[email protected]"],
                }
            ],
        }

    def _extract_structured_metadata(
        self,
        model_id: str,
        model_info: Any,
        model_card: Optional["ModelCard"],
    ) -> Dict[str, Any]:
        """Collect metadata from the Hub model info and the card front matter.

        Args:
            model_id: Hub repository id; used as fallback for the name and to
                build the commit URL.
            model_info: Object returned by ``_fetch_model_info`` (falsy when
                the lookup failed).
            model_card: Model card, or ``None`` when unavailable.

        Returns:
            Metadata dict with every ``None``-valued entry removed.
        """
        metadata: Dict[str, Any] = {}

        if model_info:
            sha = getattr(model_info, "sha", None)
            hub_id = getattr(model_info, "modelId", None) or model_id
            metadata.update({
                "name": hub_id.split("/")[-1],
                "author": getattr(model_info, "author", None),
                "tags": getattr(model_info, "tags", None) or [],
                "pipeline_tag": getattr(model_info, "pipeline_tag", None),
                "downloads": getattr(model_info, "downloads", 0),
                # Accept either attribute spelling for the modification time.
                "last_modified": getattr(model_info, "lastModified", None)
                or getattr(model_info, "last_modified", None),
                "commit": sha[:7] if sha else None,
                "commit_url": f"https://huggingface.co/{model_id}/commit/{sha}" if sha else None,
            })

        if model_card and model_card.data:
            card_data = model_card.data.to_dict() if hasattr(model_card.data, "to_dict") else {}
            metadata.update({
                "language": card_data.get("language"),
                "license": card_data.get("license"),
                "library_name": card_data.get("library_name"),
                "base_model": card_data.get("base_model"),
                "datasets": card_data.get("datasets"),
                "model_name": card_data.get("model_name"),
                # Keep the Hub tags when the card has none (or an explicit null).
                "tags": card_data.get("tags") or metadata.get("tags", []),
                "description": card_data.get("model_summary", None),
            })
            eval_results = getattr(model_card.data, "eval_results", None)
            if eval_results:
                # Stored as plain dicts so the document stays JSON-serializable.
                metadata["eval_results"] = [self._to_plain(result) for result in eval_results]

        metadata["ai:type"] = "Transformer"
        # `or` (not a .get default) because the keys may exist with value None.
        metadata["ai:task"] = metadata.get("pipeline_tag") or "Text Generation"
        library_name = metadata.get("library_name") or ""
        metadata["ai:framework"] = "PyTorch" if "transformers" in library_name else "Unknown"

        return {k: v for k, v in metadata.items() if v is not None}

    def _extract_unstructured_metadata(self, model_card: Optional["ModelCard"], model_id: str) -> Dict[str, Any]:
        """Extract additional metadata from the free text of the model card.

        This is a placeholder for model-based extraction: no model is invoked;
        the fields are found with simple text rules.

        Extracted keys (each only when found):
            description: first paragraph longer than 20 characters that is
                not a heading.
            limitations: text under ``## Limitations``.
            ethical_considerations: text under ``## Ethical Considerations``,
                ``## Ethics`` or ``## Bias`` (first with content wins).
            risks: text under ``## Risks``.

        Returns:
            Dict of extracted fields; empty when the card or its text is missing.
        """
        card_text = getattr(model_card, "text", None) if model_card else None
        if not card_text:
            return {}

        enhanced_metadata: Dict[str, Any] = {}

        for paragraph in (p.strip() for p in card_text.split('\n\n')):
            if len(paragraph) > 20 and not paragraph.startswith('#'):
                enhanced_metadata["description"] = paragraph
                break

        section_headings = (
            ("limitations", ["## Limitations"]),
            ("ethical_considerations", ["## Ethical Considerations", "## Ethics", "## Bias"]),
            ("risks", ["## Risks"]),
        )
        for key, headings in section_headings:
            section = self._extract_section(card_text, headings)
            if section:
                enhanced_metadata[key] = section

        return enhanced_metadata

    def _create_metadata_section(self, model_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Build the CycloneDX ``metadata`` object (timestamp, tool, component, properties).

        Every metadata entry except name/author/license is emitted as a
        string-valued property; lists and dicts are JSON-encoded.
        """
        # Timezone-aware "now", rendered with the same trailing "Z" as before.
        timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat().replace("+00:00", "Z")

        tools = [{
            "vendor": "Aetheris AI",
            "name": "aibom-generator",
            "version": "0.1.0",
        }]

        authors = []
        if metadata.get("author"):
            authors.append({
                "name": metadata["author"],
                "url": f"https://huggingface.co/{metadata['author']}",
            })

        component = {
            "type": "machine-learning-model",
            "name": metadata.get("name", model_id.split("/")[-1]),
            "bom-ref": self._bom_ref(model_id),
        }

        properties = []
        for key, value in metadata.items():
            if key in self._NON_PROPERTY_KEYS or value is None:
                continue
            if isinstance(value, (list, dict)):
                # default=str: the value ends up as a string anyway, so a
                # non-JSON element must not abort the whole document.
                value = json.dumps(value, default=str)
            properties.append({"name": key, "value": str(value)})

        metadata_section = {
            "timestamp": timestamp,
            "tools": tools,
            "component": component,
        }
        if authors:
            metadata_section["authors"] = authors
        if properties:
            metadata_section["properties"] = properties
        return metadata_section

    def _create_component_section(self, model_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Build the machine-learning-model component entry for ``model_id``."""
        component = {
            "type": "machine-learning-model",
            "bom-ref": self._bom_ref(model_id),
            "name": metadata.get("name", model_id.split("/")[-1]),
            "purl": f"pkg:huggingface/{model_id}",
        }

        if "description" in metadata:
            component["description"] = metadata["description"]
        if "commit" in metadata:
            component["version"] = metadata["commit"]

        license_value = metadata.get("license")
        if license_value:
            # A card may declare one license or a list of them.
            license_ids = license_value if isinstance(license_value, list) else [license_value]
            component["licenses"] = [{"license": {"id": license_id}} for license_id in license_ids]

        external_refs = [{
            "type": "website",
            "url": f"https://huggingface.co/{model_id}",
        }]
        if "commit_url" in metadata:
            external_refs.append({
                "type": "vcs",
                "url": metadata["commit_url"],
            })
        component["externalReferences"] = external_refs

        component["modelCard"] = self._create_model_card_section(metadata)
        return component

    def _create_model_card_section(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Build the component's ``modelCard`` object from the available metadata.

        Sub-sections are only included when they have content, so the result
        may be an empty dict.
        """
        model_card_section = {}

        model_parameters = {
            k: metadata[k] for k in ("base_model", "library_name", "pipeline_tag") if k in metadata
        }
        if model_parameters:
            model_card_section["modelParameters"] = model_parameters

        if "eval_results" in metadata:
            model_card_section["quantitativeAnalysis"] = {"performanceMetrics": metadata["eval_results"]}

        considerations = {
            k: metadata[k]
            for k in ("limitations", "ethical_considerations", "bias", "risks")
            if k in metadata
        }
        if considerations:
            model_card_section["considerations"] = considerations

        return model_card_section