Spaces:
Runtime error
Runtime error
#!/usr/bin/env python3 | |
""" | |
Analytics Analysis Tool for Neural OS Multi-GPU System | |
This script analyzes the structured analytics logs to generate reports and insights. | |
Usage: python analyze_analytics.py [--since HOURS] [--type TYPE] | |
""" | |
import json | |
import argparse | |
import glob | |
import time | |
from collections import defaultdict, Counter | |
from datetime import datetime, timedelta | |
import statistics | |
class AnalyticsAnalyzer: | |
def __init__(self, since_hours=24): | |
self.since_timestamp = time.time() - (since_hours * 3600) | |
self.data = { | |
'gpu_metrics': [], | |
'connection_events': [], | |
'queue_metrics': [], | |
'ip_stats': [] | |
} | |
self.load_data() | |
def load_data(self): | |
"""Load all analytics data files""" | |
file_types = { | |
'gpu_metrics': 'gpu_metrics_*.jsonl', | |
'connection_events': 'connection_events_*.jsonl', | |
'queue_metrics': 'queue_metrics_*.jsonl', | |
'ip_stats': 'ip_stats_*.jsonl' | |
} | |
for data_type, pattern in file_types.items(): | |
files = glob.glob(pattern) | |
for file_path in files: | |
try: | |
with open(file_path, 'r') as f: | |
for line in f: | |
try: | |
record = json.loads(line.strip()) | |
if record.get('type') != 'metadata' and record.get('timestamp', 0) >= self.since_timestamp: | |
self.data[data_type].append(record) | |
except json.JSONDecodeError: | |
continue | |
except FileNotFoundError: | |
continue | |
print(f"Loaded data from the last {(time.time() - self.since_timestamp) / 3600:.1f} hours:") | |
for data_type, records in self.data.items(): | |
print(f" {data_type}: {len(records)} records") | |
print() | |
def analyze_gpu_utilization(self): | |
"""Analyze GPU utilization patterns""" | |
print("🖥️ GPU UTILIZATION ANALYSIS") | |
print("=" * 40) | |
gpu_records = [r for r in self.data['gpu_metrics'] if r.get('type') == 'gpu_status'] | |
if not gpu_records: | |
print("No GPU utilization data found.") | |
return | |
utilizations = [r['utilization_percent'] for r in gpu_records] | |
total_gpus = gpu_records[-1].get('total_gpus', 0) | |
print(f"Total GPUs: {total_gpus}") | |
print(f"Average utilization: {statistics.mean(utilizations):.1f}%") | |
print(f"Peak utilization: {max(utilizations):.1f}%") | |
print(f"Minimum utilization: {min(utilizations):.1f}%") | |
print(f"Utilization std dev: {statistics.stdev(utilizations) if len(utilizations) > 1 else 0:.1f}%") | |
# Utilization distribution | |
high_util = sum(1 for u in utilizations if u >= 80) | |
med_util = sum(1 for u in utilizations if 40 <= u < 80) | |
low_util = sum(1 for u in utilizations if u < 40) | |
print(f"\nUtilization distribution:") | |
print(f" High (≥80%): {high_util} samples ({high_util/len(utilizations)*100:.1f}%)") | |
print(f" Medium (40-79%): {med_util} samples ({med_util/len(utilizations)*100:.1f}%)") | |
print(f" Low (<40%): {low_util} samples ({low_util/len(utilizations)*100:.1f}%)") | |
print() | |
def analyze_connections(self): | |
"""Analyze connection patterns""" | |
print("🔗 CONNECTION ANALYSIS") | |
print("=" * 40) | |
opens = [r for r in self.data['connection_events'] if r.get('type') == 'connection_open'] | |
closes = [r for r in self.data['connection_events'] if r.get('type') == 'connection_close'] | |
if not opens and not closes: | |
print("No connection data found.") | |
return | |
print(f"Total connections opened: {len(opens)}") | |
print(f"Total connections closed: {len(closes)}") | |
if closes: | |
durations = [r['duration'] for r in closes] | |
interactions = [r['interactions'] for r in closes] | |
reasons = [r['reason'] for r in closes] | |
print(f"\nSession durations:") | |
print(f" Average: {statistics.mean(durations):.1f}s") | |
print(f" Median: {statistics.median(durations):.1f}s") | |
print(f" Max: {max(durations):.1f}s") | |
print(f" Min: {min(durations):.1f}s") | |
print(f"\nInteractions per session:") | |
print(f" Average: {statistics.mean(interactions):.1f}") | |
print(f" Median: {statistics.median(interactions):.1f}") | |
print(f" Max: {max(interactions)}") | |
print(f"\nSession end reasons:") | |
reason_counts = Counter(reasons) | |
for reason, count in reason_counts.most_common(): | |
print(f" {reason}: {count} ({count/len(closes)*100:.1f}%)") | |
print() | |
def analyze_queue_performance(self): | |
"""Analyze queue performance""" | |
print("📝 QUEUE PERFORMANCE ANALYSIS") | |
print("=" * 40) | |
bypasses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_bypass'] | |
waits = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_wait'] | |
statuses = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_status'] | |
limit_applications = [r for r in self.data['queue_metrics'] if r.get('type') == 'queue_limits_applied'] | |
total_users = len(bypasses) + len(waits) | |
if total_users == 0: | |
print("No queue data found.") | |
return | |
print(f"Total users processed: {total_users}") | |
print(f"Users bypassed queue: {len(bypasses)} ({len(bypasses)/total_users*100:.1f}%)") | |
print(f"Users waited in queue: {len(waits)} ({len(waits)/total_users*100:.1f}%)") | |
if waits: | |
wait_times = [r['wait_time'] for r in waits] | |
positions = [r['queue_position'] for r in waits] | |
print(f"\nWait time statistics:") | |
print(f" Average wait: {statistics.mean(wait_times):.1f}s") | |
print(f" Median wait: {statistics.median(wait_times):.1f}s") | |
print(f" Max wait: {max(wait_times):.1f}s") | |
print(f" Average queue position: {statistics.mean(positions):.1f}") | |
if statuses: | |
queue_sizes = [r['queue_size'] for r in statuses] | |
# Handle both old 'estimated_wait' and new 'maximum_wait' fields for backwards compatibility | |
maximum_waits = [] | |
for r in statuses: | |
if r['queue_size'] > 0: | |
if 'maximum_wait' in r: | |
maximum_waits.append(r['maximum_wait']) | |
elif 'estimated_wait' in r: | |
maximum_waits.append(r['estimated_wait']) | |
print(f"\nQueue size statistics:") | |
print(f" Average queue size: {statistics.mean(queue_sizes):.1f}") | |
print(f" Max queue size: {max(queue_sizes)}") | |
if maximum_waits: | |
print(f" Average maximum wait: {statistics.mean(maximum_waits):.1f}s") | |
print(f" Peak maximum wait: {max(maximum_waits):.1f}s") | |
if limit_applications: | |
total_affected = sum(r['affected_sessions'] for r in limit_applications) | |
print(f"\nQueue limit applications:") | |
print(f" Times limits applied to existing sessions: {len(limit_applications)}") | |
print(f" Total sessions affected: {total_affected}") | |
print(f" Average sessions affected per application: {total_affected/len(limit_applications):.1f}") | |
print() | |
def analyze_ip_usage(self): | |
"""Analyze IP address usage patterns""" | |
print("🌍 IP USAGE ANALYSIS") | |
print("=" * 40) | |
ip_records = self.data['ip_stats'] | |
if not ip_records: | |
print("No IP usage data found.") | |
return | |
# Get latest connection counts per IP | |
latest_ip_data = {} | |
for record in ip_records: | |
if record.get('type') == 'ip_update': | |
ip = record['ip_address'] | |
latest_ip_data[ip] = record['connection_count'] | |
if not latest_ip_data: | |
print("No IP connection data found.") | |
return | |
total_connections = sum(latest_ip_data.values()) | |
unique_ips = len(latest_ip_data) | |
print(f"Total unique IP addresses: {unique_ips}") | |
print(f"Total connections: {total_connections}") | |
print(f"Average connections per IP: {total_connections/unique_ips:.1f}") | |
print(f"\nTop IP addresses by connection count:") | |
sorted_ips = sorted(latest_ip_data.items(), key=lambda x: x[1], reverse=True) | |
for i, (ip, count) in enumerate(sorted_ips[:10], 1): | |
percentage = count / total_connections * 100 | |
print(f" {i:2d}. {ip}: {count} connections ({percentage:.1f}%)") | |
print() | |
def generate_summary_report(self): | |
"""Generate a comprehensive summary report""" | |
print("📊 SYSTEM SUMMARY REPORT") | |
print("=" * 50) | |
# Time range | |
start_time = datetime.fromtimestamp(self.since_timestamp) | |
end_time = datetime.now() | |
duration_hours = (end_time.timestamp() - self.since_timestamp) / 3600 | |
print(f"Report period: {start_time.strftime('%Y-%m-%d %H:%M:%S')} to {end_time.strftime('%Y-%m-%d %H:%M:%S')}") | |
print(f"Duration: {duration_hours:.1f} hours") | |
print() | |
self.analyze_gpu_utilization() | |
self.analyze_connections() | |
self.analyze_queue_performance() | |
self.analyze_ip_usage() | |
def main(): | |
parser = argparse.ArgumentParser(description='Analyze Neural OS analytics data') | |
parser.add_argument('--since', type=float, default=24, | |
help='Analyze data from the last N hours (default: 24)') | |
parser.add_argument('--type', choices=['gpu', 'connections', 'queue', 'ip', 'summary'], | |
default='summary', help='Type of analysis to perform') | |
args = parser.parse_args() | |
analyzer = AnalyticsAnalyzer(since_hours=args.since) | |
if args.type == 'gpu': | |
analyzer.analyze_gpu_utilization() | |
elif args.type == 'connections': | |
analyzer.analyze_connections() | |
elif args.type == 'queue': | |
analyzer.analyze_queue_performance() | |
elif args.type == 'ip': | |
analyzer.analyze_ip_usage() | |
else: | |
analyzer.generate_summary_report() | |
if __name__ == '__main__': | |
main() |