"""
Flare – ConfigProvider (TTS/STT support)
"""
from __future__ import annotations

import json
import os
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import commentjson
from pydantic import BaseModel, ConfigDict, Field, HttpUrl, ValidationError, field_validator, validator

from encryption_utils import decrypt
from utils import log
# ---------------- Parameter Collection Config --------- | |
class ParameterCollectionConfig(BaseModel):
    """Configuration for smart parameter collection.

    Controls how many missing parameters may be requested in a single
    question and supplies the prompt template used to generate that question.
    The ``{{...}}`` placeholders in ``collection_prompt`` are left untouched
    here; whoever renders the prompt is responsible for substituting them.
    """

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``).
    # Unknown keys are kept on the model instead of being rejected.
    model_config = ConfigDict(extra="allow")

    # Upper bound on parameters asked for in one question (validated to 1..5).
    max_params_per_question: int = Field(default=2, ge=1, le=5)
    smart_grouping: bool = Field(default=True)
    retry_unanswered: bool = Field(default=True)
    collection_prompt: str = Field(default="""
You are a helpful assistant collecting information from the user.
Conversation context:
{{conversation_history}}
Intent: {{intent_name}} - {{intent_caption}}
Already collected:
{{collected_params}}
Still needed:
{{missing_params}}
Previously asked but not answered:
{{unanswered_params}}
Rules:
1. Ask for maximum {{max_params}} parameters in one question
2. Group parameters that naturally go together (like from/to cities, dates)
3. If some parameters were asked before but not answered, include them again
4. Be natural and conversational in {{project_language}}
5. Use context from the conversation to make the question flow naturally
Generate ONLY the question, nothing else.""")
class GlobalConfig(BaseModel):
    """Service-wide settings: work mode, Spark endpoint, TTS/STT engines and users.

    Secrets (``cloud_token`` and the engine API keys) are exposed through the
    ``get_*`` accessors, which pass them through ``decrypt`` where applicable.
    """

    work_mode: str = Field("hfcloud", pattern=r"^(hfcloud|cloud|on-premise|gpt4o|gpt4o-mini)$")
    # Always passed through decrypt() by get_plain_token().
    cloud_token: Optional[str] = None
    spark_endpoint: HttpUrl
    internal_prompt: Optional[str] = None

    # TTS configurations
    tts_engine: str = Field("no_tts", pattern=r"^(no_tts|elevenlabs|blaze)$")
    tts_engine_api_key: Optional[str] = None
    tts_settings: Optional[Dict[str, Any]] = Field(default_factory=lambda: {
        "use_ssml": False
    })

    # STT configurations
    stt_engine: str = Field("no_stt", pattern=r"^(no_stt|google|azure|amazon|gpt4o_realtime|flicker)$")
    stt_engine_api_key: Optional[str] = None
    stt_settings: Optional[Dict[str, Any]] = Field(default_factory=lambda: {
        "speech_timeout_ms": 2000,
        "noise_reduction_level": 2,
        "vad_sensitivity": 0.5,
        "language": "tr-TR",
        "model": "latest_long",
        "use_enhanced": True,
        "enable_punctuation": True,
        "interim_results": True
    })

    parameter_collection_config: ParameterCollectionConfig = Field(default_factory=ParameterCollectionConfig)
    users: List["UserConfig"] = []

    @staticmethod
    def _mask_secret(secret: Optional[str]) -> str:
        """Return a log-safe rendering of *secret*.

        Only the last four characters are revealed, and only when the secret
        is long enough that doing so does not expose most of it. Short secrets
        are fully masked; a missing secret is rendered as ``"None"``.
        """
        if not secret:
            return "None"
        if len(secret) <= 8:
            return "***"
        return "***" + secret[-4:]

    def get_plain_token(self) -> Optional[str]:
        """Return the decrypted cloud token, or ``None`` when no token is set."""
        if self.cloud_token:
            return decrypt(self.cloud_token)
        return None

    def get_tts_api_key(self) -> Optional[str]:
        """Get decrypted TTS API key.

        Values starting with ``"enc:"`` are decrypted; anything else is
        returned unchanged.
        """
        raw_key = self.tts_engine_api_key
        if raw_key and raw_key.startswith("enc:"):
            decrypted = decrypt(raw_key)
            log(f"π TTS key decrypted: {self._mask_secret(decrypted)}")
            return decrypted
        log(f"π TTS key not encrypted: {self._mask_secret(raw_key)}")
        return raw_key

    def get_tts_settings(self) -> Dict[str, Any]:
        """Get TTS settings, falling back to defaults when unset or empty."""
        return self.tts_settings or {
            "use_ssml": False
        }

    def get_stt_api_key(self) -> Optional[str]:
        """Get decrypted STT API key or credentials path.

        Values starting with ``"enc:"`` are decrypted; anything else is
        returned unchanged.
        """
        raw_key = self.stt_engine_api_key
        if raw_key and raw_key.startswith("enc:"):
            decrypted = decrypt(raw_key)
            log(f"π STT key decrypted: {self._mask_secret(decrypted)}")
            return decrypted
        log(f"π STT key/path: {self._mask_secret(raw_key)}")
        return raw_key

    def get_stt_settings(self) -> Dict[str, Any]:
        """Get STT settings, falling back to defaults when unset or empty."""
        return self.stt_settings or {
            "speech_timeout_ms": 2000,
            "noise_reduction_level": 2,
            "vad_sensitivity": 0.5,
            "language": "tr-TR",
            "model": "latest_long",
            "use_enhanced": True,
            "enable_punctuation": True,
            "interim_results": True
        }

    def is_cloud_mode(self) -> bool:
        """Check if running in cloud mode (anything except on-premise)."""
        return self.work_mode != "on-premise"

    def is_on_premise(self) -> bool:
        """Check if running in on-premise mode."""
        return self.work_mode == "on-premise"

    def is_gpt_mode(self) -> bool:
        """Check if running in GPT mode (any variant)."""
        return self.work_mode in ("gpt4o", "gpt4o-mini")

    def get_gpt_model(self) -> Optional[str]:
        """Get the GPT model name for the OpenAI API.

        Returns ``None`` when the work mode is not one of the GPT modes.
        """
        if self.work_mode == "gpt4o":
            return "gpt-4o"
        if self.work_mode == "gpt4o-mini":
            return "gpt-4o-mini"
        return None
