add ps_instance doc

revert-15207-remove_op_handle_lock_and_fix_var
heqiaozhi 6 years ago committed by dongdaxiang
parent 35ce6ac2e6
commit bd1c1724aa

@ -137,7 +137,8 @@ cc_library(op_registry SRCS op_registry.cc DEPS op_proto_maker op_info operator
nv_test(op_registry_test SRCS op_registry_test.cc DEPS op_registry)
py_proto_compile(framework_py_proto SRCS framework.proto data_feed.proto)
# Generate an empty __init__.py to make framework_py_proto as a valid python module.
#Generate an empty \
__init__.py to make framework_py_proto as a valid python module.
add_custom_target(framework_py_proto_init ALL COMMAND ${CMAKE_COMMAND} -E touch __init__.py)
add_dependencies(framework_py_proto framework_py_proto_init)
if (NOT WIN32)

@ -30,7 +30,7 @@ limitations under the License. */
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/pybind/pybind.h"
#ifdef PADDLE_WITH_PSLIB
#include "pslib.h"
#include <pslib.h>
#endif
namespace paddle {
@ -70,8 +70,7 @@ void PrepareReaders(std::vector<std::shared_ptr<DataFeed>>& readers, // NOLINT
#ifdef PADDLE_WITH_PSLIB
void AsyncExecutor::InitServer(const std::string& dist_desc, int index) {
_pslib_ptr =
std::shared_ptr<paddle::distributed::PSlib>(
_pslib_ptr = std::shared_ptr<paddle::distributed::PSlib>(
new paddle::distributed::PSlib());
_pslib_ptr->init_server(dist_desc, index);
InitParamConfig();
@ -82,38 +81,41 @@ void AsyncExecutor::InitWorker(const std::string& dist_desc,
int node_num, int index) {
_pslib_ptr = std::shared_ptr<paddle::distributed::PSlib>(
new paddle::distributed::PSlib());
_pslib_ptr->init_worker(
dist_desc, (uint64_t*)(host_sign_list.data()), node_num, index);
_pslib_ptr->init_worker(dist_desc,
static_cast<uint64_t*>(host_sign_list.data()),
node_num, index);
InitParamConfig();
}
uint64_t AsyncExecutor::StartServer() {
return _pslib_ptr->run_server();
}
uint64_t AsyncExecutor::StartServer() { return _pslib_ptr->run_server(); }
void AsyncExecutor::StopServer() {
_pslib_ptr->stop_server();
}
void AsyncExecutor::StopServer() { _pslib_ptr->stop_server(); }
void AsyncExecutor::GatherServers(
const std::vector<uint64_t>& host_sign_list, int node_num) {
_pslib_ptr->gather_servers((uint64_t*)(host_sign_list.data()), node_num);
void AsyncExecutor::GatherServers(const std::vector<uint64_t>& host_sign_list,
int node_num) {
_pslib_ptr->gather_servers(static_cast<uint64_t*>(host_sign_list.data()),
node_num);
}
void AsyncExecutor::InitParamConfig() {
for (int i = 0; i <
_pslib_ptr->get_param()->server_param(). \
downpour_server_param(). \
downpour_table_param_size();
for (int i = 0; i < _pslib_ptr->get_param()
->server_param()
.downpour_server_param()
.downpour_table_param_size();
++i) {
if (_pslib_ptr->get_param()->server_param(). \
downpour_server_param().downpour_table_param(i). \
table_class().find("SparseTable") != -1) {
_param_config.fea_dim = _pslib_ptr->get_param()->server_param(). \
downpour_server_param(). \
downpour_table_param(i). \
accessor().fea_dim();
if (_pslib_ptr->get_param()
->server_param()
.downpour_server_param()
.downpour_table_param(i)
.table_class()
.find("SparseTable") != -1) {
_param_config.fea_dim = _pslib_ptr->get_param()
->server_param()
.downpour_server_param()
.downpour_table_param(i)
.accessor()
.fea_dim();
break;
}
}
@ -123,27 +125,23 @@ void AsyncExecutor::InitParamConfig() {
_param_config.tmp_push_sparse_wait_times = static_cast<int32_t>(
_pslib_ptr->get_param()->trainer_param().push_sparse_per_batch());
for (auto t = 0u;
t < _pslib_ptr->get_param()->trainer_param().skip_op_size();
for (auto t = 0u; t < _pslib_ptr->get_param()->trainer_param().skip_op_size();
++t) {
_param_config.skip_op.push_back(
_pslib_ptr->get_param()->trainer_param().skip_op(t));
}
for (auto t = 0u;
t < _pslib_ptr->get_param()->trainer_param().sparse_table_size();
++t) {
t < _pslib_ptr->get_param()->trainer_param().sparse_table_size(); ++t) {
auto& table = _pslib_ptr->get_param()->trainer_param().sparse_table(t);
std::vector<std::string> tmp_sparse_variable_name;
for (int i = 0u; i < table.slot_value_size(); ++i) {
tmp_sparse_variable_name.push_back(table.slot_value(i));
_param_config.slot_alias_to_table[table.slot_key(i)] =
table.table_id();
_param_config.slot_alias_to_table[table.slot_key(i)] = table.table_id();
}
std::vector<std::string> tmp_sparse_gradient_variable_name;
for (auto i = 0u; i < table.slot_gradient_size(); ++i) {
tmp_sparse_gradient_variable_name.push_back(
table.slot_gradient(i));
tmp_sparse_gradient_variable_name.push_back(table.slot_gradient(i));
}
_param_config.slot_input_vec[table.table_id()] =
std::move(tmp_sparse_variable_name);
@ -153,8 +151,7 @@ void AsyncExecutor::InitParamConfig() {
}
for (auto t = 0u;
t < _pslib_ptr->get_param()->trainer_param().dense_table_size();
++t) {
t < _pslib_ptr->get_param()->trainer_param().dense_table_size(); ++t) {
auto& table = _pslib_ptr->get_param()->trainer_param().dense_table(t);
std::vector<std::string> tmp_dense_variable_name;
for (int i = 0u; i < table.dense_variable_name_size(); ++i) {
@ -198,8 +195,7 @@ void AsyncExecutor::InitModel() {
regions.emplace_back(std::move(reg));
}
auto push_status =
_pslib_ptr->_worker_ptr->push_dense_param(
auto push_status = _pslib_ptr->_worker_ptr->push_dense_param(
regions.data(), regions.size(), table_id);
push_status.wait();
auto status = push_status.get();
@ -225,14 +221,14 @@ void AsyncExecutor::SaveModel(const std::string& path) {
void AsyncExecutor::PrepareDenseThread(const std::string& mode) {
if (mode == "mpi") {
DensePullThreadParam param;
param.ps_client = _pslib_ptr->_worker_ptr;;
param.ps_client = _pslib_ptr->_worker_ptr;
param.threshold = 1;
param.training_thread_num = actual_thread_num;
param.root_scope = root_scope_;
param.dense_params = &_param_config.dense_variable_name;
_pull_dense_thread = std::shared_ptr<DensePullThread>(
new DensePullThread(param));
_pull_dense_thread =
std::shared_ptr<DensePullThread>(new DensePullThread(param));
_pull_dense_thread->start();
}
}
@ -243,8 +239,7 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
const std::vector<std::string>& filelist,
const int thread_num,
const std::vector<std::string>& fetch_var_names,
const std::string& mode,
const bool debug) {
const std::string& mode, const bool debug) {
std::vector<std::thread> threads;
auto& block = main_program.Block(0);
@ -308,7 +303,6 @@ void AsyncExecutor::RunFromFile(const ProgramDesc& main_program,
fetch_var_names, root_scope_, thidx, debug);
}
// start executing ops in multiple threads
for (int thidx = 0; thidx < actual_thread_num; ++thidx) {
threads.push_back(

File diff suppressed because it is too large Load Diff

@ -26,7 +26,7 @@ limitations under the License. */
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#ifdef PADDLE_WITH_PSLIB
#include "pslib.h"
#include <pslib.h>
#endif
namespace paddle {
@ -34,7 +34,7 @@ namespace framework {
void CreateTensor(Variable* var, proto::VarType::Type var_type);
#ifdef PADDLE_WITH_PSLIB
const static uint32_t MAX_FEASIGN_NUM = 1000 * 100 * 100;
static const uint32_t MAX_FEASIGN_NUM = 1000 * 100 * 100;
struct AsyncWorkerParamConfig {
int slot_dim;
@ -66,8 +66,8 @@ struct DensePullThreadParam {
class DensePullThread {
public:
explicit DensePullThread(const DensePullThreadParam& param) :
_running(false) {
explicit DensePullThread(const DensePullThreadParam& param)
: _running(false) {
_ps_client = param.ps_client;
_threshold = param.threshold;
_thread_num = param.training_thread_num;
@ -75,8 +75,7 @@ class DensePullThread {
_sleep_time_ms = param.sleep_time_ms;
for (auto& t : *param.dense_params) {
_dense_variable_name[t.first].insert(
_dense_variable_name[t.first].end(),
_dense_variable_name[t.first].insert(_dense_variable_name[t.first].end(),
t.second.begin(), t.second.end());
_training_versions[t.first].resize(_thread_num, 0);
_last_versions[t.first] = 0;
@ -161,10 +160,8 @@ ExecutorThreadWorker()
#ifdef PADDLE_WITH_PSLIB
virtual void SetPSlibPtr(
std::shared_ptr<paddle::distributed::PSlib> pslib_ptr) {}
virtual void SetPullDenseThread(
std::shared_ptr<DensePullThread> dpt) {}
virtual void SetParamConfig(
AsyncWorkerParamConfig * param_config) {}
virtual void SetPullDenseThread(std::shared_ptr<DensePullThread> dpt) {}
virtual void SetParamConfig(AsyncWorkerParamConfig* param_config) {}
#endif
private:
@ -211,13 +208,10 @@ class AsyncExecutorThreadWorker: public ExecutorThreadWorker {
void PushSparse(int table_id);
void PushDense(int table_id);
void check_pull_push_memory(
const std::vector<uint64_t>& features,
std::vector<float*>& push_g,
int dim);
void check_pull_push_memory(const std::vector<uint64_t>& features,
std::vector<std::vector<float>>& push_g,
int dim);
std::vector<float*>* push_g, int dim);
void check_pull_push_memory(const std::vector<uint64_t>& features,
std::vector<std::vector<float>>* push_g, int dim);
void collect_feasign_info(int table_id);
private:
@ -232,7 +226,6 @@ class AsyncExecutorThreadWorker: public ExecutorThreadWorker {
std::map<uint64_t, std::vector<std::vector<float>>> _feature_value;
std::map<uint64_t, std::vector<std::vector<float>>> _feature_push_value;
std::shared_ptr<paddle::distributed::PSlib> _pslib_ptr;
std::shared_ptr<DensePullThread> _pull_dense_thread;
@ -243,7 +236,6 @@ class AsyncExecutorThreadWorker: public ExecutorThreadWorker {
std::vector<::std::future<int32_t>> _push_dense_status;
AsyncWorkerParamConfig* _param_config;
};
#endif

Loading…
Cancel
Save