File size: 13,931 Bytes
704902c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import streamlit as st
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers # For custom layer definitions
import numpy as np
from PIL import Image
import json
import os

# --- RepVGGBlock Class Definition (Latest Verified Version) ---
# Users will need this definition if it's a custom layer in your model.
class RepVGGBlock(layers.Layer):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1,
                 groups=1, deploy=False, use_se=False, **kwargs):
        super(RepVGGBlock, self).__init__(**kwargs)
        self.config_initial_in_channels = in_channels
        self.config_out_channels = out_channels
        self.config_kernel_size = kernel_size
        self.config_strides_val = stride
        self.config_groups = groups
        self._deploy_mode_internal = deploy
        self.config_use_se = use_se
        self.actual_in_channels = None

        self.rbr_dense_conv = layers.Conv2D(
            filters=self.config_out_channels, kernel_size=self.config_kernel_size,
            strides=self.config_strides_val, padding='same',
            groups=self.config_groups, use_bias=False, name=self.name + '_dense_conv'
        )
        self.rbr_dense_bn = layers.BatchNormalization(name=self.name + '_dense_bn')
        self.rbr_1x1_conv = layers.Conv2D(
            filters=self.config_out_channels, kernel_size=1,
            strides=self.config_strides_val, padding='valid',
            groups=self.config_groups, use_bias=False, name=self.name + '_1x1_conv'
        )
        self.rbr_1x1_bn = layers.BatchNormalization(name=self.name + '_1x1_bn')
        self.rbr_identity_bn = None
        self.rbr_reparam = layers.Conv2D(
            filters=self.config_out_channels, kernel_size=self.config_kernel_size,
            strides=self.config_strides_val, padding='same',
            groups=self.config_groups, use_bias=True, name=self.name + '_reparam_conv'
        )

    def build(self, input_shape):
        self.actual_in_channels = input_shape[-1]
        if self.config_initial_in_channels is None:
            self.config_initial_in_channels = self.actual_in_channels
        elif self.config_initial_in_channels != self.actual_in_channels:
            raise ValueError(f"Input channel mismatch for layer {self.name}: Expected {self.config_initial_in_channels}, got {self.actual_in_channels}")

        if self.rbr_identity_bn is None and \
           self.actual_in_channels == self.config_out_channels and self.config_strides_val == 1:
            self.rbr_identity_bn = layers.BatchNormalization(name=self.name + '_identity_bn')
        
        super(RepVGGBlock, self).build(input_shape)

        if not self.rbr_dense_conv.built: self.rbr_dense_conv.build(input_shape)
        if not self.rbr_dense_bn.built: self.rbr_dense_bn.build(self.rbr_dense_conv.compute_output_shape(input_shape))
        if not self.rbr_1x1_conv.built: self.rbr_1x1_conv.build(input_shape)
        if not self.rbr_1x1_bn.built: self.rbr_1x1_bn.build(self.rbr_1x1_conv.compute_output_shape(input_shape))
        if self.rbr_identity_bn is not None and not self.rbr_identity_bn.built:
            self.rbr_identity_bn.build(input_shape)
        if not self.rbr_reparam.built:
            self.rbr_reparam.build(input_shape)

    def call(self, inputs):
        if self._deploy_mode_internal:
            return self.rbr_reparam(inputs)
        else:
            out_dense = self.rbr_dense_bn(self.rbr_dense_conv(inputs))
            out_1x1 = self.rbr_1x1_bn(self.rbr_1x1_conv(inputs))
            if self.rbr_identity_bn is not None:
                out_identity = self.rbr_identity_bn(inputs)
                return out_dense + out_1x1 + out_identity
            else: return out_dense + out_1x1

    def _fuse_bn_tensor(self, conv_layer, bn_layer): # Not called during inference with deploy=True model
        kernel = conv_layer.kernel; dtype = kernel.dtype; out_channels = kernel.shape[-1]
        gamma = getattr(bn_layer, 'gamma', tf.ones(out_channels, dtype=dtype))
        beta = getattr(bn_layer, 'beta', tf.zeros(out_channels, dtype=dtype))
        running_mean = getattr(bn_layer, 'moving_mean', tf.zeros(out_channels, dtype=dtype))
        running_var = getattr(bn_layer, 'moving_variance', tf.ones(out_channels, dtype=dtype))
        epsilon = bn_layer.epsilon; std = tf.sqrt(running_var + epsilon)
        fused_kernel = kernel * (gamma / std)
        if conv_layer.use_bias: fused_bias = beta + (gamma * (conv_layer.bias - running_mean)) / std
        else: fused_bias = beta - (running_mean * gamma) / std
        return fused_kernel, fused_bias

    def reparameterize(self): # Not called during inference with deploy=True model
        if self._deploy_mode_internal: return
        branches_to_check = [self.rbr_dense_conv, self.rbr_dense_bn, self.rbr_1x1_conv, self.rbr_1x1_bn]
        if self.rbr_identity_bn: branches_to_check.append(self.rbr_identity_bn)
        for branch_layer in branches_to_check:
            if not branch_layer.built: raise Exception(f"ERROR: Branch layer {branch_layer.name} for {self.name} not built.")
        kernel_dense, bias_dense = self._fuse_bn_tensor(self.rbr_dense_conv, self.rbr_dense_bn)
        kernel_1x1_unpadded, bias_1x1 = self._fuse_bn_tensor(self.rbr_1x1_conv, self.rbr_1x1_bn)
        pad_amount = self.config_kernel_size // 2
        kernel_1x1_padded = tf.pad(kernel_1x1_unpadded, [[pad_amount,pad_amount],[pad_amount,pad_amount],[0,0],[0,0]])
        final_kernel = kernel_dense + kernel_1x1_padded; final_bias = bias_dense + bias_1x1
        if self.rbr_identity_bn is not None:
            running_mean_id = self.rbr_identity_bn.moving_mean; running_var_id = self.rbr_identity_bn.moving_variance
            gamma_id = self.rbr_identity_bn.gamma; beta_id = self.rbr_identity_bn.beta
            epsilon_id = self.rbr_identity_bn.epsilon; std_id = tf.sqrt(running_var_id + epsilon_id)
            kernel_id_scaler = gamma_id / std_id
            bias_id_term = beta_id - (running_mean_id * gamma_id) / std_id
            identity_kernel_np = np.zeros((self.config_kernel_size,self.config_kernel_size,self.actual_in_channels,self.config_out_channels),dtype=np.float32)
            for i in range(self.actual_in_channels): identity_kernel_np[pad_amount,pad_amount,i,i] = kernel_id_scaler[i].numpy()
            kernel_id_final = tf.convert_to_tensor(identity_kernel_np, dtype=tf.float32)
            final_kernel += kernel_id_final; final_bias += bias_id_term
        if not self.rbr_reparam.built: raise Exception(f"CRITICAL ERROR: {self.rbr_reparam.name} not built before set_weights.")
        self.rbr_reparam.set_weights([final_kernel, final_bias]); self._deploy_mode_internal = True

    def get_config(self):
        config = super(RepVGGBlock, self).get_config()
        config.update({
            "in_channels": self.config_initial_in_channels, "out_channels": self.config_out_channels,
            "kernel_size": self.config_kernel_size, "stride": self.config_strides_val,
            "groups": self.config_groups, "deploy": self._deploy_mode_internal, "use_se": self.config_use_se
        }); return config
    @classmethod
    def from_config(cls, config): return cls(**config)
