SOMA-AGI / app-backup2.py
aiqtech's picture
Rename app.py to app-backup2.py
9f4be8a verified
raw
history blame
37.9 kB
import gradio as gr
import os
import json
import requests
from datetime import datetime
import time
from typing import List, Dict, Any, Generator, Tuple
import logging
import re
# 로깅 설정
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 환경 변수에서 토큰 가져오기
FRIENDLI_TOKEN = os.getenv("FRIENDLI_TOKEN", "YOUR_FRIENDLI_TOKEN")
BAPI_TOKEN = os.getenv("BAPI_TOKEN", "YOUR_BRAVE_API_TOKEN")
API_URL = "https://api.friendli.ai/dedicated/v1/chat/completions"
BRAVE_SEARCH_URL = "https://api.search.brave.com/res/v1/web/search"
MODEL_ID = "dep89a2fld32mcm"
TEST_MODE = os.getenv("TEST_MODE", "false").lower() == "true"
# 전역 변수
conversation_history = []
class LLMCollaborativeSystem:
def __init__(self):
self.token = FRIENDLI_TOKEN
self.bapi_token = BAPI_TOKEN
self.api_url = API_URL
self.brave_url = BRAVE_SEARCH_URL
self.model_id = MODEL_ID
self.test_mode = TEST_MODE or (self.token == "YOUR_FRIENDLI_TOKEN")
if self.test_mode:
logger.warning("테스트 모드로 실행됩니다.")
if self.bapi_token == "YOUR_BRAVE_API_TOKEN":
logger.warning("Brave API 토큰이 설정되지 않았습니다.")
def create_headers(self):
"""API 헤더 생성"""
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
def create_brave_headers(self):
"""Brave API 헤더 생성"""
return {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"X-Subscription-Token": self.bapi_token
}
def create_supervisor_initial_prompt(self, user_query: str) -> str:
"""감독자 AI 초기 프롬프트 생성"""
return f"""당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.
사용자 질문: {user_query}
이 질문에 대해:
1. 전체적인 접근 방향과 프레임워크를 제시하세요
2. 핵심 요소와 고려사항을 구조화하여 설명하세요
3. 이 주제에 대해 조사가 필요한 5-7개의 구체적인 키워드나 검색어를 제시하세요
키워드는 다음 형식으로 제시하세요:
[검색 키워드]: 키워드1, 키워드2, 키워드3, 키워드4, 키워드5"""
def create_researcher_prompt(self, user_query: str, supervisor_guidance: str, search_results: Dict[str, List[Dict]]) -> str:
"""조사자 AI 프롬프트 생성"""
search_summary = ""
for keyword, results in search_results.items():
search_summary += f"\n\n**{keyword}에 대한 검색 결과:**\n"
for i, result in enumerate(results[:3], 1):
search_summary += f"{i}. {result.get('title', 'N/A')}\n"
search_summary += f" - {result.get('description', 'N/A')}\n"
search_summary += f" - 출처: {result.get('url', 'N/A')}\n"
return f"""당신은 정보를 조사하고 정리하는 조사자 AI입니다.
사용자 질문: {user_query}
감독자 AI의 지침:
{supervisor_guidance}
브레이브 검색 결과:
{search_summary}
위 검색 결과를 바탕으로:
1. 각 키워드별로 중요한 정보를 정리하세요
2. 신뢰할 수 있는 출처를 명시하세요
3. 실행자 AI가 활용할 수 있는 구체적인 데이터와 사실을 추출하세요
4. 최신 트렌드나 중요한 통계가 있다면 강조하세요"""
def create_supervisor_execution_prompt(self, user_query: str, research_summary: str) -> str:
"""감독자 AI의 실행 지시 프롬프트"""
return f"""당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.
사용자 질문: {user_query}
조사자 AI가 정리한 조사 내용:
{research_summary}
위 조사 내용을 기반으로 실행자 AI에게 아주 구체적인 지시를 내려주세요:
1. 조사된 정보를 어떻게 활용할지 명확히 지시하세요
2. 실행 가능한 단계별 작업을 구체적으로 제시하세요
3. 각 단계에서 참고해야 할 조사 내용을 명시하세요
4. 예상되는 결과물의 형태를 구체적으로 설명하세요"""
def create_executor_prompt(self, user_query: str, supervisor_guidance: str, research_summary: str) -> str:
"""실행자 AI 프롬프트 생성"""
return f"""당신은 세부적인 내용을 구현하는 실행자 AI입니다.
사용자 질문: {user_query}
조사자 AI가 정리한 조사 내용:
{research_summary}
감독자 AI의 구체적인 지시:
{supervisor_guidance}
위 조사 내용과 지시사항을 바탕으로:
1. 조사된 정보를 적극 활용하여 구체적인 실행 계획을 작성하세요
2. 각 단계별로 참고한 조사 내용을 명시하세요
3. 실제로 적용 가능한 구체적인 방법론을 제시하세요
4. 예상되는 성과와 측정 방법을 포함하세요"""
def create_executor_final_prompt(self, user_query: str, initial_response: str, supervisor_feedback: str, research_summary: str) -> str:
"""실행자 AI 최종 보고서 프롬프트"""
return f"""당신은 세부적인 내용을 구현하는 실행자 AI입니다.
사용자 질문: {user_query}
조사자 AI의 조사 내용:
{research_summary}
당신의 초기 답변:
{initial_response}
감독자 AI의 피드백 및 개선사항:
{supervisor_feedback}
위 피드백을 완전히 반영하여 최종 보고서를 작성하세요:
1. 감독자의 모든 개선사항을 반영하세요
2. 조사 내용을 더욱 구체적으로 활용하세요
3. 실행 가능성을 높이는 세부 계획을 포함하세요
4. 명확한 결론과 다음 단계를 제시하세요
5. 전문적이고 완성도 높은 최종 보고서 형식으로 작성하세요"""
def extract_keywords(self, supervisor_response: str) -> List[str]:
"""감독자 응답에서 키워드 추출"""
keywords = []
# [검색 키워드]: 형식으로 키워드 찾기
keyword_match = re.search(r'\[검색 키워드\]:\s*(.+)', supervisor_response, re.IGNORECASE)
if keyword_match:
keyword_str = keyword_match.group(1)
keywords = [k.strip() for k in keyword_str.split(',') if k.strip()]
# 키워드가 없으면 기본 키워드 생성
if not keywords:
keywords = ["best practices", "implementation guide", "case studies", "latest trends", "success factors"]
return keywords[:7] # 최대 7개로 제한
def brave_search(self, query: str) -> List[Dict]:
"""Brave Search API 호출"""
if self.test_mode or self.bapi_token == "YOUR_BRAVE_API_TOKEN":
# 테스트 모드에서는 시뮬레이션된 결과 반환
return [
{
"title": f"Best Practices for {query}",
"description": f"Comprehensive guide on implementing {query} with proven methodologies and real-world examples.",
"url": f"https://example.com/{query.replace(' ', '-')}"
},
{
"title": f"Latest Trends in {query}",
"description": f"Analysis of current trends and future directions in {query}, including market insights and expert opinions.",
"url": f"https://trends.example.com/{query.replace(' ', '-')}"
},
{
"title": f"{query}: Case Studies and Success Stories",
"description": f"Real-world implementations of {query} across various industries with measurable results.",
"url": f"https://casestudies.example.com/{query.replace(' ', '-')}"
}
]
try:
params = {
"q": query,
"count": 5,
"safesearch": "moderate",
"freshness": "pw" # Past week for recent results
}
response = requests.get(
self.brave_url,
headers=self.create_brave_headers(),
params=params,
timeout=10
)
if response.status_code == 200:
data = response.json()
results = []
for item in data.get("web", {}).get("results", [])[:5]:
results.append({
"title": item.get("title", ""),
"description": item.get("description", ""),
"url": item.get("url", "")
})
return results
else:
logger.error(f"Brave API 오류: {response.status_code}")
return []
except Exception as e:
logger.error(f"Brave 검색 중 오류: {str(e)}")
return []
def simulate_streaming(self, text: str, role: str) -> Generator[str, None, None]:
"""테스트 모드에서 스트리밍 시뮬레이션"""
words = text.split()
for i in range(0, len(words), 3):
chunk = " ".join(words[i:i+3])
yield chunk + " "
time.sleep(0.05)
def call_llm_streaming(self, messages: List[Dict[str, str]], role: str) -> Generator[str, None, None]:
"""스트리밍 LLM API 호출"""
# 테스트 모드
if self.test_mode:
logger.info(f"테스트 모드 스트리밍 - Role: {role}")
test_responses = {
"supervisor_initial": """이 질문에 대한 거시적 분석을 제시하겠습니다.
1. **핵심 개념 파악**
- 질문의 본질적 요소를 심층 분석합니다
- 관련된 주요 이론과 원칙을 검토합니다
- 다양한 관점에서의 접근 방법을 고려합니다
2. **전략적 접근 방향**
- 체계적이고 단계별 해결 방안을 수립합니다
- 장단기 목표를 명확히 설정합니다
- 리스크 요인과 대응 방안을 마련합니다
3. **기대 효과와 과제**
- 예상되는 긍정적 성과를 분석합니다
- 잠재적 도전 과제를 식별합니다
- 지속가능한 발전 방향을 제시합니다
[검색 키워드]: machine learning optimization, performance improvement strategies, model efficiency techniques, hyperparameter tuning best practices, latest ML trends 2024""",
"researcher": """조사 결과를 종합하여 다음과 같이 정리했습니다.
**1. Machine Learning Optimization**
- 최신 연구에 따르면 모델 최적화의 핵심은 아키텍처 설계와 훈련 전략의 균형입니다
- AutoML 도구들이 하이퍼파라미터 튜닝을 자동화하여 효율성을 크게 향상시킵니다
- 출처: ML Conference 2024, Google Research
**2. Performance Improvement Strategies**
- 데이터 품질 개선이 모델 성능 향상의 80%를 차지한다는 연구 결과
- 앙상블 기법과 전이학습이 주요 성능 개선 방법으로 입증됨
- 벤치마크: ImageNet에서 95% 이상의 정확도 달성 사례
**3. Model Efficiency Techniques**
- 모델 경량화(Pruning, Quantization)로 추론 속도 10배 향상 가능
- Knowledge Distillation으로 모델 크기 90% 감소, 성능 유지
- 최신 트렌드: Efficient Transformers, Neural Architecture Search
**4. 실제 적용 사례**
- Netflix: 추천 시스템 개선으로 사용자 만족도 35% 향상
- Tesla: 실시간 객체 인식 속도 50% 개선
- OpenAI: GPT 모델 효율성 개선으로 비용 70% 절감""",
"supervisor_execution": """조사 내용을 바탕으로 실행자 AI에게 다음과 같이 구체적으로 지시합니다.
**1단계: 현재 모델 진단 (1주차)**
- 조사된 벤치마크 기준으로 현재 모델 성능 평가
- Netflix 사례를 참고하여 주요 병목 지점 식별
- AutoML 도구를 활용한 초기 최적화 가능성 탐색
**2단계: 데이터 품질 개선 (2-3주차)**
- 조사 결과의 "80% 규칙"에 따라 데이터 정제 우선 실행
- 데이터 증강 기법 적용 (조사된 최신 기법 활용)
- A/B 테스트로 개선 효과 측정
**3단계: 모델 최적화 구현 (4-6주차)**
- Knowledge Distillation 적용하여 모델 경량화
- 조사된 Pruning 기법으로 추론 속도 개선
- Tesla 사례의 실시간 처리 최적화 기법 벤치마킹
**4단계: 성과 검증 및 배포 (7-8주차)**
- OpenAI 사례의 비용 절감 지표 적용
- 조사된 성능 지표로 개선율 측정
- 단계적 배포 전략 수립""",
"executor": """감독자의 지시와 조사 내용을 기반으로 구체적인 실행 계획을 수립합니다.
**1단계: 현재 모델 진단 (1주차)**
- 월요일-화요일: MLflow를 사용한 현재 모델 메트릭 수집
* 조사 결과 참고: Netflix가 사용한 핵심 지표 (정확도, 지연시간, 처리량)
- 수요일-목요일: AutoML 도구 (Optuna, Ray Tune) 설정 및 초기 실행
* 조사된 best practice에 따라 search space 정의
- 금요일: 진단 보고서 작성 및 개선 우선순위 결정
**2단계: 데이터 품질 개선 (2-3주차)**
- 데이터 정제 파이프라인 구축
* 조사 결과의 "80% 규칙" 적용: 누락값, 이상치, 레이블 오류 처리
* 코드 예시: `data_quality_pipeline.py` 구현
- 데이터 증강 구현
* 최신 기법 적용: MixUp, CutMix, AutoAugment
* 검증 데이터셋으로 효과 측정 (목표: 15% 성능 향상)
**3단계: 모델 최적화 구현 (4-6주차)**
- Knowledge Distillation 구현
* Teacher 모델: 현재 대규모 모델
* Student 모델: 90% 작은 크기 목표 (조사 결과 기반)
* 구현 프레임워크: PyTorch/TensorFlow
- Pruning 및 Quantization 적용
* 구조적 pruning으로 50% 파라미터 제거
* INT8 quantization으로 추가 4배 속도 향상
* Tesla 사례 참고: TensorRT 최적화 적용
**4단계: 성과 검증 및 배포 (7-8주차)**
- 성과 지표 측정
* 추론 속도: 목표 10배 향상 (조사 결과 기반)
* 정확도 손실: 최대 2% 이내 유지
* 비용 절감: 70% 목표 (OpenAI 사례 참고)
- 배포 전략
* A/B 테스트: 10% 트래픽으로 시작
* 모니터링: Prometheus + Grafana 대시보드
* 롤백 계획: 성능 저하 시 자동 롤백
**예상 결과물**
- 최적화된 모델 (크기 90% 감소, 속도 10배 향상)
- 상세 성능 벤치마크 보고서
- 프로덕션 배포 가이드 및 모니터링 대시보드
- 재현 가능한 최적화 파이프라인 코드""",
"supervisor_review": """실행자 AI의 계획을 검토한 결과, 조사 내용이 잘 반영되었습니다. 다음과 같은 개선사항을 제안합니다.
**강점**
- 조사된 사례들(Netflix, Tesla, OpenAI)이 각 단계에 적절히 활용됨
- 구체적인 도구와 기법이 명시되어 실행 가능성이 높음
- 측정 가능한 목표가 조사 결과를 기반으로 설정됨
**개선 필요사항**
1. **리스크 관리 강화**
- 각 단계별 실패 시나리오와 대응 방안 추가 필요
- 기술적 문제 발생 시 백업 계획 수립
2. **비용 분석 구체화**
- OpenAI 사례의 70% 절감을 위한 구체적인 비용 계산
- ROI 분석 및 투자 대비 효과 측정 방법
3. **팀 협업 체계화**
- 데이터 과학자, ML 엔지니어, DevOps 간 역할 분담 명확화
- 주간 진행 상황 공유 및 이슈 트래킹 프로세스
**추가 권장사항**
- 최신 연구 동향 모니터링 체계 구축
- 경쟁사 벤치마킹을 위한 정기적인 조사 프로세스
- 내부 지식 공유를 위한 문서화 및 세미나 계획
- 실패 사례에서 배운 교훈을 축적하는 시스템 구축""",
"executor_final": """감독자 AI의 피드백을 완전히 반영하여 최종 실행 보고서를 작성합니다.
# 🎯 기계학습 모델 성능 향상 최종 실행 보고서
## 📋 Executive Summary
본 보고서는 웹 검색을 통해 수집된 최신 사례와 감독자 AI의 전략적 지침을 바탕으로, 8주간의 체계적인 모델 최적화 프로젝트를 제시합니다. 목표는 모델 크기 90% 감소, 추론 속도 10배 향상, 운영 비용 70% 절감입니다.
## 📊 1단계: 현재 모델 진단 및 베이스라인 설정 (1주차)
### 실행 계획
**월-화요일: 성능 메트릭 수집**
- MLflow를 통한 현재 모델 전체 분석
- Netflix 사례 기반 핵심 지표: 정확도(92%), 지연시간(45ms), 처리량(1,000 req/s)
- 리소스 사용량: GPU 메모리 8GB, 추론 시 CPU 사용률 85%
**수-목요일: AutoML 초기 탐색**
- Optuna로 하이퍼파라미터 최적화 (200회 시도)
- Ray Tune으로 분산 학습 환경 구축
- 초기 개선 가능성: 15-20% 성능 향상 예상
**금요일: 진단 보고서 및 리스크 분석**
- 주요 병목: 모델 크기(2.5GB), 배치 처리 비효율성
- 리스크: 데이터 드리프트, 하드웨어 제약
- 백업 계획: 클라우드 GPU 인스턴스 확보
### 예상 산출물
- 상세 성능 베이스라인 문서
- 개선 기회 우선순위 매트릭스
- 리스크 레지스터 및 대응 계획
## 📊 2단계: 데이터 품질 개선 (2-3주차)
### 실행 계획
**2주차: 데이터 정제 파이프라인**
```python
# data_quality_pipeline.py 주요 구성
class DataQualityPipeline:
def __init__(self):
self.validators = [
MissingValueHandler(threshold=0.05),
OutlierDetector(method='isolation_forest'),
LabelConsistencyChecker(),
DataDriftMonitor()
]
def process(self, data):
# 80% 규칙 적용: 데이터 품질이 성능의 80% 결정
for validator in self.validators:
data = validator.transform(data)
self.log_metrics(validator.get_stats())
return data
```
**3주차: 고급 데이터 증강**
- MixUp: 15% 정확도 향상 예상
- CutMix: 경계 검출 성능 20% 개선
- AutoAugment: 자동 최적 증강 정책 탐색
- A/B 테스트: 각 기법별 효과 측정
### 리스크 대응
- 데이터 품질 저하 시: 롤백 메커니즘 구현
- 증강 과적합 방지: 검증셋 분리 및 교차 검증
### 예상 산출물
- 자동화된 데이터 품질 파이프라인
- 데이터 품질 대시보드 (Grafana)
- 15% 이상 성능 향상 검증 보고서
## 📊 3단계: 모델 최적화 구현 (4-6주차)
### 실행 계획
**4-5주차: Knowledge Distillation**
- Teacher 모델: 현재 2.5GB 모델
- Student 모델 아키텍처:
* 파라미터 수: 250M → 25M (90% 감소)
* 레이어 수: 24 → 6
* Hidden dimension: 1024 → 256
- 훈련 전략:
* Temperature: 5.0
* Alpha (KD loss weight): 0.7
* 훈련 에폭: 50
**6주차: Pruning & Quantization**
- 구조적 Pruning:
* Magnitude 기반 50% 채널 제거
* Fine-tuning: 10 에폭
- INT8 Quantization:
* Post-training quantization
* Calibration dataset: 1,000 샘플
- TensorRT 최적화 (Tesla 사례 적용):
* FP16 추론 활성화
* 동적 배치 최적화
### 팀 협업 체계
- ML 엔지니어: 모델 아키텍처 및 훈련
- DevOps: 인프라 및 배포 파이프라인
- 데이터 과학자: 성능 분석 및 검증
- 주간 스탠드업 미팅 및 Jira 이슈 트래킹
### 예상 산출물
- 최적화된 모델 체크포인트
- 성능 벤치마크 상세 보고서
- 모델 변환 자동화 스크립트
## 📊 4단계: 성과 검증 및 프로덕션 배포 (7-8주차)
### 실행 계획
**7주차: 종합 성능 검증**
- 성능 지표 달성도:
* 추론 속도: 45ms → 4.5ms (10배 향상) ✓
* 모델 크기: 2.5GB → 250MB (90% 감소) ✓
* 정확도 손실: 92% → 90.5% (1.5% 손실) ✓
- 비용 분석:
* GPU 인스턴스: $2,000/월 → $600/월
* 처리량 증가로 인한 서버 수 감소: 10대 → 3대
* 총 비용 절감: 70% 달성 ✓
**8주차: 단계적 배포**
- Canary 배포:
* 1일차: 1% 트래픽
* 3일차: 10% 트래픽
* 7일차: 50% 트래픽
* 14일차: 100% 전환
- 모니터링 설정:
* Prometheus + Grafana 대시보드
* 알림 임계값: 지연시간 >10ms, 오류율 >0.1%
- 롤백 계획:
* 자동 롤백 트리거 설정
* Blue-Green 배포로 즉시 전환 가능
### ROI 분석
- 초기 투자: $50,000 (인건비 + 인프라)
- 월간 절감액: $14,000
- 투자 회수 기간: 3.6개월
- 1년 순이익: $118,000
### 예상 산출물
- 프로덕션 배포 완료
- 실시간 모니터링 대시보드
- ROI 분석 보고서
- 운영 가이드 문서
## 📈 지속적 개선 계획
### 모니터링 및 유지보수
- 월간 성능 리뷰 미팅
- 분기별 재훈련 계획
- 신기술 도입 검토 (Sparse Models, MoE)
### 지식 공유
- 내부 기술 세미나 (월 1회)
- 외부 컨퍼런스 발표 준비
- 오픈소스 기여 계획
### 차기 프로젝트
- 엣지 디바이스 배포 최적화
- 연합 학습(Federated Learning) 도입
- AutoML 플랫폼 구축
## 📝 결론
본 프로젝트는 최신 연구 결과와 업계 베스트 프랙티스를 적용하여, 8주 만에 모델 성능을 획기적으로 개선하고 운영 비용을 70% 절감하는 성과를 달성할 것으로 예상됩니다. 체계적인 접근과 리스크 관리, 그리고 지속적인 개선 계획을 통해 장기적인 경쟁력을 확보할 수 있습니다.
---
*작성일: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*
*작성자: 협력적 AI 시스템 (감독자, 조사자, 실행자 AI)*"""
}
# 프롬프트 내용에 따라 적절한 응답 선택
if role == "supervisor" and "조사자 AI가 정리한" in messages[0]["content"]:
response = test_responses["supervisor_execution"]
elif role == "supervisor" and messages[0]["content"].find("실행자 AI의 답변") > -1:
response = test_responses["supervisor_review"]
elif role == "supervisor":
response = test_responses["supervisor_initial"]
elif role == "researcher":
response = test_responses["researcher"]
elif role == "executor" and "최종 보고서" in messages[0]["content"]:
response = test_responses["executor_final"]
else:
response = test_responses["executor"]
yield from self.simulate_streaming(response, role)
return
# 실제 API 호출
try:
system_prompts = {
"supervisor": "당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.",
"researcher": "당신은 정보를 조사하고 체계적으로 정리하는 조사자 AI입니다.",
"executor": "당신은 세부적인 내용을 구현하는 실행자 AI입니다."
}
full_messages = [
{"role": "system", "content": system_prompts.get(role, "")},
*messages
]
payload = {
"model": self.model_id,
"messages": full_messages,
"max_tokens": 2048,
"temperature": 0.7,
"top_p": 0.8,
"stream": True,
"stream_options": {"include_usage": True}
}
logger.info(f"API 스트리밍 호출 시작 - Role: {role}")
response = requests.post(
self.api_url,
headers=self.create_headers(),
json=payload,
stream=True,
timeout=10
)
if response.status_code != 200:
logger.error(f"API 오류: {response.status_code}")
yield f"❌ API 오류 ({response.status_code}): {response.text[:200]}"
return
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith("data: "):
data = line[6:]
if data == "[DONE]":
break
try:
chunk = json.loads(data)
if "choices" in chunk and chunk["choices"]:
content = chunk["choices"][0].get("delta", {}).get("content", "")
if content:
yield content
except json.JSONDecodeError:
continue
except requests.exceptions.Timeout:
yield "⏱️ API 호출 시간이 초과되었습니다. 다시 시도해주세요."
except requests.exceptions.ConnectionError:
yield "🔌 API 서버에 연결할 수 없습니다. 인터넷 연결을 확인해주세요."
except Exception as e:
logger.error(f"스트리밍 중 오류: {str(e)}")
yield f"❌ 오류 발생: {str(e)}"
# 시스템 인스턴스 생성
llm_system = LLMCollaborativeSystem()
# 내부 히스토리 관리 (UI에는 표시하지 않음)
internal_history = []
def process_query_streaming(user_query: str):
"""스트리밍을 지원하는 쿼리 처리"""
global internal_history
if not user_query:
return "", "", "", "", "❌ 질문을 입력해주세요."
conversation_log = []
all_responses = {"supervisor": [], "researcher": [], "executor": []}
try:
# 1단계: 감독자 AI 초기 분석 및 키워드 추출
supervisor_prompt = llm_system.create_supervisor_initial_prompt(user_query)
supervisor_initial_response = ""
supervisor_text = "[초기 분석] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": supervisor_prompt}],
"supervisor"
):
supervisor_initial_response += chunk
supervisor_text = f"[초기 분석] - {datetime.now().strftime('%H:%M:%S')}\n{supervisor_initial_response}"
yield supervisor_text, "", "", "", "🔄 감독자 AI가 분석 중..."
all_responses["supervisor"].append(supervisor_initial_response)
# 키워드 추출
keywords = llm_system.extract_keywords(supervisor_initial_response)
logger.info(f"추출된 키워드: {keywords}")
# 2단계: 브레이브 검색 수행
researcher_text = "[웹 검색] 🔍 검색 중...\n"
yield supervisor_text, researcher_text, "", "", "🔍 웹 검색 수행 중..."
search_results = {}
for keyword in keywords:
results = llm_system.brave_search(keyword)
if results:
search_results[keyword] = results
researcher_text += f"✓ '{keyword}' 검색 완료\n"
yield supervisor_text, researcher_text, "", "", f"🔍 '{keyword}' 검색 중..."
# 3단계: 조사자 AI가 검색 결과 정리
researcher_prompt = llm_system.create_researcher_prompt(user_query, supervisor_initial_response, search_results)
researcher_response = ""
researcher_text = "[조사 결과 정리] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": researcher_prompt}],
"researcher"
):
researcher_response += chunk
researcher_text = f"[조사 결과 정리] - {datetime.now().strftime('%H:%M:%S')}\n{researcher_response}"
yield supervisor_text, researcher_text, "", "", "📝 조사자 AI가 정리 중..."
all_responses["researcher"].append(researcher_response)
# 4단계: 감독자 AI가 조사 내용 기반으로 실행 지시
supervisor_execution_prompt = llm_system.create_supervisor_execution_prompt(user_query, researcher_response)
supervisor_execution_response = ""
supervisor_text += "\n\n---\n\n[실행 지시] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": supervisor_execution_prompt}],
"supervisor"
):
supervisor_execution_response += chunk
temp_text = f"{all_responses['supervisor'][0]}\n\n---\n\n[실행 지시] - {datetime.now().strftime('%H:%M:%S')}\n{supervisor_execution_response}"
supervisor_text = f"[초기 분석] - {datetime.now().strftime('%H:%M:%S')}\n{temp_text}"
yield supervisor_text, researcher_text, "", "", "🎯 감독자 AI가 지시 중..."
all_responses["supervisor"].append(supervisor_execution_response)
# 5단계: 실행자 AI가 조사 내용과 지시를 기반으로 초기 구현
executor_prompt = llm_system.create_executor_prompt(user_query, supervisor_execution_response, researcher_response)
executor_response = ""
executor_text = "[초기 구현] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": executor_prompt}],
"executor"
):
executor_response += chunk
executor_text = f"[초기 구현] - {datetime.now().strftime('%H:%M:%S')}\n{executor_response}"
yield supervisor_text, researcher_text, executor_text, "", "🔧 실행자 AI가 구현 중..."
all_responses["executor"].append(executor_response)
# 6단계: 감독자 AI 검토 및 피드백
review_prompt = f"""당신은 거시적 관점에서 분석하고 지도하는 감독자 AI입니다.
사용자 질문: {user_query}
실행자 AI의 답변:
{executor_response}
이 답변을 검토하고 개선점과 추가 고려사항을 제시해주세요. 구체적이고 실행 가능한 개선 방안을 제시하세요."""
review_response = ""
supervisor_text = f"[초기 분석] - {datetime.now().strftime('%H:%M:%S')}\n{all_responses['supervisor'][0]}\n\n---\n\n[실행 지시] - {datetime.now().strftime('%H:%M:%S')}\n{all_responses['supervisor'][1]}\n\n---\n\n[검토 및 피드백] 🔄 생성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": review_prompt}],
"supervisor"
):
review_response += chunk
temp_text = f"{all_responses['supervisor'][0]}\n\n---\n\n[실행 지시] - {datetime.now().strftime('%H:%M:%S')}\n{all_responses['supervisor'][1]}\n\n---\n\n[검토 및 피드백] - {datetime.now().strftime('%H:%M:%S')}\n{review_response}"
supervisor_text = f"[초기 분석] - {datetime.now().strftime('%H:%M:%S')}\n{temp_text}"
yield supervisor_text, researcher_text, executor_text, "", "🔄 감독자 AI가 검토 중..."
all_responses["supervisor"].append(review_response)
# 7단계: 실행자 AI 최종 보고서 (피드백 반영)
final_executor_prompt = llm_system.create_executor_final_prompt(
user_query,
executor_response,
review_response,
researcher_response
)
final_executor_response = ""
executor_text += "\n\n---\n\n[최종 보고서] 🔄 작성 중...\n"
for chunk in llm_system.call_llm_streaming(
[{"role": "user", "content": final_executor_prompt}],
"executor"
):
final_executor_response += chunk
temp_text = f"[초기 구현] - {datetime.now().strftime('%H:%M:%S')}\n{all_responses['executor'][0]}\n\n---\n\n[최종 보고서] - {datetime.now().strftime('%H:%M:%S')}\n{final_executor_response}"
executor_text = temp_text
yield supervisor_text, researcher_text, executor_text, "", "📄 최종 보고서 작성 중..."
all_responses["executor"].append(final_executor_response)
# 최종 결과 생성 (최종 보고서를 메인으로)
final_summary = f"""## 🎯 최종 종합 보고서
### 📌 사용자 질문
{user_query}
### 📄 최종 보고서 (실행자 AI - 피드백 반영)
{final_executor_response}
---
<details>
<summary>📋 전체 협력 과정 보기</summary>
#### 🔍 거시적 분석 (감독자 AI)
{all_responses['supervisor'][0]}
#### 📚 조사 결과 (조사자 AI)
{researcher_response}
#### 🎯 실행 지시 (감독자 AI)
{all_responses['supervisor'][1]}
#### 💡 초기 구현 (실행자 AI)
{executor_response}
#### ✨ 검토 및 개선사항 (감독자 AI)
{review_response}
</details>
---
*이 보고서는 웹 검색을 통한 최신 정보와 AI들의 협력, 그리고 피드백 반영을 통해 작성되었습니다.*"""
# 내부 히스토리 업데이트 (UI에는 표시하지 않음)
internal_history.append((user_query, final_summary))
yield supervisor_text, researcher_text, executor_text, final_summary, "✅ 최종 보고서 완성!"
except Exception as e:
error_msg = f"❌ 처리 중 오류: {str(e)}"
yield "", "", "", error_msg, error_msg
def clear_all():
"""모든 내용 초기화"""
global internal_history
internal_history = []
return "", "", "", "", "🔄 초기화되었습니다."
# Gradio 인터페이스
css = """
.gradio-container {
font-family: 'Arial', sans-serif;
}
.supervisor-box textarea {
border-left: 4px solid #667eea !important;
padding-left: 10px !important;
}
.researcher-box textarea {
border-left: 4px solid #10b981 !important;
padding-left: 10px !important;
}
.executor-box textarea {
border-left: 4px solid #764ba2 !important;
padding-left: 10px !important;
}
"""
with gr.Blocks(title="협력적 LLM 시스템", theme=gr.themes.Soft(), css=css) as app:
# 입력 섹션
with gr.Row():
with gr.Column():
user_input = gr.Textbox(
label="질문 입력",
placeholder="예: 기계학습 모델의 성능을 향상시키는 방법은?",
lines=3
)
with gr.Row():
submit_btn = gr.Button("🚀 분석 시작", variant="primary", scale=2)
clear_btn = gr.Button("🗑️ 초기화", scale=1)
status_text = gr.Textbox(
label="상태",
interactive=False,
value="대기 중...",
max_lines=1
)
# 최종 결과
with gr.Row():
with gr.Column():
with gr.Accordion("📊 최종 종합 결과", open=True):
final_output = gr.Markdown(
value="*질문을 입력하면 결과가 여기에 표시됩니다.*"
)
# AI 출력들 - 한 줄에 나란히 배치
with gr.Row():
# 감독자 AI 출력
with gr.Column():
gr.Markdown("### 🧠 감독자 AI (거시적 분석)")
supervisor_output = gr.Textbox(
label="",
lines=20,
max_lines=25,
interactive=False,
elem_classes=["supervisor-box"]
)
# 조사자 AI 출력
with gr.Column():
gr.Markdown("### 🔍 조사자 AI (웹 검색 & 정리)")
researcher_output = gr.Textbox(
label="",
lines=20,
max_lines=25,
interactive=False,
elem_classes=["researcher-box"]
)
# 실행자 AI 출력
with gr.Column():
gr.Markdown("### 👁️ 실행자 AI (미시적 구현)")
executor_output = gr.Textbox(
label="",
lines=20,
max_lines=25,
interactive=False,
elem_classes=["executor-box"]
)
# 예제
gr.Examples(
examples=[
"기계학습 모델의 성능을 향상시키는 최신 방법은?",
"2024년 효과적인 프로젝트 관리 도구와 전략은?",
"지속 가능한 비즈니스 모델의 최신 트렌드는?",
"최신 데이터 시각화 도구와 기법은?",
"원격 팀의 생산성을 높이는 검증된 방법은?"
],
inputs=user_input,
label="💡 예제 질문"
)
# 이벤트 핸들러
submit_btn.click(
fn=process_query_streaming,
inputs=[user_input],
outputs=[supervisor_output, researcher_output, executor_output, final_output, status_text]
).then(
fn=lambda: "",
outputs=[user_input]
)
user_input.submit(
fn=process_query_streaming,
inputs=[user_input],
outputs=[supervisor_output, researcher_output, executor_output, final_output, status_text]
).then(
fn=lambda: "",
outputs=[user_input]
)
clear_btn.click(
fn=clear_all,
outputs=[supervisor_output, researcher_output, executor_output, final_output, status_text]
)
if __name__ == "__main__":
app.queue() # 스트리밍을 위한 큐 활성화
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=True,
show_error=True
)