import pandas as pd import numpy as np import datetime as dt import warnings from statsmodels.tsa.holtwinters import ExponentialSmoothing import plotly.graph_objects as go import gradio as gr warnings.filterwarnings("ignore") # ----------------------------- # CONFIG # ----------------------------- DATA_FILE = "202503-domae.parquet" # 같은 경로에 놓여 있어야 함 FORECAST_END_YEAR = 2030 # 예측 종료 연도(12월까지) SEASONAL_PERIODS = 12 # 월별 seasonality # ----------------------------- # 1. 데이터 적재 & 전처리 # ----------------------------- def load_data(path: str) -> pd.DataFrame: """Parquet → 월별 피벗 테이블(DateIndex, 열: 품목, 값: 가격).""" df = pd.read_parquet(path) # 날짜 컬럼 생성/정규화 (두 가지 케이스 지원) if "date" in df.columns: df["date"] = pd.to_datetime(df["date"]) elif "PRCE_REG_MM" in df.columns: df["date"] = pd.to_datetime(df["PRCE_REG_MM"].astype(str), format="%Y%m") else: raise ValueError("지원되지 않는 날짜 컬럼 형식입니다.") # 기본 컬럼명 통일 item_col = "PDLT_NM" if "PDLT_NM" in df.columns else "item" price_col = "AVRG_PRCE" if "AVRG_PRCE" in df.columns else "price" monthly = ( df.groupby(["date", item_col])[price_col] .mean() .reset_index() ) pivot = ( monthly .pivot(index="date", columns=item_col, values=price_col) .sort_index() ) # 월 시작일 MS 빈도로 정렬 pivot.index = pd.to_datetime(pivot.index).to_period("M").to_timestamp() return pivot pivot = load_data(DATA_FILE) products = pivot.columns.tolist() # ----------------------------- # 2. 고유 모델 정의 (Holt‑Winters + fallback) # ----------------------------- def _fit_forecast(series: pd.Series) -> pd.Series: """월별 시계열 → 2025‑04 이후 FORECAST_END_YEAR‑12까지 예측.""" # Ensure Monthly Start frequency series = series.asfreq("MS") # 예측 기간 계산 last_date = series.index[-1] end_date = dt.datetime(FORECAST_END_YEAR, 12, 1) horizon = (end_date.year - last_date.year) * 12 + (end_date.month - last_date.month) if horizon <= 0: return pd.Series(dtype=float) try: model = ExponentialSmoothing( series, trend="add", seasonal="mul", seasonal_periods=SEASONAL_PERIODS, initialization_method="estimated", ) res = model.fit(optimized=True) fc = res.forecast(horizon) except Exception: # 홀트윈터스 학습 실패 시 단순 CAGR 기반 예측 growth = series.pct_change().fillna(0).mean() fc = pd.Series( [series.iloc[-1] * (1 + growth) ** i for i in range(1, horizon + 1)], index=pd.date_range( series.index[-1] + pd.DateOffset(months=1), periods=horizon, freq="MS", ), ) return fc # 품목별 전체 시리즈(과거+예측) 사전 구축 → 앱 반응 속도 개선 FULL_SERIES = {} FORECASTS = {} for item in products: hist = pivot[item].dropna() fc = _fit_forecast(hist) FULL_SERIES[item] = pd.concat([hist, fc]) FORECASTS[item] = fc # ----------------------------- # 3. 내일 가격 예측 함수 # ----------------------------- today = dt.date.today() tomorrow = today + dt.timedelta(days=1) def build_tomorrow_df() -> pd.DataFrame: """내일(일 단위) 예상 가격 DataFrame 반환.""" preds = {} for item, series in FULL_SERIES.items(): # 일 단위 선형 보간 daily = series.resample("D").interpolate("linear") preds[item] = round(daily.loc[tomorrow], 2) if tomorrow in daily.index else np.nan return ( pd.DataFrame.from_dict(preds, orient="index", columns=[f"내일({tomorrow}) 예상가(KRW)"]) .sort_index() ) tomorrow_df = build_tomorrow_df() # ----------------------------- # 4. 시각화 함수 # ----------------------------- def plot_item(item: str): hist = pivot[item].dropna().asfreq("MS") fc = FORECASTS[item] fig = go.Figure() fig.add_trace(go.Scatter(x=hist.index, y=hist.values, mode="lines", name="Historical")) fig.add_trace(go.Scatter(x=fc.index, y=fc.values, mode="lines", name="Forecast")) fig.update_layout( title=f"{item} – Monthly Avg Price (1996‑2025) & Forecast(2025‑04→2030‑12)", xaxis_title="Date", yaxis_title="Price (KRW)", legend=dict(orientation="h", y=1.02, x=0.01), margin=dict(l=40, r=20, t=60, b=40), ) return fig # ----------------------------- # 5. Gradio UI # ----------------------------- with gr.Blocks(title="도매 가격 예측 App") as demo: gr.Markdown("## 📈 도매 가격 예측 대시보드 (1996‑2030)") # 품목 선택 → 그래프 업데이트 item_dd = gr.Dropdown(products, value=products[0], label="품목 선택") chart_out = gr.Plot(label="가격 추세") # 내일 가격 표 (초기 고정) gr.Markdown(f"### 내일({tomorrow}) 각 품목 예상가 (KRW)") tomorrow_table = gr.Dataframe(tomorrow_df, interactive=False, height=400) def update_chart(product): return plot_item(product) item_dd.change(update_chart, inputs=item_dd, outputs=chart_out, queue=False) # ----------------------------- # 6. 실행 스크립트 엔트리포인트 # ----------------------------- if __name__ == "__main__": demo.launch()