|
|
|
|
|
|
|
import argparse |
|
import ctypes |
|
import json |
|
import logging |
|
import os |
|
import re |
|
import shlex |
|
import sys |
|
import threading |
|
import time |
|
from subprocess import Popen |
|
|
|
import pkg_resources |
|
from pyhdfs import HdfsClient |
|
|
|
from .constants import (LOG_DIR, MULTI_PHASE, NNI_EXP_ID, NNI_PLATFORM, |
|
NNI_SYS_DIR, NNI_TRIAL_JOB_ID) |
|
from .hdfsClientUtility import (copyDirectoryToHdfs, copyHdfsDirectoryToLocal, |
|
copyHdfsFileToLocal) |
|
from .log_utils import LogType, RemoteLogger, StdOutputType, nni_log |
|
from .rest_utils import rest_get, rest_post |
|
from .url_utils import gen_parameter_meta_url, gen_send_version_url |
|
|
|
# Module-level logger for the trial keeper.
logger = logging.getLogger('trial_keeper')

# Extracts the "major[.minor]" prefix of a version string, with an optional
# leading "v" (e.g. "v1.10.3" -> "1.10").
# A raw string is used so that "\." is a regex escape rather than an invalid
# Python string escape, and each component accepts several digits so that
# "1.10" is not truncated to "1.1" when versions are compared.
regular = re.compile(r'v?(?P<version>[0-9]+(\.[0-9]+){0,1}).*')

# Cached HDFS client shared by every caller of get_hdfs_client(); stays None
# until a client has been created successfully.
_hdfs_client = None
|
|
|
|
|
def get_hdfs_client(args):
    """Return the shared HDFS client, creating and caching it on first use.

    The host is taken from ``args.hdfs_host`` and, failing that, from
    ``args.pai_hdfs_host``. A client is only created when
    ``args.nni_hdfs_exp_dir`` is not None. When ``args.webhdfs_path`` is set
    the client targets port 80 with that webhdfs path, otherwise port 50070.

    Returns:
        The cached ``HdfsClient``, or None when no host is configured or no
        experiment directory was supplied.

    Raises:
        Exception: whatever ``HdfsClient`` construction raises; the error is
            logged before being re-raised.
    """
    global _hdfs_client

    # A client created by an earlier call is reused as-is.
    if _hdfs_client is not None:
        return _hdfs_client

    # hdfs_host takes precedence over pai_hdfs_host.
    host = args.hdfs_host or args.pai_hdfs_host
    if not host:
        return None

    if args.nni_hdfs_exp_dir is not None:
        try:
            if args.webhdfs_path:
                port, extra_options = '80', {'webhdfs_path': args.webhdfs_path}
            else:
                port, extra_options = '50070', {}
            _hdfs_client = HdfsClient(hosts='{0}:{1}'.format(host, port),
                                      user_name=args.pai_user_name,
                                      timeout=5,
                                      **extra_options)
        except Exception as e:
            nni_log(LogType.Error, 'Create HDFS client error: ' + str(e))
            raise

    return _hdfs_client
