add topo-aware in heter-ps (#30087)

* add topo aware

* resource.h

* topo aware

* format
Thunderbrook, committed via GitHub
commit 0b8e1fadc5 (parent 297fff1a79)

@@ -1225,6 +1225,13 @@ void FleetWrapper::LoadModelOneTable(const uint64_t table_id,
 void FleetWrapper::LoadWithWhitelist(const uint64_t table_id,
                                      const std::string& path, const int mode) {
 #ifdef PADDLE_WITH_PSLIB
+  auto ret = pslib_ptr_->_worker_ptr->load_with_whitelist(table_id, path,
+                                                          std::to_string(mode));
+  ret.wait();
+  if (ret.get() != 0) {
+    LOG(ERROR) << "load model of table id: " << table_id
+               << ", from path: " << path << " failed";
+  }
 #else
   VLOG(0) << "FleetWrapper::LoadWhitelist does nothing when no pslib";
 #endif
@@ -1349,7 +1356,16 @@ int32_t FleetWrapper::SaveWithWhitelist(int table_id, const std::string& path,
                                         const int mode,
                                         const std::string& whitelist_path) {
 #ifdef PADDLE_WITH_PSLIB
-  return 0;
+  auto ret = pslib_ptr_->_worker_ptr->save_with_whitelist(
+      table_id, path, std::to_string(mode), whitelist_path);
+  ret.wait();
+  int32_t feasign_cnt = ret.get();
+  if (feasign_cnt == -1) {
+    LOG(ERROR) << "table save cache failed";
+    sleep(sleep_seconds_before_fail_exit_);
+    exit(-1);
+  }
+  return feasign_cnt;
 #else
   VLOG(0) << "FleetWrapper::SaveCache does nothing when no pslib";
   return -1;
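
Both whitelist hunks above follow the same call pattern: the PSLib worker call returns a future, the caller blocks on wait(), and get() yields either a status (non-zero means failure for load) or a feasign count (-1 means failure for save). Below is a minimal, self-contained sketch of that wait-then-check pattern; std::async and the stub function stand in for the PSLib RPC and are illustrative assumptions, not the real client.

// Sketch of the wait-then-check pattern used by LoadWithWhitelist and
// SaveWithWhitelist above; std::async stands in for the PSLib worker RPC.
#include <cstdint>
#include <future>
#include <iostream>
#include <string>

// Hypothetical stub: returns a feasign count, or -1 on failure.
int32_t save_with_whitelist_stub(int table_id, const std::string& path) {
  return path.empty() ? -1 : 42;
}

int main() {
  auto ret = std::async(std::launch::async, save_with_whitelist_stub,
                        /*table_id=*/0, std::string("./model"));
  ret.wait();                       // block until the request completes
  int32_t feasign_cnt = ret.get();  // -1 signals failure
  if (feasign_cnt == -1) {
    std::cerr << "table save cache failed" << std::endl;
    return -1;  // the real code sleeps, then calls exit(-1)
  }
  std::cout << "saved " << feasign_cnt << " feasigns" << std::endl;
  return 0;
}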

@@ -765,7 +765,7 @@ x.second );
   unsigned long long get_num_collisions() const { return m_collisions; }
 
   void print() {
-    for (size_type i = 0; i < m_hashtbl_size; ++i) {
+    for (size_type i = 0; i < 10; ++i) {
       std::cout << i << ": " << m_hashtbl_values[i].first << ","
                 << m_hashtbl_values[i].second << std::endl;
     }

@@ -68,6 +68,34 @@ class HeterComm {
                    Sgd& sgd);
 
   int log2i(int x);
+
+  bool need_transfer(int send_id, int receive_id) {
+    return ((send_id / 4 != receive_id / 4) && (send_id + 4) % 8 != receive_id);
+  }
+
+  int get_transfer_devid(int send_id) { return (send_id + 4) % 8; }
+
+  struct Node {
+    cudaStream_t in_stream;
+    cudaStream_t out_stream;
+    char* key_storage;
+    char* val_storage;
+    int sync;
+    int key_bytes_len;
+    int val_bytes_len;
+    int gpu_num;
+  };
+
+  struct Path {
+    std::vector<Node> nodes_;
+  };
+
+  void init_path();
+  void create_storage(
+      int start_index, int end_index, int keylen, int vallen,
+      std::vector<std::shared_ptr<memory::Allocation>>& local_strorage);
+  void walk_to_src(int start_index, int end_index, char* src_val);
+  void walk_to_dest(int start_index, int end_index, char* src_key,
+                    char* src_val);
 
  private:
   using Table = HashTable<KeyType, ValType>;

@@ -76,6 +104,8 @@ class HeterComm {
   std::vector<Table*> tables_;
   std::shared_ptr<HeterPsResource> resource_;
   CustomGradMerger merger_;
+  int topo_aware_{1};
+  std::vector<std::vector<Path>> path_;
 };
 
 }  // end namespace framework
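
The need_transfer / get_transfer_devid pair above is the heart of the topo-aware change: GPUs are treated as two groups of four (indices 0-3 and 4-7), a card in the other group is reached directly only if it is the sender's mirror card (send_id + 4) % 8, and every other cross-group transfer is staged through that mirror card. init_path precomputes these hops as Path/Node entries (per-hop staging buffers and streams) that walk_to_dest and walk_to_src then traverse. The small standalone sketch below enumerates the resulting routes with the same formulas; the fixed 8-GPU, two-group layout is an assumption read off the constants 4 and 8 in the code.

// Enumerates routes for an assumed 8-GPU, two-group topology, using the same
// rules as HeterComm::need_transfer / get_transfer_devid above.
#include <cstdio>

bool need_transfer(int send_id, int receive_id) {
  // Cross-group, and the receiver is not the sender's mirror card.
  return (send_id / 4 != receive_id / 4) && (send_id + 4) % 8 != receive_id;
}

int get_transfer_devid(int send_id) { return (send_id + 4) % 8; }

int main() {
  for (int src = 0; src < 8; ++src) {
    for (int dst = 0; dst < 8; ++dst) {
      if (src == dst) continue;
      if (need_transfer(src, dst)) {
        // Two-hop path: stage through the mirror card in the other group.
        std::printf("%d -> %d -> %d\n", src, get_transfer_devid(src), dst);
      } else {
        std::printf("%d -> %d (direct)\n", src, dst);
      }
    }
  }
  return 0;
}

For example, GPU 1 reaches GPU 5 (its mirror) directly, transfers within the same group of four are always direct, and traffic from GPU 1 to GPU 6 is staged as 1 -> 5 -> 6.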

[File diff suppressed because it is too large]

@@ -19,23 +19,35 @@ limitations under the License. */
 
 namespace paddle {
 namespace framework {
 
-GPUResource::GPUResource(int dev_id, int index) {
+GPUResource::GPUResource(std::vector<int>& dev_ids, int index) {
   index_ = index;
-  dev_id_ = dev_id;
+  dev_ids_ = dev_ids;
+  dev_id_ = dev_ids_[index];
 
   platform::CUDADeviceGuard guard(dev_id_);
+  local_streams_.resize(dev_ids_.size());
+  comm_streams_.resize(dev_ids_.size());
+  for (size_t i = 0; i < dev_ids_.size(); ++i) {
+    PADDLE_ENFORCE_CUDA_SUCCESS(
+        cudaStreamCreateWithFlags(&local_streams_[i], cudaStreamNonBlocking));
+    PADDLE_ENFORCE_CUDA_SUCCESS(
+        cudaStreamCreateWithFlags(&comm_streams_[i], cudaStreamNonBlocking));
+  }
   PADDLE_ENFORCE_CUDA_SUCCESS(
-      cudaStreamCreateWithFlags(&stream_, cudaStreamNonBlocking));
-  PADDLE_ENFORCE_CUDA_SUCCESS(
-      cudaStreamCreateWithFlags(&copy_stream_, cudaStreamNonBlocking));
+      cudaStreamCreateWithFlags(&remote_stream_, cudaStreamNonBlocking));
 }
 
 GPUResource::~GPUResource() {
   platform::CUDADeviceGuard guard(dev_id_);
-  PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamDestroy(stream_));
-  PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamDestroy(copy_stream_));
+  for (size_t i = 0; i < local_streams_.size(); ++i) {
+    PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamDestroy(local_streams_[i]));
+  }
+  for (size_t i = 0; i < comm_streams_.size(); ++i) {
+    PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamDestroy(comm_streams_[i]));
+  }
+  PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamDestroy(remote_stream_));
 }
 
 void HeterPsResource::enable_p2p() {

@@ -64,18 +76,22 @@ HeterPsResource::HeterPsResource(const std::vector<int>& dev_ids) {
   dev_ids_ = dev_ids;
   for (size_t i = 0; i < dev_ids_.size(); ++i) {
     std::shared_ptr<GPUResource> resource =
-        std::make_shared<GPUResource>(dev_ids_[i], i);
+        std::make_shared<GPUResource>(dev_ids_, i);
     resources_.push_back(resource);
     devid_2_index_[dev_ids_[i]] = i;
   }
 }
 
-cudaStream_t HeterPsResource::copy_stream(int num) {
-  return resources_[num]->copy_stream();
+cudaStream_t HeterPsResource::comm_stream(int gpu_num, int stream_num) {
+  return resources_[gpu_num]->comm_stream(stream_num);
+}
+
+cudaStream_t HeterPsResource::local_stream(int gpu_num, int stream_num) {
+  return resources_[gpu_num]->local_stream(stream_num);
 }
 
-cudaStream_t HeterPsResource::stream(int num) {
-  return resources_[num]->stream();
+cudaStream_t HeterPsResource::remote_stream(int gpu_num) {
+  return resources_[gpu_num]->remote_stream();
 }
 
 int HeterPsResource::dev_id(int num) { return dev_ids_[num]; }
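
In the new GPUResource, every instance is pinned to its own card (dev_id_) and creates one local stream and one comm stream per GPU in the group, plus a single remote_stream, all with cudaStreamNonBlocking so they do not implicitly synchronize with the legacy default stream. Below is a minimal CUDA runtime sketch of that create/use/destroy lifecycle on one device; CUDA_CHECK is a local stand-in for PADDLE_ENFORCE_CUDA_SUCCESS, and the group size of 2 is an arbitrary assumption.

// Per-peer non-blocking streams on a single device, mirroring the create /
// destroy lifecycle in GPUResource's constructor and destructor above.
#include <cuda_runtime.h>
#include <cstdio>
#include <cstdlib>
#include <vector>

#define CUDA_CHECK(call)                                                 \
  do {                                                                   \
    cudaError_t err = (call);                                            \
    if (err != cudaSuccess) {                                            \
      std::fprintf(stderr, "CUDA error: %s\n", cudaGetErrorString(err)); \
      std::exit(1);                                                      \
    }                                                                    \
  } while (0)

int main() {
  const int dev_id = 0;         // this resource's own card
  const size_t group_size = 2;  // assumed number of GPUs in the group
  CUDA_CHECK(cudaSetDevice(dev_id));

  std::vector<cudaStream_t> local_streams(group_size), comm_streams(group_size);
  cudaStream_t remote_stream;
  for (size_t i = 0; i < group_size; ++i) {
    CUDA_CHECK(
        cudaStreamCreateWithFlags(&local_streams[i], cudaStreamNonBlocking));
    CUDA_CHECK(
        cudaStreamCreateWithFlags(&comm_streams[i], cudaStreamNonBlocking));
  }
  CUDA_CHECK(cudaStreamCreateWithFlags(&remote_stream, cudaStreamNonBlocking));

  // ... enqueue copies / kernels on the per-peer streams here ...

  for (auto& s : local_streams) CUDA_CHECK(cudaStreamDestroy(s));
  for (auto& s : comm_streams) CUDA_CHECK(cudaStreamDestroy(s));
  CUDA_CHECK(cudaStreamDestroy(remote_stream));
  return 0;
}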

@@ -27,20 +27,23 @@ namespace framework {
 
 class GPUResource {
  public:
-  GPUResource(int device_id, int index);
+  GPUResource(std::vector<int>& device_id, int index);
   virtual ~GPUResource();
   GPUResource(const GPUResource&) = delete;
   GPUResource& operator=(const GPUResource&) = delete;
 
   int dev_id() const { return dev_id_; }
   int index() const { return index_; }
-  cudaStream_t stream() { return stream_; }
-  cudaStream_t copy_stream() { return copy_stream_; }
+  cudaStream_t local_stream(int num) { return local_streams_[num]; }
+  cudaStream_t remote_stream() { return remote_stream_; }
+  cudaStream_t comm_stream(int num) { return comm_streams_[num]; }
 
   int dev_id_;
   int index_;
-  cudaStream_t stream_;
-  cudaStream_t copy_stream_;
+  std::vector<int> dev_ids_;
+  cudaStream_t remote_stream_;
+  std::vector<cudaStream_t> local_streams_;
+  std::vector<cudaStream_t> comm_streams_;
 };
 
 class HeterPsResource {

@@ -52,9 +55,10 @@ class HeterPsResource {
   void enable_p2p();
   int total_gpu();
   int get_index_by_devid(int devid);
-  cudaStream_t stream(int num);
-  cudaStream_t copy_stream(int num);
   int dev_id(int num);
+  cudaStream_t local_stream(int gpu_num, int stream_num);
+  cudaStream_t remote_stream(int gpu_num);
+  cudaStream_t comm_stream(int gpu_num, int stream_num);
 
   std::vector<std::shared_ptr<GPUResource>> resources_;
   std::vector<int> dev_ids_;

@@ -15,18 +15,19 @@ limitations under the License. */
 
 #pragma once
 
 namespace optimizer_config {
 
-__constant__ float mf_create_thresholds = 1;
-__constant__ float nonclk_coeff = 1;
+__constant__ float mf_create_thresholds = 0;
+__constant__ float nonclk_coeff = 0.1;
 __constant__ float clk_coeff = 1;
-__constant__ float min_bound = -10000;
-__constant__ float max_bound = 10000;
-__constant__ float learning_rate = 1;
-__constant__ float initial_g2sum = 1;
-__constant__ float initial_range = 1;
-__constant__ float mf_learning_rate = 1;
-__constant__ float mf_initial_g2sum = 1;
-__constant__ float mf_initial_range = 1;
-__constant__ float mf_min_bound = 1;
-__constant__ float mf_max_bound = 1;
+__constant__ float min_bound = -10;
+__constant__ float max_bound = 10;
+__constant__ float learning_rate = 0.05;
+__constant__ float initial_g2sum = 3.0;
+__constant__ float initial_range = 1e-4;
+__constant__ float mf_learning_rate = 0.05;
+__constant__ float mf_initial_g2sum = 3.0;
+__constant__ float mf_initial_range = 1e-4;
+__constant__ float mf_min_bound = -10;
+__constant__ float mf_max_bound = 10;
 }
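
This hunk only retunes default optimizer hyperparameters. They live in CUDA __constant__ memory, so device code reads them directly and any host-side override has to go through cudaMemcpyToSymbol. The sketch below illustrates that mechanism with a single symbol; doing this at runtime is not something this PR adds, it just shows how a __constant__ value like learning_rate behaves.

// How a __constant__ hyperparameter such as learning_rate behaves: device
// code reads it directly, and the host can overwrite it with
// cudaMemcpyToSymbol. Compile with nvcc; sketch only.
#include <cuda_runtime.h>
#include <cstdio>

__constant__ float learning_rate = 0.05f;

__global__ void print_lr() { printf("lr on device: %f\n", learning_rate); }

int main() {
  print_lr<<<1, 1>>>();
  cudaDeviceSynchronize();  // flush device printf

  float new_lr = 0.01f;
  cudaMemcpyToSymbol(learning_rate, &new_lr, sizeof(float));  // host override

  print_lr<<<1, 1>>>();
  cudaDeviceSynchronize();
  return 0;
}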

@@ -143,16 +143,17 @@ void PSGPUWorker::SetNeedDump(bool need_dump_field) {
 
 void PSGPUWorker::DumpParam() {}
 
 void PSGPUWorker::TrainFiles() {
-  VLOG(3) << "train file A";
   platform::SetNumThreads(1);
-  VLOG(3) << "train file B";
+  platform::Timer timeline;
+  timeline.Start();
+  int total_ins_num = 0;
   // how to accumulate fetched values here
   device_reader_->Start();
-  VLOG(3) << "train file C";
   int cur_batch;
   while ((cur_batch = device_reader_->Next()) > 0) {
-    VLOG(3) << "train file D";
+    total_ins_num += cur_batch;
     for (auto& op : ops_) {
       bool need_skip = false;
       for (auto t = 0u; t < skip_ops_.size(); ++t) {

@@ -169,6 +170,9 @@ void PSGPUWorker::TrainFiles() {
     PrintFetchVars();
     thread_scope_->DropKids();
   }
+  timeline.Pause();
+  VLOG(1) << "GpuPs worker " << thread_id_ << " train cost "
+          << timeline.ElapsedSec() << " seconds, ins_num: " << total_ins_num;
   return;
 }

@@ -57,7 +57,11 @@ void BindFleetWrapper(py::module* m) {
       .def("get_cache_threshold", &framework::FleetWrapper::GetCacheThreshold)
       .def("cache_shuffle", &framework::FleetWrapper::CacheShuffle)
      .def("save_cache", &framework::FleetWrapper::SaveCache)
+      .def("save_model_with_whitelist",
+           &framework::FleetWrapper::SaveWithWhitelist)
       .def("load_model", &framework::FleetWrapper::LoadModel)
+      .def("load_table_with_whitelist",
+           &framework::FleetWrapper::LoadWithWhitelist)
       .def("clear_model", &framework::FleetWrapper::ClearModel)
       .def("clear_one_table", &framework::FleetWrapper::ClearOneTable)
       .def("stop_server", &framework::FleetWrapper::StopServer)

@@ -101,15 +101,16 @@ class PSLib(Fleet):
         # barrier_all for init_worker
         self._role_maker._barrier_all()
         # prepare for client to client communication
-        if self._role_maker.is_worker():
-            info = self._fleet_ptr.get_clients_info()
-            all_info = self._role_maker._worker_gather(info[0])
-            self._fleet_ptr.gather_clients(all_info)
-            self._fleet_ptr.set_client2client_config(
-                self._client2client_request_timeout_ms,
-                self._client2client_connect_timeout_ms,
-                self._client2client_max_retry)
-            self._fleet_ptr.create_client2client_connection()
+        if not self._opt_info["use_ps_gpu"]:
+            if self._role_maker.is_worker():
+                info = self._fleet_ptr.get_clients_info()
+                all_info = self._role_maker._worker_gather(info[0])
+                self._fleet_ptr.gather_clients(all_info)
+                self._fleet_ptr.set_client2client_config(
+                    self._client2client_request_timeout_ms,
+                    self._client2client_connect_timeout_ms,
+                    self._client2client_max_retry)
+                self._fleet_ptr.create_client2client_connection()
         # barrier for init model
         self._role_maker._barrier_worker()
         if self._role_maker.is_first_worker():

@@ -137,9 +138,10 @@ class PSLib(Fleet):
                                 "var " + var_name + " not found in scope, "
                                 + "you should run startup program first")
                         var_name_list.append(var_name)
-                    self._fleet_ptr.init_model(scope,
-                                               int(table.table_id),
-                                               var_name_list)
+                    if not self._opt_info["use_ps_gpu"]:
+                        self._fleet_ptr.init_model(scope,
+                                                   int(table.table_id),
+                                                   var_name_list)
             # barrier for init model done
             self._role_maker._barrier_worker()
         else:
