@@ -98,6 +98,8 @@ def parse_args():
         '--use_fake_data',
         action='store_true',
         help='If set, omit the actual read data operators.')
+    parser.add_argument(
+        '--profile', action='store_true', help='If set, profile a few steps.')
     parser.add_argument(
         '--update_method',
         type=str,
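
The two `+` lines register the new `--profile` switch. With `action='store_true'` the flag defaults to `False` and flips to `True` only when passed on the command line. A minimal standalone sketch of that behavior (the parser here is a stand-in, not the benchmark's full `parse_args`):

```python
import argparse

# Stand-in parser: only the flag added in this hunk, everything else omitted.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--profile', action='store_true', help='If set, profile a few steps.')

assert parser.parse_args(['--profile']).profile is True   # flag given
assert parser.parse_args([]).profile is False             # flag absent
```
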
@@ -108,8 +110,8 @@ def parse_args():
     return args


-def append_nccl2_prepare():
-    if os.getenv("PADDLE_TRAINER_ID", None) != None:
+def append_nccl2_prepare(trainer_id):
+    if trainer_id >= 0:
         # append gen_nccl_id at the end of startup program
-        trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
         port = os.getenv("PADDLE_PSERVER_PORT")
@@ -136,12 +138,12 @@ def append_nccl2_prepare():
         })
         return nccl_id_var, num_trainers, trainer_id
     else:
-        raise Exception(
-            "must set PADDLE_TRAINER_ID env variables for dist train.")
+        raise Exception("must set a positive PADDLE_TRAINER_ID env variable "
+                        "for nccl-based dist train.")


-def dist_transpile():
-    if "PADDLE_TRAINING_ROLE" not in os.environ:
+def dist_transpile(trainer_id):
+    if trainer_id < 0:
         return None, None

     # the port of all pservers, needed by both trainer and pserver
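
Taken together, the two hunks above move environment parsing out of `append_nccl2_prepare` and `dist_transpile`: both now receive `trainer_id` from the caller and treat a negative value as "not configured". A toy sketch of that contract (the NCCL setup body is elided and replaced with a placeholder):

```python
import os

def append_nccl2_prepare(trainer_id):
    # Placeholder body: the real function appends a gen_nccl_id op to the
    # startup program; here we only mirror the validation contract.
    if trainer_id >= 0:
        return "nccl_id_var", int(os.getenv("PADDLE_TRAINERS", "1")), trainer_id
    else:
        raise Exception("must set a positive PADDLE_TRAINER_ID env variable "
                        "for nccl-based dist train.")

trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "-1"))
try:
    nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(trainer_id)
except Exception as e:
    print(e)   # unset env var -> fail fast instead of crashing later
```
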
@@ -158,9 +160,6 @@ def dist_transpile():
     trainers = int(os.getenv("PADDLE_TRAINERS"))
     # the IP of the local machine, needed by pserver only
     current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
-    # the unique trainer id, starting from 0, needed by trainer
-    # only
-    trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
     # the role, should be either PSERVER or TRAINER
     training_role = os.getenv("PADDLE_TRAINING_ROLE")
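
For reference, the environment a pserver or trainer process is expected to export before `dist_transpile` runs looks roughly like this; the concrete values below are illustrative only:

```python
import os

# Illustrative values; in a real run the cluster launcher exports these.
os.environ["PADDLE_PSERVER_PORT"] = "6174"
os.environ["PADDLE_TRAINERS"] = "2"
os.environ["PADDLE_CURRENT_IP"] = "192.168.0.4"
os.environ["PADDLE_TRAINING_ROLE"] = "TRAINER"
os.environ["PADDLE_TRAINER_ID"] = "0"

port = os.getenv("PADDLE_PSERVER_PORT")
trainers = int(os.getenv("PADDLE_TRAINERS"))
current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port
training_role = os.getenv("PADDLE_TRAINING_ROLE")
print(current_endpoint, training_role, trainers)   # 192.168.0.4:6174 TRAINER 2
```
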
@@ -295,6 +294,11 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
         iters = 0
         start_time = time.time()
         for batch_id, data in enumerate(train_reader()):
+            if args.profile and pass_id == 0 and batch_id == 5:
+                profiler.start_profiler("All")
+            elif args.profile and pass_id == 0 and batch_id == 10:
+                profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id)
+
             if iters == args.skip_batch_num:
                 start_time = time.time()
                 num_samples = 0
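
The five added lines profile batches 5 through 10 of the first pass only, writing one trace file per trainer. The same windowing pattern in isolation, assuming the script's `import paddle.fluid.profiler as profiler` and a placeholder step function:

```python
import paddle.fluid.profiler as profiler

def run_one_batch(batch_id):
    pass   # placeholder for executor.run(...) on one batch

trainer_id = 0   # illustrative; normally comes from PADDLE_TRAINER_ID
for batch_id in range(20):
    if batch_id == 5:
        profiler.start_profiler("All")    # record CPU and GPU events
    elif batch_id == 10:
        # aggregate by total time and dump a per-trainer trace file
        profiler.stop_profiler("total", "/tmp/profile_%d" % trainer_id)
    run_one_batch(batch_id)
```

Starting at batch 5 rather than 0 skips startup costs (data pipelines warming up, one-time allocations), so the captured window reflects steady-state throughput.
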
@@ -334,7 +338,11 @@ def print_arguments(args):
 def main():
     args = parse_args()
     print_arguments(args)
-    nccl_id_var, num_trainers, trainer_id = None, 1, 0
+
+    # the unique trainer id, starting from 0, needed by
+    # trainer only
+    nccl_id_var, num_trainers, trainer_id = (
+        None, 1, int(os.getenv("PADDLE_TRAINER_ID", "-1")))
+
     if args.use_cprof:
         pr = cProfile.Profile()
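
`main` now seeds `trainer_id` with `-1` instead of `0`, so an unset `PADDLE_TRAINER_ID` can no longer be mistaken for "I am trainer 0". A quick check of the sentinel behavior:

```python
import os

os.environ.pop("PADDLE_TRAINER_ID", None)                # variable unset
assert int(os.getenv("PADDLE_TRAINER_ID", "-1")) == -1   # -> sentinel

os.environ["PADDLE_TRAINER_ID"] = "0"                    # genuinely trainer 0
assert int(os.getenv("PADDLE_TRAINER_ID", "-1")) == 0
```
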
@@ -348,7 +356,7 @@ def main():
     fluid.memory_optimize(fluid.default_main_program())

     if args.update_method == "pserver":
-        train_prog, startup_prog = dist_transpile()
+        train_prog, startup_prog = dist_transpile(trainer_id)
         if not train_prog:
             raise Exception(
                 "Must configure correct environments to run dist train.")
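
Because `dist_transpile` now returns `(None, None)` for a negative id rather than checking `PADDLE_TRAINING_ROLE` itself, the caller's `if not train_prog` guard is what surfaces a misconfigured environment. A stubbed illustration:

```python
def dist_transpile(trainer_id):
    # Stub mirroring only the early return added in this diff.
    if trainer_id < 0:
        return None, None
    return "train_prog", "startup_prog"

train_prog, startup_prog = dist_transpile(-1)   # unset id
try:
    if not train_prog:
        raise Exception(
            "Must configure correct environments to run dist train.")
except Exception as e:
    print(e)
```
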
@@ -364,7 +372,7 @@ def main():
         train_args.append(fluid.default_startup_program())

     if args.update_method == "nccl2":
-        nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare()
+        nccl_id_var, num_trainers, trainer_id = append_nccl2_prepare(trainer_id)
     if args.gpus == 1:
         # NOTE: parallel executor uses profiler internally
         if args.use_nvprof and args.device == 'GPU':