|
import tensorflow as tf
|
|
from tensorflow.keras import layers, models
|
|
|
|
def encoder_block(inputs, filters):
    """Apply one ReLU-activated 3D convolution followed by batch normalization.

    Args:
        inputs: Tensor passed to the ``Conv3D`` layer.
        filters: Number of output filters of the convolution.

    Returns:
        The batch-normalized output of the convolution.
    """
    # "same" padding is requested, so the convolution does not shrink the
    # non-channel dimensions of ``inputs``.
    convolved = layers.Conv3D(
        filters=filters,
        kernel_size=(3, 3, 4),
        padding="same",
        activation="relu",
    )(inputs)
    return layers.BatchNormalization()(convolved)
|
|
|
|
def convlstm_block(inputs, filters):
    """Run a sequence-returning ``ConvLSTM2D`` plus batch normalization.

    The input is first passed through a ``Reshape`` whose target is the
    input's own dimensions 1-4, and the result is finally reshaped to the same
    leading dimensions with ``filters`` as the last dimension.

    Args:
        inputs: 5D tensor; dimensions 1-4 are read from ``inputs.shape``.
        filters: Number of ConvLSTM output filters.

    Returns:
        Tensor of shape ``(batch, shape[1], shape[2], shape[3], filters)``.
    """
    in_shape = inputs.shape
    leading_dims = (in_shape[1], in_shape[2], in_shape[3])

    sequence = layers.Reshape(leading_dims + (in_shape[4],))(inputs)
    sequence = layers.ConvLSTM2D(
        filters=filters,
        kernel_size=(3, 3),
        padding="same",
        return_sequences=True,
    )(sequence)
    sequence = layers.BatchNormalization()(sequence)

    return layers.Reshape(leading_dims + (filters,))(sequence)
|
|
|
|
def decoder_block(inputs, skip_connection, filters):
    """Decode ``inputs`` and merge it with a skip connection.

    Steps, in order: transposed 3D convolution with ReLU, batch
    normalization, 1x1x1 convolution of the skip tensor to ``filters``
    channels, concatenation of both tensors, and a sequence-returning
    ``ConvLSTM2D``.

    Args:
        inputs: Tensor coming from the previous (deeper) stage.
        skip_connection: Tensor from the matching encoder stage.
        filters: Number of filters used by every layer in this block.

    Returns:
        Output tensor of the final ``ConvLSTM2D`` layer.
    """
    upsampled = layers.Conv3DTranspose(
        filters=filters,
        kernel_size=(3, 3, 4),
        padding="same",
        activation="relu",
    )(inputs)
    upsampled = layers.BatchNormalization()(upsampled)

    # Project the skip tensor to ``filters`` channels before merging.
    skip_projected = layers.Conv3D(filters, (1, 1, 1), padding="same")(skip_connection)

    merged = layers.Concatenate()([upsampled, skip_projected])
    return layers.ConvLSTM2D(
        filters=filters,
        kernel_size=(3, 3),
        padding="same",
        return_sequences=True,
    )(merged)
|
|
|
|
def build_unet_convlstm(input_shape=(8, 95, 95, 3)):
    """Build the U-Net style Conv3D / ConvLSTM model.

    Layout:
        * two encoder stages (8 and 16 filters), each an ``encoder_block``
          followed by a ``convlstm_block``; their outputs double as skip
          connections;
        * a 32-filter bottleneck (``Conv3D`` + batch norm + ``convlstm_block``);
        * two ``decoder_block`` stages (16 and 8 filters) consuming the skips
          in reverse order;
        * a single-filter 1x1x1 ``Conv3D`` with ReLU and a
          ``GlobalAveragePooling3D`` head.

    Args:
        input_shape: Shape of one sample without the batch dimension.

    Returns:
        A ``tf.keras`` ``Model`` mapping the input tensor to the pooled output.
    """
    model_input = layers.Input(shape=input_shape)

    # Encoder stage 1.
    enc1 = encoder_block(model_input, filters=8)
    enc1 = convlstm_block(enc1, filters=8)

    # Encoder stage 2.
    enc2 = encoder_block(enc1, filters=16)
    enc2 = convlstm_block(enc2, filters=16)

    # Bottleneck.
    bottleneck = layers.Conv3D(
        filters=32,
        kernel_size=(3, 3, 3),
        padding="same",
        activation="relu",
    )(enc2)
    bottleneck = layers.BatchNormalization()(bottleneck)
    bottleneck = convlstm_block(bottleneck, filters=32)

    # Decoder: deepest skip first.
    decoded = decoder_block(bottleneck, enc2, filters=16)
    decoded = decoder_block(decoded, enc1, filters=8)

    # Output head.
    head = layers.Conv3D(filters=1, kernel_size=(1, 1, 1), activation="relu")(decoded)
    head = layers.GlobalAveragePooling3D()(head)

    return models.Model(inputs=model_input, outputs=head)
