import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import requests
import json
from datetime import datetime

def get_series_name_and_unit(series, dataset_description):
    """
    Extract the name and unit of a time series from its dataset description.

    Args:
        series: Dictionary containing series data
        dataset_description: Dictionary containing dataset field descriptions

    Returns:
        tuple: (name, unit) of the series
    """
    field_id = series['datacellar:datasetFieldID']
    field = next((f for f in dataset_description['datacellar:datasetFields']
                  if f['datacellar:datasetFieldID'] == field_id), None)
    name = field['datacellar:fieldName'] if field else f'Series {field_id}'
    unit = field['datacellar:type']['datacellar:unitText'] if field else 'Unknown'

    # Override the name if the series metadata specifies a load type
    if 'datacellar:timeSeriesMetadata' in series:
        metadata = series['datacellar:timeSeriesMetadata']
        if 'datacellar:loadType' in metadata:
            name = metadata['datacellar:loadType']

    return name, unit
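
# Example with hypothetical inputs shaped like the schema handled above:
#   series = {'datacellar:datasetFieldID': 0}
#   description = {'datacellar:datasetFields': [
#       {'datacellar:datasetFieldID': 0,
#        'datacellar:fieldName': 'consumption',
#        'datacellar:type': {'datacellar:unitText': 'kWh'}}]}
#   get_series_name_and_unit(series, description)  # -> ('consumption', 'kWh')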

def process_series(series, dataset_description, is_input=False):
    """
    Process a single time series into a pandas DataFrame.

    Args:
        series: Dictionary containing series data
        dataset_description: Dictionary containing dataset field descriptions
        is_input: Boolean indicating whether this is input data

    Returns:
        tuple: (DataFrame, unit, name) of the processed series
    """
    name, unit = get_series_name_and_unit(series, dataset_description)
    df = pd.DataFrame(series['datacellar:dataPoints'])

    # Convert timestamps to datetime and coerce values to numeric
    df['datacellar:timeStamp'] = pd.to_datetime(df['datacellar:timeStamp'])
    df['datacellar:value'] = pd.to_numeric(df['datacellar:value'], errors='coerce')

    # Tag each row with a series identifier
    df['series_id'] = f'{name} (Input)' if is_input else name
    return df, unit, name
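
# Example: a series whose dataPoints look like (illustrative values)
#   [{'datacellar:timeStamp': '2024-01-01T00:00:00Z', 'datacellar:value': '1.5'}]
# becomes a DataFrame with columns
#   ['datacellar:timeStamp', 'datacellar:value', 'series_id'].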

def load_and_process_data(json_data, input_data=None):
    """
    Load and process time series from the JSON data, filtering out empty series.
    """
    series_by_unit = {}
    try:
        dataset_description = json_data['datacellar:datasetSelfDescription']
    except KeyError:
        # Fall back to a minimal self-description. get_series_name_and_unit
        # expects a dict with a 'datacellar:datasetFields' list, so the single
        # anomaly field is wrapped accordingly.
        dataset_description = {
            "datacellar:datasetFields": [{
                "@type": "datacellar:DatasetField",
                "datacellar:datasetFieldID": 0,
                "datacellar:fieldName": "anomaly",
                "datacellar:description": "Anomalies",
                "datacellar:type": {
                    "@type": "datacellar:boolean",
                    "datacellar:unitText": "-"
                }
            }]
        }

    # Process output series
    try:
        for series in json_data['datacellar:timeSeriesList']:
            # Skip series without data points
            if series.get('datacellar:dataPoints'):
                df, unit, _ = process_series(series, dataset_description)
                # Keep only DataFrames with at least one valid value
                if not df.empty and df['datacellar:value'].notna().any():
                    series_by_unit.setdefault(unit, []).append(df)
    except Exception as e:
        st.error(f"Error processing series: {str(e)}")

    # Process input series if provided
    if input_data:
        input_description = input_data['datacellar:datasetSelfDescription']
        for series in input_data['datacellar:timeSeriesList']:
            if series.get('datacellar:dataPoints'):
                df, unit, _ = process_series(series, input_description, is_input=True)
                if not df.empty and df['datacellar:value'].notna().any():
                    series_by_unit.setdefault(unit, []).append(df)

    # Concatenate per unit and drop units with no valid data
    result = {}
    for unit, dfs in series_by_unit.items():
        if dfs:
            combined_df = pd.concat(dfs)
            if not combined_df.empty and combined_df['datacellar:value'].notna().any():
                result[unit] = combined_df
    return result
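
# Shape of the return value (illustrative):
#   {'kWh': <DataFrame with columns ['datacellar:timeStamp',
#                                    'datacellar:value', 'series_id']>}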

def create_time_series_plot(df, unit, service_type=None, fig=None):
    """
    Create a visualization for time series data, handling empty series appropriately.
    """
    if service_type == "Anomaly Detection":
        if fig is None:
            fig = go.Figure()

        # Plot the non-empty input data
        input_data = df[df['series_id'].str.contains('Input')]
        input_data = input_data[input_data['datacellar:value'].notna()]
        if not input_data.empty:
            fig.add_trace(go.Scatter(
                x=input_data['datacellar:timeStamp'],
                y=input_data['datacellar:value'],
                mode='lines',
                name='Energy Consumption',
                line=dict(color='blue')
            ))
        # Mark anomalies: rows from the (non-input) anomaly series whose flag
        # is True. Only input series carry an 'Input' suffix, so filtering on
        # 'Input' rather than 'Output' isolates the anomaly flags.
        anomalies = df[(~df['series_id'].str.contains('Input')) &
                       (df['datacellar:value'] == True)]
        if not anomalies.empty:
            # Look up the input value at each anomaly timestamp so the marker
            # sits on the consumption curve
            anomaly_values = []
            for timestamp in anomalies['datacellar:timeStamp']:
                value = input_data.loc[input_data['datacellar:timeStamp'] == timestamp,
                                       'datacellar:value']
                anomaly_values.append(value.iloc[0] if not value.empty else None)
            fig.add_trace(go.Scatter(
                x=anomalies['datacellar:timeStamp'],
                y=anomaly_values,
                mode='markers',
                name='Anomalies',
                marker=dict(color='red', size=10)
            ))

        fig.update_layout(
            title=f'Time Series Data with Anomalies ({unit})',
            xaxis_title="Time",
            yaxis_title=f"Value ({unit})",
            hovermode='x unified',
            legend_title="Series"
        )
        return fig
    else:
        # Keep only series that have at least one valid value
        valid_series = []
        for series_id in df['series_id'].unique():
            series_data = df[df['series_id'] == series_id]
            if not series_data.empty and series_data['datacellar:value'].notna().any():
                valid_series.append(series_id)

        # Create a plot only when there is something valid to show
        if valid_series:
            filtered_df = df[df['series_id'].isin(valid_series)]
            return px.line(
                filtered_df,
                x='datacellar:timeStamp',
                y='datacellar:value',
                color='series_id',
                title=f'Time Series Data ({unit})'
            ).update_layout(
                xaxis_title="Time",
                yaxis_title=f"Value ({unit})",
                hovermode='x unified',
                legend_title="Series"
            )
        else:
            # No valid series: let the caller decide what to render
            return None
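
# Usage sketch, assuming dfs_by_unit came from load_and_process_data:
#   for unit, df in dfs_by_unit.items():
#       fig = create_time_series_plot(df, unit, service_type="Anomaly Detection")
#       if fig is not None:
#           st.plotly_chart(fig)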

def display_statistics(dfs_by_unit):
    """
    Display statistics only for non-empty series.
    """
    for unit, df in dfs_by_unit.items():
        st.write(f"## Measurements in {unit}")
        for series_id in df['series_id'].unique():
            series_data = df[df['series_id'] == series_id]
            # Only show series with at least one valid value
            if not series_data.empty and series_data['datacellar:value'].notna().any():
                st.write(f"### {series_id}")
                cols = st.columns(4)
                metrics = [
                    ("Average", series_data['datacellar:value'].mean()),
                    ("Max", series_data['datacellar:value'].max()),
                    ("Min", series_data['datacellar:value'].min()),
                    # Approximate time integral; the 6/3600 factor appears to
                    # assume a 6-second sampling interval converted to hours
                    ("Total", series_data['datacellar:value'].sum() * 6 / 3600)
                ]
                for col, (label, value) in zip(cols, metrics):
                    with col:
                        unit_suffix = "h" if label == "Total" else ""
                        st.metric(label, f"{value:.2f} {unit}{unit_suffix}")
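
# Usage sketch, chaining the helpers above:
#   dfs_by_unit = load_and_process_data(json_data)
#   display_statistics(dfs_by_unit)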

def call_api(file_content, token, service_endpoint):
    """
    Call the analysis API with the provided data.

    Args:
        file_content: Binary content of the JSON file
        token: API authentication token
        service_endpoint: String indicating which API endpoint to call

    Returns:
        dict: JSON response from the API, or None if the call fails
    """
    try:
        url = f'https://loki.linksfoundation.com/datacellar/{service_endpoint}'
        response = requests.post(
            url,
            headers={'Authorization': f'Bearer {token}'},
            files={'input_file': ('data.json', file_content, 'application/json')}
        )
        if response.status_code == 401:
            st.error("Authentication failed. Please check your API token.")
            return None
        return response.json()
    except Exception as e:
        st.error(f"API Error: {str(e)}")
        return None
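
# Usage sketch (variable names and the endpoint below are illustrative, not
# confirmed routes of the Data Cellar API):
#   result = call_api(uploaded_file.getvalue(), api_token, 'anomaly-detection')
#   if result is not None:
#       dfs_by_unit = load_and_process_data(result)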

def get_dataset_type(json_data):
    """
    Determine the type of dataset from its description.

    Args:
        json_data: Dictionary containing the JSON data

    Returns:
        str: "production", "consumption", or "other"
    """
    desc = json_data.get('datacellar:description', '').lower()
    if 'production' in desc:
        return "production"
    elif 'consumption' in desc:
        return "consumption"
    return "other"

def get_forecast_horizon(json_data):
    """
    Determine the forecast horizon from the dataset description.

    Args:
        json_data: Dictionary containing the JSON data

    Returns:
        str: "long", "short", or None
    """
    desc = json_data.get('datacellar:description', '').lower()
    if 'long term' in desc:
        return "long"
    elif 'short term' in desc:
        return "short"
    return None
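
# Example: a dataset whose 'datacellar:description' reads
# "Short term consumption forecast" would yield
#   get_dataset_type(data) == "consumption"
#   get_forecast_horizon(data) == "short"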