From 5df0350b675f35ad7540ff1935bffa37a1224625 Mon Sep 17 00:00:00 2001 From: ZPaC Date: Mon, 9 Nov 2020 16:47:58 +0800 Subject: [PATCH] PS mode supports negative index looking up. --- mindspore/ccsrc/ps/optimizer_info.cc | 6 ++++++ mindspore/ccsrc/ps/worker_proxy.h | 19 ++++++++++++++----- .../ps/cmp_sparse_embedding/shell_run_test.sh | 6 +++--- .../test_cmp_sparse_embedding.py | 2 +- tests/st/ps/full_ps/test_full_ps_lenet.py | 2 +- 5 files changed, 25 insertions(+), 10 deletions(-) diff --git a/mindspore/ccsrc/ps/optimizer_info.cc b/mindspore/ccsrc/ps/optimizer_info.cc index a999f81eff..c9cdb01016 100644 --- a/mindspore/ccsrc/ps/optimizer_info.cc +++ b/mindspore/ccsrc/ps/optimizer_info.cc @@ -222,8 +222,14 @@ void SparseOptimInfo::ComputeMean(const std::vector> &shapes MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")"; return; } + int64_t reduced_indice_size = unique_sparse_grad.indices_size_ * sizeof(int); MS_EXCEPTION_IF_NULL(unique_sparse_grad.indices_); + ret = memcpy_s(indices()->addr, indices()->size, unique_sparse_grad.indices_, reduced_indice_size); + if (ret != 0) { + MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")"; + return; + } gradient()->size = reduced_grad_size; indices()->size = reduced_indice_size; diff --git a/mindspore/ccsrc/ps/worker_proxy.h b/mindspore/ccsrc/ps/worker_proxy.h index 625dc5f2af..e0962765ec 100644 --- a/mindspore/ccsrc/ps/worker_proxy.h +++ b/mindspore/ccsrc/ps/worker_proxy.h @@ -304,15 +304,18 @@ int64_t WorkerProxy::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const : auto &kvs = lookup_results_[ts]; mutex_.unlock(); + if (lookup_ids.empty()) { + MS_LOG(EXCEPTION) << "Lookup id is empty."; + } + int64_t single_id_len = SizeToLong(lookup_result->size() / lookup_ids.size()); std::unordered_map>> id_addr_map; for (const auto &s : kvs) { int64_t offset = 0; - int64_t len = s.vals.size() / s.keys.size(); for (size_t i = 0; i < s.keys.size(); i++) { const Key &key = s.keys[i]; T *addr = s.vals.data() + offset; - offset += len; - id_addr_map[key] = std::make_shared>(std::make_pair(addr, len)); + offset += single_id_len; + id_addr_map[key] = std::make_shared>(std::make_pair(addr, single_id_len)); MS_EXCEPTION_IF_NULL(id_addr_map[key]); } } @@ -325,8 +328,12 @@ int64_t WorkerProxy::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const : void *dst_data = nullptr; void *src_data = nullptr; for (size_t i = 0; i < lookup_ids.size(); i++) { + if (id_addr_map.count(lookup_ids[i]) == 0) { + offset += single_id_len; + continue; + } auto &pair = id_addr_map[static_cast(lookup_ids[i])]; - int64_t size = pair->second * sizeof(T); + int64_t size = single_id_len * sizeof(T); dst_size = size; src_size = size; dst_data = result_addr + offset; @@ -338,7 +345,7 @@ int64_t WorkerProxy::AddLookupCB(const ::ps::SArray<::ps::Key> &keys, const : MS_LOG(EXCEPTION) << "memcpy_s error, errorno(" << ret << ")"; return; } - offset += pair->second; + offset += single_id_len; } mutex_.lock(); @@ -406,6 +413,8 @@ void WorkerProxy::LookupIdSlicer(int64_t timestamp, const ::ps::KVPairs &s for (size_t j = 0; j < id_size; j++) { auto lookup_id = static_cast(lookup_ids[j]); + // If lookup_id is out of range, like negative number, unique_ids will not contain it. + // Servers always get lookup_ids in its embedding table range. if (lookup_id >= begin && lookup_id <= end) { unique_ids.insert(lookup_id); } diff --git a/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh b/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh index e1afc1dc14..b9a7d3f634 100644 --- a/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh +++ b/tests/st/ps/cmp_sparse_embedding/shell_run_test.sh @@ -30,7 +30,7 @@ do rm -rf ${execute_path}/sched_$i/ mkdir ${execute_path}/sched_$i/ cd ${execute_path}/sched_$i/ || exit - python ${self_path}/../test_cmp_sparse_embedding.py & + python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET & done export MS_ROLE=MS_PSERVER @@ -39,7 +39,7 @@ do rm -rf ${execute_path}/server_$i/ mkdir ${execute_path}/server_$i/ cd ${execute_path}/server_$i/ || exit - python ${self_path}/../test_cmp_sparse_embedding.py & + python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET & done export MS_ROLE=MS_WORKER @@ -48,7 +48,7 @@ do rm -rf ${execute_path}/worker_$i/ mkdir ${execute_path}/worker_$i/ cd ${execute_path}/worker_$i/ || exit - python ${self_path}/../test_cmp_sparse_embedding.py & + python ${self_path}/../test_cmp_sparse_embedding.py --device_target=$DEVICE_TARGET & done wait $! diff --git a/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py b/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py index 96ca152754..f887c99d36 100644 --- a/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py +++ b/tests/st/ps/cmp_sparse_embedding/test_cmp_sparse_embedding.py @@ -69,7 +69,7 @@ def do_sparse_embedding(ps=False): train_network.set_train() losses = [] for _ in range(epoch): - data = Tensor(np.random.randint(0, 15, (32, 3), np.int32)) + data = Tensor(np.random.randint(-5, 15, (32, 3), np.int32)) label = Tensor(np.random.randint(0, 9, (32), np.int32)) if _is_role_pserver(): train_network(data, label) diff --git a/tests/st/ps/full_ps/test_full_ps_lenet.py b/tests/st/ps/full_ps/test_full_ps_lenet.py index c8a0294e72..1fa7029d9e 100644 --- a/tests/st/ps/full_ps/test_full_ps_lenet.py +++ b/tests/st/ps/full_ps/test_full_ps_lenet.py @@ -135,4 +135,4 @@ if __name__ == "__main__": acc = model.eval(ds_eval, dataset_sink_mode=False) print("Accuracy:", acc['Accuracy']) - assert acc['Accuracy'] > 0.93 + assert acc['Accuracy'] > 0.90