nyasukun commited on
Commit
3966c38
·
1 Parent(s): fa3b0ec

initial commit for security survey

Browse files
Files changed (4) hide show
  1. app.py +19 -28
  2. models/chat_state.py +127 -1
  3. security_checklist.py +38 -0
  4. workflows/chat_workflow.py +81 -358
app.py CHANGED
@@ -1,52 +1,43 @@
1
  import chainlit as cl
2
- from models.chat_state import get_initial_state, add_user_message
3
- from workflows.chat_workflow import chainlit_app
 
 
4
 
5
- # Chainlit message handler
6
  @cl.on_message
7
  async def process_user_message(message: cl.Message):
8
- """Process user message and generate response"""
9
- # Get state from session
10
  state = cl.user_session.get("state")
11
 
12
- # Add user message to state
13
- state = add_user_message(state, message.content)
14
 
15
- # Generate response
16
- state = await chainlit_app.ainvoke(state)
17
 
18
- # Update state in session
19
  cl.user_session.set("state", state)
20
 
21
  @cl.on_chat_start
22
  async def start():
23
- """
24
- This function is called when a new chat starts.
25
- Initialize the chat state and store it in the session
26
- """
27
- # Initialize state
28
- state = get_initial_state()
29
-
30
- # Store state in session
31
  cl.user_session.set("state", state)
32
-
33
-
 
34
 
35
  @cl.set_chat_profiles
36
  async def chat_profile():