# ---------------- Global ----------------- | |
class UserConfig(BaseModel):
    """Stored credentials of a single user account."""

    username: str
    # Hash of the password and the salt stored alongside it; both are plain
    # strings as far as this model is concerned.
    password_hash: str
    salt: str
# ---------------- Retry / Proxy ---------- | |
class RetryConfig(BaseModel):
    """Retry policy attached to an API definition."""

    # Read from (and written back as) the "max_attempts" key.
    retry_count: int = Field(default=3, alias="max_attempts")
    backoff_seconds: int = Field(default=2)
    # Only "static" and "exponential" pass validation.
    strategy: str = Field(default="static", pattern=r"^(static|exponential)$")
class ProxyConfig(BaseModel):
    """Proxy settings for an API definition."""

    enabled: bool = Field(default=True)
    url: HttpUrl
# ---------------- API & Auth ------------- | |
class APIAuthConfig(BaseModel):
    """Token-based authentication settings for an API definition."""

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``):
    # unknown keys are kept, and aliased fields may also be set by field name.
    model_config = ConfigDict(extra="allow", populate_by_name=True)

    enabled: bool = False
    token_endpoint: Optional[HttpUrl] = None
    # Key used to pull the token out of the token endpoint's response.
    response_token_path: str = "access_token"
    # Serialized under the alias "body_template" when dumping by alias.
    token_request_body: Dict[str, Any] = Field(default_factory=dict, alias="body_template")
    token_refresh_endpoint: Optional[HttpUrl] = None
    token_refresh_body: Dict[str, Any] = Field(default_factory=dict)
class APIConfig(BaseModel):
    """Definition of an external HTTP API."""

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``):
    # unknown keys are kept, and aliased fields may also be set by field name.
    model_config = ConfigDict(extra="allow", populate_by_name=True)

    name: str
    url: HttpUrl
    method: str = Field("GET", pattern=r"^(GET|POST|PUT|PATCH|DELETE)$")
    headers: Dict[str, Any] = Field(default_factory=dict)
    body_template: Dict[str, Any] = Field(default_factory=dict)
    timeout_seconds: int = 10
    # A factory gives every APIConfig its own RetryConfig instance.
    retry: RetryConfig = Field(default_factory=RetryConfig)
    # ``Union`` rather than ``str | ProxyConfig``: pydantic evaluates field
    # annotations at class creation, and the ``|`` form fails on Python < 3.10.
    proxy: Optional[Union[str, ProxyConfig]] = None
    auth: Optional[APIAuthConfig] = None
    response_prompt: Optional[str] = None
# ---------------- Intent / Param --------- | |
class ParameterConfig(BaseModel):
    """Description of one parameter that an intent collects."""

    name: str
    caption: Optional[str] = Field(default="")
    # Declared type; "string" and "date" are folded to "str" by canonical_type().
    type: str = Field(..., pattern=r"^(int|float|bool|str|string|date)$")
    required: bool = Field(default=True)
    variable_name: str
    extraction_prompt: Optional[str] = Field(default=None)
    validation_regex: Optional[str] = Field(default=None)
    invalid_prompt: Optional[str] = Field(default=None)
    type_error_prompt: Optional[str] = Field(default=None)

    def canonical_type(self) -> str:
        """Return the normalized type name for this parameter.

        "string" maps to "str"; "date" also maps to "str" because dates are
        stored as ISO-format strings. Every other type is returned as declared.
        """
        stored_as_str = ("string", "date")
        return "str" if self.type in stored_as_str else self.type
class IntentConfig(BaseModel):
    """Definition of a single intent within a project version."""

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``).
    # Unknown keys are kept on the model instead of being rejected.
    model_config = ConfigDict(extra="allow")

    name: str
    caption: Optional[str] = ""
    locale: str = "tr-TR"
    dependencies: List[str] = Field(default_factory=list)
    examples: List[str] = Field(default_factory=list)
    detection_prompt: Optional[str] = None
    parameters: List[ParameterConfig] = Field(default_factory=list)
    # Compared against API names elsewhere in this module when checking
    # whether an API is still in use.
    action: str
    fallback_timeout_prompt: Optional[str] = None
    fallback_error_prompt: Optional[str] = None
# ---------------- Version / Project ------ | |
class LLMConfig(BaseModel):
    """Language-model settings attached to a project version."""

    repo_id: str
    # Free-form generation parameters; the keys are not validated here.
    generation_config: Dict[str, Any] = Field(default_factory=dict)
    use_fine_tune: bool = Field(default=False)
    fine_tune_zip: str = Field(default="")
class VersionConfig(BaseModel):
    """One version of a project: prompt, LLM settings and intents."""

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``):
    # unknown keys are kept, and ``id`` may be set by name or by its alias.
    model_config = ConfigDict(extra="allow", populate_by_name=True)

    # Read from (and written back as) the "version_number" key.
    id: int = Field(..., alias="version_number")
    caption: Optional[str] = ""
    published: bool = False
    general_prompt: str
    llm: "LLMConfig"
    intents: List["IntentConfig"]
