|
|
@ -20,22 +20,27 @@ limitations under the License. */
|
|
|
|
#include "paddle/framework/op_registry.h"
|
|
|
|
#include "paddle/framework/op_registry.h"
|
|
|
|
#include "paddle/framework/operator.h"
|
|
|
|
#include "paddle/framework/operator.h"
|
|
|
|
#include "paddle/framework/program_desc.h"
|
|
|
|
#include "paddle/framework/program_desc.h"
|
|
|
|
|
|
|
|
#include "paddle/operators/math/math_function.h"
|
|
|
|
|
|
|
|
#include "paddle/operators/math/selected_rows_functor.h"
|
|
|
|
#include "paddle/string/printf.h"
|
|
|
|
#include "paddle/string/printf.h"
|
|
|
|
|
|
|
|
|
|
|
|
USE_NO_KERNEL_OP(send);
|
|
|
|
USE_NO_KERNEL_OP(send);
|
|
|
|
USE_NO_KERNEL_OP(recv);
|
|
|
|
USE_NO_KERNEL_OP(recv);
|
|
|
|
USE_OP(sum);
|
|
|
|
USE_OP(sum);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
namespace f = paddle::framework;
|
|
|
|
|
|
|
|
namespace p = paddle::platform;
|
|
|
|
|
|
|
|
namespace m = paddle::operators::math;
|
|
|
|
|
|
|
|
|
|
|
|
// global for simplicity.
|
|
|
|
// global for simplicity.
|
|
|
|
std::unique_ptr<paddle::framework::OperatorBase> recv_op;
|
|
|
|
std::unique_ptr<f::OperatorBase> recv_op;
|
|
|
|
|
|
|
|
|
|
|
|
void InitTensorsInScope(paddle::framework::Scope &scope,
|
|
|
|
void InitTensorsInScope(f::Scope &scope, p::CPUPlace &place) {
|
|
|
|
paddle::platform::CPUPlace &place) {
|
|
|
|
p::CPUDeviceContext ctx(place);
|
|
|
|
paddle::platform::CPUDeviceContext ctx(place);
|
|
|
|
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
|
|
for (int i = 0; i < 2; ++i) {
|
|
|
|
auto var_name = paddle::string::Sprintf("x%d", i);
|
|
|
|
auto var_name = paddle::string::Sprintf("x%d", i);
|
|
|
|
auto var = scope.Var(var_name);
|
|
|
|
auto var = scope.Var(var_name);
|
|
|
|
auto tensor = var->GetMutable<paddle::framework::LoDTensor>();
|
|
|
|
auto tensor = var->GetMutable<f::LoDTensor>();
|
|
|
|
tensor->Resize({10, 10});
|
|
|
|
tensor->Resize({10, 10});
|
|
|
|
float *expect = tensor->mutable_data<float>(place);
|
|
|
|
float *expect = tensor->mutable_data<float>(place);
|
|
|
|
for (int64_t i = 0; i < tensor->numel(); ++i) {
|
|
|
|
for (int64_t i = 0; i < tensor->numel(); ++i) {
|
|
|
@ -44,21 +49,53 @@ void InitTensorsInScope(paddle::framework::Scope &scope,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
auto out_var = scope.Var("Out");
|
|
|
|
auto out_var = scope.Var("Out");
|
|
|
|
auto out_tensor = out_var->GetMutable<paddle::framework::LoDTensor>();
|
|
|
|
auto out_tensor = out_var->GetMutable<f::LoDTensor>();
|
|
|
|
out_tensor->Resize({10, 10});
|
|
|
|
out_tensor->Resize({10, 10});
|
|
|
|
out_tensor->mutable_data<float>(place); // allocate
|
|
|
|
out_tensor->mutable_data<float>(place); // allocate
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void AddOp(const std::string &type,
|
|
|
|
void InitSelectedRowsInScope(f::Scope &scope, p::CPUPlace &place) {
|
|
|
|
const paddle::framework::VariableNameMap &inputs,
|
|
|
|
p::CPUDeviceContext ctx(place);
|
|
|
|
const paddle::framework::VariableNameMap &outputs,
|
|
|
|
int64_t height = 10;
|
|
|
|
paddle::framework::AttributeMap attrs,
|
|
|
|
int64_t row_numel = 10;
|
|
|
|
paddle::framework::BlockDesc *block) {
|
|
|
|
m::SetConstant<p::CPUDeviceContext, float> set_one;
|
|
|
|
|
|
|
|
// init x0
|
|
|
|
|
|
|
|
std::vector<int64_t> rows0{0, 4, 7};
|
|
|
|
|
|
|
|
auto x0_var = scope.Var("x0");
|
|
|
|
|
|
|
|
auto x0 = x0_var->GetMutable<f::SelectedRows>();
|
|
|
|
|
|
|
|
x0->set_rows(rows0);
|
|
|
|
|
|
|
|
x0->set_height(height);
|
|
|
|
|
|
|
|
auto x0_value = x0->mutable_value();
|
|
|
|
|
|
|
|
x0_value->mutable_data<float>(
|
|
|
|
|
|
|
|
f::make_ddim({static_cast<int64_t>(rows0.size()), row_numel}), place);
|
|
|
|
|
|
|
|
set_one(ctx, x0_value, 1.0);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// init x1
|
|
|
|
|
|
|
|
std::vector<int64_t> rows1{2, 9};
|
|
|
|
|
|
|
|
auto x1_var = scope.Var("x1");
|
|
|
|
|
|
|
|
auto x1 = x1_var->GetMutable<f::SelectedRows>();
|
|
|
|
|
|
|
|
x1->set_rows(rows1);
|
|
|
|
|
|
|
|
x1->set_height(height);
|
|
|
|
|
|
|
|
auto x1_value = x1->mutable_value();
|
|
|
|
|
|
|
|
x1_value->mutable_data<float>(
|
|
|
|
|
|
|
|
f::make_ddim({static_cast<int64_t>(rows1.size()), row_numel}), place);
|
|
|
|
|
|
|
|
set_one(ctx, x1_value, 1.0);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
auto out_var = scope.Var("Out");
|
|
|
|
|
|
|
|
auto out = out_var->GetMutable<f::SelectedRows>();
|
|
|
|
|
|
|
|
auto out_value = out->mutable_value();
|
|
|
|
|
|
|
|
out->set_height(height);
|
|
|
|
|
|
|
|
out_value->mutable_data<float>(f::make_ddim({5, 10}), place);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
void AddOp(const std::string &type, const f::VariableNameMap &inputs,
|
|
|
|
|
|
|
|
const f::VariableNameMap &outputs, f::AttributeMap attrs,
|
|
|
|
|
|
|
|
f::BlockDesc *block) {
|
|
|
|
// insert output
|
|
|
|
// insert output
|
|
|
|
for (auto kv : outputs) {
|
|
|
|
for (auto kv : outputs) {
|
|
|
|
for (auto v : kv.second) {
|
|
|
|
for (auto v : kv.second) {
|
|
|
|
auto var = block->Var(v);
|
|
|
|
auto var = block->Var(v);
|
|
|
|
var->SetDataType(paddle::framework::proto::DataType::FP32);
|
|
|
|
var->SetDataType(f::proto::DataType::FP32);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -74,58 +111,99 @@ void AddOp(const std::string &type,
|
|
|
|
op->SetAttrMap(attrs);
|
|
|
|
op->SetAttrMap(attrs);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void StartServerNet() {
|
|
|
|
void StartServerNet(bool is_sparse) {
|
|
|
|
paddle::framework::Scope scope;
|
|
|
|
f::Scope scope;
|
|
|
|
paddle::platform::CPUPlace place;
|
|
|
|
p::CPUPlace place;
|
|
|
|
InitTensorsInScope(scope, place);
|
|
|
|
if (is_sparse) {
|
|
|
|
|
|
|
|
InitSelectedRowsInScope(scope, place);
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
InitTensorsInScope(scope, place);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// sub program run in recv_op, for simple test we use sum
|
|
|
|
// sub program run in recv_op, for simple test we use sum
|
|
|
|
paddle::framework::ProgramDesc program;
|
|
|
|
f::ProgramDesc program;
|
|
|
|
paddle::framework::BlockDesc *block = program.MutableBlock(0);
|
|
|
|
f::BlockDesc *block = program.MutableBlock(0);
|
|
|
|
// X for server side tensors, RX for received tensers, must be of same shape.
|
|
|
|
// X for server side tensors, RX for received tensers, must be of same shape.
|
|
|
|
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"x0"}}}, {}, block);
|
|
|
|
AddOp("sum", {{"X", {"x0", "x1"}}}, {{"Out", {"Out"}}}, {}, block);
|
|
|
|
|
|
|
|
|
|
|
|
paddle::framework::AttributeMap attrs;
|
|
|
|
f::AttributeMap attrs;
|
|
|
|
attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
|
|
|
|
attrs.insert({"endpoint", std::string("127.0.0.1:6174")});
|
|
|
|
attrs.insert({"ParamList", std::vector<std::string>({"x0"})});
|
|
|
|
attrs.insert({"ParamList", std::vector<std::string>({"Out"})});
|
|
|
|
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
|
|
|
|
attrs.insert({"GradList", std::vector<std::string>({"x1"})});
|
|
|
|
std::string program_proto;
|
|
|
|
std::string program_proto;
|
|
|
|
PADDLE_ENFORCE(program.Proto()->SerializeToString(&program_proto));
|
|
|
|
PADDLE_ENFORCE(program.Proto()->SerializeToString(&program_proto));
|
|
|
|
|
|
|
|
|
|
|
|
attrs.insert({"OptimizeProgram", program_proto});
|
|
|
|
attrs.insert({"OptimizeProgram", program_proto});
|
|
|
|
recv_op = paddle::framework::OpRegistry::CreateOp("recv", {{"RX", {"x1"}}},
|
|
|
|
recv_op = f::OpRegistry::CreateOp("recv", {{"RX", {"x1"}}}, {}, attrs);
|
|
|
|
{}, attrs);
|
|
|
|
|
|
|
|
recv_op->Run(scope, place);
|
|
|
|
recv_op->Run(scope, place);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
TEST(SendRecvOp, CPU) {
|
|
|
|
TEST(SendRecvOp, CPUDense) {
|
|
|
|
std::thread server_thread(StartServerNet);
|
|
|
|
std::thread server_thread(StartServerNet, false);
|
|
|
|
sleep(5); // wait server to start
|
|
|
|
sleep(3); // wait server to start
|
|
|
|
// local net
|
|
|
|
// local net
|
|
|
|
paddle::framework::Scope scope;
|
|
|
|
f::Scope scope;
|
|
|
|
paddle::platform::CPUPlace place;
|
|
|
|
p::CPUPlace place;
|
|
|
|
InitTensorsInScope(scope, place);
|
|
|
|
InitTensorsInScope(scope, place);
|
|
|
|
|
|
|
|
|
|
|
|
paddle::framework::AttributeMap attrs;
|
|
|
|
f::AttributeMap attrs;
|
|
|
|
attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
|
|
|
|
attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
|
|
|
|
attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
|
|
|
|
attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
|
|
|
|
auto send_op = paddle::framework::OpRegistry::CreateOp(
|
|
|
|
auto send_op = f::OpRegistry::CreateOp("send", {{"X", {"x1"}}},
|
|
|
|
"send", {{"X", {"x1"}}}, {{"Out", {"x0"}}}, attrs);
|
|
|
|
{{"Out", {"Out"}}}, attrs);
|
|
|
|
send_op->Run(scope, place);
|
|
|
|
send_op->Run(scope, place);
|
|
|
|
|
|
|
|
|
|
|
|
auto in_var = scope.Var("x1");
|
|
|
|
auto in_var = scope.Var("x1");
|
|
|
|
auto tensor = in_var->GetMutable<paddle::framework::LoDTensor>();
|
|
|
|
auto tensor = in_var->GetMutable<f::LoDTensor>();
|
|
|
|
float *expected = tensor->data<float>();
|
|
|
|
float *expected = tensor->data<float>();
|
|
|
|
auto out_var = scope.Var("x0");
|
|
|
|
auto out_var = scope.Var("Out");
|
|
|
|
auto target = out_var->GetMutable<paddle::framework::LoDTensor>();
|
|
|
|
auto target = out_var->GetMutable<f::LoDTensor>();
|
|
|
|
// x1 * 2 == x0
|
|
|
|
// x1 * 2 == x0
|
|
|
|
EXPECT_NE(target->memory_size(), size_t(0));
|
|
|
|
EXPECT_NE(target->memory_size(), size_t(0));
|
|
|
|
float *actual = target->data<float>();
|
|
|
|
float *actual = target->data<float>();
|
|
|
|
for (int64_t i = 0; i < target->numel(); ++i) {
|
|
|
|
for (int64_t i = 0; i < target->numel(); ++i) {
|
|
|
|
EXPECT_EQ(expected[i] * 2, actual[i]);
|
|
|
|
EXPECT_EQ(expected[i] * 2, actual[i]);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
recv_op->Stop();
|
|
|
|
|
|
|
|
server_thread.join();
|
|
|
|
|
|
|
|
recv_op.reset(nullptr);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
TEST(SendRecvOp, CPUSparse) {
|
|
|
|
|
|
|
|
std::thread server_thread(StartServerNet, true);
|
|
|
|
|
|
|
|
sleep(3); // wait server to start
|
|
|
|
|
|
|
|
// local net
|
|
|
|
|
|
|
|
f::Scope scope;
|
|
|
|
|
|
|
|
p::CPUPlace place;
|
|
|
|
|
|
|
|
p::CPUDeviceContext ctx(place);
|
|
|
|
|
|
|
|
InitSelectedRowsInScope(scope, place);
|
|
|
|
|
|
|
|
f::AttributeMap attrs;
|
|
|
|
|
|
|
|
attrs.insert({"endpoints", std::vector<std::string>({"127.0.0.1:6174"})});
|
|
|
|
|
|
|
|
attrs.insert({"epmap", std::vector<std::string>({"127.0.0.1:6174"})});
|
|
|
|
|
|
|
|
auto send_op = f::OpRegistry::CreateOp("send", {{"X", {"x1"}}},
|
|
|
|
|
|
|
|
{{"Out", {"Out"}}}, attrs);
|
|
|
|
|
|
|
|
send_op->Run(scope, place);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
auto x0 = scope.Var("x0")->GetMutable<f::SelectedRows>();
|
|
|
|
|
|
|
|
auto x1 = scope.Var("x1")->GetMutable<f::SelectedRows>();
|
|
|
|
|
|
|
|
auto out = scope.Var("Out")->GetMutable<f::SelectedRows>();
|
|
|
|
|
|
|
|
auto actual = out->mutable_value();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<f::SelectedRows> expect{new f::SelectedRows()};
|
|
|
|
|
|
|
|
auto expect_value = expect->mutable_value();
|
|
|
|
|
|
|
|
expect_value->mutable_data<float>(f::make_ddim({5, 10}), place);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
m::SelectedRowsAdd<p::CPUDeviceContext, float> add_functor;
|
|
|
|
|
|
|
|
add_functor(ctx, *x0, *x1, expect.get());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
EXPECT_EQ(actual->numel(), expect_value->numel());
|
|
|
|
|
|
|
|
EXPECT_EQ(out->rows().size(), x0->rows().size() + x1->rows().size());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for (int64_t i = 0; i < expect_value->numel(); ++i) {
|
|
|
|
|
|
|
|
EXPECT_EQ(expect_value->mutable_data<float>(place)[i],
|
|
|
|
|
|
|
|
actual->mutable_data<float>(place)[i]);
|
|
|
|
|
|
|
|
}
|
|
|
|
recv_op->Stop();
|
|
|
|
recv_op->Stop();
|
|
|
|
server_thread.join();
|
|
|
|
server_thread.join();
|
|
|
|
// recv_op.reset();
|
|
|
|
recv_op.reset();
|
|
|
|
}
|
|
|
|
}
|
|
|
|