File size: 5,819 Bytes
28ec96b
e67b064
a2682b3
28ec96b
c8d57fb
e67b064
e113735
 
c8d57fb
 
a2682b3
 
 
 
 
 
 
e113735
e21244d
 
a2682b3
 
 
 
 
 
 
c8d57fb
e113735
e67b064
 
c8d57fb
e113735
 
64f44e3
 
e113735
 
64f44e3
e21244d
e113735
64f44e3
e67b064
 
 
 
e113735
c8d57fb
 
 
64f44e3
e67b064
 
e113735
 
64f44e3
e67b064
e21244d
 
e67b064
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
64f44e3
e67b064
 
 
 
 
 
 
 
64f44e3
e67b064
 
 
 
 
e113735
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e67b064
64f44e3
c8d57fb
64f44e3
 
e113735
 
 
 
 
 
a2682b3
e67b064
c8d57fb
e67b064
 
 
 
 
e113735
e67b064
 
 
 
 
 
c8d57fb
 
64f44e3
e67b064
c8d57fb
 
 
e67b064
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import datetime
from typing import List, Dict, Any, Optional, Tuple
import numpy as np
from models.LexRank import degree_centrality_scores
import logging
from datetime import datetime as dt
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Module-level logger named after this module; everything in this file logs through it.
logger = logging.getLogger(__name__)

