[1.1] Load vars on PSERVER (#14037)
* fix dim0 in _load_slice_up_vars
* fix dim0 in _load_slice_up_vars, fix innershape in delete_var_op
* Revert "fix lookuptable in reduce strategy" (reverts commit 0e722c5)
* add unit test for dist
* add unit test for dist, test=develop
* cancel revert, test=develop
branch fix_recordio_link, parent a8532f1a67, commit d325e668b8
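For context before the diff: the pserver-side restore added here is built on fluid.io.load_vars with a predicate, the same call the first new file makes. Below is a minimal sketch of that pattern; the _keep predicate and the model path are illustrative, only io.load_vars and its arguments come from the PR.

import paddle.fluid as fluid
from paddle.fluid import io

def _keep(var):
    # Restore only persistable, non-gradient variables (illustrative filter).
    return var.persistable and "@GRAD" not in var.name

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
io.load_vars(
    exe,
    dirname="/path/to/model",  # illustrative path
    main_program=fluid.default_main_program(),
    predicate=_keep,
    filename=None)  # None: one file per variable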
@@ -0,0 +1,174 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import os
import sys
import signal
import subprocess
import argparse
import time
import math
import random
from multiprocessing import Process
from functools import reduce

import numpy as np
import unittest
import six

import paddle
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid import io

from test_dist_base import TestDistRunnerBase, runtime_main, RUN_STEP
from dist_simnet_bow import TestDistSimnetBow2x2, DATA_URL, DATA_MD5


class TestDistSaveLoad2x2(TestDistSimnetBow2x2):
    def _load_persistable_vars(self, executor, dirname, program):
        def _is_checkpoint_var(var):
            """
            Checkpoints do not save or load every variable: variables of
            type FEED_MINIBATCH/FETCH_LIST/RAW and variables whose names
            end with @GRAD are discarded.

            :param var: the Variable to check
            """
            if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \
                    var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
                    var.desc.type() == core.VarDesc.VarType.RAW:
                return False
            # Names containing @GRAD mark gradient variables; checkpoints
            # do not save them.
            if "@GRAD" in var.name:
                return False
            # Names containing .trainer_ mark distributed-training
            # variables; checkpoints do not save them.
            if ".trainer_" in var.name:
                return False

            # Names containing .block mark distributed-training variables;
            # checkpoints do not save them.
            if ".block" in var.name:
                return False

            # Temporary variables are not saved either.
            if "tmp_" in var.name:
                return False

            return var.persistable

        io.load_vars(
            executor,
            dirname=dirname,
            main_program=program,
            predicate=_is_checkpoint_var,
            filename=None)

    def run_pserver(self, args):
        self.get_model(batch_size=2)
        # NOTE: the pserver should not call memory_optimize.
        t = self.get_transpiler(args.trainer_id,
                                fluid.default_main_program(), args.endpoints,
                                args.trainers, args.sync_mode)
        pserver_prog = t.get_pserver_program(args.current_endpoint)
        startup_prog = t.get_startup_program(args.current_endpoint,
                                             pserver_prog)

        need_load = bool(int(os.getenv("LOAD", "0")))
        model_dir = os.getenv("MODEL_DIR", "")

        place = fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup_prog)

        if need_load and model_dir:
            self._load_persistable_vars(exe, model_dir, startup_prog)
        exe.run(pserver_prog)

    def run_trainer(self, args):
        test_program, avg_cost, train_reader, test_reader, batch_acc, predict = \
            self.get_model(batch_size=2)

        if args.mem_opt:
            fluid.memory_optimize(fluid.default_main_program(), skip_grads=True)
        if args.is_dist:
            t = self.get_transpiler(args.trainer_id,
                                    fluid.default_main_program(),
                                    args.endpoints, args.trainers,
                                    args.sync_mode)

            trainer_prog = t.get_trainer_program()
        else:
            trainer_prog = fluid.default_main_program()

        if args.use_cuda:
            place = fluid.CUDAPlace(0)
        else:
            place = fluid.CPUPlace()

        startup_exe = fluid.Executor(place)
        startup_exe.run(fluid.default_startup_program())

        strategy = fluid.ExecutionStrategy()
        strategy.num_threads = 1
        strategy.allow_op_delay = False

        build_stra = fluid.BuildStrategy()

        if args.use_reduce:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
        else:
            build_stra.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce

        exe = fluid.ParallelExecutor(
            args.use_cuda,
            loss_name=avg_cost.name,
            exec_strategy=strategy,
            build_strategy=build_stra)

        feed_var_list = [
            var for var in trainer_prog.global_block().vars.values()
            if var.is_data
        ]

        feeder = fluid.DataFeeder(feed_var_list, place)
        reader_generator = train_reader()

        def get_data():
            origin_batch = next(reader_generator)
            if args.is_dist and args.use_reader_alloc:
                # Split the batch across trainers: each trainer keeps the
                # items whose offset matches its trainer_id modulo 2.
                new_batch = []
                for offset, item in enumerate(origin_batch):
                    if offset % 2 == args.trainer_id:
                        new_batch.append(item)
                return new_batch
            else:
                return origin_batch

        need_save = bool(int(os.getenv("SAVE", "0")))
        model_dir = os.getenv("MODEL_DIR", "")

        if need_save:
            for _ in six.moves.xrange(RUN_STEP):
                loss, = exe.run(fetch_list=[avg_cost.name],
                                feed=feeder.feed(get_data()))
            if model_dir:  # need_save already holds here
                io.save_persistables(startup_exe, model_dir, trainer_prog)

        var = np.array(fluid.global_scope().find_var('__fc_b__').get_tensor())
        print(np.ravel(var).tolist())


if __name__ == "__main__":
    paddle.dataset.common.download(DATA_URL, 'simnet', DATA_MD5, "train")
    runtime_main(TestDistSaveLoad2x2)
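The trainer above reports its result by printing np.ravel(var).tolist() for the __fc_b__ tensor; the driver in the second file reads that line back with eval and compares runs. A small self-contained sketch of that print/eval round trip, with illustrative values:

import numpy as np

var = np.array([[0.1, 0.2], [0.3, 0.4]])
line = str(np.ravel(var).tolist())   # what the trainer prints
restored = np.array(eval(line))      # what the driver parses back
assert np.allclose(restored, np.ravel(var))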
@@ -0,0 +1,89 @@
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function

import os
import shutil
import unittest
import tempfile

import numpy as np

from test_dist_base import TestDistBase, RUN_STEP


class TestDistSaveLoadDense2x2(TestDistBase):
    def _setup_config(self):
        self._sync_mode = True
        self._enforce_place = "CPU"

    def check_with_place(self,
                         model_file,
                         delta=1e-3,
                         check_error_log=False,
                         need_envs={}):

        required_envs = {
            "PATH": os.getenv("PATH", ""),
            "PYTHONPATH": os.getenv("PYTHONPATH", ""),
            "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
            "http_proxy": ""
        }

        required_envs.update(need_envs)

        if check_error_log:
            required_envs["GLOG_v"] = "7"
            required_envs["GLOG_logtostderr"] = "1"

        model_dir = tempfile.mkdtemp()

        # The local run saves the model; the cluster run loads it back.
        local_env = {}
        local_env["SAVE"] = "1"
        local_env["MODEL_DIR"] = model_dir
        local_env.update(required_envs)

        cluster_env = {}
        cluster_env["LOAD"] = "1"
        cluster_env["MODEL_DIR"] = model_dir
        cluster_env.update(required_envs)

        local_var = self._run_local(model_file, local_env, check_error_log)
        tr0_var, tr1_var = self._run_cluster(model_file, cluster_env,
                                             check_error_log)

        shutil.rmtree(model_dir)

        local_np = np.array(eval(local_var[0]))
        train0_np = np.array(eval(tr0_var[0]))
        train1_np = np.array(eval(tr1_var[0]))
        # Compare element-wise; calling .all() first (as an earlier draft
        # did) would reduce each array to a single boolean before comparing.
        np.testing.assert_allclose(local_np, train0_np, rtol=0, atol=delta)
        np.testing.assert_allclose(local_np, train1_np, rtol=0, atol=delta)
        np.testing.assert_allclose(train0_np, train1_np, rtol=0, atol=delta)

    def test_dist(self):
        need_envs = {
            "IS_DISTRIBUTED": '0',
            "IS_SPARSE": '0',
            'IS_SELF_CONTAINED_LR': '1'
        }
        self.check_with_place(
            "dist_save_load.py",
            delta=0,
            check_error_log=False,
            need_envs=need_envs)


if __name__ == "__main__":
    unittest.main()
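Assuming the driver file is saved as test_dist_save_load.py next to dist_save_load.py in Paddle's unittests directory, it runs as a plain unittest module:

python test_dist_save_load.py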