# --- End of RepVGGBlock ---

# --- NECALayer Class Definition (Verified Version) ---
class NECALayer(layers.Layer):
    def __init__(self, channels, gamma=2, b=1, **kwargs):
        super(NECALayer, self).__init__(**kwargs)
        self.channels = channels; self.gamma = gamma; self.b = b
        tf_channels = tf.cast(self.channels, tf.float32)
        k_float = (tf.math.log(tf_channels) / tf.math.log(2.0) + self.b) / self.gamma
        k_int = tf.cast(tf.round(k_float), tf.int32)
        if tf.equal(k_int % 2, 0): self.k_scalar_val = k_int + 1
        else: self.k_scalar_val = k_int
        self.k_scalar_val = tf.maximum(1, self.k_scalar_val)
        kernel_size_for_conv1d = (int(self.k_scalar_val.numpy()),)
        self.gap = layers.GlobalAveragePooling2D(keepdims=True)
        self.conv1d = layers.Conv1D(filters=1, kernel_size=kernel_size_for_conv1d, padding='same', use_bias=False, name=self.name + '_eca_conv1d')
        self.sigmoid = layers.Activation('sigmoid')
    def call(self, inputs):
        if self.channels != inputs.shape[-1]: raise ValueError(f"Input channels {inputs.shape[-1]} != layer channels {self.channels} for {self.name}")
        x = self.gap(inputs); x = tf.squeeze(x, axis=[1,2]); x = tf.expand_dims(x, axis=-1)
        x = self.conv1d(x); x = tf.squeeze(x, axis=-1); attention = self.sigmoid(x)
        return inputs * tf.reshape(attention, [-1, 1, 1, self.channels])
    def get_config(self):
        config = super(NECALayer, self).get_config()
        config.update({"channels": self.channels, "gamma": self.gamma, "b": self.b}); return config
    @classmethod
    def from_config(cls, config): return cls(**config)
# --- End of NECALayer ---


