"""Gradio app that draws delivery routes and pickup clusters on a Folium map.

Data lives in module-level globals (``df``, ``cluster_df``, ``regions_gdf``,
``current_shp_path``) that the upload handlers replace and ``update_map`` reads.
"""

import colorsys
import html
import json
import os
import shutil
import tempfile
import traceback
import zipfile
from datetime import datetime
from functools import lru_cache

import folium
import geopandas as gpd
import gradio as gr
import numpy as np
import pandas as pd
from folium import plugins
from shapely.geometry import Point

SEED = 42

# Encodings tried, in order, when reading any CSV.
CSV_ENCODINGS = ('utf-8', 'cp949', 'euc-kr')

# Divisor applied to stored coordinates that are outside the valid degree range.
COORD_SCALE = 360000.0

# Initialize global variables
df = None
cluster_df = None
regions_gdf = None

# Path of the shapefile currently used for region lookups
current_shp_path = 'data/gadm41_KOR_shp/gadm41_KOR_3.shp'


def _uploaded_path(file_obj):
    """Return the filesystem path of an upload given as a file-like object or a plain path."""
    return getattr(file_obj, 'name', file_obj)


def _parse_route(raw):
    """Return a route cell as a list/dict, or None when it cannot be decoded as JSON."""
    if isinstance(raw, (dict, list)):
        return raw
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return None


def _route_coords(route):
    """Extract ``[lat, lng]`` pairs from a parsed route.

    Accepts entries keyed by ``latitude``/``longitude`` or ``lat``/``lng``.
    Values outside the valid degree range are divided by ``COORD_SCALE``.
    """
    if not isinstance(route, list):
        return []
    coords = []
    for loc in route:
        if not isinstance(loc, dict):
            continue
        if 'latitude' in loc and 'longitude' in loc:
            lat, lng = float(loc['latitude']), float(loc['longitude'])
        elif 'lat' in loc and 'lng' in loc:
            lat, lng = float(loc['lat']), float(loc['lng'])
        else:
            continue
        if abs(lat) > 90 or abs(lng) > 180:
            lat /= COORD_SCALE
            lng /= COORD_SCALE
        coords.append([lat, lng])
    return coords


def process_upload(file_obj):
    """Load an uploaded path CSV into the global ``df``.

    Rows whose ``name`` is null are dropped.

    Returns:
        ``(status message, file name)``; the file name is None on failure.
    """
    global df
    if file_obj is None:
        return "No file uploaded.", None

    try:
        file_path = _uploaded_path(file_obj)
        file_name = os.path.basename(file_path)
        _, ext = os.path.splitext(file_path)

        if ext.lower() != '.csv':
            return "Please upload a CSV file.", None

        # Try different encodings
        for encoding in CSV_ENCODINGS:
            try:
                temp_df = pd.read_csv(file_path, engine='python', encoding=encoding)
                if 'name' not in temp_df.columns:
                    return "Error processing file: required column 'name' is missing.", None

                # Remove rows where 'name' is null
                original_len = len(temp_df)
                temp_df = temp_df.dropna(subset=['name'])
                rows_dropped = original_len - len(temp_df)

                # Publish only after the frame was read and cleaned successfully
                df = temp_df
                return (
                    f"File uploaded and processed successfully. {len(df)} records loaded "
                    f"with {encoding} encoding. {rows_dropped} rows with null names were removed.",
                    file_name,
                )
            except UnicodeDecodeError:
                continue
            except Exception as e:
                return f"Error processing file with {encoding} encoding: {str(e)}", None

        return "Could not process the file with any of the supported encodings.", None
    except Exception as e:
        return f"Error processing upload: {str(e)}", None


def process_cluster_upload(file_obj):
    """Load an uploaded cluster CSV into the global ``cluster_df``.

    Returns:
        ``(status message, file name)``; the file name is None on failure.
    """
    global cluster_df
    if file_obj is None:
        return "No cluster file uploaded.", None

    try:
        file_path = _uploaded_path(file_obj)
        file_name = os.path.basename(file_path)
        _, ext = os.path.splitext(file_path)

        if ext.lower() != '.csv':
            return "Please upload a CSV file.", None

        # Try different encodings
        for encoding in CSV_ENCODINGS:
            try:
                cluster_df = pd.read_csv(file_path, engine='python', encoding=encoding)
                return (
                    f"Cluster file uploaded and processed successfully. {len(cluster_df)} "
                    f"records loaded with {encoding} encoding.",
                    file_name,
                )
            except UnicodeDecodeError:
                continue
            except Exception as e:
                return f"Error processing cluster file with {encoding} encoding: {str(e)}", None

        return "Could not process the cluster file with any of the supported encodings.", None
    except Exception as e:
        return f"Error processing cluster upload: {str(e)}", None


