Spaces:
Sleeping
Sleeping
from langgraph.graph import StateGraph, START, END | |
from models.chat_state import SecuritySurveyState, process_answer, get_next_question, has_unanswered_questions | |
from security_checklist import security_checklist, UNDEFINED | |
import chainlit as cl | |
import json | |
async def process_input_node(state: SecuritySurveyState) -> SecuritySurveyState: | |
"""ユーザの入力を処理するノード。新規セッションか回答かを判断する""" | |
# 新規セッションの場合は何もしない | |
if state.get('is_new_session', True): | |
state['is_new_session'] = False | |
return state | |
# ユーザの最後のメッセージを取得 | |
user_messages = [msg for msg in state['messages'] if msg.type == 'human'] | |
if not user_messages: | |
return state | |
answer = user_messages[-1].content | |
# 回答を処理 | |
state = process_answer(state, answer) | |
return state | |
async def display_question_node(state: SecuritySurveyState) -> SecuritySurveyState: | |
"""質問を表示するノード""" | |
question = get_next_question(state) | |
if question: | |
msg = cl.Message(content=question) | |
await msg.send() | |
return state | |
async def survey_complete_node(state: SecuritySurveyState) -> SecuritySurveyState: | |
"""全質問が回答された後の処理を行うノード""" | |
# 回答をより人間が読みやすいフォーマットに変換する | |
formatted_answers = {} | |
for part, questions in state['answers'].items(): | |
formatted_answers[part] = {} | |
for question, answer in questions.items(): | |
if answer is True: | |
formatted_answers[part][question] = "対応済み" | |
elif answer is False: | |
formatted_answers[part][question] = "未対応" | |
else: | |
formatted_answers[part][question] = "未回答" | |
result_json = json.dumps(formatted_answers, ensure_ascii=False, indent=2) | |
# 結果のサマリーを作成する | |
total_items = 0 | |
implemented_items = 0 | |
for part, questions in state['answers'].items(): | |
for question, answer in questions.items(): | |
total_items += 1 | |
if answer is True: | |
implemented_items += 1 | |
implementation_rate = (implemented_items / total_items) * 100 if total_items > 0 else 0 | |
summary = f"セキュリティ対策実施率: {implementation_rate:.1f}% ({implemented_items}/{total_items}項目)" | |
# 結果を返す | |
msg = cl.Message(content=f"セキュリティチェックリストの結果です。\n\n{summary}", author="system") | |
await msg.send() | |
msg2 = cl.Message(content=f"```json\n{result_json}\n```", language="json", author="system") | |
await msg2.send() | |
return state | |
# ワークフローの定義 | |
survey_workflow = StateGraph(SecuritySurveyState) | |
# ノードの追加 | |
survey_workflow.add_node("process_input", process_input_node) | |
survey_workflow.add_node("display_question", display_question_node) | |
survey_workflow.add_node("survey_complete", survey_complete_node) | |
# エッジの追加 | |
survey_workflow.add_edge(START, "process_input") | |
# process_input から次のノードへの条件付きエッジ | |
survey_workflow.add_conditional_edges( | |
"process_input", | |
# 条件関数: 質問が全て終わったかどうか | |
lambda state: not has_unanswered_questions(state), | |
{ | |
True: "survey_complete", # 全質問完了 → 調査完了 | |
False: "display_question" # まだ質問あり → 質問表示 | |
} | |
) | |
# display_question から次のノードへのエッジ (必ずENDへ) | |
survey_workflow.add_edge("display_question", END) | |
# survey_complete から次のノードへのエッジ (必ずENDへ) | |
survey_workflow.add_edge("survey_complete", END) | |
# ワークフローをコンパイル | |
survey_chainlit_app = survey_workflow.compile() |