# SDLC.py
import os
import sys
import shutil
from typing import List, Union, Dict, Annotated, Any
from typing_extensions import TypedDict
from pydantic import BaseModel, Field
from langchain.schema import AIMessage, HumanMessage
from langchain_core.language_models.base import BaseLanguageModel # Correct import path
from langchain_groq import ChatGroq
from langchain_openai import ChatOpenAI
# Imports for additional providers
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_anthropic import ChatAnthropic
from tavily import TavilyClient
from dotenv import load_dotenv
import operator
import logging
import ast
import time
from plantuml import PlantUML
from functools import wraps
from tenacity import retry, stop_after_attempt, wait_exponential, wait_fixed, retry_if_exception_type
import nest_asyncio
# --- Basic logging setup ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Load Environment Variables ---
# Keep load_dotenv() in case some functions still rely on other env vars,
# but LLM/Tavily keys will now come from function args.
load_dotenv()
# --- No module-level LLM / Tavily initialization ---
# Clients are created dynamically by initialize_llm_clients() and carried in state.
# --- Pydantic Models ---
class DiagramSelection(BaseModel):
diagram_types: List[str] = Field(..., description="List of 5 selected UML/DFD diagram types")
justifications: List[str] = Field(..., description="Brief justifications for each diagram type")
class PlantUMLCode(BaseModel):
diagram_type: str = Field(..., description="Type of UML/DFD diagram")
code: str = Field(..., description="PlantUML code for the diagram")
class CodeFile(BaseModel):
filename: str = Field(..., description="Name of the file, including path relative to project root")
content: str = Field(..., description="Full content of the file")
class GeneratedCode(BaseModel):
files: List[CodeFile] = Field(..., description="List of all files in the project")
instructions: str = Field(..., description="Beginner-friendly setup and run instructions")
class TestCase(BaseModel):
description: str = Field(..., description="Description of the test case")
input_data: dict = Field(..., description="Fake input data, must be non-empty")
expected_output: dict = Field(..., description="Expected fake output, must be non-empty")
class TestCases(BaseModel):
test_cases: List[TestCase] = Field(..., description="List of test cases")
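# Illustrative only: these models are bound to the LLM for structured output
# via llm.with_structured_output(...) in the workflow functions below, e.g.
#   structured_llm = llm.with_structured_output(GeneratedCode)
#   result = structured_llm.invoke(prompt)  # -> GeneratedCode instance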
# --- Main State Definition ---
class MainState(TypedDict, total=False):
    # Initialized clients, created by initialize_llm_clients() and carried in state
    llm_instance: BaseLanguageModel | None
    tavily_instance: TavilyClient | None
# Core conversation history
messages: Annotated[List[Union[HumanMessage, AIMessage]], lambda x, y: (x or []) + (y or [])]
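    # The reducer above concatenates message lists and tolerates None on either
    # side, so partial state updates merge into the shared history instead of
    # replacing it.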
# Project definition
project_folder: str # Base name/relative path used for saving files
project: str
category: str
subcategory: str
coding_language: str
# User Input Cycle State
user_input_questions: List[str]
user_input_answers: List[str]
user_input_iteration: int
user_input_min_iterations: int
user_input_done: bool
# Core Artifacts
user_query_with_qa: str
refined_prompt: str
final_user_story: str
final_product_review: str
final_design_document: str
final_uml_codes: List[PlantUMLCode]
final_code_files: List[CodeFile]
final_code_review: str
final_security_issues: str
final_test_code_files: List[CodeFile]
final_quality_analysis: str
final_deployment_process: str
# File Paths
final_user_story_path: str
final_product_review_path: str
final_design_document_path: str
final_uml_diagram_folder: str
final_uml_png_paths: List[str]
final_review_security_folder: str
review_code_snapshot_folder: str
final_testing_folder: str
testing_passed_code_folder: str
final_quality_analysis_path: str
final_code_folder: str
final_deployment_path: str
# Intermediate States
user_story_current: str; user_story_feedback: str; user_story_human_feedback: str; user_story_done: bool;
product_review_current: str; product_review_feedback: str; product_review_human_feedback: str; product_review_done: bool;
design_doc_current: str; design_doc_feedback: str; design_doc_human_feedback: str; design_doc_done: bool;
uml_selected_diagrams: List[str]; uml_current_codes: List[PlantUMLCode]; uml_feedback: Dict[str, str]; uml_human_feedback: Dict[str, str]; uml_done: bool;
code_current: GeneratedCode;
code_human_input: str; code_web_search_results: str; code_feedback: str; code_human_feedback: str; code_done: bool;
code_review_current_feedback: str; security_current_feedback: str; review_security_human_feedback: str; review_security_done: bool;
test_cases_current: List[TestCase]; test_cases_feedback: str; test_cases_human_feedback: str; test_cases_passed: bool;
quality_current_analysis: str; quality_feedback: str; quality_human_feedback: str; quality_done: bool;
deployment_current_process: str; deployment_feedback: str; deployment_human_feedback: str; deployment_done: bool;
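    # Each cycle above follows the same shape: *_current holds the latest
    # artifact, *_feedback the AI critique, *_human_feedback the user's notes,
    # and *_done (or *_passed) the exit flag for that loop.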
# --- Constants and Helper Functions ---
PLANTUML_SYNTAX_RULES = {  # Per-diagram syntax templates, required keywords, and notes
"Activity Diagram": {"template": "@startuml\nstart\nif (condition) then (yes)\n :action1;\nelse (no)\n :action2;\nendif\nwhile (condition)\n :action3;\nendwhile\nstop\n@enduml", "required_keywords": ["start", ":", "stop"], "notes": "Conditionals: if/else/endif. Loops: while/endwhile. Actions: :action;."},
"Sequence Diagram": {"template": "@startuml\nparticipant A\nparticipant B\nA -> B : message\nalt condition\n B --> A : success\nelse\n B --> A : failure\nend\n@enduml", "required_keywords": ["participant", "->", "-->"], "notes": "-> solid line, --> dashed line. alt/else/end for alternatives."},
"Use Case Diagram": {"template": "@startuml\nactor User\nusecase (UC1)\nUser --> (UC1)\n@enduml", "required_keywords": ["actor", "-->", "("], "notes": "Define actors and use cases, connect with -->."},
"Class Diagram": {"template": "@startuml\nclass MyClass {\n +field: Type\n +method()\n}\nMyClass --> OtherClass\n@enduml", "required_keywords": ["class", "{", "}", "-->"], "notes": "Define classes, attributes, methods. --> association, <|-- inheritance."},
"State Machine Diagram": {"template": "@startuml\n[*] --> State1\nState1 --> State2 : event [condition] / action\nState2 --> [*]\n@enduml", "required_keywords": ["[*]", "-->", ":"], "notes": "[*] start/end. --> transitions with event/condition/action."},
"Object Diagram": {"template": "@startuml\nobject obj1: Class1\nobj1 : attr = val\nobj1 --> obj2\n@enduml", "required_keywords": ["object", ":", "-->"], "notes": "Define objects (instances), set attributes, link."},
"Component Diagram": {"template": "@startuml\ncomponent Comp1\ninterface Iface\nComp1 ..> Iface\nComp1 --> Comp2\n@enduml", "required_keywords": ["component", "-->"], "notes": "Define components, interfaces. --> dependency, ..> usage."},
"Deployment Diagram": {"template": "@startuml\nnode Server {\n artifact app.jar\n}\n@enduml", "required_keywords": ["node", "artifact"], "notes": "Nodes for hardware/software envs, artifacts for deployed items."},
"Package Diagram": {"template": "@startuml\npackage \"My Package\" {\n class ClassA\n}\n@enduml", "required_keywords": ["package", "{"], "notes": "Group elements."},
"Composite Structure Diagram": {"template": "@startuml\nclass Composite {\n +part1 : Part1\n}\nComposite *-- Part1\n@enduml", "required_keywords": ["class", "{", "}", "*--"], "notes": "Show internal structure, *-- composition."},
"Timing Diagram": {"template": "@startuml\nrobust \"User\" as U\nconcise \"System\" as S\n@0\nU is Idle\nS is Ready\n@100\nU -> S : Request()\nS is Processing\n@300\nS --> U : Response()\nU is Active\nS is Ready\n@enduml", "required_keywords": ["@", "is"], "notes": "Show state changes over time."},
"Interaction Overview Diagram": {"template": "@startuml\nstart\nif (condition?) then (yes)\n ref over Actor : Interaction1\nelse (no)\n :Action A;\nendif\nstop\n@enduml", "required_keywords": ["start", ":", "ref", "stop"], "notes": "Combine activity diagrams with interaction refs."},
"Communication Diagram": {"template": "@startuml\nobject O1\nobject O2\nO1 -> O2 : message()\n@enduml", "required_keywords": ["object", "->", ":"], "notes": "Focus on object interactions."},
"Profile Diagram": {"template": "@startuml\nprofile MyProfile {\n stereotype MyStereotype\n}\n@enduml", "required_keywords": ["profile", "stereotype"], "notes": "Define custom stereotypes and tagged values."},
"Context Diagram (Level 0 DFD)": {"template": "@startuml\nrectangle System as S\nentity External as E\nE --> S : Data Input\nS --> E : Data Output\n@enduml", "required_keywords": ["rectangle", "entity", "-->", ":"], "notes": "System boundary, external entities, major data flows."},
"Level 1 DFD": {"template": "@startuml\nentity E\nrectangle P1\nrectangle P2\ndatabase DS\nE --> P1 : Input\nP1 --> P2 : Data\nP1 --> DS : Store\nP2 --> E : Output\n@enduml", "required_keywords": ["rectangle", "entity", "database", "-->", ":"], "notes": "Major processes, data stores, flows between them."},
"Level 2 DFD": {"template": "@startuml\nrectangle P1.1\nrectangle P1.2\ndatabase DS\nP1.1 --> P1.2 : Internal Data\nP1.2 --> DS : Store Detail\n@enduml", "required_keywords": ["rectangle", "-->", ":"], "notes": "Decomposition of Level 1 processes."},
"Level 3 DFD": {"template": "@startuml\nrectangle P1.1.1\nrectangle P1.1.2\nP1.1.1 --> P1.1.2 : Sub-detail\n@enduml", "required_keywords": ["rectangle", "-->", ":"], "notes": "Further decomposition."},
"General DFD": {"template": "@startuml\nentity E\nrectangle P\ndatabase DS\nE --> P : Input\nP --> DS : Store\nDS --> P : Retrieve\nP --> E : Output\n@enduml", "required_keywords": ["entity", "rectangle", "database", "-->", ":"], "notes": "Generic structure for DFDs."},
}
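# Example lookup (values taken from the dictionary above):
#   PLANTUML_SYNTAX_RULES["Class Diagram"]["required_keywords"]  # ['class', '{', '}', '-->']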
def validate_plantuml_code(diagram_type: str, code: str) -> bool:
    """Leniently validate PlantUML code: returns False only for an unknown
    diagram type or empty code; other issues are logged as warnings."""
if diagram_type not in PLANTUML_SYNTAX_RULES:
logger.warning(f"Unknown diagram type for validation: {diagram_type}")
return False
rules = PLANTUML_SYNTAX_RULES[diagram_type]
required_keywords = rules.get("required_keywords", [])
if not code:
logger.warning(f"Empty code provided for {diagram_type}.")
return False
code_cleaned = code.strip()
if not code_cleaned.startswith("@startuml"):
logger.warning(f"PlantUML code for {diagram_type} does not start with @startuml.")
if not code_cleaned.endswith("@enduml"):
logger.warning(f"PlantUML code for {diagram_type} does not end with @enduml.")
if required_keywords:
missing_keywords = [kw for kw in required_keywords if kw not in code]
if missing_keywords:
logger.warning(f"PlantUML code for {diagram_type} missing required keywords: {missing_keywords}.")
return True
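# Illustrative check against a template from the dictionary above, which passes:
#   validate_plantuml_code("Use Case Diagram",
#                          PLANTUML_SYNTAX_RULES["Use Case Diagram"]["template"])  # True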
# --- Client Initialization ---
def initialize_llm_clients(provider: str, model_name: str, llm_api_key: str, tavily_api_key: str) -> tuple[BaseLanguageModel | None, TavilyClient | None, str | None]:
"""
Initializes LLM and Tavily clients based on user-provided configuration.
Applies nest_asyncio patch for compatibility with Streamlit threads.
"""
    # Patch the running event loop so async-based clients (e.g. Google GenAI)
    # can initialize inside Streamlit threads.
    nest_asyncio.apply()
llm_instance = None
tavily_instance = None
error_message = None
provider_lower = provider.lower()
# --- Initialize LLM ---
try:
logger.info(f"Attempting to initialize LLM: Provider='{provider}', Model='{model_name}'")
if not llm_api_key:
raise ValueError("LLM API Key is required.")
if provider_lower == "openai":
llm_instance = ChatOpenAI(model=model_name, temperature=0.5, api_key=llm_api_key)
elif provider_lower == "groq":
llm_instance = ChatGroq(model=model_name, temperature=0.5, api_key=llm_api_key)
elif provider_lower == "google":
# This initialization should now work after nest_asyncio.apply()
llm_instance = ChatGoogleGenerativeAI(model=model_name, google_api_key=llm_api_key, temperature=0.5)
elif provider_lower == "anthropic":
llm_instance = ChatAnthropic(model=model_name, anthropic_api_key=llm_api_key, temperature=0.5)
elif provider_lower == "xai":
xai_base_url = "https://api.x.ai/v1"
logger.info(f"Using xAI endpoint: {xai_base_url}")
llm_instance = ChatOpenAI(model=model_name, temperature=0.5, api_key=llm_api_key, base_url=xai_base_url)
else:
raise ValueError(f"Unsupported LLM provider: {provider}")
        # Optional: a lightweight test call could verify credentials here.
logger.info(f"LLM {provider} - {model_name} initialized successfully.")
except ValueError as ve:
error_message = str(ve); logger.error(f"LLM Init Error: {error_message}"); llm_instance = None
except ImportError:
error_message = f"Missing library for {provider}. Install required package."; logger.error(error_message); llm_instance = None
except Exception as e:
# Check if it's the event loop error specifically, although nest_asyncio should fix it
if "no current event loop" in str(e):
error_message = f"Asyncio event loop issue persists even with nest_asyncio for {provider}: {e}"
else:
error_message = f"Unexpected error initializing LLM for {provider}: {e}"
logger.error(error_message, exc_info=True); llm_instance = None
    # --- Initialize Tavily ---
if tavily_api_key:
try:
logger.info("Initializing Tavily client..."); tavily_instance = TavilyClient(api_key=tavily_api_key); logger.info("Tavily client initialized.")
except Exception as e:
tavily_err = f"Failed to initialize Tavily: {e}"; logger.error(tavily_err, exc_info=True)
if error_message is None: error_message = tavily_err
tavily_instance = None
else: logger.warning("Tavily API Key not provided."); tavily_instance = None
return llm_instance, tavily_instance, error_message
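# Illustrative usage (provider and model names are examples, not defaults):
#   llm, tavily, err = initialize_llm_clients(
#       "openai", "gpt-4o-mini",
#       os.getenv("OPENAI_API_KEY", ""), os.getenv("TAVILY_API_KEY", ""))
#   if err: logger.error(f"Client init failed: {err}")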
# --- Retry Decorator ---
def with_retry(func):
"""Decorator to add retry logic to functions, especially LLM calls."""
@wraps(func)
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception_type(Exception),
before_sleep=lambda rs: logger.warning(
f"Retrying {func.__name__} (attempt {rs.attempt_number}) after {rs.next_action.sleep:.2f}s delay..."
)
)
def wrapper(*args, **kwargs):
try:
# Execute the decorated function
return func(*args, **kwargs)
except Exception as e:
# Log the error after all retries have failed
logger.error(f"Error in {func.__name__} after retries: {e}", exc_info=True)
raise # Re-raise the exception
return wrapper
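# The decorator wraps each workflow node below; a sketch of the pattern:
#   @with_retry
#   def some_node(state: MainState) -> MainState:
#       ...  # up to 3 attempts with exponential backoff (2-10s waits)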
# --- Workflow Functions ---
# All workflow functions below read the LLM / Tavily clients from state.
# --- User Input Cycle ---
@with_retry
def generate_questions(state: MainState) -> MainState:
"""Generates clarification questions."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
context = f"Project: {state['project']} ({state['category']}/{state['subcategory']}) in {state['coding_language']}."
iteration = state.get("user_input_iteration", 0)
if iteration == 0:
prompt = f"You are a requirements analyst. Ask exactly 5 concise questions to clarify the initial needs for this project: {context}"
else:
qa_history = "\n".join([f"Q: {q}\nA: {a}" for q, a in zip(state.get("user_input_questions",[]), state.get("user_input_answers",[]))])
prompt = f"Based on the previous Q&A for the project ({context}), ask up to 5 more concise clarification questions...\nPrevious Q&A:\n{qa_history}"
response = llm.invoke(prompt) # Use LLM from state
questions = [q.strip() for q in response.content.strip().split("\n") if q.strip()]
state["user_input_questions"] = state.get("user_input_questions", []) + questions
state["messages"].append(AIMessage(content="\n".join(questions)))
logger.info(f"Generated {len(questions)} questions for iteration {iteration}.")
return state
@with_retry
def refine_prompt(state: MainState) -> MainState:
"""Synthesizes Q&A into a refined prompt."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
qa_history = "\n".join([f"Q: {q}\nA: {a}" for q, a in zip(state.get("user_input_questions",[]), state.get("user_input_answers",[]))])
prompt = f"Based on the following Q&A history for project '{state['project']}', synthesize a concise 'Refined Prompt'...\nQ&A History:\n{qa_history}\n---\nOutput ONLY the refined prompt text."
response = llm.invoke(prompt) # Use LLM from state
refined_prompt_text = response.content.strip()
state["refined_prompt"] = refined_prompt_text
state["user_query_with_qa"] = qa_history
state["messages"].append(AIMessage(content=f"Refined Prompt:\n{refined_prompt_text}"))
logger.info("Refined project prompt based on Q&A.")
# Save logic remains the same
try:
project_folder_name = state.get("project_folder", "default_project")
abs_project_folder = os.path.abspath(project_folder_name)
intro_dir = os.path.join(abs_project_folder, "1_intro")
os.makedirs(intro_dir, exist_ok=True)
qa_path = os.path.join(intro_dir, "user_query_with_qa.txt")
prompt_path = os.path.join(intro_dir, "refined_prompt.md")
with open(qa_path, "w", encoding="utf-8") as f: f.write(qa_history)
with open(prompt_path, "w", encoding="utf-8") as f: f.write(refined_prompt_text)
logger.info(f"Saved Q&A history and refined prompt to {intro_dir}")
except Exception as e: logger.error(f"Failed to save intro files: {e}", exc_info=True)
return state
# --- User Story Cycle ---
@with_retry
def generate_initial_user_stories(state: MainState) -> MainState:
"""Generates initial user stories."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Generate a list of user stories for project '{state['project']}' using standard format 'As a..., I want..., so that...'. Base on:\nRefined Prompt:\n{state['refined_prompt']}"
response = llm.invoke(prompt) # Use LLM from state
initial_user_stories = response.content.strip()
state["user_story_current"] = initial_user_stories
state["messages"].append(AIMessage(content=f"Initial User Stories:\n{initial_user_stories}"))
logger.info("Generated Initial User Stories.")
return state
@with_retry
def generate_user_story_feedback(state: MainState) -> MainState:
"""Generates AI feedback on user stories."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Act as QA. Review user stories for clarity, atomicity, testability, alignment...\nUser Stories:\n{state.get('user_story_current', 'N/A')}\n---\nRefined Prompt (Context):\n{state.get('refined_prompt', 'N/A')[:500]}..."
response = llm.invoke(prompt) # Use LLM from state
feedback = response.content.strip()
state["user_story_feedback"] = feedback
state["messages"].append(AIMessage(content=f"User Story Feedback:\n{feedback}"))
logger.info("Generated feedback on user stories.")
return state
@with_retry
def refine_user_stories(state: MainState) -> MainState:
"""Refines user stories based on feedback."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Refine user stories for '{state['project']}' based on feedback.\nCurrent Stories:\n{state.get('user_story_current', 'N/A')}\nAI FB:\n{state.get('user_story_feedback', 'N/A')}\nHuman FB:\n{state.get('user_story_human_feedback', 'N/A')}\n---\nOutput refined list."
response = llm.invoke(prompt) # Use LLM from state
refined_user_stories = response.content.strip()
state["user_story_current"] = refined_user_stories
state["messages"].append(AIMessage(content=f"Refined User Stories:\n{refined_user_stories}"))
logger.info("Refined User Stories based on feedback.")
return state
# save_final_user_story: file I/O only, no LLM calls.
def save_final_user_story(state: MainState) -> MainState:
"""Saves the final version of user stories to a file and updates the state."""
state["final_user_story"] = state.get("user_story_current", "No user stories generated.")
filepath = None # Initialize path as None
try:
abs_project_folder = os.path.abspath(state["project_folder"])
us_dir = os.path.join(abs_project_folder, "2_user_story")
os.makedirs(us_dir, exist_ok=True)
filepath = os.path.join(us_dir, "final_user_story.md")
with open(filepath, "w", encoding="utf-8") as f:
f.write(state["final_user_story"])
logger.info(f"Saved final user story to: {filepath}")
except Exception as e:
logger.error(f"Failed to save final user story: {e}", exc_info=True)
filepath = None # Ensure path is None if saving failed
state["final_user_story_path"] = filepath
return state
# --- Product Owner Review Cycle ---
@with_retry
def generate_initial_product_review(state: MainState) -> MainState:
"""Generates an initial product review."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Act as Product Owner for '{state['project']}'. Review prompt and stories, assess alignment, completeness, concerns...\nPrompt:\n{state.get('refined_prompt', 'N/A')}\nStories:\n{state.get('final_user_story', 'N/A')}"
response = llm.invoke(prompt) # Use LLM from state
initial_review = response.content.strip()
state["product_review_current"] = initial_review
state["messages"].append(AIMessage(content=f"Initial Product Review:\n{initial_review}"))
logger.info("Generated initial product owner review.")
return state
@with_retry
def generate_product_review_feedback(state: MainState) -> MainState:
"""Generates AI feedback on the product review."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Review the PO assessment for clarity, logic, priorities...\nPO Review:\n{state.get('product_review_current', 'N/A')}\nStories (Context):\n{state.get('final_user_story', 'N/A')[:1000]}..."
response = llm.invoke(prompt) # Use LLM from state
feedback = response.content.strip()
state["product_review_feedback"] = feedback
state["messages"].append(AIMessage(content=f"Product Review Feedback:\n{feedback}"))
logger.info("Generated feedback on product review.")
return state
@with_retry
def refine_product_review(state: MainState) -> MainState:
"""Refines the product review based on feedback."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Refine the PO review for '{state['project']}' based on feedback.\nCurrent:\n{state.get('product_review_current', 'N/A')}\nAI FB:\n{state.get('product_review_feedback', 'N/A')}\nHuman FB:\n{state.get('product_review_human_feedback', 'N/A')}\n---\nOutput refined review."
response = llm.invoke(prompt) # Use LLM from state
refined_review = response.content.strip()
state["product_review_current"] = refined_review
state["messages"].append(AIMessage(content=f"Refined Product Review:\n{refined_review}"))
logger.info("Refined product owner review.")
return state
# save_final_product_review: file I/O only, no LLM calls.
def save_final_product_review(state: MainState) -> MainState:
"""Saves the final product review to a file."""
state["final_product_review"] = state.get("product_review_current", "No review generated.")
filepath = None
try:
abs_project_folder = os.path.abspath(state["project_folder"])
pr_dir = os.path.join(abs_project_folder, "3_product_review")
os.makedirs(pr_dir, exist_ok=True)
filepath = os.path.join(pr_dir, "final_product_review.md")
with open(filepath, "w", encoding="utf-8") as f:
f.write(state["final_product_review"])
logger.info(f"Saved final product review to: {filepath}")
except Exception as e:
logger.error(f"Failed to save final product review: {e}", exc_info=True)
filepath = None
state["final_product_review_path"] = filepath
return state
# --- Design Document Cycle ---
@with_retry
def generate_initial_design_doc(state: MainState) -> MainState:
"""Generates the initial design document."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Act as System Architect for '{state['project']}'. Create high-level design (Arch, Components, Data, API, Tech, Deploy) based on...\nPrompt:\n{state.get('refined_prompt', 'N/A')}\nStories:\n{state.get('final_user_story', 'N/A')}\nReview:\n{state.get('final_product_review', 'N/A')}"
response = llm.invoke(prompt) # Use LLM from state
initial_doc = response.content.strip()
state["design_doc_current"] = initial_doc
state["messages"].append(AIMessage(content=f"Initial Design Document:\n{initial_doc}"))
logger.info("Generated Initial Design Document")
return state
@with_retry
def generate_design_doc_feedback(state: MainState) -> MainState:
"""Generates AI feedback on the design document."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Review Design Doc for completeness, clarity, consistency, feasibility...\nDoc:\n{state.get('design_doc_current', 'N/A')}\nStories (Context):\n{state.get('final_user_story', 'N/A')[:1000]}..."
response = llm.invoke(prompt) # Use LLM from state
feedback = response.content.strip()
state["design_doc_feedback"] = feedback
state["messages"].append(AIMessage(content=f"Design Document Feedback:\n{feedback}"))
logger.info("Generated Design Document Feedback")
return state
@with_retry
def refine_design_doc(state: MainState) -> MainState:
"""Refines the design document based on feedback."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
prompt = f"Refine Design Doc for '{state['project']}' based on feedback.\nCurrent:\n{state.get('design_doc_current', 'N/A')}\nAI FB:\n{state.get('design_doc_feedback', 'N/A')}\nHuman FB:\n{state.get('design_doc_human_feedback', 'N/A')}\n---\nOutput refined doc."
response = llm.invoke(prompt) # Use LLM from state
refined_doc = response.content.strip()
state["design_doc_current"] = refined_doc
state["messages"].append(AIMessage(content=f"Refined Design Document:\n{refined_doc}"))
logger.info("Refined Design Document")
return state
# save_final_design_doc: file I/O only, no LLM calls.
def save_final_design_doc(state: MainState) -> MainState:
"""Saves the final design document."""
state["final_design_document"] = state.get("design_doc_current", "No design generated.")
filepath = None
try:
abs_project_folder = os.path.abspath(state["project_folder"])
dd_dir = os.path.join(abs_project_folder, "4_design_doc")
os.makedirs(dd_dir, exist_ok=True)
filepath = os.path.join(dd_dir, "final_design_document.md")
with open(filepath, "w", encoding="utf-8") as f: f.write(state["final_design_document"])
logger.info(f"Saved final design doc: {filepath}")
except Exception as e: logger.error(f"Failed save design doc: {e}", exc_info=True); filepath = None
state["final_design_document_path"] = filepath
return state
# --- UML Diagram Cycle ---
@with_retry
def select_uml_diagrams(state: MainState) -> MainState:
"""Selects relevant UML/DFD diagram types."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
all_diagram_types = ', '.join(PLANTUML_SYNTAX_RULES.keys())
prompt = f"Select 5 most relevant UML/DFD types for '{state['project']}' from list [{all_diagram_types}] based on Design Doc:\n{state.get('final_design_document', 'N/A')}\nJustify choices. Output ONLY JSON (DiagramSelection model)."
structured_llm = llm.with_structured_output(DiagramSelection) # Use LLM from state
response = structured_llm.invoke(prompt)
    # Deduplicate while keeping each type paired with its own justification
    seen = {}
    for dt, j in zip(response.diagram_types, response.justifications): seen.setdefault(dt, j)
    unique_types = list(seen)[:5]
    final_justifications = [seen[dt] for dt in unique_types]
state["uml_selected_diagrams"] = unique_types
display_msg = "Selected Diagrams:\n" + "\n".join(f"- {dt} - {j}" for dt, j in zip(unique_types, final_justifications))
state["messages"].append(AIMessage(content=display_msg))
logger.info(f"Selected UML Diagrams: {', '.join(unique_types)}")
return state
@with_retry
def generate_initial_uml_codes(state: MainState) -> MainState:
"""Generates initial PlantUML code for selected diagram types."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
generated_codes = []
selected_diagrams = state.get("uml_selected_diagrams", [])
if not selected_diagrams: logger.warning("No diagrams selected."); state["uml_current_codes"] = []; return state
logger.info(f"Generating initial PlantUML code for: {', '.join(selected_diagrams)}")
for diagram_type in selected_diagrams:
syntax_info = PLANTUML_SYNTAX_RULES.get(diagram_type, {})
default_code = "@startuml\n' Default template\n@enduml"
code_to_use = syntax_info.get("template", default_code)
prompt = f"Generate PlantUML code for a '{diagram_type}' for '{state['project']}'. Base on Design Doc:\n{state.get('final_design_document', 'N/A')[:2000]}...\nAdhere to syntax:\nTemplate:\n{syntax_info.get('template', 'N/A')}\nNotes: {syntax_info.get('notes', 'N/A')}\n---\nGenerate ONLY the PlantUML code block."
try:
structured_llm = llm.with_structured_output(PlantUMLCode) # Use LLM from state
response = structured_llm.invoke(prompt)
generated_code = response.code.strip() if response and response.code else ""
if validate_plantuml_code(diagram_type, generated_code): code_to_use = generated_code
else: logger.warning(f"Generated code for {diagram_type} failed validation. Using template.")
except Exception as e: logger.error(f"Failed to generate/validate PlantUML for {diagram_type}: {e}. Using template.", exc_info=True)
generated_codes.append(PlantUMLCode(diagram_type=diagram_type, code=code_to_use))
state["uml_current_codes"] = generated_codes
summary = "\n".join([f"**{c.diagram_type}**:\n```plantuml\n{c.code}\n```" for c in generated_codes])
state["messages"].append(AIMessage(content=f"Generated Initial UML Codes:\n{summary}"))
logger.info(f"Generated initial code for {len(generated_codes)} UML diagrams.")
return state
@with_retry
def generate_uml_feedback(state: MainState) -> MainState:
"""Generates AI feedback for each current UML diagram."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
feedback_dict = {}
current_codes = state.get('uml_current_codes', [])
if not current_codes: logger.warning("No UML codes for feedback."); state["uml_feedback"] = {}; return state
logger.info(f"Generating feedback for {len(current_codes)} UML diagrams.")
for plantuml_code in current_codes:
diagram_type = plantuml_code.diagram_type; code_to_review = plantuml_code.code
syntax_info = PLANTUML_SYNTAX_RULES.get(diagram_type, {})
prompt = f"Review PlantUML code for '{diagram_type}' of '{state['project']}'. Check Syntax, Alignment with Design, Clarity.\nSyntax (Ref):\n{syntax_info.get('template', 'N/A')}\nNotes: {syntax_info.get('notes', 'N/A')}\nCode:\n```plantuml\n{code_to_review}\n```\nDesign (Context):\n{state.get('final_design_document', 'N/A')[:1000]}...\n---\nProvide feedback."
try:
response = llm.invoke(prompt) # Use LLM from state
feedback_dict[diagram_type] = response.content.strip()
except Exception as e: logger.error(f"Failed feedback for {diagram_type}: {e}"); feedback_dict[diagram_type] = f"Error: {e}"
state["uml_feedback"] = feedback_dict
summary = "\n\n".join([f"**Feedback for {dt}:**\n{fb}" for dt, fb in feedback_dict.items()])
state["messages"].append(AIMessage(content=f"UML Feedback Provided:\n{summary}"))
logger.info("Generated feedback for all current UML diagrams.")
return state
@with_retry
def refine_uml_codes(state: MainState) -> MainState:
"""Refines UML codes based on feedback."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
refined_codes_list = []
current_codes = state.get('uml_current_codes', [])
ai_feedback = state.get('uml_feedback', {})
human_feedback = state.get('uml_human_feedback', {})
if not current_codes: logger.warning("No UML codes to refine."); return state
logger.info(f"Refining {len(current_codes)} UML diagrams.")
for plantuml_code_obj in current_codes:
diagram_type = plantuml_code_obj.diagram_type; current_code = plantuml_code_obj.code
syntax_info = PLANTUML_SYNTAX_RULES.get(diagram_type, {})
specific_human_feedback = human_feedback.get(diagram_type, human_feedback.get('all', 'N/A'))
prompt = f"Refine PlantUML for '{diagram_type}' of '{state['project']}' based on feedback.\nSyntax (Ref):\n{syntax_info.get('template', 'N/A')}\nNotes: {syntax_info.get('notes', 'N/A')}\nCurrent:\n```plantuml\n{current_code}\n```\nAI FB:\n{ai_feedback.get(diagram_type, 'N/A')}\nHuman FB:\n{specific_human_feedback}\n---\nGenerate ONLY refined PlantUML block."
try:
structured_llm = llm.with_structured_output(PlantUMLCode) # Use LLM from state
response = structured_llm.invoke(prompt)
refined_code = response.code.strip() if response and response.code else ""
if validate_plantuml_code(diagram_type, refined_code):
refined_codes_list.append(PlantUMLCode(diagram_type=diagram_type, code=refined_code))
else: logger.warning(f"Refined {diagram_type} invalid. Reverting."); refined_codes_list.append(plantuml_code_obj)
except Exception as e: logger.error(f"Failed refine {diagram_type}: {e}. Reverting.", exc_info=True); refined_codes_list.append(plantuml_code_obj)
state["uml_current_codes"] = refined_codes_list
summary = "\n".join([f"**{c.diagram_type} (Refined):**\n```plantuml\n{c.code}\n```" for c in refined_codes_list])
state["messages"].append(AIMessage(content=f"Refined UML Codes:\n{summary}"))
logger.info(f"Refined {len(refined_codes_list)} UML diagrams.")
return state
# save_final_uml_diagrams: file I/O and PNG rendering only, no LLM calls.
def save_final_uml_diagrams(state: MainState) -> MainState:
"""Saves the final Puml files and attempts to generate PNGs."""
state["final_uml_codes"] = state.get("uml_current_codes", [])
png_paths = [] # List to store paths of successfully generated PNGs
uml_dir = None
try:
abs_project_folder = os.path.abspath(state["project_folder"])
uml_dir = os.path.join(abs_project_folder, "5_uml_diagrams")
os.makedirs(uml_dir, exist_ok=True)
state["final_uml_diagram_folder"] = uml_dir # Store path to folder
can_generate_png = False
server = None
try:
server = PlantUML(url="http://www.plantuml.com/plantuml/png/")
can_generate_png = True
logger.info("PlantUML server connection appears OK.")
except Exception as p_e:
logger.warning(f"PlantUML server connection failed: {p_e}. PNG generation will be skipped. Check Java/PlantUML setup and network connectivity.", exc_info=True)
if not state["final_uml_codes"]:
logger.warning("No UML codes found to save."); state["final_uml_png_paths"] = []; return state
logger.info(f"Saving {len(state['final_uml_codes'])} UML diagrams to {uml_dir}...")
for i, pc in enumerate(state["final_uml_codes"], 1):
safe_type_name = "".join(c if c.isalnum() or c in ['_','-'] else '_' for c in pc.diagram_type).lower()
name = f"diagram_{i}_{safe_type_name}"
puml_path = os.path.join(uml_dir, f"{name}.puml")
png_path = os.path.join(uml_dir, f"{name}.png")
try:
with open(puml_path, "w", encoding="utf-8") as f: f.write(pc.code)
logger.debug(f"Saved PUML file: {puml_path}")
except Exception as file_e: logger.error(f"Error saving PUML file {puml_path}: {file_e}", exc_info=True); continue
if can_generate_png and server:
logger.debug(f"Attempting PNG generation for {name}...")
try:
server.processes_file(filename=puml_path, outfile=png_path)
if os.path.exists(png_path) and os.path.getsize(png_path) > 0:
logger.info(f"Successfully generated PNG: {png_path}"); png_paths.append(png_path)
else: logger.error(f"PlantUML processed '{name}' but output PNG is missing or empty: {png_path}")
except FileNotFoundError as fnf_err: logger.error(f"PNG generation failed for {name}: Executable/Java not found? Error: {fnf_err}", exc_info=False)
except Exception as png_e: logger.error(f"PNG generation failed for {name} ({pc.diagram_type}): {png_e}", exc_info=False)
elif not can_generate_png: logger.debug(f"Skipping PNG generation for {name} due to server connection issue.")
state["final_uml_png_paths"] = png_paths
logger.info(f"Finished UML saving. Saved {len(state['final_uml_codes'])} PUML files. Generated {len(png_paths)} PNG files.")
except Exception as e:
logger.error(f"General error in save_final_uml_diagrams: {e}", exc_info=True)
state["final_uml_diagram_folder"] = None; state["final_uml_png_paths"] = []
return state
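# Note: rendering uses the public plantuml.com server above; a self-hosted
# server could be substituted by changing the URL passed to PlantUML(...)
# (hypothetical example: PlantUML(url="http://localhost:8080/plantuml/png/")).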
# --- Code Generation Cycle ---
@with_retry
def generate_initial_code(state: MainState) -> MainState:
"""Generates the initial codebase."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
uml_types = ', '.join([c.diagram_type for c in state.get('final_uml_codes', [])])
prompt = f"Generate complete, runnable '{state['coding_language']}' project for '{state['project']}'. Base on Design Doc, User Stories, and UML ({uml_types}). Include main scripts, modules, requirements, basic README, comments.\nDesign:\n{state.get('final_design_document', 'N/A')}\nStories (Context):\n{state.get('final_user_story', 'N/A')}...\n---\nOutput ONLY JSON (GeneratedCode model)."
structured_llm = llm.with_structured_output(GeneratedCode) # Use LLM from state
response = structured_llm.invoke(prompt)
if not response or not isinstance(response, GeneratedCode) or not response.files:
logger.error("Initial code gen failed or invalid format."); raise ValueError("Did not produce expected file structure.")
state["code_current"] = response
summary = f"Generated {len(response.files)} files. Key: {', '.join([f.filename for f in response.files[:3]])}...\nInstructions:\n{response.instructions[:200]}..."
state["messages"].append(AIMessage(content=f"Initial Code Generation:\n{summary}"))
logger.info(f"Generated initial code with {len(response.files)} files.")
return state
@with_retry
def web_search_code(state: MainState) -> MainState:
"""Performs web search based on user feedback."""
tavily = state.get('tavily_instance') # Use Tavily from state
if not tavily: logger.warning("Tavily client not in state, skipping web search."); state["code_web_search_results"] = "Skipped (Tavily client not configured)"; state["messages"].append(AIMessage(content="Web Search: Skipped")); return state
if 'messages' not in state: state['messages'] = []
human_input = state.get('code_human_input', '')
if not human_input or not human_input.strip(): logger.info("Skipping web search - no issue provided."); state["code_web_search_results"] = "Skipped (No specific issue)"; state["messages"].append(AIMessage(content="Web Search: Skipped")); return state
human_input_summary = human_input[:200]; coding_language = state.get('coding_language', 'programming'); project_context = state.get('project', 'project')[:50]
search_query = f"{coding_language} issues related to '{human_input_summary}' in {project_context}"
logger.info(f"Performing Tavily search: {search_query}")
try:
response = tavily.search(query=search_query, search_depth="basic", max_results=3) # Use tavily from state
search_results = response.get("results", [])
if search_results:
results_text = "\n\n".join([f"**{r.get('title', 'N/A')}**\nURL: {r.get('url', 'N/A')}\nSnippet: {r.get('content', 'N/A')[:300]}..." for r in search_results])
state["code_web_search_results"] = results_text; logger.info(f"Tavily found {len(search_results)} results.")
else: state["code_web_search_results"] = "No relevant results found."; logger.info("Tavily found no results.")
except Exception as e:
error_detail = str(e); logger.error(f"Tavily search failed: {error_detail}", exc_info=True); state["code_web_search_results"] = f"Error during web search: {e}"
summary = state['code_web_search_results'][:500] + ('...' if len(state['code_web_search_results']) > 500 else '')
state["messages"].append(AIMessage(content=f"Web Search Summary:\n{summary}"))
logger.info("Completed Web Search.")
return state
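# As consumed above, TavilyClient.search() returns a dict whose "results"
# entries carry "title", "url", and "content" keys.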
@with_retry
def generate_code_feedback(state: MainState) -> MainState:
"""Generates AI feedback on the current code."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
func_name = "generate_code_feedback"
code_c = state.get("code_current"); instructions = ""
    # Build a length-capped concatenation of the code files for the prompt.
    code_str_parts = []; total_len = 0; max_code_len = 25000
files_to_process = code_c.files if code_c and isinstance(code_c, GeneratedCode) else []
if not files_to_process: logger.warning(f"No files in code_current for {func_name}"); code_content = "No code files provided."; instructions = "N/A"
else:
instructions = code_c.instructions
for file in files_to_process:
header = f"--- {file.filename} ---\n"; remaining_len = max_code_len - total_len - len(header)
if remaining_len <= 0: code_str_parts.append("\n*... (Code context truncated)*"); logger.debug(f"Code context truncated for {func_name}"); break
snippet = file.content[:remaining_len]; is_truncated = len(file.content) > remaining_len
code_str_parts.append(header + snippet + ('...' if is_truncated else '')); total_len += len(header) + len(snippet)
if total_len >= max_code_len:
if not code_str_parts[-1].endswith("truncated)*"): code_str_parts.append("\n*... (Code context truncated)*")
logger.debug(f"Code context max length for {func_name}"); break
code_content = "\n".join(code_str_parts)
prompt = f"Act as reviewer for '{state['project']}' ({state['coding_language']}). Review code, instructions, user feedback, search results. Suggest improvements.\nCode:\n{code_content}\nInstr:\n{instructions}\nUser FB:\n{state.get('code_human_input', 'N/A')}\nSearch:\n{state.get('code_web_search_results', 'N/A')}\n---\nProvide feedback."
response = llm.invoke(prompt) # Use LLM from state
feedback_text = response.content.strip()
state["code_feedback"] = feedback_text
state["messages"].append(AIMessage(content=f"AI Code Feedback:\n{feedback_text}"))
logger.info("Generated AI feedback on the code.")
return state
@with_retry
def refine_code(state: MainState) -> MainState:
"""Refines the code based on feedback."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
func_name = "refine_code"
code_c = state.get("code_current"); instructions = ""
    # Build a length-capped concatenation of the code files for the prompt.
code_str_parts = []; total_len = 0; max_code_len = 25000
files_to_process = code_c.files if code_c and isinstance(code_c, GeneratedCode) else []
if not files_to_process: logger.warning(f"No files in code_current for {func_name}"); code_content = "No previous code."; instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
else:
instructions = code_c.instructions
for file in files_to_process:
header = f"--- {file.filename} ---\n"; remaining_len = max_code_len - total_len - len(header)
if remaining_len <= 0: code_str_parts.append("\n*... (Code context truncated)*"); logger.debug(f"Code context truncated for {func_name}"); break
snippet = file.content[:remaining_len]; is_truncated = len(file.content) > remaining_len
code_str_parts.append(header + snippet + ('...' if is_truncated else '')); total_len += len(header) + len(snippet)
if total_len >= max_code_len:
if not code_str_parts[-1].endswith("truncated)*"): code_str_parts.append("\n*... (Code context truncated)*")
logger.debug(f"Code context max length for {func_name}"); break
code_content = "\n".join(code_str_parts)
prompt = f"Act as senior {state['coding_language']} dev refining '{state['project']}'. Update code based on all feedback. Address bugs, improve style, update instructions if needed.\nCode:\n{code_content}\nInstr:\n{instructions}\nUser Exec FB:\n{state.get('code_human_input','N/A')}\nSearch:\n{state.get('code_web_search_results','N/A')}\nAI Review:\n{state.get('code_feedback','N/A')}\nHuman Comments:\n{state.get('code_human_feedback','N/A')}\n---\nOutput ONLY JSON (GeneratedCode model)."
structured_llm = llm.with_structured_output(GeneratedCode) # Use LLM from state
response = structured_llm.invoke(prompt)
if not response or not isinstance(response, GeneratedCode) or not response.files:
logger.error("Code refinement failed or invalid format."); raise ValueError("Did not produce expected file structure.")
state["code_current"] = response
summary = f"Refined code - {len(response.files)} files. Instructions:\n{response.instructions[:200]}..."
state["messages"].append(AIMessage(content=f"Refined Code:\n{summary}"))
logger.info(f"Refined code, resulting in {len(response.files)} files.")
return state
# --- Code Review & Security Cycle ---
@with_retry
def code_review(state: MainState) -> MainState:
"""Performs code review on final_code_files."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
func_name = "code_review"
code_files_to_review = state.get("final_code_files", [])
if not code_files_to_review: logger.warning(f"No files in final_code_files for {func_name}"); state["code_review_current_feedback"] = "No code available."; state["messages"].append(AIMessage(content="Code Review: No code.")); return state
    # Build a length-capped concatenation of the code files for the prompt.
code_str_parts = []; total_len = 0; max_code_len = 25000
instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
files_to_process = code_files_to_review
for file in files_to_process:
header = f"--- {file.filename} ---\n"; remaining_len = max_code_len - total_len - len(header)
if remaining_len <= 0: code_str_parts.append("\n*... (Code context truncated)*"); logger.debug(f"Code context truncated for {func_name}"); break
snippet = file.content[:remaining_len]; is_truncated = len(file.content) > remaining_len
code_str_parts.append(header + snippet + ('...' if is_truncated else '')); total_len += len(header) + len(snippet)
if total_len >= max_code_len:
if not code_str_parts[-1].endswith("truncated)*"): code_str_parts.append("\n*... (Code context truncated)*")
logger.debug(f"Code context max length for {func_name}"); break
code_content = "\n".join(code_str_parts)
prompt = f"Perform detailed code review for '{state['project']}' ({state['coding_language']}). Focus on best practices, readability, logic, efficiency, robustness.\nCode:\n{code_content}\nInstr:\n{instructions}\n---\nProvide feedback."
response = llm.invoke(prompt) # Use LLM from state
feedback = response.content.strip()
state["code_review_current_feedback"] = feedback
state["messages"].append(AIMessage(content=f"Code Review:\n{feedback}"))
logger.info("Performed code review.")
return state
@with_retry
def security_check(state: MainState) -> MainState:
"""Performs security check on final_code_files."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
func_name = "security_check"
code_files_to_check = state.get("final_code_files", [])
if not code_files_to_check: logger.warning(f"No files in final_code_files for {func_name}"); state["security_current_feedback"] = "No code available."; state["messages"].append(AIMessage(content="Security Check: No code.")); return state
    # Build a length-capped concatenation of the code files for the prompt.
code_str_parts = []; total_len = 0; max_code_len = 25000
instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
files_to_process = code_files_to_check
for file in files_to_process:
header = f"--- {file.filename} ---\n"; remaining_len = max_code_len - total_len - len(header)
if remaining_len <= 0: code_str_parts.append("\n*... (Code context truncated)*"); logger.debug(f"Code context truncated for {func_name}"); break
snippet = file.content[:remaining_len]; is_truncated = len(file.content) > remaining_len
code_str_parts.append(header + snippet + ('...' if is_truncated else '')); total_len += len(header) + len(snippet)
if total_len >= max_code_len:
if not code_str_parts[-1].endswith("truncated)*"): code_str_parts.append("\n*... (Code context truncated)*")
logger.debug(f"Code context max length for {func_name}"); break
code_content = "\n".join(code_str_parts)
prompt = f"Act as security expert. Analyze {state['coding_language']} code for '{state['project']}'. Check for injection, XSS, auth issues, data exposure, input validation, misconfigs, vulnerable deps.\nCode:\n{code_content}\nInstr:\n{instructions}\n---\nProvide findings, impact, remediation."
response = llm.invoke(prompt) # Use LLM from state
feedback = response.content.strip()
state["security_current_feedback"] = feedback
state["messages"].append(AIMessage(content=f"Security Check:\n{feedback}"))
logger.info("Performed security check.")
return state
@with_retry
def refine_code_with_reviews(state: MainState) -> MainState:
"""Refines code based on review, security, and human feedback."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
func_name = "refine_code_with_reviews"
code_files_to_refine = state.get("final_code_files", [])
if not code_files_to_refine: logger.error(f"No files in final_code_files for {func_name}"); raise ValueError("No code available.")
instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
    # Build a length-capped concatenation of the code files for the prompt.
code_str_parts = []; total_len = 0; max_code_len = 25000
files_to_process = code_files_to_refine
if not files_to_process: logger.warning(f"No files for {func_name}"); code_content = "No previous code."
else:
for file in files_to_process:
header = f"--- {file.filename} ---\n"; remaining_len = max_code_len - total_len - len(header)
if remaining_len <= 0: code_str_parts.append("\n*... (Code context truncated)*"); logger.debug(f"Code context truncated for {func_name}"); break
snippet = file.content[:remaining_len]; is_truncated = len(file.content) > remaining_len
code_str_parts.append(header + snippet + ('...' if is_truncated else '')); total_len += len(header) + len(snippet)
if total_len >= max_code_len:
if not code_str_parts[-1].endswith("truncated)*"): code_str_parts.append("\n*... (Code context truncated)*")
logger.debug(f"Code context max length for {func_name}"); break
code_content = "\n".join(code_str_parts)
prompt = f"Refine {state['coding_language']} code for '{state['project']}'. Incorporate Code Review, Security Analysis, User Comments. Prioritize security/critical points. Update instructions if needed.\nCode:\n{code_content}\nInstr:\n{instructions}\nReview FB:\n{state.get('code_review_current_feedback', 'N/A')}\nSecurity FB:\n{state.get('security_current_feedback', 'N/A')}\nUser FB:\n{state.get('review_security_human_feedback', 'N/A')}\n---\nOutput ONLY JSON (GeneratedCode model)."
structured_llm = llm.with_structured_output(GeneratedCode) # Use LLM from state
response = structured_llm.invoke(prompt)
if not response or not isinstance(response, GeneratedCode) or not response.files:
logger.error("Code refinement post-review failed/invalid."); raise ValueError("Did not produce expected file structure.")
state["final_code_files"] = response.files; state["code_current"] = response
summary = f"Refined code ({len(response.files)} files) post-review."
state["messages"].append(AIMessage(content=f"Code Refined Post-Review:\n{summary}"))
logger.info(f"Refined code post-review, {len(response.files)} files.")
return state
# save_review_security_outputs: file I/O only, no LLM calls.
def save_review_security_outputs(state: MainState) -> MainState:
"""Saves review/security feedback and the corresponding code snapshot."""
state["final_code_review"] = state.get("code_review_current_feedback", "N/A")
state["final_security_issues"] = state.get("security_current_feedback", "N/A")
rs_dir, code_snap_dir = None, None # Initialize paths
try:
abs_project_folder = os.path.abspath(state["project_folder"])
rs_dir = os.path.join(abs_project_folder, "6_review_security")
os.makedirs(rs_dir, exist_ok=True)
code_snap_dir = os.path.join(rs_dir, "code_snapshot")
os.makedirs(code_snap_dir, exist_ok=True)
# Store paths in state
state["final_review_security_folder"] = rs_dir
state["review_code_snapshot_folder"] = code_snap_dir
# Save feedback files
review_path = os.path.join(rs_dir, "final_code_review.md")
security_path = os.path.join(rs_dir, "final_security_issues.md")
with open(review_path, "w", encoding="utf-8") as f: f.write(state["final_code_review"])
with open(security_path, "w", encoding="utf-8") as f: f.write(state["final_security_issues"])
logger.debug(f"Saved review feedback files to {rs_dir}")
# Save the code snapshot (should be the version just refined)
files_to_save = state.get("final_code_files", [])
instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
if files_to_save:
logger.info(f"Saving {len(files_to_save)} code files to snapshot folder: {code_snap_dir}")
for file in files_to_save:
filename = file.filename; content = file.content
relative_path = filename.lstrip('/\\'); filepath = os.path.normpath(os.path.join(code_snap_dir, relative_path))
                if not os.path.abspath(filepath).startswith(os.path.abspath(code_snap_dir) + os.sep):
logger.warning(f"Attempted path traversal! Skipping file: {filename} -> {filepath}"); continue
try:
os.makedirs(os.path.dirname(filepath), exist_ok=True)
with open(filepath, "w", encoding="utf-8") as f: f.write(content)
logger.debug(f"Saved code file: {filepath}")
except OSError as path_err: logger.error(f"Could not create directory or save file '{filepath}': {path_err}")
except Exception as write_err: logger.error(f"Error writing file '{filepath}': {write_err}")
try: # Save instructions
instr_path = os.path.join(code_snap_dir, "instructions.md")
with open(instr_path, "w", encoding="utf-8") as f: f.write(instructions)
logger.debug(f"Saved instructions file: {instr_path}")
except Exception as instr_err: logger.error(f"Error writing instructions file: {instr_err}")
logger.info(f"Finished saving review/security outputs and code snapshot to {rs_dir}")
else: logger.warning("No code files found in 'final_code_files' to save for review snapshot.")
except Exception as e:
logger.error(f"General error in save_review_security_outputs: {e}", exc_info=True)
state["final_review_security_folder"] = None; state["review_code_snapshot_folder"] = None
return state
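# Note: the snapshot writer above normalizes each filename and skips any path
# that escapes code_snap_dir (e.g. a hypothetical "../../etc/passwd" entry).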
# --- Test Case Generation Cycle ---
@with_retry
def generate_initial_test_cases(state: MainState) -> MainState:
"""Generates initial test cases."""
llm = state.get('llm_instance')
if not llm: raise ConnectionError("LLM instance not found in state.")
if 'messages' not in state: state['messages'] = []
func_name = "generate_initial_test_cases"
    # Build a length-capped concatenation of the code files for the prompt.
code_str_parts = []; total_len = 0; max_code_len = 25000
files_to_process = state.get("final_code_files", [])
if not files_to_process: logger.warning(f"No files for {func_name}"); code_str = "No code files provided."
else:
for file in files_to_process:
header = f"--- {file.filename} ---\n"; remaining_len = max_code_len - total_len - len(header)
if remaining_len <= 0: code_str_parts.append("\n*... (Code context truncated)*"); break
snippet = file.content[:remaining_len]; is_truncated = len(file.content) > remaining_len
code_str_parts.append(header + snippet + ('...' if is_truncated else '')); total_len += len(header) + len(snippet)
if total_len >= max_code_len:
if not code_str_parts[-1].endswith("truncated)*"): code_str_parts.append("\n*... (Code context truncated)*")
break
code_str = "\n".join(code_str_parts)
if not state.get("final_code_files"): raise ValueError("No code found for test case generation.")
prompt = f"Generate >=3 diverse test cases (happy, edge, error) for '{state['project']}' ({state['coding_language']}). Base on stories, design, code.\nStories:\n{state.get('final_user_story', 'N/A')[:1000]}...\nDesign:\n{state.get('final_design_document', 'N/A')[:1000]}...\nCode:\n{code_str}\n---\nOutput ONLY JSON (TestCases model)."
structured_llm = llm.with_structured_output(TestCases) # Use LLM from state
response = structured_llm.invoke(prompt)
if not response or not isinstance(response, TestCases) or not response.test_cases:
logger.error("Test case gen failed/invalid."); raise ValueError("Did not produce valid test cases.")
state["test_cases_current"] = response.test_cases
summary = "\n".join([f"- {tc.description}" for tc in response.test_cases])
state["messages"].append(AIMessage(content=f"Generated Initial Test Cases:\n{summary}"))
logger.info(f"Generated {len(response.test_cases)} initial test cases.")
return state
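
# The context-building loop above reappears, nearly verbatim, in several functions
# below. A minimal refactor sketch (hypothetical: the name `_build_code_context` is
# not part of the original module, and the functions below still inline their own
# copies) capturing the same truncation behaviour in one place:
def _build_code_context(files: List[CodeFile], max_code_len: int = 25000) -> str:
    """Concatenate file snippets under a total length cap, marking truncation."""
    parts: List[str] = []
    total_len = 0
    for file in files:
        header = f"--- {file.filename} ---\n"
        remaining = max_code_len - total_len - len(header)
        if remaining <= 0:
            parts.append("\n*... (Code context truncated)*")
            break
        snippet = file.content[:remaining]
        parts.append(header + snippet + ('...' if len(file.content) > remaining else ''))
        total_len += len(header) + len(snippet)
        if total_len >= max_code_len:
            if not parts[-1].endswith("truncated)*"):
                parts.append("\n*... (Code context truncated)*")
            break
    return "\n".join(parts) if parts else "No code files provided."
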
@with_retry
def generate_test_cases_feedback(state: MainState) -> MainState:
    """Generates AI feedback on test cases."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    current_tests = state.get("test_cases_current", [])
    if not current_tests:
        logger.warning("No test cases for feedback.")
        state["test_cases_feedback"] = "No tests found."
        return state
    tests_str = "\n".join([f"- {tc.description}: Input={tc.input_data}, Expected={tc.expected_output}" for tc in current_tests])
    code_files = state.get("final_code_files", [])
    code_sample = (code_files[0].content[:500] + '...') if code_files else "N/A"
    prompt = f"Review test cases for '{state['project']}'. Assess coverage, clarity, effectiveness, realism. Suggest improvements.\nTests:\n{tests_str}\nStories (Context):\n{state.get('final_user_story', 'N/A')[:1000]}...\nCode (Context):\n{code_sample}\n---\nProvide feedback."
    response = llm.invoke(prompt)  # Use LLM from state
    feedback = response.content.strip()
    state["test_cases_feedback"] = feedback
    state["messages"].append(AIMessage(content=f"Test Case Feedback:\n{feedback}"))
    logger.info("Generated feedback on test cases.")
    return state
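
# Illustrative only: the shape the structured-output calls above expect back from
# the LLM, expressed with the module's own Pydantic models. (Hypothetical sample
# data; this helper documents the schema and is not called by the pipeline.)
def _example_test_cases() -> TestCases:
    return TestCases(test_cases=[
        TestCase(
            description="Happy path: valid login",
            input_data={"username": "alice", "password": "s3cret"},
            expected_output={"status": "ok", "token_issued": True},
        ),
        TestCase(
            description="Error case: wrong password",
            input_data={"username": "alice", "password": "wrong"},
            expected_output={"status": "error", "message": "Invalid credentials"},
        ),
    ])
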
@with_retry
def refine_test_cases_and_code(state: MainState) -> MainState:
    """Refines test cases and code based on feedback."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    func_name = "refine_test_cases_and_code"
    current_tests = state.get("test_cases_current", [])
    current_code_files = state.get("final_code_files", [])
    instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
    if not current_tests or not current_code_files:
        logger.error(f"Missing tests or code for {func_name}")
        raise ValueError("Missing data.")
    tests_str = "\n".join([f"- {tc.description}: Input={tc.input_data}, Expected={tc.expected_output}" for tc in current_tests])
    # Build a length-capped code context string.
    code_str_parts = []
    total_len = 0
    max_code_len = 25000
    for file in current_code_files:
        header = f"--- {file.filename} ---\n"
        remaining_len = max_code_len - total_len - len(header)
        if remaining_len <= 0:
            code_str_parts.append("\n*... (Code context truncated)*")
            logger.debug(f"Code context truncated for {func_name}")
            break
        snippet = file.content[:remaining_len]
        is_truncated = len(file.content) > remaining_len
        code_str_parts.append(header + snippet + ('...' if is_truncated else ''))
        total_len += len(header) + len(snippet)
        if total_len >= max_code_len:
            if not code_str_parts[-1].endswith("truncated)*"):
                code_str_parts.append("\n*... (Code context truncated)*")
            logger.debug(f"Code context max length for {func_name}")
            break
    code_str = "\n".join(code_str_parts)

    class TestAndCode(BaseModel):
        test_cases: List[TestCase]
        files: List[CodeFile]

    prompt = f"Tests failed for '{state['project']}'. Refine BOTH tests AND code based on feedback. Goal: refined code passes refined tests.\nTests:\n{tests_str}\nCode:\n{code_str}\nInstr:\n{instructions}\nAI Test FB:\n{state.get('test_cases_feedback','N/A')}\nHuman FB/Results:\n{state.get('test_cases_human_feedback','N/A')}\n---\nOutput ONLY JSON (TestAndCode model)."
    structured_llm = llm.with_structured_output(TestAndCode)  # Use LLM from state
    response = structured_llm.invoke(prompt)
    if not response or not isinstance(response, TestAndCode) or not response.test_cases or not response.files:
        logger.error("Refinement of tests/code failed or returned an invalid result.")
        raise ValueError("Did not produce expected results.")
    state["test_cases_current"] = response.test_cases
    state["final_code_files"] = response.files
    state["code_current"] = GeneratedCode(files=response.files, instructions=instructions)  # Keep old instructions
    summary = f"Refined {len(response.files)} code files & {len(response.test_cases)} tests."
    state["messages"].append(AIMessage(content=f"Refined Tests and Code:\n{summary}"))
    logger.info("Refined test cases and code.")
    return state
def save_testing_outputs(state: MainState) -> MainState:
    """Saves the final tests and the code version that passed them."""
    state["final_test_code_files"] = state.get("final_code_files", [])
    final_tests = state.get("test_cases_current", [])
    test_dir, code_snap_dir = None, None
    try:
        abs_project_folder = os.path.abspath(state["project_folder"])
        test_dir = os.path.join(abs_project_folder, "7_testing")
        os.makedirs(test_dir, exist_ok=True)
        code_snap_dir = os.path.join(test_dir, "passed_code")
        os.makedirs(code_snap_dir, exist_ok=True)
        state["final_testing_folder"] = test_dir
        state["testing_passed_code_folder"] = code_snap_dir
        # Save test cases file
        tc_path = os.path.join(test_dir, "final_test_cases.md")
        tc_str = "\n\n".join([f"**{tc.description}**\nInput:`{tc.input_data}`\nExpected:`{tc.expected_output}`" for tc in final_tests])
        with open(tc_path, "w", encoding="utf-8") as f:
            f.write(f"# Final Test Cases ({len(final_tests)} Passed)\n\n{tc_str}")
        logger.debug(f"Saved test cases file: {tc_path}")
        # Save the code snapshot that passed
        passed_code_files = state.get("final_test_code_files", [])
        instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
        if passed_code_files:
            logger.info(f"Saving {len(passed_code_files)} passed code files to snapshot: {code_snap_dir}")
            snap_root = os.path.abspath(code_snap_dir)
            for file in passed_code_files:  # Save files with path safety
                fn = file.filename
                content = file.content
                safe_fn = os.path.basename(fn)
                if not safe_fn or ('/' in fn and '..' in fn) or ('\\' in fn and '..' in fn):
                    logger.warning(f"Skip unsafe file: {fn}")
                    continue
                rel_path = fn.lstrip('/\\')
                filepath = os.path.normpath(os.path.join(code_snap_dir, rel_path))
                # Strict containment check (see note in save_review_security_outputs).
                if not os.path.abspath(filepath).startswith(snap_root + os.sep):
                    logger.warning(f"Skip traversal: {fn}")
                    continue
                try:
                    os.makedirs(os.path.dirname(filepath), exist_ok=True)
                    with open(filepath, "w", encoding="utf-8") as f:
                        f.write(content)
                    logger.debug(f"Saved code file: {filepath}")
                except OSError as path_err:
                    logger.error(f"Path error saving '{filepath}': {path_err}")
                except Exception as write_err:
                    logger.error(f"Error writing '{filepath}': {write_err}")
            try:  # Save instructions
                instr_path = os.path.join(code_snap_dir, "instructions.md")
                with open(instr_path, "w", encoding="utf-8") as f:
                    f.write(instructions)
                logger.debug(f"Saved instructions: {instr_path}")
            except Exception as instr_err:
                logger.error(f"Error writing instructions: {instr_err}")
            logger.info(f"Finished saving testing outputs and passed code to {test_dir}")
        else:
            logger.warning("No passed code files found in state to save.")
    except Exception as e:
        logger.error(f"Failed to save testing outputs: {e}", exc_info=True)
        state["final_testing_folder"] = None
        state["testing_passed_code_folder"] = None
    return state
# --- Quality Analysis Cycle ---
@with_retry
def generate_initial_quality_analysis(state: MainState) -> MainState:
    """Generates an overall quality analysis report."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    func_name = "generate_initial_quality_analysis"
    code_files_passed = state.get("final_test_code_files", [])
    instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
    if not code_files_passed:
        logger.warning(f"No tested code for {func_name}.")
        state["quality_current_analysis"] = "No passed code available."
        return state
    # Build a length-capped code context string.
    code_str_parts = []
    total_len = 0
    max_code_len = 25000
    for file in code_files_passed:
        header = f"--- {file.filename} ---\n"
        remaining_len = max_code_len - total_len - len(header)
        if remaining_len <= 0:
            code_str_parts.append("\n*... (Code context truncated)*")
            logger.debug(f"Code context truncated for {func_name}")
            break
        snippet = file.content[:remaining_len]
        is_truncated = len(file.content) > remaining_len
        code_str_parts.append(header + snippet + ('...' if is_truncated else ''))
        total_len += len(header) + len(snippet)
        if total_len >= max_code_len:
            if not code_str_parts[-1].endswith("truncated)*"):
                code_str_parts.append("\n*... (Code context truncated)*")
            logger.debug(f"Code context max length for {func_name}")
            break
    code_str = "\n".join(code_str_parts)
    tests_summary = "\n".join([f"- {tc.description}" for tc in state.get("test_cases_current", [])])
    # Only append an ellipsis when the summary was actually truncated.
    tests_str = tests_summary[:500] + ("..." if len(tests_summary) > 500 else "")
    prompt = f"Generate QA report for '{state['project']}' ({state['coding_language']}). Code passed tests. Assess Maintainability, Perf, Scale, Security, Coverage, Docs, Confidence Score (1-10).\nCode:\n{code_str}\nTests:\n{tests_str}\nInstr:\n{instructions}\nReview Sum:\n{state.get('final_code_review','N/A')[:500]}...\nSecurity Sum:\n{state.get('final_security_issues','N/A')[:500]}...\n---"
    response = llm.invoke(prompt)  # Use LLM from state
    qa_report = response.content.strip()
    state["quality_current_analysis"] = qa_report
    state["messages"].append(AIMessage(content=f"Initial Quality Analysis Report:\n{qa_report}"))
    logger.info("Generated Initial Quality Analysis Report.")
    return state
@with_retry
def generate_quality_feedback(state: MainState) -> MainState:
    """Generates AI feedback on the QA report."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    current_qa_report = state.get('quality_current_analysis', 'N/A')
    if not current_qa_report or current_qa_report == 'N/A':
        logger.warning("No QA report for feedback.")
        state["quality_feedback"] = "No QA report."
        return state
    prompt = f"Review QA report for '{state['project']}'. Critique fairness, comprehensiveness, logic, missing aspects.\nReport:\n{current_qa_report}"
    response = llm.invoke(prompt)  # Use LLM from state
    feedback = response.content.strip()
    state["quality_feedback"] = feedback
    state["messages"].append(AIMessage(content=f"Feedback on QA Report:\n{feedback}"))
    logger.info("Generated feedback on the Quality Analysis report.")
    return state
@with_retry
def refine_quality_and_code(state: MainState) -> MainState:
    """Refines the QA report and, optionally, minor non-functional code aspects."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    func_name = "refine_quality_and_code"
    code_files_base = state.get("final_test_code_files", [])
    instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
    # Build a length-capped code context string.
    code_str_parts = []
    total_len = 0
    max_code_len = 25000
    if not code_files_base:
        logger.warning(f"No tested code for {func_name}")
        code_content = "N/A"
    else:
        for file in code_files_base:
            header = f"--- {file.filename} ---\n"
            remaining_len = max_code_len - total_len - len(header)
            if remaining_len <= 0:
                code_str_parts.append("\n*... (Code context truncated)*")
                logger.debug(f"Code context truncated for {func_name}")
                break
            snippet = file.content[:remaining_len]
            is_truncated = len(file.content) > remaining_len
            code_str_parts.append(header + snippet + ('...' if is_truncated else ''))
            total_len += len(header) + len(snippet)
            if total_len >= max_code_len:
                if not code_str_parts[-1].endswith("truncated)*"):
                    code_str_parts.append("\n*... (Code context truncated)*")
                logger.debug(f"Code context max length for {func_name}")
                break
        code_content = "\n".join(code_str_parts)

    class QualityAndCode(BaseModel):
        analysis: str
        files: List[CodeFile]

    prompt = f"Refine QA report for '{state['project']}' based on feedback. Also apply *minor, non-functional* code improvements (docs, names) suggested by feedback to 'Passed Code' if simple, else return original files.\nQA Report:\n{state.get('quality_current_analysis','N/A')}\nPassed Code:\n{code_content}\nInstr:\n{instructions}\nAI FB:\n{state.get('quality_feedback','N/A')}\nHuman FB:\n{state.get('quality_human_feedback','N/A')}\n---\nOutput ONLY JSON (QualityAndCode model)."
    structured_llm = llm.with_structured_output(QualityAndCode)  # Use LLM from state
    response = structured_llm.invoke(prompt)
    if not response or not isinstance(response, QualityAndCode) or not response.analysis:
        logger.error("Refinement of QA report failed or returned an invalid result.")
        raise ValueError("Did not produce expected result.")
    state["quality_current_analysis"] = response.analysis
    # Guard: if the model returns an empty file list, keep the tested code rather
    # than wiping it from state.
    state["final_code_files"] = response.files or code_files_base
    current_instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
    state["code_current"] = GeneratedCode(files=state["final_code_files"], instructions=current_instructions)
    state["messages"].append(AIMessage(content=f"Refined Quality Analysis Report:\n{state['quality_current_analysis']}"))
    logger.info("Refined Quality Analysis report.")
    return state
def save_final_quality_analysis(state: MainState) -> MainState:
    """Saves the final QA report and the associated final code snapshot."""
    state["final_quality_analysis"] = state.get("quality_current_analysis", "N/A")
    qa_dir, code_snap_dir, qa_path = None, None, None
    try:
        abs_project_folder = os.path.abspath(state["project_folder"])
        qa_dir = os.path.join(abs_project_folder, "8_quality_analysis")
        os.makedirs(qa_dir, exist_ok=True)
        qa_path = os.path.join(qa_dir, "final_quality_analysis.md")
        with open(qa_path, "w", encoding="utf-8") as f:
            f.write(state["final_quality_analysis"])
        state["final_quality_analysis_path"] = qa_path
        logger.info(f"Saved final QA report: {qa_path}")
        code_snap_dir = os.path.join(qa_dir, "final_code")
        os.makedirs(code_snap_dir, exist_ok=True)
        state["final_code_folder"] = code_snap_dir
        files_to_save = state.get("final_code_files", [])
        instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
        if files_to_save:
            logger.info(f"Saving final code snapshot ({len(files_to_save)} files) to {code_snap_dir}")
            snap_root = os.path.abspath(code_snap_dir)
            for file in files_to_save:
                fn = file.filename
                content = file.content
                safe_fn = os.path.basename(fn)
                if not safe_fn or ('/' in fn and '..' in fn) or ('\\' in fn and '..' in fn):
                    logger.warning(f"Skip unsafe file: {fn}")
                    continue
                rel_path = fn.lstrip('/\\')
                filepath = os.path.normpath(os.path.join(code_snap_dir, rel_path))
                # Strict containment check (see note in save_review_security_outputs).
                if not os.path.abspath(filepath).startswith(snap_root + os.sep):
                    logger.warning(f"Skip traversal: {fn}")
                    continue
                try:
                    os.makedirs(os.path.dirname(filepath), exist_ok=True)
                    with open(filepath, "w", encoding="utf-8") as f:
                        f.write(content)
                    logger.debug(f"Saved final code file: {filepath}")
                except OSError as path_err:
                    logger.error(f"Path error saving final code '{filepath}': {path_err}")
                except Exception as write_err:
                    logger.error(f"Error writing final code '{filepath}': {write_err}")
            try:  # Save instructions
                instr_path = os.path.join(code_snap_dir, "instructions.md")
                with open(instr_path, "w", encoding="utf-8") as f:
                    f.write(instructions)
                logger.debug(f"Saved final instructions: {instr_path}")
            except Exception as instr_err:
                logger.error(f"Error writing final instructions: {instr_err}")
        else:
            logger.warning("No final code files found to save with QA report.")
    except Exception as e:
        logger.error(f"Failed saving QA outputs: {e}", exc_info=True)
        state["final_quality_analysis_path"] = None
        state["final_code_folder"] = None
    return state
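
# For reference, the on-disk layout produced by the save nodes in this part of the
# module (folder and file names taken from the code; the root comes from
# state["project_folder"]):
#
#   <project_folder>/
#     7_testing/
#       final_test_cases.md
#       passed_code/            # code snapshot that passed the tests (+ instructions.md)
#     8_quality_analysis/
#       final_quality_analysis.md
#       final_code/             # final code snapshot (+ instructions.md)
#     9_deployment/
#       final_deployment_plan.md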
# --- Deployment Cycle ---
@with_retry
def generate_initial_deployment(state: MainState, prefs: str) -> MainState:
    """Generates the initial deployment plan."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    func_name = "generate_initial_deployment"
    final_code = state.get("final_code_files", [])
    if not final_code:
        logger.error(f"No final code for {func_name}")
        raise ValueError("Final code missing.")
    instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
    # Build a length-capped context string from deployment-relevant files only.
    code_str_parts = []
    total_len = 0
    max_code_len = 25000
    for file in final_code:
        name = file.filename.lower()
        is_key_file = ("requirements" in name or "dockerfile" in name or "main." in name
                       or "app." in name or ".env" in name or "config" in name)
        if not is_key_file:
            continue
        header = f"--- {file.filename} ---\n"
        remaining_len = max_code_len - total_len - len(header)
        if remaining_len <= 0:
            code_str_parts.append("\n*... (Key file context truncated)*")
            logger.debug(f"Key file context truncated for {func_name}")
            break
        snippet = file.content[:remaining_len]
        is_truncated = len(file.content) > remaining_len
        code_str_parts.append(header + snippet + ('...' if is_truncated else ''))
        total_len += len(header) + len(snippet)
        if total_len >= max_code_len:
            if not code_str_parts[-1].endswith("truncated)*"):
                code_str_parts.append("\n*... (Key file context truncated)*")
            logger.debug(f"Key file context max length for {func_name}")
            break
    code_context = "\n".join(code_str_parts) if code_str_parts else "No key deployment files found."
    prompt = f"Act as DevOps. Generate detailed deployment plan for '{state['project']}' ({state['coding_language']}). Base on user prefs, code structure (reqs, docker). Include commands, examples, verification steps.\nPrefs:\n{prefs}\nCode Context (Key Files):\n{code_context}\nInstr:\n{instructions}\n---"
    response = llm.invoke(prompt)  # Use LLM from state
    deployment_plan = response.content.strip()
    state["deployment_current_process"] = deployment_plan
    state["messages"].append(AIMessage(content=f"Initial Deployment Plan:\n{deployment_plan}"))
    logger.info("Generated initial deployment plan.")
    return state
@with_retry
def generate_deployment_feedback(state: MainState) -> MainState:
    """Generates AI feedback on the deployment plan."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    current_plan = state.get('deployment_current_process', 'N/A')
    if not current_plan or current_plan == 'N/A':
        logger.warning("No deployment plan to review.")
        state["deployment_feedback"] = "No plan."
        return state
    prompt = f"Review Deployment Plan for '{state['project']}'. Assess clarity, correctness, completeness, security, alignment with practices.\nPlan:\n{current_plan}\n---\nSuggest improvements."
    response = llm.invoke(prompt)  # Use LLM from state
    feedback = response.content.strip()
    state["deployment_feedback"] = feedback
    state["messages"].append(AIMessage(content=f"Deployment Plan Feedback:\n{feedback}"))
    logger.info("Generated feedback on deployment plan.")
    return state
@with_retry
def refine_deployment(state: MainState) -> MainState:
    """Refines the deployment plan based on feedback."""
    llm = state.get('llm_instance')
    if not llm:
        raise ConnectionError("LLM instance not found in state.")
    if 'messages' not in state:
        state['messages'] = []
    func_name = "refine_deployment"
    current_plan = state.get('deployment_current_process', 'N/A')
    ai_feedback = state.get('deployment_feedback', 'N/A')
    human_feedback = state.get('deployment_human_feedback', 'N/A')
    final_code = state.get("final_code_files", [])
    instructions = state.get("code_current", GeneratedCode(files=[], instructions="")).instructions
    # Build a length-capped context string from deployment-relevant files only.
    code_str_parts = []
    total_len = 0
    max_code_len = 25000
    if not final_code:
        logger.warning(f"No files for {func_name}")
        code_context = "No code files."
    else:
        for file in final_code:
            name = file.filename.lower()
            is_key_file = ("requirements" in name or "dockerfile" in name or "main." in name
                           or "app." in name or ".env" in name or "config" in name)
            if not is_key_file:
                continue
            header = f"--- {file.filename} ---\n"
            remaining_len = max_code_len - total_len - len(header)
            if remaining_len <= 0:
                code_str_parts.append("\n*... (Key file context truncated)*")
                logger.debug(f"Key file context truncated for {func_name}")
                break
            snippet = file.content[:remaining_len]
            is_truncated = len(file.content) > remaining_len
            code_str_parts.append(header + snippet + ('...' if is_truncated else ''))
            total_len += len(header) + len(snippet)
            if total_len >= max_code_len:
                if not code_str_parts[-1].endswith("truncated)*"):
                    code_str_parts.append("\n*... (Key file context truncated)*")
                logger.debug(f"Key file context max length for {func_name}")
                break
        code_context = "\n".join(code_str_parts) if code_str_parts else "No key files."
    prompt = f"Refine deployment plan for '{state['project']}'. Update based on feedback.\nCurrent Plan:\n{current_plan}\nCode Context:\n{code_context}\nInstr:\n{instructions}\nAI FB:\n{ai_feedback}\nHuman FB:\n{human_feedback}\n---\nGenerate updated plan."
    response = llm.invoke(prompt)  # Use LLM from state
    refined_plan = response.content.strip()
    state["deployment_current_process"] = refined_plan
    state["messages"].append(AIMessage(content=f"Refined Deployment Plan:\n{refined_plan}"))
    logger.info("Refined deployment plan.")
    return state
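
# The `is_key_file` predicate above is duplicated in generate_initial_deployment
# and refine_deployment. A minimal consolidation sketch (hypothetical: the name
# `_is_deployment_key_file` is not part of the original module, and the functions
# above still inline their own copies):
def _is_deployment_key_file(filename: str) -> bool:
    """Heuristic: does this file matter for writing a deployment plan?"""
    name = filename.lower()
    return any(marker in name for marker in
               ("requirements", "dockerfile", "main.", "app.", ".env", "config"))
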
def save_final_deployment_plan(state: MainState) -> MainState:
    """Saves the final deployment plan."""
    state["final_deployment_process"] = state.get("deployment_current_process", "No deployment plan generated.")
    filepath = None
    try:
        abs_project_folder = os.path.abspath(state["project_folder"])
        deploy_dir = os.path.join(abs_project_folder, "9_deployment")
        os.makedirs(deploy_dir, exist_ok=True)
        filepath = os.path.join(deploy_dir, "final_deployment_plan.md")
        with open(filepath, "w", encoding="utf-8") as f:
            f.write(state["final_deployment_process"])
        logger.info(f"Saved final deployment plan: {filepath}")
    except Exception as e:
        logger.error(f"Failed to save deployment plan: {e}", exc_info=True)
        filepath = None
    state["final_deployment_path"] = filepath
    return state
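
# Illustrative usage sketch (an assumption: run standalone as a smoke test; not
# part of the original pipeline). Each node takes and returns a MainState dict
# (total=False), so a save node can be exercised with a minimal state:
if __name__ == "__main__":
    _demo_state: MainState = {
        "project_folder": "demo_project",  # hypothetical output directory
        "deployment_current_process": "Example plan: build image, push, deploy.",
    }
    _result = save_final_deployment_plan(_demo_state)
    print("Deployment plan saved to:", _result.get("final_deployment_path"))
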
# --- END OF SDLC.py ---