def process_shp_upload(file_obj):
    """Extract an uploaded shapefile ZIP, load it, and keep a permanent copy.

    On success the globals ``regions_gdf`` and ``current_shp_path`` are updated.

    Returns:
        ``(status message, file name)``; the file name is None on failure.
    """
    global regions_gdf, current_shp_path
    if file_obj is None:
        return "No file uploaded.", None

    try:
        file_path = _uploaded_path(file_obj)
        file_name = os.path.basename(file_path)
        _, ext = os.path.splitext(file_path)

        if ext.lower() != '.zip':
            return "Please upload a ZIP file containing shapefile components.", None

        # Work in a temporary directory; only the needed files are copied out.
        with tempfile.TemporaryDirectory() as temp_dir:
            with zipfile.ZipFile(file_path, 'r') as zip_ref:
                zip_ref.extractall(temp_dir)

            # Find .shp files, skipping the __MACOSX resource-fork directory
            shp_files = []
            for root, _, files in os.walk(temp_dir):
                if '__MACOSX' in root:
                    continue
                for file in files:
                    if file.lower().endswith('.shp'):
                        shp_files.append(os.path.join(root, file))

            if not shp_files:
                return "No .shp file found in the ZIP archive.", None

            # Use the first .shp file found
            shp_path = shp_files[0]

            try:
                loaded_gdf = gpd.read_file(shp_path).to_crs("EPSG:4326")

                permanent_dir = os.path.join('data', 'uploaded_shapefiles')
                os.makedirs(permanent_dir, exist_ok=True)

                # Timestamped subdirectory; exist_ok covers two uploads within one second
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                target_dir = os.path.join(permanent_dir, f'shapefile_{timestamp}')
                os.makedirs(target_dir, exist_ok=True)

                # Copy the .shp and whichever sidecar files are present
                shp_base = os.path.splitext(shp_path)[0]
                for sidecar_ext in ['.shp', '.shx', '.dbf', '.prj', '.cpg', '.sbn', '.sbx']:
                    src_file = f"{shp_base}{sidecar_ext}"
                    if os.path.exists(src_file):
                        shutil.copy2(src_file, target_dir)

                # Publish both globals together, only after the copy succeeded
                regions_gdf = loaded_gdf
                current_shp_path = os.path.join(target_dir, os.path.basename(shp_path))

                return (
                    f"Shapefile uploaded and processed successfully. "
                    f"{len(regions_gdf)} features loaded.",
                    file_name,
                )
            except Exception as e:
                return f"Error processing shapefile: {str(e)}", None
    except Exception as e:
        return f"Error processing ZIP upload: {str(e)}", None


def print_route_info(df, shp_file_path, sample_checkbox=False, path_checkbox=False):
    """Build a text report of the routes in ``df``.

    Args:
        df: Frame with ``created``, ``name``, ``vehicle_type`` and ``route`` columns.
        shp_file_path: Shapefile used to name the region each route point falls in.
        sample_checkbox: Include a header line and vehicle type per row.
        path_checkbox: Include one line per route point with its region name.

    Returns:
        The report as a single newline-joined string.
    """
    output_lines = []
    regions = None  # Loaded once, on first use, instead of once per row

    for _, row in df.iterrows():
        if sample_checkbox:
            date_str = pd.to_datetime(row['created']).strftime('%Y-%m-%d %H:%M:%S')
            output_lines.append(f"\nSample: {row['name']} ({date_str})")
            output_lines.append(f" - Vehicle: {row['vehicle_type']}")

        if path_checkbox:
            output_lines.append(" - Path list:")
            coords = _route_coords(_parse_route(row['route']))

            if coords:
                if regions is None:
                    regions = gpd.read_file(shp_file_path).to_crs("EPSG:4326")

                # Point takes (x, y) = (lng, lat)
                gdf_sample = gpd.GeoDataFrame(
                    geometry=[Point(lng, lat) for lat, lng in coords],
                    crs="EPSG:4326",
                )
                joined = gpd.sjoin(gdf_sample, regions, how="left", predicate="within")

                location_columns = [
                    col for col in ['NAME_1', 'NAME_2', 'NAME_3', 'TYPE_3']
                    if col in joined.columns
                ]

                if location_columns:
                    joined['location'] = joined[location_columns].astype(str).apply(
                        lambda x: "_".join(str(val) for val in x), axis=1
                    )
                else:
                    # Fallback to coordinates if no matching columns found
                    joined['location'] = joined.geometry.apply(
                        lambda x: f"lat: {x.y:.6f}, lon: {x.x:.6f}"
                    )

                for _, point in joined.iterrows():
                    output_lines.append(f" - {point['location']}")

        output_lines.append("-" * 50)

    return "\n".join(output_lines)


