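"""Historical Data Analysis Tools: a Gradio Space with two tabs.

1. Text Extraction: sends a JSON template plus free text to the hosted
   NuExtract-1.5 model via the Hugging Face Inference API and returns the
   extracted fields as formatted JSON.
2. Location Mapping: reads an Excel file, geocodes a chosen column of place
   names with Nominatim, and renders the results on an interactive folium
   map with clustered markers.
"""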
import gradio as gr
import json
import requests
import os
import pandas as pd
import folium
from folium.plugins import MeasureControl, Fullscreen, MarkerCluster
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut, GeocoderServiceError
import time
import random
from typing import List, Tuple, Optional
import io
import tempfile
import warnings

warnings.filterwarnings("ignore")
# Map tile providers with reliable sources
MAP_TILES = {
    "GreenMap": {
        "url": "https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}",
        "attr": "Esri"
    }
}
# NuExtract API configuration
API_URL = "https://api-inference.huggingface.co/models/numind/NuExtract-1.5"
headers = {"Authorization": f"Bearer {os.environ.get('HF_TOKEN', '')}"}
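# SafeGeocoder wraps Nominatim with an in-memory cache and a minimum
# one-second delay between requests, in line with the Nominatim usage policy.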
class SafeGeocoder:
    def __init__(self):
        user_agent = f"location_mapper_v1_{random.randint(1000, 9999)}"
        self.geolocator = Nominatim(user_agent=user_agent, timeout=10)
        self.cache = {}
        self.last_request = 0

    def _respect_rate_limit(self):
        current_time = time.time()
        elapsed = current_time - self.last_request
        if elapsed < 1.0:
            time.sleep(1.0 - elapsed)
        self.last_request = time.time()

    def get_coords(self, location: str):
        if not location or pd.isna(location):
            return None
        location = str(location).strip()
        if location in self.cache:
            return self.cache[location]
        try:
            self._respect_rate_limit()
            result = self.geolocator.geocode(location)
            if result:
                coords = (result.latitude, result.longitude)
                self.cache[location] = coords
                return coords
            self.cache[location] = None
            return None
        except Exception as e:
            print(f"Geocoding error for '{location}': {e}")
            self.cache[location] = None
            return None
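# extract_info builds a NuExtract prompt of the form
#   <|input|>\n### Template:\n{template}\n### Text:\n{text}\n\n<|output|>
# and POSTs it to the Inference API. A 503 response means the model is still
# loading; everything after the <|output|> marker is parsed as JSON.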
def extract_info(template, text):
    try:
        prompt = f"<|input|>\n### Template:\n{template}\n### Text:\n{text}\n\n<|output|>"
        payload = {
            "inputs": prompt,
            "parameters": {
                "max_new_tokens": 1000,
                "do_sample": False
            }
        }
        response = requests.post(API_URL, headers=headers, json=payload)
        if response.status_code == 503:
            response_json = response.json()
            if "error" in response_json and "loading" in response_json["error"]:
                estimated_time = response_json.get("estimated_time", "unknown")
                return f"⏳ Model is loading (ETA: {int(float(estimated_time)) if isinstance(estimated_time, (int, float)) else 'unknown'} seconds)", "Please try again in a few minutes"
        if response.status_code != 200:
            return f"❌ API Error: {response.status_code}", response.text
        result = response.json()
        if isinstance(result, list) and len(result) > 0:
            result_text = result[0].get("generated_text", "")
        else:
            result_text = str(result)
        if "<|output|>" in result_text:
            json_text = result_text.split("<|output|>")[1].strip()
        else:
            json_text = result_text
        try:
            extracted = json.loads(json_text)
            formatted = json.dumps(extracted, indent=2)
        except json.JSONDecodeError:
            return "❌ JSON parsing error", json_text
        return "✅ Success", formatted
    except Exception as e:
        return f"❌ Error: {str(e)}", "{}"
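# create_map geocodes every value in the chosen column (comma-separated
# entries are split into individual places), adds one clustered marker per
# resolved place with the remaining row fields in its popup, and returns the
# rendered HTML plus the number of markers placed.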
def create_map(df, location_col):
    m = folium.Map(
        location=[20, 0],
        zoom_start=2,
        control_scale=True
    )
    folium.TileLayer(
        tiles=MAP_TILES["GreenMap"]["url"],
        attr=MAP_TILES["GreenMap"]["attr"],
        name="GreenMap",
        overlay=False,
        control=False
    ).add_to(m)
    Fullscreen().add_to(m)
    MeasureControl(position='topright', primary_length_unit='kilometers').add_to(m)

    geocoder = SafeGeocoder()
    coords = []
    marker_cluster = MarkerCluster(name="Locations").add_to(m)
    processed_count = 0

    for idx, row in df.iterrows():
        if pd.isna(row[location_col]):
            continue
        location = str(row[location_col]).strip()
        additional_info = ""
        for col in df.columns:
            if col != location_col and not pd.isna(row[col]):
                additional_info += f"<br><b>{col}:</b> {row[col]}"
        try:
            locations = [loc.strip() for loc in location.split(',') if loc.strip()]
            if not locations:
                locations = [location]
        except Exception:
            locations = [location]
        for loc in locations:
            point = geocoder.get_coords(loc)
            if point:
                popup_content = f"""
                <div style="min-width: 200px; max-width: 300px">
                    <h4 style="font-family: 'Source Sans Pro', sans-serif; margin-bottom: 5px;">{loc}</h4>
                    <div style="font-family: 'Source Sans Pro', sans-serif; font-size: 14px;">
                        {additional_info}
                    </div>
                </div>
                """
                folium.Marker(
                    location=point,
                    popup=folium.Popup(popup_content, max_width=300),
                    tooltip=loc,
                    icon=folium.Icon(color="blue", icon="info-sign")
                ).add_to(marker_cluster)
                coords.append(point)
                processed_count += 1
    if coords:
        m.fit_bounds(coords)

    custom_css = """
    <style>
    @import url('https://fonts.googleapis.com/css2?family=Source+Sans+Pro:wght@400;600&display=swap');
    .leaflet-container {
        font-family: 'Source Sans Pro', sans-serif;
    }
    .leaflet-popup-content {
        font-family: 'Source Sans Pro', sans-serif;
    }
    .leaflet-popup-content h4 {
        font-weight: 600;
        margin-bottom: 8px;
    }
    </style>
    """
    m.get_root().header.add_child(folium.Element(custom_css))
    return m._repr_html_(), processed_count
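# process_excel loads the uploaded spreadsheet, checks that the requested
# column exists, builds the map, and writes a copy of the data to a temporary
# .xlsx file offered for download.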
def process_excel(file, places_column):
    if file is None:
        return None, "No file uploaded", None
    try:
        if hasattr(file, 'name'):
            df = pd.read_excel(file.name)
        elif isinstance(file, bytes):
            df = pd.read_excel(io.BytesIO(file))
        else:
            df = pd.read_excel(file)
        print(f"Columns in Excel file: {list(df.columns)}")
        if places_column not in df.columns:
            return None, f"Column '{places_column}' not found in the Excel file. Available columns: {', '.join(df.columns)}", None
        map_html, processed_count = create_map(df, places_column)
        with tempfile.NamedTemporaryFile(suffix=".xlsx", delete=False) as tmp:
            processed_path = tmp.name
        df.to_excel(processed_path, index=False)
        total_locations = df[places_column].count()
        success_rate = (processed_count / total_locations * 100) if total_locations > 0 else 0
        stats = f"Found {processed_count} of {total_locations} locations ({success_rate:.1f}%)"
        return map_html, stats, processed_path
    except Exception as e:
        import traceback
        trace = traceback.format_exc()
        print(f"Error processing file: {e}\n{trace}")
        return None, f"Error processing file: {str(e)}", None
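# Global CSS injected into the Gradio page (fonts, headings, info boxes,
# and the fixed-height map container).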
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Source+Sans+Pro:wght@300;400;600;700&display=swap');
body, .gradio-container {
    font-family: 'Source Sans Pro', sans-serif !important;
    color: #333333;
}
h1 {
    font-weight: 700 !important;
    color: #2c6bb3 !important;
    font-size: 2.5rem !important;
    margin-bottom: 1rem !important;
}
h2 {
    font-weight: 600 !important;
    color: #4e8fd1 !important;
    font-size: 1.5rem !important;
    margin-top: 1rem !important;
    margin-bottom: 0.75rem !important;
}
.gradio-button.primary {
    background-color: #ff7518 !important;
}
.info-box {
    background-color: #e8f4fd;
    border-left: 4px solid #2c6bb3;
    padding: 15px;
    margin: 15px 0;
    border-radius: 4px;
}
.file-upload-box {
    border: 2px dashed #e0e0e0;
    border-radius: 8px;
    padding: 20px;
    text-align: center;
    transition: all 0.3s ease;
}
/* Fix for map container spacing */
#map-container {
    height: 65vh !important;
    margin-bottom: 0 !important;
    padding-bottom: 0 !important;
}
/* Stats box styling */
.stats-box {
    margin-top: 10px !important;
    margin-bottom: 0 !important;
    padding: 10px;
    background: #f8f9fa;
    border-radius: 4px;
}
/* Remove extra space around components */
.gr-box {
    margin-bottom: 0 !important;
}
"""
with gr.Blocks(css=custom_css, title="Historical Data Analysis") as demo:
    gr.HTML("""
    <div style="text-align: center; margin-bottom: 1rem">
        <h1>Historical Data Analysis Tools</h1>
        <p style="font-size: 1.1rem; margin-top: -10px;">Extract, visualize, and analyze historical data with ease</p>
    </div>
    """)
    with gr.Tabs() as tabs:
        with gr.TabItem("📝 Text Extraction"):
            gr.HTML("""
            <div class="info-box">
                <h3 style="margin-top: 0;">Extract Structured Data from Text</h3>
                <p>Use NuExtract-1.5 to automatically extract structured information from historical texts.</p>
            </div>
            """)
            with gr.Row():
                with gr.Column():
                    template = gr.Textbox(
                        label="JSON Template",
                        value='{"earthquake location": "", "dateline location": ""}',
                        lines=5
                    )
                    text = gr.Textbox(
                        label="Text to Extract From",
                        value="Neues Erdbeben in Japan. Aus Tokio wird berichtet...",
                        lines=8
                    )
                    extract_btn = gr.Button("Extract Information", variant="primary")
                with gr.Column():
                    status = gr.Textbox(label="Status")
                    output = gr.Textbox(label="Output", lines=10)
            extract_btn.click(
                fn=extract_info,
                inputs=[template, text],
                outputs=[status, output]
            )
        with gr.TabItem("🗺️ Location Mapping"):
            gr.HTML("""
            <div class="info-box">
                <h3 style="margin-top: 0;">Map Your Historical Locations</h3>
                <p>Upload an Excel file containing location data to create an interactive map visualization.</p>
            </div>
            """)
            with gr.Row():
                with gr.Column():
                    excel_file = gr.File(
                        label="Upload Excel File",
                        file_types=[".xlsx", ".xls"],
                        elem_classes="file-upload-box"
                    )
                    places_column = gr.Textbox(
                        label="Location Column Name",
                        value="dateline_locations",
                        placeholder="Enter the column containing locations"
                    )
                    process_btn = gr.Button("Generate Map", variant="primary")
                with gr.Column():
                    map_output = gr.HTML(
                        label="Interactive Map",
                        value="""
                        <div style="text-align:center; height:65vh; width:100%; display:flex; align-items:center; justify-content:center;
                                    background-color:#f5f5f5; border:1px solid #e0e0e0; border-radius:8px;">
                            <div>
                                <img src="https://cdn-icons-png.flaticon.com/512/854/854878.png" width="100">
                                <p style="margin-top:20px; color:#666;">Your map will appear here after processing</p>
                            </div>
                        </div>
                        """,
                        elem_id="map-container"
                    )
                    stats_output = gr.Textbox(
                        label="Location Statistics",
                        lines=2,
                        elem_classes="stats-box"
                    )
                    processed_file = gr.File(
                        label="Download Processed Data",
                        visible=True,
                        interactive=False
                    )
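            # Wrap process_excel so the returned map HTML is embedded in a
            # responsive fixed-height container before it reaches gr.HTML.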
            def process_and_map(file, column):
                if file is None:
                    return None, "Please upload an Excel file", None
                try:
                    map_html, stats, processed_path = process_excel(file, column)
                    if map_html and processed_path:
                        responsive_html = f"""
                        <div style="width:100%; height:65vh; margin:0; padding:0; border:1px solid #e0e0e0; border-radius:8px; overflow:hidden;">
                            {map_html}
                        </div>
                        """
                        return responsive_html, stats, processed_path
                    else:
                        return None, stats, None
                except Exception as e:
                    import traceback
                    trace = traceback.format_exc()
                    print(f"Error in process_and_map: {e}\n{trace}")
                    return None, f"Error: {str(e)}", None
            process_btn.click(
                fn=process_and_map,
                inputs=[excel_file, places_column],
                outputs=[map_output, stats_output, processed_file]
            )

    gr.HTML("""
    <div style="text-align: center; margin-top: 2rem; padding-top: 1rem; border-top: 1px solid #eee; font-size: 0.9rem; color: #666;">
        <p>Made with <span style="color: #e25555;">❤</span> for historical research</p>
    </div>
    """)
if __name__ == "__main__":
    demo.launch()