class ProjectConfig(BaseModel):
    """A project and its list of versions."""

    # Pydantic v2 configuration (replaces the deprecated nested ``class Config``).
    # Unknown keys are kept on the model; code elsewhere in this module stores
    # fields such as ``deleted`` and ``last_update_date`` as extras.
    model_config = ConfigDict(extra="allow")

    id: Optional[int] = None
    name: str
    caption: Optional[str] = ""
    enabled: bool = True
    last_version_number: Optional[int] = None
    versions: List[VersionConfig]
# ---------------- Activity Log ----------- | |
class ActivityLogEntry(BaseModel):
    """A single audit record describing who did what to which entity."""

    timestamp: str
    username: str
    action: str
    entity_type: str
    # The remaining fields are optional and default to None.
    entity_id: Optional[int] = Field(default=None)
    entity_name: Optional[str] = Field(default=None)
    details: Optional[str] = Field(default=None)
# ---------------- Service Config --------- | |
class ServiceConfig(BaseModel):
    """Root model of service_config.jsonc: global config, projects, APIs and activity log."""

    # Read from (and written back as) the "config" key.
    global_config: GlobalConfig = Field(..., alias="config")
    projects: List[ProjectConfig]
    apis: List[APIConfig]
    activity_log: List[ActivityLogEntry] = []

    # Config level fields
    project_id_counter: int = 1
    last_update_date: Optional[str] = None
    last_update_user: Optional[str] = None

    # Runtime helper. The leading underscore makes pydantic treat it as a
    # private attribute, so it is neither validated nor serialized.
    _api_by_name: Dict[str, APIConfig] = {}

    def build_index(self) -> None:
        """(Re)build the name -> APIConfig lookup used by get_api()."""
        self._api_by_name = {a.name: a for a in self.apis}

    def get_api(self, name: str) -> Optional[APIConfig]:
        """Return the API with the given name, or ``None`` if it is not indexed.

        The index reflects ``apis`` as of the last build_index() call.
        """
        return self._api_by_name.get(name)

    def to_jsonc_dict(self) -> dict:
        """Convert to a JSON-serializable dict for saving to the JSONC file.

        ``mode="json"`` makes pydantic emit plain JSON types; without it,
        ``HttpUrl`` fields are dumped as URL objects that ``json`` cannot
        serialize. Dict-valued API fields are then stored as JSON strings.
        """
        data = self.model_dump(by_alias=True, mode="json")

        for api in data.get("apis", []):
            # Convert headers and body_template to JSON strings
            for key in ("headers", "body_template"):
                if isinstance(api.get(key), dict):
                    api[key] = json.dumps(api[key], ensure_ascii=False)

            # Convert auth configs. Because the dump is by alias,
            # token_request_body appears under "body_template" and is left as
            # a dict here, exactly as before.
            auth = api.get("auth")
            if auth:
                for key in ("token_request_body", "token_refresh_body"):
                    if isinstance(auth.get(key), dict):
                        auth[key] = json.dumps(auth[key], ensure_ascii=False)

        return data

    def save(self) -> None:
        """Save configuration to service_config.jsonc next to this module."""
        config_path = Path(__file__).parent / "service_config.jsonc"

        # Serialize before opening the file: opening with "w" truncates it, so
        # a serialization error must not be able to wipe the existing config.
        payload = json.dumps(self.to_jsonc_dict(), ensure_ascii=False, indent=2)

        with open(config_path, "w", encoding="utf-8") as f:
            f.write(payload)

        log("β Configuration saved to service_config.jsonc")
