"""
Flare – ConfigProvider (TTS/STT support)
"""
from __future__ import annotations
import json, os
import threading
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from datetime import datetime
import commentjson
from utils import log
from pydantic import BaseModel, Field, HttpUrl, ValidationError
# ---------------- Parameter Collection Config ---------
class ParameterCollectionConfig(BaseModel):
"""Configuration for smart parameter collection"""
max_params_per_question: int = Field(default=2, ge=1, le=5)
smart_grouping: bool = Field(default=True)
retry_unanswered: bool = Field(default=True)
collection_prompt: str = Field(default="""
You are a helpful assistant collecting information from the user.
Conversation context:
{{conversation_history}}
Intent: {{intent_name}} - {{intent_caption}}
Already collected:
{{collected_params}}
Still needed:
{{missing_params}}
Previously asked but not answered:
{{unanswered_params}}
Rules:
1. Ask for maximum {{max_params}} parameters in one question
2. Group parameters that naturally go together (like from/to cities, dates)
3. If some parameters were asked before but not answered, include them again
4. Be natural and conversational in {{project_language}}
5. Use context from the conversation to make the question flow naturally
Generate ONLY the question, nothing else.""")
class Config:
extra = "allow"
class GlobalConfig(BaseModel):
work_mode: str = Field("hfcloud", pattern=r"^(hfcloud|cloud|on-premise|gpt4o|gpt4o-mini)$")
cloud_token: Optional[str] = None
spark_endpoint: HttpUrl
internal_prompt: Optional[str] = None
# TTS configurations
tts_engine: str = Field("no_tts", pattern=r"^(no_tts|elevenlabs|blaze)$")
tts_engine_api_key: Optional[str] = None
tts_settings: Optional[Dict[str, Any]] = Field(default_factory=lambda: {
"use_ssml": False
})
# STT configurations
stt_engine: str = Field("no_stt", pattern=r"^(no_stt|google|azure|amazon|gpt4o_realtime|flicker)$")
stt_engine_api_key: Optional[str] = None
stt_settings: Optional[Dict[str, Any]] = Field(default_factory=lambda: {
"speech_timeout_ms": 2000,
"noise_reduction_level": 2,
"vad_sensitivity": 0.5,
"language": "tr-TR",
"model": "latest_long",
"use_enhanced": True,
"enable_punctuation": True,
"interim_results": True
})
parameter_collection_config: ParameterCollectionConfig = Field(default_factory=ParameterCollectionConfig)
users: List["UserConfig"] = []
def get_plain_token(self) -> Optional[str]:
if self.cloud_token:
# Lazy import to avoid circular dependency
from encryption_utils import decrypt
return decrypt(self.cloud_token)
return None
def get_tts_api_key(self) -> Optional[str]:
"""Get decrypted TTS API key"""
raw_key = self.tts_engine_api_key
if raw_key and raw_key.startswith("enc:"):
from encryption_utils import decrypt
decrypted = decrypt(raw_key)
log(f"πŸ”“ TTS key decrypted: {'***' + decrypted[-4:] if decrypted else 'None'}")
return decrypted
log(f"πŸ”‘ TTS key not encrypted: {'***' + raw_key[-4:] if raw_key else 'None'}")
return raw_key
def get_tts_settings(self) -> Dict[str, Any]:
"""Get TTS settings with defaults"""
return self.tts_settings or {
"use_ssml": False
}
def get_stt_api_key(self) -> Optional[str]:
"""Get decrypted STT API key or credentials path"""
raw_key = self.stt_engine_api_key
if raw_key and raw_key.startswith("enc:"):
from encryption_utils import decrypt
decrypted = decrypt(raw_key)
log(f"πŸ”“ STT key decrypted: {'***' + decrypted[-4:] if decrypted else 'None'}")
return decrypted
log(f"πŸ”‘ STT key/path: {'***' + raw_key[-4:] if raw_key else 'None'}")
return raw_key
def get_stt_settings(self) -> Dict[str, Any]:
"""Get STT settings with defaults"""
return self.stt_settings or {
"speech_timeout_ms": 2000,
"noise_reduction_level": 2,
"vad_sensitivity": 0.5,
"language": "tr-TR",
"model": "latest_long",
"use_enhanced": True,
"enable_punctuation": True,
"interim_results": True
}
def is_cloud_mode(self) -> bool:
"""Check if running in cloud mode (anything except on-premise)"""
return self.work_mode != "on-premise"
def is_on_premise(self) -> bool:
"""Check if running in on-premise mode"""
return self.work_mode == "on-premise"
def is_gpt_mode(self) -> bool:
"""Check if running in GPT mode (any variant)"""
return self.work_mode in ("gpt4o", "gpt4o-mini")
    def get_gpt_model(self) -> Optional[str]:
        """Get the GPT model name for the OpenAI API (None when not in a GPT mode)"""
if self.work_mode == "gpt4o":
return "gpt-4o"
elif self.work_mode == "gpt4o-mini":
return "gpt-4o-mini"
return None
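# ---------------- Example (illustrative) ---------
# Hedged sketch, not part of the original module: demonstrates the GlobalConfig mode
# helpers. The field values here (work_mode, endpoint) are invented for illustration.
def _example_global_config_usage() -> Dict[str, Any]:
    cfg = GlobalConfig(work_mode="gpt4o-mini", spark_endpoint="http://localhost:7861")
    return {
        "is_cloud": cfg.is_cloud_mode(),        # True: anything except "on-premise"
        "is_gpt": cfg.is_gpt_mode(),            # True for gpt4o / gpt4o-mini
        "gpt_model": cfg.get_gpt_model(),       # "gpt-4o-mini"
        "stt_defaults": cfg.get_stt_settings(), # falls back to the built-in defaults
    }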
# ---------------- Users ------------------
class UserConfig(BaseModel):
username: str
password_hash: str
salt: str
# ---------------- Retry / Proxy ----------
class RetryConfig(BaseModel):
retry_count: int = Field(3, alias="max_attempts")
backoff_seconds: int = 2
strategy: str = Field("static", pattern=r"^(static|exponential)$")
class ProxyConfig(BaseModel):
enabled: bool = True
url: HttpUrl
# ---------------- API & Auth -------------
class APIAuthConfig(BaseModel):
enabled: bool = False
token_endpoint: Optional[HttpUrl] = None
response_token_path: str = "access_token"
token_request_body: Dict[str, Any] = Field({}, alias="body_template")
token_refresh_endpoint: Optional[HttpUrl] = None
token_refresh_body: Dict[str, Any] = {}
class Config:
extra = "allow"
populate_by_name = True
class APIConfig(BaseModel):
name: str
url: HttpUrl
method: str = Field("GET", pattern=r"^(GET|POST|PUT|PATCH|DELETE)$")
headers: Dict[str, Any] = {}
body_template: Dict[str, Any] = {}
timeout_seconds: int = 10
retry: RetryConfig = RetryConfig()
proxy: Optional[str | ProxyConfig] = None
auth: Optional[APIAuthConfig] = None
response_prompt: Optional[str] = None
class Config:
extra = "allow"
populate_by_name = True
# ---------------- Intent / Param ---------
class ParameterConfig(BaseModel):
name: str
caption: Optional[str] = ""
type: str = Field(..., pattern=r"^(int|float|bool|str|string|date)$") # Added 'date'
required: bool = True
variable_name: str
extraction_prompt: Optional[str] = None
validation_regex: Optional[str] = None
invalid_prompt: Optional[str] = None
type_error_prompt: Optional[str] = None
def canonical_type(self) -> str:
if self.type == "string":
return "str"
elif self.type == "date":
return "str" # Store dates as strings in ISO format
return self.type
class IntentConfig(BaseModel):
name: str
caption: Optional[str] = ""
locale: str = "tr-TR"
dependencies: List[str] = []
examples: List[str] = []
detection_prompt: Optional[str] = None
parameters: List[ParameterConfig] = []
action: str
fallback_timeout_prompt: Optional[str] = None
fallback_error_prompt: Optional[str] = None
class Config:
extra = "allow"
# ---------------- Version / Project ------
class LLMConfig(BaseModel):
repo_id: str
generation_config: Dict[str, Any] = {}
use_fine_tune: bool = False
fine_tune_zip: str = ""
class VersionConfig(BaseModel):
id: int = Field(..., alias="version_number")
caption: Optional[str] = ""
published: bool = False
general_prompt: str
llm: "LLMConfig"
intents: List["IntentConfig"]
class Config:
extra = "allow"
populate_by_name = True
class ProjectConfig(BaseModel):
id: Optional[int] = None
name: str
caption: Optional[str] = ""
enabled: bool = True
last_version_number: Optional[int] = None
versions: List[VersionConfig]
class Config:
extra = "allow"
# ---------------- Activity Log -----------
class ActivityLogEntry(BaseModel):
timestamp: str
username: str
action: str
entity_type: str
entity_id: Optional[int] = None
entity_name: Optional[str] = None
details: Optional[str] = None
# ---------------- Service Config ---------
class ServiceConfig(BaseModel):
global_config: GlobalConfig = Field(..., alias="config")
projects: List[ProjectConfig]
apis: List[APIConfig]
activity_log: List[ActivityLogEntry] = []
# Config level fields
project_id_counter: int = 1
last_update_date: Optional[str] = None
last_update_user: Optional[str] = None
# runtime helpers (skip validation)
_api_by_name: Dict[str, APIConfig] = {}
def build_index(self):
self._api_by_name = {a.name: a for a in self.apis}
def get_api(self, name: str) -> Optional[APIConfig]:
return self._api_by_name.get(name)
def to_jsonc_dict(self) -> dict:
"""Convert to dict for saving to JSONC file"""
data = self.model_dump(by_alias=True, exclude={'_api_by_name'})
# Convert API configs
for api in data.get('apis', []):
# Convert headers and body_template to JSON strings
if 'headers' in api and isinstance(api['headers'], dict):
api['headers'] = json.dumps(api['headers'], ensure_ascii=False)
if 'body_template' in api and isinstance(api['body_template'], dict):
api['body_template'] = json.dumps(api['body_template'], ensure_ascii=False)
# Convert auth configs
if 'auth' in api and api['auth']:
if 'token_request_body' in api['auth'] and isinstance(api['auth']['token_request_body'], dict):
api['auth']['token_request_body'] = json.dumps(api['auth']['token_request_body'], ensure_ascii=False)
if 'token_refresh_body' in api['auth'] and isinstance(api['auth']['token_refresh_body'], dict):
api['auth']['token_refresh_body'] = json.dumps(api['auth']['token_refresh_body'], ensure_ascii=False)
return data
def save(self):
"""Save configuration to file"""
config_path = Path(__file__).parent / "service_config.jsonc"
data = self.to_jsonc_dict()
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
log("βœ… Configuration saved to service_config.jsonc")
# ---------------- Provider Singleton -----
class ConfigProvider:
_instance: Optional[ServiceConfig] = None
_CONFIG_PATH = Path(__file__).parent / "service_config.jsonc"
    _lock = threading.Lock()  # Lock for thread-safe access
    _environment_checked = False  # Flag so the environment check runs only once
@classmethod
def get(cls) -> ServiceConfig:
"""Get cached config - thread-safe"""
if cls._instance is None:
with cls._lock:
# Double-checked locking pattern
if cls._instance is None:
cls._instance = cls._load()
cls._instance.build_index()
                    # Run the environment check only on the first load
if not cls._environment_checked:
cls._check_environment_setup()
cls._environment_checked = True
return cls._instance
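    # Example (illustrative, hedged): typical read access from request handlers.
    # ConfigProvider.get() returns the cached ServiceConfig; after the admin UI saves
    # changes, ConfigProvider.reload() refreshes the cache. The API name is invented.
    #
    #     cfg = ConfigProvider.get()
    #     spark_url = cfg.global_config.spark_endpoint
    #     weather_api = cfg.get_api("weather_api")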
@classmethod
def reload(cls) -> ServiceConfig:
"""Force reload configuration from file - used after UI saves"""
        with cls._lock:
            log("🔄 Reloading configuration...")
            cls._instance = None
            # The environment-check flag is intentionally left set, so the check is not repeated on reload
        # Release the lock before calling get(); threading.Lock is not reentrant
        return cls.get()
@classmethod
def update_config(cls, config_dict: dict):
"""Update the current configuration with new values"""
if cls._instance is None:
cls.get()
# Update global config
if 'config' in config_dict:
for key, value in config_dict['config'].items():
if hasattr(cls._instance.global_config, key):
setattr(cls._instance.global_config, key, value)
# Save to file
cls._instance.save()
@classmethod
def _load(cls) -> ServiceConfig:
"""Load configuration from service_config.jsonc"""
try:
log(f"πŸ“‚ Loading config from: {cls._CONFIG_PATH}")
if not cls._CONFIG_PATH.exists():
raise FileNotFoundError(f"Config file not found: {cls._CONFIG_PATH}")
with open(cls._CONFIG_PATH, 'r', encoding='utf-8') as f:
config_data = commentjson.load(f)
# Ensure required fields exist in config data
if 'config' not in config_data:
config_data['config'] = {}
config_section = config_data['config']
# Set defaults for missing fields
config_section.setdefault('work_mode', 'hfcloud')
config_section.setdefault('spark_endpoint', 'http://localhost:7861')
config_section.setdefault('cloud_token', None)
config_section.setdefault('internal_prompt', None)
config_section.setdefault('tts_engine', 'no_tts')
config_section.setdefault('tts_engine_api_key', None)
config_section.setdefault('tts_settings', {'use_ssml': False})
config_section.setdefault('stt_engine', 'no_stt')
config_section.setdefault('stt_engine_api_key', None)
config_section.setdefault('stt_settings', {
'speech_timeout_ms': 2000,
'noise_reduction_level': 2,
'vad_sensitivity': 0.5,
'language': 'tr-TR',
'model': 'latest_long',
'use_enhanced': True,
'enable_punctuation': True,
'interim_results': True
})
pcc = config_data['config'].get('parameter_collection_config')
if pcc is None:
                # Add the section with default values if it is missing
config_data['config']['parameter_collection_config'] = ParameterCollectionConfig().model_dump()
config_section.setdefault('users', [])
# Convert string body/headers to dict if needed
for api in config_data.get('apis', []):
if isinstance(api.get('headers'), str):
try:
api['headers'] = json.loads(api['headers'])
                    except json.JSONDecodeError:
api['headers'] = {}
if isinstance(api.get('body_template'), str):
try:
api['body_template'] = json.loads(api['body_template'])
                    except json.JSONDecodeError:
api['body_template'] = {}
# Handle auth section
if api.get('auth'):
auth = api['auth']
if isinstance(auth.get('token_request_body'), str):
try:
auth['token_request_body'] = json.loads(auth['token_request_body'])
                        except json.JSONDecodeError:
auth['token_request_body'] = {}
if isinstance(auth.get('token_refresh_body'), str):
try:
auth['token_refresh_body'] = json.loads(auth['token_refresh_body'])
                        except json.JSONDecodeError:
auth['token_refresh_body'] = {}
# Fix activity_log entries if needed
if 'activity_log' in config_data:
for entry in config_data['activity_log']:
# Add missing username field
if 'username' not in entry:
entry['username'] = entry.get('user', 'system')
# Ensure all required fields exist
entry.setdefault('action', 'UNKNOWN')
entry.setdefault('entity_type', 'unknown')
entry.setdefault('timestamp', datetime.now().isoformat())
# Create ServiceConfig instance
service_config = ServiceConfig(**config_data)
log("βœ… Configuration loaded successfully")
return service_config
except FileNotFoundError as e:
log(f"❌ Config file not found: {e}")
raise
except json.JSONDecodeError as e:
log(f"❌ Invalid JSON in config file: {e}")
raise
except ValidationError as e:
log(f"❌ Config validation error: {e}")
raise
except Exception as e:
log(f"❌ Unexpected error loading config: {e}")
raise
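    # Example (illustrative, hedged): minimal shape of service_config.jsonc that _load()
    # accepts; missing fields are filled with the defaults above. The values are invented.
    #
    #     {
    #       "config": { "work_mode": "hfcloud", "spark_endpoint": "http://localhost:7861" },
    #       "projects": [],
    #       "apis": [],
    #       "activity_log": []
    #     }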
@classmethod
def _check_environment_setup(cls):
"""Check if environment is properly configured based on work_mode"""
config = cls._instance.global_config
if config.is_cloud_mode():
# Cloud mode - check for HuggingFace Secrets
missing_secrets = []
if not os.getenv("JWT_SECRET"):
missing_secrets.append("JWT_SECRET")
if not os.getenv("FLARE_TOKEN_KEY"):
missing_secrets.append("FLARE_TOKEN_KEY")
if not os.getenv("SPARK_TOKEN"):
missing_secrets.append("SPARK_TOKEN")
if missing_secrets:
log(f"⚠️ Running in {config.work_mode} mode. Missing secrets: {', '.join(missing_secrets)}")
log("Please set these as HuggingFace Space Secrets for cloud deployment.")
else:
# On-premise mode - check for .env file
env_path = Path(__file__).parent / ".env"
if not env_path.exists():
log("⚠️ Running in on-premise mode but .env file not found")
                # The container may lack write permission (e.g. in Docker), so only warn
log("⚠️ Cannot create .env file in Docker environment. Using default values.")
# Set default environment variables if not already set
if not os.getenv("JWT_SECRET"):
os.environ["JWT_SECRET"] = "flare-admin-secret-key-change-in-production"
if not os.getenv("JWT_ALGORITHM"):
os.environ["JWT_ALGORITHM"] = "HS256"
if not os.getenv("JWT_EXPIRATION_HOURS"):
os.environ["JWT_EXPIRATION_HOURS"] = "24"
if not os.getenv("FLARE_TOKEN_KEY"):
os.environ["FLARE_TOKEN_KEY"] = "flare-token-encryption-key"
if not os.getenv("SPARK_TOKEN"):
os.environ["SPARK_TOKEN"] = "your-spark-token-here"
log("βœ… Default environment variables set.")
@classmethod
def update_user_password(cls, username: str, new_hash: str, new_salt: str) -> None:
"""Update user password"""
if cls._instance is None:
cls.get()
user = next((u for u in cls._instance.global_config.users if u.username == username), None)
if user:
user.password_hash = new_hash
user.salt = new_salt
            cls.add_activity_log(username, "CHANGE_PASSWORD", "user", None, username)
            cls._instance.save()
@classmethod
def update_environment(cls, update_data: dict, username: str) -> None:
"""Update environment configuration"""
if cls._instance is None:
cls.get()
config = cls._instance.global_config
# Update fields
if 'work_mode' in update_data:
config.work_mode = update_data['work_mode']
if 'cloud_token' in update_data:
from encryption_utils import encrypt
config.cloud_token = encrypt(update_data['cloud_token']) if update_data['cloud_token'] else ""
if 'spark_endpoint' in update_data:
config.spark_endpoint = update_data['spark_endpoint']
if 'internal_prompt' in update_data:
config.internal_prompt = update_data['internal_prompt']
# TTS/STT settings
if 'tts_engine' in update_data:
config.tts_engine = update_data['tts_engine']
if 'tts_engine_api_key' in update_data and update_data['tts_engine_api_key'] != "***":
from encryption_utils import encrypt
config.tts_engine_api_key = encrypt(update_data['tts_engine_api_key']) if update_data['tts_engine_api_key'] else ""
if 'tts_settings' in update_data:
config.tts_settings = update_data['tts_settings']
if 'stt_engine' in update_data:
config.stt_engine = update_data['stt_engine']
        if 'stt_engine_api_key' in update_data and update_data['stt_engine_api_key'] != "***":
from encryption_utils import encrypt
config.stt_engine_api_key = encrypt(update_data['stt_engine_api_key']) if update_data['stt_engine_api_key'] else ""
if 'stt_settings' in update_data:
config.stt_settings = update_data['stt_settings']
if 'parameter_collection_config' in update_data:
config.parameter_collection_config = ParameterCollectionConfig(**update_data['parameter_collection_config'])
# Update metadata
cls._instance.last_update_date = datetime.now().isoformat()
cls._instance.last_update_user = username
# Add activity log
cls.add_activity_log(username, "UPDATE_ENVIRONMENT", "config", None, "environment",
f"Changed to {config.work_mode}, TTS: {config.tts_engine}, STT: {config.stt_engine}")
# Save to file
cls._instance.save()
# Reload to ensure consistency
cls.reload()
@classmethod
def add_activity_log(cls, username: str, action: str, entity_type: str,
entity_id: Any = None, entity_name: str = None, details: str = "") -> None:
"""Add activity log entry"""
if cls._instance is None:
cls.get()
entry = ActivityLogEntry(
timestamp=datetime.now().isoformat() + "Z",
username=username,
action=action,
entity_type=entity_type,
entity_id=entity_id,
entity_name=entity_name,
details=details
)
cls._instance.activity_log.append(entry)
# Keep only last 1000 entries
if len(cls._instance.activity_log) > 1000:
cls._instance.activity_log = cls._instance.activity_log[-1000:]
@classmethod
def get_project(cls, project_id: int) -> Optional[ProjectConfig]:
"""Get project by ID"""
if cls._instance is None:
cls.get()
return next((p for p in cls._instance.projects if p.id == project_id), None)
@classmethod
def get_project_by_name(cls, name: str) -> Optional[ProjectConfig]:
"""Get project by name"""
if cls._instance is None:
cls.get()
return next((p for p in cls._instance.projects if p.name == name), None)
@classmethod
def create_project(cls, project_data: dict, username: str) -> ProjectConfig:
"""Create new project"""
if cls._instance is None:
cls.get()
# Check duplicate name
existing = [p.name for p in cls._instance.projects if not getattr(p, 'deleted', False)]
if project_data['name'] in existing:
raise ValueError(f"Project name '{project_data['name']}' already exists")
# Get new project ID
project_id = cls._instance.project_id_counter
cls._instance.project_id_counter += 1
# Create project
new_project = ProjectConfig(
id=project_id,
name=project_data['name'],
caption=project_data.get('caption', ''),
icon=project_data.get('icon', 'folder'),
description=project_data.get('description', ''),
            default_language=project_data.get('default_language', 'Türkçe'),
supported_languages=project_data.get('supported_languages', ['tr-TR']),
timezone=project_data.get('timezone', 'Europe/Istanbul'),
region=project_data.get('region', 'tr-TR'),
enabled=True,
deleted=False,
version_id_counter=2, # Start from 2 since we create version 1
last_version_number=1,
created_date=datetime.now().isoformat() + "Z",
created_by=username,
last_update_date=datetime.now().isoformat() + "Z",
last_update_user=username,
versions=[
VersionConfig(
id=1,
version_number=1,
no=1,
caption="Version 1",
published=False,
created_date=datetime.now().isoformat() + "Z",
created_by=username,
last_update_date=datetime.now().isoformat() + "Z",
last_update_user=username,
general_prompt="",
llm=LLMConfig(
repo_id="Qwen/Qwen2.5-72B-Instruct",
generation_config={
"temperature": 0.5,
"max_tokens": 2048,
"top_p": 0.7,
"repetition_penalty": 1.1
},
use_fine_tune=False,
fine_tune_zip=""
),
intents=[]
)
]
)
cls._instance.projects.append(new_project)
# Add activity log
cls.add_activity_log(username, "CREATE_PROJECT", "project", project_id, project_data['name'])
# Save
cls._instance.save()
return new_project
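    # Example (illustrative, hedged): creating a project from an admin endpoint.
    # Only name/caption are passed here; other fields take the defaults above.
    # The project name and username are invented.
    #
    #     project = ConfigProvider.create_project(
    #         {"name": "travel_bot", "caption": "Travel assistant"},
    #         username="admin",
    #     )
    #     # A draft "Version 1" with an empty intent list is created automatically.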
@classmethod
def update_project(cls, project_id: int, update_data: dict, username: str) -> ProjectConfig:
"""Update project"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
# Check race condition
if project.last_update_date != update_data.get('last_update_date'):
raise ValueError("Project was modified by another user")
# Update fields
project.caption = update_data['caption']
project.icon = update_data.get('icon', project.icon)
project.description = update_data.get('description', project.description)
project.default_language = update_data.get('default_language', project.default_language)
project.supported_languages = update_data.get('supported_languages', project.supported_languages)
project.timezone = update_data.get('timezone', project.timezone)
project.region = update_data.get('region', project.region)
project.last_update_date = datetime.now().isoformat() + "Z"
project.last_update_user = username
# Add activity log
cls.add_activity_log(username, "UPDATE_PROJECT", "project", project_id, project.name)
# Save
cls._instance.save()
return project
@classmethod
def delete_project(cls, project_id: int, username: str) -> None:
"""Delete project (soft delete)"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
project.deleted = True
project.last_update_date = datetime.now().isoformat() + "Z"
project.last_update_user = username
# Add activity log
cls.add_activity_log(username, "DELETE_PROJECT", "project", project_id, project.name)
# Save
cls._instance.save()
@classmethod
def toggle_project(cls, project_id: int, username: str) -> bool:
"""Toggle project enabled status"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
project.enabled = not project.enabled
project.last_update_date = datetime.now().isoformat() + "Z"
project.last_update_user = username
# Add activity log
action = "ENABLE_PROJECT" if project.enabled else "DISABLE_PROJECT"
cls.add_activity_log(username, action, "project", project_id, project.name)
# Save
cls._instance.save()
return project.enabled
@classmethod
def create_version(cls, project_id: int, version_data: dict, username: str) -> VersionConfig:
"""Create new version"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
# Get next version ID
version_id = project.version_id_counter
project.version_id_counter += 1
# Get next version number
existing_versions = [v for v in project.versions if not getattr(v, 'deleted', False)]
version_no = max([v.no for v in existing_versions], default=0) + 1
# Create base version
new_version = VersionConfig(
id=version_id,
version_number=version_id,
no=version_no,
caption=version_data['caption'],
description=f"Version {version_no}",
published=False,
deleted=False,
created_date=datetime.now().isoformat() + "Z",
created_by=username,
last_update_date=datetime.now().isoformat() + "Z",
last_update_user=username,
general_prompt="",
llm=LLMConfig(
repo_id="",
generation_config={
"max_new_tokens": 512,
"temperature": 0.7,
"top_p": 0.95,
"top_k": 50,
"repetition_penalty": 1.1
},
use_fine_tune=False,
fine_tune_zip=""
),
intents=[]
)
# Copy from source version if specified
if version_data.get('source_version_id'):
source_version = next(
(v for v in project.versions if v.id == version_data['source_version_id']),
None
)
if source_version:
new_version.general_prompt = source_version.general_prompt
new_version.llm = source_version.llm.model_copy(deep=True)
new_version.intents = [i.model_copy(deep=True) for i in source_version.intents]
project.versions.append(new_version)
project.last_update_date = datetime.now().isoformat() + "Z"
project.last_update_user = username
# Add activity log
cls.add_activity_log(username, "CREATE_VERSION", "version", version_id,
f"{project.name} v{version_no}")
# Save
cls._instance.save()
return new_version
@classmethod
def update_version(cls, project_id: int, version_id: int, update_data: dict, username: str) -> VersionConfig:
"""Update version"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
version = next((v for v in project.versions if v.id == version_id), None)
if not version:
raise ValueError(f"Version {version_id} not found")
# Check race condition
if version.last_update_date != update_data.get('last_update_date'):
raise ValueError("Version was modified by another user")
# Cannot update published version
if version.published:
raise ValueError("Cannot modify published version")
# Update version
version.caption = update_data['caption']
version.general_prompt = update_data['general_prompt']
version.llm = LLMConfig(**update_data['llm'])
version.intents = [IntentConfig(**i) for i in update_data['intents']]
version.last_update_date = datetime.now().isoformat() + "Z"
version.last_update_user = username
# Update project timestamp
project.last_update_date = datetime.now().isoformat() + "Z"
project.last_update_user = username
# Add activity log
cls.add_activity_log(username, "UPDATE_VERSION", "version", version_id,
f"{project.name} v{version.no}")
# Save
cls._instance.save()
return version
@classmethod
def publish_version(cls, project_id: int, version_id: int, username: str) -> tuple[ProjectConfig, VersionConfig]:
"""Publish version"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
version = next((v for v in project.versions if v.id == version_id), None)
if not version:
raise ValueError(f"Version {version_id} not found")
# Unpublish all other versions
for v in project.versions:
if v.id != version_id:
v.published = False
# Publish this version
version.published = True
version.publish_date = datetime.now().isoformat() + "Z"
version.published_by = username
version.last_update_date = datetime.now().isoformat() + "Z"
version.last_update_user = username
# Update project timestamp
project.last_update_date = datetime.now().isoformat() + "Z"
project.last_update_user = username
# Add activity log
cls.add_activity_log(username, "PUBLISH_VERSION", "version", version_id,
f"{project.name} v{version.no}")
# Save
cls._instance.save()
return project, version
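    # Example (illustrative, hedged): publishing makes exactly one version live, because
    # every other version of the project is unpublished first. IDs are invented.
    #
    #     project, version = ConfigProvider.publish_version(project_id=1, version_id=2, username="admin")
    #     assert version.published
    #     assert all(not v.published for v in project.versions if v.id != version.id)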
@classmethod
def delete_version(cls, project_id: int, version_id: int, username: str) -> None:
"""Delete version (soft delete)"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
version = next((v for v in project.versions if v.id == version_id), None)
if not version:
raise ValueError(f"Version {version_id} not found")
# Cannot delete published version
if version.published:
raise ValueError("Cannot delete published version")
version.deleted = True
version.last_update_date = datetime.now().isoformat() + "Z"
version.last_update_user = username
project.last_update_date = datetime.now().isoformat() + "Z"
project.last_update_user = username
# Add activity log
cls.add_activity_log(username, "DELETE_VERSION", "version", version_id,
f"{project.name} v{version.no}")
# Save
cls._instance.save()
@classmethod
def create_api(cls, api_data: dict, username: str) -> APIConfig:
"""Create new API"""
if cls._instance is None:
cls.get()
# Check duplicate name
existing = [a.name for a in cls._instance.apis if not getattr(a, 'deleted', False)]
if api_data['name'] in existing:
raise ValueError(f"API name '{api_data['name']}' already exists")
# Create API
new_api = APIConfig(
**api_data,
deleted=False,
created_date=datetime.now().isoformat() + "Z",
created_by=username,
last_update_date=datetime.now().isoformat() + "Z",
last_update_user=username
)
cls._instance.apis.append(new_api)
cls._instance.build_index() # Rebuild index
# Add activity log
cls.add_activity_log(username, "CREATE_API", "api", None, api_data['name'])
# Save
cls._instance.save()
return new_api
@classmethod
def update_api(cls, api_name: str, update_data: dict, username: str) -> APIConfig:
"""Update API"""
if cls._instance is None:
cls.get()
api = cls._instance.get_api(api_name)
if not api:
raise ValueError(f"API '{api_name}' not found")
# Check race condition
if api.last_update_date != update_data.get('last_update_date'):
raise ValueError("API was modified by another user")
# Check if API is in use in published versions
for project in cls._instance.projects:
for version in project.versions:
if version.published:
for intent in version.intents:
if intent.action == api_name:
raise ValueError(f"API is used in published version of project '{project.name}'")
# Update API
for key, value in update_data.items():
if key != 'last_update_date' and hasattr(api, key):
setattr(api, key, value)
api.last_update_date = datetime.now().isoformat() + "Z"
api.last_update_user = username
# Add activity log
cls.add_activity_log(username, "UPDATE_API", "api", None, api_name)
# Save
cls._instance.save()
return api
@classmethod
def delete_api(cls, api_name: str, username: str) -> None:
"""Delete API (soft delete)"""
if cls._instance is None:
cls.get()
api = cls._instance.get_api(api_name)
if not api:
raise ValueError(f"API '{api_name}' not found")
# Check if API is in use
for project in cls._instance.projects:
if getattr(project, 'deleted', False):
continue
for version in project.versions:
if getattr(version, 'deleted', False):
continue
for intent in version.intents:
if intent.action == api_name:
raise ValueError(f"API is used in intent '{intent.name}' in project '{project.name}' version {version.no}")
api.deleted = True
api.last_update_date = datetime.now().isoformat() + "Z"
api.last_update_user = username
# Add activity log
cls.add_activity_log(username, "DELETE_API", "api", None, api_name)
# Save
cls._instance.save()
@classmethod
def import_project(cls, project_data: dict, username: str) -> ProjectConfig:
"""Import project from JSON"""
if cls._instance is None:
cls.get()
# Validate structure
if "name" not in project_data:
raise ValueError("Invalid project data")
# Create new project with imported data
imported_data = {
"name": project_data["name"],
"caption": project_data.get("caption", ""),
"icon": project_data.get("icon", "folder"),
"description": project_data.get("description", "")
}
# Create project
new_project = cls.create_project(imported_data, username)
# Clear default version
new_project.versions = []
# Import versions
for idx, version_data in enumerate(project_data.get("versions", [])):
new_version = VersionConfig(
id=idx + 1,
version_number=idx + 1,
no=idx + 1,
caption=version_data.get("caption", f"Version {idx + 1}"),
description=version_data.get("description", ""),
published=False,
deleted=False,
created_date=datetime.now().isoformat() + "Z",
created_by=username,
last_update_date=datetime.now().isoformat() + "Z",
last_update_user=username,
general_prompt=version_data.get("general_prompt", ""),
llm=LLMConfig(**version_data.get("llm", {})) if version_data.get("llm") else LLMConfig(
repo_id="",
generation_config={},
use_fine_tune=False,
fine_tune_zip=""
),
intents=[IntentConfig(**i) for i in version_data.get("intents", [])]
)
new_project.versions.append(new_version)
new_project.version_id_counter = idx + 2
        # Log the import as a separate entry (create_project already logged CREATE_PROJECT)
cls.add_activity_log(username, "IMPORT_PROJECT", "project", new_project.id, new_project.name)
# Save
cls._instance.save()
return new_project
@classmethod
def export_project(cls, project_id: int, username: str) -> dict:
"""Export project as JSON"""
if cls._instance is None:
cls.get()
project = cls.get_project(project_id)
if not project:
raise ValueError(f"Project {project_id} not found")
# Create export data
export_data = {
"name": project.name,
"caption": project.caption,
"icon": project.icon,
"description": project.description,
"versions": []
}
# Export versions
for version in project.versions:
if not getattr(version, 'deleted', False):
export_version = {
"caption": version.caption,
"description": getattr(version, 'description', ''),
"general_prompt": version.general_prompt,
"llm": version.llm.model_dump(),
"intents": [i.model_dump() for i in version.intents]
}
export_data["versions"].append(export_version)
# Add activity log
cls.add_activity_log(username, "EXPORT_PROJECT", "project", project_id, project.name)
cls._instance.save()
return export_data
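    # Example (illustrative, hedged): export/import round trip. The exported dict holds
    # only name/caption/icon/description plus non-deleted versions, so importing it into
    # another instance recreates the project with fresh IDs and all versions unpublished.
    #
    #     data = ConfigProvider.export_project(project_id=1, username="admin")
    #     copy = ConfigProvider.import_project(data, username="admin")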
@classmethod
def cleanup_activity_logs(cls, keep_count: int = 1000) -> None:
"""Cleanup old activity logs"""
if cls._instance is None:
cls.get()
if len(cls._instance.activity_log) > keep_count:
# Keep only last entries
cls._instance.activity_log = cls._instance.activity_log[-keep_count:]
cls._instance.save()
# Forward references
GlobalConfig.model_rebuild()
VersionConfig.model_rebuild()
ServiceConfig.model_rebuild()
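# ---------------- Example (illustrative) ---------
# Hedged sketch, not part of the original module: a small smoke test that loads the
# configuration and prints a few derived values. It assumes a valid service_config.jsonc
# sits next to this file; it runs only when the module is executed directly.
if __name__ == "__main__":
    cfg = ConfigProvider.get()
    print("work_mode:", cfg.global_config.work_mode)
    print("cloud mode:", cfg.global_config.is_cloud_mode())
    print("projects:", [p.name for p in cfg.projects])
    print("apis:", [a.name for a in cfg.apis])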