BF-WAB / media_queue_v2.py
SamiKoen's picture
Add Media Queue V2 with global cache - preserves media URLs
acecc24
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
WhatsApp Media Queue V2 - Global Cache ile
Medya URL'lerini kaybetmeden, mesajları akıllıca birleştirir
"""
import time
import threading
from typing import Dict, Optional, List, Tuple
import logging
logger = logging.getLogger(__name__)
class MediaQueueV2:
"""
Media Queue V2 - Basit ve güvenilir
"""
def __init__(self, wait_time: float = 3.0):
"""
Args:
wait_time: Medya sonrası bekleme süresi (saniye)
"""
self.wait_time = wait_time
# Global media cache - {user_id: {"media_urls": [...], "media_types": [...], "timestamp": ..., "caption": ...}}
self.media_cache: Dict[str, Dict] = {}
# Timer'lar - {user_id: threading.Timer}
self.timers: Dict[str, threading.Timer] = {}
# Lock for thread safety
self.lock = threading.Lock()
def handle_media(self, user_id: str, media_urls: List[str], media_types: List[str], caption: str = "") -> str:
"""
Medya mesajı geldiğinde çağrılır
Args:
user_id: WhatsApp kullanıcı numarası
media_urls: Medya URL listesi
media_types: Medya tipi listesi
caption: Medya başlığı (varsa)
Returns:
Kullanıcıya gönderilecek bekleme mesajı
"""
with self.lock:
# Önceki timer'ı iptal et (varsa)
self._cancel_timer(user_id)
# Media'yı cache'e kaydet
self.media_cache[user_id] = {
"media_urls": media_urls,
"media_types": media_types,
"caption": caption,
"timestamp": time.time()
}
logger.info(f"📸 {user_id}: Medya cache'e kaydedildi - {len(media_urls)} dosya")
# Yeni timer başlat
self._start_timer(user_id)
# Medya tipine göre bekleme mesajı
if media_types and "image" in media_types[0].lower():
return "🖼️ Görsel alındı. Hakkında sormak istediğiniz bir şey var mı?"
elif media_types and "video" in media_types[0].lower():
return "🎥 Video alındı. Hakkında sormak istediğiniz bir şey var mı?"
elif media_types and "audio" in media_types[0].lower():
return "🎵 Ses dosyası alındı. Hakkında sormak istediğiniz bir şey var mı?"
else:
return "📎 Dosya alındı. Hakkında sormak istediğiniz bir şey var mı?"
def handle_text(self, user_id: str, text: str) -> Tuple[Optional[str], Optional[List[str]], Optional[List[str]]]:
"""
Metin mesajı geldiğinde çağrılır
Args:
user_id: WhatsApp kullanıcı numarası
text: Metin mesajı
Returns:
(combined_text, media_urls, media_types) veya (None, None, None)
"""
with self.lock:
# Cache'de medya var mı?
if user_id in self.media_cache:
# Timer'ı iptal et
self._cancel_timer(user_id)
# Cache'den medya bilgilerini al
cached = self.media_cache[user_id]
media_urls = cached["media_urls"]
media_types = cached["media_types"]
caption = cached["caption"]
# Cache'i temizle
del self.media_cache[user_id]
# Mesajları birleştir
combined_text = self._combine_messages(caption, text)
logger.info(f"✅ {user_id}: Medya + metin birleştirildi")
logger.info(f" Birleşik mesaj: {combined_text[:100]}...")
logger.info(f" Medya URL'leri: {media_urls}")
return combined_text, media_urls, media_types
# Cache'de medya yoksa normal metin olarak dön
logger.info(f"💬 {user_id}: Normal metin mesajı (cache'de medya yok)")
return None, None, None
def _combine_messages(self, caption: str, text: str) -> str:
"""
Caption ve metin mesajını birleştir
Args:
caption: Medya başlığı
text: Kullanıcının sonraki mesajı
Returns:
Birleştirilmiş mesaj
"""
parts = []
if caption and caption.strip():
parts.append(caption.strip())
if text and text.strip():
parts.append(text.strip())
# Eğer hiç metin yoksa varsayılan
if not parts:
return "Bu görseli analiz et."
# Birleştir
combined = " ".join(parts)
# Eğer sadece caption varsa ve soru işareti yoksa, analiz isteği ekle
if len(parts) == 1 and "?" not in combined and len(combined) < 20:
combined += ". Bu hakkında bilgi ver."
return combined
def _start_timer(self, user_id: str):
"""Timeout timer'ını başlat"""
timer = threading.Timer(self.wait_time, self._handle_timeout, args=[user_id])
timer.start()
self.timers[user_id] = timer
logger.debug(f"⏰ {user_id}: {self.wait_time}s timer başlatıldı")
def _cancel_timer(self, user_id: str):
"""Timer'ı iptal et"""
if user_id in self.timers:
self.timers[user_id].cancel()
del self.timers[user_id]
logger.debug(f"⏰ {user_id}: Timer iptal edildi")
def _handle_timeout(self, user_id: str):
"""
Timeout olduğunda çağrılır
Sadece medyayı işlemek için kullanılabilir
"""
with self.lock:
if user_id in self.media_cache:
logger.info(f"⏱️ {user_id}: Timeout - metin beklenmedi, sadece medya işlenecek")
# Burada cache'i temizlemiyoruz, bir sonraki mesajda kullanılabilir
# Ama 5 dakikadan eski cache'leri temizleyebiliriz
self._cleanup_old_cache()
def _cleanup_old_cache(self):
"""5 dakikadan eski cache'leri temizle"""
current_time = time.time()
expired_users = []
for user_id, cached in self.media_cache.items():
if current_time - cached["timestamp"] > 300: # 5 dakika
expired_users.append(user_id)
for user_id in expired_users:
del self.media_cache[user_id]
logger.debug(f"🗑️ {user_id}: Eski cache temizlendi")
def get_cache_status(self, user_id: str) -> bool:
"""
Kullanıcının cache'inde medya var mı?
Args:
user_id: WhatsApp kullanıcı numarası
Returns:
True: Cache'de medya var
False: Cache boş
"""
with self.lock:
return user_id in self.media_cache
def clear_user_cache(self, user_id: str):
"""
Belirli bir kullanıcının cache'ini temizle
Args:
user_id: WhatsApp kullanıcı numarası
"""
with self.lock:
if user_id in self.media_cache:
del self.media_cache[user_id]
logger.info(f"🗑️ {user_id}: Cache manuel olarak temizlendi")
self._cancel_timer(user_id)
# Global instance
media_queue = MediaQueueV2(wait_time=3.0)
# Test fonksiyonu
def test_media_queue():
"""Test senaryoları"""
print("\n" + "="*60)
print("Media Queue V2 Test")
print("="*60)
# Test 1: Medya + Metin
print("\n📱 Test 1: Görsel + Soru")
print("-"*40)
# Görsel gönder
response = media_queue.handle_media(
"user123",
["https://example.com/image.jpg"],
["image/jpeg"],
"Bugünkü bisiklet"
)
print(f"Bot: {response}")
# 1 saniye bekle
time.sleep(1)
# Soru sor
combined, urls, types = media_queue.handle_text("user123", "Fiyatı ne kadar?")
if combined:
print(f"\n✅ Birleştirildi:")
print(f" Mesaj: {combined}")
print(f" URLs: {urls}")
print(f" Types: {types}")
# Test 2: Sadece metin
print("\n📱 Test 2: Sadece Metin")
print("-"*40)
combined, urls, types = media_queue.handle_text("user456", "Merhaba")
if combined:
print("Birleştirilmiş mesaj var")
else:
print("Normal metin mesajı (birleştirme yok)")
# Test 3: Medya + Timeout
print("\n📱 Test 3: Görsel + Timeout")
print("-"*40)
response = media_queue.handle_media(
"user789",
["https://example.com/image2.jpg"],
["image/jpeg"],
""
)
print(f"Bot: {response}")
print("3 saniye bekleniyor...")
time.sleep(4)
# Timeout sonrası cache durumu
if media_queue.get_cache_status("user789"):
print("✅ Medya hala cache'de (bir sonraki mesajda kullanılabilir)")
else:
print("❌ Cache temizlenmiş")
if __name__ == "__main__":
test_media_queue()