Spaces:
Running
Running
# Copyright (c) Microsoft Corporation. | |
# Licensed under the MIT license. | |
import argparse | |
import ctypes | |
import json | |
import logging | |
import os | |
import re | |
import shlex | |
import sys | |
import threading | |
import time | |
from subprocess import Popen | |
import pkg_resources | |
from pyhdfs import HdfsClient | |
from .constants import (LOG_DIR, MULTI_PHASE, NNI_EXP_ID, NNI_PLATFORM, | |
NNI_SYS_DIR, NNI_TRIAL_JOB_ID) | |
from .hdfsClientUtility import (copyDirectoryToHdfs, copyHdfsDirectoryToLocal, | |
copyHdfsFileToLocal) | |
from .log_utils import LogType, RemoteLogger, StdOutputType, nni_log | |
from .rest_utils import rest_get, rest_post | |
from .url_utils import gen_parameter_meta_url, gen_send_version_url | |
logger = logging.getLogger('trial_keeper') | |
regular = re.compile('v?(?P<version>[0-9](\.[0-9]){0,1}).*') | |
_hdfs_client = None | |
def get_hdfs_client(args): | |
global _hdfs_client | |
if _hdfs_client is not None: | |
return _hdfs_client | |
# backward compatibility | |
hdfs_host = None | |
if args.hdfs_host: | |
hdfs_host = args.hdfs_host | |
elif args.pai_hdfs_host: | |
hdfs_host = args.pai_hdfs_host | |
else: | |
return None | |
if hdfs_host is not None and args.nni_hdfs_exp_dir is not None: | |
try: | |
if args.webhdfs_path: | |
_hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host), user_name=args.pai_user_name, | |
webhdfs_path=args.webhdfs_path, timeout=5) | |
else: | |
# backward compatibility | |
_hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'), user_name=args.pai_user_name, | |
timeout=5) | |
except Exception as e: | |
nni_log(LogType.Error, 'Create HDFS client error: ' + str(e)) | |
raise e | |
return _hdfs_client | |
def main_loop(args): | |
'''main loop logic for trial keeper''' | |
if not os.path.exists(LOG_DIR): | |
os.makedirs(LOG_DIR) | |
trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper', | |
StdOutputType.Stdout, args.log_collection) | |
# redirect trial keeper's stdout and stderr to syslog | |
trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial', StdOutputType.Stdout, | |
args.log_collection) | |
sys.stdout = sys.stderr = trial_keeper_syslogger | |
hdfs_output_dir = None | |
if args.hdfs_output_dir: | |
hdfs_output_dir = args.hdfs_output_dir | |
elif args.pai_hdfs_output_dir: | |
hdfs_output_dir = args.pai_hdfs_output_dir | |
hdfs_client = get_hdfs_client(args) | |
if hdfs_client is not None: | |
copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client) | |
if args.job_id_file: | |
with open(args.job_id_file, 'w') as job_file: | |
job_file.write("%d" % os.getpid()) | |
# Notice: We don't appoint env, which means subprocess wil inherit current environment and that is expected behavior | |
log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader() | |
process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout) | |
nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(process.pid, | |
shlex.split( | |
args.trial_command))) | |
while True: | |
retCode = process.poll() | |
# child worker process exits and all stdout data is read | |
if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True: | |
# In Windows, the retCode -1 is 4294967295. It's larger than c_long, and raise OverflowError. | |
# So covert it to int32. | |
retCode = ctypes.c_long(retCode).value | |
nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode)) | |
if hdfs_output_dir is not None: | |
# Copy local directory to hdfs for OpenPAI | |
nni_local_output_dir = os.environ['NNI_OUTPUT_DIR'] | |
try: | |
if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client): | |
nni_log(LogType.Info, | |
'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir)) | |
else: | |
nni_log(LogType.Info, | |
'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir)) | |
except Exception as e: | |
nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e)) | |
raise e | |
# Exit as the retCode of subprocess(trial) | |
exit(retCode) | |
break | |
time.sleep(2) | |
def trial_keeper_help_info(*args): | |
print('please run --help to see guidance') | |
def check_version(args): | |
try: | |
trial_keeper_version = pkg_resources.get_distribution('nni').version | |
except pkg_resources.ResolutionError as err: | |
# package nni does not exist, try nni-tool package | |
nni_log(LogType.Error, 'Package nni does not exist!') | |
os._exit(1) | |
if not args.nni_manager_version: | |
# skip version check | |
nni_log(LogType.Warning, 'Skipping version check!') | |
else: | |
try: | |
trial_keeper_version = regular.search(trial_keeper_version).group('version') | |
nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version)) | |
nni_manager_version = regular.search(args.nni_manager_version).group('version') | |
nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version)) | |
log_entry = {} | |
if trial_keeper_version != nni_manager_version: | |
nni_log(LogType.Error, 'Version does not match!') | |
error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format( | |
nni_manager_version, trial_keeper_version) | |
log_entry['tag'] = 'VCFail' | |
log_entry['msg'] = error_message | |
rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, | |
False) | |
os._exit(1) | |
else: | |
nni_log(LogType.Info, 'Version match!') | |
log_entry['tag'] = 'VCSuccess' | |
rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10, | |
False) | |
except AttributeError as err: | |
nni_log(LogType.Error, err) | |
def is_multi_phase(): | |
return MULTI_PHASE and (MULTI_PHASE in ['True', 'true']) | |
def download_parameter(meta_list, args): | |
""" | |
Download parameter file to local working directory. | |
meta_list format is defined in paiJobRestServer.ts | |
example meta_list: | |
[ | |
{"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"}, | |
{"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"} | |
] | |
""" | |
nni_log(LogType.Debug, str(meta_list)) | |
nni_log(LogType.Debug, | |
'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID)) | |
nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR))) | |
for meta in meta_list: | |
if meta['experimentId'] == NNI_EXP_ID and meta['trialId'] == NNI_TRIAL_JOB_ID: | |
param_fp = os.path.join(NNI_SYS_DIR, os.path.basename(meta['filePath'])) | |
if not os.path.exists(param_fp): | |
hdfs_client = get_hdfs_client(args) | |
copyHdfsFileToLocal(meta['filePath'], param_fp, hdfs_client, override=False) | |
def fetch_parameter_file(args): | |
class FetchThread(threading.Thread): | |
def __init__(self, args): | |
super(FetchThread, self).__init__() | |
self.args = args | |
def run(self): | |
uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port) | |
nni_log(LogType.Info, uri) | |
while True: | |
res = rest_get(uri, 10) | |
nni_log(LogType.Debug, 'status code: {}'.format(res.status_code)) | |
if res.status_code == 200: | |
meta_list = res.json() | |
download_parameter(meta_list, self.args) | |
else: | |
nni_log(LogType.Warning, 'rest response: {}'.format(str(res))) | |
time.sleep(5) | |
fetch_file_thread = FetchThread(args) | |
fetch_file_thread.start() | |
if __name__ == '__main__': | |
'''NNI Trial Keeper main function''' | |
PARSER = argparse.ArgumentParser() | |
PARSER.set_defaults(func=trial_keeper_help_info) | |
PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process') | |
PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP') | |
PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port') | |
PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs') # backward compatibility | |
PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs') | |
PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs') # backward compatibility | |
PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs') | |
PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs') | |
PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs') | |
PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL') | |
PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager') | |
PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper') | |
PARSER.add_argument('--job_id_file', type=str, help='set job id file for operating and monitoring job.') | |
args, unknown = PARSER.parse_known_args() | |
if args.trial_command is None: | |
exit(1) | |
check_version(args) | |
try: | |
if NNI_PLATFORM == 'paiYarn' and is_multi_phase(): | |
fetch_parameter_file(args) | |
main_loop(args) | |
except SystemExit as se: | |
nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code)) | |
os._exit(se.code) | |
except Exception as e: | |
nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e))) | |
os._exit(1) | |