# ---------------- Provider Singleton ----- | |
class ConfigProvider: | |
_instance: Optional[ServiceConfig] = None | |
_CONFIG_PATH = Path(__file__).parent / "service_config.jsonc" | |
_lock = threading.Lock() # Thread-safe access iΓ§in lock | |
_environment_checked = False # Environment kontrolΓΌ iΓ§in flag | |
def get(cls) -> ServiceConfig: | |
"""Get cached config - thread-safe""" | |
if cls._instance is None: | |
with cls._lock: | |
# Double-checked locking pattern | |
if cls._instance is None: | |
cls._instance = cls._load() | |
cls._instance.build_index() | |
# Environment kontrolΓΌnΓΌ sadece ilk yΓΌklemede yap | |
if not cls._environment_checked: | |
cls._check_environment_setup() | |
cls._environment_checked = True | |
return cls._instance | |
def reload(cls) -> ServiceConfig: | |
"""Force reload configuration from file - used after UI saves""" | |
with cls._lock: | |
log("π Reloading configuration...") | |
cls._instance = None | |
# reload'da environment kontrolΓΌnΓΌ tekrar yapmΔ±yoruz, flag'i reset etmiyoruz | |
return cls.get() | |
def update_config(cls, config_dict: dict): | |
"""Update the current configuration with new values""" | |
if cls._instance is None: | |
cls.get() | |
# Update global config | |
if 'config' in config_dict: | |
for key, value in config_dict['config'].items(): | |
if hasattr(cls._instance.global_config, key): | |
setattr(cls._instance.global_config, key, value) | |
# Save to file | |
cls._instance.save() | |
def _load(cls) -> ServiceConfig: | |
"""Load configuration from service_config.jsonc""" | |
try: | |
log(f"π Loading config from: {cls._CONFIG_PATH}") | |
if not cls._CONFIG_PATH.exists(): | |
raise FileNotFoundError(f"Config file not found: {cls._CONFIG_PATH}") | |
with open(cls._CONFIG_PATH, 'r', encoding='utf-8') as f: | |
config_data = commentjson.load(f) | |
# Ensure required fields exist in config data | |
if 'config' not in config_data: | |
config_data['config'] = {} | |
config_section = config_data['config'] | |
# Set defaults for missing fields | |
config_section.setdefault('work_mode', 'hfcloud') | |
config_section.setdefault('spark_endpoint', 'http://localhost:7861') | |
config_section.setdefault('cloud_token', None) | |
config_section.setdefault('internal_prompt', None) | |
config_section.setdefault('tts_engine', 'no_tts') | |
config_section.setdefault('tts_engine_api_key', None) | |
config_section.setdefault('tts_settings', {'use_ssml': False}) | |
config_section.setdefault('stt_engine', 'no_stt') | |
config_section.setdefault('stt_engine_api_key', None) | |
config_section.setdefault('stt_settings', { | |
'speech_timeout_ms': 2000, | |
'noise_reduction_level': 2, | |
'vad_sensitivity': 0.5, | |
'language': 'tr-TR', | |
'model': 'latest_long', | |
'use_enhanced': True, | |
'enable_punctuation': True, | |
'interim_results': True | |
}) | |
pcc = config_data['config'].get('parameter_collection_config') | |
if pcc is None: | |
# Yoksa default deΔerlerle ekle | |
config_data['config']['parameter_collection_config'] = ParameterCollectionConfig().model_dump() | |
config_section.setdefault('users', []) | |
# Convert string body/headers to dict if needed | |
for api in config_data.get('apis', []): | |
if isinstance(api.get('headers'), str): | |
try: | |
api['headers'] = json.loads(api['headers']) | |
except: | |
api['headers'] = {} | |
if isinstance(api.get('body_template'), str): | |
try: | |
api['body_template'] = json.loads(api['body_template']) | |
except: | |
api['body_template'] = {} | |
# Handle auth section | |
if api.get('auth'): | |
auth = api['auth'] | |
if isinstance(auth.get('token_request_body'), str): | |
try: | |
auth['token_request_body'] = json.loads(auth['token_request_body']) | |
except: | |
auth['token_request_body'] = {} | |
if isinstance(auth.get('token_refresh_body'), str): | |
try: | |
auth['token_refresh_body'] = json.loads(auth['token_refresh_body']) | |
except: | |
auth['token_refresh_body'] = {} | |
# Fix activity_log entries if needed | |
if 'activity_log' in config_data: | |
for entry in config_data['activity_log']: | |
# Add missing username field | |
if 'username' not in entry: | |
entry['username'] = entry.get('user', 'system') | |
# Ensure all required fields exist | |
entry.setdefault('action', 'UNKNOWN') | |
entry.setdefault('entity_type', 'unknown') | |
entry.setdefault('timestamp', datetime.now().isoformat()) | |
# Create ServiceConfig instance | |
service_config = ServiceConfig(**config_data) | |
log("β Configuration loaded successfully") | |
return service_config | |
except FileNotFoundError as e: | |
log(f"β Config file not found: {e}") | |
raise | |
except json.JSONDecodeError as e: | |
log(f"β Invalid JSON in config file: {e}") | |
raise | |
except ValidationError as e: | |
log(f"β Config validation error: {e}") | |
raise | |
except Exception as e: | |
log(f"β Unexpected error loading config: {e}") | |
raise | |
def _check_environment_setup(cls): | |
"""Check if environment is properly configured based on work_mode""" | |
config = cls._instance.global_config | |
if config.is_cloud_mode(): | |
# Cloud mode - check for HuggingFace Secrets | |
missing_secrets = [] | |
if not os.getenv("JWT_SECRET"): | |
missing_secrets.append("JWT_SECRET") | |
if not os.getenv("FLARE_TOKEN_KEY"): | |
missing_secrets.append("FLARE_TOKEN_KEY") | |
if not os.getenv("SPARK_TOKEN"): | |
missing_secrets.append("SPARK_TOKEN") | |
if missing_secrets: | |
log(f"β οΈ Running in {config.work_mode} mode. Missing secrets: {', '.join(missing_secrets)}") | |
log("Please set these as HuggingFace Space Secrets for cloud deployment.") | |
else: | |
# On-premise mode - check for .env file | |
env_path = Path(__file__).parent / ".env" | |
if not env_path.exists(): | |
log("β οΈ Running in on-premise mode but .env file not found") | |
# Docker ortamΔ±nda yazma izni olmayabilir, sadece uyarΔ± ver | |
log("β οΈ Cannot create .env file in Docker environment. Using default values.") | |
# Set default environment variables if not already set | |
if not os.getenv("JWT_SECRET"): | |
os.environ["JWT_SECRET"] = "flare-admin-secret-key-change-in-production" | |
if not os.getenv("JWT_ALGORITHM"): | |
os.environ["JWT_ALGORITHM"] = "HS256" | |
if not os.getenv("JWT_EXPIRATION_HOURS"): | |
os.environ["JWT_EXPIRATION_HOURS"] = "24" | |
if not os.getenv("FLARE_TOKEN_KEY"): | |
os.environ["FLARE_TOKEN_KEY"] = "flare-token-encryption-key" | |
if not os.getenv("SPARK_TOKEN"): | |
os.environ["SPARK_TOKEN"] = "your-spark-token-here" | |
log("β Default environment variables set.") | |
def update_user_password(cls, username: str, new_hash: str, new_salt: str) -> None: | |
"""Update user password""" | |
if cls._instance is None: | |
cls.get() | |
user = next((u for u in cls._instance.global_config.users if u.username == username), None) | |
if user: | |
user.password_hash = new_hash | |
user.salt = new_salt | |
cls._instance.save() | |
cls.add_activity_log(username, "CHANGE_PASSWORD", "user", None, username) | |
def update_environment(cls, update_data: dict, username: str) -> None: | |
"""Update environment configuration""" | |
if cls._instance is None: | |
cls.get() | |
config = cls._instance.global_config | |
# Update fields | |
if 'work_mode' in update_data: | |
config.work_mode = update_data['work_mode'] | |
if 'cloud_token' in update_data: | |
from encryption_utils import encrypt | |
config.cloud_token = encrypt(update_data['cloud_token']) if update_data['cloud_token'] else "" | |
if 'spark_endpoint' in update_data: | |
config.spark_endpoint = update_data['spark_endpoint'] | |
if 'internal_prompt' in update_data: | |
config.internal_prompt = update_data['internal_prompt'] | |
# TTS/STT settings | |
if 'tts_engine' in update_data: | |
config.tts_engine = update_data['tts_engine'] | |
if 'tts_engine_api_key' in update_data and update_data['tts_engine_api_key'] != "***": | |
from encryption_utils import encrypt | |
config.tts_engine_api_key = encrypt(update_data['tts_engine_api_key']) if update_data['tts_engine_api_key'] else "" | |
if 'tts_settings' in update_data: | |
config.tts_settings = update_data['tts_settings'] | |
if 'stt_engine' in update_data: | |
config.stt_engine = update_data['stt_engine'] | |
if 'stt_engine_api_key' in update_data: | |
from encryption_utils import encrypt | |
config.stt_engine_api_key = encrypt(update_data['stt_engine_api_key']) if update_data['stt_engine_api_key'] else "" | |
if 'stt_settings' in update_data: | |
config.stt_settings = update_data['stt_settings'] | |
if 'parameter_collection_config' in update_data: | |
config.parameter_collection_config = ParameterCollectionConfig(**update_data['parameter_collection_config']) | |
# Update metadata | |
cls._instance.last_update_date = datetime.now().isoformat() | |
cls._instance.last_update_user = username | |
# Add activity log | |
cls.add_activity_log(username, "UPDATE_ENVIRONMENT", "config", None, "environment", | |
f"Changed to {config.work_mode}, TTS: {config.tts_engine}, STT: {config.stt_engine}") | |
# Save to file | |
cls._instance.save() | |
# Reload to ensure consistency | |
cls.reload() | |
def add_activity_log(cls, username: str, action: str, entity_type: str, | |
entity_id: Any = None, entity_name: str = None, details: str = "") -> None: | |
"""Add activity log entry""" | |
if cls._instance is None: | |
cls.get() | |
entry = ActivityLogEntry( | |
timestamp=datetime.now().isoformat() + "Z", | |
username=username, | |
action=action, | |
entity_type=entity_type, | |
entity_id=entity_id, | |
entity_name=entity_name, | |
details=details | |
) | |
cls._instance.activity_log.append(entry) | |
# Keep only last 1000 entries | |
if len(cls._instance.activity_log) > 1000: | |
cls._instance.activity_log = cls._instance.activity_log[-1000:] | |
def get_project(cls, project_id: int) -> Optional[ProjectConfig]: | |
"""Get project by ID""" | |
if cls._instance is None: | |
cls.get() | |
return next((p for p in cls._instance.projects if p.id == project_id), None) | |
def get_project_by_name(cls, name: str) -> Optional[ProjectConfig]: | |
"""Get project by name""" | |
if cls._instance is None: | |
cls.get() | |
return next((p for p in cls._instance.projects if p.name == name), None) | |
def create_project(cls, project_data: dict, username: str) -> ProjectConfig: | |
"""Create new project""" | |
if cls._instance is None: | |
cls.get() | |
# Check duplicate name | |
existing = [p.name for p in cls._instance.projects if not getattr(p, 'deleted', False)] | |
if project_data['name'] in existing: | |
raise ValueError(f"Project name '{project_data['name']}' already exists") | |
# Get new project ID | |
project_id = cls._instance.project_id_counter | |
cls._instance.project_id_counter += 1 | |
# Create project | |
new_project = ProjectConfig( | |
id=project_id, | |
name=project_data['name'], | |
caption=project_data.get('caption', ''), | |
icon=project_data.get('icon', 'folder'), | |
description=project_data.get('description', ''), | |
default_language=project_data.get('default_language', 'TΓΌrkΓ§e'), | |
supported_languages=project_data.get('supported_languages', ['tr-TR']), | |
timezone=project_data.get('timezone', 'Europe/Istanbul'), | |
region=project_data.get('region', 'tr-TR'), | |
enabled=True, | |
deleted=False, | |
version_id_counter=2, # Start from 2 since we create version 1 | |
last_version_number=1, | |
created_date=datetime.now().isoformat() + "Z", | |
created_by=username, | |
last_update_date=datetime.now().isoformat() + "Z", | |
last_update_user=username, | |
versions=[ | |
VersionConfig( | |
id=1, | |
version_number=1, | |
no=1, | |
caption="Version 1", | |
published=False, | |
created_date=datetime.now().isoformat() + "Z", | |
created_by=username, | |
last_update_date=datetime.now().isoformat() + "Z", | |
last_update_user=username, | |
general_prompt="", | |
llm=LLMConfig( | |
repo_id="Qwen/Qwen2.5-72B-Instruct", | |
generation_config={ | |
"temperature": 0.5, | |
"max_tokens": 2048, | |
"top_p": 0.7, | |
"repetition_penalty": 1.1 | |
}, | |
use_fine_tune=False, | |
fine_tune_zip="" | |
), | |
intents=[] | |
) | |
] | |
) | |
cls._instance.projects.append(new_project) | |
# Add activity log | |
cls.add_activity_log(username, "CREATE_PROJECT", "project", project_id, project_data['name']) | |
# Save | |
cls._instance.save() | |
return new_project | |
def update_project(cls, project_id: int, update_data: dict, username: str) -> ProjectConfig: | |
"""Update project""" | |
if cls._instance is None: | |
cls.get() | |
project = cls.get_project(project_id) | |
if not project: | |
raise ValueError(f"Project {project_id} not found") | |
# Check race condition | |
if project.last_update_date != update_data.get('last_update_date'): | |
raise ValueError("Project was modified by another user") | |
# Update fields | |
project.caption = update_data['caption'] | |
project.icon = update_data.get('icon', project.icon) | |
project.description = update_data.get('description', project.description) | |
project.default_language = update_data.get('default_language', project.default_language) | |
project.supported_languages = update_data.get('supported_languages', project.supported_languages) | |
project.timezone = update_data.get('timezone', project.timezone) | |
project.region = update_data.get('region', project.region) | |
project.last_update_date = datetime.now().isoformat() + "Z" | |
project.last_update_user = username | |
# Add activity log | |
cls.add_activity_log(username, "UPDATE_PROJECT", "project", project_id, project.name) | |
# Save | |
cls._instance.save() | |
return project | |
def delete_project(cls, project_id: int, username: str) -> None: | |
"""Delete project (soft delete)""" | |
if cls._instance is None: | |
cls.get() | |
project = cls.get_project(project_id) | |
if not project: | |
raise ValueError(f"Project {project_id} not found") | |
project.deleted = True | |
project.last_update_date = datetime.now().isoformat() + "Z" | |
project.last_update_user = username | |
# Add activity log | |
cls.add_activity_log(username, "DELETE_PROJECT", "project", project_id, project.name) | |
# Save | |
cls._instance.save() | |
def toggle_project(cls, project_id: int, username: str) -> bool: | |
"""Toggle project enabled status""" | |
if cls._instance is None: | |
cls.get() | |
project = cls.get_project(project_id) | |
if not project: | |
raise ValueError(f"Project {project_id} not found") | |
project.enabled = not project.enabled | |
project.last_update_date = datetime.now().isoformat() + "Z" | |
project.last_update_user = username | |
# Add activity log | |
action = "ENABLE_PROJECT" if project.enabled else "DISABLE_PROJECT" | |
cls.add_activity_log(username, action, "project", project_id, project.name) | |
# Save | |
cls._instance.save() | |
return project.enabled | |
def create_version(cls, project_id: int, version_data: dict, username: str) -> VersionConfig: | |
"""Create new version""" | |
if cls._instance is None: | |
cls.get() | |
project = cls.get_project(project_id) | |
if not project: | |
raise ValueError(f"Project {project_id} not found") | |
# Get next version ID | |
version_id = project.version_id_counter | |
project.version_id_counter += 1 | |
# Get next version number | |
existing_versions = [v for v in project.versions if not getattr(v, 'deleted', False)] | |
version_no = max([v.no for v in existing_versions], default=0) + 1 | |
# Create base version | |
new_version = VersionConfig( | |
id=version_id, | |
version_number=version_id, | |
no=version_no, | |
caption=version_data['caption'], | |
description=f"Version {version_no}", | |
published=False, | |
deleted=False, | |
created_date=datetime.now().isoformat() + "Z", | |
created_by=username, | |
last_update_date=datetime.now().isoformat() + "Z", | |
last_update_user=username, | |
general_prompt="", | |
llm=LLMConfig( | |
repo_id="", | |
generation_config={ | |
"max_new_tokens": 512, | |
"temperature": 0.7, | |
"top_p": 0.95, | |
"top_k": 50, | |
"repetition_penalty": 1.1 | |
}, | |
use_fine_tune=False, | |
fine_tune_zip="" | |
), | |
intents=[] | |
) | |
# Copy from source version if specified | |
if version_data.get('source_version_id'): | |
source_version = next( | |
(v for v in project.versions if v.id == version_data['source_version_id']), | |
None | |
) | |
if source_version: | |
new_version.general_prompt = source_version.general_prompt | |
new_version.llm = source_version.llm.model_copy(deep=True) | |
new_version.intents = [i.model_copy(deep=True) for i in source_version.intents] | |
project.versions.append(new_version) | |
project.last_update_date = datetime.now().isoformat() + "Z" | |
project.last_update_user = username | |
# Add activity log | |
cls.add_activity_log(username, "CREATE_VERSION", "version", version_id, | |
f"{project.name} v{version_no}") | |
# Save | |
cls._instance.save() | |
return new_version | |
def update_version(cls, project_id: int, version_id: int, update_data: dict, username: str) -> VersionConfig: | |
"""Update version""" | |
if cls._instance is None: | |
cls.get() | |
project = cls.get_project(project_id) | |
if not project: | |
raise ValueError(f"Project {project_id} not found") | |
version = next((v for v in project.versions if v.id == version_id), None) | |
if not version: | |
raise ValueError(f"Version {version_id} not found") | |
# Check race condition | |
if version.last_update_date != update_data.get('last_update_date'): | |
raise ValueError("Version was modified by another user") | |
# Cannot update published version | |
if version.published: | |
raise ValueError("Cannot modify published version") | |
# Update version | |
version.caption = update_data['caption'] | |
version.general_prompt = update_data['general_prompt'] | |
version.llm = LLMConfig(**update_data['llm']) | |
version.intents = [IntentConfig(**i) for i in update_data['intents']] | |
version.last_update_date = datetime.now().isoformat() + "Z" | |
version.last_update_user = username | |
# Update project timestamp | |
project.last_update_date = datetime.now().isoformat() + "Z" | |
project.last_update_user = username | |
# Add activity log | |
cls.add_activity_log(username, "UPDATE_VERSION", "version", version_id, | |
f"{project.name} v{version.no}") | |
# Save | |
cls._instance.save() | |
return version | |
def publish_version(cls, project_id: int, version_id: int, username: str) -> tuple[ProjectConfig, VersionConfig]: | |
"""Publish version""" | |
if cls._instance is None: | |
cls.get() | |
project = cls.get_project(project_id) | |
if not project: | |
raise ValueError(f"Project {project_id} not found") | |
version = next((v for v in project.versions if v.id == version_id), None) | |
if not version: | |
raise ValueError(f"Version {version_id} not found") | |
# Unpublish all other versions | |
for v in project.versions: | |
if v.id != version_id: | |
v.published = False | |
# Publish this version | |
version.published = True | |
version.publish_date = datetime.now().isoformat() + "Z" | |
version.published_by = username | |
version.last_update_date = datetime.now().isoformat() + "Z" | |
version.last_update_user = username | |
# Update project timestamp | |
project.last_update_date = datetime.now().isoformat() + "Z" | |
project.last_update_user = username | |
# Add activity log | |
cls.add_activity_log(username, "PUBLISH_VERSION", "version", version_id, | |
f"{project.name} v{version.no}") | |
# Save | |
cls._instance.save() | |
return project, version | |
def delete_version(cls, project_id: int, version_id: int, username: str) -> None: | |
"""Delete version (soft delete)""" | |
if cls._instance is None: | |
cls.get() | |
project = cls.get_project(project_id) | |
if not project: | |
raise ValueError(f"Project {project_id} not found") | |
version = next((v for v in project.versions if v.id == version_id), None) | |
if not version: | |
raise ValueError(f"Version {version_id} not found") | |
# Cannot delete published version | |
if version.published: | |
raise ValueError("Cannot delete published version") | |
version.deleted = True | |
version.last_update_date = datetime.now().isoformat() + "Z" | |
version.last_update_user = username | |
project.last_update_date = datetime.now().isoformat() + "Z" | |
project.last_update_user = username | |
# Add activity log | |
cls.add_activity_log(username, "DELETE_VERSION", "version", version_id, | |
f"{project.name} v{version.no}") | |
# Save | |
cls._instance.save() | |
def create_api(cls, api_data: dict, username: str) -> APIConfig: | |
"""Create new API""" | |
if cls._instance is None: | |
cls.get() | |
# Check duplicate name | |
existing = [a.name for a in cls._instance.apis if not getattr(a, 'deleted', False)] | |
if api_data['name'] in existing: | |
raise ValueError(f"API name '{api_data['name']}' already exists") | |
# Create API | |
new_api = APIConfig( | |
**api_data, | |
deleted=False, | |
created_date=datetime.now().isoformat() + "Z", | |
created_by=username, | |
last_update_date=datetime.now().isoformat() + "Z", | |
last_update_user=username | |
) | |
cls._instance.apis.append(new_api) | |
cls._instance.build_index() # Rebuild index | |
# Add activity log | |
cls.add_activity_log(username, "CREATE_API", "api", None, api_data['name']) | |
# Save | |
cls._instance.save() | |
return new_api | |
def update_api(cls, api_name: str, update_data: dict, username: str) -> APIConfig:
    """Update an existing API definition and persist the change.

    Args:
        api_name: Name used to look the API up via ``get_api``.
        update_data: New field values. Must carry the ``last_update_date``
            the caller last saw; keys that are not attributes of the API
            are ignored.
        username: User recorded as the last updater.

    Returns:
        The updated API object.

    Raises:
        ValueError: If the API does not exist, if its ``last_update_date``
            differs from the one supplied, or if a published version of a
            non-deleted project references it.
    """
    if cls._instance is None:
        cls.get()

    api = cls._instance.get_api(api_name)
    if not api:
        raise ValueError(f"API '{api_name}' not found")

    # Optimistic concurrency check: the caller must present the timestamp of
    # the revision it edited.
    if api.last_update_date != update_data.get('last_update_date'):
        raise ValueError("API was modified by another user")

    # An API referenced by a published version must not change. Soft-deleted
    # projects and versions are skipped, matching the usage check performed
    # by delete_api, so deleted data cannot block edits.
    for project in cls._instance.projects:
        if getattr(project, 'deleted', False):
            continue
        for version in project.versions:
            if getattr(version, 'deleted', False) or not version.published:
                continue
            if any(intent.action == api_name for intent in version.intents):
                raise ValueError(f"API is used in published version of project '{project.name}'")

    # Apply only keys the API object already has; the concurrency token is
    # never copied over because it is regenerated below.
    for key, value in update_data.items():
        if key != 'last_update_date' and hasattr(api, key):
            setattr(api, key, value)

    api.last_update_date = datetime.now().isoformat() + "Z"
    api.last_update_user = username

    cls.add_activity_log(username, "UPDATE_API", "api", None, api_name)
    cls._instance.save()
    return api
