File size: 4,036 Bytes
5b76d85
3966c38
 
5b76d85
 
 
3966c38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b76d85
 
3966c38
 
 
 
 
 
5b76d85
 
 
3966c38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b76d85
3966c38
 
5b76d85
 
3966c38
 
5b76d85
3966c38
 
 
 
5b76d85
3966c38
 
 
 
 
 
 
 
5b76d85
3966c38
 
5b76d85
 
 
3966c38
 
5b76d85
3966c38
 
5b76d85
3966c38
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
from langgraph.graph import StateGraph, START, END
from models.chat_state import SecuritySurveyState, process_answer, get_next_question, has_unanswered_questions
from security_checklist import security_checklist, UNDEFINED
import chainlit as cl
import json

@cl.step(name="ユーザ入力処理", type="input_processing")
async def process_input_node(state: SecuritySurveyState) -> SecuritySurveyState:
    """ユーザの入力を処理するノード。新規セッションか回答かを判断する"""
    # 新規セッションの場合は何もしない
    if state.get('is_new_session', True):
        state['is_new_session'] = False
        return state
    
    # ユーザの最後のメッセージを取得
    user_messages = [msg for msg in state['messages'] if msg.type == 'human']
    if not user_messages:
        return state
    
    answer = user_messages[-1].content
    # 回答を処理
    state = process_answer(state, answer)
    return state

@cl.step(name="質問表示", type="question_display")
async def display_question_node(state: SecuritySurveyState) -> SecuritySurveyState:
    """質問を表示するノード"""
    question = get_next_question(state)
    if question:
        msg = cl.Message(content=question)
        await msg.send()
    return state

@cl.step(name="調査完了", type="survey_complete")
async def survey_complete_node(state: SecuritySurveyState) -> SecuritySurveyState:
    """全質問が回答された後の処理を行うノード"""
    # 回答をより人間が読みやすいフォーマットに変換する
    formatted_answers = {}
    for part, questions in state['answers'].items():
        formatted_answers[part] = {}
        for question, answer in questions.items():
            if answer is True:
                formatted_answers[part][question] = "対応済み"
            elif answer is False:
                formatted_answers[part][question] = "未対応"
            else:
                formatted_answers[part][question] = "未回答"
    
    result_json = json.dumps(formatted_answers, ensure_ascii=False, indent=2)
    
    # 結果のサマリーを作成する
    total_items = 0
    implemented_items = 0
    
    for part, questions in state['answers'].items():
        for question, answer in questions.items():
            total_items += 1
            if answer is True:
                implemented_items += 1
    
    implementation_rate = (implemented_items / total_items) * 100 if total_items > 0 else 0
    
    summary = f"セキュリティ対策実施率: {implementation_rate:.1f}% ({implemented_items}/{total_items}項目)"
    
    # 結果を返す
    msg = cl.Message(content=f"セキュリティチェックリストの結果です。\n\n{summary}", author="system")
    await msg.send()
    msg2 = cl.Message(content=f"```json\n{result_json}\n```", language="json", author="system")
    await msg2.send()
    return state

# ワークフローの定義
survey_workflow = StateGraph(SecuritySurveyState)

# ノードの追加
survey_workflow.add_node("process_input", process_input_node)
survey_workflow.add_node("display_question", display_question_node)
survey_workflow.add_node("survey_complete", survey_complete_node)

# エッジの追加
survey_workflow.add_edge(START, "process_input")

# process_input から次のノードへの条件付きエッジ
survey_workflow.add_conditional_edges(
    "process_input",
    # 条件関数: 質問が全て終わったかどうか
    lambda state: not has_unanswered_questions(state),
    {
        True: "survey_complete",   # 全質問完了 → 調査完了
        False: "display_question"  # まだ質問あり → 質問表示
    }
)

# display_question から次のノードへのエッジ (必ずENDへ)
survey_workflow.add_edge("display_question", END)

# survey_complete から次のノードへのエッジ (必ずENDへ)
survey_workflow.add_edge("survey_complete", END)

# ワークフローをコンパイル
survey_chainlit_app = survey_workflow.compile()