37
  return [
38
  cl.ChatProfile(
39
- name="mitre_attck_navigator_layer_writer",
40
- markdown_description="MITRE ATT&CK Navigatorのlayerを生成するチャットボットです。",
41
  icon="public/icon.jpg",
42
  starters=[
43
  cl.Starter(
44
- label="Lockbitの攻撃シナリオ",
45
- message="Lockbitの攻撃シナリオを生成してください"
46
- ),
47
- cl.Starter(
48
- label="ランサムウェアギャングが使っているテクニックの頻度",
49
- message="最近のランサムウェアギャングが使っているテクニックと頻度を色の濃さで表現してください"
50
  ),
51
  ]
52
  )
 
1
  import chainlit as cl
2
+ from models.chat_state import get_security_survey_initial_state
3
+ from workflows.chat_workflow import survey_chainlit_app
4
+ from security_checklist import security_checklist
5
+ from langchain_core.messages import HumanMessage
6
 
 
7
  @cl.on_message
8
  async def process_user_message(message: cl.Message):
9
+ """ユーザメッセージを処理する"""
 
10
  state = cl.user_session.get("state")
11
 
12
+ # ユーザ回答をmessagesに追加(HumanMessage型で追加)
13
+ state['messages'].append(HumanMessage(content=message.content))
14
 
15
+ # ワークフロー進行 - process_input -> display_question or survey_complete
16
+ state = await survey_chainlit_app.ainvoke(state)
17
 
 
18
  cl.user_session.set("state", state)
19
 
20
  @cl.on_chat_start
21
  async def start():
22
+ """チャット開始時の処理"""
23
+ # 初期状態を作成
24
+ state = get_security_survey_initial_state(security_checklist)
 
 
 
 
 
25
  cl.user_session.set("state", state)
26
+
27
+ # 最初の質問を表示するためにワークフローを呼び出す
28
+ await process_user_message(cl.Message(content="セキュリティチェックリスト診断を開始したい"))
29
 
30
  @cl.set_chat_profiles
31
  async def chat_profile():
32
  return [
33
  cl.ChatProfile(
34
+ name="security_survey_bot",
35
+ markdown_description="セキュリティチェックリスト診断を行うチャットボットです。",
36
  icon="public/icon.jpg",
37
  starters=[
38
  cl.Starter(
39
+ label="セキュリティチェックリスト診断",
40
+ message="セキュリティチェックリスト診断を開始したい"
 
 
 
 
41
  ),
42
  ]
43
  )
models/chat_state.py CHANGED
@@ -2,6 +2,8 @@ from typing import List, Optional, Union, Dict, Any
2
  from langgraph.graph.message import MessagesState
3
  from langchain_core.messages import AIMessage, HumanMessage
4
  from pydantic import BaseModel
 
 
5
 
6
  class AttackState(MessagesState, total=False):
7
  """State for the ATT&CK Navigator workflow"""
@@ -11,6 +13,18 @@ class AttackState(MessagesState, total=False):
11
  extracted_user_scenario: Optional[str] = None
12
  extracted_user_layer_operation: Optional[str] = None
13
 
 
 
 
 
 
 
 
 
 
 
 
 
14
  def get_initial_state() -> AttackState:
15
  """Get the initial state for the workflow"""
16
  return AttackState(
@@ -22,6 +36,22 @@ def get_initial_state() -> AttackState:
22
  extracted_user_layer_operation=None
23
  )
24
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  def add_user_message(state: AttackState, content: str) -> AttackState:
26
  """Add a user message to the state"""
27
  state['messages'].append(HumanMessage(content=content))
@@ -45,4 +75,100 @@ def set_scenario(state: AttackState, scenario: str) -> AttackState:
45
  def set_valid_context(state: AttackState, is_valid: bool) -> AttackState:
46
  """Set the context validity in the state"""
47
  state['is_valid_context'] = is_valid
48
- return state
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
  from langgraph.graph.message import MessagesState
3
  from langchain_core.messages import AIMessage, HumanMessage
4
  from pydantic import BaseModel
5
+ from collections import OrderedDict
6
+ import re
7
 
8
  class AttackState(MessagesState, total=False):
9
  """State for the ATT&CK Navigator workflow"""
 
13
  extracted_user_scenario: Optional[str] = None
14
  extracted_user_layer_operation: Optional[str] = None
15
 
16
+ class SecuritySurveyState(MessagesState, total=False):
17
+ """State for the Security Survey workflow"""
18
+ security_checklist: OrderedDict = None # 質問リスト
19
+ current_part: str = None # 今のパート名
20
+ current_question_index: int = 0 # 今のパート内の質問番号
21
+ answers: dict = None # 回答を格納
22
+ is_survey_complete: bool = False # 全質問終了フラグ
23
+ awaiting_clear_answer: bool = False # 明確な回答待ちフラグ
24
+ last_question: str = None # 直近の質問内容
25
+ expecting_answer: bool = False # 回答待ちフラグ
26
+ is_new_session: bool = True # 新規セッションフラグ
27
+
28
  def get_initial_state() -> AttackState:
29
  """Get the initial state for the workflow"""
30
  return AttackState(
 
36
  extracted_user_layer_operation=None
37
  )
38
 
39
+ def get_security_survey_initial_state(security_checklist: OrderedDict) -> SecuritySurveyState:
40
+ """Get the initial state for the security survey workflow"""
41
+ first_part = next(iter(security_checklist))
42
+ return SecuritySurveyState(
43
+ messages=[],
44
+ security_checklist=security_checklist,
45
+ current_part=first_part,
46
+ current_question_index=0,
47
+ answers={part: {} for part in security_checklist},
48
+ is_survey_complete=False,
49
+ awaiting_clear_answer=False,
50
+ last_question=None,
51
+ expecting_answer=False,
52
+ is_new_session=True
53
+ )
54
+
55
  def add_user_message(state: AttackState, content: str) -> AttackState:
56
  """Add a user message to the state"""
57
  state['messages'].append(HumanMessage(content=content))
 
75
  def set_valid_context(state: AttackState, is_valid: bool) -> AttackState:
76
  """Set the context validity in the state"""
77
  state['is_valid_context'] = is_valid
78
+ return state
79
+
80
+ def evaluate_answer(answer: str) -> Optional[bool]:
81
+ """ユーザの回答からTrue/Falseを判定する。判断できない場合はNoneを返す"""
82
+ positive_patterns = [
83
+ r'(はい|イエス|yes|hai|true|正しい|実施|行[っな]て|対策済み|している|いる|やっている|やってる|対応している|対策している|やっております|している)',
84
+ r'導入しています',
85
+ r'設定しています',
86
+ r'確認しています',
87
+ r'実施しています'
88
+ ]
89
+
90
+ negative_patterns = [
91
+ r'(いいえ|ノー|no|iie|false|違う|違います|してない|していない|いない|やっていない|対応していない|対策していない|行[っな]ていない)',
92
+ r'導入していません',
93
+ r'設定していません',
94
+ r'確認していません',
95
+ r'実施していません'
96
+ ]
97
+
98
+ answer = answer.lower()
99
+
100
+ for pattern in positive_patterns:
101
+ if re.search(pattern, answer):
102
+ return True
103
+
104
+ for pattern in negative_patterns:
105
+ if re.search(pattern, answer):
106
+ return False
107
+
108
+ return None # 判断できない場合
109
+
110
+ def process_answer(state: SecuritySurveyState, answer: str) -> SecuritySurveyState:
111
+ """ユーザの回答を処理する"""
112
+ if not state.get('expecting_answer', False):
113
+ # 回答待ちでなければ何もしない
114
+ return state
115
+
116
+ part = state['current_part']
117
+ idx = state['current_question_index']
118
+ questions = list(state['security_checklist'][part].keys())
119
+ question = questions[idx]
120
+
121
+ # 回答を評価
122
+ answer_value = evaluate_answer(answer)
123
+
124
+ if answer_value is None:
125
+ # 曖昧な回答の場合
126
+ state['awaiting_clear_answer'] = True
127
+ else:
128
+ # 明確な回答の場合
129
+ state['awaiting_clear_answer'] = False
130
+ state['answers'][part][question] = answer_value
131
+
132
+ # 次の質問インデックスを設定
133
+ if idx + 1 < len(questions):
134
+ state['current_question_index'] += 1
135
+ else:
136
+ # 次のパートへ
137
+ parts = list(state['security_checklist'].keys())
138
+ current_part_idx = parts.index(part)
139
+ if current_part_idx + 1 < len(parts):
140
+ state['current_part'] = parts[current_part_idx + 1]
141
+ state['current_question_index'] = 0
142
+ else:
143
+ # 全て終了
144
+ state['is_survey_complete'] = True
145
+
146
+ # 回答待ちフラグをオフ
147
+ state['expecting_answer'] = False
148
+ return state
149
+
150
+ def get_next_question(state: SecuritySurveyState) -> Optional[str]:
151
+ """次の質問を取得する。全て終了している場合はNoneを返す"""
152
+ if state['is_survey_complete']:
153
+ return None
154
+
155
+ part = state['current_part']
156
+ idx = state['current_question_index']
157
+ questions = list(state['security_checklist'][part].keys())
158
+ question_text = questions[idx]
159
+
160
+ # 明確な回答が必要な場合は追加メッセージをつける
161
+ if state.get('awaiting_clear_answer', False):
162
+ full_question = f"【{part}】\n{question_text}\n\n※「はい」か「いいえ」ではっきりお答えください"
163
+ else:
164
+ full_question = f"【{part}】\n{question_text}"
165
+
166
+ # 質問を保存
167
+ state['last_question'] = full_question
168
+ state['expecting_answer'] = True
169
+
170
+ return full_question
171
+
172
+ def has_unanswered_questions(state: SecuritySurveyState) -> bool:
173
+ """未回答の質問があるかチェック"""
174
+ return not state['is_survey_complete']
security_checklist.py ADDED
@@ -0,0 +1,38 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from collections import OrderedDict
2
+
3
+ # 未回答を示す特殊値
4
+ UNDEFINED = "未回答"
5
+
6
+ security_checklist = OrderedDict({
7
+ "Part 1: 基本的対策": OrderedDict({
8
+ "パソコンやスマホなど情報機器のOSやソフトウェアは常に最新の状態にしていますか?": UNDEFINED,
9
+ "パソコンやスマホなどにはウイルス対策ソフトを導入し、ウイルス定義ファイルは最新の状態にしていますか?": UNDEFINED,
10
+ "パスワードは破られにくい「長く」「複雑な」パスワードを設定していますか?": UNDEFINED,
11
+ "重要情報に対する適切なアクセス制限を行っていますか?": UNDEFINED
12
+ }),
13
+ "Part 2: 従業員としての対策": OrderedDict({
14
+ "新たな脅威や攻撃の手口を知り対策を社内共有する仕組みはできていますか?": UNDEFINED,
15
+ "電子メールの添付ファイルや本文中のURLリンクを介したウイルス感染に気をつけていますか?": UNDEFINED,
16
+ "電子メールやFAXの宛先の送信ミスを防ぐ取り組みを実施していますか?": UNDEFINED,
17
+ "重要情報は電子メール本文に書くのではなく、添付するファイルに書いてパスワードなどで保護していますか?": UNDEFINED,
18
+ "無線LANを安全に使うために適切な暗号化方式を設定するなどの対策をしていますか?": UNDEFINED,
19
+ "インターネットを介したウイルス感染やSNSへの書き込みなどのトラブルへの対策をしていますか?": UNDEFINED,
20
+ "パソコンやサーバーのウイルス感染、故障や誤操作による重要情報の消失に備えてバックアップを取得していますか?": UNDEFINED,
21
+ "紛失や盗難を防止するため、重要情報が記載された書類や電子媒体は机上に放置せず、書庫などに安全に保管していますか?": UNDEFINED,
22
+ "重要情報が記載された書類や電子媒体を持ち出す時に、盗難や紛失の対策をしていますか?": UNDEFINED,
23
+ "離席時にパソコン画面の見えや勝手に操作できないようにしていますか?": UNDEFINED,
24
+ "関係者以外の事務所への立ち入りを制限していますか?": UNDEFINED,
25
+ "退社時にノートパソコンや備品を施錠保管するなど盗難防止対策をしていますか?": UNDEFINED,
26
+ "事務所が無人になる時の施錠対策を実施していますか?": UNDEFINED,
27
+ "不要になった重要書類や重要データは、復元できないようにして破棄していますか?": UNDEFINED,
28
+ "従業員に守秘義務を課しており、業務上知り得た情報を外部に漏らさないなどのルールを守らせていますか?": UNDEFINED,
29
+ "従業員にセキュリティに関する教育や注意喚起を行っていますか?": UNDEFINED
30
+ }),
31
+ "Part 3: 組織としての対策": OrderedDict({
32
+ "個人所有の情報機器を業務で利用する場合のセキュリティ対策を明確にしていますか?": UNDEFINED,
33
+ "重要情報の授受を行う取引先との契約書に、秘密保持条件を規定していますか?": UNDEFINED,
34
+ "クラウドサービスやアウトソーシング先の選定において、安全・信頼性を把握して選定していますか?": UNDEFINED,
35
+ "セキュリティ事象が発生した場合に備え、緊急時の体制整備や対応手順を作成するなど準備をしていますか?": UNDEFINED,
36
+ "情報セキュリティ対策(上記1〜24など)をルール化し、従業員に明示していますか?": UNDEFINED
37
+ })
38
+ })
workflows/chat_workflow.py CHANGED
@@ -1,378 +1,101 @@
1
  from langgraph.graph import StateGraph, START, END
2
- from langgraph.graph.message import MessagesState
3
- from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
4
- from services.llm_service import LLMService
5
- from services.file_service import save_response, create_file_element
6
- from models.chat_state import AttackState, get_initial_state
7
  import chainlit as cl
8
  import json
9
- import logging
10
- import os
11
- from urllib.parse import quote
12
- # Initialize services
13
- llm_service = LLMService()
14
- logger = logging.getLogger(__name__)
15
 
16
- CHAINLIT_URL = os.environ.get("SPACE_HOST")
17
- if not CHAINLIT_URL:
18
- CHAINLIT_URL = "http://localhost:8080"
19
- if not CHAINLIT_URL.startswith("https://"):
20
- CHAINLIT_URL = "https://" + CHAINLIT_URL
21
-
22
- @cl.step(name="コンテキスト評価", type="evaluation")
23
- async def evaluate_context_node(state: AttackState) -> AttackState:
24
- """Node for evaluating if the user input is valid for ATT&CK context"""
25
- msg = cl.Message(content="")
26
-
27
- # Get the last user message
28
- user_messages = [msg for msg in state['messages'] if isinstance(msg, HumanMessage)]
29
- user_message = user_messages[-1].content if user_messages else ""
30
-
31
- try:
32
- evaluation_result = await llm_service.evaluate_context(user_message)
33
- state['is_valid_context'] = evaluation_result.is_valid
34
- state['extracted_user_scenario'] = evaluation_result.extracted_scenario
35
- state['extracted_user_layer_operation'] = evaluation_result.extracted_layer_operation
36
-
37
- if state['is_valid_context']:
38
- response_text = "入力はATT&CKフレームワークのコンテキストに合致します。シナリオの評価を続けます。"
39
- else:
40
- response_text = "申し訳ありませんが、この入力はサイバー攻撃の分析やATT&CKフレームワークのレイヤーに関する指示として認識できませんでした。適切な指示を入力してください。"
41
-
42
- await msg.stream_token(response_text)
43
- await msg.send()
44
- state['messages'].append(AIMessage(content=response_text))
45
-
46
- except Exception as e:
47
- error_msg = f"コンテキスト評価中にエラーが発生しました: {str(e)}"
48
- await msg.stream_token(error_msg)
49
- await msg.send()
50
- state['messages'].append(AIMessage(content=error_msg))
51
- state['is_valid_context'] = False
52
-
53
  return state
54
 
55
- @cl.step(name="シナリオ更新", type="update")
56
- async def update_scenario_node(state: AttackState) -> AttackState:
57
- """Node for updating the scenario based on user input"""
58
- msg = cl.Message(content="")
59
-
60
- # Get the last user message
61
- user_message = state.get('extracted_user_scenario', None)
62
- current_scenario = state.get('scenario', None)
63
- if not user_message and not current_scenario:
64
- raise ValueError("シナリオの更新に必要な情報がありません。")
65
-
66
- try:
67
- updated_scenario = await llm_service.generate_scenario(user_message, current_scenario)
68
- state['scenario'] = updated_scenario
69
-
70
- message = "新しいシナリオを作成しました。" if not current_scenario else "シナリオを更新しました。"
71
- await msg.stream_token(message)
72
- await msg.send()
73
- state['messages'].append(AIMessage(content=message))
74
-
75
- except Exception as e:
76
- error_msg = f"シナリオの{'作成' if not current_scenario else '更新'}中にエラーが発生しました: {str(e)}"
77
- await msg.stream_token(error_msg)
78
  await msg.send()
79
- state['messages'].append(AIMessage(content=error_msg))
80
-
81
  return state
82
 
83
- @cl.step(name="JSON生成/更新", type="generation", language="json")
84
- async def generate_json_node(state: AttackState) -> AttackState:
85
- """Node for generating or updating ATT&CK Navigator JSON"""
86
- user_message = state.get('extracted_user_layer_operation')
87
- current_scenario = state.get('scenario')
88
- existing_json = state.get('attack_json')
89
-
90
- try:
91
- json_content = await llm_service.generate_attack_json(user_message, current_scenario, existing_json)
92
-
93
- # Save JSON to file
94
- filename, filepath = save_response(json_content)
95
- file_element = create_file_element(filename, filepath)
96
-
97
- json_url = CHAINLIT_URL + "/" + filepath
98
- json_url = quote(json_url)
99
-
100
- # Prepare and send the response message
101
- response = "MITRE ATT&CK Navigatorレイヤーを更新しました。" if existing_json else "MITRE ATT&CK Navigatorレイヤーを生成しました。"
102
- response += " ファイルをダウンロードしてインポートできます。"
103
- response += f"ATT&CK Navigator : https://mitre-attack.github.io/attack-navigator//#layerURL={json_url}"
104
-
105
- msg = cl.Message(content=response, elements=[file_element])
106
- await msg.send()
107
-
108
- # Update state
109
- state['messages'].append(AIMessage(content=response))
110
- state['attack_json'] = json.loads(json_content)
111
-
112
- except Exception as e:
113
- error_msg = f"ATT&CK Navigatorレイヤーの生成中にエラーが発生しました: {str(e)}"
114
- msg = cl.Message(content=error_msg)
115
- await msg.send()
116
- state['messages'].append(AIMessage(content=error_msg))
117
-
118
- return state
119
-
120
- async def display_state_node(state: AttackState) -> AttackState:
121
- """Node for displaying the current state before ending"""
122
- async with cl.Step(name="状態表示", type="display") as step:
123
- # コンテキストの評価結果
124
- if state.get('is_valid_context') is not None:
125
- status = "有効" if state['is_valid_context'] else "無効"
126
- step.input = "コンテキスト評価"
127
- step.output = f"コンテキストの評価結果: {status}"
128
-
129
- # 現在のシナリオ
130
- if state.get('scenario'):
131
- async with cl.Step(name="シナリオ情報", type="display") as scenario_step:
132
- scenario_step.input = "現在のシナリオ"
133
- scenario_step.output = state['scenario']
134
-
135
- # JSONの状態
136
- if state.get('attack_json'):
137
- async with cl.Step(name="ATT&CK情報", type="display") as attack_step:
138
- techniques = state['attack_json'].get('techniques', [])
139
- technique_count = len(techniques)
140
- attack_step.input = "登録済みテクニック"
141
-
142
- if technique_count > 0:
143
- technique_list = "\n".join([
144
- f"- {t.get('techniqueID', 'Unknown')}: {t.get('comment', '説明なし')}"
145
- for t in techniques[:5]
146
- ])
147
- if technique_count > 5:
148
- technique_list += f"\n... 他 {technique_count - 5} 件"
149
- attack_step.output = f"登録済みテクニック数: {technique_count}\n\n{technique_list}"
150
- else:
151
- attack_step.output = "登録済みテクニックはありません"
152
- msg = cl.Message(content="処理が完了しました。")
153
  await msg.send()
 
 
154
  return state
155
 
156
- # Create the graph
157
- workflow = StateGraph(AttackState)
158
 
159
- # Add nodes
160
- workflow.add_node("evaluate_context", evaluate_context_node)
161
- workflow.add_node("update_scenario", update_scenario_node)
162
- workflow.add_node("generate_json", generate_json_node)
163
- workflow.add_node("display_state", display_state_node)
164
 
165
- # Add edges
166
- workflow.add_edge(START, "evaluate_context")
167
- workflow.add_conditional_edges(
168
- "evaluate_context",
169
- lambda state: state.get('is_valid_context', False),
 
 
 
170
  {
171
- True: "update_scenario",
172
- False: "display_state"
173
  }
174
  )
175
- workflow.add_edge("update_scenario", "generate_json")
176
- workflow.add_edge("generate_json", "display_state")
177
- workflow.add_edge("display_state", END)
178
-
179
- # Compile the graph
180
- chainlit_app = workflow.compile()
181
-
182
- async def test_evaluate_context_node(state: AttackState) -> AttackState:
183
- """テスト用のコンテキスト評価ノード"""
184
- user_messages = [msg for msg in state['messages'] if isinstance(msg, HumanMessage)]
185
- user_message = user_messages[-1].content if user_messages else ""
186
-
187
- try:
188
- evaluation_result = await llm_service.evaluate_context(user_message)
189
- state['is_valid_context'] = evaluation_result.is_valid
190
- state['extracted_user_scenario'] = evaluation_result.extracted_scenario
191
- state['extracted_user_layer_operation'] = evaluation_result.extracted_layer_operation
192
- response_text = "入力はATT&CKフレームワークのコンテキストに合致します。シナリオの評価を続けます。" if state['is_valid_context'] else "申し訳ありませんが、この入力はサイバー攻撃の分析やATT&CKフレームワークのレイヤーに関する指示として認識できませんでした。適切な指示を入力してください。"
193
- state['messages'].append(AIMessage(content=response_text))
194
- except Exception as e:
195
- error_msg = f"コンテキスト評価中にエラーが発生しました: {str(e)}"
196
- state['messages'].append(AIMessage(content=error_msg))
197
- state['is_valid_context'] = False
198
-
199
- return state
200
-
201
- async def test_update_scenario_node(state: AttackState) -> AttackState:
202
- """テスト用のシナリオ更新ノード"""
203
- # Get the last user message
204
- user_message = state.get('extracted_user_scenario')
205
- current_scenario = state.get('scenario')
206
-
207
- try:
208
- updated_scenario = await llm_service.generate_scenario(user_message, current_scenario)
209
- state['scenario'] = updated_scenario
210
- message = "新しいシナリオを作成しました。" if not current_scenario else "シナリオを更新しました。"
211
- state['messages'].append(AIMessage(content=message))
212
- except Exception as e:
213
- error_msg = f"シナリオの{'作成' if not current_scenario else '更新'}中にエラーが発生しました: {str(e)}"
214
- state['messages'].append(AIMessage(content=error_msg))
215
-
216
- return state
217
-
218
- async def test_generate_json_node(state: AttackState) -> AttackState:
219
- """テスト用のJSON生成ノード"""
220
- user_message = state.get('extracted_user_layer_operation')
221
- current_scenario = state.get('scenario')
222
- existing_json = state.get('attack_json')
223
-
224
- try:
225
- json_content = await llm_service.generate_attack_json(user_message, current_scenario, existing_json)
226
- response = "MITRE ATT&CK Navigatorレイヤーを更新しました。" if existing_json else "MITRE ATT&CK Navigatorレイヤーを生成しました。"
227
- response += " ファイルをダウンロードしてインポートできます。"
228
- state['messages'].append(AIMessage(content=response))
229
- state['attack_json'] = json.loads(json_content)
230
- except Exception as e:
231
- error_msg = f"ATT&CK Navigatorレイヤーの生成中にエラーが発生しました: {str(e)}"
232
- state['messages'].append(AIMessage(content=error_msg))
233
-
234
- return state
235
-
236
- async def test_display_state_node(state: AttackState) -> AttackState:
237
- """テスト用の状態表示ノード"""
238
- summary = []
239
-
240
- if state.get('is_valid_context') is not None:
241
- status = "有効" if state['is_valid_context'] else "無効"
242
- summary.append(f"コンテキストの評価結果: {status}")
243
-
244
- if state.get('scenario'):
245
- summary.append(f"現在のシナリオ:\n{state['scenario']}")
246
-
247
- if state.get('attack_json'):
248
- techniques = state['attack_json'].get('techniques', [])
249
- technique_count = len(techniques)
250
- summary.append(f"登録済みテクニック数: {technique_count}")
251
-
252
- if technique_count > 0:
253
- technique_list = "\n".join([
254
- f"- {t.get('techniqueID', 'Unknown')}: {t.get('comment', '説明なし')}"
255
- for t in techniques[:5]
256
- ])
257
- if technique_count > 5:
258
- technique_list += f"\n... 他 {technique_count - 5} 件"
259
- summary.append(f"\n登録済みテクニック:\n{technique_list}")
260
-
261
- if summary:
262
- state_summary = "\n\n".join(summary)
263
- state['messages'].append(AIMessage(content=f"現在の状態:\n{state_summary}"))
264
-
265
- return state
266
-
267
- async def main():
268
- """テスト用のメイン関数"""
269
- try:
270
- # 初期状態の作成
271
- initial_state = get_initial_state()
272
-
273
- # テスト用の既存シナリオ
274
- existing_scenario = """
275
- 標的システムへの不正アクセスシナリオ
276
-
277
- 概要:
278
- 攻撃者は、標的のシステムに不正アクセスを試み、機密情報を窃取します。
279
-
280
- 攻撃フェーズ:
281
- 1. 初期アクセス
282
- - パスワードスプレー攻撃による認証情報の取得
283
- - 有効なアカウントの特定
284
-
285
- 2. 実行
286
- - 取得した認証情報を使用してシステムにログイン
287
- - 不正なコマンドの実行
288
-
289
- 3. 権限昇格
290
- - 管理者権限の取得
291
- - システム設定の変更
292
 
293
- 4. 防御回避
294
- - ログの削除
295
- - 攻撃痕跡の隠蔽
296
- """
297
-
298
- # テスト用の既存JSON
299
- existing_json = {
300
- "name": "Test Layer",
301
- "versions": {
302
- "attack": "16.0",
303
- "navigator": "4.9.0",
304
- "layer": "4.5"
305
- },
306
- "domain": "enterprise-attack",
307
- "description": "Test layer for development",
308
- "filters": {
309
- "platforms": ["Windows", "Linux", "macOS"]
310
- },
311
- "gradient": {
312
- "colors": ["#ffffff", "#ff6666"],
313
- "minValue": 0,
314
- "maxValue": 100
315
- },
316
- "techniques": [
317
- {
318
- "techniqueID": "T1110",
319
- "score": 50,
320
- "color": "#ff6666",
321
- "comment": "パスワードスプレー攻撃による認証情報の取得",
322
- "enabled": True
323
- },
324
- {
325
- "techniqueID": "T1078",
326
- "score": 50,
327
- "color": "#ff6666",
328
- "comment": "有効なアカウントを使用した不正アクセス",
329
- "enabled": True
330
- }
331
- ]
332
- }
333
-
334
- # 初期状態に既存データを設定
335
- initial_state['scenario'] = existing_scenario
336
- initial_state['attack_json'] = existing_json
337
-
338
- # テスト用のユーザーメッセージ
339
- test_message = """
340
- 以下の攻撃シナリオを分析してください:
341
-
342
- 攻撃者は、標的のシステムに不正アクセスを試みます。
343
- まず、パスワードスプレー攻撃を実行して、有効なアカウントの認証情報を取得します。
344
- 取得した認証情報を使用して、システムにログインし、機密情報を窃取します。
345
- 最後に、攻撃の痕跡を隠蔽するために、ログを削除します。
346
- """
347
-
348
- test_message = "ATTACKのバージョンを16にして、テクニックは青にして。"
349
 
350
- # ユーザーメッセージを状態に追加
351
- initial_state['messages'].append(HumanMessage(content=test_message))
352
-
353
- # テスト用ワークフローの実行
354
- state = await test_evaluate_context_node(initial_state)
355
-
356
- if state.get('is_valid_context', False):
357
- # シナリオ更新
358
- state = await test_update_scenario_node(state)
359
-
360
- # JSON生成
361
- state = await test_generate_json_node(state)
362
-
363
- # 状態表示
364
- state = await test_display_state_node(state)
365
-
366
- # 結果の表示
367
- for msg in state['messages']:
368
- role = "User" if isinstance(msg, HumanMessage) else "Assistant"
369
- print(f"\n{role}:")
370
- print(msg.content)
371
-
372
- except Exception as e:
373
- print(f"エラーが発生しました: {str(e)}")
374
- raise
375
 
376
- if __name__ == "__main__":
377
- import asyncio
378
- asyncio.run(main())
 
1
  from langgraph.graph import StateGraph, START, END
2
+ from models.chat_state import SecuritySurveyState, process_answer, get_next_question, has_unanswered_questions
3
+ from security_checklist import security_checklist, UNDEFINED
 
 
 
4
  import chainlit as cl
5
  import json
 
 
 
 
 
 
6
 
7
+ @cl.step(name="ユーザ入力処理", type="input_processing")
8
+ async def process_input_node(state: SecuritySurveyState) -> SecuritySurveyState:
9
+ """ユーザの入力を処理するノード。新規セッションか回答かを判断する"""
10
+ # 新規セッションの場合は何もしない
11
+ if state.get('is_new_session', True):
12
+ state['is_new_session'] = False
13
+ return state
14
+
15
+ # ユーザの最後のメッセージを取得
16
+ user_messages = [msg for msg in state['messages'] if msg.type == 'human']
17
+ if not user_messages:
18
+ return state
19
+
20
+ answer = user_messages[-1].content
21
+ # 回答を処理
22
+ state = process_answer(state, answer)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
  return state
24
 
25
+ @cl.step(name="質問表示", type="question_display")
26
+ async def display_question_node(state: SecuritySurveyState) -> SecuritySurveyState:
27
+ """質問を表示するノード"""
28
+ question = get_next_question(state)
29
+ if question:
30
+ msg = cl.Message(content=question)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
31
  await msg.send()
 
 
32
  return state
33
 
34
+ @cl.step(name="調査完了", type="survey_complete")
35
+ async def survey_complete_node(state: SecuritySurveyState) -> SecuritySurveyState:
36
+ """全質問が回答された後の処理を行うノード"""
37
+ # 回答をより人間が読みやすいフォーマットに変換する
38
+ formatted_answers = {}
39
+ for part, questions in state['answers'].items():
40
+ formatted_answers[part] = {}
41
+ for question, answer in questions.items():
42
+ if answer is True:
43
+ formatted_answers[part][question] = "対応済み"
44
+ elif answer is False:
45
+ formatted_answers[part][question] = "未対応"
46
+ else:
47
+ formatted_answers[part][question] = "未回答"
48
+
49
+ result_json = json.dumps(formatted_answers, ensure_ascii=False, indent=2)
50
+
51
+ # 結果のサマリーを作成する
52
+ total_items = 0
53
+ implemented_items = 0
54
+
55
+ for part, questions in state['answers'].items():
56
+ for question, answer in questions.items():
57
+ total_items += 1
58
+ if answer is True:
59
+ implemented_items += 1
60
+
61
+ implementation_rate = (implemented_items / total_items) * 100 if total_items > 0 else 0
62
+
63
+ summary = f"セキュリティ対策実施率: {implementation_rate:.1f}% ({implemented_items}/{total_items}項目)"
64
+
65
+ # 結果を返す
66
+ msg = cl.Message(content=f"セキュリティチェックリストの結果です。\n\n{summary}", author="system")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
  await msg.send()
68
+ msg2 = cl.Message(content=f"```json\n{result_json}\n```", language="json", author="system")
69
+ await msg2.send()
70
  return state
71
 
72
+ # ワークフローの定義
73
+ survey_workflow = StateGraph(SecuritySurveyState)
74
 
75
+ # ノードの追加
76
+ survey_workflow.add_node("process_input", process_input_node)
77
+ survey_workflow.add_node("display_question", display_question_node)
78
+ survey_workflow.add_node("survey_complete", survey_complete_node)
 
79
 
80
+ # エッジの追加
81
+ survey_workflow.add_edge(START, "process_input")
82
+
83
+ # process_input から次のノードへの条件付きエッジ
84
+ survey_workflow.add_conditional_edges(
85
+ "process_input",
86
+ # 条件関数: 質問が全て終わったかどうか
87
+ lambda state: not has_unanswered_questions(state),
88
  {
89
+ True: "survey_complete", # 全質問完了 → 調査完了
90
+ False: "display_question" # まだ質問あり → 質問表示
91
  }
92
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
93
 
94
+ # display_question から次のノードへのエッジ (必ずENDへ)
95
+ survey_workflow.add_edge("display_question", END)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
 
97
+ # survey_complete から次のノード��のエッジ (必ずENDへ)
98
+ survey_workflow.add_edge("survey_complete", END)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
99
 
100
+ # ワークフローをコンパイル
101
+ survey_chainlit_app = survey_workflow.compile()