class QueryProcessor:
    """Answer a free-text query with matching articles and a summary of them.

    The pipeline in :meth:`process` embeds the query, extracts entities from
    it, runs a semantic search through ``db_service`` and summarizes the
    articles that come back. Blocking model calls are pushed onto worker
    threads so the event loop stays responsive.

    The collaborators are duck-typed; this class only relies on:

    * ``embedding_model.encode(x)`` returning an object with ``.tolist()``
      for a single string, and something ``np.inner`` accepts for a list of
      sentences (presumably a NumPy array - confirm against the model).
    * ``nlp_model.extract_entities(text)`` and
      ``nlp_model.tokenize_sentences(text)``.
    * ``summarization_model.summarize(text)``.
    * ``db_service.semantic_search(...)`` being awaitable and accepting the
      keyword arguments passed in :meth:`_execute_semantic_search`.
    """

    # Upper bound on how many top-ranked sentences are handed to the summarizer.
    MAX_KEY_SENTENCES = 10

    def __init__(self, embedding_model, summarization_model, nlp_model, db_service):
        """Store the collaborators and create the worker thread pool."""
        self.embedding_model = embedding_model
        self.summarization_model = summarization_model
        self.nlp_model = nlp_model
        self.db_service = db_service
        # Pool used by _async_encode / _async_generate_summary to keep
        # blocking model calls off the event loop thread.
        self.executor = ThreadPoolExecutor(max_workers=4)
        logger.info("QueryProcessor initialized")

    def close(self) -> None:
        """Shut down the worker thread pool created in ``__init__``.

        Waits for work that is already running to finish.
        """
        self.executor.shutdown(wait=True)

    async def process(
        self,
        query: str,
        topic: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None
    ) -> Dict[str, Any]:
        """Run the full query pipeline and return a JSON-friendly result.

        Args:
            query: Free-text user query.
            topic: Optional topic filter forwarded to the search.
            start_date: Optional lower bound, formatted ``YYYY-MM-DD``.
            end_date: Optional upper bound, formatted ``YYYY-MM-DD``.

        Returns:
            * On success: a dict with ``summary``, ``key_sentences``,
              ``articles`` and ``entities``.
            * When the search returns nothing:
              ``{"message": "No articles found", "articles": []}``.
            * On any exception (including a malformed date):
              ``{"error": <message>}``. This method does not raise.
        """
        try:
            # Empty strings are treated like "no bound", same as None.
            start_dt = self._parse_date(start_date) if start_date else None
            end_dt = self._parse_date(end_date) if end_date else None

            query_embedding = await self._async_encode(query)
            logger.debug("Generated embedding for query: %s...", query[:50])

            # Entity extraction is synchronous, so run it in a worker thread.
            entities = await asyncio.to_thread(self.nlp_model.extract_entities, query)
            logger.debug("Extracted entities: %s", entities)

            # Only the first element of each entity is sent to the search.
            # NOTE(review): assumes each entity is an indexable pair whose
            # first item is the entity text - confirm against nlp_model.
            entity_names = [ent[0] for ent in entities]

            articles = await self._execute_semantic_search(
                query_embedding,
                start_dt,
                end_dt,
                topic,
                entity_names,
            )

            if not articles:
                logger.info("No articles found matching criteria")
                return {"message": "No articles found", "articles": []}

            summary_data = await self._async_generate_summary(articles)

            return {
                "summary": summary_data["summary"],
                "key_sentences": summary_data["key_sentences"],
                "articles": articles,
                "entities": entities
            }

        except Exception as e:
            # Top-level boundary: log with traceback, report the message to the caller.
            logger.error("Processing failed: %s", e, exc_info=True)
            return {"error": str(e)}

    def _parse_date(self, date_str: str) -> dt:
        """Parse a ``YYYY-MM-DD`` string into a ``datetime``.

        Raises:
            ValueError: If ``date_str`` does not match ``%Y-%m-%d``. The
                original parsing error is attached as ``__cause__``.
        """
        try:
            return dt.strptime(date_str, "%Y-%m-%d")
        except ValueError as e:
            logger.error("Invalid date format: %s", date_str)
            raise ValueError(
                f"Invalid date format. Expected YYYY-MM-DD, got {date_str}"
            ) from e

    async def _execute_semantic_search(
        self,
        query_embedding: List[float],
        start_date: Optional[dt],
        end_date: Optional[dt],
        topic: Optional[str],
        entities: List[str]
    ) -> List[Dict[str, Any]]:
        """Await ``db_service.semantic_search`` with the given filters.

        Failures are logged and then re-raised unchanged so the caller decides
        how to report them.
        """
        try:
            return await self.db_service.semantic_search(
                query_embedding=query_embedding,
                start_date=start_date,
                end_date=end_date,
                topic=topic,
                entities=entities
            )
        except Exception as e:
            logger.error("Semantic search failed: %s", e)
            raise

    def _sync_encode(self, text: str) -> List[float]:
        """Embed ``text`` and convert the result to a plain list."""
        return self.embedding_model.encode(text).tolist()

    async def _async_encode(self, text: str) -> List[float]:
        """Embed ``text`` on the worker pool and return it as a list."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self.executor, self._sync_encode, text)

    async def _async_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Run :meth:`_sync_generate_summary` on the worker pool."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, self._sync_generate_summary, articles
        )

    def _collect_sentences(self, articles: List[Dict[str, Any]]) -> List[str]:
        """Tokenize the ``content`` of every article into one flat sentence list.

        Articles without a ``content`` key, or with falsy content, are skipped.
        """
        sentences: List[str] = []
        for article in articles:
            if content := article.get("content"):
                # Called directly: this code already runs in a worker thread,
                # which has no event loop of its own to schedule work on.
                sentences.extend(self.nlp_model.tokenize_sentences(content))
        return sentences

    def _sync_generate_summary(self, articles: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build a summary from the articles' sentences.

        Intended to run in a worker thread (see :meth:`_async_generate_summary`),
        so it is fully synchronous and never touches the event loop.

        Returns:
            A dict with ``summary`` (str) and ``key_sentences`` (list of str).
            If no sentences are available, or if any step raises, a
            placeholder summary with an empty ``key_sentences`` list is
            returned instead of propagating the error.
        """
        try:
            sentences = self._collect_sentences(articles)

            if not sentences:
                logger.warning("No sentences available for summarization")
                return {
                    "summary": "No content available for summarization",
                    "key_sentences": []
                }

            embeddings = self.embedding_model.encode(sentences)
            # Pairwise inner products between sentence embeddings.
            # NOTE(review): assumes encode() returns one row per sentence - confirm.
            similarity_matrix = np.inner(embeddings, embeddings)
            centrality_scores = degree_centrality_scores(similarity_matrix, threshold=None)

            # Highest score first, capped at MAX_KEY_SENTENCES.
            top_indices = np.argsort(-centrality_scores)[:self.MAX_KEY_SENTENCES]
            key_sentences = [sentences[idx].strip() for idx in top_indices]

            return {
                "summary": self.summarization_model.summarize(' '.join(key_sentences)),
                "key_sentences": key_sentences
            }

        except Exception as e:
            # Best-effort: a failed summary must not fail the whole request,
            # but keep the traceback so the cause is diagnosable.
            logger.error("Summary generation failed: %s", e, exc_info=True)
            return {
                "summary": "Summary generation failed",
                "key_sentences": []
            }