from typing import Optional

import numpy as np
import pandas as pd

from queries.process_gsm import combined_gsm_database
from utils.check_sheet_exist import execute_checks_sheets_exist
from utils.convert_to_excel import convert_dfs, save_dataframe
from utils.kpi_analysis_utils import (
    GsmAnalysis,
    create_daily_date,
    create_dfs_per_kpi,
    create_hourly_date,
    kpi_naming_cleaning,
)


class GsmCapacity:
    # Class-level placeholder, initialised to None. It is not written by any
    # code visible in this module; presumably callers store results here
    # -- TODO confirm.
    final_results = None


# Columns kept from the BTS table returned by ``combined_gsm_database``.
GSM_COLUMNS = [
    "ID_BTS",
    "site_name",
    "name",
    "BSC",
    "BCF",
    "BTS",
    "code",
    "Region",
    "adminState",
    "frequencyBandInUse",
    "amrSegLoadDepTchRateLower",
    "amrSegLoadDepTchRateUpper",
    "dedicatedGPRScapacity",
    "defaultGPRScapacity",
    "cellId",
    "band",
    "site_config_band",
    "trxRfPower",
    "BCCH",
    "number_trx_per_cell",
    "number_trx_per_bcf",
    "TRX_TCH",
    "MAL_TCH",
]

# Columns kept from the TRX table returned by ``combined_gsm_database``.
TRX_COLUMNS = [
    "ID_BTS",
    "number_tch_per_cell",
    "number_sd_per_cell",
    "number_bcch_per_cell",
    "number_ccch_per_cell",
    "number_cbc_per_cell",
    "number_total_channels_per_cell",
    "number_signals_per_cell",
]

# Columns kept from the cleaned KPI reports. The first two are the pivot
# keys; the remaining ones are the KPIs (see ``kpi_columns_from=2`` below).
KPI_COLUMNS = [
    "date",
    "BTS_name",
    "TCH_availability_ratio",
    "2G_Carried_Traffic",
    "TCH_call_blocking",
    "TCH_ABIS_FAIL_CALL_c001084",
    "SDCCH_real_blocking",
]

# Summary columns produced by the busy-hour analysis that are merged into
# the capacity table in ``analyze_gsm_data``.
BH_COLUMNS_FOR_CAPACITY = [
    "Max_Traffic BH",
    "Avg_Traffic BH",
    "Max_tch_call_blocking BH",
    "Avg_tch_call_blocking BH",
    "number_of_days_with_tch_blocking_exceeded",
    "Max_sdcch_real_blocking BH",
    "Avg_sdcch_real_blocking BH",
    "number_of_days_with_sdcch_blocking_exceeded",
]


def _add_bh_window_stats(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    avg_column: str,
    max_column: str,
    exceeded_column: Optional[str] = None,
    threshold: Optional[float] = None,
) -> pd.DataFrame:
    """Append summary statistics over the trailing columns of a pivoted KPI frame.

    Args:
        df: Frame whose last ``number_of_kpi_days`` columns are the values to
            summarise (assumed to be one column per day -- confirm against
            ``create_dfs_per_kpi``).
        number_of_kpi_days: Number of trailing columns to include.
        avg_column: Name of the column receiving the row mean, rounded to 2
            decimals.
        max_column: Name of the column receiving the row maximum.
        exceeded_column: When given, name of the column receiving the count
            of values in the window that are ``>= threshold``.
        threshold: Value compared against when ``exceeded_column`` is given.

    Returns:
        A copy of ``df`` with the summary columns appended; ``df`` itself is
        not modified.
    """
    result_df = df.copy()
    # The window is taken before any summary column is added, so the new
    # columns never feed back into the statistics.
    last_days_df = result_df.iloc[:, -number_of_kpi_days:]

    result_df[avg_column] = last_days_df.mean(axis=1).round(2)
    result_df[max_column] = last_days_df.max(axis=1)

    if exceeded_column is not None:
        # Vectorized count per row; NaN compares False and is not counted,
        # exactly as in an element-wise ``x >= threshold`` test.
        result_df[exceeded_column] = (last_days_df >= threshold).sum(axis=1)

    return result_df


