import json
import colorsys
import os
import shutil
import tempfile
import zipfile
from datetime import datetime

import pandas as pd
import geopandas as gpd
import folium
from folium import plugins
import gradio as gr
from shapely.geometry import Point

SEED = 42  # fixed RNG seed so repeated sampling is reproducible
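
# Gradio web app that visualizes delivery routes and clustering results on a
# Folium map: route/order CSVs are drawn as colored point/line paths, cluster
# CSVs as circle (geo) and star (PMI) cluster markers, and a GADM shapefile
# provides administrative-region name lookups for printed route reports.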
# Global state shared across the Gradio callbacks
df = None           # route/order data (CSV)
cluster_df = None   # clustering results (CSV)
regions_gdf = None  # administrative region boundaries (shapefile)
# Default shapefile path (GADM level-3 administrative boundaries for South Korea)
current_shp_path = 'data/gadm41_KOR_shp/gadm41_KOR_3.shp'
def process_upload(file_obj):
"""Process uploaded CSV file"""
    global df  # we reassign the module-level DataFrame below
if file_obj is None:
return "No file uploaded.", None
try:
file_path = file_obj.name
file_name = os.path.basename(file_path)
_, ext = os.path.splitext(file_path)
if ext.lower() != '.csv':
return "Please upload a CSV file.", None
# Try different encodings
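        # (utf-8 first, then cp949 and euc-kr, the common Korean codepages)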
for encoding in ['utf-8', 'cp949', 'euc-kr']:
try:
temp_df = pd.read_csv(file_path, engine='python', encoding=encoding)
# Remove rows where 'name' is null
original_len = len(temp_df)
temp_df = temp_df.dropna(subset=['name'])
rows_dropped = original_len - len(temp_df)
# Update the global df
                df = temp_df  # update the global DataFrame
return f"File uploaded and processed successfully. {len(df)} records loaded with {encoding} encoding. {rows_dropped} rows with null names were removed.", file_name
except UnicodeDecodeError:
continue
except Exception as e:
return f"Error processing file with {encoding} encoding: {str(e)}", None
return "Could not process the file with any of the supported encodings.", None
except Exception as e:
return f"Error processing upload: {str(e)}", None
def process_cluster_upload(file_obj):
"""Process uploaded cluster CSV file"""
    global cluster_df  # we reassign the module-level DataFrame below
if file_obj is None:
return "No cluster file uploaded.", None
try:
file_path = file_obj.name
file_name = os.path.basename(file_path)
_, ext = os.path.splitext(file_path)
if ext.lower() != '.csv':
return "Please upload a CSV file.", None
# Try different encodings
for encoding in ['utf-8', 'cp949', 'euc-kr']:
try:
temp_df = pd.read_csv(file_path, engine='python', encoding=encoding)
# Update the global cluster_df
                cluster_df = temp_df  # update the global DataFrame
return f"Cluster file uploaded and processed successfully. {len(cluster_df)} records loaded with {encoding} encoding.", file_name
except UnicodeDecodeError:
continue
except Exception as e:
return f"Error processing cluster file with {encoding} encoding: {str(e)}", None
return "Could not process the cluster file with any of the supported encodings.", None
except Exception as e:
return f"Error processing cluster upload: {str(e)}", None
def process_shp_upload(file_obj):
"""Process uploaded shapefile ZIP"""
global regions_gdf, current_shp_path
if file_obj is None:
return "No file uploaded.", None
try:
file_path = file_obj.name
file_name = os.path.basename(file_path)
_, ext = os.path.splitext(file_path)
if ext.lower() != '.zip':
return "Please upload a ZIP file containing shapefile components.", None
# Create a temporary directory to extract files
with tempfile.TemporaryDirectory() as temp_dir:
# Extract ZIP contents
with zipfile.ZipFile(file_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
# Find .shp file in the extracted contents, excluding __MACOSX directory
shp_files = []
for root, _, files in os.walk(temp_dir):
# Skip __MACOSX directory
if '__MACOSX' in root:
continue
for file in files:
if file.endswith('.shp'):
shp_files.append(os.path.join(root, file))
if not shp_files:
return "No .shp file found in the ZIP archive.", None
# Use the first .shp file found
shp_path = shp_files[0]
try:
# Read the shapefile
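            # Reproject to EPSG:4326 (WGS84) so it matches lat/lng point data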
regions_gdf = gpd.read_file(shp_path).to_crs("EPSG:4326")
# Create a permanent directory for the shapefiles if it doesn't exist
permanent_dir = os.path.join('data', 'uploaded_shapefiles')
os.makedirs(permanent_dir, exist_ok=True)
# Generate a unique subdirectory name using timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
target_dir = os.path.join(permanent_dir, f'shapefile_{timestamp}')
os.makedirs(target_dir)
# Copy all related files to the permanent location
shp_base = os.path.splitext(shp_path)[0]
            for file_ext in ['.shp', '.shx', '.dbf', '.prj', '.cpg', '.sbn', '.sbx']:
                src_file = f"{shp_base}{file_ext}"
if os.path.exists(src_file):
shutil.copy2(src_file, target_dir)
# Update the current shapefile path to point to the permanent location
current_shp_path = os.path.join(target_dir, os.path.basename(shp_path))
return f"Shapefile uploaded and processed successfully. {len(regions_gdf)} features loaded.", file_name
except Exception as e:
return f"Error processing shapefile: {str(e)}", None
except Exception as e:
return f"Error processing ZIP upload: {str(e)}", None
def print_route_info(df, shp_file_path, sample_checkbox=False, path_checkbox=False):
    """Build a textual route report based on the checkbox settings."""
    output_lines = []
    # Load the regions shapefile once (rather than per row) for location lookups
    regions = gpd.read_file(shp_file_path).to_crs("EPSG:4326") if path_checkbox else None
    for _, row in df.iterrows():
        if sample_checkbox:
            date_str = pd.to_datetime(row['created']).strftime('%Y-%m-%d %H:%M:%S')
            output_lines.append(f"\nSample: {row['name']} ({date_str})")
            output_lines.append(f"  - Vehicle: {row['vehicle_type']}")
        if path_checkbox:
            route = row['route'] if isinstance(row['route'], (dict, list)) else json.loads(row['route'])
            output_lines.append("  - Path list:")
            # Collect coordinates for the spatial join
            coords = []
            for loc in route:
                if isinstance(loc, dict) and 'latitude' in loc and 'longitude' in loc:
                    lat = float(loc['latitude'])
                    lng = float(loc['longitude'])
                    # Out-of-range values are assumed to be stored in units of
                    # 1/360000 degree (same convention as cal_paths_folium)
                    if abs(lat) > 90 or abs(lng) > 180:
                        lat /= 360000.0
                        lng /= 360000.0
                    coords.append((lat, lng))
            if coords:
                gdf_sample = gpd.GeoDataFrame(
                    geometry=[Point(lon, lat) for lat, lon in coords],
                    crs="EPSG:4326"
                )
                # Spatially join each point with its containing region
                joined = gpd.sjoin(gdf_sample, regions, how="left", predicate="within")
                # Use whichever GADM name columns are present
                location_columns = [col for col in ['NAME_1', 'NAME_2', 'NAME_3', 'TYPE_3']
                                    if col in joined.columns]
                if location_columns:
                    # Create location string from the available name columns
                    joined['location'] = joined[location_columns].astype(str).apply(
                        lambda x: "_".join(str(val) for val in x), axis=1
                    )
                else:
                    # Fall back to raw coordinates if no name columns matched
                    joined['location'] = joined.geometry.apply(
                        lambda p: f"lat: {p.y:.6f}, lon: {p.x:.6f}"
                    )
                for _, point in joined.iterrows():
                    output_lines.append(f"    - {point['location']}")
        output_lines.append("-" * 50)
    return "\n".join(output_lines)
def get_colors(n, s=1.0, v=1.0):
    """Return n visually distinct hex colors by sweeping hue in HSV space."""
    colors = []
    for i in range(n):
        h = i / n  # evenly spaced hues; s and v set saturation and brightness
        r, g, b = colorsys.hsv_to_rgb(h, s, v)
        colors.append(f'#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}')
    return colors
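
# Example: get_colors(3) -> ['#ff0000', '#00ff00', '#0000ff']
# (hues at 0, 1/3, and 2/3 of the HSV color wheel: red, green, blue)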
def cal_paths_folium(df, shp_file_path, n_samples=None, start_d=None, end_d=None, company=None,
sample_checkbox=False, path_checkbox=False):
log_messages = []
working_df = df.copy()
log_messages.append(f"Initial dataframe size: {len(working_df)} rows")
# Convert created column to datetime and remove timezone information
working_df['created'] = pd.to_datetime(working_df['created']).dt.tz_localize(None)
# Date filtering with better error handling and debugging
if start_d:
try:
start_d = pd.to_datetime(start_d).normalize()
log_messages.append(f"Filtering from date: {start_d}")
working_df = working_df[working_df['created'] >= start_d]
log_messages.append(f"After start date filter: {len(working_df)} rows")
except Exception as e:
log_messages.append(f"Error in start date filtering: {str(e)}")
if end_d:
try:
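            # normalize() + 1 day - 1 second makes the end date inclusive,
            # covering the whole final day up to 23:59:59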
end_d = pd.to_datetime(end_d).normalize() + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)
log_messages.append(f"Filtering until date: {end_d}")
working_df = working_df[working_df['created'] <= end_d]
log_messages.append(f"After end date filter: {len(working_df)} rows")
except Exception as e:
log_messages.append(f"Error in end date filtering: {str(e)}")
# Company filtering with better error handling and debugging
if company and company.strip():
try:
log_messages.append(f"Filtering for company: {company}")
working_df = working_df[working_df['name'].str.contains(company, na=False)]
log_messages.append(f"After company filter: {len(working_df)} rows")
except Exception as e:
log_messages.append(f"Error in company filtering: {str(e)}")
# Sample n
if n_samples and len(working_df) > 0:
        working_df = working_df.sample(n=min(n_samples, len(working_df)), random_state=SEED)
log_messages.append(f"After sampling: {len(working_df)} rows")
# Print column names and a few rows for debugging
log_messages.append(f"Columns in dataframe: {list(working_df.columns)}")
if len(working_df) > 0:
log_messages.append("First row sample:")
log_messages.append(str(working_df.iloc[0]))
# Generate colors
colors = get_colors(max(1, len(working_df)), s=0.5, v=1.0)
# Print route information
if sample_checkbox or path_checkbox:
console_output = print_route_info(working_df, shp_file_path, sample_checkbox, path_checkbox)
log_messages.append(console_output)
# Generate route data
routes = []
for i, (_, row) in enumerate(working_df.iterrows()):
# Convert route to dict/list if it's a string
route = row['route'] if isinstance(row['route'], (dict, list)) else json.loads(row['route'])
# Handle different possible formats of coordinates
coords = []
        for loc in route:
            if not isinstance(loc, dict):
                continue
            # Accept either 'latitude'/'longitude' or 'lat'/'lng' keys
            if 'latitude' in loc and 'longitude' in loc:
                lat, lng = float(loc['latitude']), float(loc['longitude'])
            elif 'lat' in loc and 'lng' in loc:
                lat, lng = float(loc['lat']), float(loc['lng'])
            else:
                continue
            # Out-of-range values are assumed to be stored in units of
            # 1/360000 degree; scale them down to plain degrees
            if abs(lat) > 90 or abs(lng) > 180:
                lat /= 360000.0
                lng /= 360000.0
            coords.append([lat, lng])
if coords:
routes.append({
'coordinates': coords,
'color': colors[i % len(colors)],
'company': str(row.get('name', 'Unknown')),
'created': row['created'].strftime('%Y-%m-%d %H:%M:%S')
})
print(f"Generated {len(routes)} valid routes")
log_messages.append(f"Generated {len(routes)} valid routes")
# routes와 함께 로그 메시지도 반환
return routes, "\n".join(log_messages)
def plot_paths_folium(routes, cluster_df=None, cluster_num_samples=None, cluster_company_search=None,
                      cluster_date_start=None, cluster_date_end=None, map_location="Seoul",
                      map_type="Satellite map", path_type="point+line", brightness=100):
    """Plot routes (and cluster overlays) on a Folium map with customizable settings."""
# Map center coordinates based on location selection
centers = {
"Korea": (36.5, 127.5),
"Seoul": (37.5665, 126.9780),
"Busan": (35.1796, 129.0756)
}
zoom_levels = {
"Korea": 7,
"Seoul": 12,
"Busan": 12
}
center = centers.get(map_location, centers["Korea"])
zoom_start = zoom_levels.get(map_location, 7)
# Create map with appropriate type
if map_type == "Satellite map":
m = folium.Map(location=center, zoom_start=zoom_start,
tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
attr='Esri')
else:
m = folium.Map(location=center, zoom_start=zoom_start)
path_fg = folium.FeatureGroup(name="Path").add_to(m)
# Add routes to the map
for route in routes:
if path_type in ["point", "point+line"] and len(route['coordinates']) > 0:
for i, coord in enumerate(route['coordinates']):
x_icon_html = f'''
×
'''
folium.DivIcon(
html=x_icon_html
).add_to(folium.Marker(
location=coord,
popup=f"{route.get('company', 'Unknown')} - Point {i+1}"
).add_to(path_fg))
if path_type in ["line", "point+line"]:
folium.PolyLine(
route['coordinates'],
color=route['color'],
weight=0.5,
                dash_array='1, 1',  # dashed line style: (dash length, gap)
popup=route.get('company', 'Unknown')
).add_to(path_fg)
    # Work on a copy so the filters below don't mutate the caller's DataFrame
    cluster_df = cluster_df.copy()
    cluster_df['t_pickup'] = pd.to_datetime(cluster_df['t_pickup'])
    if cluster_date_start:
        # Convert string to timezone-naive datetime at midnight
        cluster_date_start = pd.to_datetime(cluster_date_start).normalize()
        cluster_df = cluster_df[cluster_df['t_pickup'] >= cluster_date_start]
    if cluster_date_end:
        # Make the end date inclusive (whole day, up to 23:59:59)
        cluster_date_end = pd.to_datetime(cluster_date_end).normalize() + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)
        cluster_df = cluster_df[cluster_df['t_pickup'] <= cluster_date_end]
    if cluster_company_search:
        cluster_df = cluster_df[cluster_df['company'].str.contains(cluster_company_search, na=False)]
    if cluster_num_samples:
        cluster_df = cluster_df.sample(n=min(cluster_num_samples, len(cluster_df)), random_state=SEED)
cluster_geo_fg = folium.FeatureGroup(name="Cluster Geo").add_to(m)
cluster_pmi_fg = folium.FeatureGroup(name="Cluster PMI", show=False).add_to(m)
cluster_geo_values = cluster_df['cluster_geo'].unique()
cluster_pmi_values = cluster_df['cluster_pmi'].unique()
# Create a mapping from cluster numbers to color indices
cluster_geo_mapping = {val: idx for idx, val in enumerate(sorted(cluster_geo_values))}
cluster_pmi_mapping = {val: idx for idx, val in enumerate(sorted(cluster_pmi_values))}
cluster_geo_colors = get_colors(len(cluster_geo_values))
cluster_pmi_colors = get_colors(len(cluster_pmi_values))
for _, row in cluster_df.iterrows():
# Geo cluster markers remain as circles
folium.CircleMarker(
location=(row['latitude'], row['longitude']),
popup=f"{row['company']} - Cluster {row['cluster_geo']}",
radius=3,
color=cluster_geo_colors[cluster_geo_mapping[row['cluster_geo']]],
fill=True,
fill_color=cluster_geo_colors[cluster_geo_mapping[row['cluster_geo']]],
).add_to(cluster_geo_fg)
        # PMI cluster markers as stars (assumed minimal markup; the original
        # inline CSS was lost, so only the PMI cluster color is applied)
        star_color = cluster_pmi_colors[cluster_pmi_mapping[row['cluster_pmi']]]
        star_html = f'<div style="color: {star_color}; font-size: 12px;">★</div>'
        folium.Marker(
            location=(row['latitude'], row['longitude']),
            popup=f"{row['company']} - Cluster {row['cluster_pmi']}",
            icon=folium.DivIcon(html=star_html)
        ).add_to(cluster_pmi_fg)
# Group points by cluster for both geo and pmi
geo_clusters = {}
pmi_clusters = {}
for _, row in cluster_df.iterrows():
# For geo clusters
geo_cluster = row['cluster_geo']
if geo_cluster not in geo_clusters:
geo_clusters[geo_cluster] = []
geo_clusters[geo_cluster].append((row['latitude'], row['longitude']))
# For pmi clusters
pmi_cluster = row['cluster_pmi']
if pmi_cluster not in pmi_clusters:
pmi_clusters[pmi_cluster] = []
pmi_clusters[pmi_cluster].append((row['latitude'], row['longitude']))
# Function to create a closed path by connecting nearest points
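    # (greedy nearest-neighbour heuristic: O(n^2), an approximate rather than
    # optimal closed tour through the cluster's points)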
def create_closed_path(points):
if len(points) <= 1:
return points
# Start with the first point
path = [points[0]]
remaining_points = points[1:]
# Keep finding the closest point until none are left
while remaining_points:
current = path[-1]
# Find closest point to the current point
closest_idx = 0
closest_dist = float('inf')
for i, point in enumerate(remaining_points):
dist = ((current[0] - point[0])**2 + (current[1] - point[1])**2)**0.5
if dist < closest_dist:
closest_dist = dist
closest_idx = i
# Add the closest point to the path
path.append(remaining_points[closest_idx])
remaining_points.pop(closest_idx)
# Connect back to the first point to close the path
path.append(path[0])
return path
# Create polylines for geo clusters
for cluster_num, points in geo_clusters.items():
if len(points) >= 2: # Need at least 2 points to make a line
path = create_closed_path(points)
folium.PolyLine(
path,
color=cluster_geo_colors[cluster_geo_mapping[cluster_num]],
weight=2,
).add_to(cluster_geo_fg)
# Create polylines for pmi clusters
for cluster_num, points in pmi_clusters.items():
if len(points) >= 2: # Need at least 2 points to make a line
path = create_closed_path(points)
folium.PolyLine(
path,
color=cluster_pmi_colors[cluster_pmi_mapping[cluster_num]],
weight=2,
).add_to(cluster_pmi_fg)
    # Custom legend with three sections: Path Routes, Cluster Geo, Cluster PMI.
    # Assumed minimal markup: the original inline CSS was lost, so only basic
    # positioning, scrolling, and per-item colors are styled here.
    legend_html = '''
    <div style="position: fixed; bottom: 20px; left: 20px; z-index: 9999;
                background: white; padding: 8px; max-height: 300px; overflow-y: auto;">
    <b>Path Routes</b><br>
    '''
    # Add path routes to the legend with an X symbol in each route's color
    for route in routes:
        legend_html += f'''
        <span style="color: {route['color']}; font-weight: bold;">×</span>
        {route.get('company', 'Unknown')}_{route.get('created', '')}<br>
        '''
    # Get unique cluster values from the already filtered cluster_df
    visible_cluster_geo = sorted(cluster_df['cluster_geo'].unique())
    visible_cluster_pmi = sorted(cluster_df['cluster_pmi'].unique())
    # Cluster Geo section with circle symbols
    legend_html += '''
    <b>Cluster Geo</b><br>
    '''
    for cluster_value in visible_cluster_geo:
        color = cluster_geo_colors[cluster_geo_mapping[cluster_value]]
        legend_html += f'''
        <span style="color: {color};">●</span> Cluster {cluster_value}<br>
        '''
    # Cluster PMI section with star symbols
    legend_html += '''
    <b>Cluster PMI</b><br>
    '''
    for cluster_value in visible_cluster_pmi:
        color = cluster_pmi_colors[cluster_pmi_mapping[cluster_value]]
        legend_html += f'''
        <span style="color: {color};">★</span> Cluster {cluster_value}<br>
        '''
    legend_html += '''
    </div>
    '''
folium.LayerControl(collapsed=False).add_to(m)
folium.plugins.Fullscreen(
position="bottomright",
title="Expand me",
title_cancel="Exit me",
force_separate_button=True,
).add_to(m)
# Add the legend to the map
m.get_root().html.add_child(folium.Element(legend_html))
    # Custom CSS for brightness control. Assumed rule: the original CSS,
    # scoped to the satellite tiles only, was lost; this minimal version dims
    # the whole Leaflet tile pane via a CSS brightness filter.
    custom_css = f"""
    <style>
    .leaflet-tile-pane {{ filter: brightness({brightness}%); }}
    </style>
    """
    m.get_root().header.add_child(folium.Element(custom_css))
return m._repr_html_()
def update_map(map_location, map_type, path_type, n_samples, company, date_start, date_end,
cluster_num_samples, cluster_company_search, cluster_date_start, cluster_date_end,
pick_all_date, sample_checkbox, path_checkbox, brightness_slider):
"""Update the map based on user selections"""
global df, cluster_df, regions_gdf, current_shp_path
log_messages = []
log_messages.append(f"Updating map with settings: Location={map_location}, Type={map_type}, Path={path_type}")
# Check if data is loaded
if df is None:
log_messages.append("Loading default data because df is None")
        df_loaded, _, _, msg, _, _, _ = load_default_data()  # load_default_data returns 7 values
        log_messages.append(msg)
if df_loaded is None:
return "No data available. Please upload a CSV file.", None
else:
log_messages.append(f"Using existing df with {len(df)} rows")
try:
# Process date filters with better error handling
start_d = None
end_d = None
if not pick_all_date:
if date_start and date_start.strip():
start_d = date_start
log_messages.append(f"Using start date: {start_d}")
if date_end and date_end.strip():
end_d = date_end
log_messages.append(f"Using end date: {end_d}")
else:
log_messages.append("Using all dates")
# Check if shapefile exists at current_shp_path
if not os.path.exists(current_shp_path):
log_messages.append(f"Warning: Shapefile not found at {current_shp_path}")
# Try to find the most recently uploaded shapefile
permanent_dir = os.path.join('data', 'uploaded_shapefiles')
if os.path.exists(permanent_dir):
subdirs = [os.path.join(permanent_dir, d) for d in os.listdir(permanent_dir)
if os.path.isdir(os.path.join(permanent_dir, d))]
if subdirs:
# Get the most recent directory
latest_dir = max(subdirs, key=os.path.getctime)
# Find .shp file in that directory
shp_files = [f for f in os.listdir(latest_dir) if f.endswith('.shp')]
if shp_files:
current_shp_path = os.path.join(latest_dir, shp_files[0])
log_messages.append(f"Using most recent shapefile: {current_shp_path}")
# Calculate routes with full error reporting
try:
routes, cal_logs = cal_paths_folium(df, current_shp_path, n_samples=n_samples,
start_d=start_d, end_d=end_d,
company=company, sample_checkbox=sample_checkbox,
path_checkbox=path_checkbox)
log_messages.append(cal_logs)
except Exception as e:
log_messages.append(f"Error in route calculation: {str(e)}")
import traceback
log_messages.append(traceback.format_exc())
return "\n".join(log_messages), None
# Check if we have routes to display
if not routes:
log_messages.append("No routes to display after applying filters.")
empty_map = folium.Map(location=(36.5, 127.5), zoom_start=7)
return "\n".join(log_messages), empty_map._repr_html_()
# Create map
html_output = plot_paths_folium(routes, cluster_df, cluster_num_samples, cluster_company_search,
cluster_date_start, cluster_date_end, map_location, map_type, path_type, brightness_slider)
return "\n".join(log_messages), html_output
except Exception as e:
error_msg = f"Error updating map: {str(e)}"
log_messages.append(error_msg)
import traceback
log_messages.append(traceback.format_exc())
return "\n".join(log_messages), None
# Initialize data
def load_default_data():
"""Load the default dataset"""
global df, cluster_df, regions_gdf
default_file = 'data/20250122_Order_List_202411_12_CJW.csv'
default_cluster_file = 'data/path_clustering_2024.csv'
default_gadm_shp_file = 'data/gadm41_KOR_shp/gadm41_KOR_3.shp'
messages = []
path_filename = ""
cluster_filename = ""
shp_filename = ""
# Try different encodings for the main file
for encoding in ['utf-8', 'cp949', 'euc-kr']:
try:
df = pd.read_csv(default_file, engine='python', encoding=encoding)
path_filename = os.path.basename(default_file)
messages.append(f"Path file loaded successfully: {path_filename}")
break
except UnicodeDecodeError:
continue
except Exception as e:
messages.append(f"Error loading path file: {str(e)}")
return None, None, None, "\n".join(messages), "", "", ""
# Try different encodings for the cluster file
for encoding in ['utf-8', 'cp949', 'euc-kr']:
try:
cluster_df = pd.read_csv(default_cluster_file, engine='python', encoding=encoding)
cluster_filename = os.path.basename(default_cluster_file)
messages.append(f"Cluster file loaded successfully: {cluster_filename}")
break
except UnicodeDecodeError:
continue
except Exception as e:
messages.append(f"Error loading cluster file: {str(e)}")
return None, None, None, "\n".join(messages), "", "", ""
# Load shapefile
try:
regions_gdf = gpd.read_file(default_gadm_shp_file).to_crs("EPSG:4326")
shp_filename = os.path.basename(default_gadm_shp_file)
messages.append(f"Shapefile loaded successfully: {shp_filename}")
except Exception as e:
messages.append(f"Error loading shapefile: {str(e)}")
return None, None, None, "\n".join(messages), "", "", ""
return df, cluster_df, regions_gdf, "\n".join(messages), path_filename, cluster_filename, shp_filename
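# Default UI values used to render the initial map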
init_n_samples = 20
init_path_company_search = "백년화편"
init_path_date_start = "2024-12-01"
init_path_date_end = "2024-12-31"
init_cluster_num_samples = 200
init_cluster_date_start = "2025-02-24"
init_cluster_date_end = "2025-02-24"
init_brightness = 50
init_df, init_cluster_df, init_regions_gdf, init_msg, init_path_file, init_cluster_file, init_shp_file = load_default_data()
# Initial map
init_shp_file_path = current_shp_path  # default GADM level-3 shapefile
init_routes, _ = (cal_paths_folium(df, init_shp_file_path, n_samples=init_n_samples,
                                   start_d=init_path_date_start, end_d=init_path_date_end,
                                   company=init_path_company_search)
                  if df is not None else ([], ""))
init_html = (plot_paths_folium(routes=init_routes, cluster_df=init_cluster_df,
                               cluster_num_samples=init_cluster_num_samples,
                               cluster_date_start=init_cluster_date_start,
                               cluster_date_end=init_cluster_date_end,
                               brightness=init_brightness)
             if init_routes else None)
# Create Gradio interface
with gr.Blocks() as demo:
# Layout
with gr.Column():
# Map controls
with gr.Row():
map_location = gr.Radio(
["Korea", "Seoul", "Busan"],
label="Map Location Shortcuts",
value="Seoul"
)
map_type = gr.Radio(
["Normal map", "Satellite map"],
label="Map Type",
value="Satellite map"
)
path_type = gr.Radio(
["point", "line", "point+line"],
label="Path Type",
value="point+line"
)
brightness_slider = gr.Slider(
minimum=1,
maximum=300,
value=50,
step=1,
label="Map Brightness (%)"
)
# Map display
map_html = gr.HTML(init_html, elem_classes=["map-container"])
generate_btn = gr.Button("Generate Map")
# Filter controls
with gr.Column():
with gr.Row():
path_file_upload = gr.File(label="Upload Path File", height=89, file_count="single", scale=1)
path_current_file = gr.Textbox(label="Current Path File", value=init_path_file, scale=2)
with gr.Row():
cluster_file_upload = gr.File(label="Upload Cluster File", height=89, file_count="single", scale=1)
cluster_current_file = gr.Textbox(label="Current Cluster File", value=init_cluster_file, scale=2)
with gr.Row():
gadm_shp_upload = gr.File(label="Upload gadm .zip File", height=89, file_count="single", scale=1)
gadm_shp_current_file = gr.Textbox(label="Current gadm .zip File", value=init_shp_file, scale=2)
with gr.Row():
with gr.Row():
path_num_samples = gr.Number(label="Path Sample Count", precision=0, value=20, scale=1, minimum=1, maximum=200)
path_company_search = gr.Textbox(label="Path Company Search", value="백년화편", scale=2)
with gr.Row():
cluster_num_samples = gr.Number(label="Cluster Sample Count", precision=0, value=200, scale=1, minimum=1, maximum=200)
cluster_company_search = gr.Textbox(label="Cluster Company Search", scale=2)
# Date range
with gr.Row():
with gr.Row():
path_date_start = gr.Textbox(label="Path Start Date", placeholder="YYYY-MM-DD", value="2024-12-01")
path_date_end = gr.Textbox(label="Path End Date", placeholder="YYYY-MM-DD", value="2024-12-31")
with gr.Row():
cluster_date_start = gr.Textbox(label="Cluster Start Date", placeholder="YYYY-MM-DD", value="2025-02-24")
cluster_date_end = gr.Textbox(label="Cluster End Date", placeholder="YYYY-MM-DD", value="2025-02-24")
# Checkboxes
with gr.Row():
pick_all_date = gr.Checkbox(label="Select All Dates")
sample_checkbox = gr.Checkbox(label="Print Sample", value=True)
path_checkbox = gr.Checkbox(label="Print Path")
# Console
console = gr.Textbox(
label="Console",
lines=10,
max_lines=100,
interactive=False,
value=init_msg,
elem_classes=["console"]
)
    # Style (assumed minimal CSS; the original rules were lost, and the
    # .map-container and .console classes are referenced by components above)
    gr.Markdown("""
    <style>
    .map-container { min-height: 600px; }
    .console textarea { font-family: monospace; }
    </style>
    """)
# Event handlers
path_file_upload.upload(
fn=process_upload,
inputs=[path_file_upload],
outputs=[console, path_current_file]
)
cluster_file_upload.upload(
fn=process_cluster_upload,
inputs=[cluster_file_upload],
outputs=[console, cluster_current_file]
)
gadm_shp_upload.upload(
fn=process_shp_upload,
inputs=[gadm_shp_upload],
outputs=[console, gadm_shp_current_file]
)
generate_btn.click(
fn=update_map,
inputs=[
map_location, map_type, path_type, path_num_samples, path_company_search,
path_date_start, path_date_end, cluster_num_samples, cluster_company_search,
cluster_date_start, cluster_date_end, pick_all_date, sample_checkbox, path_checkbox,
brightness_slider
],
outputs=[console, map_html]
)
    # Auto-update the map when any display control changes
for control in [map_location, map_type, path_type, brightness_slider]:
control.change(
fn=update_map,
inputs=[
map_location, map_type, path_type, path_num_samples, path_company_search,
path_date_start, path_date_end, cluster_num_samples, cluster_company_search,
cluster_date_start, cluster_date_end, pick_all_date, sample_checkbox, path_checkbox,
brightness_slider
],
outputs=[console, map_html]
)
# Launch the app
if __name__ == "__main__":
    demo.launch(share=True)