|
|
|
|
|
|
import tensorflow as tf
|
|
from tensorflow.keras import layers, models
|
|
|
|
def _rst_pad_to(tensor, target_height, target_width):
    """Zero-pad ``tensor`` at the bottom/right up to the target height/width."""
    return layers.ZeroPadding2D(
        padding=(
            (0, target_height - tensor.shape[1]),
            (0, target_width - tensor.shape[2]),
        )
    )(tensor)


def _rst_conv_all(branches, filters):
    """Apply a separate 3x3 'same' ReLU ``Conv2D`` to every branch, in order."""
    return [
        layers.Conv2D(filters=filters, kernel_size=(3, 3), padding='same', activation='relu')(branch)
        for branch in branches
    ]


def _rst_pool_all(branches):
    """Apply a separate 2x2 ``MaxPool2D`` to every branch, in order."""
    return [layers.MaxPool2D(pool_size=(2, 2))(branch) for branch in branches]


def _rst_fuse(tensors, filters):
    """Concatenate ``tensors`` on channels and run a one-step ``ConvLSTM2D``.

    The reshape targets are read from the fused tensor instead of being
    hard-coded, so the helper works for any spatial size / channel count.
    """
    fusion = layers.concatenate(tensors, axis=-1)
    height, width, channels = fusion.shape[1], fusion.shape[2], fusion.shape[3]

    # Add a length-1 sequence axis so the 2D feature map can go through ConvLSTM2D.
    x = layers.Reshape((1, height, width, channels))(fusion)
    x = layers.ConvLSTM2D(filters=filters, kernel_size=(3, 3), padding="same", return_sequences=True)(x)
    # Drop the length-1 sequence axis again.
    return layers.Reshape((height, width, filters))(x)


def RSTNet(input_shape):
    """
    Creates the subnet for extracting TC radial structure features using a
    five-branch CNN design with 2D convolutions.

    The input is split into four quadrants (NW, NE, SW, SE) around the integer
    midpoint of its first two dimensions. Each quadrant is zero-padded at the
    bottom/right to a common size and processed by its own convolution branch.
    A fifth (main) branch convolves and pools the full input. The five
    branches are fused three times by channel concatenation followed by a
    single-step ConvLSTM2D; a final unpadded 3x3 convolution and a flatten
    produce the output.

    Reshape targets are derived from the tensors themselves, so the function
    is no longer tied to 95x95 / 96x96 inputs. For those sizes the resulting
    graph is identical to the previous hard-coded version.

    Parameters:
    - input_shape: tuple, shape of the input data (e.g., (95, 95, 3))

    Returns:
    - model: tf.keras.Model, the radial structure subnet model
    """
    input_tensor = layers.Input(shape=input_shape)

    half_height = input_shape[0] // 2
    half_width = input_shape[1] // 2

    # Split the input into its four quadrants.
    nw_quadrant = input_tensor[:, :half_height, :half_width, :]
    ne_quadrant = input_tensor[:, :half_height, half_width:, :]
    sw_quadrant = input_tensor[:, half_height:, :half_width, :]
    se_quadrant = input_tensor[:, half_height:, half_width:, :]

    # For odd sizes the two halves differ by one; pad everything to the larger.
    target_height = max(half_height, input_shape[0] - half_height)
    target_width = max(half_width, input_shape[1] - half_width)

    nw_quadrant = _rst_pad_to(nw_quadrant, target_height, target_width)
    ne_quadrant = _rst_pad_to(ne_quadrant, target_height, target_width)
    sw_quadrant = _rst_pad_to(sw_quadrant, target_height, target_width)
    se_quadrant = _rst_pad_to(se_quadrant, target_height, target_width)

    # Debug output: shapes of the padded quadrants.
    print(nw_quadrant.shape)
    print(ne_quadrant.shape)
    print(sw_quadrant.shape)
    print(se_quadrant.shape)

    # Main branch: convolve the full input, halve it, pad to the quadrant size.
    main_branch = layers.Conv2D(filters=8, kernel_size=(3, 3), padding='same', activation='relu')(input_tensor)
    y = layers.MaxPool2D()(main_branch)
    y = _rst_pad_to(y, target_height, target_width)

    # Stage 1: 8-filter quadrant convolutions, fused with the main branch.
    branches = _rst_conv_all(
        [nw_quadrant, ne_quadrant, sw_quadrant, se_quadrant], filters=8
    )
    x = _rst_fuse([y] + branches, filters=16)
    x = layers.MaxPool2D(pool_size=(2, 2))(x)

    # Stage 2: 16-filter quadrant convolutions + pooling, fused with x.
    branches = _rst_pool_all(_rst_conv_all(branches, filters=16))
    x = _rst_fuse([x] + branches, filters=32)
    x = layers.MaxPool2D(pool_size=(2, 2))(x)

    # Stage 3: 32-filter quadrant convolutions + pooling, fused with x.
    branches = _rst_pool_all(_rst_conv_all(branches, filters=32))
    x = _rst_fuse([x] + branches, filters=32)

    # Head: linear 3x3 convolution (no padding argument), then flatten.
    x = layers.Conv2D(filters=32, kernel_size=(3, 3), activation=None)(x)
    x = layers.Flatten()(x)

    model = models.Model(inputs=input_tensor, outputs=x)
    return model
