add save/load for parameter server (#26235)

* add save/load for parameter server
123malin committed 5 years ago (via GitHub)
parent 0cc63cc343
commit 57d434df5d

@@ -48,4 +48,6 @@ init_server = fleet.init_server
 run_server = fleet.run_server
 stop_worker = fleet.stop_worker
 distributed_optimizer = fleet.distributed_optimizer
+save_inference_model = fleet.save_inference_model
+save_persistables = fleet.save_persistables
 minimize = fleet.minimize

@@ -19,10 +19,26 @@ from .distributed_strategy import DistributedStrategy
 from .meta_optimizer_factory import MetaOptimizerFactory
 from .runtime_factory import RuntimeFactory
 from .util_factory import UtilFactory
+from paddle.fluid.wrapped_decorator import wrap_decorator

 __all__ = ['Fleet']


+def _inited_runtime_handler_(func):
+    def __impl__(*args, **kwargs):
+        cls = args[0]
+
+        if cls._runtime_handle is None:
+            raise ValueError("Fleet can not find suitable runtime handler")
+
+        return func(*args, **kwargs)
+
+    return __impl__
+
+
+inited_runtime_handler = wrap_decorator(_inited_runtime_handler_)
+
+
 class Fleet(object):
     """
     Unified API for distributed training of PaddlePaddle
@@ -182,34 +198,48 @@ class Fleet(object):
         """
         self._role_maker.barrier_worker()

+    @inited_runtime_handler
     def init_worker(self):
         """
         init worker
         """
-        assert self._runtime_handle is not None
         self._runtime_handle._init_worker()

+    @inited_runtime_handler
     def init_server(self, *args, **kwargs):
         """
         init server
         """
-        assert self._runtime_handle is not None
         self._runtime_handle._init_server(*args, **kwargs)

+    @inited_runtime_handler
     def run_server(self):
         """
         run server
         """
-        assert self._runtime_handle is not None
         self._runtime_handle._run_server()

+    @inited_runtime_handler
     def stop_worker(self):
         """
         stop worker
         """
-        assert self._runtime_handle is not None
         self._runtime_handle._stop_worker()

+    def save_inference_model(self,
+                             executor,
+                             dirname,
+                             feeded_var_names,
+                             target_vars,
+                             main_program=None,
+                             export_for_deployment=True):
+        self._runtime_handle._save_inference_model(
+            executor, dirname, feeded_var_names, target_vars, main_program,
+            export_for_deployment)
+
+    def save_persistables(self, executor, dirname, main_program=None):
+        self._runtime_handle._save_persistables(executor, dirname, main_program)
+
     def distributed_optimizer(self, optimizer, strategy=None):
         """
         distirbuted_optimizer
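The two save methods above only forward to the active runtime handle, so they can be called only after fleet.init() and distributed_optimizer(...).minimize(...) have installed one. A minimal trainer-side sketch of the intended call pattern (paths are illustrative; it assumes the PaddleCloudRoleMaker environment variables used by the new tests are set):

    import paddle
    import paddle.fluid as fluid
    import paddle.distributed.fleet as fleet
    import paddle.fluid.incubate.fleet.base.role_maker as role_maker

    # Build a tiny network, as in the new test_fleet_base_2.py.
    input_x = fluid.layers.data(name="x", shape=[32], dtype='float32')
    input_y = fluid.layers.data(name="y", shape=[1], dtype='int64')
    prediction = fluid.layers.fc(input=input_x, size=2, act='softmax')
    avg_cost = fluid.layers.mean(
        fluid.layers.cross_entropy(input=prediction, label=input_y))

    # Installing the runtime handle: init + distributed_optimizer + minimize.
    fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=False))
    strategy = fleet.DistributedStrategy()
    strategy.a_sync = False
    optimizer = fleet.distributed_optimizer(
        paddle.optimizer.SGD(learning_rate=0.001), strategy=strategy)
    optimizer.minimize(avg_cost)

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())

    # Both calls delegate to the runtime's _save_persistables /
    # _save_inference_model hooks added in this commit.
    fleet.save_persistables(executor=exe, dirname="/tmp/ps_model")
    fleet.save_inference_model(
        executor=exe,
        dirname="/tmp/ps_inference",
        feeded_var_names=["x", "y"],
        target_vars=[avg_cost])

In an actual parameter-server run these calls would normally be issued from a single trainer after training; the exact contract is left to the concrete runtime.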

@@ -33,3 +33,9 @@ class RuntimeBase(object):

     def _stop_worker(self):
         pass
+
+    def _save_inference_model(self, *args, **kwargs):
+        pass
+
+    def _save_persistables(self, *args, **kwargs):
+        pass
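RuntimeBase only declares no-op hooks; a concrete runtime (such as the ParameterServerRuntime exercised in the tests below) overrides them. A hedged sketch of a custom subclass — the class name and body are purely illustrative, and the import path is an assumption based on the fleet runtime tests:

    from paddle.distributed.fleet.runtime.runtime_base import RuntimeBase


    class LoggingRuntime(RuntimeBase):
        # Illustrative only: a real runtime would prune the program and write
        # model files; here the hooks simply record that they were invoked.
        def _save_inference_model(self, *args, **kwargs):
            print("_save_inference_model:", args, kwargs)

        def _save_persistables(self, *args, **kwargs):
            print("_save_persistables:", args, kwargs)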

@@ -33,6 +33,8 @@ list(APPEND MIXED_DIST_TEST_OPS test_fleet_api_input)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_checkpoint)
 list(APPEND MIXED_DIST_TEST_OPS test_collective_optimizer)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_base)
+list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_2)
+list(APPEND MIXED_DIST_TEST_OPS test_fleet_base_3)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_recompute_meta_optimizer)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
 list(APPEND MIXED_DIST_TEST_OPS test_fleet_pipeline_meta_optimizer)
@@ -382,6 +384,8 @@ if(WITH_DISTRIBUTE)
     py_test_modules(test_collective_optimizer MODULES test_collective_optimizer)
     if(NOT APPLE)
         py_test_modules(test_fleet_base MODULES test_fleet_base ENVS ${dist_ENVS})
+        py_test_modules(test_fleet_base_2 MODULES test_fleet_base_2 ENVS ${dist_ENVS})
+        py_test_modules(test_fleet_base_3 MODULES test_fleet_base_3 ENVS ${dist_ENVS})
         py_test_modules(test_fleet_recompute_meta_optimizer MODULES test_fleet_recompute_meta_optimizer ENVS ${dist_ENVS})
         py_test_modules(test_fleet_graph_execution_meta_optimizer MODULES test_fleet_graph_execution_meta_optimizer ENVS ${dist_ENVS})
         py_test_modules(test_fleet_graph_executor MODULES test_fleet_graph_executor ENVS ${dist_ENVS})

@@ -162,24 +162,17 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         exe = fluid.Executor(fluid.CPUPlace())

         fleet.init_worker()
-        exe.run(fleet.startup_program)
+        exe.run(fluid.default_startup_program())

         batch_size = 4
         train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
         self.reader.decorate_sample_list_generator(train_reader)

-        compiled_prog = fluid.compiler.CompiledProgram(
-            fleet.main_program).with_data_parallel(
-                loss_name=self.avg_cost.name,
-                build_strategy=self.strategy.get_build_strategy(),
-                exec_strategy=self.strategy.get_execute_strategy())
-
         for epoch_id in range(1):
             self.reader.start()
             try:
                 pass_start = time.time()
                 while True:
-                    loss_val = exe.run(program=compiled_prog,
+                    loss_val = exe.run(program=fluid.default_main_program(),
                                        fetch_list=[self.avg_cost.name])
                     loss_val = np.mean(loss_val)
                     # TODO(randomly fail)
@@ -209,7 +202,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         exe = fluid.Executor(fluid.CPUPlace())

         fleet.init_worker()
-        exe.run(fleet.startup_program)
+        exe.run(fluid.default_startup_program())

         thread_num = 2
         batch_size = 128
@@ -231,7 +224,7 @@ class TestDistCTR2x2(FleetDistRunnerBase):
             pass_start = time.time()
             dataset.set_filelist(filelist)
             exe.train_from_dataset(
-                program=fleet.main_program,
+                program=fluid.default_main_program(),
                 dataset=dataset,
                 fetch_list=[self.avg_cost],
                 fetch_info=["cost"],

@@ -152,24 +152,18 @@ class TestDistCTR2x2(FleetDistRunnerBase):
         exe = fluid.Executor(fluid.CPUPlace())

         fleet.init_worker()
-        exe.run(fleet.startup_program)
+        exe.run(fluid.default_startup_program())

         batch_size = 4
         train_reader = paddle.batch(fake_ctr_reader(), batch_size=batch_size)
         self.reader.decorate_sample_list_generator(train_reader)

-        compiled_prog = fluid.compiler.CompiledProgram(
-            fleet.main_program).with_data_parallel(
-                loss_name=self.avg_cost.name,
-                build_strategy=self.strategy.get_build_strategy(),
-                exec_strategy=self.strategy.get_execute_strategy())
-
         for epoch_id in range(1):
             self.reader.start()
             try:
                 while True:
-                    loss_val = exe.run(program=compiled_prog,
+                    loss_val = exe.run(program=fluid.default_main_program(),
                                        fetch_list=[self.avg_cost.name])
                     loss_val = np.mean(loss_val)
                     print("TRAIN ---> pass: {} loss: {}\n".format(epoch_id,

@@ -31,10 +31,11 @@ import time
 import tempfile
 import unittest

+import paddle
 import paddle.fluid as fluid
 import paddle.distributed.fleet.base.role_maker as role_maker
 from paddle.distributed.fleet.base.util_factory import fleet_util
-from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet
+from paddle.distributed.fleet import fleet
 from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler.distributed_strategy import StrategyFactory

 __all__ = ['FleetDistRunnerBase', 'TestFleetBase', 'runtime_main']
@@ -75,21 +76,23 @@ class FleetDistRunnerBase(object):
         return role

     def build_strategy(self, args):
-        self.strategy = None
+        self.strategy = paddle.distributed.fleet.DistributedStrategy()
+        self.strategy.a_sync = False
         if args.mode == "async":
-            self.strategy = StrategyFactory.create_async_strategy()
-        elif args.mode == "sync":
-            self.strategy = StrategyFactory.create_sync_strategy()
-        elif args.mode == "half_async":
-            self.strategy = StrategyFactory.create_half_async_strategy()
+            self.strategy = paddle.distributed.fleet.DistributedStrategy()
+            self.strategy.a_sync = True
         elif args.mode == "geo":
-            self.strategy = StrategyFactory.create_geo_strategy(
-                args.geo_sgd_need_push_nums)
+            self.strategy = paddle.distributed.fleet.DistributedStrategy()
+            self.strategy.a_sync = True
+            self.strategy.a_sync_configs = {
+                "k_steps": args.geo_sgd_need_push_nums
+            }

         self.dump_param = os.getenv("dump_param", "").split(",")
         self.dump_fields = os.getenv("dump_fields", "").split(",")
         self.dump_fields_path = os.getenv("dump_fields_path", "")
         debug = int(os.getenv("Debug", "0"))
-        if debug:
+        # TODO(update strategy to support dump params)
+        if False:  #debug:
             self.strategy.set_debug_opt({
                 "dump_param": self.dump_param,
                 "dump_fields": self.dump_fields,
@@ -122,7 +125,7 @@ class FleetDistRunnerBase(object):
                     staircase=True))
         else:
             optimizer = fluid.optimizer.SGD(LEARNING_RATE)
-        optimizer = fleet.distributed_optimizer(optimizer, strategy)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
         optimizer.minimize(avg_cost)

     def run_pserver(self, args):
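The rewritten build_strategy drops StrategyFactory and expresses the run modes directly through DistributedStrategy flags: "sync" is a_sync = False, "async" is a_sync = True, and "geo" is async plus a k_steps entry in a_sync_configs, while the old "half_async" mode now falls back to the default. A condensed sketch of the same mapping (the helper name and the default push interval are illustrative, not part of the test harness):

    import paddle.distributed.fleet as fleet


    def strategy_for_mode(mode, geo_push_nums=400):
        # Default corresponds to synchronous training.
        strategy = fleet.DistributedStrategy()
        strategy.a_sync = False
        if mode == "async":
            strategy.a_sync = True
        elif mode == "geo":
            strategy.a_sync = True
            strategy.a_sync_configs = {"k_steps": geo_push_nums}
        return strategy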

@@ -22,7 +22,7 @@ from test_dist_fleet_base import TestFleetBase

 class TestDistMnistSync2x2(TestFleetBase):
     def _setup_config(self):
-        self._mode = "async"
+        self._mode = "sync"
         self._reader = "pyreader"

     def check_with_place(self,
@@ -123,7 +123,7 @@ class TestDistMnistAsyncDataset2x2(TestFleetBase):

 class TestDistCtrHalfAsync2x2(TestFleetBase):
     def _setup_config(self):
-        self._mode = "half_async"
+        self._mode = "async"
         self._reader = "pyreader"

     def check_with_place(self,

@@ -17,6 +17,7 @@ import paddle
 import paddle.distributed.fleet as fleet
 import paddle.distributed.fleet.base.role_maker as role_maker
 import os
+import paddle.fluid as fluid


 class TestFleetBase(unittest.TestCase):
@@ -119,24 +120,9 @@ class TestFleetBase(unittest.TestCase):
         optimizer = paddle.optimizer.SGD(learning_rate=0.001)
         optimizer = fleet.distributed_optimizer(optimizer)

-    def test_minimize(self):
-        input_x = paddle.fluid.layers.data(
-            name="x", shape=[32], dtype='float32')
-        input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
-
-        fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
-        fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
-        prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
-        cost = paddle.fluid.layers.cross_entropy(
-            input=prediction, label=input_y)
-        avg_cost = paddle.fluid.layers.mean(x=cost)
-
-        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
-        fleet.init(role)
-        strategy = fleet.DistributedStrategy()
-        optimizer = paddle.optimizer.SGD(learning_rate=0.001)
-        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
-        optimizer.minimize(avg_cost)
+    def test_exception(self):
+        import paddle.distributed.fleet as fleet
+        self.assertRaises(Exception, fleet.init_worker)

 if __name__ == "__main__":

@@ -0,0 +1,102 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import paddle
+import os
+import paddle.fluid as fluid
+
+
+class TestFleetBase(unittest.TestCase):
+    def setUp(self):
+        os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
+        os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
+            "127.0.0.1:36001,127.0.0.2:36001"
+
+    def test_ps_minimize(self):
+        import paddle
+        import paddle.distributed.fleet as fleet
+        import paddle.fluid.incubate.fleet.base.role_maker as role_maker
+
+        os.environ["TRAINING_ROLE"] = "PSERVER"
+        os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_PORT"] = "36001"
+
+        input_x = paddle.fluid.layers.data(
+            name="x", shape=[32], dtype='float32')
+        input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
+
+        fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
+        fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
+        prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
+        cost = paddle.fluid.layers.cross_entropy(
+            input=prediction, label=input_y)
+        avg_cost = paddle.fluid.layers.mean(x=cost)
+
+        role = role_maker.PaddleCloudRoleMaker(is_collective=False)
+        fleet.init(role)
+        strategy = paddle.distributed.fleet.DistributedStrategy()
+        strategy.a_sync = False
+        optimizer = paddle.optimizer.SGD(learning_rate=0.001)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+        optimizer.minimize(avg_cost)
+
+        place = fluid.CPUPlace()
+        exe = fluid.Executor(place)
+        pe = fluid.ParallelExecutor(use_cuda=False, loss_name=avg_cost.name)
+        compiled_prog = fluid.compiler.CompiledProgram(
+            fluid.default_main_program())
+
+        self.assertRaises(
+            Exception,
+            fleet.save_inference_model,
+            dirname='/tmp/',
+            feeded_var_names=['x', 'y'],
+            target_vars=[avg_cost],
+            executor=pe)
+
+        self.assertRaises(
+            Exception,
+            fleet.save_inference_model,
+            dirname='/tmp/',
+            feeded_var_names=['x', 'y'],
+            target_vars=[avg_cost],
+            executor="exe")
+
+        self.assertRaises(
+            Exception,
+            fleet.save_inference_model,
+            dirname='/tmp/',
+            feeded_var_names=['x', 'y'],
+            target_vars=[avg_cost],
+            executor=exe,
+            main_program=compiled_prog)
+
+        self.assertRaises(
+            Exception, fleet.save_persistables, executor=pe, dirname='/tmp/')
+
+        self.assertRaises(
+            Exception, fleet.save_persistables, executor="exe", dirname='/tmp/')
+
+        self.assertRaises(
+            Exception,
+            fleet.save_persistables,
+            executor=exe,
+            dirname='/tmp/',
+            main_program=compiled_prog)
+
+
+if __name__ == "__main__":
+    unittest.main()

@@ -0,0 +1,52 @@
+# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import os
+import paddle
+import paddle.distributed.fleet as fleet
+import paddle.distributed.fleet.base.role_maker as role_maker
+import paddle.fluid as fluid
+
+
+class TestFleetBase(unittest.TestCase):
+    def setUp(self):
+        os.environ["POD_IP"] = "127.0.0.1"
+        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
+        os.environ["PADDLE_TRAINERS_NUM"] = "2"
+        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
+            "127.0.0.1:36001,127.0.0.2:36001"
+
+    def test_collective_minimize(self):
+        input_x = paddle.fluid.layers.data(
+            name="x", shape=[32], dtype='float32')
+        input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
+
+        fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
+        fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
+        prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
+        cost = paddle.fluid.layers.cross_entropy(
+            input=prediction, label=input_y)
+        avg_cost = paddle.fluid.layers.mean(x=cost)
+
+        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
+        fleet.init(role)
+        strategy = fleet.DistributedStrategy()
+        optimizer = paddle.optimizer.SGD(learning_rate=0.001)
+        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
+        optimizer.minimize(avg_cost)
+
+
+if __name__ == "__main__":
+    unittest.main()

@@ -25,6 +25,8 @@ class TestFleetRuntime(unittest.TestCase):
         base._init_server()
         base._run_server()
         base._stop_worker()
+        base._save_inference_model()
+        base._save_persistables()

     def test_fleet_collective_runtime(self):
         import paddle.distributed.fleet.runtime
@@ -35,6 +37,27 @@ class TestFleetRuntime(unittest.TestCase):
         collective_runtime._init_worker()
         collective_runtime._run_server()
         collective_runtime._stop_worker()
+        collective_runtime._save_inference_model()
+        collective_runtime._save_persistables()
+
+    def test_fleet_ps_runtime(self):
+        ps_runtime = paddle.distributed.fleet.runtime.ParameterServerRuntime()
+        self.assertRaises(Exception, ps_runtime._get_optimizer_status,
+                          "test_op", None)
+
+        reshaped_names, origin_names = ps_runtime._get_optimizer_status("adam",
+                                                                        "param")
+        self.assertTrue(
+            len(reshaped_names) == 2 and
+            reshaped_names[0] == 'param_moment1_0' and
+            reshaped_names[1] == 'param_moment2_0')
+        self.assertTrue(
+            len(origin_names) == 2 and
+            origin_names[0] == 'param_beta1_pow_acc_0' and
+            origin_names[1] == 'param_beta2_pow_acc_0')
+
+        reshaped_names, origin_names = ps_runtime._get_optimizer_status("sgd",
+                                                                        "param")
+        self.assertTrue(len(reshaped_names) == 0 and len(origin_names) == 0)

 if __name__ == "__main__":
