import math, json import gradio as gr import torch, pandas as pd import matplotlib.pyplot as plt import seaborn as sns from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline # ZeroGPU support try: import spaces ZEROGPU_AVAILABLE = True print("ZeroGPU support enabled") except ImportError: ZEROGPU_AVAILABLE = False print("ZeroGPU not available, running in standard mode") # Create dummy decorator for local development def spaces_gpu_decorator(duration=60): def decorator(func): return func return decorator spaces = type('spaces', (), {'GPU': spaces_gpu_decorator}) # Model configuration - Foundation-Sec-8B only MODEL_NAME = "fdtn-ai/Foundation-Sec-8B" # Initialize tokenizer and model using pipeline approach print(f"Loading model: {MODEL_NAME}") try: print(f"Initializing Foundation-Sec-8B model...") tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME) text_pipeline = pipeline( "text-generation", model=MODEL_NAME, tokenizer=tokenizer, torch_dtype=torch.bfloat16, device_map="auto", trust_remote_code=True ) print(f"Foundation-Sec-8B model initialized successfully") # Extract model and tokenizer from pipeline for direct access model = text_pipeline.model tok = text_pipeline.tokenizer except Exception as e: print(f"Error initializing Foundation-Sec-8B model: {str(e)}") print("Trying with simplified parameters...") try: # Try with simpler parameters text_pipeline = pipeline( "text-generation", model=MODEL_NAME, trust_remote_code=True ) model = text_pipeline.model tok = text_pipeline.tokenizer print(f"Foundation-Sec-8B model loaded with simplified parameters") except Exception as e2: print(f"Failed to load Foundation-Sec-8B model: {str(e2)}") raise RuntimeError(f"Could not load Foundation-Sec-8B model. Please ensure the model is accessible and try again. Error: {str(e2)}") # Log device information if hasattr(model, 'device'): print(f"Model loaded on device: {model.device}") else: device_info = next(model.parameters()).device print(f"Model parameters on device: {device_info}") print(f"CUDA available: {torch.cuda.is_available()}") if torch.cuda.is_available(): print(f"CUDA device count: {torch.cuda.device_count()}") print(f"Current CUDA device: {torch.cuda.current_device()}") print(f"CUDA device name: {torch.cuda.get_device_name()}") # Configuration parameters LEN_ALPHA = 0.7 # Length correction factor (0=no correction, 1=full average logP) # Sample data for testing CAMPAIGN_LIST = [ "Operation Aurora", "Dust Storm", "ShadowHammer", "NotPetya", "SolarWinds", ] ACTOR_LIST = ["APT1", "APT28", "APT33", "APT38", "FIN8"] # Sample ATT&CK technique IDs with names TECHNIQUE_LIST = [ "T1059 Command and Scripting Interpreter", "T1566 Phishing", "T1027 Obfuscated/Stored Files", "T1036 Masquerading", "T1105 Ingress Tool Transfer", "T1018 Remote System Discovery", "T1568 Dynamic Resolution", ] @torch.no_grad() def phrase_log_prob(prompt, phrase): """Calculate log probability of a phrase given a prompt using the language model.""" try: # Log GPU usage information device_info = next(model.parameters()).device print(f"Running phrase_log_prob on device: {device_info}") ids_prompt = tok(prompt, return_tensors="pt").to(model.device)["input_ids"][0] ids_phrase = tok(phrase, add_special_tokens=False)["input_ids"] lp = 0.0 cur = ids_prompt.unsqueeze(0) for tid in ids_phrase: logits = model(cur).logits[0, -1].float() lp += torch.log_softmax(logits, -1)[tid].item() cur = torch.cat([cur, torch.tensor([[tid]], device=model.device)], 1) return lp except Exception as e: print(f"Error in phrase_log_prob: {e}") raise e def binary_assoc_score(prompt: str, phrase: str, neg="does NOT use", prompt_template="typically uses") -> float: """ Calculate binary association score: p ≈ P(use) / (P(use)+P(not use)) Applies length normalization to correct for longer phrases. Args: prompt: Base prompt string phrase: Phrase to evaluate neg: Negative template to replace positive template prompt_template: Positive template to be replaced Returns: Length-normalized association score between 0 and 1 """ lp_pos = phrase_log_prob(prompt, phrase) lp_neg = phrase_log_prob(prompt.replace(prompt_template, neg), phrase) # Logistic transformation prob = 1 / (1 + math.exp(lp_neg - lp_pos)) # Length normalization n_tok = len(tok(phrase, add_special_tokens=False)["input_ids"]) return prob / (n_tok ** LEN_ALPHA) def campaign_actor_associations(campaigns, actors): """Campaign × Actor の関連度を計算し、各CampaignごとにTop Actorを返す""" results = {} for camp in campaigns: prompt_base = CAMPAIGN_ACTOR_PROMPT.format(campaign=camp) actor_scores = {} for actor in actors: score = binary_assoc_score(prompt_base, actor, neg="is NOT associated with") actor_scores[actor] = score # スコア順でソート sorted_actors = sorted(actor_scores.items(), key=lambda x: x[1], reverse=True) results[camp] = sorted_actors return results def campaign_technique_matrix(campaigns, techniques, prompt_template="typically uses", neg_template="typically does NOT use"): """ Generate Campaign × Technique association matrix using binary scoring. Args: campaigns: List of campaign names techniques: List of technique names prompt_template: Template for positive association neg_template: Template for negative association Returns: DataFrame with campaigns as rows, techniques as columns, scores as values """ rows = {} for camp in campaigns: prompt_base = f"{camp} {prompt_template}" rows[camp] = { tech: binary_assoc_score(prompt_base, tech, neg=neg_template, prompt_template=prompt_template) for tech in techniques } return pd.DataFrame.from_dict(rows, orient="index") def campaign_actor_matrix(campaigns, actors): """Campaign × Actor 行列を生成""" rows = {} for camp in campaigns: prompt_base = CAMPAIGN_ACTOR_PROMPT.format(campaign=camp) rows[camp] = { actor: binary_assoc_score(prompt_base, actor, neg="is NOT associated with") for actor in actors } return pd.DataFrame.from_dict(rows, orient="index") def campaign_actor_probs(campaigns, actors, prompt_template="is conducted by"): """ Generate Campaign × Actor probability matrix using softmax normalization. Args: campaigns: List of campaign names actors: List of actor names prompt_template: Template for actor association prompt Returns: DataFrame with campaigns as rows, actors as columns, probabilities as values """ rows = {} for camp in campaigns: prompt = f"{camp} {prompt_template}" logps = [phrase_log_prob(prompt, a) for a in actors] # Softmax normalization (with max-shift for numerical stability) m = max(logps) ps = [math.exp(lp - m) for lp in logps] s = sum(ps) rows[camp] = {a: p/s for a, p in zip(actors, ps)} return pd.DataFrame.from_dict(rows, orient="index") @spaces.GPU(duration=120) def generate_actor_heatmap(c_list, a_list, actor_prompt_template): """Generate Campaign-Actor association heatmap with probability visualization.""" try: campaigns = [c.strip() for c in c_list.split(",") if c.strip()] actors = [a.strip() for a in a_list.split(",") if a.strip()] if not campaigns or not actors: fig, ax = plt.subplots(figsize=(8, 6)) ax.text(0.5, 0.5, 'Please enter both Campaigns and Actors', ha='center', va='center', fontsize=16) ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.axis('off') return fig print(f"Processing {len(campaigns)} campaigns and {len(actors)} actors...") print(f"Using prompt template: '{actor_prompt_template}'") # Check GPU availability if torch.cuda.is_available(): print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}") else: print("Running on CPU") # Calculate probability matrix df_ca = campaign_actor_probs(campaigns, actors, actor_prompt_template) print(f"Actor probability matrix shape: {df_ca.shape}") print("Actor probability matrix:") print(df_ca.round(4)) # Create heatmap with matplotlib/seaborn fig, ax = plt.subplots(figsize=(max(8, len(actors)*1.2), max(6, len(campaigns)*0.8))) sns.heatmap(df_ca, annot=True, cmap='plasma', fmt='.3f', cbar_kws={'label': 'P(actor)'}, ax=ax) ax.set_title('Campaign-Actor Probabilities (softmax normalized)', fontsize=14, pad=20) ax.set_xlabel('Actor', fontsize=12) ax.set_ylabel('Campaign', fontsize=12) # Adjust label rotation plt.setp(ax.get_xticklabels(), rotation=45, ha='right') plt.setp(ax.get_yticklabels(), rotation=0) plt.tight_layout() print("Actor heatmap generated successfully!") return fig except Exception as e: print(f"Error in generate_actor_heatmap: {e}") import traceback traceback.print_exc() fig, ax = plt.subplots(figsize=(8, 6)) ax.text(0.5, 0.5, f'Error occurred: {str(e)}', ha='center', va='center', fontsize=12, color='red') ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.axis('off') return fig @spaces.GPU(duration=120) def generate_technique_heatmap(c_list, t_list, technique_prompt_template, technique_neg_template): """Generate Campaign-Technique association heatmap with binary scoring visualization.""" try: campaigns = [c.strip() for c in c_list.split(",") if c.strip()] techniques = [t.strip() for t in t_list.split(",") if t.strip()] if not campaigns or not techniques: fig, ax = plt.subplots(figsize=(8, 6)) ax.text(0.5, 0.5, 'Please enter both Campaigns and Techniques', ha='center', va='center', fontsize=16) ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.axis('off') return fig print(f"Processing {len(campaigns)} campaigns and {len(techniques)} techniques...") print(f"Using prompt templates: '{technique_prompt_template}' / '{technique_neg_template}'") # Check GPU availability if torch.cuda.is_available(): print(f"GPU computation enabled - Device: {torch.cuda.get_device_name()}") else: print("Running on CPU") # Calculate score matrix df_ct = campaign_technique_matrix(campaigns, techniques, technique_prompt_template, technique_neg_template) print(f"Score matrix shape: {df_ct.shape}") print("Score matrix:") print(df_ct.round(4)) # Create heatmap with matplotlib/seaborn fig, ax = plt.subplots(figsize=(max(8, len(techniques)*1.2), max(6, len(campaigns)*0.8))) sns.heatmap(df_ct, annot=True, cmap='viridis', fmt='.3f', cbar_kws={'label': 'Association Score'}, ax=ax) ax.set_title('Campaign-Technique Associations (len-norm, independent)', fontsize=14, pad=20) ax.set_xlabel('Technique', fontsize=12) ax.set_ylabel('Campaign', fontsize=12) # Adjust label rotation plt.setp(ax.get_xticklabels(), rotation=45, ha='right') plt.setp(ax.get_yticklabels(), rotation=0) plt.tight_layout() print("Technique heatmap generated successfully!") return fig except Exception as e: print(f"Error in generate_technique_heatmap: {e}") import traceback traceback.print_exc() fig, ax = plt.subplots(figsize=(8, 6)) ax.text(0.5, 0.5, f'Error occurred: {str(e)}', ha='center', va='center', fontsize=12, color='red') ax.set_xlim(0, 1) ax.set_ylim(0, 1) ax.axis('off') return fig with gr.Blocks(title="LLM Threat Graph Demo") as demo: gr.Markdown("# 🕸️ LLM Threat Association Analysis\n*Visualizing Campaign-Actor-Technique relationships using Language Models*") # Common inputs with gr.Row(): campaigns = gr.Textbox( "Operation Aurora, Dust Storm, ShadowHammer, NotPetya, SolarWinds", label="Campaigns (comma-separated)", placeholder="e.g., Operation Aurora, NotPetya, Stuxnet" ) # Campaign-Actor section (probabilistic) gr.Markdown("## 👤 Campaign-Actor Associations") gr.Markdown("Visualizing Campaign-Actor relationships with probabilistic heatmaps") gr.Markdown(""" **Calculation Method**: `P(actor | "{campaign} is conducted by") (softmax normalized)` 1. Calculate `phrase_log_prob("{campaign} is conducted by", actor)` for each Actor 2. Apply softmax normalization to create probability distribution (probabilities sum to 1.0 per Campaign) 3. Result: Shows relative likelihood of each Actor conducting each Campaign """) with gr.Row(): actor_prompt_template = gr.Textbox( "is conducted by", label="Actor Prompt Template", placeholder="e.g., is conducted by, is attributed to" ) actors = gr.Textbox( "APT1, APT28, APT33, APT38, FIN8", label="Actors (comma-separated)", placeholder="e.g., APT1, Lazarus Group, Cozy Bear" ) btn_actor = gr.Button("Generate Actor Heatmap", variant="primary") plot_actor = gr.Plot(label="Campaign-Actor Heatmap") btn_actor.click( fn=generate_actor_heatmap, inputs=[campaigns, actors, actor_prompt_template], outputs=plot_actor, show_progress=True ) # Campaign-Technique section (independent scoring) gr.Markdown("## 🛠️ Campaign-Technique Associations") gr.Markdown("Visualizing Campaign-Technique relationships with independent association scores") gr.Markdown(""" **Calculation Method**: `Binary Association Score (length-normalized, independent)` 1. For each Technique, calculate: - `lp_pos = phrase_log_prob("{campaign} typically uses", technique)` - `lp_neg = phrase_log_prob("{campaign} typically does NOT use", technique)` 2. Apply logistic transformation: `prob = 1 / (1 + exp(lp_neg - lp_pos))` 3. Length normalization: `score = prob / (n_tokens^0.7)` (penalty for longer phrases) 4. Result: Independent association scores (0-1) for each Campaign-Technique pair """) with gr.Row(): technique_prompt_template = gr.Textbox( "typically uses", label="Technique Prompt Template (positive)", placeholder="e.g., typically uses, commonly employs" ) technique_neg_template = gr.Textbox( "typically does NOT use", label="Technique Prompt Template (negative)", placeholder="e.g., typically does NOT use, never employs" ) techniques = gr.Textbox( "T1059 Command and Scripting Interpreter, T1566 Phishing, T1027 Obfuscated/Stored Files, T1036 Masquerading, T1105 Ingress Tool Transfer, T1018 Remote System Discovery, T1568 Dynamic Resolution", label="Techniques (comma-separated)", placeholder="e.g., T1059 Command and Scripting Interpreter, T1566 Phishing" ) btn_technique = gr.Button("Generate Technique Heatmap", variant="primary") plot_technique = gr.Plot(label="Campaign-Technique Heatmap") btn_technique.click( fn=generate_technique_heatmap, inputs=[campaigns, techniques, technique_prompt_template, technique_neg_template], outputs=plot_technique, show_progress=True ) demo.launch()