File size: 8,568 Bytes
fc5fa78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e7b58b1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fc5fa78
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
"""
RESTful API for Federated Learning Server
Handles client registration, model updates, and coordination
"""

from flask import Flask, request, jsonify
import logging
import threading
import time
from typing import Dict, Any, List
from ..server.coordinator import FederatedCoordinator
from ..utils.metrics import calculate_model_similarity

logger = logging.getLogger(__name__)

class FederatedAPI:
    def __init__(self, coordinator: FederatedCoordinator, host: str = "0.0.0.0", port: int = 8080):
        self.app = Flask(__name__)
        self.coordinator = coordinator
        self.host = host
        self.port = port
        self._setup_routes()
        
    def _setup_routes(self):
        """Setup API routes"""
        
        @self.app.route('/health', methods=['GET'])
        def health_check():
            """Health check endpoint"""
            return jsonify({
                'status': 'healthy',
                'timestamp': time.time(),
                'active_clients': len(self.coordinator.clients),
                'current_round': getattr(self.coordinator, 'current_round', 0)
            })
        
        @self.app.route('/register', methods=['POST'])
        def register_client():
            """Register a new client"""
            try:
                data = request.get_json()
                client_id = data.get('client_id')
                client_info = data.get('client_info', {})
                
                if not client_id:
                    return jsonify({'error': 'client_id is required'}), 400
                
                success = self.coordinator.register_client(client_id, client_info)
                
                if success:
                    return jsonify({
                        'status': 'registered',
                        'client_id': client_id,
                        'server_config': self.coordinator.get_client_config()
                    })
                else:
                    return jsonify({'error': 'Registration failed'}), 400
                    
            except Exception as e:
                logger.error(f"Error registering client: {str(e)}")
                return jsonify({'error': str(e)}), 500
        
        @self.app.route('/get_model', methods=['POST'])
        def get_global_model():
            """Get the current global model"""
            try:
                data = request.get_json()
                client_id = data.get('client_id')
                
                if not client_id or client_id not in self.coordinator.clients:
                    return jsonify({'error': 'Invalid client_id'}), 400
                
                model_weights = self.coordinator.get_global_model()
                
                return jsonify({
                    'model_weights': model_weights,
                    'round': getattr(self.coordinator, 'current_round', 0),
                    'timestamp': time.time()
                })
                
            except Exception as e:
                logger.error(f"Error getting global model: {str(e)}")
                return jsonify({'error': str(e)}), 500
        
        @self.app.route('/submit_update', methods=['POST'])
        def submit_model_update():
            """Submit a model update from client"""
            try:
                data = request.get_json()
                client_id = data.get('client_id')
                model_weights = data.get('model_weights')
                training_metrics = data.get('metrics', {})
                
                if not client_id or not model_weights:
                    return jsonify({'error': 'client_id and model_weights are required'}), 400
                
                if client_id not in self.coordinator.clients:
                    return jsonify({'error': 'Client not registered'}), 400
                
                # Store the update
                self.coordinator.receive_model_update(client_id, model_weights, training_metrics)
                
                return jsonify({
                    'status': 'update_received',
                    'client_id': client_id,
                    'timestamp': time.time()
                })
                
            except Exception as e:
                logger.error(f"Error submitting model update: {str(e)}")
                return jsonify({'error': str(e)}), 500
        
        @self.app.route('/training_status', methods=['GET'])
        def get_training_status():
            """Get current training status"""
            try:
                return jsonify({
                    'current_round': getattr(self.coordinator, 'current_round', 0),
                    'total_rounds': self.coordinator.config.get('federated', {}).get('num_rounds', 10),
                    'active_clients': len(self.coordinator.clients),
                    'clients_ready': len(getattr(self.coordinator, 'client_updates', {})),
                    'min_clients': self.coordinator.config.get('federated', {}).get('min_clients', 2),
                    'training_active': getattr(self.coordinator, 'training_active', False)
                })
                
            except Exception as e:
                logger.error(f"Error getting training status: {str(e)}")
                return jsonify({'error': str(e)}), 500
        
        @self.app.route('/rag/query', methods=['POST'])
        def rag_query():
            """Handle RAG queries"""
            try:
                data = request.get_json()
                query = data.get('query')
                client_id = data.get('client_id')
                
                if not query:
                    return jsonify({'error': 'query is required'}), 400
                
                # This will be implemented when we integrate RAG
                return jsonify({
                    'response': 'RAG functionality coming soon',
                    'query': query,
                    'timestamp': time.time()
                })
                
            except Exception as e:
                logger.error(f"Error processing RAG query: {str(e)}")
                return jsonify({'error': str(e)}), 500
    
        @self.app.route('/predict', methods=['POST'])
        def predict():
            """Predict using the current global model."""
            try:
                data = request.get_json()
                features = data.get('features')
                if features is None or not isinstance(features, list) or len(features) != 32:
                    return jsonify({'error': 'features must be a list of 32 floats'}), 400

                # Get global model weights
                model_weights = self.coordinator.get_global_model()
                if model_weights is None:
                    return jsonify({'error': 'Global model not available yet'}), 503

                # Build the model (same as client)
                import tensorflow as tf
                import numpy as np
                input_dim = 32
                model = tf.keras.Sequential([
                    tf.keras.layers.Input(shape=(input_dim,)),
                    tf.keras.layers.Dense(128, activation='relu'),
                    tf.keras.layers.Dense(64, activation='relu'),
                    tf.keras.layers.Dense(1)
                ])
                model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.001), loss='mse')
                model.set_weights([np.array(w) for w in model_weights])

                # Prepare input and predict
                x = np.array(features, dtype=np.float32).reshape(1, -1)
                pred = model.predict(x)
                prediction = float(pred[0, 0])
                return jsonify({'prediction': prediction})
            except Exception as e:
                logger.error(f"Error in prediction endpoint: {str(e)}")
                return jsonify({'error': str(e)}), 500
    
    def run(self, debug: bool = False):
        """Run the API server"""
        logger.info(f"Starting Federated API server on {self.host}:{self.port}")
        self.app.run(host=self.host, port=self.port, debug=debug, threaded=True)
    
    def run_threaded(self, debug: bool = False):
        """Run the API server in a separate thread"""
        def run_server():
            self.app.run(host=self.host, port=self.port, debug=debug, threaded=True)
        
        thread = threading.Thread(target=run_server, daemon=True)
        thread.start()
        logger.info(f"Federated API server started in background on {self.host}:{self.port}")
        return thread