@@ -18,35 +18,36 @@ import os
 import paddle.distributed.fleet as fleet
 import paddle.distributed.fleet.base.role_maker as role_maker
+from fleet_meta_optimizer_base import TestFleetMetaOptimizer
 
 
-class TestFleetGradientMergeMetaOptimizer(unittest.TestCase):
-    def setUp(self):
-        os.environ["POD_IP"] = "127.0.0.1"
-        os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
-        os.environ["PADDLE_TRAINERS_NUM"] = "2"
-        os.environ["PADDLE_PSERVERS_IP_PORT_LIST"] = \
-            "127.0.0.1:36001,127.0.0.2:36001"
-        paddle.enable_static()
-
+class TestFleetGradientMergeMetaOptimizer(TestFleetMetaOptimizer):
     def test_gradient_merge_optimizer(self):
-        role = role_maker.PaddleCloudRoleMaker(is_collective=True)
-        fleet.init(role)
-        input_x = paddle.fluid.layers.data(
-            name="x", shape=[32], dtype='float32')
-        input_y = paddle.fluid.layers.data(name="y", shape=[1], dtype='int64')
-
-        fc_1 = paddle.fluid.layers.fc(input=input_x, size=64, act='tanh')
-        fc_2 = paddle.fluid.layers.fc(input=fc_1, size=64, act='tanh')
-        prediction = paddle.fluid.layers.fc(input=[fc_2], size=2, act='softmax')
-        cost = paddle.fluid.layers.cross_entropy(
-            input=prediction, label=input_y)
-        avg_cost = paddle.fluid.layers.mean(x=cost)
-
-        strategy = paddle.distributed.fleet.DistributedStrategy()
-        strategy.gradient_merge = True
-        strategy.gradient_merge_configs = {"k_steps": 2, "avg": True}
-        optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
-        optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
-        optimizer.minimize(avg_cost)
+        train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
+        )
+        avg_cost, strategy = self.net(train_prog, startup_prog)
+        self.set_strategy(strategy, 'gradient_merge')
+        self.optimizer(avg_cost, strategy, train_prog, startup_prog)
+
+        vars = [x.name for x in train_prog.list_vars()]
+        with open("main_program", 'w') as f:
+            f.write(str(train_prog))
+
+        self.assertIn('@GradientMerge', ''.join(vars))
+
+    def test_recom_gm_optimizer(self):
+        train_prog, startup_prog = paddle.fluid.Program(), paddle.fluid.Program(
+        )
+        avg_cost, strategy = self.net(train_prog, startup_prog)
+        self.set_strategy(strategy, 'gradient_merge')
+        self.set_strategy(strategy, 'recompute')
+        self.optimizer(avg_cost, strategy, train_prog, startup_prog)
+
+        vars = [x.name for x in train_prog.list_vars()]
+        self.assertIn('@GradientMerge', ''.join(vars))
+        self.assertIn('subprog', ''.join(vars))
 
 
 if __name__ == "__main__":
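
Note for readers of the diff: the '@GradientMerge' suffix both tests grep for belongs to the accumulator variables the gradient merge pass inserts into the main program, and 'subprog' is the suffix the recompute pass attaches to its recomputation sub-blocks. A minimal plain-Python sketch of the semantics behind {"k_steps": 2, "avg": True} (names here are illustrative, not Paddle's):

    # Illustrative sketch: gradient merge for one scalar parameter.
    # "acc" plays the role of a param@GradientMerge accumulator variable.
    def gradient_merge_step(grad, state, k_steps=2, avg=True, lr=0.01):
        state["acc"] += grad              # accumulate the micro-batch gradient
        state["step"] += 1
        if state["step"] % k_steps == 0:  # real optimizer step every k_steps
            merged = state["acc"] / k_steps if avg else state["acc"]
            state["param"] -= lr * merged  # plain SGD, as in the test
            state["acc"] = 0.0             # reset the merge buffer

    state = {"param": 1.0, "acc": 0.0, "step": 0}
    for g in [0.2, 0.4, 0.1, 0.3]:  # four micro-batches -> two parameter updates
        gradient_merge_step(g, state)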
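The refactor hides strategy setup behind TestFleetMetaOptimizer's set_strategy helper, but underneath these are the same DistributedStrategy switches the old test set inline. A sketch of the equivalent direct configuration for the combined case exercised by test_recom_gm_optimizer, reusing the fake environment from the removed setUp; the recompute checkpoint name is a placeholder, since the real ones depend on the network built by self.net:

    import os
    import paddle
    import paddle.distributed.fleet as fleet
    import paddle.distributed.fleet.base.role_maker as role_maker

    # Fake single-node environment, mirroring the removed setUp.
    os.environ["POD_IP"] = "127.0.0.1"
    os.environ["PADDLE_TRAINER_ENDPOINTS"] = "127.0.0.1:36001"
    os.environ["PADDLE_TRAINERS_NUM"] = "2"
    paddle.enable_static()
    fleet.init(role_maker.PaddleCloudRoleMaker(is_collective=True))

    strategy = fleet.DistributedStrategy()
    strategy.gradient_merge = True
    strategy.gradient_merge_configs = {"k_steps": 2, "avg": True}
    strategy.recompute = True
    # Placeholder checkpoint; pick real intermediate var names from the net.
    strategy.recompute_configs = {"checkpoints": ["fc_1.tmp_2"]}

    optimizer = paddle.fluid.optimizer.SGD(learning_rate=0.01)
    optimizer = fleet.distributed_optimizer(optimizer, strategy=strategy)
    # optimizer.minimize(avg_cost) would then apply both passes to the program.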