def bh_tch_call_blocking_analysis(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    tch_blocking_threshold: float,
    number_of_threshold_days: int,
) -> pd.DataFrame:
    """Summarise busy-hour TCH call blocking over the trailing columns.

    Args:
        df: Pivoted TCH call blocking frame.
        number_of_kpi_days: Number of trailing columns to analyse.
        tch_blocking_threshold: Values ``>=`` this are counted as exceeded.
        number_of_threshold_days: Currently unused; kept for interface
            compatibility.

    Returns:
        A copy of ``df`` with "Avg_tch_call_blocking BH",
        "Max_tch_call_blocking BH" and
        "number_of_days_with_tch_blocking_exceeded" appended.
    """
    return _add_bh_window_stats(
        df,
        number_of_kpi_days,
        avg_column="Avg_tch_call_blocking BH",
        max_column="Max_tch_call_blocking BH",
        exceeded_column="number_of_days_with_tch_blocking_exceeded",
        threshold=tch_blocking_threshold,
    )


def bh_sdcch_call_blocking_analysis(
    df: pd.DataFrame,
    number_of_kpi_days: int,
    sdcch_blocking_threshold: float,
    number_of_threshold_days: int,
) -> pd.DataFrame:
    """Summarise busy-hour SDCCH real blocking over the trailing columns.

    Args:
        df: Pivoted SDCCH real blocking frame.
        number_of_kpi_days: Number of trailing columns to analyse.
        sdcch_blocking_threshold: Values ``>=`` this are counted as exceeded.
        number_of_threshold_days: Currently unused; kept for interface
            compatibility.

    Returns:
        A copy of ``df`` with "Avg_sdcch_real_blocking BH",
        "Max_sdcch_real_blocking BH" and
        "number_of_days_with_sdcch_blocking_exceeded" appended.
    """
    return _add_bh_window_stats(
        df,
        number_of_kpi_days,
        avg_column="Avg_sdcch_real_blocking BH",
        max_column="Max_sdcch_real_blocking BH",
        exceeded_column="number_of_days_with_sdcch_blocking_exceeded",
        threshold=sdcch_blocking_threshold,
    )


def bh_traffic_analysis(
    df: pd.DataFrame,
    number_of_kpi_days: int,
) -> pd.DataFrame:
    """Summarise busy-hour carried traffic over the trailing columns.

    Args:
        df: Pivoted carried traffic frame.
        number_of_kpi_days: Number of trailing columns to analyse.

    Returns:
        A copy of ``df`` with "Avg_Traffic BH" and "Max_Traffic BH" appended.
    """
    return _add_bh_window_stats(
        df,
        number_of_kpi_days,
        avg_column="Avg_Traffic BH",
        max_column="Max_Traffic BH",
    )


def bh_dfs_per_kpi(
    df: pd.DataFrame,
    number_of_kpi_days: int = 7,
    tch_blocking_threshold: float = 0.50,
    sdcch_blocking_threshold: float = 0.50,
    number_of_threshold_days: int = 3,
) -> pd.DataFrame:
    """Pivot the busy-hour KPI data per KPI and append per-KPI summaries.

    Args:
        df: KPI data with "date" and "BTS_name" columns followed by the KPI
            columns.
        number_of_kpi_days: Number of trailing columns of each pivoted frame
            to analyse.
        tch_blocking_threshold: Threshold for counting TCH blocking
            exceedances.
        sdcch_blocking_threshold: Threshold for counting SDCCH blocking
            exceedances.
        number_of_threshold_days: Forwarded to the blocking analyses, which
            currently do not use it.

    Returns:
        Column-wise concatenation of the (unanalysed) TCH availability frame
        and the analysed carried traffic, TCH blocking and SDCCH blocking
        frames.

    Raises:
        KeyError: If one of the expected KPI names is missing from the
            result of ``create_dfs_per_kpi``.
    """
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="BTS_name",
        kpi_columns_from=2,
    )

    tch_call_blocking_df: pd.DataFrame = pivoted_kpi_dfs["TCH_call_blocking"]
    sdcch_real_blocking_df: pd.DataFrame = pivoted_kpi_dfs["SDCCH_real_blocking"]
    carried_traffic_df: pd.DataFrame = pivoted_kpi_dfs["2G_Carried_Traffic"]
    tch_availability_ratio_df: pd.DataFrame = pivoted_kpi_dfs["TCH_availability_ratio"]

    # Analysis
    tch_call_blocking_df = bh_tch_call_blocking_analysis(
        df=tch_call_blocking_df,
        number_of_kpi_days=number_of_kpi_days,
        tch_blocking_threshold=tch_blocking_threshold,
        number_of_threshold_days=number_of_threshold_days,
    )
    sdcch_real_blocking_df = bh_sdcch_call_blocking_analysis(
        df=sdcch_real_blocking_df,
        number_of_kpi_days=number_of_kpi_days,
        sdcch_blocking_threshold=sdcch_blocking_threshold,
        number_of_threshold_days=number_of_threshold_days,
    )
    carried_traffic_df = bh_traffic_analysis(
        df=carried_traffic_df,
        number_of_kpi_days=number_of_kpi_days,
    )

    return pd.concat(
        [
            tch_availability_ratio_df,
            carried_traffic_df,
            tch_call_blocking_df,
            sdcch_real_blocking_df,
        ],
        axis=1,
    )


