Paddle/python/paddle/fluid/tests/unittests/test_dist_base.py


# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import time
import unittest
import os
import sys
import signal
import subprocess
import six
import argparse
import pickle
import numpy as np
import paddle.fluid as fluid
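
# Each trainer run (local or distributed) executes RUN_STEP mini-batches and
# reports one loss value per step; DEFAULT_BATCH_SIZE is the batch size used
# by get_model() unless a test overrides it via --batch_size.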
RUN_STEP = 10
DEFAULT_BATCH_SIZE = 2
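

# TestDistRunnerBase is the per-process runner: a concrete model script
# subclasses it, implements get_model(), and is executed as a subprocess in
# either the pserver or the trainer role (see runtime_main below).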
class TestDistRunnerBase(object):
    def get_model(self, batch_size=DEFAULT_BATCH_SIZE):
        raise NotImplementedError(
            "get_model should be implemented by child classes.")

    @staticmethod
    def get_transpiler(trainer_id,
                       main_program,
                       pserver_endpoints,
                       trainers,
                       sync_mode,
                       dc_asgd=False):
        # NOTE: import fluid until runtime, or else forking processes will cause error.
        config = fluid.DistributeTranspilerConfig()
        config.enable_dc_asgd = dc_asgd
        t = fluid.DistributeTranspiler(config=config)
        t.transpile(
            trainer_id=trainer_id,
            program=main_program,
            pservers=pserver_endpoints,
            trainers=trainers,
            sync_mode=sync_mode)
        return t
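
    # Parameter-server role: build the model so all parameters exist,
    # transpile the default program into a pserver program, then block in
    # exe.run(pserver_prog) until the parent test kills this process.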
    def run_pserver(self, args):
        self.get_model(batch_size=args.batch_size)
        # NOTE: pserver should not call memory optimize
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode, args.dc_asgd)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)
        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)
        exe.run(pserver_prog)
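
    # Trainer role: optionally transpile for distributed training, run
    # RUN_STEP mini-batches with a ParallelExecutor, and pickle the per-step
    # losses to stdout so the parent test process can read them back.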
    def run_trainer(self, args):
        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
            self.get_model(batch_size=args.batch_size)
        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
        if args.is_dist:
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode, args.dc_asgd)
            trainer_prog = t.get_trainer_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            place = fluid.CUDAPlace(0)
        else:
            place = fluid.CPUPlace()

        startup_exe = fluid.Executor(place)
        startup_exe.run(fluid.default_startup_program())

        strategy = fluid.ExecutionStrategy()
        strategy.num_threads = 1
        strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()

        if args.batch_merge_repeat > 1:
            pass_builder = build_stra._create_passes_from_strategy()
            mypass = pass_builder.insert_pass(
                len(pass_builder.all_passes()) - 2, "multi_batch_merge_pass")
            mypass.set_int("num_repeats", args.batch_merge_repeat)

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        exe = fluid.ParallelExecutor(
            args.use_cuda,
            loss_name=avg_cost.name,
            exec_strategy=strategy,
            build_strategy=build_stra)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()
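
        # get_data shards each mini-batch by example index parity, so with
        # two trainers each one trains on half of every batch.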
        def get_data():
            origin_batch = next(reader_generator)
            if args.is_dist and args.use_reader_alloc:
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        out_losses = []
        for _ in six.moves.xrange(RUN_STEP):
            loss, = exe.run(fetch_list=[avg_cost.name],
                            feed=feeder.feed(get_data()))
            out_losses.append(loss[0])
        if six.PY2:
            print(pickle.dumps(out_losses))
        else:
            sys.stdout.buffer.write(pickle.dumps(out_losses))
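

# runtime_main is the entry point of the child process: a concrete model
# script subclasses TestDistRunnerBase, implements get_model(), and calls
# runtime_main() under __main__ so the same file can be launched either as a
# parameter server or as a trainer. A minimal sketch (names are illustrative,
# modeled on scripts such as dist_mnist.py in this directory):
#
#     class TestDistMnist2x2(TestDistRunnerBase):
#         def get_model(self, batch_size=DEFAULT_BATCH_SIZE):
#             ...  # build the network; return test_program, avg_cost,
#                  # train_reader, test_reader, batch_acc, predict
#
#     if __name__ == "__main__":
#         runtime_main(TestDistMnist2x2)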
def runtime_main(test_class):
    parser = argparse.ArgumentParser(description='Run dist test.')
    parser.add_argument(
        '--role', type=str, required=True, choices=['pserver', 'trainer'])
    parser.add_argument('--endpoints', type=str, required=False, default="")
    parser.add_argument('--is_dist', action='store_true')
    parser.add_argument('--trainer_id', type=int, required=False, default=0)
    parser.add_argument('--trainers', type=int, required=False, default=1)
    parser.add_argument(
        '--current_endpoint', type=str, required=False, default="")
    parser.add_argument('--sync_mode', action='store_true')
    parser.add_argument('--mem_opt', action='store_true')
    parser.add_argument('--use_cuda', action='store_true')
    parser.add_argument('--use_reduce', action='store_true')
    parser.add_argument('--dc_asgd', action='store_true')
    parser.add_argument(
        '--use_reader_alloc', action='store_true', required=False)
    parser.add_argument('--batch_size', required=False, type=int, default=2)
    parser.add_argument(
        '--batch_merge_repeat', required=False, type=int, default=1)

    args = parser.parse_args()

    model = test_class()
    if args.role == "pserver" and args.is_dist:
        model.run_pserver(args)
    else:
        model.run_trainer(args)
import paddle.compat as cpt
import socket
from contextlib import closing
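

# TestDistBase is the harness that runs inside the unittest process: it
# launches the model script above as separate local, pserver and trainer
# subprocesses and compares the losses they report.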
class TestDistBase(unittest.TestCase):
    def _setup_config(self):
        raise NotImplementedError("tests should have _setup_config implemented")

    def _after_setup_config(self):
        if self._enforce_place == "CPU":
            self.__use_cuda = False
        elif self._enforce_place == "GPU":
            self.__use_cuda = True
        else:
            if fluid.core.is_compiled_with_cuda():
                self.__use_cuda = True
            else:
                self.__use_cuda = False

    def setUp(self):
        self._trainers = 2
        self._pservers = 2
        self._ps_endpoints = "127.0.0.1:%s,127.0.0.1:%s" % (
            self._find_free_port(), self._find_free_port())
        self._python_interp = sys.executable
        self._sync_mode = True
        self._enforce_place = None
        self._mem_opt = False
        self._use_reduce = False
        self._dc_asgd = False  # must use with async mode
        self._use_reader_alloc = True
        self._setup_config()
        self._after_setup_config()

    def _find_free_port(self):
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(('', 0))
            return s.getsockname()[1]
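
    # Launches both parameter-server subprocesses; their stderr is redirected
    # to /tmp/ps{0,1}_err.log so failures can be inspected after the test.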
    def start_pserver(self, model_file, check_error_log, required_envs):
        ps0_ep, ps1_ep = self._ps_endpoints.split(",")
        ps_cmd = "%s %s --role pserver --endpoints %s --trainer_id 0 --current_endpoint %s --trainers %d --is_dist"
        ps0_cmd = ps_cmd % \
            (self._python_interp, model_file, self._ps_endpoints, ps0_ep,
             self._trainers)
        ps1_cmd = ps_cmd % \
            (self._python_interp, model_file, self._ps_endpoints, ps1_ep,
             self._trainers)

        if self._sync_mode:
            ps0_cmd += " --sync_mode"
            ps1_cmd += " --sync_mode"
        if self._mem_opt:
            ps0_cmd += " --mem_opt"
            ps1_cmd += " --mem_opt"

        print(ps0_cmd)
        print(ps1_cmd)
        ps0_pipe = open("/tmp/ps0_err.log", "wb")
        ps1_pipe = open("/tmp/ps1_err.log", "wb")

        ps0_proc = subprocess.Popen(
            ps0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps0_pipe,
            env=required_envs)
        ps1_proc = subprocess.Popen(
            ps1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=ps1_pipe,
            env=required_envs)

        return ps0_proc, ps1_proc, ps0_pipe, ps1_pipe
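
    # Runs the model script as a single non-distributed trainer and returns
    # its per-step losses, unpickled from the subprocess stdout.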
    def _run_local(self,
                   model,
                   envs,
                   check_error_log=False,
                   batch_size=DEFAULT_BATCH_SIZE,
                   batch_merge_repeat=1):

        cmd = "%s %s --role trainer" % (self._python_interp, model)
        if batch_size != DEFAULT_BATCH_SIZE:
            cmd += " --batch_size %d" % batch_size
        if batch_merge_repeat > 1:
            cmd += " --batch_merge_repeat %d" % batch_merge_repeat
        if self.__use_cuda:
            cmd += " --use_cuda"
            env_local = {"CUDA_VISIBLE_DEVICES": "0"}
        else:
            env_local = {'CPU_NUM': '1'}

        envs.update(env_local)

        if check_error_log:
            err_log = open("/tmp/trainer.err.log", "wb")
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=err_log,
                env=envs)
        else:
            local_proc = subprocess.Popen(
                cmd.split(" "),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                env=envs)

        local_out, local_err = local_proc.communicate()

        if check_error_log:
            err_log.close()

        sys.stderr.write('local_stdout: %s\n' % pickle.loads(local_out))
        sys.stderr.write('local_stderr: %s\n' % local_err)

        return pickle.loads(local_out)
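
    # Runs the full cluster setup: two parameter servers plus two trainers,
    # each in its own subprocess, and returns both trainers' loss series.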
    def _run_cluster(self, model, envs, check_error_log):
        # Run dist train to compare with local results
        ps0, ps1, ps0_pipe, ps1_pipe = self.start_pserver(model,
                                                          check_error_log, envs)

        ps0_ep, ps1_ep = self._ps_endpoints.split(",")

        tr_cmd = "%s %s --role trainer --endpoints %s --trainer_id %d --current_endpoint %s --trainers %d --is_dist"
        tr0_cmd = tr_cmd % \
            (self._python_interp, model, self._ps_endpoints,
             0, ps0_ep, self._trainers)
        tr1_cmd = tr_cmd % \
            (self._python_interp, model, self._ps_endpoints,
             1, ps1_ep, self._trainers)

        if self._sync_mode:
            tr0_cmd += " --sync_mode"
            tr1_cmd += " --sync_mode"
        if self._mem_opt:
            tr0_cmd += " --mem_opt"
            tr1_cmd += " --mem_opt"
        if self._use_reduce:
            tr0_cmd += " --use_reduce"
            tr1_cmd += " --use_reduce"
        if self._use_reader_alloc:
            tr0_cmd += " --use_reader_alloc"
            tr1_cmd += " --use_reader_alloc"
        if self.__use_cuda:
            tr0_cmd += " --use_cuda"
            tr1_cmd += " --use_cuda"
            env0 = {"CUDA_VISIBLE_DEVICES": "0"}
            env1 = {"CUDA_VISIBLE_DEVICES": "1"}
        else:
            env0 = {'CPU_NUM': '1'}
            env1 = {'CPU_NUM': '1'}

        env0.update(envs)
        env1.update(envs)

        print("tr0_cmd:{}".format(tr0_cmd))
        print("tr1_cmd:{}".format(tr1_cmd))
        tr0_pipe = open("/tmp/tr0_err.log", "wb")
        tr1_pipe = open("/tmp/tr1_err.log", "wb")

        tr0_proc = subprocess.Popen(
            tr0_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr0_pipe,
            env=env0)
        tr1_proc = subprocess.Popen(
            tr1_cmd.strip().split(" "),
            stdout=subprocess.PIPE,
            stderr=tr1_pipe,
            env=env1)

        tr0_out, tr0_err = tr0_proc.communicate()
        tr1_out, tr1_err = tr1_proc.communicate()

        # close trainer file
        tr0_pipe.close()
        tr1_pipe.close()
        ps0_pipe.close()
        ps1_pipe.close()

        # FIXME: use terminate() instead of sigkill.
        os.kill(ps0.pid, signal.SIGKILL)
        os.kill(ps1.pid, signal.SIGKILL)
        ps0.terminate()
        ps1.terminate()

        # print log
        sys.stderr.write('trainer 0 stdout: %s\n' % pickle.loads(tr0_out))
        sys.stderr.write('trainer 0 stderr: %s\n' % tr0_err)
        sys.stderr.write('trainer 1 stdout: %s\n' % pickle.loads(tr1_out))
        sys.stderr.write('trainer 1 stderr: %s\n' % tr1_err)

        # return tr0_losses, tr1_losses
        return pickle.loads(tr0_out), pickle.loads(tr1_out)
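
    # Compares local and distributed training: the two trainers' losses are
    # averaged per step and must match the single-process run within `delta`.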
    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):
        # TODO(typhoonzero): should auto adapt GPU count on the machine.
        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "FLAGS_fraction_of_gpu_memory_to_use": "0.15",
            "FLAGS_cudnn_deterministic": "1",
            "http_proxy": ""
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "7"
            required_envs["GLOG_logtostderr"] = "1"

        local_losses = self._run_local(model_file, required_envs,
                                       check_error_log)
        tr0_losses, tr1_losses = self._run_cluster(model_file, required_envs,
                                                   check_error_log)

        for step_id in range(RUN_STEP):
            local_loss = local_losses[step_id]
            tr0_loss = tr0_losses[step_id]
            tr1_loss = tr1_losses[step_id]
            dist_loss = (np.array([tr0_loss]) + np.array([tr1_loss])) / 2
            print("=======", local_loss, ":", dist_loss[0], "=======")
            self.assertAlmostEqual(local_loss, dist_loss[0], delta=delta)
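
# A concrete unit test then subclasses TestDistBase, roughly like the hedged
# sketch below (modeled on test_dist_mnist.py; exact names may differ):
#
#     class TestDistMnist2x2(TestDistBase):
#         def _setup_config(self):
#             self._sync_mode = True
#             self._use_reduce = False
#
#         def test_dist_train(self):
#             self.check_with_place("dist_mnist.py", delta=1e-5)
#
#     if __name__ == "__main__":
#         unittest.main()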