# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.

import argparse
import ctypes
import json
import logging
import os
import re
import shlex
import sys
import threading
import time
from subprocess import Popen

import pkg_resources
from pyhdfs import HdfsClient

from .constants import (LOG_DIR, MULTI_PHASE, NNI_EXP_ID, NNI_PLATFORM,
                        NNI_SYS_DIR, NNI_TRIAL_JOB_ID)
from .hdfsClientUtility import (copyDirectoryToHdfs,
                                copyHdfsDirectoryToLocal,
                                copyHdfsFileToLocal)
from .log_utils import LogType, RemoteLogger, StdOutputType, nni_log
from .rest_utils import rest_get, rest_post
from .url_utils import gen_parameter_meta_url, gen_send_version_url

logger = logging.getLogger('trial_keeper')

# Extracts the leading "<major>[.<minor>]" part of a version string, with an
# optional "v" prefix, into the named group "version" that check_version()
# reads back with .group('version').
regular = re.compile(r'v?(?P<version>[0-9](\.[0-9]){0,1}).*')

# Module-level cache so that only one HdfsClient is created per process.
_hdfs_client = None


def get_hdfs_client(args):
    """Return the cached HDFS client, creating it on first use.

    ``args.hdfs_host`` takes precedence over ``args.pai_hdfs_host`` (the
    latter is kept for backward compatibility).

    Args:
        args: parsed command line arguments; reads ``hdfs_host``,
            ``pai_hdfs_host``, ``nni_hdfs_exp_dir``, ``webhdfs_path`` and
            ``pai_user_name``.

    Returns:
        The ``HdfsClient`` instance, or ``None`` when no host is configured
        or ``args.nni_hdfs_exp_dir`` is ``None``.

    Raises:
        Exception: whatever ``HdfsClient(...)`` raises, after it is logged.
    """
    global _hdfs_client

    if _hdfs_client is not None:
        return _hdfs_client

    # backward compatibility
    hdfs_host = None

    if args.hdfs_host:
        hdfs_host = args.hdfs_host
    elif args.pai_hdfs_host:
        hdfs_host = args.pai_hdfs_host
    else:
        return None

    if hdfs_host is not None and args.nni_hdfs_exp_dir is not None:
        try:
            if args.webhdfs_path:
                _hdfs_client = HdfsClient(hosts='{0}:80'.format(hdfs_host),
                                          user_name=args.pai_user_name,
                                          webhdfs_path=args.webhdfs_path,
                                          timeout=5)
            else:
                # backward compatibility
                _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(hdfs_host, '50070'),
                                          user_name=args.pai_user_name,
                                          timeout=5)
        except Exception as e:
            nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
            raise
    return _hdfs_client


def main_loop(args):
    """Main loop logic for trial keeper.

    Sets up remote logging, optionally pulls the experiment directory from
    HDFS into the current working directory, writes this process's pid to
    ``args.job_id_file`` when given, launches the trial command and polls it
    every two seconds. Once the trial has exited and its output has been
    fully read, the local output directory is optionally copied to HDFS and
    the keeper exits with the trial's return code.

    Args:
        args: parsed command line arguments.

    Raises:
        SystemExit: always, once the trial process has terminated; the exit
            code is the trial's return code.
        KeyError: if an HDFS output directory is configured but the
            ``NNI_OUTPUT_DIR`` environment variable is not set.
        Exception: whatever ``copyDirectoryToHdfs`` raises, after it is
            logged.
    """
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper',
                                          StdOutputType.Stdout, args.log_collection)
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial',
                                          StdOutputType.Stdout, args.log_collection)
    # redirect trial keeper's stdout and stderr to syslog
    sys.stdout = sys.stderr = trial_keeper_syslogger

    hdfs_output_dir = None
    if args.hdfs_output_dir:
        hdfs_output_dir = args.hdfs_output_dir
    elif args.pai_hdfs_output_dir:
        hdfs_output_dir = args.pai_hdfs_output_dir

    hdfs_client = get_hdfs_client(args)

    if hdfs_client is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    if args.job_id_file:
        with open(args.job_id_file, 'w') as job_file:
            job_file.write("%d" % os.getpid())

    # Notice: We don't appoint env, which means subprocess will inherit current
    # environment and that is expected behavior.
    # NOTE(review): trial_command is executed through the shell (shell=True);
    # presumably it always comes from the trusted NNI manager -- confirm.
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(
        process.pid, shlex.split(args.trial_command)))

    while True:
        retCode = process.poll()
        # child worker process exits and all stdout data is read
        if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
            # In Windows, the retCode -1 is 4294967295. It's larger than c_long, and raise OverflowError.
            # So convert it to int32.
            retCode = ctypes.c_long(retCode).value
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))
            if hdfs_output_dir is not None:
                # Copy local directory to hdfs for OpenPAI
                nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
                try:
                    if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
                        nni_log(LogType.Info, 'copy directory from {0} to {1} success!'.format(
                            nni_local_output_dir, hdfs_output_dir))
                    else:
                        nni_log(LogType.Info, 'copy directory from {0} to {1} failed!'.format(
                            nni_local_output_dir, hdfs_output_dir))
                except Exception as e:
                    nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
                    raise

            # Exit as the retCode of subprocess(trial). sys.exit() raises
            # SystemExit, which also terminates this loop.
            sys.exit(retCode)

        time.sleep(2)


