# Hosting-page residue captured with this file (not part of the module):
# a1c00l -- "Update src/aibom_generator/generator.py" -- 018daa2 (verified) -- 14.4 kB
import datetime
import json
import uuid
from typing import Any, Dict, Optional, Tuple

from huggingface_hub import HfApi, ModelCard

from .utils import calculate_completeness_score
class AIBOMGenerator:
    """Build CycloneDX 1.6 AI bills of materials (AIBOMs) for Hugging Face models.

    The generator pulls structured metadata from the Hub (model info and the
    model card's YAML header), optionally augments it with text heuristics
    over the model card body, and assembles a CycloneDX-shaped dictionary.
    """

    def __init__(
        self,
        hf_token: Optional[str] = None,
        inference_model_url: Optional[str] = None,
        use_inference: bool = True,
        cache_dir: Optional[str] = None,
    ):
        """Store configuration and create the Hub API client.

        Args:
            hf_token: Optional Hugging Face access token passed to ``HfApi``.
            inference_model_url: When unset/empty, the enhancement step in
                ``generate_aibom`` is skipped even if inference is enabled.
            use_inference: Default for the enhancement step; can be overridden
                per call via ``include_inference``.
            cache_dir: Stored on the instance; not read by any method of this
                class.
        """
        self.hf_api = HfApi(token=hf_token)
        self.inference_model_url = inference_model_url
        self.use_inference = use_inference
        self.cache_dir = cache_dir

    def generate_aibom(
        self,
        model_id: str,
        output_file: Optional[str] = None,
        include_inference: Optional[bool] = None,
    ) -> Tuple[Dict[str, Any], Dict[str, Any]]:
        """Generate the AIBOM for ``model_id`` together with an enhancement report.

        Args:
            model_id: Hub repository id, e.g. ``"org/model"``.
            output_file: If given, the AIBOM is also written there as JSON.
            include_inference: Per-call override of ``self.use_inference``;
                ``None`` means "use the instance default".

        Returns:
            A ``(aibom, enhancement_report)`` tuple. ``enhancement_report``
            holds ``ai_enhanced``, ``ai_model``, ``original_score``,
            ``final_score`` and ``improvement``.
        """
        use_inference = self.use_inference if include_inference is None else include_inference

        model_info = self._fetch_model_info(model_id)
        model_card = self._fetch_model_card(model_id)

        # Baseline: score the AIBOM built purely from structured metadata so
        # the effect of the enhancement step can be reported afterwards.
        original_metadata = self._extract_structured_metadata(model_id, model_info, model_card)
        original_aibom = self._create_aibom_structure(model_id, original_metadata)
        original_score = calculate_completeness_score(original_aibom)

        final_metadata = original_metadata.copy()
        ai_enhanced = False
        ai_model_name = None

        if use_inference and self.inference_model_url:
            try:
                enhanced_metadata = self._extract_unstructured_metadata(model_card, model_id)
                if enhanced_metadata:
                    ai_enhanced = True
                    ai_model_name = "BERT-base-uncased"  # Will be replaced with actual model name
                    # Original metadata wins: enhanced values only fill keys
                    # that are missing or empty in the structured metadata.
                    for key, value in enhanced_metadata.items():
                        if value is not None and not final_metadata.get(key):
                            final_metadata[key] = value
            except Exception as e:
                # Best effort: fall back to the structured metadata alone.
                print(f"Error during AI enhancement: {e}")

        aibom = self._create_aibom_structure(model_id, final_metadata)
        final_score = calculate_completeness_score(aibom)
        improvement = round(final_score["total_score"] - original_score["total_score"], 2)

        if "metadata" in aibom:
            properties = aibom["metadata"].setdefault("properties", [])
            properties.extend([
                {"name": "aibom:quality-score", "value": str(final_score["total_score"])},
                {"name": "aibom:quality-breakdown", "value": json.dumps(final_score["section_scores"])},
                {"name": "aibom:max-scores", "value": json.dumps(final_score["max_scores"])},
            ])
            if ai_enhanced:
                properties.extend([
                    {"name": "aibom:ai-enhanced", "value": "true"},
                    {"name": "aibom:ai-model", "value": ai_model_name},
                    {"name": "aibom:original-score", "value": str(original_score["total_score"])},
                    {"name": "aibom:score-improvement", "value": str(improvement)},
                ])

        if output_file:
            with open(output_file, "w", encoding="utf-8") as f:
                # default=str keeps the dump from failing on values that are
                # not JSON-native (e.g. eval result objects copied from the card).
                json.dump(aibom, f, indent=2, default=str)

        enhancement_report = {
            "ai_enhanced": ai_enhanced,
            "ai_model": ai_model_name if ai_enhanced else None,
            "original_score": original_score,
            "final_score": final_score,
            "improvement": improvement if ai_enhanced else 0,
        }
        return aibom, enhancement_report

    def _fetch_model_info(self, model_id: str) -> Any:
        """Return the Hub's model info object, or ``{}`` if the lookup fails."""
        try:
            return self.hf_api.model_info(model_id)
        except Exception as e:
            print(f"Error fetching model info for {model_id}: {e}")
            return {}

    def _fetch_model_card(self, model_id: str) -> Optional["ModelCard"]:
        """Load the model card for ``model_id``, or ``None`` if loading fails."""
        try:
            # Reuse the token given to the constructor so the card is fetched
            # with the same credentials as the model info.
            return ModelCard.load(model_id, token=getattr(self.hf_api, "token", None))
        except Exception as e:
            print(f"Error fetching model card for {model_id}: {e}")
            return None

    @staticmethod
    def _bom_ref(model_id: str) -> str:
        """Return the bom-ref used for the model (``/`` percent-encoded)."""
        return f"pkg:generic/{model_id.replace('/', '%2F')}"

    @staticmethod
    def _first_attr(obj: Any, *names: str, default: Any = None) -> Any:
        """Return the first non-``None`` attribute of ``obj`` among ``names``."""
        for name in names:
            value = getattr(obj, name, None)
            if value is not None:
                return value
        return default

    @staticmethod
    def _extract_section(card_text: str, heading: str) -> str:
        """Return the stripped text between ``heading`` and the next ``##``."""
        return card_text.split(heading)[1].split("##")[0].strip()

    def _create_aibom_structure(
        self,
        model_id: str,
        metadata: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Assemble the top-level CycloneDX document for ``model_id``.

        A fresh random ``serialNumber`` is generated on every call.
        """
        return {
            "bomFormat": "CycloneDX",
            "specVersion": "1.6",
            "serialNumber": f"urn:uuid:{str(uuid.uuid4())}",
            "version": 1,
            "metadata": self._create_metadata_section(model_id, metadata),
            "components": [self._create_component_section(model_id, metadata)],
            "dependencies": [
                {
                    "ref": self._bom_ref(model_id),
                    # NOTE(review): this literal looks like e-mail-obfuscation
                    # residue rather than a valid purl (expected shape:
                    # pkg:pypi/<name>@<version>) -- restore the intended value.
                    "dependsOn": ["pkg:pypi/[email protected]"]
                }
            ],
        }

    def _extract_structured_metadata(
        self,
        model_id: str,
        model_info: Dict[str, Any],
        model_card: Optional["ModelCard"],
    ) -> Dict[str, Any]:
        """Collect metadata from the model info object and the card header.

        Args:
            model_id: Hub repository id; used as a fallback for the name and
                to build the commit URL.
            model_info: Object returned by ``_fetch_model_info`` (attributes
                are read with ``getattr``; a falsy value is skipped).
            model_card: Loaded model card, or ``None``.

        Returns:
            A dict of metadata with all ``None`` values removed.
        """
        metadata: Dict[str, Any] = {}

        if model_info:
            # Probe both the current and the legacy attribute spellings.
            repo_id = self._first_attr(model_info, "id", "modelId", default=model_id)
            sha = getattr(model_info, "sha", None)
            metadata.update({
                "name": repo_id.split("/")[-1],
                "author": getattr(model_info, "author", None),
                "tags": getattr(model_info, "tags", []),
                "pipeline_tag": getattr(model_info, "pipeline_tag", None),
                "downloads": getattr(model_info, "downloads", 0),
                "last_modified": self._first_attr(model_info, "last_modified", "lastModified"),
                "commit": sha[:7] if sha else None,
                "commit_url": f"https://huggingface.co/{model_id}/commit/{sha}" if sha else None,
            })

        if model_card and model_card.data:
            card_data = model_card.data.to_dict() if hasattr(model_card.data, "to_dict") else {}
            metadata.update({
                "language": card_data.get("language"),
                "license": card_data.get("license"),
                "library_name": card_data.get("library_name"),
                "base_model": card_data.get("base_model"),
                "datasets": card_data.get("datasets"),
                "model_name": card_data.get("model_name"),
                "tags": card_data.get("tags", metadata.get("tags", [])),
                "description": card_data.get("model_summary", None),
            })
            if hasattr(model_card.data, "eval_results") and model_card.data.eval_results:
                metadata["eval_results"] = model_card.data.eval_results

        metadata["ai:type"] = "Transformer"
        # `or` (not a .get default): the keys may be present with value None,
        # which would bypass the default and break the substring test below.
        metadata["ai:task"] = metadata.get("pipeline_tag") or "Text Generation"
        library_name = metadata.get("library_name") or ""
        metadata["ai:framework"] = "PyTorch" if "transformers" in library_name else "Unknown"

        return {k: v for k, v in metadata.items() if v is not None}

    def _extract_unstructured_metadata(self, model_card: Optional["ModelCard"], model_id: str) -> Dict[str, Any]:
        """Derive extra metadata from the free-text body of the model card.

        This is a placeholder for model-based extraction: no inference is
        performed, only simple string heuristics:

        * ``description``: first blank-line-separated paragraph longer than
          20 characters that does not start with ``#``.
        * ``limitations`` / ``risks``: text following the ``## Limitations`` /
          ``## Risks`` heading up to the next ``##``.
        * ``ethical_considerations``: same, for the first of
          ``## Ethical Considerations``, ``## Ethics``, ``## Bias`` that
          yields non-empty text.

        Returns:
            A dict with the keys that could be extracted (possibly empty).
        """
        card_text = getattr(model_card, "text", None) if model_card else None
        if not card_text:
            return {}

        enhanced_metadata: Dict[str, Any] = {}

        for paragraph in (p.strip() for p in card_text.split('\n\n')):
            if len(paragraph) > 20 and not paragraph.startswith('#'):
                enhanced_metadata["description"] = paragraph
                break

        if "## Limitations" in card_text:
            limitations = self._extract_section(card_text, "## Limitations")
            if limitations:
                enhanced_metadata["limitations"] = limitations

        for heading in ["## Ethical Considerations", "## Ethics", "## Bias"]:
            if heading in card_text:
                section = self._extract_section(card_text, heading)
                if section:
                    enhanced_metadata["ethical_considerations"] = section
                    break

        if "## Risks" in card_text:
            risks = self._extract_section(card_text, "## Risks")
            if risks:
                enhanced_metadata["risks"] = risks

        return enhanced_metadata

    def _create_metadata_section(self, model_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Build the BOM ``metadata`` object (timestamp, tools, component, ...).

        Every metadata entry except ``name``, ``author`` and ``license`` is
        emitted as a string-valued property; lists and dicts are JSON-encoded.
        """
        # Aware "now" converted to the same naive-UTC ISO form the "Z" suffix
        # expects; avoids the deprecated datetime.utcnow().
        now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        timestamp = now_utc.isoformat() + "Z"

        metadata_section: Dict[str, Any] = {
            "timestamp": timestamp,
            "tools": [{
                "vendor": "Aetheris AI",
                "name": "aibom-generator",
                "version": "0.1.0"
            }],
            "component": {
                "type": "machine-learning-model",
                "name": metadata.get("name", model_id.split("/")[-1]),
                "bom-ref": self._bom_ref(model_id)
            },
        }

        author = metadata.get("author")
        if author:
            metadata_section["authors"] = [{
                "name": author,
                "url": f"https://huggingface.co/{author}"
            }]

        properties = []
        for key, value in metadata.items():
            if key in ("name", "author", "license") or value is None:
                continue
            if isinstance(value, (list, dict)):
                # default=str: containers may hold non-JSON-native objects.
                value = json.dumps(value, default=str)
            properties.append({"name": key, "value": str(value)})
        if properties:
            metadata_section["properties"] = properties

        return metadata_section

    def _create_component_section(self, model_id: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Build the machine-learning-model component entry for the BOM."""
        component: Dict[str, Any] = {
            "type": "machine-learning-model",
            "bom-ref": self._bom_ref(model_id),
            "name": metadata.get("name", model_id.split("/")[-1]),
            "purl": f"pkg:huggingface/{model_id}"
        }

        if "description" in metadata:
            component["description"] = metadata["description"]
        if "commit" in metadata:
            component["version"] = metadata["commit"]
        if "license" in metadata:
            declared = metadata["license"]
            # A list-valued license becomes one license entry per item
            # instead of a single entry whose id is a list.
            license_ids = declared if isinstance(declared, (list, tuple)) else [declared]
            licenses = [{"license": {"id": license_id}} for license_id in license_ids]
            if licenses:
                component["licenses"] = licenses

        external_refs = [{
            "type": "website",
            "url": f"https://huggingface.co/{model_id}"
        }]
        if "commit_url" in metadata:
            external_refs.append({
                "type": "vcs",
                "url": metadata["commit_url"]
            })
        component["externalReferences"] = external_refs

        component["modelCard"] = self._create_model_card_section(metadata)
        return component

    def _create_model_card_section(self, metadata: Dict[str, Any]) -> Dict[str, Any]:
        """Build the component's ``modelCard`` object from available metadata.

        Sections are only included when at least one of their source keys is
        present, so the result may be an empty dict.
        """
        model_card_section: Dict[str, Any] = {}

        model_parameters = {
            key: metadata[key]
            for key in ("base_model", "library_name", "pipeline_tag")
            if key in metadata
        }
        if model_parameters:
            model_card_section["modelParameters"] = model_parameters

        if "eval_results" in metadata:
            model_card_section["quantitativeAnalysis"] = {"performanceMetrics": metadata["eval_results"]}

        considerations = {
            key: metadata[key]
            for key in ("limitations", "ethical_considerations", "bias", "risks")
            if key in metadata
        }
        if considerations:
            model_card_section["considerations"] = considerations

        return model_card_section