import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime as dt
from typing import Any, Dict, List, Optional

import numpy as np

from models.LexRank import degree_centrality_scores

logger = logging.getLogger(__name__)

class QueryProcessor:
    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service
        self.executor = ThreadPoolExecutor(max_workers=4)  # For CPU-bound tasks
        logger.info("QueryProcessor initialized")
    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        try:
            # Date handling (synchronous, but fast)
            start_dt = self._parse_date(start_date) if start_date else None
            end_dt = self._parse_date(end_date) if end_date else None

            # Embed the query off the event loop
            query_embedding = await self._async_encode(query)
            logger.debug(f"Generated embedding for query: {query[:50]}...")

            # Entity extraction offloaded to a worker thread
            entities = await asyncio.to_thread(self.nlp_model.extract_entities, query)
            logger.debug(f"Extracted entities: {entities}")

            # Async database search
            articles = await self._execute_semantic_search(
                query_embedding,
                start_dt,
                end_dt,
                topic,
                [ent[0] for ent in entities]
            )
            if not articles:
                logger.info("No articles found matching criteria")
                return {"message": "No articles found", "articles": []}

            # Async summary generation
            summary_data = await self._async_generate_summary(articles)
            return {
                "summary": summary_data["summary"],
                "key_sentences": summary_data["key_sentences"],
                "articles": articles,
                "entities": entities
            }
        except Exception as e:
            logger.error(f"Processing failed: {str(e)}", exc_info=True)
            return {"error": str(e)}
    def _parse_date(self, date_str: str) -> dt:
        """Safe date parsing with validation."""
        try:
            return dt.strptime(date_str, "%Y-%m-%d")
        except ValueError as e:
            logger.error(f"Invalid date format: {date_str}")
            raise ValueError(f"Invalid date format. Expected YYYY-MM-DD, got {date_str}") from e
    async def _execute_semantic_search(
        self,
        query_embedding: List[float],
        start_date: Optional[dt],
        end_date: Optional[dt],
        topic: Optional[str],
        entities: List[str]
    ) -> List[Dict[str, Any]]:
        """Execute search with proper error handling."""
        try:
            return await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_date,
                end_date=end_date,
                topic=topic,
                entities=entities
            )
        except Exception as e:
            logger.error(f"Semantic search failed: {str(e)}")
            raise
    async def _async_encode(self, text: str) -> List[float]:
        """Run embedding in the thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self.embedding_model.encode(text).tolist()
        )
    async def _async_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Run summary generation in the thread pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: self._sync_generate_summary(articles)
        )
    def _sync_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Synchronous version for thread pool execution."""
        try:
            # Extract and tokenize article content. This method already runs in
            # a worker thread, so the tokenizer is called directly instead of
            # being scheduled back onto the event loop.
            sentences = []
            for article in articles:
                if content := article.get("content"):
                    sentences.extend(self.nlp_model.tokenize_sentences(content))

            if not sentences:
                logger.warning("No sentences available for summarization")
                return {
                    "summary": "No content available for summarization",
                    "key_sentences": []
                }

            # CPU-intensive operations: sentence embeddings, pairwise
            # similarity, and LexRank-style centrality scoring.
            embeddings = self.embedding_model.encode(sentences)
            similarity_matrix = np.inner(embeddings, embeddings)
            centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
            top_indices = np.argsort(-centrality_scores)[:10]
            key_sentences = [sentences[idx].strip() for idx in top_indices]

            return {
                "summary": self.summarization_model.summarize(' '.join(key_sentences)),
                "key_sentences": key_sentences
            }
        except Exception as e:
            logger.error(f"Summary generation failed: {str(e)}", exc_info=True)
            return {
                "summary": "Summary generation failed",
                "key_sentences": []
            }
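

# --- Usage sketch (not part of the original module) ---
# A minimal, hypothetical example of wiring QueryProcessor to stub models and
# an in-memory "database" so the async call flow can be exercised end to end.
# The _Stub* classes below are placeholders for the real embedding,
# summarization, NLP, and DB services; only their method names and rough
# signatures are assumed from how QueryProcessor calls them.
if __name__ == "__main__":

    class _StubEmbedder:
        def encode(self, text):
            # One fixed-size vector per input; a matrix for a list of sentences.
            if isinstance(text, list):
                return np.ones((len(text), 4))
            return np.ones(4)

    class _StubSummarizer:
        def summarize(self, text: str) -> str:
            return text[:100]

    class _StubNLP:
        def extract_entities(self, text: str):
            return [(token, "NOUN") for token in text.split()[:3]]

        def tokenize_sentences(self, text: str):
            return [s.strip() for s in text.split(".") if s.strip()]

    class _StubDB:
        async def semantic_search(self, **kwargs):
            return [{"content": "First sentence. Second sentence. Third sentence."}]

    async def _demo():
        processor = QueryProcessor(_StubEmbedder(), _StubSummarizer(), _StubNLP(), _StubDB())
        result = await processor.process("example query", start_date="2024-01-01")
        print(result.get("summary"))
        print(result.get("key_sentences"))

    asyncio.run(_demo())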