def analyse_bh_data(
    bh_report_path: str,
    number_of_kpi_days: int,
    tch_blocking_threshold: float,
    sdcch_blocking_threshold: float,
    number_of_threshold_days: int,
) -> list:
    """Load the busy-hour KPI report and build the busy-hour summaries.

    Args:
        bh_report_path: Path of the ";"-delimited busy-hour CSV report.
        number_of_kpi_days: Number of trailing columns to analyse per KPI.
        tch_blocking_threshold: Threshold for TCH blocking exceedances.
        sdcch_blocking_threshold: Threshold for SDCCH blocking exceedances.
        number_of_threshold_days: Forwarded to ``bh_dfs_per_kpi``.

    Returns:
        A two-element list ``[bh_df_for_capacity, bh_full_df]``: the first
        holds only ``BH_COLUMNS_FOR_CAPACITY`` with the index reset, flat
        column names and "BTS_name" renamed to "name"; the second is the
        full result of ``bh_dfs_per_kpi``.
    """
    df = pd.read_csv(bh_report_path, delimiter=";")
    df = kpi_naming_cleaning(df)
    df = create_hourly_date(df)
    df = df[KPI_COLUMNS]
    df = bh_dfs_per_kpi(
        df=df,
        number_of_kpi_days=number_of_kpi_days,
        tch_blocking_threshold=tch_blocking_threshold,
        sdcch_blocking_threshold=sdcch_blocking_threshold,
        number_of_threshold_days=number_of_threshold_days,
    )

    # Selecting with a list yields a new frame, so ``df`` is left untouched.
    bh_df_for_capacity = df[BH_COLUMNS_FOR_CAPACITY].reset_index()

    # If columns have multiple levels (MultiIndex), flatten them by joining
    # the non-empty level values with "_".
    if isinstance(bh_df_for_capacity.columns, pd.MultiIndex):
        bh_df_for_capacity.columns = [
            "_".join([str(el) for el in col if el])
            for col in bh_df_for_capacity.columns.values
        ]

    # Rename "BTS_name" to "name" so it matches the merge key of the GSM
    # database table.
    bh_df_for_capacity = bh_df_for_capacity.rename(columns={"BTS_name": "name"})

    return [bh_df_for_capacity, df]


def daily_dfs_per_kpi(
    df: pd.DataFrame,
    number_of_kpi_days: int = 7,
    availability_threshold: int = 95,
    number_of_threshold_days: int = 3,
    tch_abis_fails_threshold: int = 10,
) -> None:
    """Pivot the daily KPI data per KPI.

    NOTE(review): this function appears unfinished. It pivots the data and
    looks up every expected KPI frame, but performs no analysis and returns
    nothing; the threshold and day-count arguments are currently unused.

    Args:
        df: KPI data with "date" and "BTS_name" columns followed by the KPI
            columns.
        number_of_kpi_days: Currently unused.
        availability_threshold: Currently unused.
        number_of_threshold_days: Currently unused.
        tch_abis_fails_threshold: Currently unused.

    Raises:
        KeyError: If one of the expected KPI names is missing from the
            result of ``create_dfs_per_kpi``.
    """
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="BTS_name",
        kpi_columns_from=2,
    )

    # The lookups are kept even though the frames are not used yet: they
    # fail with KeyError when an expected KPI is absent from the report.
    tch_call_blocking_df: pd.DataFrame = pivoted_kpi_dfs["TCH_call_blocking"]
    sdcch_real_blocking_df: pd.DataFrame = pivoted_kpi_dfs["SDCCH_real_blocking"]
    Carried_Traffic_df: pd.DataFrame = pivoted_kpi_dfs["2G_Carried_Traffic"]
    tch_availability_ratio_df: pd.DataFrame = pivoted_kpi_dfs["TCH_availability_ratio"]
    tch_abis_fails_df: pd.DataFrame = pivoted_kpi_dfs["TCH_ABIS_FAIL_CALL_c001084"]