# --- Streamlit App Configuration ---
MODEL_FILENAME = 'genera_cic_v1.keras'
LABEL_MAPPING_FILENAME = 'label_mapping.json'
IMG_WIDTH = 299
IMG_HEIGHT = 299

st.set_page_config(page_title="Genera Cloud Classifier", layout="wide")

# --- Load Model and Label Mapping (Cached for performance) ---
@st.cache_resource
def load_keras_model(model_path):
    """Loads the Keras model with custom layer definitions."""
    if not os.path.exists(model_path):
        st.error(f"Model file not found: {model_path}")
        st.error(f"Please ensure '{model_path}' is in the same directory as this script, or update the path.")
        return None
    try:
        custom_objects = {'RepVGGBlock': RepVGGBlock, 'NECALayer': NECALayer}
        model = tf.keras.models.load_model(model_path, custom_objects=custom_objects, compile=False)
        print("Model loaded successfully.")
        return model
    except Exception as e:
        st.error(f"Error loading Keras model from '{model_path}': {e}")
        st.error("Make sure the custom layer definitions (RepVGGBlock, NECALayer) are correct and match the saved model.")
        return None

@st.cache_data
def load_label_map(mapping_path):
    """Loads the label mapping from a JSON file."""
    if not os.path.exists(mapping_path):
        st.error(f"Label mapping file not found: {mapping_path}")
        st.error(f"Please ensure '{mapping_path}' is in the same directory as this script, or update the path.")
        return None
    try:
        with open(mapping_path, 'r') as f:
            label_data = json.load(f)
        # Ensure int_to_label keys are integers, as they might be saved as strings in JSON
        int_to_label = {int(k): v for k, v in label_data['int_to_label'].items()}
        return int_to_label
    except Exception as e:
        st.error(f"Error loading label mapping from '{mapping_path}': {e}")
        return None

# Load resources
model = load_keras_model(MODEL_FILENAME)
int_to_label = load_label_map(LABEL_MAPPING_FILENAME)

# --- Image Preprocessing Function ---
def preprocess_for_prediction(image_pil, target_size=(IMG_HEIGHT, IMG_WIDTH)):
    """Prepares a PIL image for model prediction."""
    img = image_pil.convert('RGB') # Ensure 3 channels
    img_resized = img.resize(target_size)
    img_array = np.array(img_resized, dtype=np.float32)
    img_array = img_array / 255.0   # Normalize to [0, 1]
    img_array = np.expand_dims(img_array, axis=0) # Add batch dimension
    return img_array

# --- Streamlit App UI ---
st.title("☁️ Genera - Cloud Classifier 🌥️")
st.markdown("Upload an image of the sky, and this app will predict the dominant cloud genus.")

# Check if model and labels loaded successfully before proceeding
if model is None or int_to_label is None:
    st.error("Application cannot start due to errors loading model or label mapping. Please check the console/logs for details.")
else:
    uploaded_file = st.file_uploader("Choose a cloud image...", type=["jpg", "jpeg", "png"])

    if uploaded_file is not None:
        try:
            image_pil = Image.open(uploaded_file)
            
            col1, col2 = st.columns(2)
            with col1:
                st.image(image_pil, caption='Uploaded Image.', use_container_width=True)
            
            # Preprocess and predict
            with st.spinner('Analyzing the sky...'):
                processed_image_tensor = preprocess_for_prediction(image_pil)
                predictions = model.predict(processed_image_tensor)
                pred_probabilities = predictions[0] # Get probabilities for the single uploaded image

            with col2:
                st.subheader("🔍 Prediction Results:")
                # Display top N predictions with confidence
                top_n = 5 # Show top 5 predictions
                # Get indices of sorted probabilities (highest first)
                sorted_indices = np.argsort(pred_probabilities)[::-1]

                for i in range(min(top_n, len(pred_probabilities))):
                    class_index = sorted_indices[i]
                    class_name = int_to_label.get(class_index, f"Unknown Class ({class_index})")
                    confidence = pred_probabilities[class_index]
                    st.markdown(f"**{class_name}**: `{confidence*100:.2f}%`")

                # Highlight the top prediction
                top_pred_idx = sorted_indices[0]
                top_class_name = int_to_label.get(top_pred_idx, "Unknown Class")
                top_confidence = pred_probabilities[top_pred_idx]
                st.success(f"**Top Prediction: {top_class_name} ({top_confidence*100:.2f}%)**")

        except Exception as e:
            st.error(f"An error occurred during image processing or prediction: {e}")
            st.error("Please ensure the uploaded file is a valid image format (JPG, JPEG, PNG).")
    else:
        st.info("Please upload an image to classify.")

st.markdown("---")
st.markdown("Developed as part of the Personalized Weather Intelligence project.")