import re
import time
from concurrent.futures import ThreadPoolExecutor

from app.llm.llm_completion import call_chat_model_openai
from app.services.usage_logger import log_query_and_results


def sanitize_markdown(md_text: str) -> str:
    """Strip inline Markdown images (``![alt](url)``) from a streamed chunk."""
    return re.sub(r"!\[.*?\]\(.*?\)", "", md_text)


def build_chat_fn(retriever, intent_classifier):
    def chat(
        question,
        history,
        media_type="movie",
        genres=None,
        providers=None,
        year_range=None,
        session_id=None,
        query_id=None,
        device_info=None,
    ):
        full_t0 = time.time()

        with ThreadPoolExecutor() as executor:
            # Classify user intent to determine whether this is a recommendation ask
            t0 = time.time()
            intent_future = executor.submit(
                lambda q: intent_classifier(q)[0]["label"] == "recommendation",
                question,
            )
            print(f"\n🧠 executor.submit(classify_intent) took {time.time() - t0:.3f}s")

            # Embed the user query as a dense vector in parallel
            t0 = time.time()
            query_vector_future = executor.submit(retriever.embed_dense, question)
            print(f"🧵 executor.submit(embed_text) took {time.time() - t0:.3f}s")

            # Wait for both results
            t0 = time.time()
            is_rec_intent = intent_future.result()
            print(f"✅ classify_intent() result received in {time.time() - t0:.3f}s")

            t0 = time.time()
            dense_vector = query_vector_future.result()
            print(f"📈 embed_text() result received in {time.time() - t0:.3f}s")

        # Embed the user query as a sparse vector for hybrid retrieval
        t0 = time.time()
        sparse_vector = retriever.embed_sparse(question, media_type)
        print(f"📈 embed_sparse() result received in {time.time() - t0:.3f}s")

        if is_rec_intent:
            # Hybrid retrieval + reranking against the selected media type and filters
            t0 = time.time()
            retrieved_movies, scored_lookup = retriever.retrieve_and_rerank(
                dense_vector,
                sparse_vector,
                media_type.lower(),
                genres,
                providers,
                year_range,
            )
            print(f"\n📚 retrieve_and_rerank() took {time.time() - t0:.3f}s")

            # Guard optional arguments so logging never raises when metadata is missing
            query_entry = {
                "query_id": query_id,
                "session_id": session_id,
                "question": question,
                "intent": "recommendation",
                "media_type": media_type,
                "genres": genres,
                "providers": providers,
                "year_start": year_range[0] if year_range else None,
                "year_end": year_range[1] if year_range else None,
                "device_type": device_info.device_type if device_info else None,
                "platform": device_info.platform if device_info else None,
                "user_agent": device_info.user_agent if device_info else None,
            }

            result_entries = []
            for rank, p in enumerate(retrieved_movies):
                s = scored_lookup[p.id]
                result_entries.append({
                    "query_id": query_id,
                    "media_type": media_type,
                    "media_id": p.payload["media_id"],
                    "title": p.payload["title"],
                    "rank": rank + 1,
                    "dense_score": s["dense_score"],
                    "sparse_score": s["sparse_score"],
                    "reranked_score": s["reranked_score"],
                    "is_final_rec": False,
                })

            try:
                log_query_and_results(query_entry, result_entries)
            except Exception as e:
                print("⚠️ Failed to log to Supabase:", e)

            # Signal the response mode to the consumer before streaming model output
            yield "[[MODE:recommendation]]\n"

            context = retriever.format_context(retrieved_movies)
            user_message = (
                f"{question}\n\nContext:\nBased on the following retrieved "
                f"{media_type.lower()}, suggest the best recommendations.\n\n{context}"
            )

            print(f"✨ Total chat() prep time before streaming: {time.time() - full_t0:.3f}s")

            for chunk in call_chat_model_openai(history, user_message):
                yield chunk
        else:
            try:
                log_query_and_results(
                    query_entry={
                        "query_id": query_id,
                        "session_id": session_id,
                        "question": question,
                        "intent": "chat",
                        "media_type": media_type,
                    },
                    result_entries=[],
                )
            except Exception as e:
                print("⚠️ Failed to log to Supabase:", e)

            user_message = (
                "The user did not ask for a recommendation. Ask them to be more "
                f"specific. Answer this as a general question: {question}"
            )

            print(f"✨ Total chat() prep time before streaming: {time.time() - full_t0:.3f}s")

            yield "[[MODE:chat]]\n"
            for chunk in call_chat_model_openai(history, user_message):
                yield sanitize_markdown(chunk)

    return chat
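

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the module's runtime code).
# It shows how the generator returned by build_chat_fn() might be consumed,
# assuming `retriever` exposes embed_dense / embed_sparse / retrieve_and_rerank
# / format_context, `intent_classifier` behaves like a text-classification
# pipeline returning [{"label": ...}], and `device_info` carries device_type,
# platform, and user_agent. All identifiers and values below are hypothetical.
#
#     chat = build_chat_fn(retriever, intent_classifier)
#     stream = chat(
#         "Any good sci-fi movies like Arrival?",
#         history=[],
#         media_type="movie",
#         genres=["Science Fiction"],
#         providers=["Netflix"],
#         year_range=(2010, 2024),
#         session_id="session-123",      # hypothetical IDs
#         query_id="query-456",
#         device_info=device_info,
#     )
#     for chunk in stream:
#         # The first chunk is the [[MODE:...]] marker; the rest is model output.
#         print(chunk, end="")
# ---------------------------------------------------------------------------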