flare / intent_utils.py
ciyidogan's picture
Upload 22 files
cb61e8e verified
raw
history blame
7.44 kB
import os
import torch
import json
import shutil
import re
import traceback
from datasets import Dataset
from transformers import (
AutoTokenizer,
AutoModelForSequenceClassification,
Trainer,
TrainingArguments,
default_data_collator,
AutoConfig,
)
from log import log
from core import llm_models
async def detect_intent(text, project_name):
llm_model_instance = llm_models.get(project_name)
if not llm_model_instance or not llm_model_instance.intent_model:
raise Exception(f"'{project_name}' için intent modeli yüklenmemiş.")
tokenizer = llm_model_instance.intent_tokenizer
model = llm_model_instance.intent_model
label2id = llm_model_instance.intent_label2id
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
outputs = model(**inputs)
predicted_id = outputs.logits.argmax(dim=-1).item()
detected_intent = [k for k, v in label2id.items() if v == predicted_id][0]
confidence = outputs.logits.softmax(dim=-1).max().item()
return detected_intent, confidence
def background_training(project_name, intents, model_id, output_path, confidence_threshold):
try:
log(f"🔧 Intent eğitimi başlatıldı (proje: {project_name})")
texts, labels, label2id = [], [], {}
for idx, intent in enumerate(intents):
label2id[intent["name"]] = idx
for ex in intent["examples"]:
texts.append(ex)
labels.append(idx)
dataset = Dataset.from_dict({"text": texts, "label": labels})
tokenizer = AutoTokenizer.from_pretrained(model_id)
config = AutoConfig.from_pretrained(model_id)
config.problem_type = "single_label_classification"
config.num_labels = len(label2id)
model = AutoModelForSequenceClassification.from_pretrained(model_id, config=config)
tokenized_data = {"input_ids": [], "attention_mask": [], "label": []}
for row in dataset:
out = tokenizer(row["text"], truncation=True, padding="max_length", max_length=128)
tokenized_data["input_ids"].append(out["input_ids"])
tokenized_data["attention_mask"].append(out["attention_mask"])
tokenized_data["label"].append(row["label"])
tokenized = Dataset.from_dict(tokenized_data)
tokenized.set_format(type="torch", columns=["input_ids", "attention_mask", "label"])
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path, exist_ok=True)
trainer = Trainer(
model=model,
args=TrainingArguments(output_path, per_device_train_batch_size=4, num_train_epochs=3, logging_steps=10, save_strategy="no", report_to=[]),
train_dataset=tokenized,
data_collator=default_data_collator,
)
trainer.train()
log("🔧 Başarı raporu üretiliyor...")
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
input_ids_tensor = torch.tensor(tokenized["input_ids"]).to(device)
attention_mask_tensor = torch.tensor(tokenized["attention_mask"]).to(device)
with torch.no_grad():
outputs = model(input_ids=input_ids_tensor, attention_mask=attention_mask_tensor)
predictions = outputs.logits.argmax(dim=-1).tolist()
actuals = tokenized["label"]
counts, correct = {}, {}
for pred, actual in zip(predictions, actuals):
intent_name = list(label2id.keys())[list(label2id.values()).index(actual)]
counts[intent_name] = counts.get(intent_name, 0) + 1
if pred == actual:
correct[intent_name] = correct.get(intent_name, 0) + 1
for intent_name, total in counts.items():
accuracy = correct.get(intent_name, 0) / total
log(f"📊 Intent '{intent_name}' doğruluk: {accuracy:.2f}{total} örnek")
if accuracy < confidence_threshold or total < 5:
log(f"⚠️ Yetersiz performanslı intent: '{intent_name}' — Doğruluk: {accuracy:.2f}, Örnek: {total}")
# Eğitim sonrası model ve tokenizer'ı diske kaydet
model.save_pretrained(output_path)
tokenizer.save_pretrained(output_path)
with open(os.path.join(output_path, "label2id.json"), "w") as f:
json.dump(label2id, f)
log(f"✅ Intent eğitimi tamamlandı ve '{project_name}' için model disk üzerinde hazır.")
except Exception as e:
log(f"❌ Intent eğitimi hatası: {e}")
traceback.print_exc()
def extract_parameters(variables_list, user_input):
extracted_params = []
for pattern in variables_list:
# Örneğin: from_location:{Ankara} to_location:{İstanbul}
regex = re.sub(r"(\w+):\{(.+?)\}", r"(?P<\1>.+?)", pattern)
match = re.match(regex, user_input)
if match:
extracted_params = [{"key": k, "value": v} for k, v in match.groupdict().items()]
break
# Ek özel basit yakalama: iki şehir birden yazılırsa → sırayla atama
if not extracted_params:
city_pattern = r"(\bAnkara\b|\bİstanbul\b|\bİzmir\b)"
cities = re.findall(city_pattern, user_input)
if len(cities) >= 2:
extracted_params = [
{"key": "from_location", "value": cities[0]},
{"key": "to_location", "value": cities[1]}
]
return extracted_params
def resolve_placeholders(text: str, session: dict, variables: dict) -> str:
def replacer(match):
full = match.group(1)
try:
if full.startswith("variables."):
key = full.split(".", 1)[1]
return str(variables.get(key, f"{{{full}}}"))
elif full.startswith("session."):
key = full.split(".", 1)[1]
return str(session.get("variables", {}).get(key, f"{{{full}}}"))
elif full.startswith("auth_tokens."):
parts = full.split(".")
if len(parts) == 3:
intent, token_type = parts[1], parts[2]
return str(session.get("auth_tokens", {}).get(intent, {}).get(token_type, f"{{{full}}}"))
else:
return f"{{{full}}}"
else:
return f"{{{full}}}"
except Exception:
return f"{{{full}}}"
return re.sub(r"\{([^{}]+)\}", replacer, text)
def validate_variable_formats(variables, variable_format_map, data_formats):
errors = {}
for var_name, format_name in variable_format_map.items():
value = variables.get(var_name)
if value is None:
continue
format_def = data_formats.get(format_name)
if not format_def:
continue
if "valid_options" in format_def:
if value not in format_def["valid_options"]:
errors[var_name] = format_def.get("error_message", f"{var_name} değeri geçersiz.")
elif "pattern" in format_def:
if not re.fullmatch(format_def["pattern"], value):
errors[var_name] = format_def.get("error_message", f"{var_name} formatı geçersiz.")
return len(errors) == 0, errors