christopher commited on
Commit
e113735
·
1 Parent(s): 95f7578

query threads

Browse files
Files changed (1) hide show
  1. database/query_processor.py +37 -14
database/query_processor.py CHANGED
@@ -4,6 +4,8 @@ import numpy as np
4
  from models.LexRank import degree_centrality_scores
5
  import logging
6
  from datetime import datetime as dt
 
 
7
 
8
  logger = logging.getLogger(__name__)
9
 
@@ -13,6 +15,7 @@ class QueryProcessor:
13
  self.summarization_model = summarization_model
14
  self.nlp_model = nlp_model
15
  self.db_service = db_service
 
16
  logger.info("QueryProcessor initialized")
17
 
18
  async def process(
@@ -23,33 +26,33 @@ class QueryProcessor:
23
  end_date: Optional[str] = None
24
  ) -> Dict[str, Any]:
25
  try:
26
- # Date handling
27
  start_dt = self._parse_date(start_date) if start_date else None
28
  end_dt = self._parse_date(end_date) if end_date else None
29
 
30
- # Query processing
31
- query_embedding = self.embedding_model.encode(query).tolist()
32
  logger.debug(f"Generated embedding for query: {query[:50]}...")
33
 
34
- # Entity extraction
35
- entities = self.nlp_model.extract_entities(query)
36
  logger.debug(f"Extracted entities: {entities}")
37
 
38
- # Database search
39
  articles = await self._execute_semantic_search(
40
  query_embedding,
41
  start_dt,
42
  end_dt,
43
  topic,
44
- [ent[0] for ent in entities] # Just the entity texts
45
  )
46
 
47
  if not articles:
48
  logger.info("No articles found matching criteria")
49
  return {"message": "No articles found", "articles": []}
50
 
51
- # Summary generation
52
- summary_data = self._generate_summary(articles)
53
 
54
  return {
55
  "summary": summary_data["summary"],
@@ -91,14 +94,35 @@ class QueryProcessor:
91
  logger.error(f"Semantic search failed: {str(e)}")
92
  raise
93
 
94
- def _generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
95
- """Generate summary from articles with fallback handling"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  try:
97
  # Extract and process content
98
  sentences = []
99
  for article in articles:
100
  if content := article.get("content"):
101
- sentences.extend(self.nlp_model.tokenize_sentences(content))
 
 
 
 
 
102
 
103
  if not sentences:
104
  logger.warning("No sentences available for summarization")
@@ -107,12 +131,11 @@ class QueryProcessor:
107
  "key_sentences": []
108
  }
109
 
110
- # Generate summary
111
  embeddings = self.embedding_model.encode(sentences)
112
  similarity_matrix = np.inner(embeddings, embeddings)
113
  centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
114
 
115
- # Get top 10 most central sentences
116
  top_indices = np.argsort(-centrality_scores)[:10]
117
  key_sentences = [sentences[idx].strip() for idx in top_indices]
118
 
 
4
  from models.LexRank import degree_centrality_scores
5
  import logging
6
  from datetime import datetime as dt
7
+ import asyncio
8
+ from concurrent.futures import ThreadPoolExecutor
9
 
10
  logger = logging.getLogger(__name__)
11
 
 
15
  self.summarization_model = summarization_model
16
  self.nlp_model = nlp_model
17
  self.db_service = db_service
18
+ self.executor = ThreadPoolExecutor(max_workers=4) # For CPU-bound tasks
19
  logger.info("QueryProcessor initialized")
20
 
21
  async def process(
 
26
  end_date: Optional[str] = None
27
  ) -> Dict[str, Any]:
28
  try:
29
+ # Date handling (sync but fast)
30
  start_dt = self._parse_date(start_date) if start_date else None
31
  end_dt = self._parse_date(end_date) if end_date else None
32
 
33
+ # Async query processing
34
+ query_embedding = await self._async_encode(query)
35
  logger.debug(f"Generated embedding for query: {query[:50]}...")
36
 
37
+ # Entity extraction (sync but fast)
38
+ entities = await asyncio.to_thread(self.nlp_model.extract_entities, query)
39
  logger.debug(f"Extracted entities: {entities}")
40
 
41
+ # Async database search
42
  articles = await self._execute_semantic_search(
43
  query_embedding,
44
  start_dt,
45
  end_dt,
46
  topic,
47
+ [ent[0] for ent in entities]
48
  )
49
 
50
  if not articles:
51
  logger.info("No articles found matching criteria")
52
  return {"message": "No articles found", "articles": []}
53
 
54
+ # Async summary generation
55
+ summary_data = await self._async_generate_summary(articles)
56
 
57
  return {
58
  "summary": summary_data["summary"],
 
94
  logger.error(f"Semantic search failed: {str(e)}")
95
  raise
96
 
97
+ async def _async_encode(self, text: str) -> List[float]:
98
+ """Run embedding in thread pool"""
99
+ loop = asyncio.get_running_loop()
100
+ return await loop.run_in_executor(
101
+ self.executor,
102
+ lambda: self.embedding_model.encode(text).tolist()
103
+ )
104
+
105
+ async def _async_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
106
+ """Run summary generation in thread pool"""
107
+ loop = asyncio.get_running_loop()
108
+ return await loop.run_in_executor(
109
+ self.executor,
110
+ lambda: self._sync_generate_summary(articles)
111
+ )
112
+
113
+ def _sync_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
114
+ """Synchronous version for thread pool execution"""
115
  try:
116
  # Extract and process content
117
  sentences = []
118
  for article in articles:
119
  if content := article.get("content"):
120
+ sentences.extend(
121
+ asyncio.run_coroutine_threadsafe(
122
+ asyncio.to_thread(self.nlp_model.tokenize_sentences, content),
123
+ loop=asyncio.get_event_loop()
124
+ ).result()
125
+ )
126
 
127
  if not sentences:
128
  logger.warning("No sentences available for summarization")
 
131
  "key_sentences": []
132
  }
133
 
134
+ # CPU-intensive operations
135
  embeddings = self.embedding_model.encode(sentences)
136
  similarity_matrix = np.inner(embeddings, embeddings)
137
  centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)
138
 
 
139
  top_indices = np.argsort(-centrality_scores)[:10]
140
  key_sentences = [sentences[idx].strip() for idx in top_indices]
141