|
|
|
@ -13,7 +13,6 @@ See the License for the specific language governing permissions and
|
|
|
|
|
limitations under the License. */
|
|
|
|
|
|
|
|
|
|
#include "paddle/fluid/framework/parallel_executor.h"
|
|
|
|
|
#include "paddle/fluid/platform/profiler.h"
|
|
|
|
|
|
|
|
|
|
#include <string>
|
|
|
|
|
#include <vector>
|
|
|
|
@ -24,6 +23,7 @@ limitations under the License. */
|
|
|
|
|
|
|
|
|
|
#include "paddle/fluid/framework/details/multi_devices_graph_builder.h"
|
|
|
|
|
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
|
|
|
|
|
#include "paddle/fluid/platform/profiler.h"
|
|
|
|
|
|
|
|
|
|
namespace paddle {
|
|
|
|
|
namespace framework {
|
|
|
|
@ -43,30 +43,40 @@ class ParallelExecutorPrivate {
|
|
|
|
|
#endif
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
|
|
|
|
|
return member_->local_scopes_;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ParallelExecutor::ParallelExecutor(
|
|
|
|
|
size_t num_threads, bool use_event,
|
|
|
|
|
const std::vector<platform::Place> &places,
|
|
|
|
|
const std::unordered_set<std::string> ¶ms,
|
|
|
|
|
const ProgramDesc &startup_program, const ProgramDesc &main_program,
|
|
|
|
|
const std::string &loss_var_name, Scope *scope, bool allow_op_delay)
|
|
|
|
|
const std::unordered_set<std::string> &bcast_vars,
|
|
|
|
|
const ProgramDesc &main_program, const std::string &loss_var_name,
|
|
|
|
|
Scope *scope, const std::vector<Scope *> &local_scopes, bool allow_op_delay)
|
|
|
|
|
: member_(new ParallelExecutorPrivate(places)) {
|
|
|
|
|
member_->global_scope_ = scope;
|
|
|
|
|
|
|
|
|
|
// Step 1. RunStartupProgram and Bcast the params to devs.
|
|
|
|
|
Executor exe(places[0]);
|
|
|
|
|
exe.Run(startup_program, scope, 0);
|
|
|
|
|
// Step 1. Bcast the params to devs.
|
|
|
|
|
// Create local scopes
|
|
|
|
|
for (size_t i = 0; i < member_->places_.size(); ++i) {
|
|
|
|
|
member_->local_scopes_.push_back(&scope->NewScope());
|
|
|
|
|
if (local_scopes.empty()) {
|
|
|
|
|
for (size_t i = 0; i < member_->places_.size(); ++i) {
|
|
|
|
|
member_->local_scopes_.push_back(&scope->NewScope());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size());
|
|
|
|
|
for (size_t i = 0; i < member_->places_.size(); ++i) {
|
|
|
|
|
member_->local_scopes_.push_back(local_scopes[i]);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Bcast Parameters to all GPUs
|
|
|
|
|
#ifdef PADDLE_WITH_CUDA
|
|
|
|
|
member_->nccl_ctxs_.reset(new platform::NCCLContextMap(member_->places_));
|
|
|
|
|
#endif
|
|
|
|
|
if (platform::is_gpu_place(places[0]) &&
|
|
|
|
|
member_->local_scopes_.size() != 1) { // Is CUDA
|
|
|
|
|
BCastParamsToGPUs(startup_program);
|
|
|
|
|
if (platform::is_gpu_place(places[0]) && member_->local_scopes_.size() != 1 &&
|
|
|
|
|
local_scopes.empty()) { // Is CUDA
|
|
|
|
|
BCastParamsToGPUs(bcast_vars);
|
|
|
|
|
}
|
|
|
|
|
// Startup Program has been run. All local scopes has correct parameters.
|
|
|
|
|
|
|
|
|
@ -99,48 +109,47 @@ ParallelExecutor::ParallelExecutor(
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ParallelExecutor::BCastParamsToGPUs(
|
|
|
|
|
const ProgramDesc &startup_program) const {
|
|
|
|
|
const std::unordered_set<std::string> &vars) const {
|
|
|
|
|
#ifdef PADDLE_WITH_CUDA
|
|
|
|
|
auto *main_scope = member_->local_scopes_[0];
|
|
|
|
|
|
|
|
|
|
for (auto *var_desc : startup_program.Block(0).AllVars()) {
|
|
|
|
|
size_t idx = var_desc->Name().find("@GRAD");
|
|
|
|
|
if (idx != std::string::npos) continue;
|
|
|
|
|
if (var_desc->GetType() == proto::VarType::LOD_TENSOR) {
|
|
|
|
|
auto &main_tensor =
|
|
|
|
|
main_scope->FindVar(var_desc->Name())->Get<LoDTensor>();
|
|
|
|
|
|
|
|
|
|
auto &dims = main_tensor.dims();
|
|
|
|
|
|
|
|
|
|
if (paddle::platform::is_gpu_place(main_tensor.place())) {
|
|
|
|
|
size_t numel = main_tensor.numel();
|
|
|
|
|
ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
|
|
|
|
|
platform::NCCLGroupGuard guard;
|
|
|
|
|
for (size_t i = 0; i < member_->places_.size(); ++i) {
|
|
|
|
|
auto place = member_->places_[i];
|
|
|
|
|
void *buffer;
|
|
|
|
|
if (i == 0) {
|
|
|
|
|
buffer = const_cast<void *>(main_tensor.data<void>());
|
|
|
|
|
} else {
|
|
|
|
|
auto local_scope = member_->local_scopes_[i];
|
|
|
|
|
auto *t =
|
|
|
|
|
local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
|
|
|
|
|
t->Resize(dims);
|
|
|
|
|
buffer = t->mutable_data(place, main_tensor.type());
|
|
|
|
|
}
|
|
|
|
|
auto &nccl_ctx = member_->nccl_ctxs_->at(place);
|
|
|
|
|
platform::dynload::ncclBcast(buffer, numel, data_type, 0,
|
|
|
|
|
nccl_ctx.comm_, nccl_ctx.stream());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
platform::CPUPlace cpu;
|
|
|
|
|
for (size_t i = 1; i < member_->places_.size(); ++i) {
|
|
|
|
|
for (auto &var : vars) {
|
|
|
|
|
auto *main_var = main_scope->FindVar(var);
|
|
|
|
|
if (!main_var->IsType<LoDTensor>()) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto &main_tensor = main_var->Get<LoDTensor>();
|
|
|
|
|
|
|
|
|
|
auto &dims = main_tensor.dims();
|
|
|
|
|
|
|
|
|
|
if (paddle::platform::is_gpu_place(main_tensor.place())) {
|
|
|
|
|
size_t numel = main_tensor.numel();
|
|
|
|
|
ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
|
|
|
|
|
platform::NCCLGroupGuard guard;
|
|
|
|
|
for (size_t i = 0; i < member_->places_.size(); ++i) {
|
|
|
|
|
auto place = member_->places_[i];
|
|
|
|
|
void *buffer;
|
|
|
|
|
if (i == 0) {
|
|
|
|
|
buffer = const_cast<void *>(main_tensor.data<void>());
|
|
|
|
|
} else {
|
|
|
|
|
auto local_scope = member_->local_scopes_[i];
|
|
|
|
|
auto *t = local_scope->Var(var_desc->Name())->GetMutable<LoDTensor>();
|
|
|
|
|
auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
|
|
|
|
|
t->Resize(dims);
|
|
|
|
|
t->mutable_data(cpu, main_tensor.type());
|
|
|
|
|
paddle::framework::TensorCopy(main_tensor, cpu, t);
|
|
|
|
|
buffer = t->mutable_data(place, main_tensor.type());
|
|
|
|
|
}
|
|
|
|
|
auto &nccl_ctx = member_->nccl_ctxs_->at(place);
|
|
|
|
|
platform::dynload::ncclBcast(buffer, numel, data_type, 0,
|
|
|
|
|
nccl_ctx.comm_, nccl_ctx.stream());
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
platform::CPUPlace cpu;
|
|
|
|
|
for (size_t i = 1; i < member_->places_.size(); ++i) {
|
|
|
|
|
auto local_scope = member_->local_scopes_[i];
|
|
|
|
|
auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
|
|
|
|
|
t->Resize(dims);
|
|
|
|
|
t->mutable_data(cpu, main_tensor.type());
|
|
|
|
|
paddle::framework::TensorCopy(main_tensor, cpu, t);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
member_->nccl_ctxs_->WaitAll();
|
|
|
|
|