# Paddle/python/paddle/distributed/launch.py

# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
paddle.distributed.launch is a module that spawns multiple distributed
processes on each training node for GPU training.
Usage:
In both single-node and multi-node training, this module
launches one process for each of the given GPU cards.
1. for single node training with all visible gpu cards:
python -m paddle.distributed.launch \
your_training_py (arg1 arg2 and all others)
2. for single node training with [0,4) cards
python -m paddle.distributed.launch --selected_gpus="0,1,2,3" \
your_training_py (arg1 arg2 and all others)
3. for multiple node training such as two node:192.168.0.16, 192.168.0.17
on 192.168.0.16:
python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
--node_ip=192.168.0.16 \
your_training_py (arg1 arg2 and all others)
on 192.168.0.17:
python -m paddle.distributed.launch --cluster_node_ips="192.168.0.16,192.168.0.17" \
--node_ip=192.168.0.17 \
your_training_py (arg1 arg2 and all others)
"""
from __future__ import print_function
import logging
import sys
from sys import version
import subprocess
import os
import time
import six
import copy
from argparse import ArgumentParser, REMAINDER
import paddle.fluid as fluid
from contextlib import closing
import socket
# Module-wide logging setup: the root logger emits INFO and above through a
# single stream handler, tagging each record with its level, timestamp and
# "file:line]" origin.
log_format = logging.Formatter(
    fmt='%(levelname)s %(asctime)s %(filename)s:%(lineno)d] %(message)s')
log_handler = logging.StreamHandler()
log_handler.setFormatter(log_format)

logger = logging.getLogger()
logger.addHandler(log_handler)
logger.setLevel(logging.INFO)
def _print_arguments(args):
    """Print every parsed argument as ``name: value``, sorted by name.

    Args:
        args: An object whose attributes are the parsed options (anything
            ``vars()`` accepts, e.g. an ``argparse.Namespace``).
    """
    print("----------- Configuration Arguments -----------")
    # dict.items() behaves the same on Python 2 and 3 here, so no
    # compatibility shim is needed just to iterate the attributes.
    for arg, value in sorted(vars(args).items()):
        print("%s: %s" % (arg, value))
    print("------------------------------------------------")
def find_free_ports(num):
    """Ask the OS for ``num`` distinct TCP ports that are currently free.

    Each candidate is obtained by binding a temporary socket to port 0 and
    reading back the port the OS assigned. The socket is closed before the
    port is returned, so the port is only known to have been free at that
    moment; another process may claim it afterwards.

    Args:
        num: Number of distinct ports wanted. At least one port is always
            probed, so ``num <= 0`` yields a one-element set.

    Returns:
        A ``set`` of port numbers, or ``None`` if the OS handed back an
        already-collected port more than 100 times.
    """

    def _free_port():
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]

    port_set = set()
    # Only repeated ports count against the retry budget. Counting every
    # probe would make any request for more than ~100 ports fail even when
    # plenty of ports are available.
    duplicates = 0
    while True:
        port = _free_port()
        if port in port_set:
            duplicates += 1
            if duplicates > 100:
                print(
                    "can't find avilable port and use the specified static port now!"
                )
                return None
        else:
            port_set.add(port)

        if len(port_set) >= num:
            return port_set
def _str2bool(value):
    """Convert a command-line token such as "True", "false" or "0" to a bool.

    ``type=bool`` is unsuitable for argparse because ``bool("False")`` is
    ``True``. Raises ``ValueError`` for unrecognised text, which argparse
    reports as a normal usage error.
    """
    if isinstance(value, bool):
        return value
    text = str(value).strip().lower()
    if text in ("true", "t", "yes", "y", "1", "on"):
        return True
    # The empty string stays falsy, as it was under ``type=bool``.
    if text in ("false", "f", "no", "n", "0", "off", ""):
        return False
    raise ValueError("expected a boolean value, got %r" % (value, ))


def _parse_args(argv=None):
    """
    Helper function parsing the command line options.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` holding the launcher options, the
        ``training_script`` path and ``training_script_args`` (everything
        that follows the script on the command line).
    """
    parser = ArgumentParser(
        description='''start paddle training using multi-process mode.
NOTE: your train program ***must*** run as distributed nccl2 mode,
see: http://www.paddlepaddle.org/documentation/docs/zh/1.6/user_guides/howto/training/cluster_howto.html#permalink-8--nccl2-
And your train program must read environment variables below in order to let different
process init properly:
FLAGS_selected_gpus
PADDLE_TRAINER_ID
PADDLE_CURRENT_ENDPOINT
PADDLE_TRAINERS_NUM
PADDLE_TRAINER_ENDPOINTS
POD_IP (current node ip address, not needed for local training)
''')

    # Optional arguments for the launch helper
    parser.add_argument(
        "--cluster_node_ips",
        type=str,
        default="127.0.0.1",
        help="Paddle cluster nodes ips, such as 192.168.0.16,192.168.0.17..")
    parser.add_argument(
        "--node_ip",
        type=str,
        default="127.0.0.1",
        help="The current node ip. ")
    parser.add_argument(
        "--use_paddlecloud",
        action='store_true',
        help="wheter to use paddlecloud platform to run your multi-process job. If false, no need to set this argument."
    )
    parser.add_argument(
        "--started_port",
        type=int,
        default=None,
        help="The trainer's started port on a single node")
    parser.add_argument(
        "--print_config",
        # Explicit converter so that "--print_config False" really disables
        # printing instead of being treated as a truthy non-empty string.
        type=_str2bool,
        default=True,
        help="Print the config or not")
    parser.add_argument(
        "--selected_gpus",
        type=str,
        default=None,
        help="It's for gpu training and the training process will run on the selected_gpus,"
        "each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
    )
    parser.add_argument(
        "--log_dir",
        type=str,
        help="The path for each process's log.If it's not set, the log will printed to default pipe."
    )

    # Positional: the script to launch
    parser.add_argument(
        "training_script",
        type=str,
        help="The full path to the single GPU training "
        "program/script to be launched in parallel, "
        "followed by all the arguments for the "
        "training script")

    # Everything after the script is passed through to the training program
    parser.add_argument('training_script_args', nargs=REMAINDER)
    return parser.parse_args(argv)
def terminate_procs(procs):
    """Call ``terminate()`` on every process whose ``poll()`` is still None.

    Args:
        procs: Iterable of process handles exposing ``poll()`` and
            ``terminate()`` (e.g. ``subprocess.Popen`` objects). Handles
            that already report an exit status are left untouched.
    """
    still_running = (proc for proc in procs if proc.poll() is None)
    for proc in still_running:
        proc.terminate()
def _watch_procs(procs, ranks, nranks):
    """Poll the trainers until all exit with 0 or at least one fails.

    On failure the remaining trainers are terminated and ``SystemExit(1)``
    is raised; on Ctrl-C the trainers are terminated and the
    ``KeyboardInterrupt`` is re-raised.
    """
    failed_ranks = []
    try:
        while True:
            alive = False
            for rank, proc in zip(ranks, procs):
                ret = proc.poll()
                if ret is None:
                    alive = True
                elif ret != 0:
                    failed_ranks.append(rank)
            # Stop right away once everything finished or something failed,
            # instead of sleeping one more second first.
            if failed_ranks or not alive:
                break
            time.sleep(1)

        if failed_ranks:
            terminate_procs(procs)
            # sys.exit rather than the interactive-only ``exit`` builtin.
            sys.exit(1)
    except KeyboardInterrupt:
        logger.warning("KeyboardInterrupt, exit")
        terminate_procs(procs)
        raise
    except (SystemExit, Exception):
        logger.error(
            "ABORT!!! Out of all {} trainers, the trainer process with rank={} was aborted. Please check its log.".
            format(nranks, failed_ranks))
        terminate_procs(procs)
        raise


def start_procs(args):
    """Spawn one training process per selected GPU and wait for them.

    Each child runs ``args.training_script`` with ``args.training_script_args``
    under the current interpreter, with these environment variables set:
    ``FLAGS_selected_gpus``, ``PADDLE_TRAINER_ID``, ``PADDLE_CURRENT_ENDPOINT``,
    ``PADDLE_TRAINERS_NUM`` and ``PADDLE_TRAINER_ENDPOINTS``. ``http_proxy`` and
    ``https_proxy`` are removed from the child environment.

    Args:
        args: Parsed launcher options. Reads ``node_ip``, ``cluster_node_ips``,
            ``use_paddlecloud``, ``selected_gpus``, ``started_port``,
            ``print_config``, ``log_dir``, ``training_script`` and
            ``training_script_args``. ``args.started_port`` is overwritten
            with the port that is finally chosen.

    Raises:
        ValueError: ``args.node_ip`` is not one of ``args.cluster_node_ips``.
        AssertionError: a required paddlecloud variable is missing, or a
            selected GPU is not listed in ``CUDA_VISIBLE_DEVICES``.
        SystemExit: a trainer exited with a non-zero status.
        KeyboardInterrupt: re-raised after terminating the trainers.
    """
    # Snapshot the environment once; every child gets this copy plus the
    # per-rank variables written in the spawn loop.
    current_env = os.environ.copy()

    current_node_ip = args.node_ip
    node_ips = [x.strip() for x in args.cluster_node_ips.split(',')]
    if current_node_ip not in node_ips:
        raise ValueError("node_ip {} is not listed in cluster_node_ips {}".
                         format(current_node_ip, node_ips))
    node_id = node_ips.index(current_node_ip)

    if args.use_paddlecloud:
        trainer_nums = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
        if trainer_nums != 1:
            # In paddlecloud multi-node mode the ip information comes from
            # the environment rather than from the command line.
            current_node_ip = os.getenv("POD_IP")
            assert current_node_ip is not None, "POD_IP should not be None"
            node_ips = os.getenv("PADDLE_TRAINERS")
            assert node_ips is not None, "PADDLE_TRAINERS should not be None"
            node_ips = node_ips.split(",")
            node_id = os.getenv("PADDLE_TRAINER_ID")
            assert node_id is not None, "PADDLE_TRAINER_ID should not be None"
            node_id = int(node_id)

            if args.node_ip != "127.0.0.1" and current_node_ip != args.node_ip:
                logger.warning(
                    ("Please NOTE: When using paddlecloud, current_node_ip is "
                     "automatically got from POD_IP. Your input node_ip: {} doesn't equals to "
                     "current_node_ip: {} from paddlecloud environment."
                     ).format(args.node_ip, current_node_ip))
            if args.cluster_node_ips != "127.0.0.1" and \
                    args.cluster_node_ips != ",".join(node_ips):
                logger.warning(
                    ("Please NOTE: When using paddlecloud, cluster_node_ips is "
                     "automatically got from PADDLE_TRAINERS(multi nodes) or POD_IP(single node)."
                     "Your input cluster_node_ips: {} doesn't equals to IPs: {} from "
                     "paddlecloud environment."
                     ).format(args.cluster_node_ips, node_ips))
    num_nodes = len(node_ips)

    if args.selected_gpus is None:
        gpus_num = fluid.core.get_cuda_device_count()
        selected_gpus = [str(x) for x in range(0, gpus_num)]
    else:
        requested_gpus = [x.strip() for x in args.selected_gpus.split(',')]
        cuda_visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        if cuda_visible_devices is None or cuda_visible_devices == "":
            selected_gpus = requested_gpus
        else:
            # change selected_gpus into relative values
            # e.g. CUDA_VISIBLE_DEVICES=4,5,6,7; args.selected_gpus=4,5,6,7;
            # therefore selected_gpus=0,1,2,3
            # Both lists are stripped so the membership check and the index
            # lookup agree on inputs such as "4, 5".
            cuda_visible_devices_list = [
                x.strip() for x in cuda_visible_devices.split(',')
            ]
            for x in requested_gpus:
                assert x in cuda_visible_devices_list, "Can't find "\
                    "your selected_gpus %s in CUDA_VISIBLE_DEVICES[%s]."\
                    % (x, cuda_visible_devices)
            selected_gpus = [
                cuda_visible_devices_list.index(x) for x in requested_gpus
            ]
    selected_gpus_num = len(selected_gpus)

    if args.use_paddlecloud and num_nodes > 1:
        cloud_paddle_port = os.getenv("PADDLE_PORT", "")
        cloud_paddle_port_num = os.getenv("PADDLE_PORTS_NUM", "")
        if cloud_paddle_port != "" and cloud_paddle_port_num != "":
            cloud_paddle_port_num = int(cloud_paddle_port_num)
            # Only use the cloud-provided range if it covers every process.
            if cloud_paddle_port_num >= selected_gpus_num:
                args.started_port = int(cloud_paddle_port)
                logger.warning("Use Cloud specified port:{}.".format(
                    cloud_paddle_port))

    # Ports can only be probed locally, so do it for single-node runs where
    # the user did not pin a port. Otherwise use a contiguous range.
    free_ports = None
    if not args.use_paddlecloud and num_nodes <= 1 and args.started_port is None:
        free_ports = find_free_ports(selected_gpus_num)
        if free_ports is not None:
            free_ports = list(free_ports)
            args.started_port = free_ports[0]

    if args.started_port is None:
        args.started_port = 6170

    if free_ports is None:
        free_ports = list(
            range(args.started_port, args.started_port + selected_gpus_num))

    # Every node uses the same port list; endpoints are ordered node by node.
    trainers_endpoints = ",".join("%s:%d" % (ip, free_ports[i])
                                  for ip in node_ips
                                  for i in range(selected_gpus_num))

    nranks = num_nodes * selected_gpus_num

    if args.print_config:
        print("trainers_endpoints:", trainers_endpoints, ", node_id:", node_id,
              ", current_node_ip:", current_node_ip, ", num_nodes:", num_nodes,
              ", node_ips:", node_ips, ", nranks:", nranks)

    # paddle broadcasts ncclUniqueId over a socket, and a proxy may make
    # trainers unreachable. Setting the variables to "" makes grpc log a
    # "bad uri" error, so they are removed instead.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    # Create the log directory once, without going through a shell, so paths
    # containing spaces or shell metacharacters are handled literally. An
    # empty string names no directory, so it is skipped here.
    if args.log_dir:
        try:
            os.makedirs(args.log_dir)
        except OSError:
            if not os.path.isdir(args.log_dir):
                raise

    procs = []
    log_fns = []
    ranks = []
    try:
        for i in range(selected_gpus_num):
            rank = node_id * selected_gpus_num + i
            current_env.update({
                "FLAGS_selected_gpus": "%s" % selected_gpus[i],
                "PADDLE_TRAINER_ID": "%d" % rank,
                "PADDLE_CURRENT_ENDPOINT":
                "%s:%d" % (current_node_ip, free_ports[i]),
                "PADDLE_TRAINERS_NUM": "%d" % nranks,
                "PADDLE_TRAINER_ENDPOINTS": trainers_endpoints
            })

            cmd = [sys.executable, "-u", args.training_script
                   ] + args.training_script_args

            if args.log_dir is not None:
                fn = open("%s/workerlog.%d" % (args.log_dir, i), "w")
                log_fns.append(fn)
                proc = subprocess.Popen(
                    cmd, env=current_env, stdout=fn, stderr=fn)
            else:
                proc = subprocess.Popen(cmd, env=current_env)

            procs.append(proc)
            ranks.append(rank)
    except Exception:
        # A later spawn failed: do not leave the earlier trainers running.
        terminate_procs(procs)
        raise
    else:
        _watch_procs(procs, ranks, nranks)
    finally:
        for fn in log_fns:
            fn.close()
def launch():
    """Entry point: parse the command line, optionally print the parsed
    options, then start the training processes."""
    parsed = _parse_args()
    if parsed.print_config:
        _print_arguments(parsed)
    start_procs(parsed)


# Run the launcher only when the module is executed as a script
# (e.g. ``python -m paddle.distributed.launch``), not when imported.
if __name__ == "__main__":
    launch()