def trial_keeper_help_info(*args):
    """Print a hint pointing the user at ``--help``; arguments are ignored."""
    print('please run --help to see guidance')


def check_version(args):
    """Compare the installed ``nni`` version with the NNI manager's version.

    The result is reported to the NNI manager through ``rest_post`` with tag
    ``VCSuccess`` or ``VCFail``. The process is terminated with ``os._exit(1)``
    when the ``nni`` package cannot be resolved or when the versions differ.
    The check is skipped when ``args.nni_manager_version`` is empty.

    Args:
        args: parsed command line arguments; reads ``nni_manager_version``,
            ``nnimanager_ip`` and ``nnimanager_port``.
    """
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        # package nni does not exist, try nni-tool package
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.nni_manager_version:
        # skip version check
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    try:
        # regular.search() returns None for a string without a version
        # number; the resulting AttributeError is handled below.
        trial_keeper_version = regular.search(trial_keeper_version).group('version')
        nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))
        nni_manager_version = regular.search(args.nni_manager_version).group('version')
        nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version))
        log_entry = {}
        if trial_keeper_version != nni_manager_version:
            nni_log(LogType.Error, 'Version does not match!')
            error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(
                nni_manager_version, trial_keeper_version)
            log_entry['tag'] = 'VCFail'
            log_entry['msg'] = error_message
            rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port),
                      json.dumps(log_entry), 10, False)
            os._exit(1)
        else:
            nni_log(LogType.Info, 'Version match!')
            log_entry['tag'] = 'VCSuccess'
            rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port),
                      json.dumps(log_entry), 10, False)
    except AttributeError as err:
        nni_log(LogType.Error, str(err))


def is_multi_phase():
    """Return a truthy value when ``MULTI_PHASE`` is ``'True'`` or ``'true'``.

    A falsy ``MULTI_PHASE`` is returned unchanged, so the result is not
    necessarily a ``bool``.
    """
    return MULTI_PHASE and (MULTI_PHASE in ['True', 'true'])


def download_parameter(meta_list, args):
    """
    Download parameter file to local working directory.
    meta_list format is defined in paiJobRestServer.ts
    example meta_list:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]

    Only entries whose experiment id and trial id match this trial are
    considered, and a file is fetched only if it does not already exist
    under NNI_SYS_DIR.
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug,
            'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))
    for meta in meta_list:
        if meta['experimentId'] == NNI_EXP_ID and meta['trialId'] == NNI_TRIAL_JOB_ID:
            param_fp = os.path.join(NNI_SYS_DIR, os.path.basename(meta['filePath']))
            if not os.path.exists(param_fp):
                hdfs_client = get_hdfs_client(args)
                copyHdfsFileToLocal(meta['filePath'], param_fp, hdfs_client, override=False)


def fetch_parameter_file(args):
    """Start a background thread that polls for parameter file metadata.

    The thread queries the parameter meta URL every five seconds and hands
    each successful (HTTP 200) response to ``download_parameter``. It loops
    forever and is never joined.

    Args:
        args: parsed command line arguments; reads ``nnimanager_ip`` and
            ``nnimanager_port``.
    """
    class FetchThread(threading.Thread):
        def __init__(self, args):
            super(FetchThread, self).__init__()
            self.args = args

        def run(self):
            uri = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, uri)

            while True:
                # NOTE(review): res is dereferenced unconditionally; if
                # rest_get can return None on failure this raises
                # AttributeError and ends the thread -- confirm.
                res = rest_get(uri, 10)
                nni_log(LogType.Debug, 'status code: {}'.format(res.status_code))
                if res.status_code == 200:
                    meta_list = res.json()
                    download_parameter(meta_list, self.args)
                else:
                    nni_log(LogType.Warning, 'rest response: {}'.format(str(res)))
                time.sleep(5)

    fetch_file_thread = FetchThread(args)
    fetch_file_thread.start()


if __name__ == '__main__':
    # NNI Trial Keeper main function
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs')  # backward compatibility
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
    PARSER.add_argument('--job_id_file', type=str, help='set job id file for operating and monitoring job.')
    args, unknown = PARSER.parse_known_args()
    if args.trial_command is None:
        sys.exit(1)
    check_version(args)
    try:
        if NNI_PLATFORM == 'paiYarn' and is_multi_phase():
            fetch_parameter_file(args)
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        # os._exit() only accepts an int, while SystemExit.code may be None
        # (success) or an arbitrary object (failure), as with sys.exit().
        if se.code is None:
            exit_code = 0
        elif isinstance(se.code, int):
            exit_code = se.code
        else:
            exit_code = 1
        # os._exit() is used so that the non-daemon fetch thread cannot keep
        # the process alive.
        os._exit(exit_code)
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)