Spaces:
Running
Running
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
import requests | |
import time | |
from collections import defaultdict | |
import datetime | |
import altair as alt | |
# Set page layout to wide mode and set page title | |
st.set_page_config(layout="wide", page_title="影城效率与内容分析工具") | |
# --- Helper Functions --- | |
def clean_movie_title(title): | |
if not isinstance(title, str): | |
return title | |
return title.split(' ', 1)[0] | |
def style_efficiency(row): | |
green = 'background-color: #E6F5E6;' # Light Green | |
red = 'background-color: #FFE5E5;' # Light Red | |
default = '' | |
styles = [default] * len(row) | |
seat_efficiency = row.get('座次效率', 0) | |
session_efficiency = row.get('场次效率', 0) | |
if seat_efficiency > 1.5 or session_efficiency > 1.5: | |
styles = [green] * len(row) | |
elif seat_efficiency < 0.5 or session_efficiency < 0.5: | |
styles = [red] * len(row) | |
return styles | |
def process_and_analyze_data(df): | |
if df.empty: | |
return pd.DataFrame() | |
analysis_df = df.groupby('影片名称_清理后').agg( | |
座位数=('座位数', 'sum'), | |
场次=('影片名称_清理后', 'size'), | |
票房=('总收入', 'sum'), | |
人次=('总人次', 'sum') | |
).reset_index() | |
analysis_df.rename(columns={'影片名称_清理后': '影片'}, inplace=True) | |
analysis_df = analysis_df.sort_values(by='票房', ascending=False).reset_index(drop=True) | |
total_seats = analysis_df['座位数'].sum() | |
total_sessions = analysis_df['场次'].sum() | |
total_revenue = analysis_df['票房'].sum() | |
analysis_df['均价'] = np.divide(analysis_df['票房'], analysis_df['人次']).fillna(0) | |
analysis_df['座次比'] = np.divide(analysis_df['座位数'], total_seats).fillna(0) | |
analysis_df['场次比'] = np.divide(analysis_df['场次'], total_sessions).fillna(0) | |
analysis_df['票房比'] = np.divide(analysis_df['票房'], total_revenue).fillna(0) | |
analysis_df['座次效率'] = np.divide(analysis_df['票房比'], analysis_df['座次比']).fillna(0) | |
analysis_df['场次效率'] = np.divide(analysis_df['票房比'], analysis_df['场次比']).fillna(0) | |
final_columns = ['影片', '座位数', '场次', '票房', '人次', '均价', '座次比', '场次比', '票房比', '座次效率', | |
'场次效率'] | |
analysis_df = analysis_df[final_columns] | |
return analysis_df | |
def get_circled_number(hall_name): | |
mapping = {'1': '①', '2': '②', '3': '③', '4': '④', '5': '⑤', '6': '⑥', '7': '⑦', '8': '⑧', '9': '⑨'} | |
num_str = ''.join(filter(str.isdigit, hall_name)) | |
return mapping.get(num_str, '') | |
def format_play_time(time_str): | |
if not time_str or not isinstance(time_str, str): return None | |
try: | |
parts = time_str.split(':'); | |
hours = int(parts[0]); | |
minutes = int(parts[1]) | |
return hours * 60 + minutes | |
except (ValueError, IndexError): | |
return None | |
def add_tms_locations_to_analysis(analysis_df, tms_movie_list): | |
locations = [] | |
for index, row in analysis_df.iterrows(): | |
movie_title = row['影片'] | |
found_versions = [] | |
for tms_movie in tms_movie_list: | |
if tms_movie['assert_name'].startswith(movie_title): | |
version_name = tms_movie['assert_name'].replace(movie_title, '').strip() | |
circled_halls = " ".join(sorted([get_circled_number(h) for h in tms_movie['halls']])) | |
if version_name: | |
found_versions.append(f"{version_name}:{circled_halls}") | |
else: | |
found_versions.append(circled_halls) | |
locations.append('|'.join(found_versions)) | |
analysis_df['影片所在影厅位置'] = locations | |
return analysis_df | |
def get_chinese_holidays_2025(): | |
holidays = set() | |
holidays.add(datetime.date(2025, 1, 1)) | |
holidays.update([datetime.date(2025, 1, 28), datetime.date(2025, 1, 29), datetime.date(2025, 1, 30), | |
datetime.date(2025, 1, 31), datetime.date(2025, 2, 1), datetime.date(2025, 2, 2), | |
datetime.date(2025, 2, 3)]) | |
holidays.update([datetime.date(2025, 4, 4), datetime.date(2025, 4, 5), datetime.date(2025, 4, 6)]) | |
holidays.update([datetime.date(2025, 5, 1), datetime.date(2025, 5, 2), datetime.date(2025, 5, 3), | |
datetime.date(2025, 5, 4), datetime.date(2025, 5, 5)]) | |
holidays.update([datetime.date(2025, 5, 30), datetime.date(2025, 5, 31), datetime.date(2025, 6, 1)]) | |
holidays.add(datetime.date(2025, 10, 6)) | |
holidays.update([datetime.date(2025, 10, 1), datetime.date(2025, 10, 2), datetime.date(2025, 10, 3), | |
datetime.date(2025, 10, 4), datetime.date(2025, 10, 5), datetime.date(2025, 10, 6), | |
datetime.date(2025, 10, 7)]) | |
return holidays | |
def plot_daily_box_office(df, selected_movie='全部影片'): | |
if selected_movie != '全部影片': | |
plot_df = df[df['影片名称_清理后'] == selected_movie].copy() | |
else: | |
plot_df = df.copy() | |
if plot_df.empty: | |
st.warning(f"影片《{selected_movie}》在所分析的文件中没有找到数据。") | |
return None | |
daily_revenue = plot_df.groupby('放映日期')['总收入'].sum().reset_index() | |
daily_revenue.rename(columns={'放映日期': '日期', '总收入': '票房'}, inplace=True) | |
total_box_office = daily_revenue['票房'].sum() | |
chart_title = f'每日票房表现 - {selected_movie} | 总票房: {total_box_office:,.0f} 元' | |
start_date = pd.to_datetime(df['放映日期'].min()) | |
end_date = pd.to_datetime(df['放映日期'].max()) | |
full_date_range = pd.to_datetime(pd.date_range(start=start_date, end=end_date, freq='D')) | |
daily_revenue['日期'] = pd.to_datetime(daily_revenue['日期']) | |
daily_revenue = pd.merge(pd.DataFrame({'日期': full_date_range}), daily_revenue, on='日期', how='left').fillna(0) | |
holidays = get_chinese_holidays_2025() | |
daily_revenue['day_of_week'] = daily_revenue['日期'].dt.dayofweek | |
daily_revenue['类型'] = daily_revenue.apply( | |
lambda row: '节假日' if row['日期'].date() in holidays else ( | |
'周末' if row['day_of_week'] in [4, 5, 6] else '工作日'), | |
axis=1 | |
) | |
chart = alt.Chart(daily_revenue).mark_bar().encode( | |
x=alt.X('日期:T', title='日期', axis=alt.Axis(labelAngle=-45, format='%m-%d')), | |
y=alt.Y('票房:Q', title='票房 (元)', scale=alt.Scale(domainMin=0)), | |
color=alt.Color('类型:N', | |
scale=alt.Scale(domain=['工作日', '周末', '节假日'], range=['#87CEEB', '#FFA500', '#FF4500']), | |
legend=alt.Legend(title="日期类型")), | |
tooltip=[alt.Tooltip('日期:T', format='%Y-%m-%d', title='日期'), | |
alt.Tooltip('票房:Q', format=',.2f', title='票房'), | |
alt.Tooltip('类型:N', title='类型')] | |
).properties( | |
title=chart_title | |
).interactive() | |
return chart | |
def round_time_to_5min(t_datetime): | |
if not isinstance(t_datetime, datetime.datetime): | |
if isinstance(t_datetime, datetime.time): | |
t_datetime = datetime.datetime.combine(datetime.date.today(), t_datetime) | |
else: | |
return None | |
minute = (t_datetime.minute // 5) * 5 | |
rounded_datetime = t_datetime.replace(minute=minute, second=0, microsecond=0) | |
return rounded_datetime.time() | |
# --- REQUIREMENT 1: New function to plot daily box office by time period --- | |
def plot_daily_box_office_by_time(df, selected_movie='全部影片'): | |
if selected_movie != '全部影片': | |
plot_df = df[df['影片名称_清理后'] == selected_movie].copy() | |
else: | |
plot_df = df.copy() | |
if plot_df.empty: | |
return | |
plot_df['时间点'] = plot_df['放映时间'].apply(round_time_to_5min) | |
time_revenue = plot_df.groupby('时间点')['总收入'].sum().reset_index() | |
time_revenue.rename(columns={'总收入': '票房'}, inplace=True) | |
time_revenue['时间点'] = time_revenue['时间点'].apply(lambda t: t.strftime('%H:%M')) | |
chart_title = f'影城每日时间段票房表现 - {selected_movie}' | |
chart = alt.Chart(time_revenue).mark_bar().encode( | |
x=alt.X('时间点:N', title='时间点', sort=None, axis=alt.Axis(labelAngle=-45)), | |
y=alt.Y('票房:Q', title='票房 (元)'), | |
tooltip=[ | |
alt.Tooltip('时间点:N', title='时间点'), | |
alt.Tooltip('票房:Q', format=',.2f', title='票房') | |
] | |
).properties( | |
title=chart_title | |
).interactive() | |
st.altair_chart(chart, use_container_width=True) | |
# --- Original time efficiency function (for the first tab) --- | |
def plot_time_efficiency_analysis(df): | |
df_filtered = df[(df['放映时间'] >= datetime.time(9, 30)) & (df['放映时间'] <= datetime.time(23, 59))].copy() | |
if df_filtered.empty: | |
st.warning("在 9:30 - 23:59 时间段内没有找到场次数据。") | |
return | |
df_filtered['时间点'] = df_filtered['放映时间'].apply(round_time_to_5min) | |
total_revenue_full_day = df['总收入'].sum() | |
total_seats_full_day = df['座位数'].sum() | |
total_sessions_full_day = len(df) | |
if total_revenue_full_day == 0 or total_seats_full_day == 0 or total_sessions_full_day == 0: | |
st.warning("总收入、总座位数或总场次数为零,无法计算效率。") | |
return | |
time_analysis = df_filtered.groupby(['放映日期', '时间点']).agg( | |
票房=('总收入', 'sum'), | |
座位数=('座位数', 'sum'), | |
场次=('场次', 'size'), | |
).reset_index() | |
time_analysis['票房比'] = time_analysis['票房'] / total_revenue_full_day | |
time_analysis['座次比'] = time_analysis['座位数'] / total_seats_full_day | |
time_analysis['场次比'] = time_analysis['场次'] / total_sessions_full_day | |
time_analysis['座次效率'] = (time_analysis['票房比'] / time_analysis['座次比']).fillna(0) | |
time_analysis['场次效率'] = (time_analysis['票房比'] / time_analysis['场次比']).fillna(0) | |
avg_time_efficiency = time_analysis.groupby('时间点')[['座次效率', '场次效率']].mean().reset_index() | |
avg_time_efficiency['时间点'] = avg_time_efficiency['时间点'].apply(lambda t: t.strftime('%H:%M')) | |
source = avg_time_efficiency.melt(id_vars=['时间点'], value_vars=['座次效率', '场次效率'], var_name='效率类型', | |
value_name='效率值') | |
chart = alt.Chart(source).mark_bar().encode( | |
x=alt.X('时间点:N', title='时间点', sort=None, axis=alt.Axis(labelAngle=-45)), | |
y=alt.Y('效率值:Q', title='平均效率'), | |
color=alt.Color('效率类型:N', title='效率类型'), | |
xOffset='效率类型:N', | |
tooltip=[alt.Tooltip('时间点:N'), alt.Tooltip('效率类型:N'), alt.Tooltip('效率值:Q', format='.2f')] | |
).properties(title='每日时间点平均效率分析 (对比全天)').interactive() | |
st.altair_chart(chart, use_container_width=True) | |
# --- Original movie time efficiency function (for the second tab) --- | |
def plot_movie_time_efficiency_analysis(df, selected_movie): | |
if selected_movie == '全部影片': | |
st.info("请选择一部具体的影片进行分析。") | |
return | |
df_movie = df[df['影片名称_清理后'] == selected_movie].copy() | |
df_movie = df_movie[ | |
(df_movie['放映时间'] >= datetime.time(9, 30)) & (df_movie['放映时间'] <= datetime.time(23, 59))] | |
if df_movie.empty: | |
st.warning(f"在 9:30 - 23:59 时间段内没有找到影片《{selected_movie}》的场次数据。") | |
return | |
df_movie['时间点'] = df_movie['放映时间'].apply(round_time_to_5min) | |
daily_totals = df.groupby('放映日期').agg(总票房=('总收入', 'sum'), 总座位数=('座位数', 'sum'), | |
总场次数=('场次', 'sum')).reset_index() | |
if daily_totals.empty: | |
st.warning("无法计算每日总计数据,分析中止。") | |
return | |
df_movie = pd.merge(df_movie, daily_totals, on='放映日期') | |
df_movie = df_movie[(df_movie['总票房'] > 0) & (df_movie['总座位数'] > 0) & (df_movie['总场次数'] > 0)] | |
df_movie['票房比'] = df_movie['总收入'] / df_movie['总票房'] | |
df_movie['座次比'] = df_movie['座位数'] / df_movie['总座位数'] | |
df_movie['场次比'] = 1 / df_movie['总场次数'] | |
df_movie['座次效率'] = (df_movie['票房比'] / df_movie['座次比']).fillna(0) | |
df_movie['场次效率'] = (df_movie['票房比'] / df_movie['场次比']).fillna(0) | |
avg_movie_time_efficiency = df_movie.groupby('时间点')[['座次效率', '场次效率']].mean().reset_index() | |
avg_movie_time_efficiency['时间点'] = avg_movie_time_efficiency['时间点'].apply(lambda t: t.strftime('%H:%M')) | |
source = avg_movie_time_efficiency.melt(id_vars=['时间点'], value_vars=['座次效率', '场次效率'], | |
var_name='效率类型', value_name='效率值') | |
chart = alt.Chart(source).mark_bar().encode( | |
x=alt.X('时间点:N', title='时间点', sort=None, axis=alt.Axis(labelAngle=-45)), | |
y=alt.Y('效率值:Q', title='平均效率'), | |
color='效率类型:N', | |
xOffset='效率类型:N', | |
tooltip=[alt.Tooltip('时间点:N'), alt.Tooltip('效率类型:N'), alt.Tooltip('效率值:Q', format='.2f')] | |
).properties(title=f'影片《{selected_movie}》各时间点平均效率分析 (对比全天)').interactive() | |
st.altair_chart(chart, use_container_width=True) | |
# --- REQUIREMENT 2: New function for windowed daily efficiency analysis --- | |
def plot_windowed_daily_efficiency(df, window_minutes): | |
df['时间点'] = df['放映时间'].apply(round_time_to_5min) | |
time_slots = sorted(df['时间点'].unique()) | |
all_days = df['放映日期'].unique() | |
results = [] | |
for center_time in time_slots: | |
center_dt = datetime.datetime.combine(datetime.date.today(), center_time) | |
start_dt = center_dt - datetime.timedelta(minutes=window_minutes) | |
end_dt = center_dt + datetime.timedelta(minutes=window_minutes) | |
daily_efficiencies = [] | |
for day in all_days: | |
day_df = df[df['放映日期'] == day] | |
# Numerator: Center point's performance | |
center_df = day_df[day_df['时间点'] == center_time] | |
center_revenue = center_df['总收入'].sum() | |
center_seats = center_df['座位数'].sum() | |
center_sessions = len(center_df) | |
# Denominator: Window's performance | |
window_df = day_df[day_df['放映时间'].between(start_dt.time(), end_dt.time())] | |
window_revenue = window_df['总收入'].sum() | |
window_seats = window_df['座位数'].sum() | |
window_sessions = len(window_df) | |
if window_revenue > 0 and window_seats > 0 and window_sessions > 0: | |
票房比 = center_revenue / window_revenue | |
座次比 = center_seats / window_seats | |
场次比 = center_sessions / window_sessions | |
seat_efficiency = (票房比 / 座次比) if 座次比 > 0 else 0 | |
session_efficiency = (票房比 / 场次比) if 场次比 > 0 else 0 | |
daily_efficiencies.append({'seat': seat_efficiency, 'session': session_efficiency}) | |
if daily_efficiencies: | |
avg_seat_eff = np.mean([d['seat'] for d in daily_efficiencies]) | |
avg_session_eff = np.mean([d['session'] for d in daily_efficiencies]) | |
results.append( | |
{'时间点': center_time.strftime('%H:%M'), '座次效率': avg_seat_eff, '场次效率': avg_session_eff}) | |
if not results: | |
st.warning("没有足够的数据来计算分时间段的每日效率。") | |
return | |
results_df = pd.DataFrame(results) | |
source = results_df.melt(id_vars=['时间点'], value_vars=['座次效率', '场次效率'], var_name='效率类型', | |
value_name='效率值') | |
chart = alt.Chart(source).mark_bar().encode( | |
x=alt.X('时间点:N', sort=None, axis=alt.Axis(labelAngle=-45)), | |
y=alt.Y('效率值:Q', title=f'平均效率 (对比±{window_minutes}分钟窗口)'), | |
color='效率类型:N', | |
xOffset='效率类型:N', | |
tooltip=[alt.Tooltip('时间点:N'), alt.Tooltip('效率类型:N'), alt.Tooltip('效率值:Q', format='.2f')] | |
).properties(title=f'每日时间效率分析 (移动窗口: {window_minutes * 2}分钟)').interactive() | |
st.altair_chart(chart, use_container_width=True) | |
# --- REQUIREMENT 3: New function for windowed movie efficiency analysis --- | |
def plot_windowed_movie_efficiency(df, center_time, window_minutes): | |
df['时间点'] = df['放映时间'].apply(round_time_to_5min) | |
center_dt = datetime.datetime.combine(datetime.date.today(), center_time) | |
start_dt = center_dt - datetime.timedelta(minutes=window_minutes) | |
end_dt = center_dt + datetime.timedelta(minutes=window_minutes) | |
all_days = df['放映日期'].unique() | |
movie_list = df['影片名称_清理后'].unique() | |
results = [] | |
for movie in movie_list: | |
daily_efficiencies = [] | |
for day in all_days: | |
day_df = df[df['放映日期'] == day] | |
# Denominator: Window's performance on a specific day | |
window_df = day_df[day_df['放映时间'].between(start_dt.time(), end_dt.time())] | |
window_revenue = window_df['总收入'].sum() | |
window_seats = window_df['座位数'].sum() | |
window_sessions = len(window_df) | |
if window_revenue > 0 and window_seats > 0 and window_sessions > 0: | |
# Numerator: Movie's performance at the center point on that day | |
movie_center_df = day_df[(day_df['时间点'] == center_time) & (day_df['影片名称_清理后'] == movie)] | |
movie_center_revenue = movie_center_df['总收入'].sum() | |
movie_center_seats = movie_center_df['座位数'].sum() | |
movie_center_sessions = len(movie_center_df) | |
if movie_center_revenue > 0: # Only calculate if the movie had a show | |
票房比 = movie_center_revenue / window_revenue | |
座次比 = movie_center_seats / window_seats | |
场次比 = movie_center_sessions / window_sessions | |
seat_efficiency = (票房比 / 座次比) if 座次比 > 0 else 0 | |
session_efficiency = (票房比 / 场次比) if 场次比 > 0 else 0 | |
daily_efficiencies.append({'seat': seat_efficiency, 'session': session_efficiency}) | |
if daily_efficiencies: | |
avg_seat_eff = np.mean([d['seat'] for d in daily_efficiencies]) | |
avg_session_eff = np.mean([d['session'] for d in daily_efficiencies]) | |
results.append({'影片': movie, '座次效率': avg_seat_eff, '场次效率': avg_session_eff}) | |
if not results: | |
st.warning( | |
f"在 {start_dt.time().strftime('%H:%M')} - {end_dt.time().strftime('%H:%M')} 时间段内没有足够的数据进行单片效率分析。") | |
return | |
results_df = pd.DataFrame(results).sort_values(by='座次效率', ascending=False) | |
source = results_df.melt(id_vars=['影片'], value_vars=['座次效率', '场次效率'], var_name='效率类型', | |
value_name='效率值') | |
chart = alt.Chart(source).mark_bar().encode( | |
x=alt.X('效率值:Q'), | |
y=alt.Y('影片:N', sort='-x'), | |
color='效率类型:N', | |
tooltip=[alt.Tooltip('影片:N'), alt.Tooltip('效率类型:N'), alt.Tooltip('效率值:Q', format='.2f')] | |
).properties( | |
title=f"时间段 {start_dt.time().strftime('%H:%M')}-{end_dt.time().strftime('%H:%M')} 内单片平均效率").interactive() | |
st.altair_chart(chart, use_container_width=True) | |
# --- TMS Server Movie Content Inquiry --- | |
def fetch_and_process_server_movies(priority_movie_titles=None): | |
if priority_movie_titles is None: | |
priority_movie_titles = [] | |
# (The rest of the TMS function remains unchanged) | |
# 1. Get Token | |
try: | |
token_headers = { | |
'Host': 'oa.hengdianfilm.com:7080', 'Content-Type': 'application/json', | |
'Origin': 'http://115.239.253.233:7080', 'Connection': 'keep-alive', | |
'Accept': 'application/json, text/javascript, */*; q=0.01', | |
'User-Agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 18_5_0 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/138.0.7204.156 Mobile/15E148 Safari/604.1', | |
'Accept-Language': 'zh-CN,zh-Hans;q=0.9', | |
} | |
token_json_data = {'appId': 'hd', 'appSecret': 'ad761f8578cc6170', 'timeStamp': int(time.time() * 1000)} | |
token_url = 'http://oa.hengdianfilm.com:7080/cinema-api/admin/generateToken?token=hd&murl=?token=hd&murl=ticket=-1495916529737643774' | |
response = requests.post(token_url, headers=token_headers, json=token_json_data, timeout=10) | |
response.raise_for_status() | |
token_data = response.json() | |
if token_data.get('error_code') != '0000': | |
st.error(f"获取Token失败: {token_data.get('error_desc', '未知错误')}") | |
return {}, [] | |
auth_token = token_data['param'] | |
except requests.exceptions.RequestException as e: | |
st.error(f"网络请求错误: {e}") | |
return {}, [] | |
except Exception as e: | |
st.error(f"获取Token时发生未知错误: {e}") | |
return {}, [] | |
# 2. Fetch movie list (with pagination and delay) | |
all_movies = [] | |
page_index = 1 | |
while True: | |
try: | |
list_headers = { | |
'Accept': 'application/json, text/javascript, */*; q=0.01', | |
'Content-Type': 'application/json; charset=UTF-8', | |
'Origin': 'http://115.239.253.233:7080', 'Proxy-Connection': 'keep-alive', 'Token': auth_token, | |
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/138.0.0.0 Safari/537.36', | |
'X-SESSIONID': 'PQ0J3K85GJEDVYIGZE1KEG1K80USDAP4', | |
} | |
list_params = {'token': 'hd', 'murl': 'ContentMovie'} | |
list_json_data = {'THEATER_ID': 38205954, 'SOURCE': 'SERVER', 'ASSERT_TYPE': 2, 'PAGE_CAPACITY': 20, | |
'PAGE_INDEX': page_index} | |
list_url = 'http://oa.hengdianfilm.com:7080/cinema-api/cinema/server/dcp/list' | |
response = requests.post(list_url, params=list_params, headers=list_headers, json=list_json_data, | |
verify=False) | |
response.raise_for_status() | |
movie_data = response.json() | |
if movie_data.get("RSPCD") != "000000": | |
st.error(f"获取影片列表失败: {movie_data.get('RSPMSG', '未知错误')}") | |
return {}, [] | |
body = movie_data.get("BODY", {}) | |
movies_on_page = body.get("LIST", []) | |
if not movies_on_page: break | |
all_movies.extend(movies_on_page) | |
if len(all_movies) >= body.get("COUNT", 0): break | |
page_index += 1 | |
time.sleep(1) | |
except requests.exceptions.RequestException as e: | |
st.error(f"网络请求错误: {e}") | |
return {}, [] | |
except Exception as e: | |
st.error(f"获取影片列表时发生未知错误: {e}") | |
return {}, [] | |
# 3. Process data | |
movie_details = {m['CONTENT_NAME']: {'assert_name': m.get('ASSERT_NAME'), | |
'halls': sorted([h.get('HALL_NAME') for h in m.get('HALL_INFO', [])]), | |
'play_time': m.get('PLAY_TIME')} for m in all_movies if m.get('CONTENT_NAME')} | |
by_hall = defaultdict(list) | |
for name, details in movie_details.items(): | |
for hall in details['halls']: by_hall[hall].append({'content_name': name, 'details': details}) | |
for hall in by_hall: by_hall[hall].sort( | |
key=lambda item: (item['details']['assert_name'] is None or item['details']['assert_name'] == '', | |
item['details']['assert_name'] or item['content_name'])) | |
view2_list = [ | |
{'assert_name': d['assert_name'], 'content_name': name, 'halls': d['halls'], 'play_time': d['play_time']} for | |
name, d in movie_details.items() if d.get('assert_name')] | |
priority_list = [item for item in view2_list if any(p in item['assert_name'] for p in priority_movie_titles)] | |
other_list = [item for item in view2_list if item not in priority_list] | |
priority_list.sort(key=lambda x: x['assert_name']); | |
other_list.sort(key=lambda x: x['assert_name']) | |
return dict(sorted(by_hall.items())), priority_list + other_list | |
# --- Streamlit Main UI --- | |
st.title('影城排片效率与内容分析工具') | |
st.write("上传 `影片映出日累计报表.xlsx` 进行效率分析,或点击下方按钮查询 TMS 服务器影片内容。") | |
uploaded_file = st.file_uploader("请在此处上传 Excel 文件", type=['xlsx', 'xls', 'csv']) | |
query_tms_for_location = st.checkbox("查询 TMS 找影片所在影厅") | |
if uploaded_file is not None: | |
try: | |
df = pd.read_excel(uploaded_file, skiprows=3, header=None) | |
df['场次'] = 1 | |
df.rename(columns={0: '影片名称', 1: '放映日期', 2: '放映时间', 5: '总人次', 6: '总收入', 7: '座位数'}, | |
inplace=True) | |
required_cols = ['影片名称', '放映日期', '放映时间', '座位数', '总收入', '总人次', '场次'] | |
df = df[required_cols] | |
df.dropna(subset=['影片名称', '放映日期', '放映时间'], inplace=True) | |
df['放映日期'] = pd.to_datetime(df['放映日期'], errors='coerce').dt.date | |
df.dropna(subset=['放映日期'], inplace=True) | |
for col in ['座位数', '总收入', '总人次']: | |
df[col] = pd.to_numeric(df[col], errors='coerce').fillna(0) | |
df['放映时间'] = pd.to_datetime(df['放映时间'], format='%H:%M:%S', errors='coerce').dt.time | |
df.dropna(subset=['放映时间'], inplace=True) | |
df['影片名称_清理后'] = df['影片名称'].apply(clean_movie_title) | |
st.toast("文件上传成功,效率分析已生成!", icon="🎉") | |
format_config = {'座位数': '{:,.0f}', '场次': '{:,.0f}', '人次': '{:,.0f}', '票房': '{:,.2f}', '均价': '{:.2f}', | |
'座次比': '{:.2%}', '场次比': '{:.2%}', '票房比': '{:.2%}', '座次效率': '{:.2f}', | |
'场次效率': '{:.2f}'} | |
full_day_analysis = process_and_analyze_data(df.copy()) | |
prime_time_analysis = process_and_analyze_data( | |
df[df['放映时间'].between(datetime.time(14, 0), datetime.time(21, 0))].copy()) | |
if query_tms_for_location: | |
# ... (TMS logic remains unchanged) | |
pass | |
st.markdown("### 全天排片效率分析") | |
if not full_day_analysis.empty: | |
st.dataframe(full_day_analysis.style.format(format_config), use_container_width=True, hide_index=True) | |
st.markdown("#### 黄金时段排片效率分析 (14:00-21:00)") | |
if not prime_time_analysis.empty: | |
st.dataframe(prime_time_analysis.style.format(format_config), use_container_width=True, hide_index=True) | |
if not full_day_analysis.empty: | |
st.markdown("##### 复制当日排片列表") | |
movie_titles = full_day_analysis['影片'].tolist() | |
formatted_titles = ''.join([f'《{title}》' for title in movie_titles]) | |
st.code(formatted_titles, language='text') | |
if not df.empty: | |
with st.expander("影城每日票房表现", expanded=True): | |
movie_options = ['全部影片'] + full_day_analysis['影片'].unique().tolist() | |
selected_movie_for_chart = st.selectbox('选择影片查看其每日票房', options=movie_options, | |
key='daily_box_office_selector') | |
daily_chart = plot_daily_box_office(df.copy(), selected_movie_for_chart) | |
if daily_chart: | |
st.altair_chart(daily_chart, use_container_width=True) | |
# --- UI CHANGE FOR REQUIREMENT 1 --- | |
st.markdown("---") | |
plot_daily_box_office_by_time(df.copy(), selected_movie_for_chart) | |
# --- UI CHANGE FOR REQUIREMENTS 2 & 3 --- | |
with st.expander("每日时间效率分析", expanded=False): | |
tab1, tab2, tab3, tab4 = st.tabs([ | |
"每日效率(对比全天)", | |
"单片效率(对比全天)", | |
"每日效率(分时间段)", | |
"单片效率(分时间段)" | |
]) | |
with tab1: | |
st.write("分析所有影片在各时间点(5分钟聚合)的平均效率。效率值通过对比 **全天** 的总表现得出。") | |
plot_time_efficiency_analysis(df.copy()) | |
with tab2: | |
st.write("选择一部影片,查看其在各时间点的平均效率。效率值通过对比 **全天** 的总表现得出。") | |
movie_options_for_time = ['全部影片'] + full_day_analysis['影片'].unique().tolist() | |
selected_movie_for_time_chart = st.selectbox('选择影片', options=movie_options_for_time, | |
key='movie_time_selector') | |
plot_movie_time_efficiency_analysis(df.copy(), selected_movie_for_time_chart) | |
with tab3: | |
st.write("分析每个时间点的效率,效率值通过对比该时间点 **周边指定时间窗口** 的总表现得出。") | |
window_daily = st.number_input("时间窗口(前后各x分钟)", min_value=5, value=20, step=5, | |
key='daily_window') | |
plot_windowed_daily_efficiency(df.copy(), window_daily) | |
with tab4: | |
st.write( | |
"在指定时间窗口内,分析各影片的效率。效率值通过对比影片在 **中心时间点** 的表现与 **整个窗口** 的总表现得出。") | |
col1, col2 = st.columns(2) | |
with col1: | |
center_time_movie = st.time_input("中心时间点", value=datetime.time(19, 30), | |
step=datetime.timedelta(minutes=5), key='movie_time_center') | |
with col2: | |
window_movie = st.number_input("时间窗口(前后各x分钟)", min_value=5, value=20, step=5, | |
key='movie_window') | |
plot_windowed_movie_efficiency(df.copy(), center_time_movie, window_movie) | |
except Exception as e: | |
st.error(f"处理文件时出错: {e}") | |
st.error("请检查您的 Excel 文件格式是否正确,特别是日期和时间列。") | |
# (TMS UI part remains unchanged) | |
st.divider() | |
st.markdown("### TMS 服务器影片内容查询") | |
if st.button('点击查询 TMS 服务器'): | |
with st.spinner("正在从 TMS 服务器获取数据中..."): | |
try: | |
halls_data, movie_list_sorted = fetch_and_process_server_movies() | |
st.toast("TMS 服务器数据获取成功!", icon="🎉") | |
if halls_data or movie_list_sorted: | |
st.markdown("#### 按影片查看所在影厅") | |
view2_data = [{'影片名称': item['assert_name'], | |
'所在影厅': " ".join(sorted([get_circled_number(h) for h in item['halls']])), | |
'文件名': item['content_name'], '时长': format_play_time(item['play_time'])} for item in | |
movie_list_sorted] | |
df_view2 = pd.DataFrame(view2_data) | |
st.dataframe(df_view2, hide_index=True, use_container_width=True) | |
st.markdown("#### 按影厅查看影片内容") | |
hall_tabs = st.tabs(list(halls_data.keys())) | |
for tab, hall_name in zip(hall_tabs, halls_data.keys()): | |
with tab: | |
view1_data_for_tab = [{'影片名称': item['details']['assert_name'], | |
'所在影厅': " ".join( | |
sorted([get_circled_number(h) for h in item['details']['halls']])), | |
'文件名': item['content_name'], | |
'时长': format_play_time(item['details']['play_time'])} for item in | |
halls_data[hall_name]] | |
df_view1_tab = pd.DataFrame(view1_data_for_tab) | |
st.dataframe(df_view1_tab, hide_index=True, use_container_width=True) | |
except Exception as e: | |
st.error(f"查询服务器时出错: {e}") |