|
|
|
|
|
def main_loop(args):
    '''Run the trial command as a subprocess and wait until it has finished.

    Steps, in order:

    1. Create LOG_DIR if it does not exist.
    2. Build two RemoteLogger instances (one for the keeper, one for the
       trial) and redirect sys.stdout / sys.stderr to the keeper's logger.
    3. If an HDFS client is available, copy args.nni_hdfs_exp_dir into the
       current working directory.
    4. If args.job_id_file is set, write this process's pid into it.
    5. Launch args.trial_command through the shell with its stdout and stderr
       attached to the trial logger's pipe.
    6. Poll every 2 seconds; once the subprocess has exited and the pipe
       reader reports that reading is complete, optionally copy the directory
       named by the NNI_OUTPUT_DIR environment variable to HDFS and exit with
       the subprocess's return code.

    Raises:
        SystemExit: always, once the trial has finished, carrying the trial's
            exit code.
        KeyError: if an HDFS output directory is configured but the
            NNI_OUTPUT_DIR environment variable is not set.
        Exception: any error raised by the HDFS copy is logged and re-raised.
    '''
    if not os.path.exists(LOG_DIR):
        os.makedirs(LOG_DIR)

    trial_keeper_syslogger = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial_keeper',
                                          StdOutputType.Stdout, args.log_collection)
    trial_syslogger_stdout = RemoteLogger(args.nnimanager_ip, args.nnimanager_port, 'trial',
                                          StdOutputType.Stdout, args.log_collection)
    # From here on, everything the keeper itself prints goes to the remote logger.
    sys.stdout = sys.stderr = trial_keeper_syslogger

    # hdfs_output_dir takes precedence over pai_hdfs_output_dir.
    hdfs_output_dir = None
    if args.hdfs_output_dir:
        hdfs_output_dir = args.hdfs_output_dir
    elif args.pai_hdfs_output_dir:
        hdfs_output_dir = args.pai_hdfs_output_dir

    hdfs_client = get_hdfs_client(args)
    if hdfs_client is not None:
        copyHdfsDirectoryToLocal(args.nni_hdfs_exp_dir, os.getcwd(), hdfs_client)

    if args.job_id_file:
        with open(args.job_id_file, 'w') as job_file:
            job_file.write("%d" % os.getpid())

    # The trial's stdout and stderr both feed the same pipe reader.
    log_pipe_stdout = trial_syslogger_stdout.get_pipelog_reader()
    # NOTE(review): the command is run through the shell (shell=True), so it
    # must come from a trusted source.
    process = Popen(args.trial_command, shell=True, stdout=log_pipe_stdout, stderr=log_pipe_stdout)
    nni_log(LogType.Info, 'Trial keeper spawns a subprocess (pid {0}) to run command: {1}'.format(
        process.pid, shlex.split(args.trial_command)))

    while True:
        retCode = process.poll()

        # Only finish once the process has exited AND the pipe reader reports
        # that it has finished reading. The explicit "== True" is kept because
        # the type of is_read_completed is defined outside this file.
        if retCode is not None and log_pipe_stdout.set_process_exit() and log_pipe_stdout.is_read_completed == True:
            # Reinterpret the return code as a signed C long.
            retCode = ctypes.c_long(retCode).value
            nni_log(LogType.Info, 'subprocess terminated. Exit code is {}. Quit'.format(retCode))

            if hdfs_output_dir is not None:
                nni_local_output_dir = os.environ['NNI_OUTPUT_DIR']
                try:
                    if copyDirectoryToHdfs(nni_local_output_dir, hdfs_output_dir, hdfs_client):
                        nni_log(LogType.Info,
                                'copy directory from {0} to {1} success!'.format(nni_local_output_dir, hdfs_output_dir))
                    else:
                        nni_log(LogType.Info,
                                'copy directory from {0} to {1} failed!'.format(nni_local_output_dir, hdfs_output_dir))
                except Exception as e:
                    nni_log(LogType.Error, 'HDFS copy directory got exception: ' + str(e))
                    raise

            # sys.exit() rather than the site-provided exit(): the latter is
            # meant for interactive use and is absent when site is not loaded.
            # It raises SystemExit, so nothing after it in the loop can run.
            sys.exit(retCode)

        time.sleep(2)
|
|
|
|
|
def trial_keeper_help_info(*args):
    """Print a one-line hint telling the user to run with ``--help``.

    Any positional arguments are accepted and ignored.
    """
    hint = 'please run --help to see guidance'
    print(hint)