def get_colors(n, s=1.0, v=1.0):
    """Return ``n`` hex colours with evenly spaced hues at saturation ``s`` and value ``v``."""
    colors = []
    for i in range(n):
        r, g, b = colorsys.hsv_to_rgb(i / n, s, v)
        colors.append(f'#{int(r*255):02x}{int(g*255):02x}{int(b*255):02x}')
    return colors


def cal_paths_folium(df, shp_file_path, n_samples=None, start_d=None, end_d=None, company=None,
                     sample_checkbox=False, path_checkbox=False):
    """Filter and sample ``df`` and turn each remaining row into a drawable route.

    Args:
        df: Frame with ``created``, ``name`` and ``route`` columns.
        shp_file_path: Shapefile path passed on to ``print_route_info``.
        n_samples: Maximum number of rows to keep after filtering.
        start_d, end_d: Inclusive date bounds applied to ``created``.
        company: Pattern matched against ``name`` with ``str.contains``.
        sample_checkbox, path_checkbox: Report options for ``print_route_info``.

    Returns:
        ``(routes, log)`` where each route is a dict with ``coordinates``,
        ``color``, ``company`` and ``created``, and ``log`` is a text log.
    """
    log_messages = []
    working_df = df.copy()
    log_messages.append(f"Initial dataframe size: {len(working_df)} rows")

    # Convert created column to datetime and remove timezone information
    working_df['created'] = pd.to_datetime(working_df['created']).dt.tz_localize(None)

    # A failing filter is logged and skipped rather than aborting the map
    if start_d:
        try:
            start_d = pd.to_datetime(start_d).normalize()
            log_messages.append(f"Filtering from date: {start_d}")
            working_df = working_df[working_df['created'] >= start_d]
            log_messages.append(f"After start date filter: {len(working_df)} rows")
        except Exception as e:
            log_messages.append(f"Error in start date filtering: {str(e)}")

    if end_d:
        try:
            # Extend the bound to the last second of the end day
            end_d = pd.to_datetime(end_d).normalize() + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)
            log_messages.append(f"Filtering until date: {end_d}")
            working_df = working_df[working_df['created'] <= end_d]
            log_messages.append(f"After end date filter: {len(working_df)} rows")
        except Exception as e:
            log_messages.append(f"Error in end date filtering: {str(e)}")

    if company and company.strip():
        try:
            log_messages.append(f"Filtering for company: {company}")
            working_df = working_df[working_df['name'].str.contains(company, na=False)]
            log_messages.append(f"After company filter: {len(working_df)} rows")
        except Exception as e:
            log_messages.append(f"Error in company filtering: {str(e)}")

    # Sample n
    if n_samples and len(working_df) > 0:
        working_df = working_df.sample(n=min(int(n_samples), len(working_df)), random_state=SEED)
        log_messages.append(f"After sampling: {len(working_df)} rows")

    # Column names and the first row, for debugging
    log_messages.append(f"Columns in dataframe: {list(working_df.columns)}")
    if len(working_df) > 0:
        log_messages.append("First row sample:")
        log_messages.append(str(working_df.iloc[0]))

    # Generate colors
    colors = get_colors(max(1, len(working_df)), s=0.5, v=1.0)

    # Print route information
    if sample_checkbox or path_checkbox:
        console_output = print_route_info(working_df, shp_file_path, sample_checkbox, path_checkbox)
        log_messages.append(console_output)

    # Generate route data
    routes = []
    for i, (_, row) in enumerate(working_df.iterrows()):
        route = _parse_route(row['route'])
        if route is None:
            # One bad cell should not abort the whole map
            log_messages.append(f"Skipping row with unparsable route: {row.get('name', 'Unknown')}")
            continue

        coords = _route_coords(route)
        if coords:
            routes.append({
                'coordinates': coords,
                'color': colors[i % len(colors)],
                'company': str(row.get('name', 'Unknown')),
                'created': row['created'].strftime('%Y-%m-%d %H:%M:%S')
            })

    print(f"Generated {len(routes)} valid routes")
    log_messages.append(f"Generated {len(routes)} valid routes")

    # Return the log text together with the routes
    return routes, "\n".join(log_messages)


