|
|
|
@ -29,7 +29,9 @@ limitations under the License. */
|
|
|
|
|
#include "paddle/fluid/inference/io.h"
|
|
|
|
|
#include "paddle/fluid/platform/place.h"
|
|
|
|
|
#include "paddle/fluid/pybind/pybind.h"
|
|
|
|
|
#ifdef PADDLE_WITH_PSLIB
|
|
|
|
|
#include "pslib.h"
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
namespace paddle {
|
|
|
|
|
namespace framework {
|
|
|
|
@ -48,9 +50,11 @@ void AsyncExecutor::CreateThreads(
|
|
|
|
|
worker->SetDataFeed(reader);
|
|
|
|
|
worker->SetFetchVarNames(fetch_var_names);
|
|
|
|
|
worker->BindingDataFeedMemory();
|
|
|
|
|
#ifdef PADDLE_WITH_PSLIB
|
|
|
|
|
worker->SetPSlibPtr(_pslib_ptr);
|
|
|
|
|
worker->SetPullDenseThread(_pull_dense_thread);
|
|
|
|
|
worker->SetParamConfig(&_param_config);
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void PrepareReaders(std::vector<std::shared_ptr<DataFeed>>& readers, // NOLINT
|
|
|
|
@ -64,6 +68,7 @@ void PrepareReaders(std::vector<std::shared_ptr<DataFeed>>& readers, // NOLINT
|
|
|
|
|
readers[0]->SetFileList(filelist);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#ifdef PADDLE_WITH_PSLIB
|
|
|
|
|
void AsyncExecutor::InitServer(const std::string& dist_desc, int index) {
|
|
|
|
|
_pslib_ptr =
|
|
|
|
|
std::shared_ptr<paddle::distributed::PSlib>(
|
|
|
|
@ -231,6 +236,7 @@ void AsyncExecutor::PrepareDenseThread(const std::string& mode) {
|
|
|
|
|
_pull_dense_thread->start();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
|
|
|
|
|
const std::string& data_feed_desc_str,
|
|
|
|
@ -279,15 +285,21 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
|
|
|
|
|
// todo: should be factory method for creating datafeed
|
|
|
|
|
std::vector<std::shared_ptr<DataFeed>> readers;
|
|
|
|
|
PrepareReaders(readers, actual_thread_num, data_feed_desc, filelist);
|
|
|
|
|
#ifdef PADDLE_WITH_PSLIB
|
|
|
|
|
PrepareDenseThread(mode);
|
|
|
|
|
#endif
|
|
|
|
|
std::vector<std::shared_ptr<ExecutorThreadWorker>> workers;
|
|
|
|
|
workers.resize(actual_thread_num);
|
|
|
|
|
for (auto& worker : workers) {
|
|
|
|
|
#ifdef PADDLE_WITH_PSLIB
|
|
|
|
|
if (mode == "mpi") {
|
|
|
|
|
worker.reset(new AsyncExecutorThreadWorker);
|
|
|
|
|
} else {
|
|
|
|
|
worker.reset(new ExecutorThreadWorker);
|
|
|
|
|
}
|
|
|
|
|
#else
|
|
|
|
|
worker.reset(new ExecutorThreadWorker);
|
|
|
|
|
#endif
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// prepare thread resource here
|
|
|
|
@ -306,9 +318,11 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
|
|
|
|
|
for (auto& th : threads) {
|
|
|
|
|
th.join();
|
|
|
|
|
}
|
|
|
|
|
#ifdef PADDLE_WITH_PSLIB
|
|
|
|
|
if (mode == "mpi") {
|
|
|
|
|
_pull_dense_thread->stop();
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
root_scope_->DropKids();
|
|
|
|
|
|
|
|
|
|
return;
|
|
|
|
|