File size: 15,307 Bytes
a963d65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9a9f096
 
a963d65
 
9a9f096
 
 
 
 
a963d65
 
 
db4216f
 
 
a963d65
db4216f
a963d65
 
 
 
 
db4216f
 
 
 
 
 
 
 
 
a963d65
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
#!/usr/bin/env python3
"""
FhirFlame PostgreSQL Database Manager
Handles persistent storage for job tracking, processing history, and system metrics
Uses the existing PostgreSQL database from the Langfuse infrastructure
"""

import psycopg2
import psycopg2.extras
import json
import time
import os
from datetime import datetime
from typing import Dict, List, Any, Optional

class DatabaseManager:
    """
    PostgreSQL database manager for FhirFlame job tracking and processing history
    Connects to the existing langfuse-db PostgreSQL instance
    """
    
    import os

    def __init__(self):
        self.db_config = {
            'host': os.getenv('DB_HOST', 'langfuse-db'),
            'port': int(os.getenv('DB_PORT', 5432)),
            'database': os.getenv('DB_NAME', 'langfuse'),
            'user': os.getenv('DB_USER', 'langfuse'),
            'password': os.getenv('DB_PASSWORD', 'langfuse')
        }
        self.init_database()
    
    import sqlite3
    import os

    def get_connection(self):
        """Get PostgreSQL connection with proper configuration, fallback to SQLite"""
        try:
            conn = psycopg2.connect(**self.db_config)
            return conn
        except Exception as e:
            print(f"❌ Database connection failed: {e}")
            # Fallback to SQLite
            try:
                sqlite_path = os.getenv('SQLITE_DB_PATH', 'fhirflame_fallback.db')
                conn = sqlite3.connect(sqlite_path)
                print(f"βœ… Connected to SQLite fallback database at {sqlite_path}")
                return conn
            except Exception as e2:
                print(f"❌ SQLite fallback connection failed: {e2}")
                raise e
    def init_database(self):
        """Initialize database schema with proper tables and indexes"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            
            # Create fhirflame schema if not exists
            cursor.execute('CREATE SCHEMA IF NOT EXISTS fhirflame')
            
            # Create jobs table with comprehensive tracking
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS fhirflame.jobs (
                    id VARCHAR(255) PRIMARY KEY,
                    job_type VARCHAR(50) NOT NULL,
                    name TEXT NOT NULL,
                    text_input TEXT,
                    status VARCHAR(20) NOT NULL DEFAULT 'pending',
                    provider_used VARCHAR(50),
                    success BOOLEAN,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    completed_at TIMESTAMP,
                    processing_time VARCHAR(50),
                    entities_found INTEGER,
                    error_message TEXT,
                    result_data JSONB,
                    file_path TEXT,
                    batch_id VARCHAR(255),
                    workflow_type VARCHAR(50)
                )
            ''')
            
            # Create batch jobs table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS fhirflame.batch_jobs (
                    id VARCHAR(255) PRIMARY KEY,
                    workflow_type VARCHAR(50) NOT NULL,
                    status VARCHAR(20) NOT NULL DEFAULT 'pending',
                    batch_size INTEGER DEFAULT 0,
                    processed_count INTEGER DEFAULT 0,
                    success_count INTEGER DEFAULT 0,
                    failed_count INTEGER DEFAULT 0,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    completed_at TIMESTAMP
                )
            ''')
            
            # Create indexes for performance
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_fhirflame_jobs_status ON fhirflame.jobs(status)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_fhirflame_jobs_created_at ON fhirflame.jobs(created_at)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_fhirflame_jobs_job_type ON fhirflame.jobs(job_type)')
            cursor.execute('CREATE INDEX IF NOT EXISTS idx_fhirflame_batch_jobs_status ON fhirflame.batch_jobs(status)')
            
            # Create trigger for updated_at auto-update
            cursor.execute('''
                CREATE OR REPLACE FUNCTION fhirflame.update_updated_at_column()
                RETURNS TRIGGER AS $$
                BEGIN
                    NEW.updated_at = CURRENT_TIMESTAMP;
                    RETURN NEW;
                END;
                $$ language 'plpgsql'
            ''')
            
            cursor.execute('''
                DROP TRIGGER IF EXISTS update_fhirflame_jobs_updated_at ON fhirflame.jobs
            ''')
            
            cursor.execute('''
                CREATE TRIGGER update_fhirflame_jobs_updated_at 
                BEFORE UPDATE ON fhirflame.jobs 
                FOR EACH ROW 
                EXECUTE FUNCTION fhirflame.update_updated_at_column()
            ''')
            
            conn.commit()
            cursor.close()
            conn.close()
            print(f"βœ… PostgreSQL database initialized with fhirflame schema")
            
        except Exception as e:
            print(f"❌ Database initialization failed: {e}")
            # Don't raise - allow app to continue with in-memory fallback
    
    def add_job(self, job_data: Dict[str, Any]) -> bool:
        """Add new job to PostgreSQL database"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            
            # Ensure required fields
            job_id = job_data.get('id', f"job_{int(time.time())}")
            job_type = job_data.get('job_type', 'text')
            name = job_data.get('name', 'Unknown Job')
            status = job_data.get('status', 'pending')
            
            cursor.execute('''
                INSERT INTO fhirflame.jobs (
                    id, job_type, name, text_input, status, provider_used,
                    success, processing_time, entities_found, error_message,
                    result_data, file_path, batch_id, workflow_type
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    status = EXCLUDED.status,
                    updated_at = CURRENT_TIMESTAMP
            ''', (
                job_id,
                job_type,
                name,
                job_data.get('text_input'),
                status,
                job_data.get('provider_used'),
                job_data.get('success'),
                job_data.get('processing_time'),
                job_data.get('entities_found'),
                job_data.get('error_message'),
                json.dumps(job_data.get('result_data')) if job_data.get('result_data') else None,
                job_data.get('file_path'),
                job_data.get('batch_id'),
                job_data.get('workflow_type')
            ))
            
            conn.commit()
            cursor.close()
            conn.close()
            print(f"βœ… Job added to PostgreSQL database: {job_id}")
            return True
            
        except Exception as e:
            print(f"❌ Failed to add job to PostgreSQL database: {e}")
            return False
    
    def update_job(self, job_id: str, updates: Dict[str, Any]) -> bool:
        """Update existing job in PostgreSQL database"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            
            # Build update query dynamically
            update_fields = []
            values = []
            
            for field, value in updates.items():
                if field in ['status', 'provider_used', 'success', 'processing_time', 
                           'entities_found', 'error_message', 'result_data', 'completed_at']:
                    update_fields.append(f"{field} = %s")
                    if field == 'result_data' and value is not None:
                        values.append(json.dumps(value))
                    else:
                        values.append(value)
            
            if update_fields:
                values.append(job_id)
                
                query = f"UPDATE fhirflame.jobs SET {', '.join(update_fields)} WHERE id = %s"
                cursor.execute(query, values)
                
                conn.commit()
                cursor.close()
                conn.close()
                print(f"βœ… Job updated in PostgreSQL database: {job_id}")
                return True
            
        except Exception as e:
            print(f"❌ Failed to update job in PostgreSQL database: {e}")
            return False
    
    def get_job(self, job_id: str) -> Optional[Dict[str, Any]]:
        """Get specific job from PostgreSQL database"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            
            cursor.execute("SELECT * FROM fhirflame.jobs WHERE id = %s", (job_id,))
            row = cursor.fetchone()
            cursor.close()
            conn.close()
            
            if row:
                job_data = dict(row)
                if job_data.get('result_data'):
                    try:
                        job_data['result_data'] = json.loads(job_data['result_data'])
                    except:
                        pass
                return job_data
            return None
            
        except Exception as e:
            print(f"❌ Failed to get job from PostgreSQL database: {e}")
            return None
    
    def get_jobs_history(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent jobs for UI display"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
            
            cursor.execute('''
                SELECT * FROM fhirflame.jobs 
                ORDER BY created_at DESC 
                LIMIT %s
            ''', (limit,))
            
            rows = cursor.fetchall()
            cursor.close()
            conn.close()
            
            jobs = []
            for row in rows:
                job_data = dict(row)
                if job_data.get('result_data'):
                    try:
                        job_data['result_data'] = json.loads(job_data['result_data'])
                    except:
                        pass
                jobs.append(job_data)
            
            print(f"βœ… Retrieved {len(jobs)} jobs from PostgreSQL database")
            return jobs
            
        except Exception as e:
            print(f"❌ Failed to get jobs history from PostgreSQL: {e}")
            return []
    
    def get_dashboard_metrics(self) -> Dict[str, int]:
        """Get dashboard metrics from PostgreSQL database"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            
            # Get total jobs
            cursor.execute("SELECT COUNT(*) FROM fhirflame.jobs")
            total_jobs = cursor.fetchone()[0]
            
            # Get completed jobs
            cursor.execute("SELECT COUNT(*) FROM fhirflame.jobs WHERE status = 'completed'")
            completed_jobs = cursor.fetchone()[0]
            
            # Get successful jobs
            cursor.execute("SELECT COUNT(*) FROM fhirflame.jobs WHERE success = true")
            successful_jobs = cursor.fetchone()[0]
            
            # Get failed jobs
            cursor.execute("SELECT COUNT(*) FROM fhirflame.jobs WHERE success = false")
            failed_jobs = cursor.fetchone()[0]
            
            # Get active jobs
            cursor.execute("SELECT COUNT(*) FROM fhirflame.jobs WHERE status IN ('pending', 'processing')")
            active_jobs = cursor.fetchone()[0]
            
            cursor.close()
            conn.close()
            
            metrics = {
                'total_jobs': total_jobs,
                'completed_jobs': completed_jobs,
                'successful_jobs': successful_jobs,
                'failed_jobs': failed_jobs,
                'active_jobs': active_jobs
            }
            
            print(f"βœ… Retrieved dashboard metrics from PostgreSQL: {metrics}")
            return metrics
            
        except Exception as e:
            print(f"❌ Failed to get dashboard metrics from PostgreSQL: {e}")
            return {
                'total_jobs': 0,
                'completed_jobs': 0,
                'successful_jobs': 0,
                'failed_jobs': 0,
                'active_jobs': 0
            }
    
    def add_batch_job(self, batch_data: Dict[str, Any]) -> bool:
        """Add batch job to PostgreSQL database"""
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            
            batch_id = batch_data.get('id', f"batch_{int(time.time())}")
            
            cursor.execute('''
                INSERT INTO fhirflame.batch_jobs (
                    id, workflow_type, status, batch_size, processed_count,
                    success_count, failed_count
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE SET
                    status = EXCLUDED.status,
                    processed_count = EXCLUDED.processed_count,
                    success_count = EXCLUDED.success_count,
                    failed_count = EXCLUDED.failed_count,
                    updated_at = CURRENT_TIMESTAMP
            ''', (
                batch_id,
                batch_data.get('workflow_type', 'unknown'),
                batch_data.get('status', 'pending'),
                batch_data.get('batch_size', 0),
                batch_data.get('processed_count', 0),
                batch_data.get('success_count', 0),
                batch_data.get('failed_count', 0)
            ))
            
            conn.commit()
            cursor.close()
            conn.close()
            print(f"βœ… Batch job added to PostgreSQL database: {batch_id}")
            return True
            
        except Exception as e:
            print(f"❌ Failed to add batch job to PostgreSQL database: {e}")
            return False

# Global database instance
db_manager = DatabaseManager()

def get_db_connection():
    """Backward compatibility function"""
    return db_manager.get_connection()
def clear_all_jobs():
    """Clear all jobs from the database - utility function for UI"""
    try:
        db_manager = DatabaseManager()
        conn = db_manager.get_connection()
        cursor = conn.cursor()
        
        # Clear both regular jobs and batch jobs
        cursor.execute("DELETE FROM fhirflame.jobs")
        cursor.execute("DELETE FROM fhirflame.batch_jobs")
        
        conn.commit()
        cursor.close()
        conn.close()
        
        print("βœ… All jobs cleared from database")
        return True
        
    except Exception as e:
        print(f"❌ Failed to clear database: {e}")
        return False