diff --git a/doc/design/parallel_do.md b/doc/design/parallel_do.md index 221af6b6a4..45f7731996 100644 --- a/doc/design/parallel_do.md +++ b/doc/design/parallel_do.md @@ -24,7 +24,7 @@ A vanilla implementation of parallel_do can be shown as the following (`|` means ``` In the forward pass | Split input onto different devices - | Copy parameter to onto different devices + | Copy parameter onto different devices |||| Compute forward pass in parallel | Merge output from different devices @@ -87,7 +87,7 @@ block2 { } ``` -## Proformance Imporvement +## Performance Imporvement There are serial places we can make this parallel_do faster. diff --git a/paddle/fluid/framework/block_desc.cc b/paddle/fluid/framework/block_desc.cc index 1efb775cdc..d72b64700f 100644 --- a/paddle/fluid/framework/block_desc.cc +++ b/paddle/fluid/framework/block_desc.cc @@ -44,6 +44,25 @@ bool BlockDesc::HasVar(const std::string &name) const { return vars_.find(name) != vars_.end(); } +VarDesc *BlockDesc::RenameVar(const std::string &old_name, + const std::string &new_name) { + if (!this->HasVar(old_name)) { + return nullptr; + } + need_update_ = true; + auto *var = this->Var(old_name); + VarDesc *new_var = new VarDesc(*(var->Proto())); + new_var->SetName(new_name); + vars_[new_name].reset(new_var); + // rename inputs and outputs + for (const auto &op : ops_) { + auto *it = op.get(); + it->Rename(old_name, new_name); + } + vars_.erase(old_name); + return new_var; +} + VarDesc *BlockDesc::FindVarRecursive(const std::string &name) const { if (name == kEmptyVarName) return nullptr; diff --git a/paddle/fluid/framework/block_desc.h b/paddle/fluid/framework/block_desc.h index 8345934a71..3bd90f3890 100644 --- a/paddle/fluid/framework/block_desc.h +++ b/paddle/fluid/framework/block_desc.h @@ -57,6 +57,8 @@ class BlockDesc { bool HasVar(const std::string &var_name) const; + VarDesc *RenameVar(const std::string &old_name, const std::string &new_name); + VarDesc *FindVarRecursive(const std::string &name_bytes) const; VarDesc &FindRecursiveOrCreateVar(const std::string &name_bytes); diff --git a/paddle/fluid/framework/channel.h b/paddle/fluid/framework/channel.h index 5acf4fb39b..8ca1f2aa47 100644 --- a/paddle/fluid/framework/channel.h +++ b/paddle/fluid/framework/channel.h @@ -15,6 +15,8 @@ limitations under the License. */ #pragma once #include // for size_t +#include +#include "paddle/fluid/platform/enforce.h" namespace paddle { namespace framework { @@ -51,6 +53,77 @@ void CloseChannel(Channel* ch) { ch->Close(); } +/* + * The ChannelHolder class serves two main purposes: + * 1. It acts as a unified wrapper for the different kinds of + * channels, i.e. Buffered and Unbuffered channels. This is + * similar to the ReaderHolder class. + * 2. It also helps us in TypeHiding. This is similar to the + * PlaceHolder implementations in variable.h and tensor.h. + */ +class ChannelHolder { + public: + template + void Reset(size_t buffer_size) { + holder_.reset(new PlaceholderImpl(buffer_size)); + } + + template + bool Send(T* data) { + if (!IsInitialized()) return false; + PADDLE_ENFORCE_EQ(holder_->Type(), std::type_index(typeid(T))); + // Static cast should be safe because we have ensured that types are same + Channel* channel = static_cast*>(holder_->Ptr()); + return channel != nullptr ? channel->Send(data) : false; + } + + template + bool Receive(T* data) { + if (!IsInitialized()) return false; + PADDLE_ENFORCE_EQ(holder_->Type(), std::type_index(typeid(T))); + Channel* channel = static_cast*>(holder_->Ptr()); + return channel != nullptr ? channel->Receive(data) : false; + } + + void close() { + if (IsInitialized()) holder_->Close(); + } + + inline bool IsInitialized() const { return holder_ != nullptr; } + + private: + /** + * @note Placeholder hides type T, so it doesn't appear as a template + * parameter of ChannelHolder. + */ + struct Placeholder { + virtual ~Placeholder() {} + virtual const std::type_index Type() const = 0; + virtual void* Ptr() const = 0; + virtual void Close() const = 0; + std::type_info type_; + }; + + template + struct PlaceholderImpl : public Placeholder { + PlaceholderImpl(size_t buffer_size) : type_(std::type_index(typeid(T))) { + channel_.reset(MakeChannel(buffer_size)); + } + + virtual const std::type_index Type() const { return type_; } + virtual void* Ptr() const { return static_cast(channel_.get()); } + virtual void Close() { + if (channel_) channel_->Close(); + } + + std::unique_ptr*> channel_; + const std::type_index type_; + }; + + // Pointer to a PlaceholderImpl object + std::unique_ptr holder_; +}; + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 6fd19e804a..0d2691e811 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -17,6 +17,7 @@ limitations under the License. */ #include #include "gflags/gflags.h" +#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/lod_rank_table.h" @@ -55,13 +56,15 @@ static void CreateTensor(Variable* var, proto::VarType::Type var_type) { var->GetMutable(); } else if (var_type == proto::VarType::READER) { var->GetMutable(); + } else if (var_type == proto::VarType::CHANNEL) { + var->GetMutable(); } else if (var_type == proto::VarType::NCCL_COM) { // GetMutable will be called in ncclInit } else { PADDLE_THROW( "Variable type %d is not in " "[LOD_TENSOR, SELECTED_ROWS, FEED_MINIBATCH, FETCH_LIST, " - "LOD_RANK_TABLE, PLACE_LIST, READER, NCCL_COM]", + "LOD_RANK_TABLE, PLACE_LIST, READER, CHANNEL, NCCL_COM]", var_type); } } diff --git a/paddle/fluid/framework/var_desc.cc b/paddle/fluid/framework/var_desc.cc index 7e3f002b53..1aa0ae0f7c 100644 --- a/paddle/fluid/framework/var_desc.cc +++ b/paddle/fluid/framework/var_desc.cc @@ -88,7 +88,13 @@ std::vector> VarDesc::GetShapes() const { } void VarDesc::SetDataType(proto::VarType::Type data_type) { - mutable_tensor_desc()->set_data_type(data_type); + switch (desc_.type().type()) { + case proto::VarType::CHANNEL: + mutable_channel_desc()->set_data_type(data_type); + break; + default: + mutable_tensor_desc()->set_data_type(data_type); + } } void VarDesc::SetDataTypes( @@ -109,7 +115,13 @@ void VarDesc::SetDataTypes( } proto::VarType::Type VarDesc::GetDataType() const { - return tensor_desc().data_type(); + switch (desc_.type().type()) { + case proto::VarType::CHANNEL: + return channel_desc().data_type(); + break; + default: + return tensor_desc().data_type(); + } } std::vector VarDesc::GetDataTypes() const { @@ -122,6 +134,17 @@ std::vector VarDesc::GetDataTypes() const { return res; } +void VarDesc::SetCapacity(int64_t capacity) { + switch (desc_.type().type()) { + case proto::VarType::CHANNEL: + desc_.mutable_type()->mutable_channel()->set_capacity(capacity); + break; + default: + PADDLE_THROW("Setting 'capacity' is not supported by the type of var %s.", + this->Name()); + } +} + void VarDesc::SetLoDLevel(int32_t lod_level) { switch (desc_.type().type()) { case proto::VarType::LOD_TENSOR: @@ -191,6 +214,19 @@ std::vector VarDesc::GetLoDLevels() const { } } +const proto::VarType::ChannelDesc &VarDesc::channel_desc() const { + PADDLE_ENFORCE(desc_.has_type(), "The var's type hasn't been set."); + PADDLE_ENFORCE(desc_.type().has_type(), "The var type hasn't been set."); + switch (desc_.type().type()) { + case proto::VarType::CHANNEL: + return desc_.type().channel(); + default: + PADDLE_THROW( + "Getting 'channel_desc' is not supported by the type of var %s.", + this->Name()); + } +} + const proto::VarType::TensorDesc &VarDesc::tensor_desc() const { PADDLE_ENFORCE(desc_.has_type(), "The var's type hasn't been set."); PADDLE_ENFORCE(desc_.type().has_type(), "The var type hasn't been set."); @@ -226,6 +262,20 @@ std::vector VarDesc::tensor_descs() const { } } +proto::VarType::ChannelDesc *VarDesc::mutable_channel_desc() { + PADDLE_ENFORCE(desc_.has_type(), "The var type hasn't been set."); + PADDLE_ENFORCE(desc_.type().has_type(), "The var type hasn't been set."); + switch (desc_.type().type()) { + case proto::VarType::CHANNEL: + return desc_.mutable_type()->mutable_channel(); + default: + PADDLE_THROW( + "Getting 'mutable_channel_desc' is not supported by the type of var " + "%s.", + this->Name()); + } +} + proto::VarType::TensorDesc *VarDesc::mutable_tensor_desc() { PADDLE_ENFORCE(desc_.has_type(), "The var type hasn't been set."); PADDLE_ENFORCE(desc_.type().has_type(), "The var type hasn't been set."); diff --git a/paddle/fluid/framework/var_desc.h b/paddle/fluid/framework/var_desc.h index 19b8d890c1..f62415fda6 100644 --- a/paddle/fluid/framework/var_desc.h +++ b/paddle/fluid/framework/var_desc.h @@ -85,6 +85,8 @@ class VarDesc { void SetDataTypes( const std::vector &multiple_data_type); + void SetCapacity(int64_t capacity); + proto::VarType::Type GetDataType() const; std::vector GetDataTypes() const; @@ -106,8 +108,10 @@ class VarDesc { void SetPersistable(bool persistable) { desc_.set_persistable(persistable); } private: + const proto::VarType::ChannelDesc &channel_desc() const; const proto::VarType::TensorDesc &tensor_desc() const; std::vector tensor_descs() const; + proto::VarType::ChannelDesc *mutable_channel_desc(); proto::VarType::TensorDesc *mutable_tensor_desc(); std::vector mutable_tensor_descs(); diff --git a/paddle/fluid/framework/var_type.h b/paddle/fluid/framework/var_type.h index 960ebff9d7..2b646d78f0 100644 --- a/paddle/fluid/framework/var_type.h +++ b/paddle/fluid/framework/var_type.h @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #pragma once +#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/framework.pb.h" #include "paddle/fluid/framework/lod_rank_table.h" #include "paddle/fluid/framework/lod_tensor.h" @@ -34,6 +35,8 @@ inline proto::VarType::Type ToVarType(std::type_index type) { return proto::VarType_Type_SELECTED_ROWS; } else if (type.hash_code() == typeid(ReaderHolder).hash_code()) { return proto::VarType_Type_READER; + } else if (type.hash_code() == typeid(ChannelHolder).hash_code()) { + return proto::VarType_Type_CHANNEL; } else { PADDLE_THROW("ToVarType:Unsupported type %s", type.name()); } @@ -57,6 +60,9 @@ inline void VisitVarType(const framework::Variable& var, Visitor visitor) { case proto::VarType_Type_READER: visitor(var.Get()); return; + case proto::VarType_Type_CHANNEL: + visitor(var.Get()); + return; default: PADDLE_THROW("Not supported visit type, %d", ToVarType(var.Type())); } diff --git a/paddle/fluid/operators/elementwise_add_op.h b/paddle/fluid/operators/elementwise_add_op.h index 3c546bf3e4..253964562c 100644 --- a/paddle/fluid/operators/elementwise_add_op.h +++ b/paddle/fluid/operators/elementwise_add_op.h @@ -41,59 +41,8 @@ class ElementwiseAddKernel : public framework::OpKernel { }; template -struct ElementwiseAddGradFunctor { - template - void operator()(Device d, X x, Y y, Z z, dX dx, dY dy, dZ dz) { - auto dz_e = framework::EigenVector::Flatten(*dz); - if (dx) { - auto dx_e = framework::EigenVector::Flatten(*dx); - dx_e.device(d) = dz_e; - } - if (dy) { - auto dy_e = framework::EigenVector::Flatten(*dy); - dy_e.device(d) = dz_e; - } - } -}; - -template -struct ElementwiseAddBroadCastGradFunctor { - template - void operator()(Device d, X x, Y y, Z z, dX dx, dY dy, dZ dz, Pre pre, N n) { - auto dz_e = framework::EigenVector::Flatten(*dz); - if (dx) { - auto dx_e = framework::EigenVector::Flatten(*dx); - dx_e.device(d) = dz_e; - } - - if (dy) { - auto dy_e = framework::EigenVector::Flatten(*dy); - dy_e.device(d) = dz_e.reshape(Eigen::DSizes(pre, n)) - .sum(Eigen::array{{0}}); - } - } -}; - -template -struct ElementwiseAddBroadCast2GradFunctor { - template - void operator()(Device d, X x, Y y, Z z, dX dx, dY dy, dZ dz, Pre pre, N n, - Post post) { - auto dz_e = framework::EigenVector::Flatten(*dz); - if (dx) { - auto dx_e = framework::EigenVector::Flatten(*dx); - dx_e.device(d) = dz_e; - } - - if (dy) { - auto dy_e = framework::EigenVector::Flatten(*dy); - dy_e.device(d) = dz_e.reshape(Eigen::DSizes(pre, n, post)) - .sum(Eigen::array{{0, 2}}); - } - } +struct IdentityGrad { + HOSTDEVICE T operator()(T x, T y, T out, T dout) const { return dout; } }; template @@ -109,10 +58,9 @@ class ElementwiseAddGradKernel : public framework::OpKernel { auto* dx = ctx.Output(framework::GradVarName("X")); auto* dy = ctx.Output(framework::GradVarName("Y")); int axis = ctx.Attr("axis"); - ElementwiseGradCompute, - ElementwiseAddBroadCastGradFunctor, - ElementwiseAddBroadCast2GradFunctor>( - ctx, x, y, out, dout, axis, dx, dy); + ElemwiseGradCompute, IdentityGrad>( + ctx, *x, *y, *out, *dout, axis, dx, dy, IdentityGrad(), + IdentityGrad()); } }; diff --git a/paddle/fluid/operators/elementwise_op_function.h b/paddle/fluid/operators/elementwise_op_function.h index 2a4a611511..5c78303530 100644 --- a/paddle/fluid/operators/elementwise_op_function.h +++ b/paddle/fluid/operators/elementwise_op_function.h @@ -20,9 +20,11 @@ limitations under the License. */ #ifdef __NVCC__ #include +constexpr int ELEMWISE_MAX_BLOCK_DIM = 1024; #endif #include "paddle/fluid/operators/math/math_function.h" +#include "paddle/fluid/platform/for_range.h" namespace paddle { namespace operators { @@ -33,10 +35,10 @@ namespace operators { * For example: * 1. shape(X) = (2, 3, 4, 5), shape(Y) = (3, 4), with axis=1 * pre=2, n=3*4, post=5 - * x.shape(2, 12, 5) * y.shape(1,12,1).broadcast(2,12,5) + * x.shape(2, 12, 5) * y.shape(1, 12, 1).broadcast(2, 12, 5) * 2. shape(X) = (2, 3, 4, 5), shape(Y) = (4,5) * pre=2*3, n=4*5, post=1 - * x.shape(2, 3, 20) * y.shape(1,1,20).broadcast(2,3,20) + * x.shape(6, 20, 1) * y.shape(1, 20, 1).broadcast(6, 20, 1) */ inline void get_mid_dims(const framework::DDim& x_dims, const framework::DDim& y_dims, const int axis, @@ -311,6 +313,258 @@ EIGEN_FUNCTOR(Mul, EIGEN_MUL); #define EIGEN_DIV(x, y) ((x) / (y)) EIGEN_FUNCTOR(Div, EIGEN_DIV); +template +struct ElemwiseGradNoBroadcast { + const T* x_; + const T* y_; + const T* out_; + const T* dout_; + + HOSTDEVICE void operator()(size_t i) { + if (dx_ != nullptr) { + dx_[i] = dx_op_(x_[i], y_[i], out_[i], dout_[i]); + } + if (dy_ != nullptr) { + dy_[i] = dx_op_(x_[i], y_[i], out_[i], dout_[i]); + } + } + + DX_OP dx_op_; + DY_OP dy_op_; + T* dx_; + T* dy_; +}; + +template +static void ElemwiseGradBroadcast1CPU(const T* x, const T* y, const T* out, + const T* dout, int h, int w, DX_OP dx_op, + DY_OP dy_op, T* dx, T* dy) { + for (int i = 0; i < h; ++i) { + for (int j = 0; j < w; ++j) { + int x_offset = i * w + j; + if (dx != nullptr) { + dx[x_offset] = dx_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + } + if (dy != nullptr) { + T tmp = dy_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + if (i == 0) { + dy[j] = tmp; + } else { + dy[j] += tmp; + } + } + } + } +} +#ifdef __NVCC__ +template +static __global__ void ElemwiseGradBroadcast1CUDAKernel( + const T* x, const T* y, const T* out, const T* dout, int h, int w, + DX_OP dx_op, DY_OP dy_op, T* dx, T* dy) { + extern __shared__ char shm_buffer[]; + T* shm = reinterpret_cast(shm_buffer); + + int j = blockIdx.x; + int i = threadIdx.x; + int tid = threadIdx.x; + shm[tid] = 0; + + do { + int x_offset = i * w + j; + if (dx) { + dx[x_offset] = dx_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + } + if (dy) { + shm[tid] += dy_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + } + i += ELEMWISE_MAX_BLOCK_DIM; + } while (i < h); + + if (dy) { + __syncthreads(); + + h = h > ELEMWISE_MAX_BLOCK_DIM ? ELEMWISE_MAX_BLOCK_DIM : h; + + // Sum, could be optimized + if (threadIdx.x == 0) { + for (int k = 1; k < h; ++k) { + shm[0] += shm[k]; + } + dy[j] = shm[0]; + } + } +} + +template +static void ElemwiseGradBroadcast1CUDA(cudaStream_t stream, const T* x, + const T* y, const T* out, const T* dout, + int h, int w, DX_OP dx_op, DY_OP dy_op, + T* dx, T* dy) { + int block_size = std::min(ELEMWISE_MAX_BLOCK_DIM, h); + int gird_size = w; + int shared_mem_size = block_size * sizeof(T); + ElemwiseGradBroadcast1CUDAKernel<<>>(x, y, out, dout, h, w, dx_op, + dy_op, dx, dy); +} + +#endif + +template +static void ElemwiseGradBroadcast2CPU(const T* x, const T* y, const T* out, + const T* dout, int pre, int n, int post, + DX_OP dx_op, DY_OP dy_op, T* dx, T* dy) { + for (int i = 0; i < pre; ++i) { + for (int j = 0; j < n; ++j) { + for (int k = 0; k < post; ++k) { + int x_offset = i * n * post + j * post + k; + if (dx != nullptr) { + dx[x_offset] = + dx_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + } + if (dy != nullptr) { + T tmp = dy_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + if (i == 0 && k == 0) { + dy[j] = tmp; + } else { + dy[j] += tmp; + } + } + } + } + } +} + +#ifdef __NVCC__ + +template +static __global__ void ElemwiseGradBroadcast2CUDAKernel( + const T* x, const T* y, const T* out, const T* dout, int pre, int n, + int post, DX_OP dx_op, DY_OP dy_op, T* dx, T* dy) { + int tid = threadIdx.x; + int j = blockIdx.x; + + extern __shared__ char shm_buffer[]; + T* shm = reinterpret_cast(shm_buffer); + shm[tid] = 0; + int ttid = tid; + + while (true) { + int i = ttid / post; + int k = ttid % post; + if (i >= pre) break; + + int x_offset = i * n * post + j * post + k; + + if (dx != nullptr) { + dx[x_offset] = dx_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + } + + if (dy != nullptr) { + shm[tid] += dy_op(x[x_offset], y[j], out[x_offset], dout[x_offset]); + } + + ttid += ELEMWISE_MAX_BLOCK_DIM; + } + + if (dy) { + __syncthreads(); + int h = pre * post; + h = h > ELEMWISE_MAX_BLOCK_DIM ? ELEMWISE_MAX_BLOCK_DIM : h; + + // Sum, could be optimized + if (tid == 0) { + for (int i = 1; i < h; ++i) { + shm[0] += shm[i]; + } + dy[j] = shm[0]; + } + } +} + +template +static void ElemwiseGradBroadcast2CUDA(cudaStream_t stream, const T* x, + const T* y, const T* out, const T* dout, + int pre, int n, int post, DX_OP dx_op, + DY_OP dy_op, T* dx, T* dy) { + int block_size = std::min(ELEMWISE_MAX_BLOCK_DIM, pre * post); + int gird_size = n; + int shared_mem_size = block_size * sizeof(T); + ElemwiseGradBroadcast2CUDAKernel<<>>(x, y, out, dout, pre, n, post, + dx_op, dy_op, dx, dy); +} + +#endif + +template +void ElemwiseGradCompute(const framework::ExecutionContext& ctx, + const framework::Tensor& x, const framework::Tensor& y, + const framework::Tensor& out, + const framework::Tensor& dout, int axis, + framework::Tensor* dx, framework::Tensor* dy, + DX_OP dx_op, DY_OP dy_op) { + if (x.dims() == y.dims()) { + size_t N = static_cast(framework::product(x.dims())); + platform::ForRange for_range( + ctx.template device_context(), N); + for_range(ElemwiseGradNoBroadcast{ + x.data(), y.data(), out.data(), dout.data(), dx_op, dy_op, + dx == nullptr ? nullptr : dx->mutable_data(ctx.GetPlace()), + dy == nullptr ? nullptr : dy->mutable_data(ctx.GetPlace())}); + } else { // Y is a scalar + auto x_dim = x.dims(); + auto y_dim = y.dims(); + + if (y_dim.size() == 1 && y_dim[0] == 1) { + // y is a scalar + auto extended_dims = framework::vectorize(x_dim); + extended_dims.push_back(1); + x_dim = framework::make_ddim(extended_dims); + } + + axis = (axis == -1 ? x_dim.size() - y_dim.size() : axis); + int pre, n, post; + get_mid_dims(x_dim, y_dim, axis, pre, n, post); + if (post == 1) { + int h = pre; + int w = n; + if (platform::is_gpu_place(ctx.GetPlace())) { +#ifdef __NVCC__ + ElemwiseGradBroadcast1CUDA( + ctx.template device_context().stream(), x.data(), + y.data(), out.data(), dout.data(), h, w, dx_op, dy_op, + dx == nullptr ? nullptr : dx->mutable_data(ctx.GetPlace()), + dy == nullptr ? nullptr : dy->mutable_data(ctx.GetPlace())); +#endif + } else { + ElemwiseGradBroadcast1CPU( + x.data(), y.data(), out.data(), dout.data(), h, w, + dx_op, dy_op, + dx == nullptr ? nullptr : dx->mutable_data(ctx.GetPlace()), + dy == nullptr ? nullptr : dy->mutable_data(ctx.GetPlace())); + } + } else { + if (platform::is_gpu_place(ctx.GetPlace())) { +#ifdef __NVCC__ + ElemwiseGradBroadcast2CUDA( + ctx.template device_context().stream(), x.data(), + y.data(), out.data(), dout.data(), pre, n, post, dx_op, + dy_op, + dx == nullptr ? nullptr : dx->mutable_data(ctx.GetPlace()), + dy == nullptr ? nullptr : dy->mutable_data(ctx.GetPlace())); +#endif + } else { + ElemwiseGradBroadcast2CPU( + x.data(), y.data(), out.data(), dout.data(), pre, n, + post, dx_op, dy_op, + dx == nullptr ? nullptr : dx->mutable_data(ctx.GetPlace()), + dy == nullptr ? nullptr : dy->mutable_data(ctx.GetPlace())); + } + } + } +}; + template void ElementwiseGradCompute(const framework::ExecutionContext& ctx, diff --git a/paddle/fluid/operators/listen_and_serv_op.cc b/paddle/fluid/operators/listen_and_serv_op.cc index 6c0292ecb2..ee0e3533ce 100644 --- a/paddle/fluid/operators/listen_and_serv_op.cc +++ b/paddle/fluid/operators/listen_and_serv_op.cc @@ -75,13 +75,6 @@ class ListenAndServOp : public framework::OperatorBase { server_thread_->join(); } - std::string GetGradVarNameForTrainer(const std::string &varname) const { - if (grads_counter_.find(varname) == grads_counter_.end()) { - grads_counter_[varname] = 0; - } - return string::Sprintf("%s.trainer_%d", varname, grads_counter_[varname]++); - } - void RunImpl(const framework::Scope &scope, const platform::Place &dev_place) const override { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); @@ -91,8 +84,7 @@ class ListenAndServOp : public framework::OperatorBase { // FIXME(Yancey1989): initialize rpc server with lazy mode. rpc_service_->SetScope(&recv_scope); rpc_service_->SetDevCtx(&dev_ctx); - auto param_list = Attr>("ParamList"); - auto grad_list = Attr>("GradList"); + auto ins = Inputs("X"); auto fan_in = Attr("Fanin"); auto *block = Attr(kOptimizeBlock); @@ -109,40 +101,24 @@ class ListenAndServOp : public framework::OperatorBase { // the gradients arrives, just add suffix 0~n and merge the gradient. rpc_service_->SetCond(0); size_t recv_var_cnt = 0; - size_t update_param_cnt = 0; int batch_barrier = 0; while (batch_barrier != fan_in) { const detail::MessageWithName &v = rpc_service_->Get(); - auto grad_var_name = v.first; - if (grad_var_name == LISTEN_TERMINATE_MESSAGE) { + auto recv_var_name = v.first; + if (recv_var_name == LISTEN_TERMINATE_MESSAGE) { LOG(INFO) << "received terminate message and exit"; exit_flag = true; break; - } else if (grad_var_name == BATCH_BARRIER_MESSAGE) { + } else if (recv_var_name == BATCH_BARRIER_MESSAGE) { VLOG(3) << "recv batch barrier message"; batch_barrier++; continue; } else { - // receive a variable + VLOG(3) << "received grad: " << recv_var_name; recv_var_cnt++; - auto it = - std::find(grad_list.begin(), grad_list.end(), grad_var_name); - std::string param_var_name; - if (it != grad_list.end()) { - param_var_name = param_list[it - grad_list.begin()]; - update_param_cnt++; - VLOG(3) << "received grad: " << grad_var_name - << " updating param: " << param_var_name; - } else { - VLOG(3) << "received variable: " << grad_var_name - << " no need to update param"; - } - if (fan_in > 1 && !param_var_name.empty()) { - grad_var_name = this->GetGradVarNameForTrainer(grad_var_name); - } - auto *var = recv_scope.FindVar(grad_var_name); + auto *var = recv_scope.FindVar(recv_var_name); if (var == nullptr) { - LOG(ERROR) << "Can not find server side var: " << grad_var_name; + LOG(ERROR) << "Can not find server side var: " << recv_var_name; PADDLE_THROW("Can not find server side var"); } detail::DeserializeFromMessage(v.second, dev_ctx, var); @@ -151,29 +127,26 @@ class ListenAndServOp : public framework::OperatorBase { } } } - VLOG(3) << "recv " << recv_var_cnt << " parmeters for one barrier."; if (exit_flag) { rpc_service_->ShutDown(); } - VLOG(3) << "run optimize graph..."; try { executor.Run(*program, &recv_scope, block->ID(), /*global_block*/ false /*create_local_scope*/, false /*create_vars*/); } catch (std::exception &e) { LOG(ERROR) << "run sub program error " << e.what(); } - // Reset the received sparse variables, the sum operator would not // sum the input sparse variables which rows is empty at the next // mini-batch. - // TOOD(Yancey1989): move the reset action into an operator, we couldn't + // TODO(Yancey1989): move the reset action into an operator, we couldn't // have any hide logic in the operator. for (auto &var : sparse_vars) { var->GetMutable()->mutable_rows()->clear(); } rpc_service_->SetCond(1); - rpc_service_->WaitClientGet(update_param_cnt); - grads_counter_.clear(); + // FIXME(typhoonzero): use another condition to sync wait clients get. + rpc_service_->WaitClientGet(ins.size()); sparse_vars.clear(); } // while(true) } @@ -181,13 +154,13 @@ class ListenAndServOp : public framework::OperatorBase { protected: std::shared_ptr rpc_service_; std::shared_ptr server_thread_; - mutable std::unordered_map grads_counter_; }; class ListenAndServOpMaker : public framework::OpProtoAndCheckerMaker { public: ListenAndServOpMaker(OpProto *proto, OpAttrChecker *op_checker) : OpProtoAndCheckerMaker(proto, op_checker) { + AddInput("X", "(Tensor) Variables that server recv.").AsDuplicable(); AddComment(R"DOC( ListenAndServ operator @@ -201,16 +174,7 @@ from send_op and send back variables to recv_op. .AddCustomChecker([](const std::string &ip) { return !ip.empty(); }); AddAttr(kOptimizeBlock, "BlockID to run on server side."); - AddAttr>( - "ParamList", "type list of string", - "grad->param name mapping to find which parameters to optimize.") - .SetDefault({}); - AddAttr>( - "GradList", "type list of string", - "grad->param name mapping to find which parameters to optimize.") - .SetDefault({}); - AddAttr("Fanin", "type int", - "Number of trainers in the current cluster job") + AddAttr("Fanin", "How many clients send to this server.") .SetDefault(1); } }; diff --git a/paddle/fluid/platform/float16.h b/paddle/fluid/platform/float16.h index cf6a4b09db..5832bd9ce3 100644 --- a/paddle/fluid/platform/float16.h +++ b/paddle/fluid/platform/float16.h @@ -74,17 +74,12 @@ struct PADDLE_ALIGN(2) float16 { // The following defaulted special class member functions // are added to make float16 pass the std::is_trivial test - HOSTDEVICE inline float16() = default; - - HOSTDEVICE inline float16(const float16&) = default; - - HOSTDEVICE inline float16& operator=(const float16&) = default; - - HOSTDEVICE inline float16(float16&&) = default; - - HOSTDEVICE inline float16& operator=(float16&&) = default; - - HOSTDEVICE inline ~float16() = default; + float16() = default; + float16(const float16& o) = default; + float16& operator=(const float16& o) = default; + float16(float16&& o) = default; + float16& operator=(float16&& o) = default; + ~float16() = default; // Constructors #ifdef PADDLE_CUDA_FP16 diff --git a/paddle/fluid/pybind/protobuf.cc b/paddle/fluid/pybind/protobuf.cc index 01dc53de78..b725be7952 100644 --- a/paddle/fluid/pybind/protobuf.cc +++ b/paddle/fluid/pybind/protobuf.cc @@ -172,6 +172,14 @@ void BindBlockDesc(py::module &m) { [](BlockDesc &self, py::bytes byte_name) { std::string name = byte_name; return self.HasVar(name); + }, + py::return_value_policy::reference) + .def("rename_var", + [](BlockDesc &self, const py::bytes &byte_name, + const py::bytes &byte_name_new) { + std::string name = byte_name; + std::string new_name = byte_name_new; + self.RenameVar(name, new_name); }) .def("has_var_recursive", [](BlockDesc &self, py::bytes byte_name) { @@ -200,7 +208,7 @@ void BindVarDsec(py::module &m) { py::class_ var_desc(m, "VarDesc", ""); var_desc .def("name", - [](const VarDesc &self) { + [](VarDesc &self) { py::bytes name = self.Name(); return name; }, @@ -210,6 +218,7 @@ void BindVarDsec(py::module &m) { .def("set_shapes", &VarDesc::SetShapes) .def("set_dtype", &VarDesc::SetDataType) .def("set_dtypes", &VarDesc::SetDataTypes) + .def("set_capacity", &VarDesc::SetCapacity) .def("shape", &VarDesc::GetShape, py::return_value_policy::reference) .def("shapes", &VarDesc::GetShapes, py::return_value_policy::reference) .def("dtype", &VarDesc::GetDataType, py::return_value_policy::reference) @@ -240,6 +249,7 @@ void BindVarDsec(py::module &m) { .value("STEP_SCOPES", proto::VarType::STEP_SCOPES) .value("LOD_RANK_TABLE", proto::VarType::LOD_RANK_TABLE) .value("LOD_TENSOR_ARRAY", proto::VarType::LOD_TENSOR_ARRAY) + .value("CHANNEL", proto::VarType::CHANNEL) .value("PLACE_LIST", proto::VarType::PLACE_LIST) .value("READER", proto::VarType::READER) .value("NCCL_COM", proto::VarType::NCCL_COM); diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 56c1a935d9..abe2b11449 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -17,6 +17,7 @@ limitations under the License. */ #include // for call_once #include #include "paddle/fluid/framework/backward.h" +#include "paddle/fluid/framework/channel.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/feed_fetch_method.h" #include "paddle/fluid/framework/framework.pb.h" diff --git a/python/paddle/trainer_config_helpers/layer_math.py b/python/paddle/trainer_config_helpers/layer_math.py index e1c8f0c350..ee84188bac 100644 --- a/python/paddle/trainer_config_helpers/layer_math.py +++ b/python/paddle/trainer_config_helpers/layer_math.py @@ -75,7 +75,7 @@ LayerOutput.__add__ = add def sub(layeroutput, other): if is_compatible_with(other, float): - return slope_intercept_layer(input=layeroutput, intercept=other) + return slope_intercept_layer(input=layeroutput, intercept=-other) if not isinstance(other, LayerOutput): logger.fatal("LayerOutput can only be subtracted with" " another Layeroutput or a number") diff --git a/python/paddle/trainer_config_helpers/tests/configs/protostr/math_ops.protostr b/python/paddle/trainer_config_helpers/tests/configs/protostr/math_ops.protostr index eaaf7fd6f5..582207741a 100644 --- a/python/paddle/trainer_config_helpers/tests/configs/protostr/math_ops.protostr +++ b/python/paddle/trainer_config_helpers/tests/configs/protostr/math_ops.protostr @@ -230,7 +230,7 @@ layers { input_layer_name: "__mixed_1__" } slope: 1.0 - intercept: 2 + intercept: -2 } layers { name: "__slope_intercept_layer_4__" @@ -411,4 +411,3 @@ sub_models { output_layer_names: "__mixed_3__" is_recurrent_layer_group: false } - diff --git a/python/paddle/v2/dataset/common.py b/python/paddle/v2/dataset/common.py index 9aba35a648..c6ff09a1d1 100644 --- a/python/paddle/v2/dataset/common.py +++ b/python/paddle/v2/dataset/common.py @@ -74,6 +74,8 @@ def download(url, module_name, md5sum, save_name=None): retry = 0 retry_limit = 3 while not (os.path.exists(filename) and md5file(filename) == md5sum): + if os.path.exists(filename): + print "file md5", md5file(filename), md5sum if retry < retry_limit: retry += 1 else: diff --git a/python/paddle/v2/fluid/distribute_transpiler.py b/python/paddle/v2/fluid/distribute_transpiler.py index 03a7478ae8..2fcf3753c5 100644 --- a/python/paddle/v2/fluid/distribute_transpiler.py +++ b/python/paddle/v2/fluid/distribute_transpiler.py @@ -14,7 +14,7 @@ from __future__ import print_function import framework -from framework import Program, default_main_program, Parameter, Variable +from framework import Program, default_main_program, default_startup_program, Parameter, Variable import optimizer from layer_helper import LayerHelper from distributed_spliter import * @@ -133,6 +133,7 @@ class DistributeTranspiler: def transpile(self, optimize_ops, params_grads, + trainer_id, program=None, pservers="127.0.0.1:6174", trainers=1, @@ -146,13 +147,37 @@ class DistributeTranspiler: Use different methods to split trainable variables to different parameter servers. + Steps to transpile trainer: + 1. split variable to multiple blocks, aligned by product(dim[1:]) (width). + 2. rename splited grad variables to add trainer_id suffix ".trainer_%d". + 3. modify trainer program add split_op to each grad variable. + 4. append send_op to send splited variables to server and fetch + params(splited blocks or origin param) from server. + 5. append concat_op to merge splited blocks to update local weights. + + Steps to transpile pserver: + 1. create new program for parameter server. + 2. create params and grad variables that assigned to current server instance. + 3. create a sub-block in the server side program + 4. append ops that should run on current server instance. + 5. add listen_and_serv op + :param optimize_ops: op list of optimization, should be the - return value of Optimizer.minimize + return value of Optimizer.minimize :type optimize_ops: list - :param program: program to optimize, default is default_main_program + :param params_grads: list of tuple(weight, gradient) + :type params_grads: list + :param trainer_id: one unique id for each trainer in a job. + :type trainer_id: int + :param program: program to transpile, default is default_main_program + :type program: Program :param pservers: parameter server endpoints like "m1:6174,m2:6174" :type pservers: string - :return: return a list of programs + :param trainers: total number of workers/trainers in the job + :type trainers: int + :param split_method: A function to determin how to split variables + to different servers equally. + :type split_method: function """ assert (callable(split_method)) if program is None: @@ -160,25 +185,19 @@ class DistributeTranspiler: self.program = program self.trainers = trainers self.optimize_ops = optimize_ops - # steps to transpile: - # 1. split variable to multiple blocks, aligned by product(dim[1:]) (width). - # 2. modify trainer program add split_op to each Grad. - # 3. append send_op to trainer. - # 4. append concat_op to trainer to update local weights. - # 5. create new program for parameter server. - # 6. create parameter server program by split_method generated endpoint->VarBlock - + # TODO(typhoonzero): currently trainer_id is fetched from cluster system + # like Kubernetes, we should port this to use etcd later when developing + # fluid distributed training with fault-tolerance. + self.trainer_id = trainer_id pserver_endpoints = pservers.split(",") # step1 param_list = [pg[0] for pg in params_grads] grad_list = [pg[1] for pg in params_grads] - # TODO: add split selected rows support grad_blocks = split_dense_variable(grad_list, len(pserver_endpoints)) param_blocks = split_dense_variable(param_list, len(pserver_endpoints)) # step2 grad_var_mapping = self._append_split_op(program, grad_blocks) - # step3 send_inputs = [] send_outputs = [] @@ -211,7 +230,7 @@ class DistributeTranspiler: shape=[0]) # create send_op - send_op = program.global_block().append_op( + program.global_block().append_op( type="send", inputs={"X": send_inputs}, outputs={"Out": send_outputs, @@ -223,14 +242,158 @@ class DistributeTranspiler: if len(splited_var) <= 1: continue orig_param = program.global_block().vars[varname] - concat = program.global_block().append_op( + program.global_block().append_op( type="concat", inputs={"X": splited_var}, outputs={"Out": [orig_param]}, attrs={"axis": 0}) - def _create_vars_from_blocklist(self, program, block_list): - # Create respective variables using the block_list + def get_trainer_program(self): + # remove optimize ops and add a send op to main_program + self.program.global_block().delete_ops(self.optimize_ops) + return self.program + + def get_pserver_program(self, endpoint): + """ + Get pserver side program using the endpoint. + NOTE: assume blocks of the same variable is not distributed + on the same pserver, only change param/grad varnames for + trainers to fetch. + """ + # step1 + pserver_program = Program() + # step2 + recv_inputs = [] + for v in self.param_grad_ep_mapping[endpoint]["params"]: + self._clone_var(pserver_program.global_block(), v) + for v in self.param_grad_ep_mapping[endpoint]["grads"]: + # create vars for each trainer in global scope, so + # we don't need to create them when grad arrives. + # change client side var name to origin name by + # removing ".trainer_%d" suffix + suff_idx = v.name.find(".trainer_") + if suff_idx >= 0: + orig_var_name = v.name[:suff_idx] + pserver_program.global_block().create_var( + name=orig_var_name, + persistable=True, + dtype=v.dtype, + shape=v.shape) + print("create origin var: ", orig_var_name) + for trainer_id in xrange(self.trainers): + var = pserver_program.global_block().create_var( + name="%s.trainer_%d" % (orig_var_name, trainer_id), + persistable=False, + dtype=v.dtype, + shape=v.shape) + recv_inputs.append(var) + print("create per trainer var: ", var.name) + # step3 + optimize_block = pserver_program.create_block(0) + # step 4 + # Create a union-find data struct from optimize ops, + # If two ops are connected, we could add these two ops + # into one set. + ufind = self._create_ufind(self.optimize_ops) + # step 4.2 + # Iterate through the ops and append optimize op which + # located on current pserver + opt_op_on_pserver = [] + for _, op in enumerate(self.optimize_ops): + if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op): + opt_op_on_pserver.append(op) + # step 4.3 + # Iterate through the ops, and if an op and the optimize ops + # which located on current pserver are in one set, then + # append it into the sub program. + for _, op in enumerate(self.optimize_ops): + for _, opt_op in enumerate(opt_op_on_pserver): + if ufind.is_connected(op, opt_op): + if self._is_opt_op(op): + self._append_pserver_ops(optimize_block, op, endpoint) + else: + self._append_pserver_non_opt_ops(optimize_block, op) + break + # step5 append the listen_and_serv op + pserver_program.global_block().append_op( + type="listen_and_serv", + inputs={'X': recv_inputs}, + outputs={}, + attrs={ + "OptimizeBlock": optimize_block, + "endpoint": endpoint, + "Fanin": self.trainers + }) + pserver_program.sync_with_cpp() + return pserver_program + + def get_startup_program(self, endpoint, pserver_program): + """ + Get startup program for current parameter server. + Modify operator input variables if there are variables that + were split to several blocks. + """ + s_prog = Program() + orig_s_prog = framework.default_startup_program() + params = self.param_grad_ep_mapping[endpoint]["params"] + + def _get_splited_name_and_shape(varname): + for idx, splited_param in enumerate(params): + pname = splited_param.name + if same_or_split_var(pname, varname) and varname != pname: + return pname, splited_param.shape + return "", [] + + # 1. create vars in pserver program to startup program + pserver_vars = pserver_program.global_block().vars + created_var_map = dict() + for _, var in pserver_vars.iteritems(): + tmpvar = s_prog.global_block().create_var( + name=var.name, + persistable=var.persistable, + dtype=var.dtype, + shape=var.shape) + created_var_map[var.name] = tmpvar + + # 2. rename op outputs + for op in orig_s_prog.global_block().ops: + new_inputs = dict() + new_outputs = dict() + # do not append startup op if var is not on this pserver + op_on_pserver = False + for key in op.output_names: + newname, _ = _get_splited_name_and_shape(op.output(key)[0]) + if newname: + op_on_pserver = True + new_outputs[key] = created_var_map[newname] + elif op.output(key)[0] in pserver_vars: + op_on_pserver = True + new_outputs[key] = pserver_vars[op.output(key)[0]] + + # most startup program ops have no inputs + new_inputs = self._get_input_map_from_op(pserver_vars, op) + + if op_on_pserver: + if op.type in [ + "gaussian_random", "fill_constant", "uniform_random" + ]: + op.attrs["shape"] = new_outputs["Out"].shape + s_prog.global_block().append_op( + type=op.type, + inputs=new_inputs, + outputs=new_outputs, + attrs=op.attrs) + return s_prog + + # ====================== private transpiler functions ===================== + def _create_vars_from_blocklist(self, + program, + block_list, + add_trainer_suffix=False): + """ + NOTE: only grads need to be named for different trainers, use + add_trainer_suffix to rename the grad vars. + """ block_map = dict() var_mapping = dict() for block_str in block_list: @@ -239,11 +402,20 @@ class DistributeTranspiler: block_map[varname] = [] block_map[varname].append((long(offset), long(size))) for varname, splited in block_map.iteritems(): - orig_var = program.global_block().vars[varname] - var_mapping[varname] = [] + orig_var = program.global_block().var(varname) if len(splited) == 1: - var_mapping[varname] = [orig_var] + if add_trainer_suffix: + new_var_name = "%s.trainer_%d" % \ + (orig_var.name, self.trainer_id) + program.global_block().rename_var(varname, new_var_name) + var_mapping[varname] = \ + [program.global_block().var(new_var_name)] + else: + var_mapping[varname] = \ + [program.global_block().var(orig_var.name)] continue + + var_mapping[varname] = [] orig_shape = orig_var.shape orig_dim1_flatten = 1 if len(orig_shape) >= 2: @@ -255,13 +427,21 @@ class DistributeTranspiler: splited_shape = [rows] if len(orig_shape) >= 2: splited_shape.extend(orig_shape[1:]) + new_var_name = "" + if add_trainer_suffix: + new_var_name = "%s.block%d.trainer_%d" % \ + (varname, i, self.trainer_id) + else: + new_var_name = "%s.block%d" % \ + (varname, i) var = program.global_block().create_var( - name="%s.block%d" % (varname, i), + name=new_var_name, persistable=False, dtype=orig_var.dtype, type=orig_var.type, shape=splited_shape) # flattend splited var var_mapping[varname].append(var) + program.global_block().sync_with_cpp() return var_mapping def _clone_var(self, block, var): @@ -272,13 +452,12 @@ class DistributeTranspiler: dtype=var.dtype, type=var.type, lod_level=var.lod_level, - # HACK: let all param in pserver be persistable so the child - # program in recv can get them persistable=True) def _append_split_op(self, program, gradblocks): # Split variables that need to be split and append respective ops - var_mapping = self._create_vars_from_blocklist(program, gradblocks) + var_mapping = self._create_vars_from_blocklist( + program, gradblocks, add_trainer_suffix=True) for varname, splited_vars in var_mapping.iteritems(): # variable that don't need to split have empty splited_vars if len(splited_vars) <= 1: @@ -308,24 +487,6 @@ class DistributeTranspiler: "[LOD_TENSOR, SELECTED_ROWS]") return var_mapping - def get_trainer_program(self): - # remove optimize ops and add a send op to main_program - self.program.global_block().delete_ops(self.optimize_ops) - return self.program - - def _create_var_for_trainers(self, block, var, trainers): - # For each trainer, create the necessary variables - var_list = [] - for i in xrange(trainers): - var_each = block.create_var( - name="%s.trainer_%d" % (var.name, i), - persistable=var.persistable, - dtype=var.dtype, - type=var.type, - shape=var.shape) - var_list.append(var_each) - return var_list - def _get_optimizer_input_shape(self, op_type, varkey, orig_shape, param_shape): """ @@ -353,6 +514,13 @@ class DistributeTranspiler: pass return orig_shape + def _orig_varname(self, varname): + suff_idx = varname.find(".trainer_") + orig_var_name = "" + if suff_idx >= 0: + orig_var_name = varname[:suff_idx] + return orig_var_name + def _append_pserver_ops(self, optimize_block, opt_op, endpoint): program = optimize_block.program pserver_block = program.global_block() @@ -363,18 +531,23 @@ class DistributeTranspiler: if key == "Grad": grad_block = None for g in self.param_grad_ep_mapping[endpoint]["grads"]: - if same_or_split_var(g.name, opt_op.input(key)[0]): + if same_or_split_var( + self._orig_varname(g.name), opt_op.input(key)[0]): grad_block = g break if not grad_block: # do not append this op if current endpoint # is not dealing with this grad block return - merged_var = pserver_block.vars[grad_block.name] - # append merging ops if trainers > 1 + merged_var = \ + pserver_block.vars[self._orig_varname(grad_block.name)] if self.trainers > 1: - vars2merge = self._create_var_for_trainers( - pserver_block, grad_block, self.trainers) + vars2merge = [] + for i in xrange(self.trainers): + per_trainer_name = "%s.trainer_%d" % \ + (self._orig_varname(grad_block.name), i) + vars2merge.append(pserver_block.vars[per_trainer_name]) + optimize_block.append_op( type="sum", inputs={"X": vars2merge}, @@ -517,77 +690,6 @@ class DistributeTranspiler: return False return False - def get_pserver_program(self, endpoint): - """ - Get pserver side program using the endpoint - - NOTE: assume blocks of the same variable is not distributed - on the same pserver, only change param/grad varnames for - trainers to fetch. For each pserver endpoint, server side - program must be a sub-set of the original optimization program. - """ - # step5 - pserver_program = Program() - for v in self.param_grad_ep_mapping[endpoint]["params"]: - self._clone_var(pserver_program.global_block(), v) - for v in self.param_grad_ep_mapping[endpoint]["grads"]: - # create vars for each trainer in global scope, so - # we don't need to create them when grad arrives. - pserver_program.global_block().create_var( - name=v.name, persistable=True, dtype=v.dtype, shape=v.shape) - for trainer_id in xrange(self.trainers): - pserver_program.global_block().create_var( - name="%s.trainer_%d" % (v.name, trainer_id), - persistable=True, - dtype=v.dtype, - shape=v.shape) - # step6 - optimize_block = pserver_program.create_block(0) - # step 6.1 - # Create a union-find data struct by optimize ops, - # If two ops are connected, we could add these two ops - # into one set. - ufind = self._create_ufind(self.optimize_ops) - # step 6.2 - # Iterate through the ops and append optimize op which - # located on current pserver - opt_op_on_pserver = [] - for _, op in enumerate(self.optimize_ops): - if self._is_opt_op(op) and self._is_opt_op_on_pserver(endpoint, op): - opt_op_on_pserver.append(op) - # step 6.3 - # Iterate through the ops, and if an op and the optimize ops - # which located on current pserver are in one set, then - # append it into the sub program. - for _, op in enumerate(self.optimize_ops): - for _, opt_op in enumerate(opt_op_on_pserver): - if ufind.is_connected(op, opt_op): - if self._is_opt_op(op): - self._append_pserver_ops(optimize_block, op, endpoint) - else: - self._append_pserver_non_opt_ops(optimize_block, op) - break - # Append the listen_and_serv op - pserver_program.global_block().append_op( - type="listen_and_serv", - inputs={}, - outputs={}, - attrs={ - "OptimizeBlock": optimize_block, - "endpoint": endpoint, - "ParamList": [ - p.name - for p in self.param_grad_ep_mapping[endpoint]["params"] - ], - "GradList": [ - p.name - for p in self.param_grad_ep_mapping[endpoint]["grads"] - ], - "Fanin": self.trainers - }) - pserver_program.sync_with_cpp() - return pserver_program - def _get_input_map_from_op(self, varmap, op): iomap = dict() for key in op.input_names: @@ -611,61 +713,3 @@ class DistributeTranspiler: else: iomap[key] = vars return iomap - - def get_startup_program(self, endpoint, pserver_program): - """ - Get startup program for current parameter server. - Modify operator input variables if there are variables that - were split to several blocks. - """ - s_prog = Program() - orig_s_prog = framework.default_startup_program() - params = self.param_grad_ep_mapping[endpoint]["params"] - - def _get_splited_name_and_shape(varname): - for idx, splited_param in enumerate(params): - pname = splited_param.name - if same_or_split_var(pname, varname) and varname != pname: - return pname, splited_param.shape - return "", [] - - # 1. create vars in pserver program to startup program - pserver_vars = pserver_program.global_block().vars - created_var_map = dict() - for _, var in pserver_vars.iteritems(): - tmpvar = s_prog.global_block().create_var( - name=var.name, - persistable=var.persistable, - dtype=var.dtype, - shape=var.shape) - created_var_map[var.name] = tmpvar - - # 2. rename op outputs - for op in orig_s_prog.global_block().ops: - new_inputs = dict() - new_outputs = dict() - # do not append startup op if var is not on this pserver - op_on_pserver = False - for key in op.output_names: - newname, _ = _get_splited_name_and_shape(op.output(key)[0]) - if newname: - op_on_pserver = True - new_outputs[key] = created_var_map[newname] - elif op.output(key)[0] in pserver_vars: - op_on_pserver = True - new_outputs[key] = pserver_vars[op.output(key)[0]] - - # most startup program ops have no inputs - new_inputs = self._get_input_map_from_op(pserver_vars, op) - - if op_on_pserver: - if op.type in [ - "gaussian_random", "fill_constant", "uniform_random" - ]: - op.attrs["shape"] = new_outputs["Out"].shape - s_prog.global_block().append_op( - type=op.type, - inputs=new_inputs, - outputs=new_outputs, - attrs=op.attrs) - return s_prog diff --git a/python/paddle/v2/fluid/framework.py b/python/paddle/v2/fluid/framework.py index 3ec8d97814..78318dc6d6 100644 --- a/python/paddle/v2/fluid/framework.py +++ b/python/paddle/v2/fluid/framework.py @@ -286,6 +286,10 @@ class Variable(object): def name(self): return self.desc.name() + @name.setter + def name(self, new_name): + self.desc.set_name(new_name) + @property def shape(self): # convert to tuple, make it as same as numpy API. @@ -531,6 +535,12 @@ class Operator(object): """ return self.desc.input(name) + def rename_input(self, old_name, new_name): + self.desc.rename_input(old_name, new_name) + + def rename_output(self, old_name, new_name): + self.desc.rename_output(old_name, new_name) + @property def input_names(self): """ @@ -540,6 +550,14 @@ class Operator(object): """ return self.desc.input_names() + @property + def input_arg_names(self): + return self.desc.input_arg_names() + + @property + def output_arg_names(self): + return self.desc.output_arg_names() + def output(self, name): """ Get output arguments by the output parameter name @@ -741,6 +759,60 @@ class Block(object): def has_var(self, name): return name in self.vars + def rename_var(self, name, new_name): + """ + Rename variable in vars and ops' inputs and outputs + """ + if not self.has_var(name): + raise ValueError("var %s is not in current" % name) + v = self.var(name) + stop_gradient = None + trainable = None + optimize_attr = None + regularizer = None + gradient_clip_attr = None + error_clip = None + if type(v) == Parameter: + stop_gradient = v.stop_gradient + trainable = v.trainable + optimize_attr = v.optimize_attr + regularizer = v.regularizer + gradient_clip_attr = v.gradient_clip_attr + error_clip = v.error_clip + elif type(v) == Variable: + error_clip = v.error_clip + stop_gradient = v.stop_gradient + else: + raise ValueError("unsupported var type: %s", type(v)) + + self.desc.rename_var(name, new_name) + d = self.desc.find_var(new_name) + var = None + if type(v) == Parameter: + var = Parameter( + self, + d.shape(), + d.dtype(), + name=new_name, + stop_gradient=stop_gradient, + trainable=trainable, + optimize_attr=optimize_attr, + regularizer=regularizer, + gradient_clip_attr=gradient_clip_attr, + error_clip=error_clip) + elif type(v) == Variable: + var = Variable( + self, + name=new_name, + error_clip=error_clip, + stop_gradient=stop_gradient) + + # rename the python side, sync_with_cpp will only add + # new vars/ops to python side. + self.vars[new_name] = var + del self.vars[name] + self.sync_with_cpp() + def create_parameter(self, *args, **kwargs): global_block = self.program.global_block() param = Parameter(global_block, *args, **kwargs) diff --git a/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py b/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py index 07815059c4..1c1fffc589 100644 --- a/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py +++ b/python/paddle/v2/fluid/tests/book_distribute/notest_recognize_digits_conv_dist.py @@ -58,14 +58,19 @@ trainers = int(os.getenv("TRAINERS")) # total trainer count current_endpoint = os.getenv("SERVER_ENDPOINT") # current pserver endpoint training_role = os.getenv("TRAINING_ROLE", "TRAINER") # get the training role: trainer/pserver +if not current_endpoint: + print("need env SERVER_ENDPOINT") + exit(1) + t = fluid.DistributeTranspiler() t.transpile( - optimize_ops, params_grads, pservers=pserver_endpoints, trainers=trainers) + optimize_ops, + params_grads, + 0, + pservers=pserver_endpoints, + trainers=trainers) if training_role == "PSERVER": - if not current_endpoint: - print("need env SERVER_ENDPOINT") - exit(1) pserver_prog = t.get_pserver_program(current_endpoint) pserver_startup = t.get_startup_program(current_endpoint, pserver_prog) exe.run(pserver_startup)