|
|
@ -396,7 +396,7 @@ class DistributeTranspiler(object):
|
|
|
|
return varname
|
|
|
|
return varname
|
|
|
|
return ""
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
|
|
def __clone_lr_op_sub_block__(op, program, new_block):
|
|
|
|
def __clone_lr_op_sub_block__(op, program, lr_block):
|
|
|
|
if not op.has_attr('sub_block'):
|
|
|
|
if not op.has_attr('sub_block'):
|
|
|
|
return
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
|
@ -405,36 +405,41 @@ class DistributeTranspiler(object):
|
|
|
|
assert isinstance(origin_block, Block)
|
|
|
|
assert isinstance(origin_block, Block)
|
|
|
|
# we put the new sub block to new block to follow the block
|
|
|
|
# we put the new sub block to new block to follow the block
|
|
|
|
# hierarchy of the original blocks
|
|
|
|
# hierarchy of the original blocks
|
|
|
|
new_sub_block = program.create_block(new_block.idx)
|
|
|
|
new_sub_block = program.create_block(lr_block.idx)
|
|
|
|
|
|
|
|
|
|
|
|
# clone vars
|
|
|
|
# clone vars
|
|
|
|
for var in origin_block.vars:
|
|
|
|
for var in origin_block.vars:
|
|
|
|
new_sub_block.clone_variable(var)
|
|
|
|
new_sub_block.clone_variable(var)
|
|
|
|
|
|
|
|
|
|
|
|
# clone ops
|
|
|
|
# clone ops
|
|
|
|
for op in origin_block.ops:
|
|
|
|
for origin_op in origin_block.ops:
|
|
|
|
self._clone_lr_op(program, new_sub_block, op)
|
|
|
|
cloned_op = self._clone_lr_op(program, new_sub_block, origin_op)
|
|
|
|
# clone sub_block of op
|
|
|
|
# clone sub_block of op
|
|
|
|
__clone_lr_op_sub_block__(op, program, new_sub_block)
|
|
|
|
__clone_lr_op_sub_block__(cloned_op, program, new_sub_block)
|
|
|
|
|
|
|
|
|
|
|
|
# reset the block of op
|
|
|
|
# reset the block of op
|
|
|
|
op.set_attr('sub_block', new_sub_block)
|
|
|
|
op.set_attr('sub_block', new_sub_block)
|
|
|
|
|
|
|
|
|
|
|
|
# append lr decay ops to the child block if exists
|
|
|
|
# append lr decay ops to the child block if exists
|
|
|
|
lr_ops = self._get_lr_ops()
|
|
|
|
lr_ops = self._get_lr_ops()
|
|
|
|
|
|
|
|
# record optimize blocks and we can run them on pserver parallel
|
|
|
|
|
|
|
|
optimize_blocks = []
|
|
|
|
if len(lr_ops) > 0:
|
|
|
|
if len(lr_ops) > 0:
|
|
|
|
lr_decay_block = pserver_program.create_block(
|
|
|
|
lr_decay_block = pserver_program.create_block(
|
|
|
|
pserver_program.num_blocks - 1)
|
|
|
|
pserver_program.num_blocks - 1)
|
|
|
|
|
|
|
|
optimize_blocks.append(lr_decay_block)
|
|
|
|
for _, op in enumerate(lr_ops):
|
|
|
|
for _, op in enumerate(lr_ops):
|
|
|
|
self._append_pserver_non_opt_ops(lr_decay_block, op)
|
|
|
|
cloned_op = self._append_pserver_non_opt_ops(lr_decay_block, op)
|
|
|
|
# append sub blocks to pserver_program in lr_decay_op
|
|
|
|
# append sub blocks to pserver_program in lr_decay_op
|
|
|
|
__clone_lr_op_sub_block__(op, pserver_program, lr_decay_block)
|
|
|
|
__clone_lr_op_sub_block__(cloned_op, pserver_program,
|
|
|
|
|
|
|
|
lr_decay_block)
|
|
|
|
|
|
|
|
|
|
|
|
# append op to the current block
|
|
|
|
# append op to the current block
|
|
|
|
grad_to_block_id = []
|
|
|
|
grad_to_block_id = []
|
|
|
|
pre_block_idx = pserver_program.num_blocks - 1
|
|
|
|
pre_block_idx = pserver_program.num_blocks - 1
|
|
|
|
for idx, opt_op in enumerate(opt_op_on_pserver):
|
|
|
|
for idx, opt_op in enumerate(opt_op_on_pserver):
|
|
|
|
per_opt_block = pserver_program.create_block(pre_block_idx)
|
|
|
|
per_opt_block = pserver_program.create_block(pre_block_idx)
|
|
|
|
|
|
|
|
optimize_blocks.append(per_opt_block)
|
|
|
|
# append grad merging ops before clip and weight decay
|
|
|
|
# append grad merging ops before clip and weight decay
|
|
|
|
for _, op in enumerate(self.optimize_ops):
|
|
|
|
for _, op in enumerate(self.optimize_ops):
|
|
|
|
# find the origin @GRAD var before clipping
|
|
|
|
# find the origin @GRAD var before clipping
|
|
|
@ -453,6 +458,7 @@ class DistributeTranspiler(object):
|
|
|
|
if global_ops:
|
|
|
|
if global_ops:
|
|
|
|
opt_state_block = pserver_program.create_block(
|
|
|
|
opt_state_block = pserver_program.create_block(
|
|
|
|
pserver_program.num_blocks - 1)
|
|
|
|
pserver_program.num_blocks - 1)
|
|
|
|
|
|
|
|
optimize_blocks.append(opt_state_block)
|
|
|
|
for glb_op in global_ops:
|
|
|
|
for glb_op in global_ops:
|
|
|
|
__append_optimize_op__(glb_op, opt_state_block,
|
|
|
|
__append_optimize_op__(glb_op, opt_state_block,
|
|
|
|
grad_to_block_id, None)
|
|
|
|
grad_to_block_id, None)
|
|
|
@ -474,11 +480,11 @@ class DistributeTranspiler(object):
|
|
|
|
assert len(prefetch_var_name_to_block_id) == 0
|
|
|
|
assert len(prefetch_var_name_to_block_id) == 0
|
|
|
|
|
|
|
|
|
|
|
|
attrs = {
|
|
|
|
attrs = {
|
|
|
|
"OptimizeBlock": pserver_program.block(1),
|
|
|
|
"optimize_blocks": optimize_blocks,
|
|
|
|
"endpoint": endpoint,
|
|
|
|
"endpoint": endpoint,
|
|
|
|
"Fanin": self.trainer_num,
|
|
|
|
"Fanin": self.trainer_num,
|
|
|
|
"sync_mode": self.sync_mode,
|
|
|
|
"sync_mode": self.sync_mode,
|
|
|
|
"grad_to_block_id": grad_to_block_id
|
|
|
|
"grad_to_block_id": grad_to_block_id,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(prefetch_var_name_to_block_id) > 0:
|
|
|
|
if len(prefetch_var_name_to_block_id) > 0:
|
|
|
|
attrs['prefetch_var_name_to_block_id'] \
|
|
|
|
attrs['prefetch_var_name_to_block_id'] \
|
|
|
@ -1211,7 +1217,7 @@ class DistributeTranspiler(object):
|
|
|
|
if var not in program.global_block().vars:
|
|
|
|
if var not in program.global_block().vars:
|
|
|
|
block.clone_variable(var)
|
|
|
|
block.clone_variable(var)
|
|
|
|
|
|
|
|
|
|
|
|
block.append_op(
|
|
|
|
return block.append_op(
|
|
|
|
type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs)
|
|
|
|
type=op.type, inputs=inputs, outputs=outputs, attrs=op.attrs)
|
|
|
|
|
|
|
|
|
|
|
|
def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
|
|
|
|
def _append_pserver_non_opt_ops(self, optimize_block, opt_op):
|
|
|
@ -1249,7 +1255,7 @@ class DistributeTranspiler(object):
|
|
|
|
elif not program.global_block().vars.has_key(var.name):
|
|
|
|
elif not program.global_block().vars.has_key(var.name):
|
|
|
|
program.global_block().clone_variable(var)
|
|
|
|
program.global_block().clone_variable(var)
|
|
|
|
|
|
|
|
|
|
|
|
optimize_block.append_op(
|
|
|
|
return optimize_block.append_op(
|
|
|
|
type=opt_op.type,
|
|
|
|
type=opt_op.type,
|
|
|
|
inputs=inputs,
|
|
|
|
inputs=inputs,
|
|
|
|
outputs=outputs,
|
|
|
|
outputs=outputs,
|
|
|
|