@@ -36,10 +36,8 @@ class TestParallelExecutorBase(unittest.TestCase):
                                   memory_opt=False,
                                   iter=50,
                                   batch_size=None,
-                                  allow_op_delay=False,
                                   feed_dict=None,
                                   get_data_from_feeder=None,
-                                  seed=None,
                                   use_parallel_executor=True,
                                   use_reduce=False,
                                   use_ir_memory_optimize=True,
@@ -57,51 +55,23 @@ class TestParallelExecutorBase(unittest.TestCase):
         main = fluid.Program()
         startup = fluid.Program()
-        startup.random_seed = 1  # Fix random seed
+        startup.random_seed = 1
         main.random_seed = 1
         with fluid.program_guard(main, startup):
-            if seed is not None:
-                startup.random_seed = seed
-                main.random_seed = seed
-
-            loss = method(use_feed=feed_dict is not None)
-            # NOTE(zjl): memory_optimize/inplace pass would not require
-            # that loss.persistable = True
-            loss.persistable = memory_opt
-
-            if optimizer:
-                optimizer().minimize(loss)
-
-            if memory_opt:
-                fluid.memory_optimize(main)
-
-            if get_data_from_feeder is not None:
-                assert feed_dict is None
-                feed_dict = get_data_from_feeder()
+            feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder,
+                                              main, memory_opt, method,
+                                              optimizer)
 
         place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
         exe = fluid.Executor(place)
         exe.run(startup)
-        exec_strategy = fluid.ExecutionStrategy()
-        exec_strategy.allow_op_delay = allow_op_delay
-        if use_fast_executor:
-            exec_strategy.use_experimental_executor = True
-
-        build_strategy = fluid.BuildStrategy()
-        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
-            if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce
-        build_strategy.fuse_elewise_add_act_ops = fuse_elewise_add_act_ops
-        build_strategy.fuse_relu_depthwise_conv = fuse_relu_depthwise_conv
-        build_strategy.fuse_all_optimizer_ops = fuse_all_optimizer_ops
-        build_strategy.fuse_all_reduce_ops = fuse_all_reduce_ops
-        build_strategy.memory_optimize = use_ir_memory_optimize
-        build_strategy.enable_inplace = enable_inplace
-        build_strategy.enable_sequential_execution = enable_sequential_execution
-
-        if use_cuda and core.is_compiled_with_cuda():
-            build_strategy.remove_unnecessary_lock = True
+        build_strategy, exec_strategy = cls.set_strategy(
+            enable_inplace, enable_sequential_execution, fuse_all_optimizer_ops,
+            fuse_all_reduce_ops, fuse_elewise_add_act_ops,
+            fuse_relu_depthwise_conv, use_fast_executor, use_ir_memory_optimize,
+            use_reduce, use_cuda)
 
         if use_parallel_executor:
             binary = compiler.CompiledProgram(main).with_data_parallel(
                 loss_name=loss.name,
                 build_strategy=build_strategy,
                 exec_strategy=exec_strategy)
@@ -114,13 +84,12 @@ class TestParallelExecutorBase(unittest.TestCase):
             batch_size *= fluid.core.get_cuda_device_count(
             ) if use_cuda else int(
                 os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
         begin = time.time()
         first_loss, = run_executor(
             exe=exe, binary=binary, feed=feed_dict, fetch_list=[loss.name])
-
-        for i in range(iter):
+        for _ in range(iter):
             run_executor(exe=exe, binary=binary, feed=feed_dict, fetch_list=[])
 
         last_loss, = run_executor(
             exe=exe, binary=binary, feed=feed_dict, fetch_list=[loss.name])
         end = time.time()
@@ -138,3 +107,85 @@ class TestParallelExecutorBase(unittest.TestCase):
         print(first_loss, last_loss)
         # self.assertGreater(first_loss[0], last_loss[0])
         return first_loss, last_loss
+
+    @classmethod
+    def check_pass_conflict(cls,
+                            method,
+                            use_cuda=True,
+                            memory_opt=False,
+                            feed_dict=None,
+                            get_data_from_feeder=None,
+                            use_reduce=False,
+                            use_ir_memory_optimize=True,
+                            enable_inplace=True,
+                            fuse_elewise_add_act_ops=False,
+                            fuse_all_optimizer_ops=False,
+                            fuse_all_reduce_ops=False,
+                            fuse_relu_depthwise_conv=False,
+                            optimizer=fluid.optimizer.Adam,
+                            use_fast_executor=True,
+                            enable_sequential_execution=False):
+
+        main = fluid.Program()
+        startup = fluid.Program()
+        with fluid.program_guard(main, startup):
+            feed_dict, loss = cls.build_model(feed_dict, get_data_from_feeder,
+                                              main, memory_opt, method,
+                                              optimizer)
+
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        exe.run(startup)
+
+        build_strategy, exec_strategy = cls.set_strategy(
+            enable_inplace, enable_sequential_execution, fuse_all_optimizer_ops,
+            fuse_all_reduce_ops, fuse_elewise_add_act_ops,
+            fuse_relu_depthwise_conv, use_fast_executor, use_ir_memory_optimize,
+            use_reduce, use_cuda)
+
+        binary = compiler.CompiledProgram(main).with_data_parallel(
+            loss_name=loss.name,
+            build_strategy=build_strategy,
+            exec_strategy=exec_strategy)
+
+        exe.run(binary, feed=feed_dict, fetch_list=[loss.name])
+
+    @classmethod
+    def set_strategy(cls, enable_inplace, enable_sequential_execution,
+                     fuse_all_optimizer_ops, fuse_all_reduce_ops,
+                     fuse_elewise_add_act_ops, fuse_relu_depthwise_conv,
+                     use_fast_executor, use_ir_memory_optimize, use_reduce,
+                     use_cuda):
+        exec_strategy = fluid.ExecutionStrategy()
+        if use_fast_executor:
+            exec_strategy.use_experimental_executor = True
+        build_strategy = fluid.BuildStrategy()
+        build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
+            if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce
+        build_strategy.fuse_elewise_add_act_ops = fuse_elewise_add_act_ops
+        build_strategy.fuse_relu_depthwise_conv = fuse_relu_depthwise_conv
+        build_strategy.fuse_all_optimizer_ops = fuse_all_optimizer_ops
+        build_strategy.fuse_all_reduce_ops = fuse_all_reduce_ops
+        build_strategy.memory_optimize = use_ir_memory_optimize
+        build_strategy.enable_inplace = enable_inplace
+        build_strategy.enable_sequential_execution = enable_sequential_execution
+
+        if use_cuda and core.is_compiled_with_cuda():
+            build_strategy.remove_unnecessary_lock = True
+        return build_strategy, exec_strategy
+
+    @classmethod
+    def build_model(cls, feed_dict, get_data_from_feeder, main, memory_opt,
+                    method, optimizer):
+        loss = method(use_feed=feed_dict is not None)
+        # NOTE(zjl): memory_optimize/inplace pass would not require
+        # that loss.persistable = True
+        loss.persistable = memory_opt
+        if optimizer:
+            optimizer().minimize(loss)
+        if memory_opt:
+            fluid.memory_optimize(main)
+        if get_data_from_feeder is not None:
+            assert feed_dict is None
+            feed_dict = get_data_from_feeder()
+        return feed_dict, loss
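
Usage note (illustrative, not part of the patch): a test case built on this base class would drive the new check_pass_conflict helper roughly as sketched below. The simple_fc_net model builder and the "image"/"label" feed names are assumptions borrowed from the sibling unit tests, not something this diff defines.

import numpy as np

class TestFusePassConflict(TestParallelExecutorBase):
    def test_fuse_optimizer_with_inplace(self):
        # check_pass_conflict builds the program via build_model, enables the
        # requested passes via set_strategy, and runs a single iteration;
        # the test fails if the enabled passes cannot execute together.
        img = np.zeros(shape=[32, 784], dtype='float32')
        label = np.ones(shape=[32, 1], dtype='int64')
        self.check_pass_conflict(
            simple_fc_net,
            feed_dict={"image": img, "label": label},
            use_cuda=False,
            fuse_all_optimizer_ops=True,
            enable_inplace=True)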