def _filter_clusters(cluster_df, num_samples, company_search, date_start, date_end):
    """Return a filtered, sampled copy of the cluster frame (empty frame when None)."""
    if cluster_df is None:
        return pd.DataFrame(
            columns=['t_pickup', 'company', 'latitude', 'longitude', 'cluster_geo', 'cluster_pmi']
        )

    # Copy so the caller's (global) frame is never mutated
    clusters = cluster_df.copy()
    clusters['t_pickup'] = pd.to_datetime(clusters['t_pickup'])

    if date_start:
        date_start = pd.to_datetime(date_start).normalize()
        clusters = clusters[clusters['t_pickup'] >= date_start]

    if date_end:
        # Extend the bound to the last second of the end day
        date_end = pd.to_datetime(date_end).normalize() + pd.Timedelta(days=1) - pd.Timedelta(seconds=1)
        clusters = clusters[clusters['t_pickup'] <= date_end]

    if company_search:
        # na=False keeps rows with a missing company from breaking the mask
        clusters = clusters[clusters['company'].str.contains(company_search, na=False)]

    if num_samples and len(clusters) > 0:
        clusters = clusters.sample(n=min(int(num_samples), len(clusters)), random_state=SEED)

    return clusters


def _create_closed_path(points):
    """Order points by repeatedly hopping to the nearest unvisited one, then close the loop."""
    if len(points) <= 1:
        return points

    path = [points[0]]
    remaining_points = list(points[1:])

    while remaining_points:
        current = path[-1]
        closest_idx = min(
            range(len(remaining_points)),
            key=lambda idx: (current[0] - remaining_points[idx][0]) ** 2
            + (current[1] - remaining_points[idx][1]) ** 2,
        )
        path.append(remaining_points.pop(closest_idx))

    # Connect back to the first point to close the path
    path.append(path[0])
    return path


def _build_legend_html(routes, geo_colors, pmi_colors):
    """Return the fixed-position legend with Path / Cluster Geo / Cluster PMI sections.

    NOTE(review): the legend markup is a conservative reconstruction (three
    scrollable sections with X, circle and star symbols); confirm the exact
    styling against the deployed app.
    """
    section_open = '<div style="max-height: 120px; overflow-y: auto; margin-bottom: 6px;">'
    parts = [
        '<div style="position: fixed; bottom: 50px; left: 50px; z-index: 1000; '
        'background-color: white; padding: 10px; border: 1px solid grey; '
        'border-radius: 5px; font-size: 12px; max-width: 280px;">',
        '<div style="font-weight: bold;">Path Routes</div>',
        section_open,
    ]

    for route in routes:
        label = html.escape(f"{route.get('company', 'Unknown')}_{route.get('created', '')}")
        parts.append(
            f'<div><span style="color: {route["color"]}; font-size: 18px; '
            f'font-weight: bold;">×</span> <span>{label}</span></div>'
        )

    parts += ['</div>', '<div style="font-weight: bold;">Cluster Geo</div>', section_open]
    for cluster_value, color in geo_colors.items():
        parts.append(
            f'<div><span style="display: inline-block; width: 12px; height: 12px; '
            f'border-radius: 50%; background-color: {color};"></span> '
            f'Cluster {html.escape(str(cluster_value))}</div>'
        )

    parts += ['</div>', '<div style="font-weight: bold;">Cluster PMI</div>', section_open]
    for cluster_value, color in pmi_colors.items():
        parts.append(
            f'<div><span style="color: {color}; font-size: 18px;">★</span> '
            f'Cluster {html.escape(str(cluster_value))}</div>'
        )

    parts += ['</div>', '</div>']
    return "\n".join(parts)


