You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
131 lines
3.8 KiB
131 lines
3.8 KiB
5 years ago
|
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
|
||
|
#
|
||
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
# you may not use this file except in compliance with the License.
|
||
|
# You may obtain a copy of the License at
|
||
|
#
|
||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||
|
#
|
||
|
# Unless required by applicable law or agreed to in writing, software
|
||
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
# See the License for the specific language governing permissions and
|
||
|
# limitations under the License.
|
||
|
|
||
|
from __future__ import print_function
|
||
|
|
||
|
import unittest
|
||
|
import os
|
||
|
import time
|
||
|
import copy
|
||
|
import subprocess
|
||
|
import paddle.fluid as fluid
|
||
|
|
||
|
from paddle.distributed.utils import find_free_ports, watch_local_trainers, get_cluster, TrainerProc
|
||
|
|
||
|
|
||
|
def get_cluster_from_args(selected_gpus):
    """Build the cluster description for a single-node, local-only run.

    The cluster always consists of exactly one node, ``127.0.0.1``, and one
    free port is requested for every entry in ``selected_gpus``.

    Args:
        selected_gpus: Sized collection of GPU ids; its length determines how
            many free ports are requested, and it is forwarded unchanged to
            ``get_cluster``.

    Returns:
        Whatever ``get_cluster`` returns. The caller in this file unpacks it
        as ``(cluster, pod)``.
    """
    node_ip = '127.0.0.1'
    node_ips = [node_ip]

    free_ports = find_free_ports(len(selected_gpus))
    if free_ports is not None:
        # Hand get_cluster a concrete list rather than whatever iterable
        # find_free_ports produced. A None result is passed through as-is.
        free_ports = list(free_ports)
    return get_cluster(node_ips, node_ip, free_ports, selected_gpus)
|
||
|
|
||
|
|
||
|
def get_gpus(selected_gpus):
    """Split a comma-separated GPU id string into a list of ids.

    Surrounding whitespace is stripped from every id. The ids stay strings.

    Args:
        selected_gpus: Comma-separated ids, e.g. ``'0,1'``.

    Returns:
        list of str: One entry per comma-separated field, in input order.
    """
    return [gpu_id.strip() for gpu_id in selected_gpus.split(',')]
|
||
|
|
||
|
|
||
|
def start_local_trainers(cluster,
                         pod,
                         training_script,
                         training_script_args,
                         log_dir=None):
    """Spawn one training subprocess for every trainer in ``pod``.

    Each child inherits a copy of this process's environment with the HTTP(S)
    proxy variables removed and the per-trainer ``FLAGS_selected_gpus`` /
    ``PADDLE_*`` variables set.

    Args:
        cluster: Object providing ``trainers_nranks()`` and
            ``trainers_endpoints()``.
        pod: Object whose ``trainers`` attribute is iterable; each trainer
            provides ``gpus``, ``rank`` and ``endpoint``.
        training_script: Path of the script to run with ``python``.
        training_script_args: Extra command-line arguments appended after the
            script path. ``None`` or an empty sequence adds nothing.
        log_dir: Currently unused. Child output is not redirected and
            ``log_fn`` of every returned ``TrainerProc`` is ``None``.

    Returns:
        list: One ``TrainerProc`` per trainer, with ``proc``, ``rank``,
        ``log_fn`` and ``cmd`` populated.
    """
    # os.environ.copy() already returns an independent dict.
    current_env = os.environ.copy()
    # Paddle broadcasts the ncclUniqueId over a socket, and a proxy may make
    # the trainers unreachable. Setting the variables to "" makes grpc log a
    # "bad uri" error, so they are removed entirely instead.
    current_env.pop("http_proxy", None)
    current_env.pop("https_proxy", None)

    # The command line is the same for every trainer, so build it once.
    if os.getenv('WITH_COVERAGE', 'OFF') == 'ON':
        cmd = "python -m coverage run --branch -p " + training_script
    else:
        cmd = "python -u " + training_script

    # Each extra argument is forwarded as its own argv entry, so arguments
    # containing spaces survive intact.
    extra_args = [str(arg) for arg in (training_script_args or [])]
    argv = cmd.split(" ") + extra_args
    if extra_args:
        cmd = " ".join([cmd] + extra_args)

    procs = []
    for t in pod.trainers:
        proc_env = {
            "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
            "PADDLE_TRAINER_ID": "%d" % t.rank,
            "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
            "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
            "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
        }

        # Every key in proc_env is overwritten on each iteration, so reusing
        # one environment dict across trainers does not leak stale values.
        current_env.update(proc_env)

        print("trainer proc env:{}".format(current_env))
        print("start trainer proc:{} env:{}".format(cmd, proc_env))

        proc = subprocess.Popen(argv, env=current_env)

        tp = TrainerProc()
        tp.proc = proc
        tp.rank = t.rank
        tp.log_fn = None
        tp.cmd = cmd

        procs.append(tp)

    return procs
|
||
|
|
||
|
|
||
|
class TestMultipleGpus(unittest.TestCase):
    """Run the hapi MNIST scripts as two local trainer processes on GPUs 0 and 1."""

    def run_mnist_2gpu(self, target_file_name):
        """Launch ``target_file_name`` on GPUs 0 and 1 and wait until done.

        The test is reported as skipped, not silently passed, when no CUDA
        device is available.

        Args:
            target_file_name: Path of the training script to launch.
        """
        if fluid.core.get_cuda_device_count() == 0:
            # NOTE(review): only the zero-device case is guarded; a host with
            # exactly one GPU still proceeds with ids '0,1' - confirm intent.
            self.skipTest("no CUDA device available")

        selected_gpus = get_gpus('0,1')
        cluster, pod = get_cluster_from_args(selected_gpus)

        procs = start_local_trainers(
            cluster,
            pod,
            training_script=target_file_name,
            training_script_args=[])

        # Poll every 3 seconds until watch_local_trainers reports that no
        # trainer is alive any more.
        while watch_local_trainers(procs, cluster.trainers_nranks()):
            time.sleep(3)
        print("Local procs complete, POD info:{}".format(pod))

    def test_hapi_multiple_gpus_static(self):
        """Run the static-graph MNIST script."""
        self.run_mnist_2gpu('dist_hapi_mnist_static.py')

    def test_hapi_multiple_gpus_dynamic(self):
        """Run the dynamic-graph MNIST script."""
        self.run_mnist_2gpu('dist_hapi_mnist_dynamic.py')
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # Run every test case in this module when it is executed as a script.
    unittest.main()
|