from langgraph.graph import StateGraph, START, END from models.chat_state import SecuritySurveyState, process_answer, get_next_question, has_unanswered_questions from security_checklist import security_checklist, UNDEFINED import chainlit as cl import json @cl.step(name="ユーザ入力処理", type="input_processing") async def process_input_node(state: SecuritySurveyState) -> SecuritySurveyState: """ユーザの入力を処理するノード。新規セッションか回答かを判断する""" # 新規セッションの場合は何もしない if state.get('is_new_session', True): state['is_new_session'] = False return state # ユーザの最後のメッセージを取得 user_messages = [msg for msg in state['messages'] if msg.type == 'human'] if not user_messages: return state answer = user_messages[-1].content # 回答を処理 state = process_answer(state, answer) return state @cl.step(name="質問表示", type="question_display") async def display_question_node(state: SecuritySurveyState) -> SecuritySurveyState: """質問を表示するノード""" question = get_next_question(state) if question: msg = cl.Message(content=question) await msg.send() return state @cl.step(name="調査完了", type="survey_complete") async def survey_complete_node(state: SecuritySurveyState) -> SecuritySurveyState: """全質問が回答された後の処理を行うノード""" # 回答をより人間が読みやすいフォーマットに変換する formatted_answers = {} for part, questions in state['answers'].items(): formatted_answers[part] = {} for question, answer in questions.items(): if answer is True: formatted_answers[part][question] = "対応済み" elif answer is False: formatted_answers[part][question] = "未対応" else: formatted_answers[part][question] = "未回答" result_json = json.dumps(formatted_answers, ensure_ascii=False, indent=2) # 結果のサマリーを作成する total_items = 0 implemented_items = 0 for part, questions in state['answers'].items(): for question, answer in questions.items(): total_items += 1 if answer is True: implemented_items += 1 implementation_rate = (implemented_items / total_items) * 100 if total_items > 0 else 0 summary = f"セキュリティ対策実施率: {implementation_rate:.1f}% ({implemented_items}/{total_items}項目)" # 結果を返す msg = cl.Message(content=f"セキュリティチェックリストの結果です。\n\n{summary}", author="system") await msg.send() msg2 = cl.Message(content=f"```json\n{result_json}\n```", language="json", author="system") await msg2.send() return state # ワークフローの定義 survey_workflow = StateGraph(SecuritySurveyState) # ノードの追加 survey_workflow.add_node("process_input", process_input_node) survey_workflow.add_node("display_question", display_question_node) survey_workflow.add_node("survey_complete", survey_complete_node) # エッジの追加 survey_workflow.add_edge(START, "process_input") # process_input から次のノードへの条件付きエッジ survey_workflow.add_conditional_edges( "process_input", # 条件関数: 質問が全て終わったかどうか lambda state: not has_unanswered_questions(state), { True: "survey_complete", # 全質問完了 → 調査完了 False: "display_question" # まだ質問あり → 質問表示 } ) # display_question から次のノードへのエッジ (必ずENDへ) survey_workflow.add_edge("display_question", END) # survey_complete から次のノードへのエッジ (必ずENDへ) survey_workflow.add_edge("survey_complete", END) # ワークフローをコンパイル survey_chainlit_app = survey_workflow.compile()