"""LCG (Local Cell Group) baseband capacity analysis.

Loads per-site, per-LCG ``BB_SU_LCG_MAX_R`` utilisation KPIs, flags sites
whose LCG utilisation exceeds a threshold for a minimum number of days, and
produces upgrade / load-balancing recommendations enriched with site
coordinates from the physical database.
"""

import logging

import numpy as np
import pandas as pd

from utils.kpi_analysis_utils import (
    analyze_lcg_utilization,
    combine_comments,
    create_daily_date,
    create_dfs_per_kpi,
    kpi_naming_cleaning,
)
from utils.utils_vars import get_physical_db

logger = logging.getLogger(__name__)

# Maps the combined per-LCG comment string (including the trailing LCG count)
# to the final recommendation shown in the report.
# NOTE(review): "upgrage" looks like a typo for "upgrade" — confirm whether any
# downstream consumer matches on these exact strings before correcting them.
lcg_comments_mapping = {
    "2": "No Congestion",
    "1": "No Congestion",
    "lcg1 exceeded threshold, lcg2 exceeded threshold, 2": "Need BB SU upgrage",
    "lcg1 exceeded threshold, 2": "Need LCG balancing",
    "lcg1 exceeded threshold, 1": "Need BB SU upgrage",
    "lcg2 exceeded threshold, 2": "Need LCG balancing",
}

# Columns required in the cleaned input file (validated before analysis).
KPI_COLUMNS = [
    "date",
    "WBTS_name",
    "lcg_id",
    "BB_SU_LCG_MAX_R",
]

# Column order of the intermediate per-site KPI result (``kpi_df``).
LCG_ANALYSIS_COLUMNS = [
    "WBTS_name",
    "lcg1_utilisation",
    "avg_lcg1",
    "max_lcg1",
    "number_of_days_with_lcg1_exceeded",
    "lcg1_comment",
    "lcg2_utilisation",
    "avg_lcg2",
    "max_lcg2",
    "number_of_days_with_lcg2_exceeded",
    "lcg2_comment",
    "difference_between_lcgs",
    "difference_between_lcgs_comment",
    "lcg_comment",
    "number_of_lcg",
    "final_comments",
]