|
|
|
|
|
def check_version(args):
    """Compare the installed ``nni`` version with the NNI manager's version.

    The installed version is read from the ``nni`` distribution; the process
    is terminated with status 1 if that distribution cannot be resolved.
    When ``args.nni_manager_version`` is empty the check is skipped with a
    warning. Otherwise both versions are reduced with the module-level
    ``regular`` pattern and compared, and the outcome (tag ``VCSuccess`` or
    ``VCFail`` plus a message) is posted to the version URL of the NNI
    manager. A mismatch terminates the process with status 1 after the post.

    A version string that the pattern cannot match surfaces as an
    AttributeError, which is logged and otherwise ignored.
    """
    try:
        trial_keeper_version = pkg_resources.get_distribution('nni').version
    except pkg_resources.ResolutionError:
        nni_log(LogType.Error, 'Package nni does not exist!')
        os._exit(1)

    if not args.nni_manager_version:
        nni_log(LogType.Warning, 'Skipping version check!')
        return

    try:
        trial_keeper_version = regular.search(trial_keeper_version).group('version')
        nni_log(LogType.Info, 'trial_keeper_version is {0}'.format(trial_keeper_version))
        nni_manager_version = regular.search(args.nni_manager_version).group('version')
        nni_log(LogType.Info, 'nni_manager_version is {0}'.format(nni_manager_version))

        versions_match = trial_keeper_version == nni_manager_version
        if versions_match:
            nni_log(LogType.Info, 'Version match!')
            log_entry = {'tag': 'VCSuccess'}
        else:
            nni_log(LogType.Error, 'Version does not match!')
            error_message = 'NNIManager version is {0}, TrialKeeper version is {1}, NNI version does not match!'.format(
                nni_manager_version, trial_keeper_version)
            log_entry = {'tag': 'VCFail', 'msg': error_message}

        # Report the outcome to the manager in either case.
        rest_post(gen_send_version_url(args.nnimanager_ip, args.nnimanager_port), json.dumps(log_entry), 10,
                  False)

        if not versions_match:
            os._exit(1)
    except AttributeError as attr_err:
        nni_log(LogType.Error, attr_err)
|
|
|
|
|
def is_multi_phase():
    """Tell whether the MULTI_PHASE constant is the string 'True' or 'true'.

    Returns:
        bool: True for 'True' / 'true'; False for anything else, including
        None and the empty string.
    """
    # A plain membership test always yields a real bool; the previous
    # "X and (X in [...])" form handed back X itself when X was falsy.
    return MULTI_PHASE in ('True', 'true')
|
|
|
|
|
def download_parameter(meta_list, args):
    """
    Copy this trial's parameter files from HDFS into NNI_SYS_DIR.

    Only entries of meta_list whose experimentId and trialId equal
    NNI_EXP_ID and NNI_TRIAL_JOB_ID are considered, and a file is fetched
    only when no file with the same base name already exists in NNI_SYS_DIR.

    The meta_list format is defined in paiJobRestServer.ts, for example:
    [
        {"experimentId":"yWFJarYa","trialId":"UpPkl","filePath":"/chec/nni/experiments/yWFJarYa/trials/UpPkl/parameter_1.cfg"},
        {"experimentId":"yWFJarYa","trialId":"aIUMA","filePath":"/chec/nni/experiments/yWFJarYa/trials/aIUMA/parameter_1.cfg"}
    ]
    """
    nni_log(LogType.Debug, str(meta_list))
    nni_log(LogType.Debug,
            'NNI_SYS_DIR: {}, trial Id: {}, experiment ID: {}'.format(NNI_SYS_DIR, NNI_TRIAL_JOB_ID, NNI_EXP_ID))
    nni_log(LogType.Debug, 'NNI_SYS_DIR files: {}'.format(os.listdir(NNI_SYS_DIR)))

    for entry in meta_list:
        # Skip metadata that belongs to another experiment or trial.
        if entry['experimentId'] != NNI_EXP_ID or entry['trialId'] != NNI_TRIAL_JOB_ID:
            continue

        local_path = os.path.join(NNI_SYS_DIR, os.path.basename(entry['filePath']))
        # Files that are already present locally are left alone.
        if os.path.exists(local_path):
            continue

        client = get_hdfs_client(args)
        copyHdfsFileToLocal(entry['filePath'], local_path, client, override=False)
