add queue_generator_op, dequeue_op, enqueue_op and ut (#24481)
* add queue_generator_op, dequeue_op, enqueue_op and ut, test=developrevert-24981-add_device_attr_for_regulization
parent
b8f17a049d
commit
6e10022781
@ -0,0 +1,97 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "paddle/fluid/framework/lod_tensor.h"
|
||||
#include "paddle/fluid/framework/op_registry.h"
|
||||
#include "paddle/fluid/framework/operator.h"
|
||||
#include "paddle/fluid/framework/var_type.h"
|
||||
#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
|
||||
using LoDTensor = paddle::framework::LoDTensor;
|
||||
using LoDTensorBlockingQueueHolder =
|
||||
paddle::operators::reader::LoDTensorBlockingQueueHolder;
|
||||
|
||||
namespace paddle {
|
||||
namespace operators {
|
||||
|
||||
class DequeueOp : public framework::OperatorBase {
|
||||
public:
|
||||
using framework::OperatorBase::OperatorBase;
|
||||
DequeueOp(const std::string& type, const framework::VariableNameMap& inputs,
|
||||
const framework::VariableNameMap& outputs,
|
||||
const framework::AttributeMap& attrs)
|
||||
: OperatorBase(type, inputs, outputs, attrs) {}
|
||||
|
||||
private:
|
||||
void RunImpl(const framework::Scope& scope,
|
||||
const platform::Place& dev_place) const override {
|
||||
const std::string& queue_name = Attr<std::string>("queue_name");
|
||||
auto* queue_holder_var = scope.FindVar(queue_name);
|
||||
PADDLE_ENFORCE_NOT_NULL(
|
||||
queue_holder_var,
|
||||
platform::errors::NotFound(
|
||||
"No LoDTensorBlockingQueueHolder variable with name %s found.",
|
||||
queue_name));
|
||||
auto* queue_holder =
|
||||
queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();
|
||||
auto& out_names = Outputs("Out");
|
||||
PADDLE_ENFORCE_GT(out_names.size(), 0,
|
||||
platform::errors::InvalidArgument(
|
||||
"The output for Op(dequeue) must be set."));
|
||||
for (size_t i = 0; i < out_names.size(); ++i) {
|
||||
auto out_var = scope.FindVar(out_names[i]);
|
||||
PADDLE_ENFORCE_NOT_NULL(
|
||||
out_var, platform::errors::NotFound("No variable with name %s found",
|
||||
out_names[i]));
|
||||
auto* out_tensor = out_var->GetMutable<LoDTensor>();
|
||||
PADDLE_ENFORCE_NOT_NULL(
|
||||
out_tensor,
|
||||
platform::errors::InvalidArgument(
|
||||
"Variable with name %s has not been initialized.", out_names[i]));
|
||||
|
||||
std::vector<LoDTensor> lod_tensor_vec;
|
||||
bool success = false;
|
||||
lod_tensor_vec = queue_holder->GetQueue()->Pop(&success);
|
||||
PADDLE_ENFORCE_EQ(lod_tensor_vec.size(), 1,
|
||||
platform::errors::InvalidArgument(
|
||||
"Expected to pop only one element per Pop call for "
|
||||
"Op(dequeue), but poped %d element.",
|
||||
lod_tensor_vec.size()));
|
||||
for (size_t j = 0; j < lod_tensor_vec.size(); ++j) {
|
||||
TensorCopySync(lod_tensor_vec[j], dev_place, out_tensor);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
class DequeueOpMaker : public framework::OpProtoAndCheckerMaker {
|
||||
public:
|
||||
void Make() override {
|
||||
AddAttr<std::string>("queue_name",
|
||||
"Name of the `LoDTensorBlockingQueueHolder` variable");
|
||||
AddOutput("Out", "A list of `lod_tensor` to dequeue and assigned.")
|
||||
.AsDuplicable();
|
||||
AddComment(R"DOC(
|
||||
Dequeue operator.
|
||||
)DOC");
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace operators
|
||||
} // namespace paddle
|
||||
|
||||
namespace ops = ::paddle::operators;
|
||||
|
||||
REGISTER_OP_WITHOUT_GRADIENT(dequeue, ops::DequeueOp, ops::DequeueOpMaker);
|
@ -0,0 +1,79 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include <string>
|
||||
#include <vector>
|
||||
#include "paddle/fluid/framework/lod_tensor.h"
|
||||
#include "paddle/fluid/framework/op_registry.h"
|
||||
#include "paddle/fluid/framework/operator.h"
|
||||
#include "paddle/fluid/framework/var_type.h"
|
||||
#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
|
||||
|
||||
using LoDTensor = paddle::framework::LoDTensor;
|
||||
using LoDTensorBlockingQueueHolder =
|
||||
paddle::operators::reader::LoDTensorBlockingQueueHolder;
|
||||
|
||||
namespace paddle {
|
||||
namespace operators {
|
||||
|
||||
class EnqueueOp : public framework::OperatorBase {
|
||||
public:
|
||||
EnqueueOp(const std::string& type, const framework::VariableNameMap& inputs,
|
||||
const framework::VariableNameMap& outputs,
|
||||
const framework::AttributeMap& attrs)
|
||||
: OperatorBase(type, inputs, outputs, attrs) {}
|
||||
|
||||
private:
|
||||
void RunImpl(const framework::Scope& scope,
|
||||
const platform::Place& dev_place) const override {
|
||||
const std::string& queue_name = Attr<std::string>("queue_name");
|
||||
auto* queue_holder_var = scope.FindVar(queue_name);
|
||||
PADDLE_ENFORCE_NOT_NULL(
|
||||
queue_holder_var,
|
||||
platform::errors::NotFound(
|
||||
"No LoDTensorBlockingQueueHolder variable with name %s found.",
|
||||
queue_name));
|
||||
const std::string& var_name = Input("X");
|
||||
auto* in_var = scope.FindVar(var_name);
|
||||
PADDLE_ENFORCE_NOT_NULL(
|
||||
in_var, platform::errors::NotFound("No variable with name %s found.",
|
||||
var_name));
|
||||
auto* in_tensor = in_var->GetMutable<LoDTensor>();
|
||||
auto* queue_holder =
|
||||
queue_holder_var->template GetMutable<LoDTensorBlockingQueueHolder>();
|
||||
|
||||
std::vector<LoDTensor> lod_tensor_vec;
|
||||
lod_tensor_vec.emplace_back(*in_tensor);
|
||||
queue_holder->GetQueue()->Push(lod_tensor_vec);
|
||||
}
|
||||
};
|
||||
|
||||
class EnqueueOpMaker : public framework::OpProtoAndCheckerMaker {
|
||||
public:
|
||||
void Make() override {
|
||||
AddInput("X", "`lod_tensor` to enqueue");
|
||||
AddAttr<std::string>("queue_name",
|
||||
"Name of the `LoDTensorBlockingQueueHolder` variable");
|
||||
AddComment(R"DOC(
|
||||
Enqueue operator.
|
||||
)DOC");
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace operators
|
||||
} // namespace paddle
|
||||
|
||||
namespace ops = ::paddle::operators;
|
||||
|
||||
REGISTER_OP_WITHOUT_GRADIENT(enqueue, ops::EnqueueOp, ops::EnqueueOpMaker);
|
@ -0,0 +1,93 @@
|
||||
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License. */
|
||||
|
||||
#include <stdint.h>
|
||||
#include <ostream>
|
||||
#include <string>
|
||||
|
||||
#include "paddle/fluid/framework/lod_tensor.h"
|
||||
#include "paddle/fluid/framework/op_registry.h"
|
||||
#include "paddle/fluid/framework/operator.h"
|
||||
#include "paddle/fluid/framework/var_type.h"
|
||||
#include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h"
|
||||
|
||||
namespace paddle {
|
||||
namespace operators {
|
||||
|
||||
class QueueGeneratorOp : public framework::OperatorBase {
|
||||
public:
|
||||
QueueGeneratorOp(const std::string& type,
|
||||
const framework::VariableNameMap& inputs,
|
||||
const framework::VariableNameMap& outputs,
|
||||
const framework::AttributeMap& attrs)
|
||||
: OperatorBase(type, inputs, outputs, attrs) {}
|
||||
|
||||
void RunImpl(const framework::Scope& scope,
|
||||
const platform::Place& dev_place) const override {
|
||||
std::vector<std::string> names = Attr<std::vector<std::string>>("names");
|
||||
PADDLE_ENFORCE_GT(names.size(), 0, platform::errors::InvalidArgument(
|
||||
"The attribute 'names' for "
|
||||
"Op(queue_generator) must be set."));
|
||||
|
||||
int capacity = Attr<int>("capacity");
|
||||
PADDLE_ENFORCE_GT(capacity, 0,
|
||||
platform::errors::InvalidArgument(
|
||||
"The attribute 'capacity' for Op(queue_generator) "
|
||||
"must be set a positive value, "
|
||||
"but the one received is %d.",
|
||||
capacity));
|
||||
|
||||
// generate queue vars and initialize them
|
||||
for (const auto& name : names) {
|
||||
GenerateQueue(&scope, name, capacity);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
void GenerateQueue(const framework::Scope* scope, const std::string& name,
|
||||
size_t capacity) const {
|
||||
auto var = scope->FindVar(name);
|
||||
PADDLE_ENFORCE_NOT_NULL(
|
||||
var, platform::errors::NotFound(
|
||||
"Can't find var named '%s' in the global scope.", name));
|
||||
auto ptr = var->GetMutable<reader::LoDTensorBlockingQueueHolder>();
|
||||
ptr->InitOnce(capacity);
|
||||
|
||||
VLOG(3) << "generated a LodTensorBlockingQueue var named: " << name;
|
||||
}
|
||||
};
|
||||
|
||||
class QueueGeneratorOpMaker : public framework::OpProtoAndCheckerMaker {
|
||||
public:
|
||||
void Make() override {
|
||||
AddComment(R"DOC(
|
||||
QueueGenerator operator
|
||||
Generate and initialize one or more LodTensorBlockingQueueHolders.
|
||||
)DOC");
|
||||
AddAttr<std::vector<std::string>>(
|
||||
"names",
|
||||
"['name1', 'name2', ...] "
|
||||
"list of names for LodTensorBlockingQueueHolders")
|
||||
.SetDefault({});
|
||||
AddAttr<int>("capacity", "queue capacity").SetDefault(1);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace operators
|
||||
} // namespace paddle
|
||||
|
||||
namespace ops = paddle::operators;
|
||||
|
||||
REGISTER_OP_WITHOUT_GRADIENT(queue_generator, ops::QueueGeneratorOp,
|
||||
ops::QueueGeneratorOpMaker);
|
@ -0,0 +1,74 @@
|
||||
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import numpy as np
|
||||
import os
|
||||
import unittest
|
||||
|
||||
import paddle.fluid as fluid
|
||||
import paddle.fluid.layers as layers
|
||||
import paddle.fluid.core as core
|
||||
|
||||
|
||||
class TestQueue(unittest.TestCase):
|
||||
def test_eq(self):
|
||||
"""
|
||||
test queue_generator op, enqueue op and dequeue op.
|
||||
"""
|
||||
|
||||
main_program = fluid.Program()
|
||||
startup_program = fluid.Program()
|
||||
value = np.random.rand(1)
|
||||
with fluid.program_guard(main_program, startup_program):
|
||||
data_in = layers.create_global_var(
|
||||
shape=[2, 3],
|
||||
value=value,
|
||||
dtype="float32",
|
||||
persistable=True,
|
||||
name='var_in')
|
||||
data_out = layers.create_global_var(
|
||||
shape=[2, 3],
|
||||
value=value - 1.0,
|
||||
dtype="float32",
|
||||
persistable=True,
|
||||
name='var_out')
|
||||
startup_block = startup_program.block(0)
|
||||
queue_name = 'blocking_queue'
|
||||
startup_block.create_var(
|
||||
name=queue_name, persistable=True, type=core.VarDesc.VarType.RAW)
|
||||
startup_block.append_op(
|
||||
type="queue_generator", attrs={'names': [queue_name]})
|
||||
block = main_program.block(0)
|
||||
block.append_op(
|
||||
type='enqueue',
|
||||
inputs={'X': data_in},
|
||||
attrs={'queue_name': queue_name})
|
||||
block.append_op(
|
||||
type='dequeue',
|
||||
outputs={'Out': [data_out]},
|
||||
attrs={'queue_name': queue_name})
|
||||
|
||||
place = fluid.CUDAPlace(0) if core.is_compiled_with_cuda(
|
||||
) else fluid.CPUPlace()
|
||||
exe = fluid.Executor(place)
|
||||
exe.run(startup_program)
|
||||
ret = exe.run(main_program, fetch_list=[data_out.name])
|
||||
self.assertTrue(
|
||||
np.allclose(np.asarray(ret), np.full((2, 3), value, np.float32)))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in new issue