def lcg_kpi_analysis(
    df,
    num_last_days,
    num_threshold_days,
    lcg_utilization_threshold,
    difference_between_lcgs,
) -> list:
    """Analyze LCG capacity data.

    Args:
        df: DataFrame containing LCG capacity data (columns ``KPI_COLUMNS``).
        num_last_days: Number of days for analysis.
        num_threshold_days: Minimum days above threshold to flag for upgrade.
        lcg_utilization_threshold: Utilization threshold percentage for flagging.
        difference_between_lcgs: Difference between LCGs for flagging.

    Returns:
        ``[lcg_analysis_df, kpi_df]`` — the flattened per-site summary (with
        code / region / coordinates) and the full intermediate KPI DataFrame.
    """
    # Split the raw data by LCG so each group can be analyzed independently.
    lcg1_df = df[df["lcg_id"] == 1]
    lcg2_df = df[df["lcg_id"] == 2]

    # Pivot to one column per date (sites as rows) for each KPI.
    pivoted_kpi_dfs = create_dfs_per_kpi(
        df=df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    pivoted_lcg1_df = create_dfs_per_kpi(
        df=lcg1_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )
    pivoted_lcg2_df = create_dfs_per_kpi(
        df=lcg2_df,
        pivot_date_column="date",
        pivot_name_column="WBTS_name",
        kpi_columns_from=2,
    )

    # BB_SU_LCG_MAX_R to have all site with LCG 1 and/ or LCG 2
    BB_SU_LCG_MAX_R_df = pivoted_kpi_dfs["BB_SU_LCG_MAX_R"]
    pivoted_lcg1_df = pivoted_lcg1_df["BB_SU_LCG_MAX_R"]
    pivoted_lcg2_df = pivoted_lcg2_df["BB_SU_LCG_MAX_R"]

    # Rename the KPI column so the two LCGs stay distinguishable after concat.
    pivoted_lcg1_df = pivoted_lcg1_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg1_utilisation"}
    )
    pivoted_lcg2_df = pivoted_lcg2_df.rename(
        columns={"BB_SU_LCG_MAX_R": "lcg2_utilisation"}
    )

    # Analyze LCG utilization for each site over the last ``num_last_days``
    # days, flagging sites above threshold for >= ``num_threshold_days`` days.
    pivoted_lcg1_df = analyze_lcg_utilization(
        df=pivoted_lcg1_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg1",
    )
    pivoted_lcg2_df = analyze_lcg_utilization(
        df=pivoted_lcg2_df,
        number_of_kpi_days=num_last_days,
        number_of_threshold_days=num_threshold_days,
        kpi_threshold=lcg_utilization_threshold,
        kpi_column_name="lcg2",
    )

    # Align per-LCG results side by side on the site index.
    kpi_df = pd.concat(
        [
            BB_SU_LCG_MAX_R_df,
            pivoted_lcg1_df,
            pivoted_lcg2_df,
        ],
        axis=1,
    )
    kpi_df = kpi_df.reset_index()

    # Number of available lcgs
    # TODO(review): merge with available LCGs when that source is wired in:
    # kpi_df = pd.merge(kpi_df, available_lcgs_df, on="WBTS_name", how="left")

    # Calculate absolute difference between lcg1 and lcg2 average utilisation.
    kpi_df["difference_between_lcgs"] = kpi_df[["avg_lcg1", "avg_lcg2"]].apply(
        lambda row: max(row) - min(row), axis=1
    )

    # Flag if difference between lcg1 and lcg2 is above threshold.
    kpi_df["difference_between_lcgs_comment"] = np.where(
        kpi_df["difference_between_lcgs"] > difference_between_lcgs,
        "difference between lcgs exceeded threshold",
        None,
    )

    # Combine per-LCG comments into a single column.
    kpi_df = combine_comments(
        kpi_df,
        "lcg1_comment",
        "lcg2_comment",
        # "difference_between_lcgs_comment",
        new_column="lcg_comment",
    )

    # Replace if "lcg_comment" contains "nan" and ", nan" and "nan, " with None
    kpi_df["lcg_comment"] = kpi_df["lcg_comment"].replace("nan", None)

    # Remove "nan" from comma-separated strings
    kpi_df["lcg_comment"] = (
        kpi_df["lcg_comment"].str.replace(r"\bnan\b,?\s?", "", regex=True).str.strip()
    )

    # 2 if both LCGs reported data, 1 if only one did, 0 otherwise.
    kpi_df["number_of_lcg"] = np.where(
        kpi_df["avg_lcg1"].notna() & kpi_df["avg_lcg2"].notna(),
        2,
        np.where(kpi_df["avg_lcg1"].notna() | kpi_df["avg_lcg2"].notna(), 1, 0),
    )

    # Combine comment + LCG count; the result keys into lcg_comments_mapping.
    kpi_df = combine_comments(
        kpi_df,
        "lcg_comment",
        "number_of_lcg",
        new_column="final_comments",
    )
    kpi_df["final_comments"] = kpi_df["final_comments"].apply(
        lambda x: lcg_comments_mapping.get(x, x)
    )

    kpi_df = kpi_df[LCG_ANALYSIS_COLUMNS]

    # Build the report view: a subset of columns, flattened for export.
    lcg_analysis_df = kpi_df.copy()
    lcg_analysis_df = lcg_analysis_df[
        [
            "WBTS_name",
            "avg_lcg1",
            "max_lcg1",
            "number_of_days_with_lcg1_exceeded",
            "lcg1_comment",
            "avg_lcg2",
            "max_lcg2",
            "number_of_days_with_lcg2_exceeded",
            "lcg2_comment",
            "difference_between_lcgs",
            "final_comments",
        ]
    ]
    # Drop the second (date) level left over from the pivot/concat columns.
    lcg_analysis_df = lcg_analysis_df.droplevel(level=1, axis=1)

    # Drop rows whose WBTS_name is shorter than 5 characters (malformed names).
    lcg_analysis_df = lcg_analysis_df[lcg_analysis_df["WBTS_name"].str.len() >= 5]

    # Extract the numeric site code from "<code>_<region>_..." names;
    # non-numeric codes coerce to 0.
    lcg_analysis_df["code"] = lcg_analysis_df["WBTS_name"].str.split("_").str[0]
    lcg_analysis_df["code"] = (
        pd.to_numeric(lcg_analysis_df["code"], errors="coerce").fillna(0).astype(int)
    )
    lcg_analysis_df["Region"] = (
        lcg_analysis_df["WBTS_name"].str.split("_").str[1:2].str.join("_")
    )
    lcg_analysis_df["Region"] = lcg_analysis_df["Region"].fillna("UNKNOWN")

    # Move code and Region to the first columns.
    lcg_analysis_df = lcg_analysis_df[
        ["code", "Region"]
        + [col for col in lcg_analysis_df if col != "code" and col != "Region"]
    ]

    # Load physical database to attach site coordinates.
    physical_db: pd.DataFrame = get_physical_db()

    # Convert code_sector to code
    physical_db["code"] = physical_db["Code_Sector"].str.split("_").str[0]
    # One row per site code.
    physical_db = physical_db.drop_duplicates(subset="code")
    # Keep only code, longitude and latitude.
    physical_db = physical_db[["code", "Longitude", "Latitude"]]
    physical_db["code"] = (
        pd.to_numeric(physical_db["code"], errors="coerce").fillna(0).astype(int)
    )

    lcg_analysis_df = pd.merge(
        lcg_analysis_df,
        physical_db,
        on="code",
        how="left",
    )

    return [lcg_analysis_df, kpi_df]