|
|
|
|
|
def fetch_parameter_file(args):
    """Start a background thread that keeps polling for parameter metadata.

    The thread requests the parameter-meta URL built from
    ``args.nnimanager_ip`` and ``args.nnimanager_port`` every 5 seconds. A
    200 response is decoded as JSON and handed to ``download_parameter``; any
    other status is logged as a warning. The loop has no exit condition.
    """

    class FetchThread(threading.Thread):
        """Polling thread bound to the parsed command-line arguments."""

        def __init__(self, args):
            threading.Thread.__init__(self)
            self.args = args

        def run(self):
            meta_url = gen_parameter_meta_url(self.args.nnimanager_ip, self.args.nnimanager_port)
            nni_log(LogType.Info, meta_url)

            while True:
                response = rest_get(meta_url, 10)
                nni_log(LogType.Debug, 'status code: {}'.format(response.status_code))
                if response.status_code != 200:
                    nni_log(LogType.Warning, 'rest response: {}'.format(str(response)))
                else:
                    download_parameter(response.json(), self.args)
                time.sleep(5)

    FetchThread(args).start()
|
|
|
|
|
if __name__ == '__main__':
    # NNI Trial Keeper entry point: parse the command line, check the version
    # against the NNI manager, then run the trial until it finishes.
    PARSER = argparse.ArgumentParser()
    PARSER.set_defaults(func=trial_keeper_help_info)
    PARSER.add_argument('--trial_command', type=str, help='Command to launch trial process')
    PARSER.add_argument('--nnimanager_ip', type=str, default='localhost', help='NNI manager rest server IP')
    PARSER.add_argument('--nnimanager_port', type=str, default='8081', help='NNI manager rest server port')
    PARSER.add_argument('--pai_hdfs_output_dir', type=str, help='the output dir of pai_hdfs')
    PARSER.add_argument('--hdfs_output_dir', type=str, help='the output dir of hdfs')
    PARSER.add_argument('--pai_hdfs_host', type=str, help='the host of pai_hdfs')
    PARSER.add_argument('--hdfs_host', type=str, help='the host of hdfs')
    PARSER.add_argument('--pai_user_name', type=str, help='the username of hdfs')
    PARSER.add_argument('--nni_hdfs_exp_dir', type=str, help='nni experiment directory in hdfs')
    PARSER.add_argument('--webhdfs_path', type=str, help='the webhdfs path used in webhdfs URL')
    PARSER.add_argument('--nni_manager_version', type=str, help='the nni version transmitted from nniManager')
    PARSER.add_argument('--log_collection', type=str, help='set the way to collect log in trialkeeper')
    PARSER.add_argument('--job_id_file', type=str, help='set job id file for operating and monitoring job.')
    # Unrecognised arguments are tolerated and ignored.
    args, unknown = PARSER.parse_known_args()

    # Without a command there is nothing to run.
    if args.trial_command is None:
        # sys.exit() instead of the site-provided exit() helper, which is
        # intended for interactive sessions.
        sys.exit(1)

    check_version(args)
    try:
        # The background parameter fetcher is only started for multi-phase
        # trials on the paiYarn platform.
        if NNI_PLATFORM == 'paiYarn' and is_multi_phase():
            fetch_parameter_file(args)
        main_loop(args)
    except SystemExit as se:
        nni_log(LogType.Info, 'NNI trial keeper exit with code {}'.format(se.code))
        # os._exit() only accepts an integer, while SystemExit.code may be
        # None or an arbitrary object; map those the way the interpreter does
        # (None -> 0, non-integer -> 1).
        if se.code is None:
            exit_code = 0
        elif isinstance(se.code, int):
            exit_code = se.code
        else:
            exit_code = 1
        os._exit(exit_code)
    except Exception as e:
        nni_log(LogType.Error, 'Exit trial keeper with code 1 because Exception: {} is catched'.format(str(e)))
        os._exit(1)
|
|