import streamlit as st
import numpy as np
from PIL import Image
import cv2
from scipy.ndimage import gaussian_filter


def find_tc_center(ir_image, smoothing_sigma=3):
    """Locate the TC center as the coldest pixel of the smoothed IR image, returned as (x, y)."""
    smoothed_image = gaussian_filter(ir_image, sigma=smoothing_sigma)
    min_coords = np.unravel_index(np.argmin(smoothed_image), smoothed_image.shape)
    return min_coords[::-1]  # (row, col) -> (x, y)


def extract_local_region(ir_image, center, region_size=95):
    """Extract a region_size x region_size window around the (x, y) center; out-of-frame pixels stay NaN."""
    h, w = ir_image.shape
    half_size = region_size // 2
    x_min = max(center[0] - half_size, 0)
    x_max = min(center[0] + half_size, w)
    y_min = max(center[1] - half_size, 0)
    y_max = min(center[1] + half_size, h)

    region = np.full((region_size, region_size), np.nan)
    extracted = ir_image[y_min:y_max, x_min:x_max]
    region[:extracted.shape[0], :extracted.shape[1]] = extracted
    return region


def generate_hovmoller(X_data):
    """Build a TC-centered 95x95 stack for every time step of every IR sequence in X_data."""
    hovmoller_list = []
    for ir_images in X_data:
        time_steps = ir_images.shape[0]
        hovmoller_data = np.zeros((time_steps, 95, 95))
        for t in range(time_steps):
            tc_center = find_tc_center(ir_images[t])
            hovmoller_data[t] = extract_local_region(ir_images[t], tc_center, 95)
        hovmoller_list.append(hovmoller_data)
    return np.array(hovmoller_list)


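# Illustrative sketch (not executed by the app): how the three helpers above fit together on a
# single synthetic IR frame. The 201x201 frame and 240 K "eye" are assumptions for the example.
#
#   frame = np.full((201, 201), 280.0)                   # warm background
#   frame[95:105, 95:105] = 240.0                        # cold core near the middle
#   cx, cy = find_tc_center(frame)                       # coldest pixel of the smoothed frame, as (x, y)
#   patch = extract_local_region(frame, (cx, cy), 95)    # 95x95 window, NaN where it leaves the frame
#   stack = generate_hovmoller(frame[np.newaxis, np.newaxis])   # shape (1, 1, 95, 95)
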
def reshape_vmax(vmax_values, chunk_size=8):
    """Trim the Vmax series to a multiple of chunk_size and reshape it to (n, chunk_size)."""
    trimmed_size = (len(vmax_values) // chunk_size) * chunk_size
    vmax_values_trimmed = vmax_values[:trimmed_size]
    return vmax_values_trimmed.reshape(-1, chunk_size)


def create_3d_vmax(vmax_2d_array):
    """Place each 8-value Vmax row on the diagonal of an 8x8 matrix; output shape (n, 8, 8, 1)."""
    vmax_3d_array = np.zeros((vmax_2d_array.shape[0], 8, 8))
    for i in range(vmax_2d_array.shape[0]):
        np.fill_diagonal(vmax_3d_array[i], vmax_2d_array[i])
    return vmax_3d_array.reshape(-1, 8, 8, 1)


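# Shape sketch (assumed example values, not part of the app flow): reshape_vmax groups a flat
# Vmax series into 8-step rows, and create_3d_vmax puts each row on an 8x8 diagonal.
#
#   vmax = np.arange(16, dtype=float)      # 16 readings -> 2 full days of 8 time steps
#   vmax_2d = reshape_vmax(vmax)           # shape (2, 8)
#   vmax_3d = create_3d_vmax(vmax_2d)      # shape (2, 8, 8, 1); vmax_3d[0, k, k, 0] == vmax[k]
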
def process_lat_values(data):
    """Trim latitude values to a multiple of 8 and reshape them to (n, 8)."""
    lat_values = np.array(data)
    trimmed_size = (len(lat_values) // 8) * 8
    lat_values_trimmed = lat_values[:trimmed_size]
    return lat_values_trimmed.reshape(-1, 8)


def process_lon_values(data):
    """Trim longitude values to a multiple of 8 and reshape them to (n, 8)."""
    lon_values = np.array(data)
    trimmed_size = (len(lon_values) // 8) * 8
    lon_values_trimmed = lon_values[:trimmed_size]
    return lon_values_trimmed.reshape(-1, 8)


def calculate_intensity_difference(vmax_2d_array):
    """Appends the absolute first-to-last intensity difference to each row of the Vmax 2D array."""
    int_diff = []
    for row in vmax_2d_array:
        diff = abs(row[0] - row[-1])
        int_diff.append(np.append(row, diff))
    return np.array(int_diff)


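# Worked example (illustrative only): for one 8-step row, the appended ninth value is the
# absolute change over the window, |row[0] - row[-1]|.
#
#   row = np.array([[35., 40., 45., 50., 55., 60., 65., 70.]])
#   calculate_intensity_difference(row)
#   # -> array([[35., 40., 45., 50., 55., 60., 65., 70., 35.]])   (|35 - 70| = 35)
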
def process_images(images, batch_size=8, img_size=(95, 95, 1)):
    """Trim an image stack to a multiple of batch_size and reshape it to (n, batch_size, *img_size)."""
    num_images = images.shape[0]
    trimmed_size = (num_images // batch_size) * batch_size
    images_trimmed = images[:trimmed_size]
    return images_trimmed.reshape(-1, batch_size, *img_size)


def process_cc_mask(cc_data):
    """Processes CC mask images by trimming and reshaping into (n, 8, 95, 95, 1)."""
    num_images = cc_data.shape[0]
    batch_size = 8
    trimmed_size = (num_images // batch_size) * batch_size
    images_trimmed = cc_data[:trimmed_size]
    return images_trimmed.reshape(-1, batch_size, 95, 95, 1)


def extract_convective_cores(ir_data):
    """
    Extract Convective Cores (CCs) from IR imagery based on the criteria in the paper.

    Args:
        ir_data: IR imagery of shape (height, width) or (height, width, 1).

    Returns:
        cc_mask: Binary mask of CCs (1 for CC, 0 otherwise) of shape (height, width).
    """
    ir_data = np.squeeze(ir_data)  # accept (H, W, 1) as well as (H, W)
    height, width = ir_data.shape
    cc_mask = np.zeros((height, width), dtype=np.float32)

    neighbors = [(-1, -1), (-1, 0), (-1, 1),
                 (0, -1),  (0, 0),  (0, 1),
                 (1, -1),  (1, 0),  (1, 1)]

    for i in range(1, height - 1):
        for j in range(1, width - 1):
            bt_ij = ir_data[i, j]

            # Criterion 1: a CC pixel must be colder than 253 K.
            if bt_ij >= 253:
                continue

            # Criterion 2: it must be a local minimum of brightness temperature.
            is_local_min = True
            for di, dj in neighbors:
                if ir_data[i + di, j + dj] < bt_ij:
                    is_local_min = False
                    break
            if not is_local_min:
                continue

            # Criterion 3: the weighted second differences must exceed a BT-dependent threshold.
            numerator1 = (ir_data[i - 1, j] + ir_data[i + 1, j] - 2 * bt_ij) / 3.1
            numerator2 = (ir_data[i, j - 1] + ir_data[i, j + 1] - 2 * bt_ij) / 8.0
            lhs = numerator1 + numerator2
            rhs = (4 / 5.8) * np.exp(0.0826 * (bt_ij - 217))

            if lhs > rhs:
                cc_mask[i, j] = 1

    return cc_mask


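# Note on the thresholds above (as implemented here, following the CC criteria the docstring
# cites): a pixel qualifies as a convective core only if it is colder than 253 K, is a local BT
# minimum in its 3x3 neighbourhood, and its weighted second differences exceed
# (4 / 5.8) * exp(0.0826 * (BT - 217)), so the curvature requirement relaxes as clouds get colder.
#
#   # Minimal usage sketch on one 95x95 brightness-temperature frame (bt_frame is an assumed name):
#   # mask = extract_convective_cores(bt_frame)   # float32 array of 0s and 1s, same height/width
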
def compute_convective_core_masks(ir_data):
    """Extracts a convective core mask for every 2D IR frame in ir_data (any leading dims)."""
    frames = ir_data.reshape(-1, ir_data.shape[-2], ir_data.shape[-1])
    cc_mask = []
    for frame in frames:
        cc_mask.append(extract_convective_cores(frame))
    return np.array(cc_mask)


st.set_page_config(page_title="TCIR Daily Input", layout="wide")

st.title("Tropical Cyclone Input Uploader (8 sets/day)")

ir_images = st.file_uploader("Upload 8 IR images", type=["jpg", "jpeg", "png"], accept_multiple_files=True)
pmw_images = st.file_uploader("Upload 8 PMW images", type=["jpg", "jpeg", "png"], accept_multiple_files=True)

if len(ir_images) != 8 or len(pmw_images) != 8:
    st.warning("Please upload exactly 8 IR and 8 PMW images.")
else:
    st.success("Uploaded 8 IR and 8 PMW images successfully.")

st.header("Input Latitude, Longitude, Vmax")
|
|
lat_values, lon_values, vmax_values = [], [], []
|
|
|
|
col1, col2, col3 = st.columns(3)
|
|
with col1:
|
|
for i in range(8):
|
|
lat_values.append(st.number_input(f"Latitude {i+1}", key=f"lat{i}"))
|
|
with col2:
|
|
for i in range(8):
|
|
lon_values.append(st.number_input(f"Longitude {i+1}", key=f"lon{i}"))
|
|
with col3:
|
|
for i in range(8):
|
|
vmax_values.append(st.number_input(f"Vmax {i+1}", key=f"vmax{i}"))
|
|
st.header("Select Prediction Model")
|
|
model_choice = st.selectbox(
|
|
"Choose a model for prediction",
|
|
("ConvGRU", "ConvLSTM", "Traj-GRU","3DCNN","spatiotemporalLSTM","Unet_LSTM"),
|
|
index=0
|
|
)
|
|
|
|
if st.button("Submit for Processing"):

    if len(ir_images) == 8 and len(pmw_images) == 8:

        # Lazily import only the selected model's predict function.
        if model_choice == "Unet_LSTM":
            from unetlstm import predict_unetlstm
            model_predict_fn = predict_unetlstm
        elif model_choice == "ConvGRU":
            from gru_model import predict
            model_predict_fn = predict
        elif model_choice == "ConvLSTM":
            from convlstm import predict_lstm
            model_predict_fn = predict_lstm
        elif model_choice == "3DCNN":
            from cnn3d import predict_3dcnn
            model_predict_fn = predict_3dcnn
        elif model_choice == "Traj-GRU":
            from trjgru import predict_trajgru
            model_predict_fn = predict_trajgru
        elif model_choice == "spatiotemporalLSTM":
            from spaio_temp import predict_stlstm
            model_predict_fn = predict_stlstm

        ir_arrays = []
        pmw_arrays = []

        # Scalar predictors: group Vmax/lat/lon into 8-step rows and compute intensity differences.
        train_vmax_2d = reshape_vmax(np.array(vmax_values))
        train_vmax_3d = create_3d_vmax(train_vmax_2d)

        lat_processed = process_lat_values(lat_values)
        lon_processed = process_lon_values(lon_values)

        v_max_diff = calculate_intensity_difference(train_vmax_2d)

        # IR: grayscale rescaled to a 190-310 K brightness-temperature range, resized to 95x95.
        for ir in ir_images:
            img = Image.open(ir).convert("L")
            arr = np.array(img).astype(np.float32)
            bt_arr = (arr / 255.0) * (310 - 190) + 190
            resized = cv2.resize(bt_arr, (95, 95), interpolation=cv2.INTER_CUBIC)
            ir_arrays.append(resized)

        # PMW: grayscale normalised to [0, 1], resized to 95x95.
        for pmw in pmw_images:
            img = Image.open(pmw).convert("L")
            arr = np.array(img).astype(np.float32) / 255.0
            resized = cv2.resize(arr, (95, 95), interpolation=cv2.INTER_CUBIC)
            pmw_arrays.append(resized)

        ir = np.array(ir_arrays)
        pmw = np.array(pmw_arrays)

        ir_seq = process_images(ir)    # (1, 8, 95, 95, 1)
        pmw_seq = process_images(pmw)  # (1, 8, 95, 95, 1)

        X_train_new = ir_seq.reshape((1, 8, 95, 95))

        cc_mask = compute_convective_core_masks(X_train_new)

        hov_m_train = generate_hovmoller(X_train_new)
        hov_m_train[np.isnan(hov_m_train)] = 0
        hov_m_train = hov_m_train.transpose(0, 2, 3, 1)  # (1, 95, 95, 8)

        cc_mask[np.isnan(cc_mask)] = 0
        cc_mask = cc_mask.reshape(1, 8, 95, 95, 1)

        # Combine IR with its CC mask, then stack PMW as a second channel.
        i_images = cc_mask + ir_seq
        reduced_images = np.concatenate([i_images, pmw_seq], axis=-1)
        reduced_images[np.isnan(reduced_images)] = 0

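        # Shapes at this point (for the single uploaded day, as produced above):
        #   reduced_images : (1, 8, 95, 95, 2)   IR + CC mask in channel 0, PMW in channel 1
        #   hov_m_train    : (1, 95, 95, 8)
        #   train_vmax_3d  : (1, 8, 8, 1)
        #   lat_processed, lon_processed : (1, 8)
        #   v_max_diff     : (1, 9)
        # The Unet_LSTM branch below rebuilds reduced_images with an extra gradient-magnitude term.
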
if model_choice == "Unet_LSTM":
|
|
import tensorflow as tf
|
|
|
|
def tf_gradient_magnitude(images):
|
|
|
|
sobel_x = tf.constant([[1, 0, -1], [2, 0, -2], [1, 0, -1]], dtype=tf.float32)
|
|
sobel_y = tf.constant([[1, 2, 1], [0, 0, 0], [-1, -2, -1]], dtype=tf.float32)
|
|
sobel_x = tf.reshape(sobel_x, [3, 3, 1, 1])
|
|
sobel_y = tf.reshape(sobel_y, [3, 3, 1, 1])
|
|
|
|
images = tf.convert_to_tensor(images, dtype=tf.float32)
|
|
images = tf.expand_dims(images, -1)
|
|
|
|
gx = tf.nn.conv2d(images, sobel_x, strides=1, padding='SAME')
|
|
gy = tf.nn.conv2d(images, sobel_y, strides=1, padding='SAME')
|
|
grad_mag = tf.sqrt(tf.square(gx) + tf.square(gy) + 1e-6)
|
|
|
|
return tf.squeeze(grad_mag, -1).numpy()
|
|
def GM_maps_prep(ir):
|
|
GM_maps=[]
|
|
for i in ir:
|
|
GM_map = tf_gradient_magnitude(i)
|
|
GM_maps.append(GM_map)
|
|
GM_maps=np.array(GM_maps)
|
|
return GM_maps
|
|
ir_seq=ir_seq.reshape(8, 95, 95, 1)
|
|
GM_maps = GM_maps_prep(ir_seq)
|
|
print(GM_maps.shape)
|
|
GM_maps=GM_maps.reshape(1, 8, 95, 95, 1)
|
|
i_images=cc_mask+ir_seq+GM_maps
|
|
reduced_images = np.concatenate([i_images,pmw_seq ], axis=-1)
|
|
reduced_images[np.isnan(reduced_images)] = 0
|
|
print(reduced_images.shape)
|
|
y = model_predict_fn(reduced_images, hov_m_train, train_vmax_3d, lat_processed, lon_processed, v_max_diff)
|
|
else:
|
|
y = model_predict_fn(reduced_images, hov_m_train, train_vmax_3d, lat_processed, lon_processed, v_max_diff)
|
|
        st.write("Predicted Vmax:", y)

    else:
        st.error("Make sure you uploaded exactly 8 IR and 8 PMW images.")