def analyse_daily_data(
    daily_report_path: str,
    number_of_kpi_days: int,
    tch_abis_fails_threshold: int,
    availability_threshold: int,
    number_of_threshold_days: int,
) -> None:
    """Load the daily KPI report and run the daily per-KPI processing.

    NOTE(review): ``daily_dfs_per_kpi`` currently returns nothing, so this
    function only loads, cleans and pivots the report and returns None.

    Args:
        daily_report_path: Path of the ";"-delimited daily CSV report.
        number_of_kpi_days: Forwarded to ``daily_dfs_per_kpi``.
        tch_abis_fails_threshold: Forwarded to ``daily_dfs_per_kpi``.
        availability_threshold: Forwarded to ``daily_dfs_per_kpi``.
        number_of_threshold_days: Forwarded to ``daily_dfs_per_kpi``.
    """
    df = pd.read_csv(daily_report_path, delimiter=";")
    df = kpi_naming_cleaning(df)
    df = create_daily_date(df)
    df = df[KPI_COLUMNS]
    df = daily_dfs_per_kpi(
        df=df,
        number_of_kpi_days=number_of_kpi_days,
        availability_threshold=availability_threshold,
        tch_abis_fails_threshold=tch_abis_fails_threshold,
        number_of_threshold_days=number_of_threshold_days,
    )


def get_gsm_databases(dump_path: str) -> pd.DataFrame:
    """Build the per-cell GSM table used for the capacity computation.

    Element 0 of the ``combined_gsm_database`` result is used as the BTS
    table and element 2 as the TRX table; they are left-joined on "ID_BTS".

    Args:
        dump_path: Path passed through to ``combined_gsm_database``.

    Returns:
        The merged table with the derived columns "hf_rate_coef", "GPRS",
        "TCH Actual HR%" and "Offered Traffic BH". Rows for which
        "TCH Actual HR%" is missing are dropped.
    """
    dfs = combined_gsm_database(dump_path)
    bts_df: pd.DataFrame = dfs[0]
    trx_df: pd.DataFrame = dfs[2]

    # Keep only the columns needed for the analysis.
    bts_df = bts_df[GSM_COLUMNS]
    trx_df = trx_df[TRX_COLUMNS]

    # Keep one TRX row per BTS so the left merge cannot multiply BTS rows.
    trx_df = trx_df.drop_duplicates(subset=["ID_BTS"])

    gsm_df = pd.merge(bts_df, trx_df, on="ID_BTS", how="left")

    # Map "amrSegLoadDepTchRateLower" to its half-rate coefficient.
    gsm_df["hf_rate_coef"] = gsm_df["amrSegLoadDepTchRateLower"].map(
        GsmAnalysis.hf_rate_coef
    )

    # "GPRS" equals (dedicatedGPRScapacity * number_tch_per_cell) / 100.
    gsm_df["GPRS"] = (
        gsm_df["dedicatedGPRScapacity"] * gsm_df["number_tch_per_cell"]
    ) / 100

    # "TCH Actual HR%" equals the number of TCH multiplied by the half-rate
    # coefficient.
    gsm_df["TCH Actual HR%"] = gsm_df["number_tch_per_cell"] * gsm_df["hf_rate_coef"]

    # Remove rows without a value; this also keeps int(x) below from seeing
    # NaN.
    gsm_df = gsm_df.dropna(subset=["TCH Actual HR%"])

    # "Offered Traffic BH": look up the truncated "TCH Actual HR%" in the
    # erlangB table, defaulting to 0 when there is no entry.
    gsm_df["Offered Traffic BH"] = gsm_df["TCH Actual HR%"].apply(
        lambda x: GsmAnalysis.erlangB_table.get(int(x), 0)
    )

    return gsm_df


