|
|
|
@ -49,7 +49,7 @@ class LocalSGDOptimizer(MetaOptimizerBase):
|
|
|
|
|
|
|
|
|
|
def _enable_strategy(self, dist_strategy, context):
|
|
|
|
|
dist_strategy.localsgd = True
|
|
|
|
|
dist_strategy.localsgd_configs = {"k_steps": 1}
|
|
|
|
|
dist_strategy.localsgd_configs = {"k_steps": 1, "begin_step": 1}
|
|
|
|
|
|
|
|
|
|
def snapshot_name(self, param_name):
|
|
|
|
|
return param_name + self.snapshot_key
|
|
|
|
@ -86,8 +86,9 @@ class LocalSGDOptimizer(MetaOptimizerBase):
|
|
|
|
|
minimized = self.inner_opt.minimize(
|
|
|
|
|
loss, startup_program=startup_program)
|
|
|
|
|
|
|
|
|
|
init_k_steps = self.user_defined_strategy.localsgd_configs['k_steps']
|
|
|
|
|
auto_steps = self.user_defined_strategy.auto
|
|
|
|
|
k_steps_value = self.user_defined_strategy.localsgd_configs['k_steps']
|
|
|
|
|
begin_step_value = self.user_defined_strategy.localsgd_configs[
|
|
|
|
|
'begin_step']
|
|
|
|
|
|
|
|
|
|
if startup_program is None:
|
|
|
|
|
startup_program = default_startup_program()
|
|
|
|
@ -101,45 +102,28 @@ class LocalSGDOptimizer(MetaOptimizerBase):
|
|
|
|
|
|
|
|
|
|
p2s = self.create_snapshot_vars(main_block.program)
|
|
|
|
|
with program_guard(main_block.program, startup_program):
|
|
|
|
|
step = layers.autoincreased_step_counter(begin=0)
|
|
|
|
|
step = layers.autoincreased_step_counter(begin=1)
|
|
|
|
|
k_steps = layers.create_global_var(
|
|
|
|
|
name="k_steps",
|
|
|
|
|
shape=[1],
|
|
|
|
|
value=init_k_steps,
|
|
|
|
|
value=k_steps_value,
|
|
|
|
|
dtype='int64',
|
|
|
|
|
persistable=True)
|
|
|
|
|
|
|
|
|
|
begin_step = layers.create_global_var(
|
|
|
|
|
name="begin_step",
|
|
|
|
|
shape=[1],
|
|
|
|
|
value=begin_step_value,
|
|
|
|
|
dtype='int64',
|
|
|
|
|
persistable=True)
|
|
|
|
|
|
|
|
|
|
last_step = layers.create_global_var(
|
|
|
|
|
name="last_step",
|
|
|
|
|
shape=[1],
|
|
|
|
|
value=int(0),
|
|
|
|
|
value=begin_step_value,
|
|
|
|
|
dtype='int64',
|
|
|
|
|
persistable=True)
|
|
|
|
|
|
|
|
|
|
if auto_steps:
|
|
|
|
|
avg_loss = layers.collective._c_allreduce(
|
|
|
|
|
loss) / self.role_maker.worker_num()
|
|
|
|
|
|
|
|
|
|
lr_0 = layers.create_global_var(
|
|
|
|
|
name="lr_0",
|
|
|
|
|
shape=[1],
|
|
|
|
|
value=float(0),
|
|
|
|
|
dtype='float32',
|
|
|
|
|
persistable=True)
|
|
|
|
|
loss_0 = layers.create_global_var(
|
|
|
|
|
name="loss_0",
|
|
|
|
|
shape=[1],
|
|
|
|
|
value=float(0),
|
|
|
|
|
dtype='float32',
|
|
|
|
|
persistable=True)
|
|
|
|
|
|
|
|
|
|
global_lr = self.inner_opt._global_learning_rate()
|
|
|
|
|
|
|
|
|
|
def initialize():
|
|
|
|
|
layers.assign(loss, loss_0)
|
|
|
|
|
layers.assign(global_lr, lr_0)
|
|
|
|
|
|
|
|
|
|
layers.cond(step == 0, initialize)
|
|
|
|
|
|
|
|
|
|
def communicate():
|
|
|
|
|
sub_block = default_main_program().current_block()
|
|
|
|
|
ring_id = -1
|
|
|
|
@ -195,20 +179,10 @@ class LocalSGDOptimizer(MetaOptimizerBase):
|
|
|
|
|
inputs={'X': [param]},
|
|
|
|
|
outputs={'Out': [snapshot]},
|
|
|
|
|
attrs={OP_ROLE_KEY: OpRole.Optimize})
|
|
|
|
|
|
|
|
|
|
if auto_steps:
|
|
|
|
|
next_local_steps = layers.cast(
|
|
|
|
|
layers.ceil(
|
|
|
|
|
layers.sqrt(lr_0 * loss / (global_lr * loss_0) *
|
|
|
|
|
float(init_k_steps))),
|
|
|
|
|
dtype='int64')
|
|
|
|
|
max_local_steps = layers.fill_constant(
|
|
|
|
|
shape=[1], dtype='int64', value=16)
|
|
|
|
|
next_local_steps = layers.elementwise_min(next_local_steps,
|
|
|
|
|
max_local_steps)
|
|
|
|
|
layers.assign(next_local_steps, k_steps)
|
|
|
|
|
layers.assign(step, last_step)
|
|
|
|
|
|
|
|
|
|
layers.cond(step - last_step == k_steps, communicate)
|
|
|
|
|
def begin_localsgd():
|
|
|
|
|
layers.cond(step - last_step == k_steps, communicate)
|
|
|
|
|
|
|
|
|
|
layers.cond(step > begin_step, begin_localsgd, communicate)
|
|
|
|
|
return minimized
|
|
|
|
|