File size: 13,132 Bytes
13a5750 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 |
import pandas as pd
class WbtsCapacity:
final_results: pd.DataFrame = None
def check_deviation(row: pd.Series, max_diff: float = 3.0, type: str = "") -> str:
"""
Check if any value in the row deviates more than max_diff from the most common value.
Args:
row: Series of values to check for deviation
max_diff: Maximum allowed difference from the most common value
type: Type identifier for the deviation message
Returns:
A message indicating deviation if found, otherwise an empty string
"""
numeric_row = row.astype(float) # Ensure numeric
mode_series = numeric_row.mode()
# Safe fallback in case mode is empty
most_common = mode_series.iloc[0] if not mode_series.empty else numeric_row.iloc[0]
diffs = abs(numeric_row - most_common)
if (diffs > max_diff).any():
return f"{type} Deviation > {max_diff} detected"
else:
return ""
def create_daily_date(df: pd.DataFrame) -> pd.DataFrame:
"""
Create a daily date column from PERIOD_START_TIME and drop unnecessary columns.
Args:
df: DataFrame containing PERIOD_START_TIME column
Returns:
DataFrame with new date column and unnecessary columns removed
"""
date_df = df.copy()
date_df[["mois", "jour", "annee"]] = date_df["PERIOD_START_TIME"].str.split(
".", expand=True
)
date_df["date"] = date_df["annee"] + "-" + date_df["mois"] + "-" + date_df["jour"]
# Remove unnecessary columns
date_df = date_df.drop(["annee", "mois", "jour", "PERIOD_START_TIME"], axis=1)
return date_df
def kpi_naming_cleaning(df: pd.DataFrame) -> pd.DataFrame:
"""
Clean KPI column names by replacing special characters and standardizing format.
Args:
df: DataFrame with KPI column names to clean
Returns:
DataFrame with cleaned column names
"""
name_df = df.copy()
name_df.columns = name_df.columns.str.replace("[ /(),-.']", "_", regex=True)
name_df.columns = name_df.columns.str.replace("___", "_")
name_df.columns = name_df.columns.str.replace("__", "_")
name_df.columns = name_df.columns.str.replace("%", "perc")
name_df.columns = name_df.columns.str.rstrip("_")
return name_df
def max_used_bb_subunits_analysis(
df: pd.DataFrame,
days: int = 7,
threshold: int = 80,
number_of_threshold_days: int = 3,
) -> pd.DataFrame:
"""
Analyze maximum used baseband subunits and identify sites needing upgrades.
Args:
df: DataFrame containing baseband utilization data
days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with analysis results and upgrade recommendations
"""
result_df = df.copy()
last_days_df = result_df.iloc[:, -days:]
last_days_df = last_days_df.fillna(0)
result_df["Average_used_bb_ratio"] = last_days_df.mean(axis=1).round(2)
# Count the number of days above threshold
result_df["bb_number_of_days_exceeding_threshold"] = last_days_df.apply(
lambda row: sum(1 for x in row if x >= threshold), axis=1
)
# Initialize comment column
result_df["Average_used_bb_ratio_comment"] = ""
# Apply condition for upgrade recommendation
result_df.loc[
(result_df["bb_number_of_days_exceeding_threshold"] >= number_of_threshold_days)
& (result_df["Average_used_bb_ratio"] >= threshold),
"Average_used_bb_ratio_comment",
] = "need BB upgrade"
return result_df
def cell_availability_analysis(df: pd.DataFrame, days: int = 7) -> pd.DataFrame:
"""
Analyze cell availability and categorize sites based on availability metrics.
Args:
df: DataFrame containing cell availability data
days: Number of days to analyze
Returns:
DataFrame with availability analysis and site status comments
"""
result_df = df.copy().fillna(0)
last_days_df = result_df.iloc[:, -days:]
result_df["Average_cell_availability"] = last_days_df.mean(axis=1).round(2)
# Categorize sites based on availability
def categorize_availability(x: float) -> str:
if x == 0 or pd.isnull(x):
return "Down Site"
elif 0 < x <= 70:
return "critical instability"
elif 70 < x <= 95:
return "instability"
else:
return "Site Ok"
result_df["availability_comment"] = result_df["Average_cell_availability"].apply(
categorize_availability
)
return result_df
def max_used_ce_analysis(
df: pd.DataFrame,
days: int = 7,
threshold: int = 80,
number_of_threshold_days: int = 3,
) -> pd.DataFrame:
"""
Analyze maximum used channel elements and identify sites needing upgrades.
Args:
df: DataFrame containing channel element utilization data
days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with analysis results and upgrade recommendations
"""
result_df = df.copy().fillna(0)
last_days_df = result_df.iloc[:, -days:]
result_df["Average_used_ce_ratio"] = last_days_df.mean(axis=1).round(2)
# Count the number of days above threshold
result_df["ce_number_of_days_exceeding_threshold"] = last_days_df.apply(
lambda row: sum(1 for x in row if x >= threshold), axis=1
)
# Initialize comment column
result_df["Average_used_ce_ratio_comment"] = ""
# Apply condition for upgrade recommendation
result_df.loc[
(result_df["ce_number_of_days_exceeding_threshold"] >= number_of_threshold_days)
& (result_df["Average_used_ce_ratio"] >= threshold),
"Average_used_ce_ratio_comment",
] = "need CE upgrade"
return result_df
def num_bb_subunits_analysis(df: pd.DataFrame, days: int = 3) -> pd.DataFrame:
"""
Analyze baseband subunit count for deviations.
Args:
df: DataFrame containing baseband subunit count data
days: Number of days to analyze
Returns:
DataFrame with deviation analysis comments
"""
result_df = df.copy()
last_days_df = result_df.iloc[:, -days:]
result_df["num_bb_subunits_comment"] = last_days_df.apply(
lambda row: check_deviation(row, type="bb"), axis=1
)
return result_df
def avail_ce_analysis(df: pd.DataFrame, days: int = 7) -> pd.DataFrame:
"""
Analyze available channel elements for deviations.
Args:
df: DataFrame containing available channel element data
days: Number of days to analyze
Returns:
DataFrame with deviation analysis comments
"""
result_df = df.copy()
last_days_df = result_df.iloc[:, -days:]
result_df["avail_ce_comment"] = last_days_df.apply(
lambda row: check_deviation(row, max_diff=96, type="ce"), axis=1
)
return result_df
def combine_comments(df: pd.DataFrame, *columns: str, new_column: str) -> pd.DataFrame:
"""
Combine comments from multiple columns into one column.
Args:
df: DataFrame containing comment columns
*columns: Variable number of column names containing comments
new_column: Name for the new combined comments column
Returns:
DataFrame with a new column containing combined comments
"""
result_df = df.copy()
result_df[new_column] = result_df[list(columns)].apply(
lambda row: ", ".join([x for x in row if x]), axis=1
)
# Trim all trailing commas
result_df[new_column] = result_df[new_column].str.replace(
r"^[,\s]+|[,\s]+$", "", regex=True
)
# Replace multiple commas with a single comma
result_df[new_column] = result_df[new_column].str.replace(
r",\s*,", ", ", regex=True
)
return result_df
def bb_comments_analysis(df: pd.DataFrame) -> pd.DataFrame:
"""
Combine baseband related comments into a single column.
Args:
df: DataFrame containing baseband comment columns
Returns:
DataFrame with combined baseband comments
"""
return combine_comments(
df,
"num_bb_subunits_comment",
"Average_used_bb_ratio_comment",
"availability_comment",
new_column="bb_comments",
)
def ce_comments_analysis(df: pd.DataFrame) -> pd.DataFrame:
"""
Combine channel element related comments into a single column.
Args:
df: DataFrame containing channel element comment columns
Returns:
DataFrame with combined channel element comments
"""
return combine_comments(
df,
"avail_ce_comment",
"Average_used_ce_ratio_comment",
"availability_comment",
new_column="ce_comments",
)
def create_dfs_per_kpi(
df: pd.DataFrame,
num_days: int = 7,
threshold: int = 80,
number_of_threshold_days: int = 3,
) -> pd.DataFrame:
"""
Create pivoted DataFrames for each KPI and perform analysis.
Args:
df: DataFrame containing KPI data
num_days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with combined analysis results
"""
kpi_columns = df.columns[5:]
pivoted_kpi_dfs = {}
# Loop through each KPI and create pivoted DataFrames
for kpi in kpi_columns:
temp_df = df[["date", "DN", kpi]].copy()
# Pivot the dataframe
pivot_df = temp_df.pivot(index="DN", columns="date", values=kpi)
pivot_df.columns = pd.MultiIndex.from_product([[kpi], pivot_df.columns])
pivot_df.columns.names = ["KPI", "Date"]
# Store in dictionary with KPI name as key
pivoted_kpi_dfs[kpi] = pivot_df
# Extract individual KPI DataFrames
wbts_name_df = pivoted_kpi_dfs["WBTS_name"].iloc[:, 0]
licensed_ce_df = pivoted_kpi_dfs["LICENSED_R99CE_WBTS_M5008C48"]
max_used_ce_dl_df = pivoted_kpi_dfs["MAX_USED_CE_R99_DL_M5008C12"]
max_used_ce_ul_df = pivoted_kpi_dfs["MAX_USED_CE_R99_UL_M5008C15"]
max_avail_ce_df = pivoted_kpi_dfs["MAX_AVAIL_R99_CE_M5006C0"]
max_used_bb_subunits_df = pivoted_kpi_dfs["MAX_USED_BB_SUBUNITS_M5008C38"]
num_bb_subunits_df = pivoted_kpi_dfs["NUM_BB_SUBUNITS_M5008C39"]
max_bb_sus_util_ratio_df = pivoted_kpi_dfs["Max_BB_SUs_Util_ratio"]
cell_availability_df = pivoted_kpi_dfs[
"Cell_Availability_excluding_blocked_by_user_state_BLU"
]
total_cs_traffic_df = pivoted_kpi_dfs["Total_CS_traffic_Erl"]
total_data_traffic_df = pivoted_kpi_dfs["Total_Data_Traffic"]
max_used_ce_ratio_flexi_df = pivoted_kpi_dfs["Max_Used_CE_s_ratio_Flexi_R2"]
# Perform analysis on each KPI DataFrame
max_bb_sus_util_ratio_df = max_used_bb_subunits_analysis(
max_bb_sus_util_ratio_df, num_days, threshold, number_of_threshold_days
)
cell_availability_df = cell_availability_analysis(cell_availability_df, num_days)
max_used_ce_ratio_flexi_df = max_used_ce_analysis(
max_used_ce_ratio_flexi_df, num_days, threshold, number_of_threshold_days
)
num_bb_subunits_df = num_bb_subunits_analysis(num_bb_subunits_df, num_days)
licensed_ce_df = avail_ce_analysis(licensed_ce_df, num_days)
# Concatenate all DataFrames
result_df = pd.concat(
[
wbts_name_df,
licensed_ce_df,
max_used_ce_dl_df,
max_used_ce_ul_df,
max_avail_ce_df,
max_used_bb_subunits_df,
num_bb_subunits_df,
max_bb_sus_util_ratio_df,
cell_availability_df,
total_cs_traffic_df,
total_data_traffic_df,
max_used_ce_ratio_flexi_df,
],
axis=1,
)
# Add combined comments analysis
result_df = bb_comments_analysis(result_df)
result_df = ce_comments_analysis(result_df)
return result_df
def load_data(
filepath: str,
num_days: int,
threshold: int,
number_of_threshold_days: int,
) -> pd.DataFrame:
"""
Load data from CSV file and perform preprocessing and analysis.
Args:
filepath: Path to CSV file or uploaded file object
num_days: Number of days to analyze
threshold: Utilization threshold percentage for flagging
number_of_threshold_days: Minimum days above threshold to flag for upgrade
Returns:
DataFrame with processed and analyzed data
"""
df = pd.read_csv(filepath, delimiter=";")
# Preprocess data
df = create_daily_date(df)
df = kpi_naming_cleaning(df)
# Reorder columns for better organization
df = df[["date"] + [col for col in df.columns if col not in ["date"]]]
df = df[[col for col in df.columns if col != "WBTS_name"] + ["WBTS_name"]]
# Perform KPI analysis
df = create_dfs_per_kpi(df, num_days, threshold, number_of_threshold_days)
return df
|