diff --git a/doc/v2/build_and_install/build_from_source_cn.rst b/doc/v2/build_and_install/build_from_source_cn.rst index de7e9eb75c..6421c53082 100644 --- a/doc/v2/build_and_install/build_from_source_cn.rst +++ b/doc/v2/build_and_install/build_from_source_cn.rst @@ -106,7 +106,7 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安 - 学习 Docker 有多难? - 理解 Docker 并不难,大概花十分钟看一下 `这篇文章 `_ 。这可以帮您省掉花一小时安装和配置各种开发工具,以及切换机器时需要新安装的辛苦。别忘了 PaddlePaddle 更新可能导致需要新的开发工具。更别提简化问题复现带来的好处了。 + 理解 Docker 并不难,大概花十分钟看一下 `如何使用Docker `_ 。这可以帮您省掉花一小时安装和配置各种开发工具,以及切换机器时需要新安装的辛苦。别忘了 PaddlePaddle 更新可能导致需要新的开发工具。更别提简化问题复现带来的好处了。 - 我可以用 IDE 吗? @@ -123,7 +123,7 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安 - 可以并行编译吗? - 是的。我们的 Docker image 运行一个 `Bash脚本 `_ 。这个脚本调用 `make -j$(nproc)` 来启动和 CPU 核一样多的进程来并行编译。 + 是的。我们的 Docker image 运行一个 `Paddle编译Bash脚本 `_ 。这个脚本调用 `make -j$(nproc)` 来启动和 CPU 核一样多的进程来并行编译。 - Docker 需要 sudo @@ -131,11 +131,11 @@ PaddlePaddle需要使用Docker环境完成编译,这样可以免去单独安 - 在 Windows/MacOS 上编译很慢 - Docker 在 Windows 和 MacOS 都可以运行。不过实际上是运行在一个 Linux 虚拟机上。可能需要注意给这个虚拟机多分配一些 CPU 和内存,以保证编译高效。具体做法请参考 `这个issue `_ 。 + Docker 在 Windows 和 MacOS 都可以运行。不过实际上是运行在一个 Linux 虚拟机上。可能需要注意给这个虚拟机多分配一些 CPU 和内存,以保证编译高效。具体做法请参考 `如何为Windows/Mac计算机上的Docker增加内存和虚拟机 `_ 。 - 磁盘不够 - 本文中的例子里,`docker run` 命令里都用了 `--rm` 参数,这样保证运行结束之后的 containers 不会保留在磁盘上。可以用 `docker ps -a` 命令看到停止后但是没有删除的 containers。`docker build` 命令有时候会产生一些中间结果,是没有名字的 images,也会占用磁盘。可以参考 `这篇文章 `_ 来清理这些内容。 + 本文中的例子里,`docker run` 命令里都用了 `--rm` 参数,这样保证运行结束之后的 containers 不会保留在磁盘上。可以用 `docker ps -a` 命令看到停止后但是没有删除的 containers。`docker build` 命令有时候会产生一些中间结果,是没有名字的 images,也会占用磁盘。可以参考 `如何删除Docker Container `_ 来清理这些内容。 .. _compile_deps: @@ -195,7 +195,7 @@ BLAS PaddlePaddle支持 `MKL `_ 和 `OpenBlAS `_ 两种BLAS库。默认使用MKL。如果使用MKL并且机器含有AVX2指令集, -还会下载MKL-DNN数学库,详细参考 `这里 `_ 。 +还会下载MKL-DNN数学库,详细参考 `mkldnn设计文档 `_ 。 如果关闭MKL,则会使用OpenBLAS作为BLAS库。 diff --git a/paddle/fluid/framework/data_type.cc b/paddle/fluid/framework/data_type.cc index b6b93cf422..60382faffb 100644 --- a/paddle/fluid/framework/data_type.cc +++ b/paddle/fluid/framework/data_type.cc @@ -28,6 +28,9 @@ struct DataTypeMap { }; static DataTypeMap* InitDataTypeMap(); +// C++11 removes the need for manual locking. Concurrent execution shall wait if +// a static local variable is already being initialized. 
+// https://stackoverflow.com/questions/11711920/how-to-implement-multithread-safe-singleton-in-c11-without-using-mutex static DataTypeMap& gDataTypeMap() { static DataTypeMap* g_data_type_map_ = InitDataTypeMap(); return *g_data_type_map_; diff --git a/paddle/fluid/framework/details/fuse_vars_op_handle.cc b/paddle/fluid/framework/details/fuse_vars_op_handle.cc index 32415c192f..018c9bff71 100644 --- a/paddle/fluid/framework/details/fuse_vars_op_handle.cc +++ b/paddle/fluid/framework/details/fuse_vars_op_handle.cc @@ -42,7 +42,7 @@ void FuseVarsOpHandle::RunImpl() { out_t->ShareDataWith(out_tensor->Slice(s, s + numel)); s += numel; } - this->RunAndRecordEvent([this] {}); + this->RunAndRecordEvent([] {}); } std::string FuseVarsOpHandle::Name() const { return "fuse vars"; } diff --git a/paddle/fluid/inference/tensorrt/convert/ut_helper.h b/paddle/fluid/inference/tensorrt/convert/ut_helper.h index 8613d5b1c1..236d169017 100644 --- a/paddle/fluid/inference/tensorrt/convert/ut_helper.h +++ b/paddle/fluid/inference/tensorrt/convert/ut_helper.h @@ -151,7 +151,8 @@ class TRTConvertValidation { // Compare two output ASSERT_FALSE(fluid_out.empty()); for (size_t i = 0; i < fluid_out.size(); i++) { - EXPECT_LT(std::abs(fluid_out[i] - trt_out[i]), 1e-6); + // Loose the threshold for CI in different machine model. + EXPECT_LT(std::abs(fluid_out[i] - trt_out[i]), 2e-5); } } } diff --git a/paddle/fluid/operators/activation_op.cc b/paddle/fluid/operators/activation_op.cc index 8478ae20a5..96e4c0e04c 100644 --- a/paddle/fluid/operators/activation_op.cc +++ b/paddle/fluid/operators/activation_op.cc @@ -24,12 +24,12 @@ namespace operators { : public ::paddle::framework::OpProtoAndCheckerMaker { \ public: \ void Make() override { \ - AddInput("X", "Input of " #OP_NAME "operator"); \ - AddOutput("Out", "Output of" #OP_NAME "operator"); \ + AddInput("X", "Input of " #OP_NAME " operator"); \ + AddOutput("Out", "Output of " #OP_NAME " operator"); \ AddAttr("use_mkldnn", \ "(bool, default false) Only used in mkldnn kernel") \ .SetDefault(false); \ - AddComment(#OP_COMMENT); \ + AddComment(OP_COMMENT); \ } \ } diff --git a/paddle/fluid/operators/crop_op.cc b/paddle/fluid/operators/crop_op.cc index 669b3bbe9d..5b5a220cf9 100644 --- a/paddle/fluid/operators/crop_op.cc +++ b/paddle/fluid/operators/crop_op.cc @@ -48,6 +48,13 @@ class CropOp : public framework::OperatorWithKernel { ctx->SetOutputDim("Out", y_dim); } } + + framework::OpKernelType GetExpectedKernelType( + const framework::ExecutionContext& ctx) const override { + return framework::OpKernelType( + framework::ToDataType(ctx.Input("X")->type()), + ctx.device_context()); + } }; class CropOpMaker : public framework::OpProtoAndCheckerMaker { @@ -60,13 +67,19 @@ class CropOpMaker : public framework::OpProtoAndCheckerMaker { "The input used as reference for cropping, " "which is of the same dimensions as X.") .AsDispensable(); + AddInput("Offsets", + "The input used to describe offsets in runtime, which is a " + "1-D vector whose size equals to the rank of input 'X'. The " + "elements data type must be int.") + .AsDispensable(); AddOutput("Out", "The output of crop op, " "which is of the same dimensions as X."); AddAttr>("offsets", "A list describing offsets to be cropped. " "The size of offsets list should be the same as " - "the dimension size of input X."); + "the dimension size of input X.") + .SetDefault(std::vector()); AddAttr>("shape", "A list describing the shape of output. " "The size of shape list should be the same as " @@ -77,6 +90,17 @@ Crop Operator. 
Crop input into output, as specified by offsets and shape.
+There are two ways to set the offsets:
+1. At runtime: Using the input 'Offsets', which is a Variable and can be
+   the output of other operators. This way is suitable for
+   dynamic offsets.
+2. In the network configuration: Using the attribute 'offsets', which will be
+   set in the Python configuration script. This way is
+   suitable for fixed offsets.
+You CANNOT use these two ways at the same time. An exception will be raised
+if the input 'Offsets' is configured while the attribute 'offsets' is
+not empty.
+
There are two ways to set shape:
1. reference input: crop input X into the same shape as reference input.
                    The dimension of reference input should
@@ -146,6 +170,15 @@ class CropOpGrad : public framework::OperatorWithKernel {
      ctx->SetOutputDim(x_grad_name, x_dims);
    }
  }
+
+  framework::OpKernelType GetExpectedKernelType(
+      const framework::ExecutionContext& ctx) const override {
+    return framework::OpKernelType(
+        framework::ToDataType(
+            ctx.Input(framework::GradVarName("Out"))
+                ->type()),
+        ctx.device_context());
+  }
};

}  // namespace operators
diff --git a/paddle/fluid/operators/crop_op.h b/paddle/fluid/operators/crop_op.h
index f05c2e2328..91cfbbda73 100644
--- a/paddle/fluid/operators/crop_op.h
+++ b/paddle/fluid/operators/crop_op.h
@@ -27,6 +27,37 @@ template ;
using framework::Tensor;

+static std::vector GetOffsets(const framework::ExecutionContext& ctx) {
+  std::vector res;
+  int rank = ctx.Input("X")->dims().size();
+  if (ctx.HasInput("Offsets")) {
+    PADDLE_ENFORCE(ctx.Attr>("offsets").empty(),
+                   "Input 'Offsets' and attribute 'offsets' should not be used "
+                   "at the same time.");
+    const auto* offsets_tensor = ctx.Input("Offsets");
+    PADDLE_ENFORCE_EQ(offsets_tensor->dims().size(), 1);
+    PADDLE_ENFORCE_EQ(
+        rank, offsets_tensor->dims()[0],
+        "Offsets size should be equal to dimension size of input tensor.");
+    const int* offsets_data;
+    framework::Tensor cpu_tmp_tensor;
+    if (platform::is_cpu_place(offsets_tensor->place())) {
+      offsets_data = offsets_tensor->data();
+    } else {
+      framework::TensorCopySync(*offsets_tensor, platform::CPUPlace(),
+                                &cpu_tmp_tensor);
+      offsets_data = cpu_tmp_tensor.data();
+    }
+    res = std::vector(offsets_data, offsets_data + rank);
+  } else {
+    res = ctx.Attr>("offsets");
+    PADDLE_ENFORCE_EQ(
+        rank, res.size(),
+        "Offsets size should be equal to dimension size of input tensor.");
+  }
+  return res;
+}
+
template 
class CropKernel : public framework::OpKernel {
 public:
@@ -37,10 +68,7 @@ class CropKernel : public framework::OpKernel {
    T* out_data = out->mutable_data(context.GetPlace());
    auto x_stride = framework::stride(x->dims());
    auto out_stride = framework::stride(out->dims());
-    auto offsets = context.Attr>("offsets");
-    PADDLE_ENFORCE_EQ(
-        x->dims().size(), static_cast(offsets.size()),
-        "Offsets size should be equal to dimension size of input tensor.");
+    auto offsets = GetOffsets(context);
    int64_t offset = 0;
    for (size_t i = 0; i < offsets.size(); ++i) {
      offset += (x_stride[i] * offsets[i]);
@@ -56,7 +84,7 @@ void CropGradFunction(const framework::ExecutionContext& context) {
  if (d_x != nullptr) {
    auto* d_out = context.Input(framework::GradVarName("Out"));
    d_x->mutable_data(context.GetPlace());
-    auto offsets = context.Attr>("offsets");
+    auto offsets = GetOffsets(context);
    Eigen::array, D> paddings;
    for (size_t i = 0; i < D; ++i) {
      paddings[i].first = offsets[i];
diff --git a/paddle/fluid/operators/detail/request_handler.h b/paddle/fluid/operators/detail/request_handler.h
index 
4bc5e7f10e..d74206aaba 100644 --- a/paddle/fluid/operators/detail/request_handler.h +++ b/paddle/fluid/operators/detail/request_handler.h @@ -80,7 +80,6 @@ class RequestHandler { } framework::ProgramDesc* program() { return program_; } framework::Executor* executor() { return executor_; } - std::vector& sparse_vars() { return sparse_vars_; } // This function processes user's rpc request. // The implemention is in request_handler_impl. @@ -113,13 +112,7 @@ class RequestHandler { std::unordered_map>* grad_to_prepared_ctx_; - - // Record received sparse variables, so that - // we could reset those after execute optimize program - std::vector sparse_vars_; RPCServer* rpc_server_; - - std::mutex sparse_var_mutex_; }; } // namespace detail diff --git a/paddle/fluid/operators/detail/request_handler_impl.cc b/paddle/fluid/operators/detail/request_handler_impl.cc index f16c06d52f..9473dce550 100644 --- a/paddle/fluid/operators/detail/request_handler_impl.cc +++ b/paddle/fluid/operators/detail/request_handler_impl.cc @@ -63,16 +63,22 @@ bool RequestSendHandler::Handle(const std::string& varname, PADDLE_THROW("sync: Can not find server side var"); return false; } - if (invar->IsType()) { - std::unique_lock lock(sparse_var_mutex_); + std::unique_lock lock(mutex_sparse_vars_); sparse_vars_.push_back(invar); } } - return true; } +void RequestSendHandler::ResetSparseVarRecorder() { + std::unique_lock lock(mutex_sparse_vars_); + for (auto* var : sparse_vars_) { + var->GetMutable()->mutable_rows()->clear(); + } + sparse_vars_.clear(); +} + bool RequestGetHandler::Handle(const std::string& varname, framework::Scope* scope, framework::Variable* invar, diff --git a/paddle/fluid/operators/detail/request_handler_impl.h b/paddle/fluid/operators/detail/request_handler_impl.h index 8d0c62232b..443d951914 100644 --- a/paddle/fluid/operators/detail/request_handler_impl.h +++ b/paddle/fluid/operators/detail/request_handler_impl.h @@ -41,6 +41,11 @@ class RequestSendHandler final : public RequestHandler { virtual ~RequestSendHandler() {} bool Handle(const std::string& varname, framework::Scope* scope, framework::Variable* var, framework::Variable** outvar) override; + void ResetSparseVarRecorder(); + + private: + std::mutex mutex_sparse_vars_; + std::vector sparse_vars_; }; class RequestGetHandler final : public RequestHandler { diff --git a/paddle/fluid/operators/detail/rpc_server.h b/paddle/fluid/operators/detail/rpc_server.h index c2e7ae706c..f809c13c72 100644 --- a/paddle/fluid/operators/detail/rpc_server.h +++ b/paddle/fluid/operators/detail/rpc_server.h @@ -60,6 +60,7 @@ class RPCServer { void SetCond(const std::string& rpc_name); void WaitCond(const std::string& rpc_name); void IncreaseBatchBarrier(const std::string rpc_name); + void ResetBarrierCounter(); protected: diff --git a/paddle/fluid/operators/gather_test.cc b/paddle/fluid/operators/gather_test.cc index 9c0561b016..f6b156eb30 100644 --- a/paddle/fluid/operators/gather_test.cc +++ b/paddle/fluid/operators/gather_test.cc @@ -43,7 +43,8 @@ TEST(Gather, GatherData) { auto* cpu_place = new paddle::platform::CPUPlace(); paddle::platform::CPUDeviceContext ctx(*cpu_place); paddle::operators::CPUGather(ctx, *src, *index, output); - + delete cpu_place; + cpu_place = NULL; for (int i = 0; i < 4; ++i) EXPECT_EQ(p_output[i], i + 4); for (int i = 4; i < 8; ++i) EXPECT_EQ(p_output[i], i - 4); diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 66a0f87b46..66d31c8895 100644 --- 
a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -108,9 +108,6 @@ void ListenAndServOp::RunSyncLoop(framework::Executor *executor, std::shared_ptr(nullptr)); rpc_service_->ResetBarrierCounter(); - // Record received sparse variables, so that - // we could reset those after execute optimize program - std::vector sparse_vars; while (true) { // Get from multiple trainers, we don't care about the order in which // the gradients arrives, just add suffix 0~n and merge the gradient. @@ -146,18 +143,12 @@ void ListenAndServOp::RunSyncLoop(framework::Executor *executor, recv_scope); VLOG(2) << "run all blocks spent " << detail::GetTimestamp() - ts << "(ms)"; - // Reset the received sparse variables, the sum operator would not - // sum the input sparse variables which rows is empty at the next - // mini-batch. - // TODO(Yancey1989): move the reset action into an operator, we couldn't - // have any hide logic in the operator. - for (framework::Variable *var : sparse_vars) { - var->GetMutable()->mutable_rows()->clear(); - } - rpc_service_->SetCond(detail::kRequestGet); rpc_service_->WaitBarrier(detail::kRequestGet); rpc_service_->ResetBarrierCounter(); + // reset received sparse vars to avoid reuse it in the next mini-batch + dynamic_cast(request_send_handler_.get()) + ->ResetSparseVarRecorder(); } // while(true) } diff --git a/paddle/fluid/operators/math/math_function_test.cc b/paddle/fluid/operators/math/math_function_test.cc index 3719a264e9..b545671b43 100644 --- a/paddle/fluid/operators/math/math_function_test.cc +++ b/paddle/fluid/operators/math/math_function_test.cc @@ -77,6 +77,8 @@ TEST(math_function, gemm_trans_clbas) { paddle::platform::CPUDeviceContext context(*cpu_place); GetBlas(context).GEMM(false, true, m, n, k, 1, input1_ptr, 3, input2_ptr + 3, 3, 1, input3_ptr + 1, 4); + delete cpu_place; + cpu_place = NULL; EXPECT_EQ(input3_ptr[0], 0); EXPECT_EQ(input3_ptr[1], 24); diff --git a/paddle/fluid/operators/random_crop_op.cc b/paddle/fluid/operators/random_crop_op.cc index b14b559e31..528a6e4a1b 100644 --- a/paddle/fluid/operators/random_crop_op.cc +++ b/paddle/fluid/operators/random_crop_op.cc @@ -20,7 +20,6 @@ class RandomCropOp : public framework::OperatorWithKernel { public: using framework::OperatorWithKernel::OperatorWithKernel; - protected: framework::OpKernelType GetExpectedKernelType( const framework::ExecutionContext& ctx) const override { return framework::OpKernelType( @@ -36,11 +35,11 @@ class RandomCropOpMaker : public framework::OpProtoAndCheckerMaker { AddInput("Seed", "The random seed."); AddOutput("Out", "The cropped instance batch."); AddOutput("SeedOut", "The random seed after random cropping.") - .AsDispensable(); + .AsIntermediate(); AddAttr>("shape", "The shape of a cropped instance."); AddComment(R"DOC( - This operator takes a batch of instance, and do random cropping on each instance. - It means that cropping positions differs on each instance, which is determined + This operator takes a batch of instance, and do random cropping on each instance. + It means that cropping positions differs on each instance, which is determined by an uniform random generator. All cropped instances have the same shape, which is determined by the operator's attribute 'shape'. 
)DOC"); diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 4914719279..bd985ad733 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -26,6 +26,7 @@ from trainer import BeginEpochEvent from trainer import EndEpochEvent from trainer import BeginStepEvent from trainer import EndStepEvent +from trainer import CheckpointConfig import inferencer from inferencer import Inferencer diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 9dc9038f44..bbd35aaecb 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -363,6 +363,13 @@ class OpProtoHolder(object): raise ValueError("Operator \"%s\" has not been registered." % type) return self.op_proto_map[type] + @staticmethod + def generated_op_attr_names(): + return { + core.op_proto_and_checker_maker.kOpRoleAttrName(), + core.op_proto_and_checker_maker.kOpRoleVarAttrName() + } + class Operator(object): """ diff --git a/python/paddle/fluid/inferencer.py b/python/paddle/fluid/inferencer.py index 9f242cf29a..6baac00905 100644 --- a/python/paddle/fluid/inferencer.py +++ b/python/paddle/fluid/inferencer.py @@ -56,6 +56,8 @@ class Inferencer(object): else: self.exe = executor.Executor(self.place) + self.inference_program = self.inference_program.clone(for_test=True) + def infer(self, inputs, return_numpy=True): """ :param inputs: a map of {"input_name": input_var} that will be feed into the inference program diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index 8e58e5eb79..6323c9899e 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -24,7 +24,8 @@ __all__ = [ 'save_vars', 'save_params', 'save_persistables', 'load_vars', 'load_params', 'load_persistables', 'save_inference_model', 'load_inference_model', 'get_inference_program', 'save_checkpoint', 'load_checkpoint', - 'clean_checkpoint' + 'clean_checkpoint', 'load_persist_vars_without_grad', + 'save_persist_vars_without_grad', 'get_latest_checkpoint_serial' ] @@ -457,95 +458,161 @@ def get_parameter_value_by_name(name, executor, program=None): SUCCESS_MARK_FILENAME = "_SUCCESS" CHECKPOINT_PREFIX = "checkpoint" +MODEL_DIR = "__model__" +TRAINER_PREFIX = "trainer" CHECKPOINT_SEPARATOR = "_" def save_checkpoint(executor, - checkpoint_dir=None, - max_num_checkpoints=3, - save_interval_secs=600, - main_program=None): + checkpoint_dir, + trainer_id, + trainer_args=None, + main_program=None, + max_num_checkpoints=3): """ Save Checkpoint will save persistable LodTensor variables from main_program in checkpoint directory, the directory named by serial number from 0 to (n -1), save_checkpoint use LRU strategy to keep numbers of checkpoint directory, the numbers of checkpoint directory are max_num_checkpoints at most, The interval between two saved checkpoints must greater than save_interval_secs. 
-    :param executor
-    :param checkpoint_dir
-    :param max_num_checkpoints
-    :param save_interval_secs
-    :param main_program
+    :param executor executor used to save the variables
+    :param checkpoint_dir the checkpoint directory
+    :param trainer_id current trainer id; if the id is equal to 0, the trainer is the chief
+    :param main_program will save all variables in the program
+    :param max_num_checkpoints keep at most max_num_checkpoints checkpoint serials
    """
    if checkpoint_dir is None:
-        checkpoint_dir = os.getcwd()
+        raise ValueError("'checkpoint_dir' should not be None")
+
+    if trainer_args:
+        assert isinstance(trainer_args, dict)

    if not os.path.isdir(checkpoint_dir):
        os.makedirs(checkpoint_dir)

-    serial = _get_lastest_checkpoint_dir(checkpoint_dir)
-    if serial >= 0 and not _interval_secs_exceed(
-            _get_serial_dir(serial, checkpoint_dir), save_interval_secs):
-        return
+    serial = get_latest_checkpoint_serial(checkpoint_dir) + 1
+    cur_dir = _get_serial_dir(checkpoint_dir, serial)

-    serial += 1
-    cur_dir = _get_serial_dir(serial, checkpoint_dir)
+    save_trainer_args(cur_dir, trainer_id, trainer_args)

-    save_vars(
-        executor,
-        dirname=cur_dir,
-        main_program=main_program,
-        vars=None,
-        predicate=_is_checkpoint_var,
-        filename=None)
-    _write_success(cur_dir)
-    _lru_delete(checkpoint_dir, max_num_checkpoints)
+    if trainer_id == 0:
+        save_persist_vars_without_grad(executor, cur_dir, main_program)
+
+    _scroll_delete(checkpoint_dir, max_num_checkpoints)


-def load_checkpoint(executor, checkpoint_dir=None, main_program=None):
+def load_checkpoint(executor, checkpoint_dir, serial, main_program):
    """
    Load checkpoint from a directory by executor,
    it will find the most recent saved checkpoint file and load it auto.

-    :param executor
-    :param checkpoint_dir
-    :param main_program
+    :param executor executor used to load the variables
+    :param checkpoint_dir the checkpoint directory
+    :param serial the serial folder in the checkpoint directory to be loaded
+    :param main_program will load all variables in the program
    """
    if checkpoint_dir is None:
-        checkpoint_dir = os.getcwd()
+        raise ValueError("'checkpoint_dir' should not be None")

-    serial = _get_lastest_checkpoint_dir(checkpoint_dir)
+    if serial is None or serial < 0:
+        raise ValueError("'serial' should not be None or <0 ")

-    if serial < 0:
-        return
+    if main_program is None:
+        raise ValueError('main_program should not be None.')

-    cur_dir = _get_serial_dir(serial, checkpoint_dir)
-
-    load_vars(
-        executor,
-        dirname=cur_dir,
-        main_program=main_program,
-        predicate=_is_checkpoint_var,
-        filename=None)
+    cur_dir = _get_serial_dir(checkpoint_dir, serial)
+    load_persist_vars_without_grad(executor, cur_dir, main_program, True)


def clean_checkpoint(checkpoint_dir, delete_dir=False):
    """
    clean the checkpoint dir, when the train exits normally, the trainer will call clean_checkpoint to delete checkpoint directory saved before.
    delete_dir only works when the directory is empty, otherwise, OSError is raised.
+
+    :param checkpoint_dir
+    :param delete_dir
    """
+
    if checkpoint_dir is None:
-        checkpoint_dir = os.getcwd()
-    _lru_delete(checkpoint_dir, max_num_checkpoints=0)
+        raise ValueError("'checkpoint_dir' should not be None")
+    _scroll_delete(checkpoint_dir, max_num_checkpoints=0)
+
    if delete_dir and not os.listdir(checkpoint_dir):
        os.rmdir(checkpoint_dir)


-def _get_serial_dir(serial, checkpoint_dir):
-    serial_folder = CHECKPOINT_PREFIX + CHECKPOINT_SEPARATOR + str(serial)
-    return os.path.join(checkpoint_dir, serial_folder)
+def load_persist_vars_without_grad(executor,
+                                   dirname,
+                                   program,
+                                   has_model_dir=False):
+    """
+    load_persist_vars_without_grad will load variables from a directory by an executor,
+    variables whose names end with "@GRAD" will not be loaded.
+
+    :param executor executor used to load the variables
+    :param dirname the checkpoint directory
+    :param program will load all variables in the program
+    :param has_model_dir if has_model_dir is True, variables will be loaded from the sub directory named __model__
+    """
+
+    if has_model_dir:
+        dirname = _get_model_dir(dirname)
+
+    load_vars(
+        executor,
+        dirname=dirname,
+        main_program=program,
+        predicate=_is_checkpoint_var,
+        filename=None)
+
+
+def save_persist_vars_without_grad(executor, dirname, program):
+    """
+    save_persist_vars_without_grad will save variables to a directory by an executor,
+    variables whose names end with "@GRAD" will not be saved.
+
+    :param executor executor used to save the variables
+    :param dirname the checkpoint directory
+    :param program will save all variables in the program
+    """
+    cur_dir = _get_model_dir(dirname)
+    save_vars(
+        executor,
+        dirname=cur_dir,
+        main_program=program,
+        vars=None,
+        predicate=_is_checkpoint_var,
+        filename=None)
+    _write_success(cur_dir)
+
+
+def save_trainer_args(dirname, trainer_id, trainer_args):
+    assert isinstance(trainer_args, dict)
+
+    cur_dir = _get_trainer_dir(dirname, trainer_id)
+
+    for name, value in trainer_args.iteritems():
+        args_file = os.path.join(cur_dir, name)
+        with open(args_file, 'w') as f:
+            f.write(str(value))
+    _write_success(cur_dir)
+
+
+def load_trainer_args(checkpoint_dir, serial, trainer_id, trainer_args):
+    assert isinstance(trainer_args, list)
+
+    cur_dir = _get_serial_dir(checkpoint_dir, serial)
+    cur_dir = _get_trainer_dir(cur_dir, trainer_id)
+
+    ret_values = []
+
+    for arg in trainer_args:
+        cur_file = os.path.join(cur_dir, arg)
+        with open(cur_file, 'r') as f:
+            contents = f.read()
+            ret_values.append(contents.strip())
+    return ret_values

def _is_checkpoint_var(var):
@@ -559,36 +626,74 @@ def _is_checkpoint_var(var):
            var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \
            var.desc.type() == core.VarDesc.VarType.RAW:
        return False
+    # Variables with the "@GRAD" suffix are gradient variables; checkpoint will not save them.
+    if "@GRAD" in var.name:
+        return False
+    # Variables with ".trainer_" in the name are distributed-training variables; checkpoint will not save them.
+    if ".trainer_" in var.name:
+        return False
-    if var.name.endswith("@GRAD"):
+    # Variables with ".block" in the name are distributed-training variables; checkpoint will not save them.
+ if ".block" in var.name: return False return var.persistable -def _interval_secs_exceed(dirname, save_interval_secs): - dir_time = os.path.getmtime(dirname) - if save_interval_secs > (time.time() - dir_time): - return False - return True +def _get_dir_serial(dirname): + _, serial = dirname.split(CHECKPOINT_SEPARATOR) + + try: + serial_num = int(serial) + except ValueError: + serial_num = -1 + return serial_num + + +def _get_serial_dir(dirname, serial): + serial_folder = CHECKPOINT_PREFIX + CHECKPOINT_SEPARATOR + str(serial) + serial_dir = os.path.join(dirname, serial_folder) + + if not os.path.isdir(serial_dir): + os.makedirs(serial_dir) + + return serial_dir + +def _get_model_dir(dirname): + model_dir = os.path.join(dirname, MODEL_DIR) -def _lru_delete(dirname, max_num_checkpoints=3): + if not os.path.isdir(model_dir): + os.makedirs(model_dir) + + return model_dir + + +def _get_trainer_dir(dirname, trainer_id): + trainer_folder = TRAINER_PREFIX + CHECKPOINT_SEPARATOR + str(trainer_id) + trainer_dir = os.path.join(dirname, trainer_folder) + + if not os.path.isdir(trainer_dir): + os.makedirs(trainer_dir) + + return trainer_dir + + +def _scroll_delete(dirname, max_num_checkpoints=3): dirs = os.listdir(dirname) - serials = [] + serial_map = {} for serial in dirs: - try: - serials.append(int(serial)) - except ValueError: - continue + serial_num = _get_dir_serial(serial) + serial_map[serial_num] = serial - if len(serials) <= max_num_checkpoints: + if len(serial_map.keys()) <= max_num_checkpoints: return + serials = serial_map.keys() serials.sort(reverse=True) serials = serials[max_num_checkpoints:] for serial in serials: - cur_dir = os.path.join(dirname, str(serial)) + cur_dir = _get_serial_dir(dirname, serial) shutil.rmtree(cur_dir) @@ -604,33 +709,30 @@ def _write_success(dirname): f.write(now) -def _get_lastest_checkpoint_dir(checkpoint_dir): +def get_latest_checkpoint_serial(checkpoint_dir): """ get the latest file in checkpoint directory, the _SUCCESS file must exist in the directory :param checkpoint_dir """ - if not checkpoint_dir.strip(): + if not checkpoint_dir: return -1 def has_success(checkpoint_dir, cur_dir): """ is _SUCCESS in this dir """ - _, serial = cur_dir.split(CHECKPOINT_SEPARATOR) - - try: - int(serial) - except ValueError: - return -1 - if not os.path.isdir(os.path.join(checkpoint_dir, cur_dir)): + serial = _get_dir_serial(cur_dir) + if serial == -1 or not os.path.isdir( + os.path.join(checkpoint_dir, cur_dir)): return -1 success_path = os.path.join( - _get_serial_dir(serial, checkpoint_dir), SUCCESS_MARK_FILENAME) + _get_serial_dir(checkpoint_dir, serial), MODEL_DIR, + SUCCESS_MARK_FILENAME) if os.path.isfile(success_path): - return int(serial) + return serial if not os.path.isdir(checkpoint_dir): return -1 diff --git a/python/paddle/fluid/layers/layer_function_generator.py b/python/paddle/fluid/layers/layer_function_generator.py index 295d1b7190..904413cc11 100644 --- a/python/paddle/fluid/layers/layer_function_generator.py +++ b/python/paddle/fluid/layers/layer_function_generator.py @@ -15,16 +15,13 @@ import re import cStringIO import functools import warnings +import string from ..proto import framework_pb2 from ..framework import OpProtoHolder, Variable from ..layer_helper import LayerHelper -__all__ = [ - 'deprecated', - 'generate_layer_fn', - 'autodoc', -] +__all__ = ['deprecated', 'generate_layer_fn', 'autodoc', 'templatedoc'] def _convert_(name): @@ -43,6 +40,10 @@ def _convert_(name): return re.sub('([a-z0-9])([A-Z])', r'\1_\2', s1).lower() +def 
_type_to_str_(tp): + return framework_pb2.AttrType.Name(tp) + + def _generate_doc_string_(op_proto): """ Generate docstring by OpProto @@ -54,9 +55,6 @@ def _generate_doc_string_(op_proto): str: the document string """ - def _type_to_str_(tp): - return framework_pb2.AttrType.Name(tp) - if not isinstance(op_proto, framework_pb2.OpProto): raise TypeError("OpProto should be `framework_pb2.OpProto`") @@ -75,7 +73,11 @@ def _generate_doc_string_(op_proto): buf.write(str(each_input.dispensable)) buf.write('\n') + skip_attrs = OpProtoHolder.generated_op_attr_names() + for each_attr in op_proto.attrs: + if each_attr.name in skip_attrs: + continue buf.write(' ') buf.write(each_attr.name) buf.write(' (') @@ -220,3 +222,49 @@ def autodoc(comment=""): return func return __impl__ + + +def templatedoc(): + """ + Decorator of layer function. It will use the docstring from the layer + function as the template. The template arguments are: + + * ${comment}: The operator comment written in CPP. + * ${{name}_comment}: The comment of ${name} written with AddAttr, AddOutput, + and AddInput. The ${name} is Python snake style. i.e., xxx_xxx. + * ${{name}_type}: The type of ${name}. + + Returns: + Decorated function. + """ + + def __impl__(func): + op_proto = OpProtoHolder.instance().get_op_proto(func.__name__) + tmpl = string.Template(func.__doc__) + + comment_lines = op_proto.comment.split("\n") + comment = "" + for line in comment_lines: + line = line.lstrip() + comment += line + comment += "\n" + + args = {"comment": comment} + for each_input in op_proto.inputs: + input_name = _convert_(each_input.name) + args["{0}_comment".format(input_name)] = each_input.comment + args["{0}_type".format(input_name)] = "Variable" + for each_attr in op_proto.attrs: + input_name = _convert_(each_attr.name) + args["{0}_comment".format(input_name)] = each_attr.comment + args["{0}_type".format(input_name)] = _type_to_str_(each_attr.type) + + for each_opt in op_proto.outputs: + output_name = _convert_(each_opt.name) + args["{0}_comment".format(output_name)] = each_opt.comment + args["{0}_type".format(output_name)] = "Variable" + + func.__doc__ = tmpl.substitute(args) + return func + + return __impl__ diff --git a/python/paddle/fluid/layers/metric.py b/python/paddle/fluid/layers/metric.py index cab2eb5551..a1c64ce277 100644 --- a/python/paddle/fluid/layers/metric.py +++ b/python/paddle/fluid/layers/metric.py @@ -64,10 +64,6 @@ def auc(input, label, curve='ROC', num_thresholds=200): topk_indices = helper.create_tmp_variable(dtype="int64") topk_out, topk_indices = nn.topk(input, k=k) auc_out = helper.create_tmp_variable(dtype="float32") - if correct is None: - correct = helper.create_tmp_variable(dtype="int64") - if total is None: - total = helper.create_tmp_variable(dtype="int64") helper.append_op( type="accuracy", inputs={ diff --git a/python/paddle/fluid/layers/nn.py b/python/paddle/fluid/layers/nn.py index 221f3ddae5..ddaeb415af 100644 --- a/python/paddle/fluid/layers/nn.py +++ b/python/paddle/fluid/layers/nn.py @@ -19,9 +19,10 @@ from ..layer_helper import LayerHelper from ..initializer import Normal, Constant from ..framework import Variable from ..param_attr import ParamAttr -from layer_function_generator import autodoc +from layer_function_generator import autodoc, templatedoc from tensor import concat import utils +import random __all__ = [ 'fc', @@ -801,7 +802,22 @@ def gru_unit(input, return updated_hidden, reset_hidden_pre, gate +@templatedoc() def linear_chain_crf(input, label, param_attr=None): + """ + Linear Chain CRF. 
+ + ${comment} + + Args: + input(${emission_type}): ${emission_comment} + label(${label_type}): ${label_comment} + param_attr(ParamAttr): The attribute of the learnable parameter. + + Returns: + ${log_likelihood_comment} + + """ helper = LayerHelper('linear_chain_crf', **locals()) size = input.shape[1] transition = helper.create_parameter( @@ -827,7 +843,19 @@ def linear_chain_crf(input, label, param_attr=None): return log_likelihood +@templatedoc() def crf_decoding(input, param_attr, label=None): + """ + ${comment} + + Args: + input(${emission_type}): ${emission_comment} + param_attr(ParamAttr): The parameter attribute for training. + label(${label_type}): ${label_comment} + + Returns: + ${viterbi_path_comment} + """ helper = LayerHelper('crf_decoding', **locals()) transition = helper.get_parameter(param_attr.name) viterbi_path = helper.create_tmp_variable(dtype=helper.input_dtype()) @@ -4107,10 +4135,31 @@ def gather(input, index): return out -def random_crop(input, shape, seed=1): +@templatedoc() +def random_crop(x, shape, seed=None): + """ + ${comment} + + Examples: + >>> img = fluid.layers.data("img", [3, 256, 256]) + >>> cropped_img = fluid.layers.random_crop(img, shape=[3, 224, 224]) + + Args: + x(${x_type}): ${x_comment} + shape(${shape_type}): ${shape_comment} + seed(int|${seed_type}|None): ${seed_comment} By default, the seed will + get from `random.randint(-65536, 65535)`. + + Returns: + ${out_comment} + + """ helper = LayerHelper("random_crop", **locals()) dtype = helper.input_dtype() out = helper.create_tmp_variable(dtype) + if seed is None: + seed = random.randint(-65536, 65535) + if isinstance(seed, int): seed_value = seed seed = helper.create_tmp_variable(dtype="int64") diff --git a/python/paddle/fluid/layers/ops.py b/python/paddle/fluid/layers/ops.py index 69cfde852d..3260f81e9e 100644 --- a/python/paddle/fluid/layers/ops.py +++ b/python/paddle/fluid/layers/ops.py @@ -73,6 +73,7 @@ __all__ = [ 'sum', 'polygon_box_transform', 'shape', + 'maxout', ] + __activations__ for _OP in set(__all__): diff --git a/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py b/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py index b3117cf2e5..ad28c9eff5 100644 --- a/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py +++ b/python/paddle/fluid/tests/book/high-level-api/fit_a_line/test_fit_a_line.py @@ -38,7 +38,7 @@ def inference_program(): return y_predict -def linear(): +def train_program(): y = fluid.layers.data(name='y', shape=[1], dtype='float32') y_predict = inference_program() @@ -104,7 +104,7 @@ def main(use_cuda): # Directory for saving the trained model params_dirname = "fit_a_line.inference.model" - train(use_cuda, linear, params_dirname) + train(use_cuda, train_program, params_dirname) infer(use_cuda, inference_program, params_dirname) diff --git a/python/paddle/fluid/tests/unittests/test_checkpoint.py b/python/paddle/fluid/tests/unittests/test_checkpoint.py new file mode 100644 index 0000000000..e22400a045 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_checkpoint.py @@ -0,0 +1,75 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid as fluid +import unittest +import os +import tempfile + + +class TestCheckpoint(unittest.TestCase): + def setUp(self): + self.dirname = tempfile.mktemp() + self.max_num_checkpoints = 3 + self.epoch_interval = 1 + self.step_interval = 1 + self.trainer_id = 0 + self.chief = self.trainer_id == 0 + self.place = fluid.CPUPlace() + self.epoch_id = 100 + self.step_id = 20 + + def test_checkpoint(self): + self.save_checkpoint() + serial = fluid.io.get_latest_checkpoint_serial(self.dirname) + self.assertTrue(serial >= 0) + trainer_args = ["epoch_id", "step_id"] + epoch_id, step_id = fluid.io.load_trainer_args( + self.dirname, serial, self.trainer_id, trainer_args) + self.assertEqual(self.step_id, int(step_id)) + self.assertEqual(self.epoch_id, int(epoch_id)) + + program = fluid.Program() + with fluid.program_guard(program): + exe = fluid.Executor(self.place) + fluid.io.load_checkpoint(exe, self.dirname, serial, program) + + fluid.io.clean_checkpoint(self.dirname, delete_dir=True) + self.assertFalse(os.path.isdir(self.dirname)) + + def save_checkpoint(self): + config = fluid.CheckpointConfig(self.dirname, self.max_num_checkpoints, + self.epoch_interval, self.step_interval) + + trainer_args = {} + trainer_args["epoch_id"] = self.epoch_id + trainer_args["step_id"] = self.step_id + + program = fluid.Program() + with fluid.program_guard(program): + program.global_block().create_var( + name="scale_0", + psersistable=True, + dtype="float32", + shape=[32, 32]) + + exe = fluid.Executor(self.place) + for i in xrange(10): + fluid.io.save_checkpoint(exe, config.checkpoint_dir, + self.trainer_id, trainer_args, program, + config.max_num_checkpoints) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_crop_op.py b/python/paddle/fluid/tests/unittests/test_crop_op.py index 20cc3a643f..4016089c01 100644 --- a/python/paddle/fluid/tests/unittests/test_crop_op.py +++ b/python/paddle/fluid/tests/unittests/test_crop_op.py @@ -42,9 +42,9 @@ class TestCropOp(OpTest): def setUp(self): self.op_type = "crop" self.crop_by_input = False + self.offset_by_input = False self.attrs = {} self.initTestCase() - self.attrs['offsets'] = self.offsets if self.crop_by_input: self.inputs = { 'X': np.random.random(self.x_shape).astype("float32"), @@ -55,6 +55,10 @@ class TestCropOp(OpTest): self.inputs = { 'X': np.random.random(self.x_shape).astype("float32"), } + if self.offset_by_input: + self.inputs['Offsets'] = np.array(self.offsets).astype('int32') + else: + self.attrs['offsets'] = self.offsets self.outputs = { 'Out': crop(self.inputs['X'], self.offsets, self.crop_shape) } @@ -101,5 +105,22 @@ class TestCase4(TestCropOp): self.crop_by_input = True +class TestCase5(TestCropOp): + def initTestCase(self): + self.x_shape = (3, 4, 5) + self.crop_shape = [2, 2, 3] + self.offsets = [1, 0, 2] + self.offset_by_input = True + + +class TestCase6(TestCropOp): + def initTestCase(self): + self.x_shape = (10, 9, 14) + self.crop_shape = [3, 3, 5] + self.offsets = [3, 5, 4] + self.crop_by_input = True + self.offset_by_input = True + + if __name__ == '__main__': 
unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_dynrnn_gradient_check.py b/python/paddle/fluid/tests/unittests/test_dynrnn_gradient_check.py index 2232939075..95af51f1b2 100644 --- a/python/paddle/fluid/tests/unittests/test_dynrnn_gradient_check.py +++ b/python/paddle/fluid/tests/unittests/test_dynrnn_gradient_check.py @@ -30,9 +30,6 @@ class Memory(object): assert val.dtype == self.ex.dtype self.cur = val - def ex(self): - return self.ex - def next(self): self.ex = self.cur self.cur = None diff --git a/python/paddle/fluid/tests/unittests/test_layers.py b/python/paddle/fluid/tests/unittests/test_layers.py index 621a450fa6..8b0ebe3cf5 100644 --- a/python/paddle/fluid/tests/unittests/test_layers.py +++ b/python/paddle/fluid/tests/unittests/test_layers.py @@ -387,6 +387,14 @@ class TestBook(unittest.TestCase): self.assertIsNotNone(output) print(str(program)) + def test_maxout(self): + program = Program() + with program_guard(program): + data = layers.data(name='x', shape=[8, 6, 6], dtype="float32") + output = layers.maxout(x=data, groups=2) + self.assertIsNotNone(output) + print(str(program)) + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/fluid/trainer.py b/python/paddle/fluid/trainer.py index cdacb41986..efc28d8993 100644 --- a/python/paddle/fluid/trainer.py +++ b/python/paddle/fluid/trainer.py @@ -27,11 +27,8 @@ import parallel_executor from transpiler import distribute_transpiler __all__ = [ - 'Trainer', - 'BeginEpochEvent', - 'EndEpochEvent', - 'BeginStepEvent', - 'EndStepEvent', + 'Trainer', 'BeginEpochEvent', 'EndEpochEvent', 'BeginStepEvent', + 'EndStepEvent', 'CheckpointConfig' ] @@ -59,6 +56,35 @@ class EndStepEvent(object): self.metrics = metrics +class CheckpointConfig(object): + def __init__(self, + checkpoint_dir=None, + max_num_checkpoints=3, + epoch_interval=1, + step_interval=10): + if checkpoint_dir is None: + self.checkpoint_dir = os.getcwd() + else: + self.checkpoint_dir = checkpoint_dir + + self.max_num_checkpoints = max_num_checkpoints + + if epoch_interval < 1: + self.epoch_interval = 1 + else: + self.epoch_interval = epoch_interval + + if step_interval < 1: + self.step_interval = 10 + else: + self.step_interval = step_interval + + self.epoch_id = 0 + self.step_id = 0 + self.load_serial = None + self.is_pserver = False + + def check_and_get_place(place): """ Check the type of place or get the default place @@ -99,13 +125,24 @@ class Trainer(object): optimizer_func, param_path=None, place=None, - parallel=False): + parallel=False, + checkpoint_config=None): self.__stop = False self.parallel = parallel # 1. we need to generate a framework.Program by calling # program_func. Reference: fluid.program_guard in # test_word2vec.py + # config for checkpoint + # only chief worker will save variables + self.trainer_id = 0 + self.checkpoint_cfg = checkpoint_config + if self.checkpoint_cfg: + assert isinstance(self.checkpoint_cfg, CheckpointConfig) + serial = io.get_latest_checkpoint_serial( + self.checkpoint_cfg.checkpoint_dir) + self.checkpoint_cfg.load_serial = serial if serial >= 0 else None + self.scope = core.Scope() self.startup_program = framework.Program() @@ -115,9 +152,9 @@ class Trainer(object): program_func_outs = train_func() self.train_func_outputs = program_func_outs if isinstance( program_func_outs, list) else [program_func_outs] - self.test_program = self.train_program.clone() + self.test_program = self.train_program.clone(for_test=True) - # The fisrt element of program_func_outs is loss. 
+ # The first element of program_func_outs is loss. loss = self.train_func_outputs[0] optimizer = optimizer_func() @@ -137,9 +174,25 @@ class Trainer(object): exe = executor.Executor(place) exe.run(self.startup_program) - if param_path: + if self.checkpoint_cfg and self.checkpoint_cfg.load_serial: + with self._prog_and_scope_guard(): + exe = executor.Executor(place) + io.load_checkpoint(exe, self.checkpoint_cfg.checkpoint_dir, + self.checkpoint_cfg.load_serial, + self.startup_program) + + if not self.checkpoint_cfg.is_pserver: + epoch_id, step_id = io.load_trainer_args( + self.checkpoint_cfg.checkpoint_dir, + self.checkpoint_cfg.load_serial, self.trainer_id, + self._get_checkpoint_load_args()) + self.checkpoint_cfg.epoch_id = int(epoch_id) + self.checkpoint_cfg.step_id = int(step_id) + + if param_path and os.path.isdir(param_path): # load params from param_path into scope - io.load_persistables(exe, dirname=param_path) + io.load_persist_vars_without_grad( + exe, dirname=param_path, program=self.startup_program) def _transpile_nccl2_dist(self): # PADDLE_TRAINER_IPS @@ -194,14 +247,18 @@ class Trainer(object): current_endpoint = os.getenv("PADDLE_CURRENT_IP", "") + ":" + port # the unique trainer id, starting from 0, needed by trainer # only - trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + self.trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + # the role, should be either PSERVER or TRAINER training_role = os.getenv("PADDLE_TRAINING_ROLE") with self._prog_and_scope_guard(): t = distribute_transpiler.DistributeTranspiler() t.transpile( - trainer_id, pservers=pserver_endpoints, trainers=trainers) + self.trainer_id, pservers=pserver_endpoints, trainers=trainers) if training_role == "PSERVER": + if self.checkpoint_cfg: + self.is_pserver = True + self.train_program = t.get_pserver_program(current_endpoint) self.startup_program = t.get_startup_program(current_endpoint, self.train_program) @@ -294,11 +351,26 @@ class Trainer(object): self._train_by_any_executor(event_handler, exe, num_epochs, reader) def _train_by_any_executor(self, event_handler, exe, num_epochs, reader): - for epoch_id in range(num_epochs): + if self.checkpoint_cfg: + epochs = [ + epoch_id for epoch_id in range(num_epochs) + if epoch_id >= self.checkpoint_cfg.epoch_id + ] + else: + epochs = [epoch_id for epoch_id in range(num_epochs)] + + for epoch_id in epochs: event_handler(BeginEpochEvent(epoch_id)) for step_id, data in enumerate(reader()): if self.__stop: + if self.checkpoint_cfg: + self._clean_checkpoint() return + + if self.checkpoint_cfg and self.checkpoint_cfg.load_serial \ + and self.checkpoint_cfg.step_id >= step_id and self.checkpoint_cfg.epoch_id == epoch_id: + continue + begin_event = BeginStepEvent(epoch_id, step_id) event_handler(begin_event) if begin_event.fetch_metrics: @@ -309,8 +381,13 @@ class Trainer(object): ]) else: metrics = exe.run(feed=data, fetch_list=[]) + + if self.checkpoint_cfg: + self._save_checkpoint(epoch_id, step_id) event_handler(EndStepEvent(epoch_id, step_id, metrics)) event_handler(EndEpochEvent(epoch_id)) + if self.checkpoint_cfg: + self._clean_checkpoint() def _test_by_executor(self, reader, feed_order, fetch_list): with executor.scope_guard(self.scope): @@ -349,6 +426,38 @@ class Trainer(object): loss_name=self.train_func_outputs[0].name) return self._get_parallel_executor() + def _clean_checkpoint(self): + assert self.checkpoint_cfg + io.clean_checkpoint(checkpoint_dir=self.checkpoint_cfg.checkpoint_dir) + + def _get_checkpoint_load_args(self): + """ + epoch_id and step_id 
are runtime arguments, they are not variables, will load them independently. + """ + return ["epoch_id", "step_id"] + + def _get_checkpoint_save_args(self, epoch_id, step_id): + """ + epoch_id and step_id are runtime arguments, they are not variables, will save them independently. + """ + trainer_args = {} + trainer_args["epoch_id"] = epoch_id + trainer_args["step_id"] = step_id + return trainer_args + + def _save_checkpoint(self, epoch_id, step_id): + assert self.checkpoint_cfg + + if epoch_id % self.checkpoint_cfg.epoch_interval == 0 and step_id % self.checkpoint_cfg.step_interval == 0: + exe = executor.Executor(self.place) + io.save_checkpoint( + executor=exe, + checkpoint_dir=self.checkpoint_cfg.checkpoint_dir, + trainer_id=self.trainer_id, + trainer_args=self._get_checkpoint_save_args(epoch_id, step_id), + main_program=self.train_program, + max_num_checkpoints=self.checkpoint_cfg.max_num_checkpoints) + def build_feed_var_list(program, feed_order): if not isinstance(program, framework.Program): diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py index 27992df462..c7ab300e0f 100644 --- a/python/paddle/fluid/transpiler/distribute_transpiler.py +++ b/python/paddle/fluid/transpiler/distribute_transpiler.py @@ -177,6 +177,7 @@ class DistributeTranspiler: dtype=table_grad_var.dtype) for index in range(len(self.pserver_endpoints)) ] + return param_list, grad_list def _init_splited_vars(self, slice_var_up): # update these mappings for further transpile: @@ -199,8 +200,8 @@ class DistributeTranspiler: grad_list.append(g) param_grad_set.add(g.name) - self._update_dist_lookup_table_vars(param_list, grad_list, - self.params_grads) + param_list, grad_list = self._update_dist_lookup_table_vars( + param_list, grad_list, self.params_grads) if slice_var_up: # when we slice var up into blocks, we will slice the var according to
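
For reference, a minimal usage sketch (not part of the patch) of the reworked checkpoint API added to python/paddle/fluid/io.py above; it mirrors the new unit test test_checkpoint.py. The directory path and the surrounding program are placeholders, and the program is assumed to have been built and run already so that its persistable variables exist in scope.

import paddle.fluid as fluid

exe = fluid.Executor(fluid.CPUPlace())
program = fluid.default_main_program()  # assumed to already hold persistable variables

# Each call writes <checkpoint_dir>/checkpoint_<serial>/__model__ (variables, chief trainer only)
# and <checkpoint_dir>/checkpoint_<serial>/trainer_<id>/ (runtime args such as epoch/step ids).
fluid.io.save_checkpoint(exe, "/tmp/ckpt", trainer_id=0,
                         trainer_args={"epoch_id": 5, "step_id": 120},
                         main_program=program, max_num_checkpoints=3)

serial = fluid.io.get_latest_checkpoint_serial("/tmp/ckpt")
if serial >= 0:
    epoch_id, step_id = fluid.io.load_trainer_args("/tmp/ckpt", serial, 0,
                                                   ["epoch_id", "step_id"])
    fluid.io.load_checkpoint(exe, "/tmp/ckpt", serial, program)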
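
Similarly, a sketch of how the new CheckpointConfig plugs into the high-level fluid.Trainer; my_train_program and my_optimizer_func are hypothetical user-defined callables, and the subsequent trainer.train(...) call is unchanged by this patch and omitted here.

import paddle.fluid as fluid

config = fluid.CheckpointConfig(
    checkpoint_dir="/tmp/ckpt",  # defaults to os.getcwd() when None
    max_num_checkpoints=3,       # older serials are rotated out
    epoch_interval=1,            # save every epoch ...
    step_interval=10)            # ... and every 10th step within an epoch

trainer = fluid.Trainer(
    train_func=my_train_program,       # hypothetical program function
    optimizer_func=my_optimizer_func,  # hypothetical optimizer function
    place=fluid.CPUPlace(),
    checkpoint_config=config)
# On construction the trainer queries io.get_latest_checkpoint_serial(); if a serial
# exists, it reloads the variables and the saved epoch_id/step_id and resumes from
# there, and during training it saves checkpoints at the configured intervals.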