def plot_paths_folium(routes, cluster_df=None, cluster_num_samples=None, cluster_company_search=None,
                      cluster_date_start=None, cluster_date_end=None, map_location="Seoul",
                      map_type="Satellite map", path_type="point+line", brightness=100):
    """Plot routes and pickup clusters on a Folium map and return its HTML.

    Args:
        routes: Route dicts as produced by ``cal_paths_folium``.
        cluster_df: Frame with ``t_pickup``, ``company``, ``latitude``,
            ``longitude``, ``cluster_geo`` and ``cluster_pmi``; None draws no
            clusters. The frame is not modified.
        cluster_num_samples: Maximum number of cluster rows to draw.
        cluster_company_search: Pattern matched against ``company``.
        cluster_date_start, cluster_date_end: Inclusive bounds on ``t_pickup``.
        map_location: "Korea", "Seoul" or "Busan"; anything else uses Korea.
        map_type: "Satellite map" for Esri imagery, otherwise the default tiles.
        path_type: "point", "line" or "point+line".
        brightness: Tile brightness in percent.

    Returns:
        The map as an HTML string.
    """
    # Map center coordinates based on location selection
    centers = {
        "Korea": (36.5, 127.5),
        "Seoul": (37.5665, 126.9780),
        "Busan": (35.1796, 129.0756)
    }
    zoom_levels = {
        "Korea": 7,
        "Seoul": 12,
        "Busan": 12
    }

    center = centers.get(map_location, centers["Korea"])
    zoom_start = zoom_levels.get(map_location, 7)

    # Create map with appropriate type
    if map_type == "Satellite map":
        m = folium.Map(
            location=center,
            zoom_start=zoom_start,
            tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
            attr='Esri',
        )
    else:
        m = folium.Map(location=center, zoom_start=zoom_start)

    path_fg = folium.FeatureGroup(name="Path").add_to(m)

    # Add routes to the map
    for route in routes:
        route_color = route['color']
        company_label = html.escape(str(route.get('company', 'Unknown')))

        if path_type in ("point", "point+line"):
            # NOTE(review): icon markup reconstructed; confirm size/offset.
            x_icon_html = (
                f'<div style="font-size: 14px; font-weight: bold; color: {route_color}; '
                f'transform: translate(-50%, -50%);">×</div>'
            )
            for i, coord in enumerate(route['coordinates']):
                folium.Marker(
                    location=coord,
                    popup=f"{company_label} - Point {i+1}",
                    icon=folium.DivIcon(html=x_icon_html),
                ).add_to(path_fg)

        if path_type in ("line", "point+line"):
            folium.PolyLine(
                route['coordinates'],
                color=route_color,
                weight=0.5,
                dash_array='1, 1',  # Dotted style (dash length, gap)
                popup=company_label,
            ).add_to(path_fg)

    clusters = _filter_clusters(
        cluster_df, cluster_num_samples, cluster_company_search, cluster_date_start, cluster_date_end
    )

    cluster_geo_fg = folium.FeatureGroup(name="Cluster Geo").add_to(m)
    cluster_pmi_fg = folium.FeatureGroup(name="Cluster PMI", show=False).add_to(m)

    # Map each cluster id (sorted) to its colour
    geo_values = sorted(clusters['cluster_geo'].unique())
    pmi_values = sorted(clusters['cluster_pmi'].unique())
    geo_colors = dict(zip(geo_values, get_colors(len(geo_values))))
    pmi_colors = dict(zip(pmi_values, get_colors(len(pmi_values))))

    # Draw the markers and group the points per cluster in one pass
    geo_clusters = {}
    pmi_clusters = {}
    for _, row in clusters.iterrows():
        position = (row['latitude'], row['longitude'])
        company_label = html.escape(str(row['company']))
        geo_color = geo_colors[row['cluster_geo']]
        pmi_color = pmi_colors[row['cluster_pmi']]

        # Geo cluster markers are circles
        folium.CircleMarker(
            location=position,
            popup=f"{company_label} - Cluster {row['cluster_geo']}",
            radius=3,
            color=geo_color,
            fill=True,
            fill_color=geo_color,
        ).add_to(cluster_geo_fg)

        # PMI cluster markers are stars
        # NOTE(review): star markup reconstructed; confirm size/offset.
        star_html = (
            f'<div style="font-size: 14px; color: {pmi_color}; '
            f'transform: translate(-50%, -50%);">★</div>'
        )
        folium.Marker(
            location=position,
            popup=f"{company_label} - Cluster {row['cluster_pmi']}",
            icon=folium.DivIcon(html=star_html),
        ).add_to(cluster_pmi_fg)

        geo_clusters.setdefault(row['cluster_geo'], []).append(position)
        pmi_clusters.setdefault(row['cluster_pmi'], []).append(position)

    # Outline each cluster; at least 2 points are needed to make a line
    for cluster_num, points in geo_clusters.items():
        if len(points) >= 2:
            folium.PolyLine(
                _create_closed_path(points),
                color=geo_colors[cluster_num],
                weight=2,
            ).add_to(cluster_geo_fg)

    for cluster_num, points in pmi_clusters.items():
        if len(points) >= 2:
            folium.PolyLine(
                _create_closed_path(points),
                color=pmi_colors[cluster_num],
                weight=2,
            ).add_to(cluster_pmi_fg)

    legend_html = _build_legend_html(routes, geo_colors, pmi_colors)

    folium.LayerControl(collapsed=False).add_to(m)
    plugins.Fullscreen(
        position="bottomright",
        title="Expand me",
        title_cancel="Exit me",
        force_separate_button=True,
    ).add_to(m)

    # Add the legend to the map
    m.get_root().html.add_child(folium.Element(legend_html))

    # Brightness applies to the tile pane only, so markers and lines keep their colours
    custom_css = f"""<style>
.leaflet-tile-pane {{ filter: brightness({float(brightness)}%); }}
</style>"""
    m.get_root().header.add_child(folium.Element(custom_css))

    return m._repr_html_()


