|
|
|
@ -15,6 +15,7 @@ limitations under the License. */
|
|
|
|
|
#include "paddle/fluid/framework/parallel_executor.h"
|
|
|
|
|
|
|
|
|
|
#include <string>
|
|
|
|
|
#include <tuple>
|
|
|
|
|
#include <vector>
|
|
|
|
|
|
|
|
|
|
#ifdef PADDLE_WITH_CUDA
|
|
|
|
@ -41,6 +42,8 @@ class ParallelExecutorPrivate {
|
|
|
|
|
#ifdef PADDLE_WITH_CUDA
|
|
|
|
|
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
std::vector<std::tuple<std::string, proto::VarType::Type, bool>> var_types_;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
|
|
|
|
@ -97,14 +100,9 @@ ParallelExecutor::ParallelExecutor(
|
|
|
|
|
allow_op_delay));
|
|
|
|
|
|
|
|
|
|
// Step 3. Create vars in each scope;
|
|
|
|
|
for (auto *scope : member_->local_scopes_) {
|
|
|
|
|
for (auto *var : main_program.Block(0).AllVars()) {
|
|
|
|
|
if (scope->FindVar(var->Name()) != nullptr) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
InitializeVariable(scope->Var(var->Name()), var->GetType());
|
|
|
|
|
}
|
|
|
|
|
for (auto *var : main_program.Block(0).AllVars()) {
|
|
|
|
|
member_->var_types_.emplace_back(var->Name(), var->GetType(),
|
|
|
|
|
var->Persistable());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -163,9 +161,42 @@ void ParallelExecutor::Run(
|
|
|
|
|
const std::unordered_map<std::string, LoDTensor> &feed_tensors) {
|
|
|
|
|
platform::RecordBlock b(0);
|
|
|
|
|
SplitTensorToPlaces(feed_tensors);
|
|
|
|
|
|
|
|
|
|
// Create local scopes.
|
|
|
|
|
for (auto &scope : member_->local_scopes_) {
|
|
|
|
|
Scope &local_scope = scope->NewScope();
|
|
|
|
|
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>() =
|
|
|
|
|
&local_scope;
|
|
|
|
|
|
|
|
|
|
for (auto &name_type_pair : member_->var_types_) {
|
|
|
|
|
if (scope->FindVar(std::get<0>(name_type_pair)) != nullptr) {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (std::get<2>(name_type_pair)) { // Persistable
|
|
|
|
|
InitializeVariable(scope->Var(std::get<0>(name_type_pair)),
|
|
|
|
|
std::get<1>(name_type_pair));
|
|
|
|
|
} else {
|
|
|
|
|
InitializeVariable(scope->Var(std::get<0>(name_type_pair)),
|
|
|
|
|
std::get<1>(name_type_pair));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
auto fetch_data = member_->executor_->Run(fetch_tensors);
|
|
|
|
|
*member_->global_scope_->Var(fetched_var_name)->GetMutable<FeedFetchList>() =
|
|
|
|
|
fetch_data;
|
|
|
|
|
|
|
|
|
|
// Wait All computational streams
|
|
|
|
|
for (auto p : member_->places_) {
|
|
|
|
|
platform::DeviceContextPool::Instance().Get(p)->Wait();
|
|
|
|
|
}
|
|
|
|
|
for (auto &scope : member_->local_scopes_) {
|
|
|
|
|
auto &local_scope =
|
|
|
|
|
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>();
|
|
|
|
|
scope->DeleteScope(local_scope);
|
|
|
|
|
local_scope = nullptr;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ParallelExecutor::SplitTensorToPlaces(
|
|
|
|
|