|
|
|
import numpy as np |
|
import matplotlib.pylab as plt |
|
import ruptures as rpt |
|
import streamlit as st |
|
from ruptures.metrics import precision_recall |
|
from ruptures.metrics import hausdorff |
|
from ruptures.metrics import randindex |
|
|
|
|
|
st.title("Change Point Detection") |
|
|
|
|
|
def pw_constant_input(n,dim,n_bkps,sigma): |
|
"""Piecewise constant (pw_constant)""" |
|
|
|
|
|
signal, bkps = rpt.pw_constant(n, dim, n_bkps, noise_std=sigma) |
|
rpt.display(signal, bkps) |
|
return signal,bkps |
|
|
|
def pw_linear_input(n,dim,n_bkps,sigma): |
|
"""Piecewise Linear""" |
|
|
|
|
|
|
|
signal, bkps = rpt.pw_linear(n, dim, n_bkps, noise_std=sigma) |
|
rpt.display(signal, bkps) |
|
return signal,bkps |
|
|
|
def pw_normal_input(n,dim,n_bkps,sigma): |
|
"""Piecewise 2D Gaussian process (pw_normal)#""" |
|
|
|
|
|
|
|
signal, bkps = rpt.pw_normal(n, n_bkps) |
|
rpt.display(signal, bkps) |
|
return signal,bkps |
|
|
|
def pw_wavy_input(n,dim,n_bkps,sigma): |
|
|
|
|
|
|
|
signal, bkps = rpt.pw_wavy(n, n_bkps, noise_std=sigma) |
|
rpt.display(signal, bkps) |
|
return signal,bkps |
|
|
|
input_list = ['piecewiseConstant','piecewiseLinear','piecewiseNormal','piecewiseSinusoidal'] |
|
generate_signal = st.selectbox(label = "Choose an input signal", options = input_list) |
|
|
|
n,dim,n_bkps,sigma = st.columns(4) |
|
with n: |
|
n= st.number_input('No of Samples',min_value=100,step=1) |
|
with dim: |
|
dim = st.number_input('No of dimesions',min_value=1,max_value = 5,step=1) |
|
with n_bkps: |
|
n_bkps = st.number_input('No of breakpoints',min_value=2,step=1) |
|
with sigma: |
|
sigma = st.number_input('Variance',min_value=1,max_value=4,step=1) |
|
|
|
if generate_signal == 'piecewiseConstant': |
|
signal,bkps = pw_constant_input(n,dim,n_bkps,sigma) |
|
elif generate_signal== 'piecewiseLinear': |
|
signal,bkps = pw_linear_input(n,dim,n_bkps,sigma) |
|
elif generate_signal == 'piecewiseNormal': |
|
signal,bkps = pw_normal_input(n,dim,n_bkps,sigma) |
|
else: |
|
signal,bkps= pw_wavy_input(n,dim,n_bkps,sigma) |
|
|
|
fig, axarr = rpt.display(signal,bkps) |
|
st.pyplot(fig) |
|
|
|
def dynp_method(signal,bkps,n_bkps): |
|
|
|
model = "l1" |
|
algo = rpt.Dynp(model=model, min_size=3, jump=5).fit(signal) |
|
my_bkps = algo.predict(n_bkps) |
|
|
|
fig,axarr = rpt.show.display(signal, bkps, my_bkps, figsize=(10, 3)) |
|
|
|
st.pyplot(fig) |
|
return my_bkps |
|
|
|
def pelt_method(signal,bkps,n_bkps): |
|
|
|
model = "l1" |
|
algo = rpt.Pelt(model=model, min_size=3, jump=5).fit(signal) |
|
my_bkps = algo.predict(pen=3) |
|
|
|
|
|
fig, ax_arr = rpt.display(signal, bkps, my_bkps, figsize=(10, 3)) |
|
st.pyplot(fig) |
|
return my_bkps |
|
|
|
|
|
def bin_seg_method(signal,bkps,n_bkps): |
|
|
|
model = "l2" |
|
algo = rpt.Binseg(model=model).fit(signal) |
|
my_bkps = algo.predict(n_bkps) |
|
|
|
|
|
fg,axxarr = rpt.show.display(signal, bkps, my_bkps, figsize=(10, 3)) |
|
st.pyplot(fig) |
|
return my_bkps |
|
|
|
def bot_up_seg(signal,bkps,n_bkps): |
|
|
|
model = "l2" |
|
algo = rpt.Binseg(model=model).fit(signal) |
|
my_bkps = algo.predict(n_bkps) |
|
|
|
|
|
fig,axxar = rpt.show.display(signal, bkps, my_bkps, figsize=(10, 3)) |
|
st.pyplot(fig) |
|
return my_bkps |
|
|
|
def win_sli_seg(signal,bkps,n_bkps): |
|
|
|
model = "l2" |
|
algo = rpt.Window(width=40, model=model).fit(signal) |
|
my_bkps = algo.predict(n_bkps) |
|
|
|
|
|
fig,axxar= rpt.show.display(signal, bkps, my_bkps, figsize=(10, 3)) |
|
st.pyplot(fig) |
|
return my_bkps |
|
|
|
|
|
searchmethod_list = ['Dynamic Programming','Pelt','Binary Segmentation','Bottom-up Segmentation','Window sliding segmentation'] |
|
detection_model = st.selectbox(label = "Choose a Detection Method",options = searchmethod_list) |
|
|
|
if detection_model== 'Dynamic Programming': |
|
bkps1 = dynp_method(signal,bkps,n_bkps) |
|
|
|
elif detection_model=='Pelt': |
|
bkps1 = pelt_method(signal,bkps,n_bkps) |
|
elif detection_model=='Binary Segmentation': |
|
bkps1 = bin_seg_method(signal,bkps,n_bkps) |
|
elif detection_model=='Bottom-up Segmentation': |
|
bkps1 = bot_up_seg(signal,bkps,n_bkps) |
|
else: |
|
bkps1 = win_sli_seg(signal,bkps,n_bkps) |
|
|
|
p, r = precision_recall(bkps, bkps1) |
|
st.header('Precision and Recall') |
|
st.write(p, r) |
|
|
|
st.header('Hausdorff metric') |
|
st.write(hausdorff(bkps, bkps1)) |
|
|
|
st.header('Rand index') |
|
|
|
|
|
st.write(randindex(bkps, bkps1)) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|