def update_map(map_location, map_type, path_type, n_samples, company, date_start, date_end,
               cluster_num_samples, cluster_company_search, cluster_date_start, cluster_date_end,
               pick_all_date, sample_checkbox, path_checkbox, brightness_slider):
    """Rebuild the map from the current UI settings.

    Returns:
        ``(console text, map HTML)``; the HTML is None when the map could not be built.
    """
    global df, cluster_df, regions_gdf, current_shp_path

    log_messages = []
    log_messages.append(f"Updating map with settings: Location={map_location}, Type={map_type}, Path={path_type}")

    try:
        # Check if data is loaded
        if df is None:
            log_messages.append("Loading default data because df is None")
            # load_default_data returns seven values and fills the globals itself
            default_msg = load_default_data()[3]
            if default_msg:
                log_messages.append(default_msg)
            if df is None:
                return "No data available. Please upload a CSV file.", None
        else:
            log_messages.append(f"Using existing df with {len(df)} rows")

        # Process date filters
        start_d = None
        end_d = None

        if not pick_all_date:
            if date_start and date_start.strip():
                start_d = date_start
                log_messages.append(f"Using start date: {start_d}")

            if date_end and date_end.strip():
                end_d = date_end
                log_messages.append(f"Using end date: {end_d}")
        else:
            log_messages.append("Using all dates")

        # Fall back to the most recently uploaded shapefile if the current one is gone
        if not os.path.exists(current_shp_path):
            log_messages.append(f"Warning: Shapefile not found at {current_shp_path}")
            permanent_dir = os.path.join('data', 'uploaded_shapefiles')
            if os.path.exists(permanent_dir):
                subdirs = [
                    os.path.join(permanent_dir, d)
                    for d in os.listdir(permanent_dir)
                    if os.path.isdir(os.path.join(permanent_dir, d))
                ]
                if subdirs:
                    latest_dir = max(subdirs, key=os.path.getctime)
                    shp_files = [f for f in os.listdir(latest_dir) if f.endswith('.shp')]
                    if shp_files:
                        current_shp_path = os.path.join(latest_dir, shp_files[0])
                        log_messages.append(f"Using most recent shapefile: {current_shp_path}")

        # Calculate routes with full error reporting
        try:
            routes, cal_logs = cal_paths_folium(
                df,
                current_shp_path,
                n_samples=n_samples,
                start_d=start_d,
                end_d=end_d,
                company=company,
                sample_checkbox=sample_checkbox,
                path_checkbox=path_checkbox,
            )
            log_messages.append(cal_logs)
        except Exception as e:
            log_messages.append(f"Error in route calculation: {str(e)}")
            log_messages.append(traceback.format_exc())
            return "\n".join(log_messages), None

        # Check if we have routes to display
        if not routes:
            log_messages.append("No routes to display after applying filters.")
            empty_map = folium.Map(location=(36.5, 127.5), zoom_start=7)
            return "\n".join(log_messages), empty_map._repr_html_()

        # Create map
        html_output = plot_paths_folium(
            routes,
            cluster_df,
            cluster_num_samples,
            cluster_company_search,
            cluster_date_start,
            cluster_date_end,
            map_location,
            map_type,
            path_type,
            brightness_slider,
        )

        return "\n".join(log_messages), html_output
    except Exception as e:
        log_messages.append(f"Error updating map: {str(e)}")
        log_messages.append(traceback.format_exc())
        return "\n".join(log_messages), None


