|
|
|
@ -73,7 +73,6 @@ class DoubleBufferReader : public framework::DecoratedReader {
|
|
|
|
|
framework::Channel<Item>* buffer_;
|
|
|
|
|
platform::Place place_;
|
|
|
|
|
std::vector<std::unique_ptr<platform::DeviceContext>> ctxs_;
|
|
|
|
|
mutable Item local_buffer_;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
class CreateDoubleBufferReaderOp : public framework::OperatorBase {
|
|
|
|
@ -128,12 +127,9 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
bool DoubleBufferReader::HasNext() const {
|
|
|
|
|
if (local_buffer_.payloads_.empty()) {
|
|
|
|
|
bool ok = buffer_->Receive(&local_buffer_);
|
|
|
|
|
return ok;
|
|
|
|
|
} else {
|
|
|
|
|
return true;
|
|
|
|
|
while (!buffer_->IsClosed() && !buffer_->CanReceive()) {
|
|
|
|
|
}
|
|
|
|
|
return buffer_->CanReceive()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
|
|
|
|
@ -141,10 +137,11 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
|
|
|
|
|
PADDLE_THROW("There is no next data!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*out = local_buffer_.payloads_;
|
|
|
|
|
local_buffer_.payloads_.clear();
|
|
|
|
|
if (local_buffer_.ctx_) {
|
|
|
|
|
local_buffer_.ctx_->Wait();
|
|
|
|
|
Item batch;
|
|
|
|
|
buffer_->Receive(&batch);
|
|
|
|
|
*out = batch.payload_;
|
|
|
|
|
if (batch.ctx_) {
|
|
|
|
|
batch.ctx_->Wait();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|