"""
Professional Product Search Engine for Trek Chatbot

Implements intelligent product matching with fuzzy search and NLP techniques.
"""

import re
import unicodedata
from difflib import SequenceMatcher
from typing import Dict, List, Optional, Tuple


class ProductSearchEngine:
    """Index-backed fuzzy search over a list of product tuples.

    Each product is a tuple laid out as ``(short_name, product_info,
    full_name)``.  Only ``full_name`` (position 2) is indexed and matched;
    the other positions are carried through untouched in the results.
    """

    # Words shorter than this are ignored by the word index and word scoring.
    _MIN_WORD_LENGTH = 3

    # Turkish letters folded to ASCII.  The table is applied BEFORE
    # lowercasing: ``'İ'.lower()`` yields ``'i'`` followed by U+0307
    # (combining dot above), which the punctuation filter would then turn
    # into a space and split the word.  U+0307 itself is dropped so text that
    # was already lowercased upstream is handled as well.
    _TURKISH_FOLD = str.maketrans({
        'ı': 'i', 'İ': 'i', 'ş': 's', 'Ş': 's',
        'ğ': 'g', 'Ğ': 'g', 'ü': 'u', 'Ü': 'u',
        'ö': 'o', 'Ö': 'o', 'ç': 'c', 'Ç': 'c',
        '\u0307': None,
    })

    # Everything that is not a word character, whitespace, digit or dot.
    _PUNCTUATION_PATTERN = re.compile(r'[^\w\s\d\.]')

    # First standalone number (optionally with a decimal part) in a string.
    _MODEL_PATTERN = re.compile(r'\b(\d+\.?\d*)\b')

    _SIZE_PATTERNS = (
        r'\b(xs|s|m|l|xl|xxl|2xl|3xl)\b',
        r'\b(\d{2})\b(?=\s*beden|\s*numara|$)',
        r'\b(small|medium|large)\b',
    )
    _COLORS = (
        'siyah', 'beyaz', 'mavi', 'kirmizi', 'yesil', 'gri', 'turuncu',
        'black', 'white', 'blue', 'red', 'green', 'grey', 'gray', 'orange',
    )
    _TYPES = (
        'erkek', 'kadin', 'cocuk', 'yol', 'dag', 'sehir', 'elektrikli',
        'karbon', 'aluminyum', 'gravel', 'hybrid',
    )
    _FEATURES = (
        'disk fren', 'shimano', 'sram', 'karbon', 'aluminyum',
        'hidrolik', 'mekanik', '29 jant', '27.5 jant',
    )
    _PRICE_PATTERN = re.compile(r'(\d+)\.?(\d*)\s*(bin|tl)')

    def __init__(self, products: List[Tuple]):
        """Store the product list and build the lookup index.

        Args:
            products: List of tuples ``(short_name, product_info, full_name)``.
        """
        self.products = products
        self.product_index = self._build_index()

    def _build_index(self) -> Dict:
        """Build the lookup tables used by :meth:`search`.

        Returns:
            A dict with five sub-indexes:

            * ``by_name``     - normalized full name -> product
            * ``normalized``  - normalized full name -> original full name
            * ``by_words``    - word (3+ chars) -> products containing it
            * ``by_model``    - first number in the full name -> products
            * ``by_category`` - first word of the normalized name -> products
        """
        index = {
            'by_name': {},
            'by_words': {},
            'by_category': {},
            'by_model': {},
            'normalized': {},
        }

        for product in self.products:
            full_name = product[2]
            normalized_full = self._normalize_text(full_name)

            index['by_name'][normalized_full] = product
            index['normalized'][normalized_full] = full_name

            words = normalized_full.split()

            # Iterate over distinct words only: registering a product once
            # per *occurrence* would make a repeated word in a name count as
            # several matches when scoring.
            for word in dict.fromkeys(words):
                if len(word) >= self._MIN_WORD_LENGTH:
                    index['by_words'].setdefault(word, []).append(product)

            # The model number is taken from the raw (un-normalized) name.
            model_match = self._MODEL_PATTERN.search(full_name or '')
            if model_match:
                index['by_model'].setdefault(
                    model_match.group(1), []
                ).append(product)

            if words:
                index['by_category'].setdefault(words[0], []).append(product)

        return index

    def _normalize_text(self, text: str) -> str:
        """Return *text* folded to lowercase ASCII-ish form for matching.

        Turkish letters are mapped to their ASCII look-alikes, the text is
        lowercased, every character other than word characters, whitespace,
        digits and ``.`` becomes a space, and runs of whitespace collapse to
        a single space.  Falsy input yields ``""``.
        """
        if not text:
            return ""

        # Fold first, lowercase second (see _TURKISH_FOLD for why).
        folded = text.translate(self._TURKISH_FOLD).lower()
        cleaned = self._PUNCTUATION_PATTERN.sub(' ', folded)
        return ' '.join(cleaned.split())

    def _calculate_similarity(self, str1: str, str2: str) -> float:
        """Return the ``SequenceMatcher`` ratio of the two strings (0.0-1.0)."""
        return SequenceMatcher(None, str1, str2).ratio()

    @staticmethod
    def _keep_best(results: Dict, score: float, product: Tuple) -> None:
        """Record *score* for *product* unless a higher one is already stored."""
        key = id(product)
        if key not in results or results[key][0] < score:
            results[key] = (score, product)

    def search(self, query: str, threshold: float = 0.6) -> List[Tuple[float, Tuple]]:
        """Search for products matching *query*.

        Scoring stages (the best score per product wins):

        1. exact normalized-name match            -> 1.0
        2. first number in the query equals the
           product's indexed model number         -> 0.9
        3. share of significant (3+ char) query
           words found in the name, boosted when
           all match / the whole query is a
           substring                              -> up to 0.98
        4. fuzzy ratio against every name, kept
           only when >= *threshold*
        5. if nothing matched at all, products
           whose first word equals the query's
           first word                             -> 0.5

        Args:
            query: Free-text user query.
            threshold: Minimum score for the fuzzy stage (stage 4) only;
                word-overlap scores from stage 3 are reported even when lower.

        Returns:
            ``(score, product)`` tuples sorted by descending score.  A query
            that normalizes to the empty string returns ``[]``.
        """
        query_normalized = self._normalize_text(query)
        if not query_normalized:
            # An empty string is a substring of every name; without this
            # guard every product would be reported as a 0.8 match.
            return []

        query_words = query_normalized.split()
        significant_words = list(dict.fromkeys(
            word for word in query_words
            if len(word) >= self._MIN_WORD_LENGTH
        ))
        total_significant = len(significant_words)

        results: Dict[int, Tuple[float, Tuple]] = {}

        # Stage 1: exact name.
        if query_normalized in self.product_index['by_name']:
            product = self.product_index['by_name'][query_normalized]
            self._keep_best(results, 1.0, product)

        # Stage 2: model number.  Indexed products always contain their own
        # model number, so a hit is a flat 0.9.
        model_match = self._MODEL_PATTERN.search(query)
        if model_match:
            for product in self.product_index['by_model'].get(
                    model_match.group(1), ()):
                self._keep_best(results, 0.9, product)

        # Stage 3: word overlap.  Count each distinct query word at most once
        # per product so the ratio can never exceed 1.
        match_counts: Dict[int, int] = {}
        matched_products: Dict[int, Tuple] = {}
        for word in significant_words:
            seen_for_word = set()
            for product in self.product_index['by_words'].get(word, ()):
                key = id(product)
                if key in seen_for_word:
                    continue
                seen_for_word.add(key)
                matched_products[key] = product
                match_counts[key] = match_counts.get(key, 0) + 1

        for key, matched_count in match_counts.items():
            product = matched_products[key]
            word_score = matched_count / total_significant

            if matched_count == total_significant:
                word_score = min(word_score * 1.2, 0.95)

            if query_normalized in self._normalize_text(product[2]):
                word_score = min(word_score * 1.3, 0.98)

            self._keep_best(results, word_score, product)

        # Stage 4: fuzzy comparison against every product name.
        for product in self.products:
            product_normalized = self._normalize_text(product[2])
            similarity = self._calculate_similarity(
                query_normalized, product_normalized
            )

            if query_normalized in product_normalized:
                similarity = max(similarity, 0.8)

            # Require at least one significant word: all() over an empty
            # sequence is True and would match every product.
            if significant_words and all(
                    word in product_normalized for word in significant_words):
                similarity = max(similarity, 0.75)

            if similarity >= threshold:
                self._keep_best(results, similarity, product)

        # Stage 5: category fallback, only when nothing else matched.
        if not results:
            category = query_words[0]
            for product in self.product_index['by_category'].get(category, ()):
                results[id(product)] = (0.5, product)

        result_list = list(results.values())
        result_list.sort(key=lambda item: item[0], reverse=True)
        return result_list

    def find_best_match(self, query: str) -> Optional[Tuple]:
        """Return the top-scoring product for *query*, or ``None``.

        The top result is only accepted when its score is at least 0.6.
        """
        results = self.search(query)
        if results and results[0][0] >= 0.6:
            return results[0][1]
        return None

    def find_similar_products(self, product_name: str, limit: int = 5) -> List[Tuple]:
        """Return up to *limit* products similar to *product_name*.

        When the top search hit scores above 0.95 it is treated as the
        product itself and skipped.  Only hits scoring at least 0.5 are kept.
        """
        results = self.search(product_name)

        start_idx = 1 if results and results[0][0] > 0.95 else 0
        window = results[start_idx:start_idx + limit]
        return [product for score, product in window if score >= 0.5]

    def extract_product_context(self, query: str) -> Dict:
        """Extract sizes, colours, types, features and a price from *query*.

        The query is normalized first (lowercased, Turkish letters folded to
        ASCII) because the keyword lists are stored in folded form; matching
        against the raw query would miss e.g. "kırmızı" or "kadın".

        Keywords are matched as substrings, so suffixed forms of a keyword
        also match.

        Returns:
            Dict with list values under ``sizes``, ``colors``, ``types`` and
            ``features``, and ``price_range`` as a float or ``None``.  A
            number followed by "bin" is multiplied by 1000.
        """
        context = {
            'sizes': [],
            'colors': [],
            'types': [],
            'features': [],
            'price_range': None,
        }

        text = self._normalize_text(query)

        for pattern in self._SIZE_PATTERNS:
            context['sizes'].extend(re.findall(pattern, text))

        context['colors'].extend(c for c in self._COLORS if c in text)
        context['types'].extend(t for t in self._TYPES if t in text)
        context['features'].extend(f for f in self._FEATURES if f in text)

        price_match = self._PRICE_PATTERN.search(text)
        if price_match:
            whole, fraction, unit = price_match.groups()
            # NOTE(review): "15.000 tl" parses as 15.0 - the dot is read as a
            # decimal point, not a thousands separator. Confirm the intent.
            price = float(whole + ('.' + fraction if fraction else ''))
            if 'bin' in unit:
                price *= 1000
            context['price_range'] = price

        return context

    def generate_suggestions(self, failed_query: str) -> List[str]:
        """Suggest up to five product names for a query that found nothing.

        Suggestions come from indexed words that contain, or are contained
        in, a query word longer than three characters, followed by
        categories that contain any query word.  Order is deterministic:
        query-word order first, then index (product) order.
        """
        suggestions: List[str] = []
        query_words = self._normalize_text(failed_query).split()
        by_words = self.product_index['by_words']
        by_category = self.product_index['by_category']

        # A dict keeps first-seen order; slicing a set would pick an
        # arbitrary, run-dependent subset.
        partial_matches: Dict[str, None] = {}
        for word in query_words:
            if len(word) > 3:
                for product_word in by_words:
                    if word in product_word or product_word in word:
                        partial_matches[product_word] = None

        for match in list(partial_matches)[:5]:
            products = by_words.get(match)
            if products:
                suggestions.append(products[0][2])

        # NOTE(review): only the first three categories are ever inspected;
        # confirm whether the limit was meant to apply to matches instead.
        for category in list(by_category)[:3]:
            if any(word in category for word in query_words):
                category_products = by_category[category]
                if category_products:
                    suggestions.append(category_products[0][2])

        # De-duplicate while preserving order.
        return list(dict.fromkeys(suggestions))[:5]