|
|
|
|
from tensorflow.keras import layers, models
|
|
|
|
def build_cnn_model(input_shape=(8, 8, 1)):
    """Build a small CNN: Conv2D(64, 3x3, 'same') -> BatchNorm -> ReLU -> Flatten.

    Args:
        input_shape: Shape of one sample without the batch dimension.

    Returns:
        A Keras ``Model`` mapping the input tensor to the flattened features.
    """
    cnn_input = layers.Input(shape=input_shape)

    features = layers.Conv2D(64, (3, 3), padding='same')(cnn_input)
    features = layers.BatchNormalization()(features)
    features = layers.ReLU()(features)
    flattened = layers.Flatten()(features)

    return models.Model(inputs=cnn_input, outputs=flattened)
|
|
|
|
from tensorflow.keras import layers, models, Input
|
|
|
|
def build_combined_model():
    """Assemble the six-input model that joins all sub-networks.

    Sub-models and their input shapes:
        * ``build_unet_convlstm`` with ``(8, 95, 95, 2)``
        * ``RSTNet`` with ``(95, 95, 8)``
        * ``build_cnn_model`` with ``(8, 8, 1)``

    Vector inputs (each passed through its own ReLU ``Dense`` layer):
        * ``latitude_input`` of shape ``(8,)`` -> 32 units
        * ``longitude_input`` of shape ``(8,)`` -> 32 units
        * ``other_input`` of shape ``(9,)`` -> 64 units

    All six feature tensors are concatenated, passed through a 128-unit ReLU
    ``Dense`` layer and a final linear single-unit ``Dense`` layer.

    Returns:
        A Keras ``Model`` whose inputs are, in order: the three sub-model
        inputs, then latitude, longitude and other.
    """
    # Image-like branches.
    unet_branch = build_unet_convlstm(input_shape=(8, 95, 95, 2))
    radial_branch = RSTNet(input_shape=(95, 95, 8))
    cnn_branch = build_cnn_model(input_shape=(8, 8, 1))

    # Vector inputs.
    latitude_in = Input(shape=(8,), name="latitude_input")
    longitude_in = Input(shape=(8,), name="longitude_input")
    other_in = Input(shape=(9,), name="other_input")

    latitude_features = layers.Dense(32, activation='relu')(latitude_in)
    longitude_features = layers.Dense(32, activation='relu')(longitude_in)
    other_features = layers.Dense(64, activation='relu')(other_in)

    merged = layers.concatenate([
        unet_branch.output,
        radial_branch.output,
        cnn_branch.output,
        latitude_features,
        longitude_features,
        other_features,
    ])

    # Regression head: one linear output unit.
    hidden = layers.Dense(128, activation='relu')(merged)
    prediction = layers.Dense(1, activation=None)(hidden)

    all_inputs = [
        unet_branch.input,
        radial_branch.input,
        cnn_branch.input,
        latitude_in,
        longitude_in,
        other_in,
    ]
    return models.Model(inputs=all_inputs, outputs=prediction)
|
|
|
|
import h5py
|
|
# Single definition of the checkpoint location, so the file that is inspected
# below is guaranteed to be the same file the weights are loaded from.
WEIGHTS_PATH = r"E:\1MAIN PROJECT\tf_env\final_model.h5"

# Print checkpoint metadata and the names stored under 'model_weights'.
with h5py.File(WEIGHTS_PATH, 'r') as f:
    print(f.attrs.get('keras_version'))
    print(f.attrs.get('backend'))
    print("Model layers:", list(f['model_weights'].keys()))

# Rebuild the architecture in code, then load the saved weights into it.
# ``model`` is read as a module-level global by ``predict_unetlstm``.
model = build_combined_model()
model.load_weights(WEIGHTS_PATH)
|
|
|
|
|
|
def predict_unetlstm(reduced_images_test, hov_m_test, test_vmax_3d, lat_test, lon_test, int_diff_test):
    """Run the module-level ``model`` on six inputs and return its prediction.

    The six arguments are forwarded to ``model.predict`` as a single list, in
    signature order.
    NOTE(review): this order presumably has to match the input order of the
    model built by ``build_combined_model`` (3D, radial, CNN, latitude,
    longitude, other) - confirm against the caller's data.

    Returns:
        Whatever ``model.predict`` returns for the given inputs.
    """
    model_inputs = [
        reduced_images_test,
        hov_m_test,
        test_vmax_3d,
        lat_test,
        lon_test,
        int_diff_test,
    ]
    return model.predict(model_inputs)