import json
import tempfile
import warnings
from datetime import datetime, timedelta

import gradio as gr
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
from plotly.subplots import make_subplots

warnings.filterwarnings('ignore')
class EnhancedOceanClimateAgent:
    def __init__(self):
        self.anomaly_threshold = 2.0
        self.critical_temp_change = 1.5
        # NOAA CO-OPS API endpoints
        self.noaa_base_url = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter"
        self.noaa_stations_url = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/stations.json"
        # Well-known NOAA stations for different regions
        self.default_stations = {
            "San Francisco, CA": "9414290",
            "New York, NY": "8518750",
            "Miami, FL": "8723214",
            "Seattle, WA": "9447130",
            "Boston, MA": "8443970",
            "Los Angeles, CA": "9410660",
            "Galveston, TX": "8771450",
            "Charleston, SC": "8665530"
        }
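
    # Additional station IDs can be looked up via the CO-OPS metadata service
    # at self.noaa_stations_url and added to default_stations as needed.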
    def get_noaa_data(self, station_id, product, begin_date, end_date, units="metric"):
        """Fetch NOAA data for a given product and date range."""
        # Format dates for the NOAA API
        begin_str = begin_date.strftime("%Y%m%d %H:%M")
        end_str = end_date.strftime("%Y%m%d %H:%M")

        params = {
            'station': station_id,
            'product': product,
            'begin_date': begin_str,
            'end_date': end_str,
            'datum': 'MLLW',  # Mean Lower Low Water (used by water level products)
            'application': 'OceanClimateAgent',
            'time_zone': 'gmt',
            'units': units,
            'format': 'json'
        }

        try:
            print(f"Fetching {product} data for station {station_id}")
            print(f"Date range: {begin_str} to {end_str}")

            response = requests.get(self.noaa_base_url, params=params, timeout=30)
            if response.status_code != 200:
                print(f"HTTP Error {response.status_code}: {response.text}")
                return None

            data = response.json()
            if 'data' in data and data['data']:
                print(f"Successfully fetched {len(data['data'])} records for {product}")
                return pd.DataFrame(data['data'])
            elif 'error' in data:
                print(f"NOAA API error for {product}: {data['error']['message']}")
                return None
            else:
                print(f"No data returned for {product}")
                return None

        except requests.exceptions.Timeout:
            print(f"Timeout fetching {product} data")
            return None
        except requests.exceptions.RequestException as e:
            print(f"Request failed for {product}: {str(e)}")
            return None
        except json.JSONDecodeError as e:
            print(f"JSON decode error for {product}: {str(e)}")
            return None
        except Exception as e:
            print(f"Unexpected error fetching {product}: {str(e)}")
            return None
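
    # For reference, the method above issues a GET request of this shape
    # (illustrative values; station 9414290 is San Francisco, and requests
    # URL-encodes the spaces in the dates):
    #   https://api.tidesandcurrents.noaa.gov/api/prod/datagetter?station=9414290
    #     &product=water_level&begin_date=20240101 00:00&end_date=20240108 00:00
    #     &datum=MLLW&time_zone=gmt&units=metric&format=json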
    def get_comprehensive_station_data(self, station_name, days_back=30):
        """Get comprehensive data from a NOAA station."""
        station_id = self.default_stations.get(station_name)
        if not station_id:
            return None, "Station not found"

        # Keep end_date out of the future and leave a buffer for NOAA's
        # data-processing lag
        end_date = datetime.utcnow() - timedelta(hours=2)
        start_date = end_date - timedelta(days=days_back)

        print(f"Fetching data for {station_name} (ID: {station_id})")
        print(f"Date range: {start_date} to {end_date}")

        # Priority order: start with the most reliable products
        products_to_fetch = [
            ('water_level', 'water_level'),
            ('water_temperature', 'water_temperature'),
            ('air_temperature', 'air_temperature'),
            ('wind', 'wind'),
            ('air_pressure', 'air_pressure')
        ]

        all_data = {}
        success_count = 0
        for product_name, product_code in products_to_fetch:
            print(f"Attempting to fetch {product_name}...")
            data = self.get_noaa_data(station_id, product_code, start_date, end_date)
            if data is not None and not data.empty:
                all_data[product_name] = data
                success_count += 1
                print(f"{product_name}: {len(data)} records")
            else:
                print(f"{product_name}: No data available")

        if success_count == 0:
            return None, (f"No data available for station {station_name} in the specified "
                          "time period. This could be due to station maintenance, data "
                          "processing delays, or the station may not support the requested "
                          "data types.")

        status = f"Successfully retrieved {success_count}/{len(products_to_fetch)} data types"
        print(status)
        return all_data, status
    def process_noaa_data(self, raw_data):
        """Process and combine NOAA data for analysis."""
        if not raw_data:
            return None

        base_df = None

        # Start with water level data if available (the most common product)
        if 'water_level' in raw_data:
            df = raw_data['water_level'].copy()
            df['datetime'] = pd.to_datetime(df['t'])
            df['water_level'] = pd.to_numeric(df['v'], errors='coerce')
            base_df = df[['datetime', 'water_level']].copy()
            print(f"Base dataset: water_level with {len(base_df)} records")

        # If there is no water level, fall back to the other datasets
        if base_df is None:
            for product_name in ['water_temperature', 'air_temperature', 'wind', 'air_pressure']:
                if product_name in raw_data:
                    df = raw_data[product_name].copy()
                    df['datetime'] = pd.to_datetime(df['t'])
                    if product_name == 'wind':
                        df['wind_speed'] = pd.to_numeric(df['s'], errors='coerce')
                        base_df = df[['datetime', 'wind_speed']].copy()
                    else:
                        column_name = product_name.replace('_temperature', '_temp')
                        df[column_name] = pd.to_numeric(df['v'], errors='coerce')
                        base_df = df[['datetime', column_name]].copy()
                    print(f"Base dataset: {product_name} with {len(base_df)} records")
                    break

        if base_df is None:
            return None

        # Merge in the other parameters when available
        if 'water_temperature' in raw_data and 'water_temp' not in base_df.columns:
            temp_df = raw_data['water_temperature'].copy()
            temp_df['datetime'] = pd.to_datetime(temp_df['t'])
            temp_df['water_temp'] = pd.to_numeric(temp_df['v'], errors='coerce')
            base_df = base_df.merge(temp_df[['datetime', 'water_temp']], on='datetime', how='outer')

        if 'air_temperature' in raw_data and 'air_temp' not in base_df.columns:
            air_temp_df = raw_data['air_temperature'].copy()
            air_temp_df['datetime'] = pd.to_datetime(air_temp_df['t'])
            air_temp_df['air_temp'] = pd.to_numeric(air_temp_df['v'], errors='coerce')
            base_df = base_df.merge(air_temp_df[['datetime', 'air_temp']], on='datetime', how='outer')

        if 'wind' in raw_data and 'wind_speed' not in base_df.columns:
            wind_df = raw_data['wind'].copy()
            wind_df['datetime'] = pd.to_datetime(wind_df['t'])
            wind_df['wind_speed'] = pd.to_numeric(wind_df['s'], errors='coerce')
            wind_df['wind_direction'] = pd.to_numeric(wind_df['d'], errors='coerce')
            base_df = base_df.merge(wind_df[['datetime', 'wind_speed', 'wind_direction']], on='datetime', how='outer')

        if 'air_pressure' in raw_data and 'air_pressure' not in base_df.columns:
            pressure_df = raw_data['air_pressure'].copy()
            pressure_df['datetime'] = pd.to_datetime(pressure_df['t'])
            pressure_df['air_pressure'] = pd.to_numeric(pressure_df['v'], errors='coerce')
            base_df = base_df.merge(pressure_df[['datetime', 'air_pressure']], on='datetime', how='outer')

        # Sort by datetime, drop duplicate timestamps, and reset the index so
        # downstream boolean masks and positional logic line up with rows
        base_df = (base_df.sort_values('datetime')
                   .drop_duplicates(subset=['datetime'])
                   .reset_index(drop=True))

        print(f"Final processed dataset: {len(base_df)} records with {len(base_df.columns) - 1} parameters")
        return base_df
    def detect_anomalies(self, data, column, window=24):
        """Detect anomalies using rolling z-scores. The window is in records;
        24 corresponds to one day of hourly data."""
        if column not in data.columns or data[column].isna().all():
            # Index-aligned fallbacks so callers can still use these as masks
            return pd.Series(False, index=data.index), pd.Series(0.0, index=data.index)

        rolling_mean = data[column].rolling(window=window, center=True, min_periods=1).mean()
        rolling_std = data[column].rolling(window=window, center=True, min_periods=1).std()

        # Avoid division by zero
        rolling_std = rolling_std.fillna(1).replace(0, 1)

        z_scores = np.abs((data[column] - rolling_mean) / rolling_std)
        anomalies = z_scores > self.anomaly_threshold
        return anomalies, z_scores
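
    # Toy illustration of the rule above (numbers approximate): for
    # pd.Series([10, 10, 10, 30, 10]) with window=3, the spike falls in a
    # window with mean ~16.7 and std ~11.5, so its |z| is only ~1.15 and it
    # is flagged just when anomaly_threshold < ~1.15; with the wider default
    # window the same spike scores a much larger |z| and is flagged.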
    def calculate_trends(self, data, column, hours=168):  # default window: 7 days of hourly records
        """Calculate the linear trend (slope per record) over the most recent window."""
        if column not in data.columns or data[column].isna().all():
            return 0

        recent_data = data.tail(hours)
        # Keep x aligned with the non-NaN observations instead of truncating,
        # which would misalign x and y whenever NaNs fall mid-series
        mask = recent_data[column].notna()
        y = recent_data[column][mask]
        if len(y) < 2:
            return 0
        x = np.arange(len(recent_data))[mask.to_numpy()]

        slope = np.polyfit(x, y, 1)[0]
        return slope
    def generate_climate_analysis(self, data, station_name):
        """Generate a comprehensive climate analysis."""
        if data is None or data.empty:
            return {}, []

        analysis = {}
        alerts = []

        # Note: the "* 24" conversions below assume roughly hourly records;
        # stations reporting at other intervals will scale differently.

        # Water level analysis
        if 'water_level' in data.columns:
            wl_trend = self.calculate_trends(data, 'water_level')
            analysis['water_level_trend'] = wl_trend * 24  # per day
            if abs(wl_trend * 24) > 5:  # > 5 cm per day
                alerts.append(f"Significant water level change: {wl_trend * 24:.1f}cm/day at {station_name}")

        # Temperature analysis
        for temp_col in ['water_temp', 'air_temp']:
            if temp_col in data.columns:
                temp_trend = self.calculate_trends(data, temp_col)
                analysis[f'{temp_col}_trend'] = temp_trend * 24  # per day
                if temp_trend * 24 > 0.5:  # > 0.5°C per day
                    alerts.append(f"Rapid {temp_col.replace('_', ' ')} rise: {temp_trend * 24:.2f}°C/day at {station_name}")

        # Anomaly detection
        for col in ['water_level', 'water_temp', 'air_temp', 'wind_speed']:
            if col in data.columns:
                anomalies, z_scores = self.detect_anomalies(data, col)
                anomaly_pct = (anomalies.sum() / len(data)) * 100
                analysis[f'{col}_anomaly_frequency'] = anomaly_pct
                if anomaly_pct > 10:
                    alerts.append(f"High {col.replace('_', ' ')} anomaly frequency: {anomaly_pct:.1f}% at {station_name}")

        if not alerts:
            alerts.append(f"No significant anomalies detected at {station_name}")

        return analysis, alerts
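
# Example of driving the agent outside the Gradio UI (illustrative; assumes
# network access to the NOAA CO-OPS API):
#   agent = EnhancedOceanClimateAgent()
#   raw, status = agent.get_comprehensive_station_data("Seattle, WA", days_back=7)
#   df = agent.process_noaa_data(raw)
#   analysis, alerts = agent.generate_climate_analysis(df, "Seattle, WA")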
# Initialize the enhanced agent
agent = EnhancedOceanClimateAgent()
def analyze_real_ocean_data(station_name, days_back, anomaly_sensitivity, use_real_data):
    """Main analysis function, optionally backed by real NOAA data."""
    agent.anomaly_threshold = anomaly_sensitivity

    if use_real_data:
        print(f"Starting real data analysis for {station_name}")
        # Fetch real NOAA data
        raw_data, status_msg = agent.get_comprehensive_station_data(station_name, days_back)
        if raw_data is None:
            error_msg = f"Error fetching real data: {status_msg}"
            print(error_msg)
            return None, None, None, error_msg, "No alerts - data unavailable", None

        # Process the data
        data = agent.process_noaa_data(raw_data)
        if data is None or data.empty:
            error_msg = "No processable data available after fetching from NOAA"
            print(error_msg)
            return None, None, None, error_msg, "No alerts - data unavailable", None

        data_source = f"Real NOAA data from {station_name} ({status_msg})"
        print(data_source)
    else:
        print("🔧 Using synthetic demonstration data")
        data = generate_synthetic_data(days_back)
        data_source = f"🔧 Synthetic demonstration data ({days_back} days)"

    # Generate analysis and alerts
    analysis, alerts = agent.generate_climate_analysis(data, station_name)

    # Create visualizations
    fig1 = create_main_dashboard(data, agent)
    fig2 = create_anomaly_plots(data, agent)
    fig3 = create_correlation_plot(data)

    # Format analysis text
    analysis_text = format_analysis_results(analysis, data_source)
    alerts_text = "\n".join(f"- {alert}" for alert in alerts)

    # Create CSV for download
    csv_file_path = save_csv_temp(data)

    print("Analysis completed successfully")
    return fig1, fig2, fig3, analysis_text, alerts_text, csv_file_path
def generate_synthetic_data(days):
    """Generate synthetic hourly data for demonstration."""
    dates = pd.date_range(start=datetime.now() - timedelta(days=days), periods=days * 24, freq='h')
    n = len(dates)

    # Synthetic water level driven by the M2 tidal constituent (~12.42 h period)
    tidal_pattern = 2 * np.sin(2 * np.pi * np.arange(n) / 12.42)
    water_level = 100 + tidal_pattern + np.random.normal(0, 0.3, n)

    # Water temperature with a daily cycle
    daily_temp_cycle = 2 * np.sin(2 * np.pi * np.arange(n) / 24)
    water_temp = 15 + daily_temp_cycle + np.random.normal(0, 0.5, n)

    # Wind patterns on roughly 3-day (speed) and 2-day (direction) cycles
    wind_speed = 5 + 3 * np.sin(2 * np.pi * np.arange(n) / (24 * 3)) + np.random.normal(0, 1, n)
    wind_direction = 180 + 45 * np.sin(2 * np.pi * np.arange(n) / (24 * 2)) + np.random.normal(0, 20, n)

    return pd.DataFrame({
        'datetime': dates,
        'water_level': water_level,
        'water_temp': water_temp,
        'wind_speed': np.maximum(0, wind_speed),
        'wind_direction': wind_direction % 360,
        'air_pressure': 1013 + np.random.normal(0, 10, n)
    })
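
# Quick sanity check (illustrative):
#   df = generate_synthetic_data(3)
#   df.describe()  # water_level ~100 with a ±2 tidal swing, water_temp ~15 °C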
def create_main_dashboard(data, agent):
    """Create the main dashboard visualization."""
    # Collect whichever parameters are actually present
    candidates = [
        ('Water Level', 'water_level', 'blue'),
        ('Water Temperature', 'water_temp', 'red'),
        ('Air Temperature', 'air_temp', 'orange'),
        ('Wind Speed', 'wind_speed', 'green'),
        ('Air Pressure', 'air_pressure', 'purple')
    ]
    available_plots = [(title, col, color) for title, col, color in candidates
                       if col in data.columns and not data[col].isna().all()]

    if not available_plots:
        fig = go.Figure()
        fig.add_annotation(text="No data available for visualization",
                           xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
        return fig

    # Create subplots based on available data
    n_plots = len(available_plots)
    rows = (n_plots + 1) // 2  # ceiling division
    cols = 2 if n_plots > 1 else 1

    fig = make_subplots(
        rows=rows, cols=cols,
        subplot_titles=[plot[0] for plot in available_plots],
        vertical_spacing=0.1
    )

    for i, (title, column, color) in enumerate(available_plots):
        row = (i // 2) + 1
        col = (i % 2) + 1

        # Main data line
        fig.add_trace(
            go.Scatter(x=data['datetime'], y=data[column],
                       name=title, line=dict(color=color)),
            row=row, col=col
        )

        # Overlay detected anomalies
        anomalies, _ = agent.detect_anomalies(data, column)
        if anomalies.any():
            anomaly_data = data[anomalies]
            fig.add_trace(
                go.Scatter(x=anomaly_data['datetime'], y=anomaly_data[column],
                           mode='markers', name=f'{title} Anomalies',
                           marker=dict(color='red', size=6)),
                row=row, col=col
            )

    fig.update_layout(height=300 * rows, showlegend=False,
                      title_text="Ocean and Atmospheric Data Dashboard")
    return fig
def create_anomaly_plots(data, agent):
    """Create anomaly detection plots."""
    available_cols = [col for col in ['water_level', 'water_temp', 'air_temp', 'wind_speed']
                      if col in data.columns and not data[col].isna().all()]

    if len(available_cols) == 0:
        fig = go.Figure()
        fig.add_annotation(text="No data available for anomaly detection",
                           xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
        return fig

    n_plots = min(len(available_cols), 2)  # show at most two plots
    fig = make_subplots(
        rows=1, cols=n_plots,
        subplot_titles=[f'{col.replace("_", " ").title()} Anomalies' for col in available_cols[:n_plots]]
    )

    colors = ['blue', 'red', 'green', 'purple']
    for i, col in enumerate(available_cols[:n_plots]):
        _, z_scores = agent.detect_anomalies(data, col)

        fig.add_trace(
            go.Scatter(x=data['datetime'], y=z_scores,
                       mode='lines', name=f'{col} Z-Score',
                       line=dict(color=colors[i % len(colors)])),
            row=1, col=i + 1
        )
        fig.add_hline(y=agent.anomaly_threshold, line_dash="dash", line_color="red", row=1, col=i + 1)

    fig.update_layout(height=400, showlegend=False, title_text="Anomaly Detection Analysis")
    return fig
def create_correlation_plot(data):
    """Create a correlation heatmap."""
    numeric_cols = [col for col in ['water_level', 'water_temp', 'air_temp', 'wind_speed', 'air_pressure']
                    if col in data.columns and not data[col].isna().all()]

    if len(numeric_cols) < 2:
        # Return an empty plot if there is insufficient data
        fig = go.Figure()
        fig.add_annotation(text="Insufficient data for correlation analysis",
                           xref="paper", yref="paper", x=0.5, y=0.5, showarrow=False)
        return fig

    corr_matrix = data[numeric_cols].corr()
    fig = px.imshow(corr_matrix,
                    labels=dict(color="Correlation"),
                    color_continuous_scale='RdBu_r',
                    aspect="auto",
                    title="Parameter Correlations")
    return fig
def format_analysis_results(analysis, data_source):
    """Format analysis results for display."""
    result = f"### {data_source}\n\n**Key Trends:**\n"

    if not analysis:
        result += "- No analysis data available\n"
        return result

    for key, value in analysis.items():
        if 'trend' in key:
            param = key.replace('_trend', '').replace('_', ' ').title()
            unit = 'cm/day' if 'water_level' in key else '°C/day' if 'temp' in key else 'units/day'
            result += f"- {param}: {value:.3f} {unit}\n"
        elif 'anomaly_frequency' in key:
            param = key.replace('_anomaly_frequency', '').replace('_', ' ').title()
            result += f"- {param} anomalies: {value:.1f}%\n"

    return result
def save_csv_temp(data):
    """Save data to a temporary CSV file and return its path."""
    tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode='w', newline='', encoding='utf-8')
    data.to_csv(tmp.name, index=False)
    tmp.close()
    return tmp.name
# Create the Gradio interface
with gr.Blocks(title="Enhanced Ocean Climate Monitoring AI Agent", theme=gr.themes.Ocean()) as demo:
    gr.Markdown("""
    # Enhanced Ocean Climate Monitoring AI Agent
    ### Real-time Analysis with NOAA Data Integration

    This enhanced AI agent can fetch real ocean data from NOAA stations or use synthetic data for demonstration.
    Monitor water levels, temperature, currents, and detect climate anomalies at major coastal locations.

    **Note:** NOAA stations may not have all data types available. The system will use whatever data is accessible.
    """)

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### Configuration")
            station_name = gr.Dropdown(
                choices=list(agent.default_stations.keys()),
                value="San Francisco, CA",
                label="NOAA Station Location"
            )
            days_back = gr.Slider(
                minimum=1,
                maximum=30,
                value=7,
                step=1,
                label="Days of Historical Data",
                info="Shorter periods are more reliable"
            )
            anomaly_sensitivity = gr.Slider(
                minimum=1.0,
                maximum=3.0,
                value=2.0,
                step=0.1,
                label="Anomaly Detection Sensitivity"
            )
            use_real_data = gr.Checkbox(
                label="Use Real NOAA Data",
                value=True,
                info="Uncheck to use synthetic data"
            )
            analyze_btn = gr.Button("Analyze Ocean Data", variant="primary")

        with gr.Column(scale=2):
            gr.Markdown("### Climate Alerts")
            alerts_output = gr.Markdown()

    with gr.Row():
        analysis_output = gr.Markdown()

    with gr.Tab("Main Dashboard"):
        dashboard_plot = gr.Plot()
    with gr.Tab("Anomaly Detection"):
        anomaly_plot = gr.Plot()
    with gr.Tab("Correlations"):
        correlation_plot = gr.Plot()
    with gr.Tab("Data Export"):
        gr.Markdown("### Download Analyzed Data")
        csv_output = gr.File(label="Download CSV Data")
        gr.Markdown("*Note: Real NOAA data usage is subject to their terms of service*")
    # Wire up the analysis function
    analyze_btn.click(
        fn=analyze_real_ocean_data,
        inputs=[station_name, days_back, anomaly_sensitivity, use_real_data],
        outputs=[dashboard_plot, anomaly_plot, correlation_plot, analysis_output, alerts_output, csv_output]
    )

    # Auto-run on startup with synthetic data; a lambda with fixed arguments
    # avoids creating hidden placeholder components just to pass defaults
    demo.load(
        fn=lambda: analyze_real_ocean_data("San Francisco, CA", 7, 2.0, False),
        outputs=[dashboard_plot, anomaly_plot, correlation_plot, analysis_output, alerts_output, csv_output]
    )
if __name__ == "__main__":
    demo.launch()