def delete_api(cls, api_name: str, username: str) -> None:
    """Soft-delete an API that no live intent refers to.

    The API is flagged with ``deleted = True`` and its update stamp is
    refreshed; an activity-log entry is written and the configuration saved.

    Args:
        api_name: Name used to look the API up via ``get_api``.
        username: User recorded as the last updater.

    Raises:
        ValueError: If the API does not exist, or if an intent in any
            non-deleted version of a non-deleted project uses it as its
            ``action``.
    """
    if cls._instance is None:
        cls.get()

    api = cls._instance.get_api(api_name)
    if not api:
        raise ValueError(f"API '{api_name}' not found")

    # Refuse deletion while any live project/version still references the API.
    live_projects = (
        p for p in cls._instance.projects if not getattr(p, 'deleted', False)
    )
    for project in live_projects:
        live_versions = (
            v for v in project.versions if not getattr(v, 'deleted', False)
        )
        for version in live_versions:
            for intent in version.intents:
                if intent.action != api_name:
                    continue
                raise ValueError(f"API is used in intent '{intent.name}' in project '{project.name}' version {version.no}")

    api.deleted = True
    stamp = datetime.now().isoformat() + "Z"
    api.last_update_date = stamp
    api.last_update_user = username

    cls.add_activity_log(username, "DELETE_API", "api", None, api_name)
    cls._instance.save()
