File size: 5,926 Bytes
be096d1
6e9dc4d
 
 
ce5e315
 
6e9dc4d
ce5e315
 
be096d1
ce5e315
 
be096d1
ce5e315
 
 
6e9dc4d
ce5e315
 
6e9dc4d
ce5e315
 
 
 
 
 
 
 
 
 
 
 
cf36ecc
 
 
df1519d
ce5e315
 
 
 
 
5f830c6
ce5e315
063d83e
ce5e315
 
6e9dc4d
ce5e315
 
 
 
6e9dc4d
ce5e315
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6e9dc4d
ce5e315
 
 
 
 
 
 
063d83e
ce5e315
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
063d83e
ce5e315
 
 
 
 
063d83e
 
ce5e315
 
 
 
 
 
 
 
 
063d83e
be096d1
ce5e315
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
063d83e
ce5e315
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
be096d1
ce5e315
39ee1aa
ce5e315
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import gradio as gr
import pandas as pd
import folium
from geopy.geocoders import Nominatim
from geopy.extra.rate_limiter import RateLimiter
import tempfile
import time
from typing import Optional, Tuple
import warnings

# Suppress geopy warnings
warnings.filterwarnings("ignore", category=UserWarning, module="geopy")

# Configure geocoder
class Geocoder:
    """Rate-limited Nominatim geocoder with a per-instance result cache.

    Attributes:
        geolocator: The underlying ``Nominatim`` client.
        geocode: ``geolocator.geocode`` wrapped in a ``RateLimiter``.
        cache: Maps a (stripped) query string to its
            ``(latitude, longitude)`` tuple, or to ``None`` when the
            service answered but had no match.
    """

    def __init__(self):
        self.geolocator = Nominatim(
            user_agent="historical_data_mapper",
            timeout=10
        )
        self.geocode = RateLimiter(
            self.geolocator.geocode,
            min_delay_seconds=1,
            max_retries=2,
            error_wait_seconds=5
        )
        self.cache = {}

    def get_coordinates(self, location: str) -> Optional[Tuple[float, float]]:
        """Return ``(latitude, longitude)`` for *location*, or ``None``.

        Args:
            location: Free-text place name. Empty, whitespace-only, ``None``
                and NaN values are rejected without calling the service.

        Returns:
            The coordinate pair of the match, or ``None`` when the input is
            blank, the service has no match, or the lookup raised.
        """
        if not location or pd.isna(location):
            return None

        # Normalise the key so "Paris" and " Paris " share one cache entry
        # and a whitespace-only cell never reaches the geocoding service.
        query = str(location).strip()
        if not query:
            return None

        if query in self.cache:
            return self.cache[query]

        try:
            match = self.geocode(query)
        except Exception as e:
            print(f"Geocoding error for '{location}': {str(e)}")
            # Deliberately not cached: an exception says nothing about
            # whether the place exists, so a later call may still succeed.
            return None

        coords = (match.latitude, match.longitude) if match else None
        self.cache[query] = coords
        return coords

def create_interactive_map(df: pd.DataFrame, location_column: str) -> str:
    """Geocode the unique values of a column and render them on a folium map.

    Args:
        df: Data frame holding the location names.
        location_column: Name of the column in *df* to geocode.

    Returns:
        The map as an HTML string, or a short red "No valid locations found"
        ``<div>`` when none of the values could be geocoded.
    """
    geocoder = Geocoder()
    valid_locations = []

    # Geocode each distinct non-null value once.
    for loc in df[location_column].dropna().unique():
        # Cells may hold non-string values; both the geocoder and the popup
        # below need text, so convert once and use the same label for both.
        label = str(loc)
        coords = geocoder.get_coordinates(label)
        if coords:
            valid_locations.append((label, coords))

    if not valid_locations:
        return "<div style='color:red;text-align:center'>No valid locations found</div>"

    # Centre on the first hit; fit_bounds below overrides this when there
    # is more than one marker.
    m = folium.Map(
        location=valid_locations[0][1],
        zoom_start=5,
        tiles="CartoDB positron",
        control_scale=True
    )

    for label, coords in valid_locations:
        folium.Marker(
            location=coords,
            # Labels come from an uploaded spreadsheet: parse_html=True makes
            # folium escape them instead of embedding them as raw markup.
            popup=folium.Popup(label, parse_html=True, max_width=300),
            icon=folium.Icon(color="blue", icon="info-sign")
        ).add_to(m)

    # Zoom out to show every marker when there are several.
    if len(valid_locations) > 1:
        m.fit_bounds([coords for _, coords in valid_locations])

    return m._repr_html_()

