Spaces:
Running
Running
File size: 7,442 Bytes
cb61e8e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
import os
import torch
import json
import shutil
import re
import traceback
from datasets import Dataset
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
Trainer,
TrainingArguments,
default_data_collator,
AutoConfig,
)
from log import log
from core import llm_models
async def detect_intent(text, project_name):
llm_model_instance = llm_models.get(project_name)
if not llm_model_instance or not llm_model_instance.intent_model:
raise Exception(f"'{project_name}' için intent modeli yüklenmemiş.")
tokenizer = llm_model_instance.intent_tokenizer
model = llm_model_instance.intent_model
label2id = llm_model_instance.intent_label2id
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
outputs = model(**inputs)
predicted_id = outputs.logits.argmax(dim=-1).item()
detected_intent = [k for k, v in label2id.items() if v == predicted_id][0]
confidence = outputs.logits.softmax(dim=-1).max().item()
return detected_intent, confidence
def background_training(project_name, intents, model_id, output_path, confidence_threshold):
try:
log(f"🔧 Intent eğitimi başlatıldı (proje: {project_name})")
texts, labels, label2id = [], [], {}
for idx, intent in enumerate(intents):
label2id[intent["name"]] = idx
for ex in intent["examples"]:
texts.append(ex)
labels.append(idx)
dataset = Dataset.from_dict({"text": texts, "label": labels})
tokenizer = AutoTokenizer.from_pretrained(model_id)
config = AutoConfig.from_pretrained(model_id)
config.problem_type = "single_label_classification"
config.num_labels = len(label2id)
model = AutoModelForSequenceClassification.from_pretrained(model_id, config=config)
tokenized_data = {"input_ids": [], "attention_mask": [], "label": []}
for row in dataset:
out = tokenizer(row["text"], truncation=True, padding="max_length", max_length=128)
tokenized_data["input_ids"].append(out["input_ids"])
tokenized_data["attention_mask"].append(out["attention_mask"])
tokenized_data["label"].append(row["label"])
tokenized = Dataset.from_dict(tokenized_data)
tokenized.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path, exist_ok=True)
trainer = Trainer(
model=model,
args=TrainingArguments(output_path, per_device_train_batch_size=4, num_train_epochs=3, logging_steps=10, save_strategy="no", report_to=[]),
train_dataset=tokenized,
data_collator=default_data_collator,
)
trainer.train()
log("🔧 Başarı raporu üretiliyor...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
input_ids_tensor = torch.tensor(tokenized["input_ids"]).to(device)
attention_mask_tensor = torch.tensor(tokenized["attention_mask"]).to(device)
with torch.no_grad():
outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
predictions = outputs.logits.argmax(dim=-1).tolist()
actuals = tokenized["label"]
counts, correct = {}, {}
for pred, actual in zip(predictions, actuals):
intent_name = list(label2id.keys())[list(label2id.values()).index(actual)]
counts[intent_name] = counts.get(intent_name, 0) + 1
if pred == actual:
correct[intent_name] = correct.get(intent_name, 0) + 1
for intent_name, total in counts.items():
accuracy = correct.get(intent_name, 0) / total
log(f"📊 Intent '{intent_name}' doğruluk: {accuracy:.2f} — {total} örnek")
if accuracy < confidence_threshold or total < 5:
log(f"⚠️ Yetersiz performanslı intent: '{intent_name}' — Doğruluk: {accuracy:.2f}, Örnek: {total}")
# Eğitim sonrası model ve tokenizer'ı diske kaydet
model.save_pretrained(output_path)
tokenizer.save_pretrained(output_path)
with open(os.path.join(output_path, "label2id.json"), "w") as f:
json.dump(label2id, f)
log(f"✅ Intent eğitimi tamamlandı ve '{project_name}' için model disk üzerinde hazır.")
except Exception as e:
log(f"❌ Intent eğitimi hatası: {e}")
traceback.print_exc()
def extract_parameters(variables_list, user_input):
extracted_params = []
for pattern in variables_list:
# Örneğin: from_location:{Ankara} to_location:{İstanbul}
regex = re.sub(r"(\w+):\{(.+?)\}", r"(?P<\1>.+?)", pattern)
match = re.match(regex, user_input)
if match:
extracted_params = [{"key": k, "value": v} for k, v in match.groupdict().items()]
break
# Ek özel basit yakalama: iki şehir birden yazılırsa → sırayla atama
if not extracted_params:
city_pattern = r"(\bAnkara\b|\bİstanbul\b|\bİzmir\b)"
cities = re.findall(city_pattern, user_input)
if len(cities) >= 2:
extracted_params = [
{"key": "from_location", "value": cities[0]},
{"key": "to_location", "value": cities[1]}
]
return extracted_params
def resolve_placeholders(text: str, session: dict, variables: dict) -> str:
def replacer(match):
full = match.group(1)
try:
if full.startswith("variables."):
key = full.split(".", 1)[1]
return str(variables.get(key, f"{{{full}}}"))
elif full.startswith("session."):
key = full.split(".", 1)[1]
return str(session.get("variables", {}).get(key, f"{{{full}}}"))
elif full.startswith("auth_tokens."):
parts = full.split(".")
if len(parts) == 3:
intent, token_type = parts[1], parts[2]
return str(session.get("auth_tokens", {}).get(intent, {}).get(token_type, f"{{{full}}}"))
else:
return f"{{{full}}}"
else:
return f"{{{full}}}"
except Exception:
return f"{{{full}}}"
return re.sub(r"\{([^{}]+)\}", replacer, text)
def validate_variable_formats(variables, variable_format_map, data_formats):
errors = {}
for var_name, format_name in variable_format_map.items():
value = variables.get(var_name)
if value is None:
continue
format_def = data_formats.get(format_name)
if not format_def:
continue
if "valid_options" in format_def:
if value not in format_def["valid_options"]:
errors[var_name] = format_def.get("error_message", f"{var_name} değeri geçersiz.")
elif "pattern" in format_def:
if not re.fullmatch(format_def["pattern"], value):
errors[var_name] = format_def.get("error_message", f"{var_name} formatı geçersiz.")
return len(errors) == 0, errors
|