def import_project(cls, project_data: dict, username: str) -> ProjectConfig:
    """Create a new project from exported JSON data.

    Only ``name``, ``caption``, ``icon``, ``description`` and ``versions``
    are read from ``project_data``. Imported versions are renumbered from 1
    in list order and are always unpublished and not deleted.

    Args:
        project_data: Parsed project export; must contain ``"name"``.
        username: User recorded as creator/updater of everything imported.

    Returns:
        The project returned by ``create_project`` with its versions
        replaced by the imported ones.

    Raises:
        ValueError: If ``project_data`` has no ``"name"`` key.
    """
    if cls._instance is None:
        cls.get()

    # Validate structure
    if "name" not in project_data:
        raise ValueError("Invalid project data")

    now = datetime.now().isoformat() + "Z"

    # Build every version BEFORE creating the project: if any version,
    # LLM or intent payload is rejected by its model, nothing has been
    # registered yet, so a bad import cannot leave a half-created project.
    imported_versions = []
    for number, version_data in enumerate(project_data.get("versions", []), start=1):
        llm_data = version_data.get("llm")
        if llm_data:
            llm = LLMConfig(**llm_data)
        else:
            # No (or empty) LLM section in the export: use an empty config.
            llm = LLMConfig(
                repo_id="",
                generation_config={},
                use_fine_tune=False,
                fine_tune_zip=""
            )
        imported_versions.append(
            VersionConfig(
                id=number,
                version_number=number,
                no=number,
                caption=version_data.get("caption", f"Version {number}"),
                description=version_data.get("description", ""),
                published=False,
                deleted=False,
                created_date=now,
                created_by=username,
                last_update_date=now,
                last_update_user=username,
                general_prompt=version_data.get("general_prompt", ""),
                llm=llm,
                intents=[IntentConfig(**i) for i in version_data.get("intents", [])]
            )
        )

    new_project = cls.create_project(
        {
            "name": project_data["name"],
            "caption": project_data.get("caption", ""),
            "icon": project_data.get("icon", "folder"),
            "description": project_data.get("description", "")
        },
        username,
    )

    # Replace whatever versions create_project set up with the imported ones.
    new_project.versions = imported_versions
    if imported_versions:
        # Next free version id follows the last imported one.
        new_project.version_id_counter = len(imported_versions) + 1

    # Record the import itself as a separate activity-log entry.
    cls.add_activity_log(username, "IMPORT_PROJECT", "project", new_project.id, new_project.name)

    cls._instance.save()
    return new_project
