neural-os / analyze_analytics.py
da03
.
b9e6b75
#!/usr/bin/env python3
"""
Analytics Analysis Tool for Neural OS Multi-GPU System
This script analyzes the structured analytics logs to generate reports and insights.
Usage: python analyze_analytics.py [--since HOURS] [--type TYPE]
"""
import json
import argparse
import glob
import time
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import statistics
class AnalyticsAnalyzer:
    """Load Neural OS analytics logs and print human-readable reports.

    On construction the analyzer reads every ``*.jsonl`` file in the current
    working directory whose name matches one of the known patterns, keeps the
    records whose ``timestamp`` lies inside the requested look-back window and
    stores them in ``self.data`` keyed by category.  Each ``analyze_*`` method
    prints one report section to stdout and returns ``None``.
    """

    def __init__(self, since_hours=24):
        """Create the analyzer and immediately load the matching records.

        Args:
            since_hours: Size of the look-back window in hours, counted back
                from the moment the analyzer is created.
        """
        self.since_timestamp = time.time() - (since_hours * 3600)
        self.data = {
            'gpu_metrics': [],
            'connection_events': [],
            'queue_metrics': [],
            'ip_stats': []
        }
        self.load_data()

    @staticmethod
    def _values(records, key):
        """Return ``record[key]`` for each record where it is present and not None."""
        return [r[key] for r in records if r.get(key) is not None]

    def _records_of_type(self, category, record_type):
        """Return the loaded records of ``category`` whose ``type`` matches."""
        return [r for r in self.data[category] if r.get('type') == record_type]

    def _parse_record(self, line):
        """Decode one JSONL line; return the record if it should be kept, else None.

        A line is dropped when it is not valid JSON, is not a JSON object, is
        a ``metadata`` record, has a non-numeric ``timestamp`` or is older
        than ``self.since_timestamp``.
        """
        try:
            record = json.loads(line.strip())
        except json.JSONDecodeError:
            return None
        # Valid JSON that is not an object (list, number, ...) has no fields.
        if not isinstance(record, dict):
            return None
        if record.get('type') == 'metadata':
            return None
        timestamp = record.get('timestamp', 0)
        if isinstance(timestamp, bool) or not isinstance(timestamp, (int, float)):
            return None
        if timestamp < self.since_timestamp:
            return None
        return record

    def load_data(self):
        """Load all analytics data files.

        Records are appended to ``self.data[category]`` and each category is
        then sorted by ``timestamp`` so that "last record" means "most recent
        record" regardless of the order in which files were read.  A summary
        of how many records were loaded is printed afterwards.
        """
        file_types = {
            'gpu_metrics': 'gpu_metrics_*.jsonl',
            'connection_events': 'connection_events_*.jsonl',
            'queue_metrics': 'queue_metrics_*.jsonl',
            'ip_stats': 'ip_stats_*.jsonl'
        }
        for data_type, pattern in file_types.items():
            # glob returns names in arbitrary order; sort for reproducible runs.
            for file_path in sorted(glob.glob(pattern)):
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        for line in f:
                            record = self._parse_record(line)
                            if record is not None:
                                self.data[data_type].append(record)
                except FileNotFoundError:
                    # File vanished between glob and open; nothing to read.
                    continue
                except (OSError, UnicodeDecodeError) as exc:
                    # Keep going with the other files, but do not hide the problem.
                    print(f"Warning: could not read {file_path}: {exc}")
                    continue
            # Stable sort: records with equal timestamps keep their file order.
            self.data[data_type].sort(key=lambda r: r.get('timestamp', 0))
        print(f"Loaded data from the last {(time.time() - self.since_timestamp) / 3600:.1f} hours:")
        for data_type, records in self.data.items():
            print(f" {data_type}: {len(records)} records")
        print()

    def analyze_gpu_utilization(self):
        """Analyze GPU utilization patterns.

        Uses ``gpu_status`` records: ``utilization_percent`` from each record
        and ``total_gpus`` from the most recent one.
        """
        print("🖥️ GPU UTILIZATION ANALYSIS")
        print("=" * 40)
        gpu_records = self._records_of_type('gpu_metrics', 'gpu_status')
        utilizations = self._values(gpu_records, 'utilization_percent')
        if not utilizations:
            print("No GPU utilization data found.")
            return
        sample_count = len(utilizations)
        total_gpus = gpu_records[-1].get('total_gpus', 0)
        print(f"Total GPUs: {total_gpus}")
        print(f"Average utilization: {statistics.mean(utilizations):.1f}%")
        print(f"Peak utilization: {max(utilizations):.1f}%")
        print(f"Minimum utilization: {min(utilizations):.1f}%")
        # stdev needs at least two samples.
        print(f"Utilization std dev: {statistics.stdev(utilizations) if sample_count > 1 else 0:.1f}%")
        # Utilization distribution
        high_util = sum(1 for u in utilizations if u >= 80)
        med_util = sum(1 for u in utilizations if 40 <= u < 80)
        low_util = sum(1 for u in utilizations if u < 40)
        print(f"\nUtilization distribution:")
        print(f" High (≥80%): {high_util} samples ({high_util/sample_count*100:.1f}%)")
        print(f" Medium (40-79%): {med_util} samples ({med_util/sample_count*100:.1f}%)")
        print(f" Low (<40%): {low_util} samples ({low_util/sample_count*100:.1f}%)")
        print()

    def analyze_connections(self):
        """Analyze connection patterns.

        Counts ``connection_open`` / ``connection_close`` records and, for
        the close records, summarizes ``duration``, ``interactions`` and
        ``reason``.  Records missing a field are left out of that field's
        statistics.
        """
        print("🔗 CONNECTION ANALYSIS")
        print("=" * 40)
        opens = self._records_of_type('connection_events', 'connection_open')
        closes = self._records_of_type('connection_events', 'connection_close')
        if not opens and not closes:
            print("No connection data found.")
            return
        print(f"Total connections opened: {len(opens)}")
        print(f"Total connections closed: {len(closes)}")
        durations = self._values(closes, 'duration')
        if durations:
            print(f"\nSession durations:")
            print(f" Average: {statistics.mean(durations):.1f}s")
            print(f" Median: {statistics.median(durations):.1f}s")
            print(f" Max: {max(durations):.1f}s")
            print(f" Min: {min(durations):.1f}s")
        interactions = self._values(closes, 'interactions')
        if interactions:
            print(f"\nInteractions per session:")
            print(f" Average: {statistics.mean(interactions):.1f}")
            print(f" Median: {statistics.median(interactions):.1f}")
            print(f" Max: {max(interactions)}")
        reasons = self._values(closes, 'reason')
        if reasons:
            print(f"\nSession end reasons:")
            for reason, count in Counter(reasons).most_common():
                print(f" {reason}: {count} ({count/len(reasons)*100:.1f}%)")
        print()

    def analyze_queue_performance(self):
        """Analyze queue performance.

        Reports on ``queue_bypass``, ``queue_wait``, ``queue_status`` and
        ``queue_limits_applied`` records.  Each sub-section is printed when
        its own record type is present, so queue-size statistics are still
        shown when no user bypassed or waited in the queue.
        """
        print("📝 QUEUE PERFORMANCE ANALYSIS")
        print("=" * 40)
        bypasses = self._records_of_type('queue_metrics', 'queue_bypass')
        waits = self._records_of_type('queue_metrics', 'queue_wait')
        statuses = self._records_of_type('queue_metrics', 'queue_status')
        limit_applications = self._records_of_type('queue_metrics', 'queue_limits_applied')
        if not (bypasses or waits or statuses or limit_applications):
            print("No queue data found.")
            return
        total_users = len(bypasses) + len(waits)
        print(f"Total users processed: {total_users}")
        if total_users:
            print(f"Users bypassed queue: {len(bypasses)} ({len(bypasses)/total_users*100:.1f}%)")
            print(f"Users waited in queue: {len(waits)} ({len(waits)/total_users*100:.1f}%)")
        wait_times = self._values(waits, 'wait_time')
        positions = self._values(waits, 'queue_position')
        if wait_times or positions:
            print(f"\nWait time statistics:")
        if wait_times:
            print(f" Average wait: {statistics.mean(wait_times):.1f}s")
            print(f" Median wait: {statistics.median(wait_times):.1f}s")
            print(f" Max wait: {max(wait_times):.1f}s")
        if positions:
            print(f" Average queue position: {statistics.mean(positions):.1f}")
        queue_sizes = self._values(statuses, 'queue_size')
        if queue_sizes:
            # Handle both old 'estimated_wait' and new 'maximum_wait' fields for backwards compatibility
            maximum_waits = []
            for r in statuses:
                # Wait figures only count while somebody is actually queued.
                if (r.get('queue_size') or 0) > 0:
                    wait = r.get('maximum_wait')
                    if wait is None:
                        wait = r.get('estimated_wait')
                    if wait is not None:
                        maximum_waits.append(wait)
            print(f"\nQueue size statistics:")
            print(f" Average queue size: {statistics.mean(queue_sizes):.1f}")
            print(f" Max queue size: {max(queue_sizes)}")
            if maximum_waits:
                print(f" Average maximum wait: {statistics.mean(maximum_waits):.1f}s")
                print(f" Peak maximum wait: {max(maximum_waits):.1f}s")
        if limit_applications:
            total_affected = sum(self._values(limit_applications, 'affected_sessions'))
            print(f"\nQueue limit applications:")
            print(f" Times limits applied to existing sessions: {len(limit_applications)}")
            print(f" Total sessions affected: {total_affected}")
            print(f" Average sessions affected per application: {total_affected/len(limit_applications):.1f}")
        print()

    def analyze_ip_usage(self):
        """Analyze IP address usage patterns.

        For every ``ip_update`` record the most recent ``connection_count``
        per ``ip_address`` is kept; totals and the ten largest counts are
        printed.
        """
        print("🌍 IP USAGE ANALYSIS")
        print("=" * 40)
        ip_records = self.data['ip_stats']
        if not ip_records:
            print("No IP usage data found.")
            return
        # Get latest connection counts per IP (later records overwrite earlier ones).
        latest_ip_data = {}
        for record in ip_records:
            if record.get('type') != 'ip_update':
                continue
            ip = record.get('ip_address')
            count = record.get('connection_count')
            if ip is None or count is None:
                continue
            latest_ip_data[ip] = count
        if not latest_ip_data:
            print("No IP connection data found.")
            return
        total_connections = sum(latest_ip_data.values())
        unique_ips = len(latest_ip_data)
        print(f"Total unique IP addresses: {unique_ips}")
        print(f"Total connections: {total_connections}")
        print(f"Average connections per IP: {total_connections/unique_ips:.1f}")
        print(f"\nTop IP addresses by connection count:")
        sorted_ips = sorted(latest_ip_data.items(), key=lambda x: x[1], reverse=True)
        for i, (ip, count) in enumerate(sorted_ips[:10], 1):
            # All counts may be zero; avoid dividing by a zero total.
            percentage = count / total_connections * 100 if total_connections else 0.0
            print(f" {i:2d}. {ip}: {count} connections ({percentage:.1f}%)")
        print()

    def generate_summary_report(self):
        """Generate a comprehensive summary report.

        Prints the report period followed by the GPU, connection, queue and
        IP sections in that order.
        """
        print("📊 SYSTEM SUMMARY REPORT")
        print("=" * 50)
        # Time range
        start_time = datetime.fromtimestamp(self.since_timestamp)
        end_time = datetime.now()
        duration_hours = (end_time.timestamp() - self.since_timestamp) / 3600
        print(f"Report period: {start_time.strftime('%Y-%m-%d %H:%M:%S')} to {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Duration: {duration_hours:.1f} hours")
        print()
        self.analyze_gpu_utilization()
        self.analyze_connections()
        self.analyze_queue_performance()
        self.analyze_ip_usage()
def main():
    """Parse command-line options and print the requested analysis.

    ``--since`` sets the look-back window in hours (default 24) and
    ``--type`` selects one report section; any other value, including the
    default ``summary``, prints the full summary report.
    """
    cli = argparse.ArgumentParser(description='Analyze Neural OS analytics data')
    cli.add_argument(
        '--since',
        type=float,
        default=24,
        help='Analyze data from the last N hours (default: 24)',
    )
    cli.add_argument(
        '--type',
        choices=['gpu', 'connections', 'queue', 'ip', 'summary'],
        default='summary',
        help='Type of analysis to perform',
    )
    options = cli.parse_args()

    analyzer = AnalyticsAnalyzer(since_hours=options.since)

    # Map each single-section report name to the method that prints it.
    section_reports = {
        'gpu': analyzer.analyze_gpu_utilization,
        'connections': analyzer.analyze_connections,
        'queue': analyzer.analyze_queue_performance,
        'ip': analyzer.analyze_ip_usage,
    }
    run_report = section_reports.get(options.type, analyzer.generate_summary_report)
    run_report()
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()