Spaces:
Sleeping
Sleeping
File size: 6,704 Bytes
5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
from typing import List, Optional, Union, Dict, Any
from langgraph.graph.message import MessagesState
from langchain_core.messages import AIMessage, HumanMessage
from pydantic import BaseModel
from collections import OrderedDict
import re
class AttackState(MessagesState, total=False):
"""State for the ATT&CK Navigator workflow"""
attack_json: Optional[Dict[str, Any]] = None
scenario: Optional[str] = None
is_valid_context: Optional[bool] = None
extracted_user_scenario: Optional[str] = None
extracted_user_layer_operation: Optional[str] = None
class SecuritySurveyState(MessagesState, total=False):
"""State for the Security Survey workflow"""
security_checklist: OrderedDict = None # 質問リスト
current_part: str = None # 今のパート名
current_question_index: int = 0 # 今のパート内の質問番号
answers: dict = None # 回答を格納
is_survey_complete: bool = False # 全質問終了フラグ
awaiting_clear_answer: bool = False # 明確な回答待ちフラグ
last_question: str = None # 直近の質問内容
expecting_answer: bool = False # 回答待ちフラグ
is_new_session: bool = True # 新規セッションフラグ
def get_initial_state() -> AttackState:
"""Get the initial state for the workflow"""
return AttackState(
messages=[],
attack_json=None,
scenario=None,
is_valid_context=None,
extracted_user_scenario=None,
extracted_user_layer_operation=None
)
def get_security_survey_initial_state(security_checklist: OrderedDict) -> SecuritySurveyState:
"""Get the initial state for the security survey workflow"""
first_part = next(iter(security_checklist))
return SecuritySurveyState(
messages=[],
security_checklist=security_checklist,
current_part=first_part,
current_question_index=0,
answers={part: {} for part in security_checklist},
is_survey_complete=False,
awaiting_clear_answer=False,
last_question=None,
expecting_answer=False,
is_new_session=True
)
def add_user_message(state: AttackState, content: str) -> AttackState:
"""Add a user message to the state"""
state['messages'].append(HumanMessage(content=content))
return state
def add_ai_message(state: AttackState, content: str) -> AttackState:
"""Add an AI message to the state"""
state['messages'].append(AIMessage(content=content))
return state
def set_attack_json(state: AttackState, attack_json: Dict[str, Any]) -> AttackState:
"""Set the ATT&CK JSON in the state"""
state['attack_json'] = attack_json
return state
def set_scenario(state: AttackState, scenario: str) -> AttackState:
"""Set the scenario text in the state"""
state['scenario'] = scenario
return state
def set_valid_context(state: AttackState, is_valid: bool) -> AttackState:
"""Set the context validity in the state"""
state['is_valid_context'] = is_valid
return state
def evaluate_answer(answer: str) -> Optional[bool]:
"""ユーザの回答からTrue/Falseを判定する。判断できない場合はNoneを返す"""
positive_patterns = [
r'(はい|イエス|yes|hai|true|正しい|実施|行[っな]て|対策済み|している|いる|やっている|やってる|対応している|対策している|やっております|している)',
r'導入しています',
r'設定しています',
r'確認しています',
r'実施しています'
]
negative_patterns = [
r'(いいえ|ノー|no|iie|false|違う|違います|してない|していない|いない|やっていない|対応していない|対策していない|行[っな]ていない)',
r'導入していません',
r'設定していません',
r'確認していません',
r'実施していません'
]
answer = answer.lower()
for pattern in positive_patterns:
if re.search(pattern, answer):
return True
for pattern in negative_patterns:
if re.search(pattern, answer):
return False
return None # 判断できない場合
def process_answer(state: SecuritySurveyState, answer: str) -> SecuritySurveyState:
"""ユーザの回答を処理する"""
if not state.get('expecting_answer', False):
# 回答待ちでなければ何もしない
return state
part = state['current_part']
idx = state['current_question_index']
questions = list(state['security_checklist'][part].keys())
question = questions[idx]
# 回答を評価
answer_value = evaluate_answer(answer)
if answer_value is None:
# 曖昧な回答の場合
state['awaiting_clear_answer'] = True
else:
# 明確な回答の場合
state['awaiting_clear_answer'] = False
state['answers'][part][question] = answer_value
# 次の質問インデックスを設定
if idx + 1 < len(questions):
state['current_question_index'] += 1
else:
# 次のパートへ
parts = list(state['security_checklist'].keys())
current_part_idx = parts.index(part)
if current_part_idx + 1 < len(parts):
state['current_part'] = parts[current_part_idx + 1]
state['current_question_index'] = 0
else:
# 全て終了
state['is_survey_complete'] = True
# 回答待ちフラグをオフ
state['expecting_answer'] = False
return state
def get_next_question(state: SecuritySurveyState) -> Optional[str]:
"""次の質問を取得する。全て終了している場合はNoneを返す"""
if state['is_survey_complete']:
return None
part = state['current_part']
idx = state['current_question_index']
questions = list(state['security_checklist'][part].keys())
question_text = questions[idx]
# 明確な回答が必要な場合は追加メッセージをつける
if state.get('awaiting_clear_answer', False):
full_question = f"【{part}】\n{question_text}\n\n※「はい」か「いいえ」ではっきりお答えください"
else:
full_question = f"【{part}】\n{question_text}"
# 質問を保存
state['last_question'] = full_question
state['expecting_answer'] = True
return full_question
def has_unanswered_questions(state: SecuritySurveyState) -> bool:
"""未回答の質問があるかチェック"""
return not state['is_survey_complete'] |