You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/python/paddle/fleet/launch.py

320 lines
12 KiB

# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
paddle.distributed.launch is a module that spawns multiple distributed
processes on each training node for gpu training and cpu training.
Usage:
In both single node training and multiple node training, this module
launches a process on each of the given gpu cards or cpu machines.
GPU training:
1. for single node training with all visible gpu cards:
fleetrun your_training_py (arg1 arg2 and all others)
2. for single node training with [0,4) cards
fleetrun --gpus="0,1,2,3" your_training_py (arg1 arg2 and all others)
3. for multiple node training such as two node:192.168.0.16, 192.168.0.17
(the ip of the local node is detected automatically and must be listed in --ips)
on 192.168.0.16:
fleetrun --ips="192.168.0.16,192.168.0.17" \
your_training_py (arg1 arg2 and all others)
on 192.168.0.17:
fleetrun --ips="192.168.0.16,192.168.0.17" \
your_training_py (arg1 arg2 and all others)
CPU training:
1. for single node training with multi servers and workers:
fleetrun --server_num=1 --worker_num=4 your_training_py (arg1 arg2 and all others)
2. for multiple node training such as two node:192.168.0.16, 192.168.0.17 \
with 2 servers and 4 workers.
on 192.168.0.16:
fleetrun --servers="192.168.0.16:6170,192.168.0.17:6171" \
--workers="192.168.0.16:6172,192.168.0.17:6173,192.168.0.16:6174,192.168.0.17:6175" \
your_training_py (arg1 arg2 and all others)
on 192.168.0.17:
fleetrun --servers="192.168.0.16:6170,192.168.0.17:6171" \
--workers="192.168.0.16:6172,192.168.0.17:6173,192.168.0.16:6174,192.168.0.17:6175" \
your_training_py (arg1 arg2 and all others)
"""
from __future__ import print_function
import sys
from sys import version
import subprocess
import os
import time
import six
import copy
from argparse import ArgumentParser, REMAINDER
import paddle
import paddle.fluid as fluid
from paddle.fleet.launch_utils import *
import paddle.fleet.cloud_utils as cloud_utils
def _print_arguments(args):
    """Print every parsed launcher option as ``name: value``, sorted by name.

    Args:
        args: the namespace produced by the argument parser.
    """
    header = "----------- Configuration Arguments -----------"
    footer = "------------------------------------------------"

    print(header)
    for name, val in sorted(vars(args).items()):
        print("%s: %s" % (name, val))
    print(footer)
def _parse_args():
    """
    Build the launcher's command line parser and parse ``sys.argv`` with it.

    The launcher options are registered first, then the positional training
    script; everything following the script on the command line is collected
    untouched so it can be forwarded to the script.

    @retval the namespace holding the parsed arguments
    """
    cli = ArgumentParser(
        description='''start paddle training using multi-process mode.
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
''')

    # Optional arguments for the launch helper, as (flag, add_argument kwargs).
    # They are registered in list order so the generated help keeps its layout.
    option_table = [
        ("--ips", dict(
            type=str,
            default="127.0.0.1",
            help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")),
        ("--gpus", dict(
            type=str,
            default=None,
            help="It's for gpu training and the training process will run on the gpus,"
            "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
        )),
        ("--servers", dict(
            type=str, default="", help="User defined servers ip:port")),
        ("--workers", dict(
            type=str, default="", help="User defined workers ip:port")),
        ("--worker_num", dict(
            type=int, default=2, help="number of workers")),
        ("--server_num", dict(
            type=int, default=2, help="number of servers")),
        ("--log_dir", dict(
            type=str,
            help="The path for each process's log.If it's not set, the log will printed to default pipe."
        )),
    ]
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)

    # Positional: the script to launch.
    cli.add_argument(
        "training_script",
        type=str,
        help="The full path to the single GPU training "
        "program/script to be launched in parallel, "
        "followed by all the arguments for the "
        "training script")

    # Everything after the script belongs to the training program.
    cli.add_argument('training_script_args', nargs=REMAINDER)

    parsed = cli.parse_args()
    return parsed
def get_cluster_from_args(args, gpus):
    """Describe the cluster for collective training from command line arguments.

    Args:
        args: parsed launcher arguments; only ``args.ips`` (a comma separated
            list of node ips) is read here.
        gpus: the gpu ids selected on this node. One port is allocated per
            entry.

    Returns:
        The result of ``get_cluster(node_ips, node_ip, free_ports, gpus)``.

    Raises:
        AssertionError: if the local ip is not one of the ips in ``args.ips``.
        ValueError: if ``FLAGS_START_PORT`` is set but is not an integer.
    """
    node_ips = [x.strip() for x in args.ips.split(',')]
    if len(node_ips) == 1:
        node_ip = node_ips[0]
    else:
        # With several nodes the local ip has to be discovered so that this
        # node can be located inside the list.
        _, node_ip = get_host_name_ip()

    assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips:{%s}" \
        % (node_ip, node_ips)
    node_rank = node_ips.index(node_ip)

    logger.debug("parsed from args:node_ips:{} node_ip:{} node_rank:{}".format(
        node_ips, node_ip, node_rank))

    start_port_env = os.environ.get('FLAGS_START_PORT')

    free_ports = None
    if not cloud_utils.use_paddlecloud() and len(
            node_ips) <= 1 and start_port_env is None:
        # Single local node without an explicit start port: ask for currently
        # unused ports (the helper's result is passed on as-is when it is None).
        free_ports = find_free_ports(len(gpus))
        if free_ports is not None:
            free_ports = list(free_ports)
    else:
        # Environment values are strings, so the port must be converted
        # before it can be used in arithmetic / range().
        start_port = 6070 if start_port_env is None else int(start_port_env)
        free_ports = list(range(start_port, start_port + len(gpus)))

    return get_cluster(node_ips, node_ip, free_ports, gpus)
def get_gpus(gpus):
    """Resolve the ``--gpus`` option into the list of gpu ids to train on.

    Args:
        gpus: comma separated gpu ids as given on the command line, or
            ``None`` to use every device reported by
            ``fluid.core.get_cuda_device_count()``.

    Returns:
        list: when ``gpus`` is ``None`` or ``CUDA_VISIBLE_DEVICES`` is unset
        or empty, the ids as strings. Otherwise the integer positions of the
        requested ids inside ``CUDA_VISIBLE_DEVICES`` (i.e. relative ids).

    Raises:
        AssertionError: if a requested id is not listed in
            ``CUDA_VISIBLE_DEVICES``.
    """
    if gpus is None:
        gpus_num = fluid.core.get_cuda_device_count()
        return [str(x) for x in range(0, gpus_num)]

    # Normalise once so that validation and lookup see the same values
    # ("4, 5" must behave exactly like "4,5").
    requested = [x.strip() for x in gpus.split(',')]

    cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
    if cuda_visible_devices is None or cuda_visible_devices == "":
        return requested

    # change gpus into relative values
    # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.gpus=4,5,6,7;
    # therefore gpus=0,1,2,3
    visible = [x.strip() for x in cuda_visible_devices.split(',')]
    for x in requested:
        assert x in visible, "Can't find "\
            "your gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
            % (x, cuda_visible_devices)
    return [visible.index(x) for x in requested]
def launch_collective(args):
    """Start the local trainer processes for collective training and block
    until the watcher reports that none of them is alive any more.

    Args:
        args: parsed launcher arguments; ``gpus``, ``ips``, ``training_script``,
            ``training_script_args`` and ``log_dir`` are read.
    """
    # parse arguments, used for cloud-single-machine and local
    selected_gpus = get_gpus(args.gpus)
    num_trainers = cloud_utils.get_trainers_num()
    logger.debug("parsed from args trainerss_num:{} gpus:{}".format(
        num_trainers, selected_gpus))

    if cloud_utils.use_paddlecloud() and num_trainers != 1:
        cluster, pod = cloud_utils.get_cloud_cluster(args.ips, selected_gpus)
        source = "cloud"
    else:
        # trainers_num = 1 or not use paddlecloud ips="a,b"
        cluster, pod = get_cluster_from_args(args, selected_gpus)
        source = "args"
    logger.info("get cluster from {}:".format(source) + "{}".format(cluster))

    trainer_procs = start_local_trainers(
        cluster,
        pod,
        training_script=args.training_script,
        training_script_args=args.training_script_args,
        log_dir=args.log_dir)

    # Poll every three seconds for as long as the watcher reports live trainers.
    while watch_local_trainers(trainer_procs, cluster.trainers_nranks()):
        time.sleep(3)
    logger.info("Local procs complete, POD info:{}".format(pod))
def _start_ps_process(cmd, env, log_dir, log_name):
    """Spawn one server/worker process, optionally logging to a file.

    Returns:
        tuple: ``(proc, log_file)``; ``log_file`` is ``None`` when ``log_dir``
        is ``None`` and the child inherits the launcher's stdout/stderr.
    """
    if log_dir is None:
        return subprocess.Popen(cmd, env=env), None

    # Create the directory directly rather than through a shell command, so
    # that paths containing spaces or shell metacharacters are handled safely.
    if not os.path.isdir(log_dir):
        os.makedirs(log_dir)
    log_file = open("%s/%s" % (log_dir, log_name), "w")
    try:
        proc = subprocess.Popen(cmd, env=env, stdout=log_file, stderr=log_file)
    except Exception:
        # Do not leak the file handle when the process cannot be started.
        log_file.close()
        raise
    return proc, log_file


def launch_ps(args):
    """Start parameter servers and workers locally and wait for the workers.

    ``args.server_num`` server processes and ``args.worker_num`` worker
    processes are started, each running ``args.training_script`` with
    ``args.training_script_args``. Their role is communicated through the
    environment variables ``TRAINING_ROLE``, ``PADDLE_PSERVERS_IP_PORT_LIST``,
    ``PADDLE_TRAINERS_NUM``, plus ``PADDLE_PORT``/``POD_IP`` for servers and
    ``PADDLE_TRAINER_ID`` for workers. Once every worker has exited, the
    servers are terminated.

    Args:
        args: parsed launcher arguments. ``worker_num``, ``server_num``,
            ``servers``, ``log_dir``, ``training_script`` and
            ``training_script_args`` are read.

    Raises:
        ValueError: if ``FLAGS_START_PORT`` is set but is not an integer.
        IndexError: if ``args.servers`` lists fewer endpoints than
            ``args.server_num`` or contains an entry without ``:port``.
    """
    worker_num = args.worker_num
    server_num = args.server_num

    # Environment values are strings; convert before using the port in
    # arithmetic, otherwise range() below raises TypeError.
    start_port = 6170
    if os.environ.get('FLAGS_START_PORT') is not None:
        start_port = int(os.environ.get('FLAGS_START_PORT'))

    current_env = os.environ.copy()
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    if args.servers == "":
        ports = range(start_port, start_port + server_num, 1)
        user_endpoints = ",".join(["127.0.0.1:" + str(x) for x in ports])
    else:
        user_endpoints = args.servers
    user_endpoints_ips = [x.split(":")[0] for x in user_endpoints.split(",")]
    user_endpoints_port = [x.split(":")[1] for x in user_endpoints.split(",")]

    cmd = [sys.executable, "-u", args.training_script
           ] + args.training_script_args

    # NOTE: a single environment dict is updated in place for every process
    # (each child gets a snapshot at spawn time), so workers still see the
    # PADDLE_PORT/POD_IP values written for the last server.
    servers = []
    for i in range(server_num):
        current_env.update({
            "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
            "PADDLE_PORT": user_endpoints_port[i],
            "TRAINING_ROLE": "PSERVER",
            "PADDLE_TRAINERS_NUM": str(worker_num),
            "POD_IP": user_endpoints_ips[i]
        })
        servers.append(
            _start_ps_process(cmd, current_env, args.log_dir,
                              "serverlog.%d" % i))

    workers = []
    for i in range(worker_num):
        current_env.update({
            "PADDLE_PSERVERS_IP_PORT_LIST": user_endpoints,
            "PADDLE_TRAINERS_NUM": str(worker_num),
            "TRAINING_ROLE": "TRAINER",
            "PADDLE_TRAINER_ID": str(i)
        })
        workers.append(
            _start_ps_process(cmd, current_env, args.log_dir,
                              "workerlog.%d" % i))

    # only wait worker to finish here
    for proc, log_file in workers:
        proc.wait()
        if log_file is not None:
            log_file.close()

    print("all workers exit, going to finish parameter server", file=sys.stderr)
    for proc, log_file in servers:
        if log_file is not None:
            log_file.close()
        proc.terminate()
    print("all parameter server are killed", file=sys.stderr)
def launch():
    """Entry point: parse the command line and start the matching mode.

    Parameter-server mode is chosen when any of ``--worker_num``,
    ``--server_num``, ``--servers`` or ``--workers`` was passed to the
    launcher, or when ``fluid.core.get_cuda_device_count()`` reports no
    device. Otherwise collective mode is used (with a warning when neither
    ``--ips`` nor ``--gpus`` was passed).
    """
    args = _parse_args()
    logger = get_logger()
    _print_arguments(args)

    ps_args = ['--worker_num', '--server_num', '--servers', '--workers']
    collective_args = ['--ips', '--gpus']

    # The command line ends with the training script followed by the
    # script's own arguments. Only the tokens in front of that tail belong to
    # the launcher; looking at the tail as well would let an option of the
    # training script (e.g. its own "--workers") select the launch mode.
    tail_len = 1 + len(args.training_script_args)
    launcher_argv = sys.argv[1:len(sys.argv) - tail_len]

    def _was_given(flag):
        # Accept both "--flag value" and "--flag=value" spellings.
        return any(tok == flag or tok.startswith(flag + "=")
                   for tok in launcher_argv)

    has_ps_args = [ps_arg for ps_arg in ps_args if _was_given(ps_arg)]
    has_collective_args = [
        co_arg for co_arg in collective_args if _was_given(co_arg)
    ]

    if len(has_ps_args) > 0 or fluid.core.get_cuda_device_count() == 0:
        logger.info("Run cpu parameter-sever mode.")
        launch_ps(args)
    elif len(has_collective_args) > 0:
        logger.info("Run gpu collective mode.")
        launch_collective(args)
    else:
        logger.warning(
            "Not found distinct args. Default use gpu collective mode")
        launch_collective(args)
if __name__ == "__main__":
launch()