def process_data(file_obj, column_name: str):
    """Read an uploaded Excel file, map one column and summarise the run.

    Args:
        file_obj: The upload: either an object exposing the file path as
            ``.name`` or the path itself as a string. Falsy means no upload.
        column_name: Name of the column that holds the location names.

    Returns:
        A ``(map_html, message, processed_path)`` tuple. On success
        ``map_html`` is the map wrapped in a sized ``<div>``, ``message`` is
        the statistics text and ``processed_path`` is a temporary ``.xlsx``
        copy of the data. On any failure the first and last items are
        ``None`` and ``message`` describes the problem.
    """
    import os  # local import: only needed for temp-file cleanup below

    start_time = time.time()

    if not file_obj:
        return None, "Please upload a file", None

    try:
        # Accept a plain path as well as a file wrapper carrying ``.name``.
        source_path = getattr(file_obj, "name", file_obj)
        df = pd.read_excel(source_path)

        if column_name not in df.columns:
            return None, f"Column '{column_name}' not found in data", None

        map_html = create_interactive_map(df, column_name)

        # Reserve a unique path, then close the handle BEFORE writing:
        # re-opening a still-open NamedTemporaryFile by name fails on Windows.
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp_file:
            processed_path = tmp_file.name
        try:
            df.to_excel(processed_path, index=False)
        except Exception:
            # delete=False means nobody else will remove a half-written file.
            os.unlink(processed_path)
            raise

        total_rows = len(df)
        unique_locations = df[column_name].nunique()
        processing_time = round(time.time() - start_time, 2)

        stats = (
            f"Total rows processed: {total_rows}\n"
            f"Unique locations found: {unique_locations}\n"
            f"Processing time: {processing_time}s"
        )

        return (
            f"<div style='width:100%; height:65vh'>{map_html}</div>",
            stats,
            processed_path
        )

    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        error_msg = f"Error processing file: {str(e)}"
        print(error_msg)
        return None, error_msg, None

# Gradio Interface
# Single-tab layout: upload controls on the left, results on the right.
with gr.Blocks(
    title="Historical Data Mapper",
    theme=gr.themes.Soft()
) as app:
    gr.Markdown("# Historical Data Analysis Tools")

    with gr.Tab("Location Mapping"):
        gr.Markdown("### Geocode and visualize location data from Excel files")

        with gr.Row():
            # Left column: inputs.
            with gr.Column(scale=1):
                file_input = gr.File(
                    label="Upload Excel File",
                    type="file",
                    file_types=[".xlsx", ".xls"]
                )
                column_input = gr.Textbox(
                    label="Location Column Name",
                    value="dateline_locations",
                    placeholder="Enter the column containing location names"
                )
                process_btn = gr.Button(
                    "Process and Map",
                    variant="primary"
                )

            # Right column: outputs, in the order process_data returns them.
            with gr.Column(scale=2):
                map_display = gr.HTML(
                    label="Interactive Map",
                    value="<div style='text-align:center;padding:20px;'>"
                          "Map will appear here after processing"
                          "</div>"
                )
                stats_output = gr.Textbox(
                    label="Processing Statistics",
                    interactive=False
                )
                # Left visible on purpose: nothing in this app ever toggles
                # visibility, so a hidden component would keep the processed
                # file unreachable even after a successful run.
                download_output = gr.File(
                    label="Download Processed Data"
                )

        # process_data returns (map html, stats text, file path) in this order.
        process_btn.click(
            fn=process_data,
            inputs=[file_input, column_input],
            outputs=[map_display, stats_output, download_output]
        )

# Launch settings
if __name__ == "__main__":
    # Serve on every network interface at port 7860, without a public
    # share link.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "share": False,
    }
    app.launch(**launch_options)