ceshi / vps_monitor.py
xjf6b's picture
Update vps_monitor.py
b613ce1 verified
raw
history blame
7.61 kB
import paramiko
import schedule
import time
import os
import sys
from flask import Flask, jsonify, render_template_string
from threading import Thread
import logging
from datetime import timedelta
app = Flask(__name__)
vps_status = {}
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.StreamHandler(sys.stderr)
]
)
logger = logging.getLogger()
def get_vps_configs():
configs = []
index = 1
while True:
hostname = os.environ.get(f'HOSTNAME_{index}')
if not hostname:
break
username = os.environ.get(f'USERNAME_{index}')
password = os.environ.get(f'PASSWORD_{index}')
script_paths = []
script_index = 1
while True:
script_path = os.environ.get(f'SCRIPT_PATHS_{index}_{script_index}')
if not script_path:
break
script_paths.append(script_path.strip())
script_index += 1
for script_path in script_paths:
configs.append({
'index': index,
'hostname': hostname,
'username': username,
'password': password,
'script_path': script_path
})
index += 1
return configs
def parse_runtime(etime):
parts = etime.split('-')
days = int(parts[0]) if len(parts) > 1 else 0
time_parts = parts[-1].split(':')
if len(time_parts) == 3:
hours, minutes, seconds = map(int, time_parts)
elif len(time_parts) == 2:
hours, minutes, seconds = int(time_parts[0]), int(time_parts[1]), 0
else:
return "0:00:00"
return str(timedelta(days=days, hours=hours, minutes=minutes, seconds=seconds))
def check_and_run_script(config):
logger.info(f"Checking VPS {config['index']}: {config['hostname']} - {config['script_path']}")
client = None
try:
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(hostname=config['hostname'], username=config['username'], password=config['password'], port=22)
script_path = config['script_path']
script_name = os.path.basename(script_path)
key = f"{config['hostname']}:{script_name}"
last_pid = vps_status.get(key, {}).get('pid', None)
check_command = f"ps -p {last_pid} -o pid=,etime=,args=" if last_pid else f"ps aux | grep '{script_path}' | grep -v grep"
stdin, stdout, stderr = client.exec_command(check_command)
output = stdout.read().decode('utf-8').strip()
if output and (last_pid or script_path in output):
parts = output.split()
if last_pid:
pid, runtime = last_pid, parse_runtime(parts[1]) if len(parts) > 1 else "0:00:00"
else:
pid, runtime = parts[1] if len(parts) > 1 else "Unknown", parse_runtime(parts[9]) if len(parts) > 9 else "0:00:00"
status = "Running"
else:
logger.info(f"Script {script_name} not running. Attempting to restart.")
stdin, stdout, stderr = client.exec_command(f"nohup /bin/sh {script_path} > /dev/null 2>&1 & echo $!")
new_pid = stdout.read().decode('utf-8').strip()
if new_pid.isdigit():
pid, runtime, status = new_pid, "0:00:00", "Restarted"
else:
pid, runtime, status = "N/A", "N/A", "Restart Failed"
vps_status[key] = {
'index': config['index'],
'status': status,
'last_check': time.strftime('%Y-%m-%d %H:%M:%S'),
'username': config['username'],
'script_name': script_name,
'runtime': runtime,
'pid': pid
}
except Exception as e:
logger.error(f"Error occurred while checking VPS {config['index']} - {config['hostname']} - {script_name}: {str(e)}")
vps_status[f"{config['hostname']}:{script_name}"] = {
'index': config['index'],
'status': f"Error: {str(e)}",
'last_check': time.strftime('%Y-%m-%d %H:%M:%S'),
'username': config['username'],
'script_name': script_name,
'runtime': "N/A",
'pid': "N/A"
}
finally:
if client:
client.close()
def check_all_vps():
logger.info("Starting VPS check")
for config in get_vps_configs():
check_and_run_script(config)
table = "+---------+-----------------------+------------------+----------+-------------------------+----------+----------+-------+\n"
table += "| Index | Hostname | Script Name | Status | Last Check | Username | Runtime | PID |\n"
table += "+---------+-----------------------+------------------+----------+-------------------------+----------+----------+-------+\n"
for key, status in vps_status.items():
hostname, script_name = key.split(':')
table += "| {:<7} | {:<21} | {:<16} | {:<8} | {:<23} | {:<8} | {:<8} | {:<5} |\n".format(
status['index'], hostname[:21], script_name[:16], status['status'][:8],
status['last_check'], status['username'][:8], status['runtime'], status['pid'][:5]
)
table += "+---------+-----------------------+------------------+----------+-------------------------+----------+----------+-------+\n"
logger.info("\n" + table)
@app.route('/')
def index():
html = '''
<h1>VPS Status Overview</h1>
<table border="1">
<tr>
<th>Index</th>
<th>Hostname</th>
<th>Script Name</th>
<th>Status</th>
<th>Last Check</th>
<th>Username</th>
<th>Runtime</th>
<th>PID</th>
</tr>
{% for key, data in vps_status.items() %}
<tr>
<td>{{ data.index }}</td>
<td><a href="/status/{{ key }}">{{ key.split(':')[0] }}</a></td>
<td>{{ data.script_name }}</td>
<td>{{ data.status }}</td>
<td>{{ data.last_check }}</td>
<td>{{ data.username }}</td>
<td>{{ data.runtime }}</td>
<td>{{ data.pid }}</td>
</tr>
{% endfor %}
</table>
'''
return render_template_string(html, vps_status=vps_status)
@app.route('/status/<path:key>')
def vps_status_detail(key):
return jsonify(vps_status[key]) if key in vps_status else (jsonify({"error": "VPS or script not found"}), 404)
@app.route('/health')
def health_check():
return jsonify({"status": "healthy", "uptime": time.time() - start_time}), 200
def run_flask():
app.run(host='0.0.0.0', port=8080)
def main():
global start_time
start_time = time.time()
logger.info("===== VPS monitoring script is starting =====")
Thread(target=run_flask).start()
logger.info("Flask server started in background")
check_all_vps()
schedule.every(15).minutes.do(check_all_vps)
logger.info("Scheduled VPS check every 15 minutes")
logger.info("===== VPS monitoring script is running =====")
heartbeat_count = 0
while True:
schedule.run_pending()
time.sleep(60)
heartbeat_count += 1
if heartbeat_count % 5 == 0:
logger.info(f"Heartbeat: Script is still running. Uptime: {heartbeat_count} minutes")
if __name__ == "__main__":
main()