# test/database/query_processor.py
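"""Asynchronous query pipeline: embeds a user query, extracts named entities,
runs a semantic search against the article store, and builds an extractive
summary of the results via LexRank-style degree centrality."""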
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime as dt
from typing import Any, Dict, List, Optional

import numpy as np

from models.LexRank import degree_centrality_scores

logger = logging.getLogger(__name__)

class QueryProcessor:
    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service
        self.executor = ThreadPoolExecutor(max_workers=4)  # For CPU-bound tasks
        logger.info("QueryProcessor initialized")
    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> Dict[str, Any]:
        try:
            # Date handling (sync but fast)
            start_dt = self._parse_date(start_date) if start_date else None
            end_dt = self._parse_date(end_date) if end_date else None

            # Embed the query off the event loop
            query_embedding = await self._async_encode(query)
            logger.debug(f"Generated embedding for query: {query[:50]}...")

            # Entity extraction, offloaded to a worker thread
            entities = await asyncio.to_thread(self.nlp_model.extract_entities, query)
            logger.debug(f"Extracted entities: {entities}")

            # Async database search; pass only the entity surface forms
            articles = await self._execute_semantic_search(
                query_embedding,
                start_dt,
                end_dt,
                topic,
                [ent[0] for ent in entities],
            )
            if not articles:
                logger.info("No articles found matching criteria")
                return {"message": "No articles found", "articles": []}

            # Async summary generation
            summary_data = await self._async_generate_summary(articles)
            return {
                "summary": summary_data["summary"],
                "key_sentences": summary_data["key_sentences"],
                "articles": articles,
                "entities": entities,
            }
        except Exception as e:
            logger.error(f"Processing failed: {e}", exc_info=True)
            return {"error": str(e)}

    def _parse_date(self, date_str: str) -> dt:
        """Safe date parsing with validation."""
        try:
            return dt.strptime(date_str, "%Y-%m-%d")
        except ValueError as e:
            logger.error(f"Invalid date format: {date_str}")
            raise ValueError(f"Invalid date format. Expected YYYY-MM-DD, got {date_str}") from e

    async def _execute_semantic_search(
        self,
        query_embedding: List[float],
        start_date: Optional[dt],
        end_date: Optional[dt],
        topic: Optional[str],
        entities: List[str],
    ) -> List[Dict[str, Any]]:
        """Execute search with proper error handling."""
        try:
            return await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_date,
                end_date=end_date,
                topic=topic,
                entities=entities,
            )
        except Exception as e:
            logger.error(f"Semantic search failed: {e}")
            raise

    async def _async_encode(self, text: str) -> List[float]:
        """Run embedding in the thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self.embedding_model.encode(text).tolist(),
        )

    async def _async_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Run summary generation in the thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self._sync_generate_summary(articles),
        )

    def _sync_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Synchronous implementation, executed inside the thread pool."""
        try:
            # Already on a worker thread with no running event loop, so the
            # tokenizer is called directly rather than through asyncio.
            sentences = []
            for article in articles:
                if content := article.get("content"):
                    sentences.extend(self.nlp_model.tokenize_sentences(content))

            if not sentences:
                logger.warning("No sentences available for summarization")
                return {
                    "summary": "No content available for summarization",
                    "key_sentences": [],
                }

            # CPU-intensive part: embed every sentence, score each one by its
            # degree centrality in the similarity graph, keep the top ten.
            embeddings = self.embedding_model.encode(sentences)
            similarity_matrix = np.inner(embeddings, embeddings)
            centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
            top_indices = np.argsort(-centrality_scores)[:10]
            key_sentences = [sentences[idx].strip() for idx in top_indices]

            return {
                "summary": self.summarization_model.summarize(" ".join(key_sentences)),
                "key_sentences": key_sentences,
            }
        except Exception as e:
            logger.error(f"Summary generation failed: {e}")
            return {
                "summary": "Summary generation failed",
                "key_sentences": [],
            }
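

# Minimal usage sketch (not part of the original module). The stub classes
# below are hypothetical stand-ins: any embedding model exposing .encode(),
# summarizer exposing .summarize(), NLP model exposing .extract_entities()
# and .tokenize_sentences(), and async DB service exposing .semantic_search()
# with the keyword arguments used above would fit the same slots.
if __name__ == "__main__":

    class _StubEmbedder:
        def encode(self, text):
            # 1-D vector for a single string, 2-D matrix for a list of strings.
            if isinstance(text, str):
                return np.ones(8)
            return np.ones((len(text), 8))

    class _StubSummarizer:
        def summarize(self, text):
            return text[:120]  # crude truncation stands in for a real model

    class _StubNLP:
        def extract_entities(self, text):
            # (surface form, label) tuples, matching what process() expects.
            return [(tok, "MISC") for tok in text.split() if tok.istitle()]

        def tokenize_sentences(self, text):
            return [s for s in text.split(". ") if s]

    class _StubDB:
        async def semantic_search(self, **kwargs):
            # Ignores the filters and returns one canned article.
            return [{"content": "Rates rose. Markets fell. Bonds rallied."}]

    async def _demo():
        qp = QueryProcessor(_StubEmbedder(), _StubSummarizer(), _StubNLP(), _StubDB())
        result = await qp.process("Federal Reserve decision", start_date="2024-01-01")
        print(result.get("summary"))
        print(result.get("key_sentences"))

    asyncio.run(_demo())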