# Initialize data
def load_default_data():
    """Load the default path CSV, cluster CSV and shapefile into the globals.

    Returns:
        ``(df, cluster_df, regions_gdf, messages, path file name, cluster file
        name, shapefile name)``. If any of the three loads fails, the three
        data slots are None and the three names are empty strings.
    """
    global df, cluster_df, regions_gdf
    default_file = 'data/20250122_Order_List_202411_12_CJW.csv'
    default_cluster_file = 'data/path_clustering_2024.csv'
    default_gadm_shp_file = 'data/gadm41_KOR_shp/gadm41_KOR_3.shp'

    messages = []
    path_filename = ""
    cluster_filename = ""
    shp_filename = ""

    # Try different encodings for the main file
    for encoding in CSV_ENCODINGS:
        try:
            df = pd.read_csv(default_file, engine='python', encoding=encoding)
            path_filename = os.path.basename(default_file)
            messages.append(f"Path file loaded successfully: {path_filename}")
            break
        except UnicodeDecodeError:
            continue
        except Exception as e:
            messages.append(f"Error loading path file: {str(e)}")
            return None, None, None, "\n".join(messages), "", "", ""
    else:
        messages.append("Error loading path file: no supported encoding could decode it")
        return None, None, None, "\n".join(messages), "", "", ""

    # Try different encodings for the cluster file
    for encoding in CSV_ENCODINGS:
        try:
            cluster_df = pd.read_csv(default_cluster_file, engine='python', encoding=encoding)
            cluster_filename = os.path.basename(default_cluster_file)
            messages.append(f"Cluster file loaded successfully: {cluster_filename}")
            break
        except UnicodeDecodeError:
            continue
        except Exception as e:
            messages.append(f"Error loading cluster file: {str(e)}")
            return None, None, None, "\n".join(messages), "", "", ""
    else:
        messages.append("Error loading cluster file: no supported encoding could decode it")
        return None, None, None, "\n".join(messages), "", "", ""

    # Load shapefile
    try:
        regions_gdf = gpd.read_file(default_gadm_shp_file).to_crs("EPSG:4326")
        shp_filename = os.path.basename(default_gadm_shp_file)
        messages.append(f"Shapefile loaded successfully: {shp_filename}")
    except Exception as e:
        messages.append(f"Error loading shapefile: {str(e)}")
        return None, None, None, "\n".join(messages), "", "", ""

    return df, cluster_df, regions_gdf, "\n".join(messages), path_filename, cluster_filename, shp_filename


# Initial filter values; the widgets below start from these same constants
init_n_samples = 20
init_path_company_search = "백년화편"
init_path_date_start = "2024-12-01"
init_path_date_end = "2024-12-31"
init_cluster_num_samples = 200
init_cluster_date_start = "2025-02-24"
init_cluster_date_end = "2025-02-24"
init_brightness = 50

(init_df, init_cluster_df, init_regions_gdf, init_msg,
 init_path_file, init_cluster_file, init_shp_file) = load_default_data()

# Initial map
init_shp_file_path = 'data/gadm41_KOR_shp/gadm41_KOR_3.shp'
init_routes, _ = cal_paths_folium(
    df,
    init_shp_file_path,
    n_samples=init_n_samples,
    start_d=init_path_date_start,
    end_d=init_path_date_end,
    company=init_path_company_search,
) if df is not None else ([], "")
init_html = plot_paths_folium(
    routes=init_routes,
    cluster_df=init_cluster_df,
    cluster_num_samples=init_cluster_num_samples,
    cluster_date_start=init_cluster_date_start,
    cluster_date_end=init_cluster_date_end,
    brightness=init_brightness,
) if init_routes else None

