|
|
|
@ -120,11 +120,13 @@ void DoubleBufferReader::ReadNext(std::vector<LoDTensor>* out) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
out->clear();
|
|
|
|
|
out->resize(buffer_[read_pos_].size());
|
|
|
|
|
out->reserve(buffer_[read_pos_].size());
|
|
|
|
|
// TODO(fengjiayi): This copy shall be reduced.
|
|
|
|
|
for (size_t i = 0; i < buffer_[read_pos_].size(); ++i) {
|
|
|
|
|
TensorCopy(buffer_[read_pos_][i], platform::CPUPlace(), &out[i]);
|
|
|
|
|
out[i].set_lod(buffer_[read_pos_][i].lod());
|
|
|
|
|
LoDTensor dst;
|
|
|
|
|
TensorCopy(buffer_[read_pos_][i], platform::CPUPlace(), &dst);
|
|
|
|
|
dst.set_lod(buffer_[read_pos_][i].lod());
|
|
|
|
|
out->push_back(dst);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
++read_pos_;
|
|
|
|
@ -134,13 +136,13 @@ void DoubleBufferReader::ReadNext(std::vector<LoDTensor>* out) {
|
|
|
|
|
buffer_not_full_.notify_all();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool DoubleBufferReader::HasNext() {
|
|
|
|
|
bool DoubleBufferReader::HasNext() const {
|
|
|
|
|
return reader_->HasNext() || !buffer_.empty();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void DoubleBufferReader::ProducerThreadFunc() {
|
|
|
|
|
while (reader_->HasNext()) {
|
|
|
|
|
std::unique_lock<std::mutex> lck(mtx);
|
|
|
|
|
std::unique_lock<std::mutex> lck(mtx_);
|
|
|
|
|
while (((write_pos_ + 1) % kDoubleBufferSize) == read_pos_) {
|
|
|
|
|
buffer_not_full_.wait(lck);
|
|
|
|
|
}
|
|
|
|
|