def load_and_process_lcg_data(
    uploaded_file,
    num_last_days,
    num_threshold_days,
    lcg_utilization_threshold,
    difference_between_lcgs,
) -> list:
    """Load and process data for LCG capacity analysis.

    Args:
        uploaded_file: File-like object or path of a ``;``-delimited CSV.
        num_last_days: Number of days for analysis.
        num_threshold_days: Minimum days above threshold to flag for upgrade.
        lcg_utilization_threshold: Utilization threshold percentage for flagging.
        difference_between_lcgs: Difference between LCGs for flagging.

    Returns:
        ``[lcg_analysis_df, kpi_df]`` as produced by :func:`lcg_kpi_analysis`.

    Raises:
        ValueError: If the file is empty or required columns are missing.
        Exception: Any error raised during processing is logged and re-raised.
    """
    try:
        # Load data
        df = pd.read_csv(uploaded_file, delimiter=";")
        if df.empty:
            raise ValueError("Uploaded file is empty")

        df = kpi_naming_cleaning(df)
        df = create_daily_date(df)

        # Validate required columns
        missing_cols = [col for col in KPI_COLUMNS if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {', '.join(missing_cols)}")

        df = df[KPI_COLUMNS]

        # Process the data
        dfs = lcg_kpi_analysis(
            df,
            num_last_days,
            num_threshold_days,
            lcg_utilization_threshold,
            difference_between_lcgs,
        )
        return dfs
    except Exception as e:
        # Log the error and re-raise with a user-friendly message.
        # FIX(review): the original called ``st.error(error_msg)`` but ``st``
        # (streamlit) is never imported in this module, so every error path
        # raised NameError and masked the real exception. Log via stdlib
        # logging instead; the original exception still propagates unchanged.
        error_msg = f"Error processing LCG data: {str(e)}"
        logger.error(error_msg)
        raise