Spaces:
Sleeping
Sleeping
File size: 4,036 Bytes
5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 5b76d85 3966c38 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
from langgraph.graph import StateGraph, START, END
from models.chat_state import SecuritySurveyState, process_answer, get_next_question, has_unanswered_questions
from security_checklist import security_checklist, UNDEFINED
import chainlit as cl
import json
@cl.step(name="ユーザ入力処理", type="input_processing")
async def process_input_node(state: SecuritySurveyState) -> SecuritySurveyState:
"""ユーザの入力を処理するノード。新規セッションか回答かを判断する"""
# 新規セッションの場合は何もしない
if state.get('is_new_session', True):
state['is_new_session'] = False
return state
# ユーザの最後のメッセージを取得
user_messages = [msg for msg in state['messages'] if msg.type == 'human']
if not user_messages:
return state
answer = user_messages[-1].content
# 回答を処理
state = process_answer(state, answer)
return state
@cl.step(name="質問表示", type="question_display")
async def display_question_node(state: SecuritySurveyState) -> SecuritySurveyState:
"""質問を表示するノード"""
question = get_next_question(state)
if question:
msg = cl.Message(content=question)
await msg.send()
return state
@cl.step(name="調査完了", type="survey_complete")
async def survey_complete_node(state: SecuritySurveyState) -> SecuritySurveyState:
"""全質問が回答された後の処理を行うノード"""
# 回答をより人間が読みやすいフォーマットに変換する
formatted_answers = {}
for part, questions in state['answers'].items():
formatted_answers[part] = {}
for question, answer in questions.items():
if answer is True:
formatted_answers[part][question] = "対応済み"
elif answer is False:
formatted_answers[part][question] = "未対応"
else:
formatted_answers[part][question] = "未回答"
result_json = json.dumps(formatted_answers, ensure_ascii=False, indent=2)
# 結果のサマリーを作成する
total_items = 0
implemented_items = 0
for part, questions in state['answers'].items():
for question, answer in questions.items():
total_items += 1
if answer is True:
implemented_items += 1
implementation_rate = (implemented_items / total_items) * 100 if total_items > 0 else 0
summary = f"セキュリティ対策実施率: {implementation_rate:.1f}% ({implemented_items}/{total_items}項目)"
# 結果を返す
msg = cl.Message(content=f"セキュリティチェックリストの結果です。\n\n{summary}", author="system")
await msg.send()
msg2 = cl.Message(content=f"```json\n{result_json}\n```", language="json", author="system")
await msg2.send()
return state
# ワークフローの定義
survey_workflow = StateGraph(SecuritySurveyState)
# ノードの追加
survey_workflow.add_node("process_input", process_input_node)
survey_workflow.add_node("display_question", display_question_node)
survey_workflow.add_node("survey_complete", survey_complete_node)
# エッジの追加
survey_workflow.add_edge(START, "process_input")
# process_input から次のノードへの条件付きエッジ
survey_workflow.add_conditional_edges(
"process_input",
# 条件関数: 質問が全て終わったかどうか
lambda state: not has_unanswered_questions(state),
{
True: "survey_complete", # 全質問完了 → 調査完了
False: "display_question" # まだ質問あり → 質問表示
}
)
# display_question から次のノードへのエッジ (必ずENDへ)
survey_workflow.add_edge("display_question", END)
# survey_complete から次のノードへのエッジ (必ずENDへ)
survey_workflow.add_edge("survey_complete", END)
# ワークフローをコンパイル
survey_chainlit_app = survey_workflow.compile() |