# Create Gradio interface
# NOTE(review): container nesting below is inferred from statement order; confirm the layout.
with gr.Blocks() as demo:
    # Layout
    with gr.Column():
        # Map controls
        with gr.Row():
            map_location = gr.Radio(
                ["Korea", "Seoul", "Busan"],
                label="Map Location Shortcuts",
                value="Seoul"
            )
            map_type = gr.Radio(
                ["Normal map", "Satellite map"],
                label="Map Type",
                value="Satellite map"
            )
            path_type = gr.Radio(
                ["point", "line", "point+line"],
                label="Path Type",
                value="point+line"
            )
            brightness_slider = gr.Slider(
                minimum=1,
                maximum=300,
                value=init_brightness,
                step=1,
                label="Map Brightness (%)"
            )

        # Map display
        map_html = gr.HTML(init_html, elem_classes=["map-container"])
        generate_btn = gr.Button("Generate Map")

        # Filter controls
        with gr.Column():
            with gr.Row():
                path_file_upload = gr.File(label="Upload Path File", height=89, file_count="single", scale=1)
                path_current_file = gr.Textbox(label="Current Path File", value=init_path_file, scale=2)
            with gr.Row():
                cluster_file_upload = gr.File(label="Upload Cluster File", height=89, file_count="single", scale=1)
                cluster_current_file = gr.Textbox(label="Current Cluster File", value=init_cluster_file, scale=2)
            with gr.Row():
                gadm_shp_upload = gr.File(label="Upload gadm .zip File", height=89, file_count="single", scale=1)
                gadm_shp_current_file = gr.Textbox(label="Current gadm .zip File", value=init_shp_file, scale=2)

            with gr.Row():
                with gr.Row():
                    path_num_samples = gr.Number(label="Path Sample Count", precision=0, value=init_n_samples,
                                                 scale=1, minimum=1, maximum=200)
                    path_company_search = gr.Textbox(label="Path Company Search",
                                                     value=init_path_company_search, scale=2)
                with gr.Row():
                    cluster_num_samples = gr.Number(label="Cluster Sample Count", precision=0,
                                                    value=init_cluster_num_samples, scale=1,
                                                    minimum=1, maximum=200)
                    cluster_company_search = gr.Textbox(label="Cluster Company Search", scale=2)

            # Date range
            with gr.Row():
                with gr.Row():
                    path_date_start = gr.Textbox(label="Path Start Date", placeholder="YYYY-MM-DD",
                                                 value=init_path_date_start)
                    path_date_end = gr.Textbox(label="Path End Date", placeholder="YYYY-MM-DD",
                                               value=init_path_date_end)
                with gr.Row():
                    cluster_date_start = gr.Textbox(label="Cluster Start Date", placeholder="YYYY-MM-DD",
                                                    value=init_cluster_date_start)
                    cluster_date_end = gr.Textbox(label="Cluster End Date", placeholder="YYYY-MM-DD",
                                                  value=init_cluster_date_end)

            # Checkboxes
            with gr.Row():
                pick_all_date = gr.Checkbox(label="Select All Dates")
                sample_checkbox = gr.Checkbox(label="Print Sample", value=True)
                path_checkbox = gr.Checkbox(label="Print Path")

        # Console
        console = gr.Textbox(
            label="Console",
            lines=10,
            max_lines=100,
            interactive=False,
            value=init_msg,
            elem_classes=["console"]
        )

    # Style
    # NOTE(review): this style block is empty, so the "map-container" and
    # "console" classes have no CSS here; restore the rules if they exist elsewhere.
    gr.Markdown(""" """)

    # Event handlers
    path_file_upload.upload(
        fn=process_upload,
        inputs=[path_file_upload],
        outputs=[console, path_current_file]
    )
    cluster_file_upload.upload(
        fn=process_cluster_upload,
        inputs=[cluster_file_upload],
        outputs=[console, cluster_current_file]
    )
    gadm_shp_upload.upload(
        fn=process_shp_upload,
        inputs=[gadm_shp_upload],
        outputs=[console, gadm_shp_current_file]
    )

    # The button and the auto-updating controls all feed update_map the same inputs
    update_inputs = [
        map_location, map_type, path_type,
        path_num_samples, path_company_search, path_date_start, path_date_end,
        cluster_num_samples, cluster_company_search, cluster_date_start, cluster_date_end,
        pick_all_date, sample_checkbox, path_checkbox, brightness_slider
    ]

    generate_btn.click(
        fn=update_map,
        inputs=update_inputs,
        outputs=[console, map_html]
    )

    # Auto-update radio buttons
    for control in [map_location, map_type, path_type, brightness_slider]:
        control.change(
            fn=update_map,
            inputs=update_inputs,
            outputs=[console, map_html]
        )

# Launch the app
demo.launch(share=True)