def export_project(cls, project_id: int, username: str) -> dict:
    """Build a JSON-serialisable export of a project.

    Versions flagged as deleted are left out. The export is recorded in the
    activity log and the configuration is saved before returning.

    Args:
        project_id: Id passed to ``cls.get_project``.
        username: User recorded in the activity log.

    Returns:
        A dict with ``name``, ``caption``, ``icon``, ``description`` and a
        ``versions`` list; each version holds ``caption``, ``description``,
        ``general_prompt``, ``llm`` and ``intents``.

    Raises:
        ValueError: If the project cannot be found.
    """
    if cls._instance is None:
        cls.get()

    project = cls.get_project(project_id)
    if not project:
        raise ValueError(f"Project {project_id} not found")

    exported_versions = [
        {
            "caption": ver.caption,
            # description may be missing on a version; fall back to ''.
            "description": getattr(ver, 'description', ''),
            "general_prompt": ver.general_prompt,
            "llm": ver.llm.model_dump(),
            "intents": [intent.model_dump() for intent in ver.intents]
        }
        for ver in project.versions
        if not getattr(ver, 'deleted', False)
    ]

    export_data = {
        "name": project.name,
        "caption": project.caption,
        "icon": project.icon,
        "description": project.description,
        "versions": exported_versions
    }

    cls.add_activity_log(username, "EXPORT_PROJECT", "project", project_id, project.name)
    cls._instance.save()
    return export_data
def cleanup_activity_logs(cls, keep_count: int = 1000) -> None:
    """Trim the activity log to its most recent ``keep_count`` entries.

    The configuration is saved only when entries were actually dropped.

    Args:
        keep_count: Number of newest entries to keep. ``0`` empties the log.

    Raises:
        ValueError: If ``keep_count`` is negative.
    """
    if keep_count < 0:
        raise ValueError("keep_count must be non-negative")

    if cls._instance is None:
        cls.get()

    entries = cls._instance.activity_log
    surplus = len(entries) - keep_count
    if surplus > 0:
        # Slice from an explicit start index rather than [-keep_count:]:
        # with keep_count == 0 the negative form becomes [-0:], which is
        # the whole list, so nothing would be removed.
        cls._instance.activity_log = entries[surplus:]
        cls._instance.save()
# Rebuild the models in this order so their forward references are resolved
# once every class in the module has been defined.
for _model in (GlobalConfig, VersionConfig, ServiceConfig):
    _model.model_rebuild()
del _model