def analyze_gsm_data(
    dump_path: str,
    daily_report_path: str,
    bh_report_path: str,
    number_of_kpi_days: int,
    number_of_threshold_days: int,
    availability_threshold: int,
    tch_abis_fails_threshold: int,
    sddch_blocking_threshold: float,
    tch_blocking_threshold: float,
):
    """Run the GSM capacity analysis from a dump and two KPI reports.

    Args:
        dump_path: Path passed to ``get_gsm_databases``.
        daily_report_path: Path of the daily KPI CSV report.
        bh_report_path: Path of the busy-hour KPI CSV report.
        number_of_kpi_days: Number of trailing columns analysed per KPI.
        number_of_threshold_days: Forwarded to the daily and busy-hour
            analyses.
        availability_threshold: Forwarded to the daily analysis.
        tch_abis_fails_threshold: Forwarded to the daily analysis.
        sddch_blocking_threshold: SDCCH blocking threshold for the busy-hour
            analysis (the parameter name keeps its original spelling for
            callers).
        tch_blocking_threshold: TCH blocking threshold for the busy-hour
            analysis.

    Returns:
        A two-element list ``[gsm_analysis_df, bh_kpi_full_df]``: the GSM
        table enriched with busy-hour summaries and capacity targets, and
        the full busy-hour KPI frame.
    """
    # The daily analysis currently returns nothing; the call is kept so the
    # daily report is still loaded and checked for the expected KPIs.
    analyse_daily_data(
        daily_report_path=daily_report_path,
        number_of_kpi_days=number_of_kpi_days,
        availability_threshold=availability_threshold,
        tch_abis_fails_threshold=tch_abis_fails_threshold,
        number_of_threshold_days=number_of_threshold_days,
    )

    gsm_database_df: pd.DataFrame = get_gsm_databases(dump_path)

    bh_kpi_dfs = analyse_bh_data(
        bh_report_path=bh_report_path,
        number_of_kpi_days=number_of_kpi_days,
        tch_blocking_threshold=tch_blocking_threshold,
        sdcch_blocking_threshold=sddch_blocking_threshold,
        number_of_threshold_days=number_of_threshold_days,
    )
    bh_kpi_df = bh_kpi_dfs[0]
    bh_kpi_full_df = bh_kpi_dfs[1]

    gsm_analysis_df = gsm_database_df.merge(bh_kpi_df, on="name", how="left")

    # "TCH UTILIZATION (@Max Traffic)" equals
    # ("Max_Traffic BH" / "Offered Traffic BH") * 100.
    gsm_analysis_df["TCH UTILIZATION (@Max Traffic)"] = (
        gsm_analysis_df["Max_Traffic BH"] / gsm_analysis_df["Offered Traffic BH"]
    ) * 100

    # ErlangB value = max traffic / (1 - (max TCH call blocking / 200)).
    gsm_analysis_df["ErlabngB_value"] = gsm_analysis_df["Max_Traffic BH"] / (
        1 - (gsm_analysis_df["Max_tch_call_blocking BH"] / 200)
    )

    # "Target FR CHs": look up the truncated ErlangB value in the erlangB
    # table; missing values are looked up as 0, unknown keys give 0.
    gsm_analysis_df["Target FR CHs"] = gsm_analysis_df["ErlabngB_value"].apply(
        lambda x: GsmAnalysis.erlangB_table.get(int(x) if pd.notnull(x) else 0, 0)
    )

    # "Target HR CHs" equals "Target FR CHs" * 2.
    gsm_analysis_df["Target HR CHs"] = gsm_analysis_df["Target FR CHs"] * 2

    # "Target TCHs" equals Target HR CHs + signalling + GPRS + SDCCH.
    gsm_analysis_df["Target TCHs"] = (
        gsm_analysis_df["Target HR CHs"]
        + gsm_analysis_df["number_signals_per_cell"]
        + gsm_analysis_df["GPRS"]
        + gsm_analysis_df["number_sd_per_cell"]
    )

    # "Target TRXs" equals "Target TCHs" / 8, rounded up.
    gsm_analysis_df["Target TRXs"] = np.ceil(gsm_analysis_df["Target TCHs"] / 8)

    # "Numberof required TRXs" is the difference between "Target TRXs" and
    # the currently configured "number_trx_per_cell".
    gsm_analysis_df["Numberof required TRXs"] = (
        gsm_analysis_df["Target TRXs"] - gsm_analysis_df["number_trx_per_cell"]
    )

    return [gsm_analysis_df, bh_kpi_full_df]