From 1a25f3cd07dd52e50e52d6071fa5aa32b531db12 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Wed, 25 Apr 2018 19:32:08 +0800
Subject: [PATCH 1/7] Add reader blocking queue

---
 paddle/fluid/operators/reader/CMakeLists.txt  |   2 +
 .../fluid/operators/reader/blocking_queue.h   | 117 +++++++++
 .../reader/reader_blocking_queue_test.cc      | 224 ++++++++++++++++++
 3 files changed, 343 insertions(+)
 create mode 100644 paddle/fluid/operators/reader/blocking_queue.h
 create mode 100644 paddle/fluid/operators/reader/reader_blocking_queue_test.cc

diff --git a/paddle/fluid/operators/reader/CMakeLists.txt b/paddle/fluid/operators/reader/CMakeLists.txt
index 845528860f..3106978eb0 100644
--- a/paddle/fluid/operators/reader/CMakeLists.txt
+++ b/paddle/fluid/operators/reader/CMakeLists.txt
@@ -23,5 +23,7 @@ reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_o
 reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
 reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
 reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
+
+cc_test(reader_blocking_queue_test SRCS reader_blocking_queue_test.cc)
 # Export local libraries to parent
 set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE)
diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h
new file mode 100644
index 0000000000..4a54ccd5e8
--- /dev/null
+++ b/paddle/fluid/operators/reader/blocking_queue.h
@@ -0,0 +1,117 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include
+#include <condition_variable>  // NOLINT
+#include <deque>
+#include
+
+#include "paddle/fluid/platform/enforce.h"
+
+namespace paddle {
+namespace operators {
+namespace reader {
+
+template <typename T>
+class BlockingQueue {
+ public:
+  explicit BlockingQueue(size_t capacity)
+      : capacity_(capacity), closed_(false) {
+    PADDLE_ENFORCE_GT(
+        capacity_, 0,
+        "The capacity of a reader::BlockingQueue must be greater than 0.");
+  }
+
+  bool Send(const T& elem) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
+    if (closed_) {
+      return false;
+    } else {
+      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+      queue_.push_back(elem);
+      receive_cv_.notify_one();
+      return true;
+    }
+  }
+
+  bool Send(T&& elem) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
+    if (closed_) {
+      return false;
+    } else {
+      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+      queue_.emplace_back(std::move(elem));
+      receive_cv_.notify_one();
+      return true;
+    }
+  }
+
+  bool Receive(T* elem) {
+    std::unique_lock<std::mutex> lock(mutex_);
+    receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
+    if (!queue_.empty()) {
+      PADDLE_ENFORCE_NOT_NULL(elem);
+      *elem = queue_.front();
+      queue_.pop_front();
+      send_cv_.notify_one();
+      return true;
+    } else {
+      PADDLE_ENFORCE(closed_);
+      return false;
+    }
+  }
+
+  void Close() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    closed_ = true;
+    send_cv_.notify_all();
+    receive_cv_.notify_all();
+  }
+
+  bool IsClosed() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return closed_;
+  }
+
+  bool CanSend() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return !closed_ && queue_.size() < capacity_;
+  }
+
+  bool CanReceive() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return !queue_.empty();
+  }
+
+  size_t Cap() {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return capacity_;
+  }
+
+ private:
+  size_t capacity_;
+  bool closed_;
+  std::deque<T> queue_;
+
+  std::mutex mutex_;
+  std::condition_variable receive_cv_;
+  std::condition_variable send_cv_;
+};
+}  // namespace reader
+}  // namespace operators
+}  // namespace paddle
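A minimal usage sketch of the queue above (illustrative only, not part of the patch; the function name and element type are made up): Send() blocks while the queue is full and returns false once the queue has been closed, while Receive() keeps returning true until the queue is both closed and drained.

    #include <thread>

    #include "paddle/fluid/operators/reader/blocking_queue.h"

    void Demo() {
      paddle::operators::reader::BlockingQueue<int> q(2);  // capacity must be > 0
      std::thread producer([&q] {
        for (int i = 0; i < 5; ++i) {
          if (!q.Send(i)) {
            break;  // Send() only fails after Close()
          }
        }
        q.Close();  // wakes any Send()/Receive() still blocked
      });
      int v;
      while (q.Receive(&v)) {  // true until the queue is closed *and* empty
        // consume v
      }
      producer.join();
    }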
diff --git a/paddle/fluid/operators/reader/reader_blocking_queue_test.cc b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
new file mode 100644
index 0000000000..07c22fe2bf
--- /dev/null
+++ b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
@@ -0,0 +1,224 @@
+// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <chrono>  // NOLINT
+#include <set>
+#include <thread>  // NOLINT
+#include <vector>
+
+#include "gtest/gtest.h"
+
+#include "paddle/fluid/operators/reader/blocking_queue.h"
+
+using paddle::operators::reader::BlockingQueue;
+
+TEST(BlockingQueue, CapacityTest) {
+  size_t cap = 10;
+  BlockingQueue<size_t> q(cap);
+  EXPECT_EQ(q.Cap(), cap);
+}
+
+TEST(BlockingQueue, CanSendTest) {
+  size_t cap = 1;
+  BlockingQueue<size_t> q(cap);
+  q.Send(1);
+  EXPECT_FALSE(q.CanSend());
+}
+
+void FirstInFirstOut(size_t queue_cap, size_t elem_num, size_t send_time_gap,
+                     size_t receive_time_gap) {
+  BlockingQueue<size_t> q(queue_cap);
+  std::thread sender([&]() {
+    for (size_t i = 0; i < elem_num; ++i) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(send_time_gap));
+      EXPECT_TRUE(q.Send(i));
+    }
+    q.Close();
+  });
+  size_t count = 0;
+  while (true) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(receive_time_gap));
+    size_t elem;
+    if (!q.Receive(&elem)) {
+      break;
+    }
+    EXPECT_EQ(elem, count++);
+  }
+  sender.join();
+  EXPECT_EQ(count, elem_num);
+  EXPECT_TRUE(q.IsClosed());
+}
+
+TEST(BlockingQueue, FirstInFirstOutTest) {
+  FirstInFirstOut(2, 5, 2, 50);
+  FirstInFirstOut(2, 5, 50, 2);
+  FirstInFirstOut(10, 3, 50, 2);
+  FirstInFirstOut(10, 3, 2, 50);
+}
+
+TEST(BlockingQueue, SenderBlockingTest) {
+  const size_t queue_cap = 2;
+  BlockingQueue<size_t> q(queue_cap);
+  size_t send_count = 0;
+  std::thread sender([&]() {
+    for (size_t i = 0; i < 5; ++i) {
+      if (!q.Send(i)) {
+        break;
+      }
+      ++send_count;
+    }
+  });
+  std::this_thread::sleep_for(std::chrono::milliseconds(200));
+  q.Close();
+  sender.join();
+  EXPECT_EQ(send_count, queue_cap);
+  std::vector<size_t> res;
+  while (q.CanReceive()) {
+    size_t elem;
+    EXPECT_TRUE(q.Receive(&elem));
+    res.push_back(elem);
+  }
+  EXPECT_EQ(res.size(), queue_cap);
+  for (size_t i = 0; i < res.size(); ++i) {
+    EXPECT_EQ(res[i], i);
+  }
+}
+
+TEST(BlockingQueue, ReceiverBlockingTest) {
+  const size_t queue_cap = 5;
+  BlockingQueue<size_t> q(queue_cap);
+  std::vector<size_t> receive_res;
+  std::thread receiver([&]() {
+    size_t elem;
+    while (true) {
+      if (!q.Receive(&elem)) {
+        break;
+      }
+      receive_res.push_back(elem);
+    }
+  });
+  std::vector<size_t> to_send{2, 1, 7};
+  for (auto e : to_send) {
+    q.Send(e);
+  }
+  q.Close();
+  receiver.join();
+  EXPECT_EQ(receive_res.size(), to_send.size());
+  for (size_t i = 0; i < to_send.size(); ++i) {
+    EXPECT_EQ(receive_res[i], to_send[i]);
+  }
+}
+
+void CheckIsUnorderedSame(const std::vector<std::vector<size_t>>& v1,
+                          const std::vector<std::vector<size_t>>& v2) {
+  std::set<size_t> s1;
+  std::set<size_t> s2;
+  for (auto vec : v1) {
+    for (size_t elem : vec) {
+      s1.insert(elem);
+    }
+  }
+  for (auto vec : v2) {
+    for (size_t elem : vec) {
+      s2.insert(elem);
+    }
+  }
+  EXPECT_EQ(s1.size(), s2.size());
+  auto it1 = s1.begin();
+  auto it2 = s2.begin();
+  while (it1 != s1.end()) {
+    EXPECT_EQ(*it1, *it2);
+    ++it1;
+    ++it2;
+  }
+}
+
+void MultiSenderMultiReceiver(const size_t queue_cap,
+                              const std::vector<std::vector<size_t>>& to_send,
+                              size_t receiver_num, size_t send_time_gap,
+                              size_t receive_time_gap) {
+  BlockingQueue<size_t> q(queue_cap);
+  size_t sender_num = to_send.size();
+  std::vector<std::thread> senders;
+  for (size_t s_idx = 0; s_idx < sender_num; ++s_idx) {
+    senders.emplace_back(std::thread([&, s_idx] {
+      for (size_t elem : to_send[s_idx]) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(send_time_gap));
+        EXPECT_TRUE(q.Send(elem));
+      }
+    }));
+  }
+  std::vector<std::thread> receivers;
+  std::mutex mu;
+  std::vector<std::vector<size_t>> res;
+  for (size_t r_idx = 0; r_idx < receiver_num; ++r_idx) {
+    receivers.emplace_back(std::thread([&] {
+      std::vector<size_t> receiver_res;
+      while (true) {
+        std::this_thread::sleep_for(
+            std::chrono::milliseconds(receive_time_gap));
+        size_t elem;
+        if (!q.Receive(&elem)) {
+          break;
+        }
+        receiver_res.push_back(elem);
+      }
+      std::lock_guard<std::mutex> lock(mu);
+      res.push_back(receiver_res);
+    }));
+  }
+  for (auto& t : senders) {
+    t.join();
+  }
+  q.Close();
+  for (auto& t : receivers) {
+    t.join();
+  }
+  CheckIsUnorderedSame(to_send, res);
+}
+
+TEST(BlockingQueue, MultiSenderMultiReaderTest) {
+  std::vector<std::vector<size_t>> to_send_1{{2, 3, 4}, {9}, {0, 7, 15, 6}};
+  MultiSenderMultiReceiver(2, to_send_1, 2, 0, 0);
+  MultiSenderMultiReceiver(10, to_send_1, 2, 0, 0);
+  MultiSenderMultiReceiver(2, to_send_1, 20, 0, 0);
+  MultiSenderMultiReceiver(2, to_send_1, 2, 50, 0);
+  MultiSenderMultiReceiver(2, to_send_1, 2, 0, 50);
+
+  std::vector<std::vector<size_t>> to_send_2{
+      {2, 3, 4}, {}, {0, 7, 15, 6, 9, 32}};
+  MultiSenderMultiReceiver(2, to_send_2, 3, 0, 0);
+  MultiSenderMultiReceiver(20, to_send_2, 3, 0, 0);
+  MultiSenderMultiReceiver(2, to_send_2, 30, 0, 0);
+  MultiSenderMultiReceiver(2, to_send_2, 3, 50, 0);
+  MultiSenderMultiReceiver(2, to_send_2, 3, 0, 50);
+}
+
+struct MyClass {
+  MyClass() : val_(0) {}
+  explicit MyClass(int val) : val_(val) {}
+  MyClass(const MyClass& b) { val_ = b.val_; }
+  MyClass(MyClass&& b) { val_ = b.val_; }
+  void operator=(const MyClass& b) { val_ = b.val_; }
+
+  int val_;
+};
+
+TEST(BlockingQueue, MyClassTest) {
+  BlockingQueue<MyClass> q(2);
+  MyClass a(200);
+  q.Send(std::move(a));
+  MyClass b;
+  q.Receive(&b);
+  EXPECT_EQ(a.val_, b.val_);
+}
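One caveat on MyClassTest above: it reads a.val_ after q.Send(std::move(a)), which is only valid because this particular MyClass leaves the moved-from object untouched (its move constructor just copies val_). For an element type that resets its state on move, a safer variant would capture the expected value before the move. A sketch (illustrative only, not part of the patch; the test name is made up):

    TEST(BlockingQueue, MyClassMoveTest) {
      BlockingQueue<MyClass> q(2);
      MyClass a(200);
      const int expected = a.val_;  // read before the object is moved from
      q.Send(std::move(a));
      MyClass b;
      q.Receive(&b);
      EXPECT_EQ(expected, b.val_);
    }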
From a7866113146229514870f51c5c7d2c6001eada60 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Wed, 25 Apr 2018 19:41:52 +0800
Subject: [PATCH 2/7] Replace Channel in MultiFileReader with BlockingQueue

---
 .../fluid/operators/reader/open_files_op.cc | 26 +++++++++----------
 1 file changed, 12 insertions(+), 14 deletions(-)

diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc
index 779dc8a6a0..461b56c7a4 100644
--- a/paddle/fluid/operators/reader/open_files_op.cc
+++ b/paddle/fluid/operators/reader/open_files_op.cc
@@ -14,7 +14,7 @@
 #include <thread>  // NOLINT
-#include "paddle/fluid/framework/channel.h"
+#include "paddle/fluid/operators/reader/blocking_queue.h"
 #include "paddle/fluid/operators/reader/reader_op_registry.h"
 
 namespace paddle {
 namespace operators {
 namespace reader {
@@ -48,9 +48,9 @@ class MultiFileReader : public framework::ReaderBase {
   std::thread scheduler_;
   std::vector<std::thread> prefetchers_;
   size_t buffer_size_;
-  framework::Channel<size_t>* waiting_file_idx_;
-  framework::Channel<size_t>* available_thread_idx_;
-  framework::Channel<std::vector<framework::LoDTensor>>* buffer_;
+  reader::BlockingQueue<size_t>* waiting_file_idx_;
+  reader::BlockingQueue<size_t>* available_thread_idx_;
+  reader::BlockingQueue<std::vector<framework::LoDTensor>>* buffer_;
 };
 
 void MultiFileReader::ReadNext(std::vector<framework::LoDTensor>* out) {
@@ -73,17 +73,17 @@ bool MultiFileReader::HasNext() {
 void MultiFileReader::StartNewScheduler() {
   size_t thread_num = prefetchers_.size();
-  waiting_file_idx_ = framework::MakeChannel<size_t>(file_names_.size());
-  available_thread_idx_ = framework::MakeChannel<size_t>(thread_num);
-  buffer_ =
-      framework::MakeChannel<std::vector<framework::LoDTensor>>(buffer_size_);
+  waiting_file_idx_ = new reader::BlockingQueue<size_t>(file_names_.size());
+  available_thread_idx_ = new reader::BlockingQueue<size_t>(thread_num);
+  buffer_ = new reader::BlockingQueue<std::vector<framework::LoDTensor>>(
+      buffer_size_);
 
   for (size_t i = 0; i < file_names_.size(); ++i) {
-    waiting_file_idx_->Send(&i);
+    waiting_file_idx_->Send(i);
   }
   waiting_file_idx_->Close();
   for (size_t i = 0; i < thread_num; ++i) {
-    available_thread_idx_->Send(&i);
+    available_thread_idx_->Send(i);
   }
 
   scheduler_ = std::thread([this] { ScheduleThreadFunc(); });
 }
@@ -149,7 +149,7 @@ void MultiFileReader::PrefetchThreadFunc(std::string file_name,
       break;
     }
     try {
-      buffer_->Send(&ins);
+      buffer_->Send(std::move(ins));
    } catch (paddle::platform::EnforceNotMet e) {
       VLOG(5) << "WARNING: The buffer channel has been closed. The prefetch "
                  "thread of file '"
@@ -158,9 +158,7 @@
     }
   }
 
-  try {
-    available_thread_idx_->Send(&thread_idx);
-  } catch (paddle::platform::EnforceNotMet e) {
+  if (!available_thread_idx_->Send(thread_idx)) {
     VLOG(5) << "WARNING: The available_thread_idx_ channel has been closed. "
                "Fail to send thread_idx.";
   }
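The conversion pattern in the patch above (and repeated in the patches that follow) is easier to see side by side. Roughly, and simplified rather than quoted verbatim from the tree: framework::Channel reported a closed channel by throwing, so callers wrapped Send in a try/catch, while reader::BlockingQueue reports the same condition through its return value.

    // Before: framework::Channel<T>; a closed channel surfaces as an exception.
    try {
      channel->Send(&elem);
    } catch (paddle::platform::EnforceNotMet e) {
      VLOG(5) << "WARNING: the channel has been closed.";
    }

    // After: reader::BlockingQueue<T>; a closed queue surfaces as `false`.
    if (!queue->Send(std::move(elem))) {
      VLOG(5) << "WARNING: the queue has been closed.";
    }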
From e2ca42408bc1580ffecbdb0b922f67eddd7aad94 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Wed, 25 Apr 2018 19:54:46 +0800
Subject: [PATCH 3/7] Replace Channel in DoubleBufferReader with BlockingQueue

---
 .../reader/create_double_buffer_reader_op.cc | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
index 4372f23fc1..504e069b65 100644
--- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -14,7 +14,7 @@
 #include <thread>  // NOLINT
 
-#include "paddle/fluid/framework/channel.h"
+#include "paddle/fluid/operators/reader/blocking_queue.h"
 #include "paddle/fluid/operators/reader/reader_op_registry.h"
 
 namespace paddle {
@@ -23,13 +23,13 @@ namespace reader {
 // 'Double buffer' means we shall maintain two batches of input data at the same
 // time. So the kCacheSize shoul be at least 2.
-static constexpr size_t kCacheSize = 2;
+static constexpr size_t kCacheSize = 3;
 // There will be two bacthes out of the channel during training:
 // 1. the one waiting to be sent to the channel
 // 2. the one just be received from the channel, which is also being used by
 // subsequent operators.
 // So the channel size should be kChacheSize - 2
-static constexpr size_t kChannelSize = 0;  // kCacheSize - 2
+static constexpr size_t kChannelSize = 1;  // kCacheSize - 2
 
 class DoubleBufferReader : public framework::DecoratedReader {
  public:
@@ -58,7 +58,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
   bool HasNext() const;
 
   void StartPrefetcher() {
-    channel_ = framework::MakeChannel<size_t>(kChannelSize);
+    channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
     prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
   }
@@ -74,7 +74,7 @@ class DoubleBufferReader : public framework::DecoratedReader {
   void PrefetchThreadFunc();
 
   std::thread prefetcher_;
-  framework::Channel<size_t>* channel_;
+  reader::BlockingQueue<size_t>* channel_;
   platform::Place place_;
   std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache_;
   std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache_;
@@ -185,10 +185,7 @@ void DoubleBufferReader::PrefetchThreadFunc() {
         gpu_batch[i].set_lod(cpu_batch[i].lod());
       }
     }
-    try {
-      size_t tmp = cached_tensor_id;
-      channel_->Send(&tmp);
-    } catch (paddle::platform::EnforceNotMet e) {
+    if (!channel_->Send(cached_tensor_id)) {
      VLOG(5) << "WARNING: The double buffer channel has been closed. The "
                 "prefetch thread will terminate.";
      break;
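A likely reason the constants move from 2/0 to 3/1 rather than staying put: BlockingQueue rejects a zero capacity (the PADDLE_ENFORCE_GT added in patch 1), so kChannelSize = 0 is no longer representable, and keeping the kChannelSize = kCacheSize - 2 relationship from the comment above then forces kCacheSize up to 3. A hypothetical guard (not in the patch) that would spell the relationship out:

    // Two cache slots always live outside the channel: one being filled by
    // the prefetcher and one still read by downstream operators.
    static_assert(kChannelSize == kCacheSize - 2,
                  "channel capacity must leave two cache slots outside");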
From 4cb63d845150b0b5b27505e174fe1023e3c0b229 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Thu, 26 Apr 2018 00:53:44 +0800
Subject: [PATCH 4/7] Remove unnecessary header files

---
 paddle/fluid/operators/reader/blocking_queue.h | 2 --
 1 file changed, 2 deletions(-)

diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h
index 4a54ccd5e8..5d0c27d30c 100644
--- a/paddle/fluid/operators/reader/blocking_queue.h
+++ b/paddle/fluid/operators/reader/blocking_queue.h
@@ -14,10 +14,8 @@
 #pragma once
 
-#include
 #include <condition_variable>  // NOLINT
 #include <deque>
-#include
 
 #include "paddle/fluid/platform/enforce.h"
From 304b6b71389e5bc035e6c04c6fdb76d2717f3588 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Thu, 26 Apr 2018 11:11:25 +0800
Subject: [PATCH 5/7] Follow comments

---
 .../fluid/operators/reader/blocking_queue.h   | 37 +++++++++----------
 .../reader/create_double_buffer_reader_op.cc  | 17 ++-------
 .../fluid/operators/reader/open_files_op.cc   | 12 +-----
 3 files changed, 23 insertions(+), 43 deletions(-)

diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h
index 5d0c27d30c..71684b1417 100644
--- a/paddle/fluid/operators/reader/blocking_queue.h
+++ b/paddle/fluid/operators/reader/blocking_queue.h
@@ -25,6 +25,11 @@ namespace reader {
 
 template <typename T>
 class BlockingQueue {
+  // BlockingQueue is for buffered reading and is supposed to be used only
+  // within the reader package. It is true that we could and should have been
+  // using framework::Channel, but it currently has a deadlock bug.
+  // BlockingQueue is a workaround: a simplified version of framework::Channel
+  // that doesn't support GPU and only implements a buffered blocking queue.
  public:
   explicit BlockingQueue(size_t capacity)
       : capacity_(capacity), closed_(false) {
@@ -37,26 +42,28 @@ class BlockingQueue {
     std::unique_lock<std::mutex> lock(mutex_);
     send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
     if (closed_) {
+      VLOG(5)
+          << "WARNING: Sending an element to a closed reader::BlockingQueue.";
       return false;
-    } else {
-      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
-      queue_.push_back(elem);
-      receive_cv_.notify_one();
-      return true;
     }
+    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+    queue_.push_back(elem);
+    receive_cv_.notify_one();
+    return true;
   }
 
   bool Send(T&& elem) {
     std::unique_lock<std::mutex> lock(mutex_);
     send_cv_.wait(lock, [&] { return queue_.size() < capacity_ || closed_; });
     if (closed_) {
+      VLOG(5)
+          << "WARNING: Sending an element to a closed reader::BlockingQueue.";
      return false;
-    } else {
-      PADDLE_ENFORCE_LT(queue_.size(), capacity_);
-      queue_.emplace_back(std::move(elem));
-      receive_cv_.notify_one();
-      return true;
    }
+    PADDLE_ENFORCE_LT(queue_.size(), capacity_);
+    queue_.emplace_back(std::move(elem));
+    receive_cv_.notify_one();
+    return true;
   }
 
   bool Receive(T* elem) {
@@ -86,16 +93,6 @@
     return closed_;
   }
 
-  bool CanSend() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return !closed_ && queue_.size() < capacity_;
-  }
-
-  bool CanReceive() {
-    std::lock_guard<std::mutex> lock(mutex_);
-    return !queue_.empty();
-  }
-
   size_t Cap() {
     std::lock_guard<std::mutex> lock(mutex_);
     return capacity_;
diff --git a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
index 504e069b65..3fdc31dfa5 100644
--- a/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
+++ b/paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -55,8 +55,6 @@ class DoubleBufferReader : public framework::DecoratedReader {
   ~DoubleBufferReader() { EndPrefetcher(); }
 
  private:
-  bool HasNext() const;
-
   void StartPrefetcher() {
     channel_ = new reader::BlockingQueue<size_t>(kChannelSize);
     prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
   }
@@ -139,17 +137,16 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
 };
 
 void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
-  out->clear();
-  if (HasNext()) {
-    size_t cached_tensor_id;
-    channel_->Receive(&cached_tensor_id);
+  size_t cached_tensor_id;
+  if (channel_->Receive(&cached_tensor_id)) {
     if (platform::is_gpu_place(place_)) {
       *out = gpu_tensor_cache_[cached_tensor_id];
-      ctxs_[cached_tensor_id]->Wait();
     } else {
       // CPU place
       *out = cpu_tensor_cache_[cached_tensor_id];
     }
+  } else {
+    out->clear();
   }
 }
 
@@ -159,12 +156,6 @@ void DoubleBufferReader::ReInit() {
   StartPrefetcher();
 }
 
-bool DoubleBufferReader::HasNext() const {
-  while (!channel_->IsClosed() && !channel_->CanReceive()) {
-  }
-  return channel_->CanReceive();
-}
-
 void DoubleBufferReader::PrefetchThreadFunc() {
   VLOG(5) << "A new prefetch thread starts.";
   size_t cached_tensor_id = 0;
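A design note on this patch: DoubleBufferReader::HasNext() (removed above) and MultiFileReader::HasNext() (removed in the open_files_op.cc hunk below) both spun in a tight loop on CanReceive() until data arrived or the channel closed, burning a core while waiting. With BlockingQueue, Receive() already blocks on a condition variable until an element is available or Close() is called, so ReadNext() can call Receive() directly and treat a false return as end-of-data, which is also what lets CanSend()/CanReceive() be deleted.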
diff --git a/paddle/fluid/operators/reader/open_files_op.cc b/paddle/fluid/operators/reader/open_files_op.cc
index 461b56c7a4..91ad7d5658 100644
--- a/paddle/fluid/operators/reader/open_files_op.cc
+++ b/paddle/fluid/operators/reader/open_files_op.cc
@@ -37,7 +37,6 @@ class MultiFileReader : public framework::ReaderBase {
   ~MultiFileReader() { EndScheduler(); }
 
  private:
-  bool HasNext();
   void StartNewScheduler();
   void EndScheduler();
   void ScheduleThreadFunc();
@@ -54,9 +53,8 @@ class MultiFileReader : public framework::ReaderBase {
 };
 
 void MultiFileReader::ReadNext(std::vector<framework::LoDTensor>* out) {
-  out->clear();
-  if (HasNext()) {
-    buffer_->Receive(out);
+  if (!buffer_->Receive(out)) {
+    out->clear();
   }
 }
 
@@ -65,12 +63,6 @@ void MultiFileReader::ReInit() {
   StartNewScheduler();
 }
 
-bool MultiFileReader::HasNext() {
-  while (!buffer_->IsClosed() && !buffer_->CanReceive()) {
-  }
-  return buffer_->CanReceive();
-}
-
 void MultiFileReader::StartNewScheduler() {
   size_t thread_num = prefetchers_.size();
   waiting_file_idx_ = new reader::BlockingQueue<size_t>(file_names_.size());
From 17c51d69d12c09cf0af3baec2a8ebe3c9ec1de93 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Thu, 26 Apr 2018 11:26:25 +0800
Subject: [PATCH 6/7] fix unit test error

---
 .../operators/reader/reader_blocking_queue_test.cc | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/paddle/fluid/operators/reader/reader_blocking_queue_test.cc b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
index 07c22fe2bf..c0229f53f2 100644
--- a/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
+++ b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
@@ -28,13 +28,6 @@ TEST(BlockingQueue, CapacityTest) {
   EXPECT_EQ(q.Cap(), cap);
 }
 
-TEST(BlockingQueue, CanSendTest) {
-  size_t cap = 1;
-  BlockingQueue<size_t> q(cap);
-  q.Send(1);
-  EXPECT_FALSE(q.CanSend());
-}
-
 void FirstInFirstOut(size_t queue_cap, size_t elem_num, size_t send_time_gap,
                      size_t receive_time_gap) {
   BlockingQueue<size_t> q(queue_cap);
@@ -83,9 +76,11 @@ TEST(BlockingQueue, SenderBlockingTest) {
   sender.join();
   EXPECT_EQ(send_count, queue_cap);
   std::vector<size_t> res;
-  while (q.CanReceive()) {
+  while (true) {
     size_t elem;
-    EXPECT_TRUE(q.Receive(&elem));
+    if (!q.Receive(&elem) {
+      break;
+    }
     res.push_back(elem);
   }
   EXPECT_EQ(res.size(), queue_cap);
From 8bd34664f12c61178a82dd1be535f4ae798a4540 Mon Sep 17 00:00:00 2001
From: fengjiayi
Date: Thu, 26 Apr 2018 11:27:17 +0800
Subject: [PATCH 7/7] fix unit test error

---
 paddle/fluid/operators/reader/reader_blocking_queue_test.cc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/paddle/fluid/operators/reader/reader_blocking_queue_test.cc b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
index c0229f53f2..7d1b381d56 100644
--- a/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
+++ b/paddle/fluid/operators/reader/reader_blocking_queue_test.cc
@@ -78,7 +78,7 @@ TEST(BlockingQueue, SenderBlockingTest) {
   std::vector<size_t> res;
   while (true) {
     size_t elem;
-    if (!q.Receive(&elem) {
+    if (!q.Receive(&elem)) {
       break;
     }
     res.push_back(elem);
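Taken together, the series changes how exhaustion reaches the caller: once the underlying BlockingQueue is closed and drained, Receive() returns false and ReadNext() hands back an empty batch. A minimal sketch of the consuming side (illustrative only; the loop and variable names are not taken from the patch):

    std::vector<framework::LoDTensor> batch;
    while (true) {
      reader->ReadNext(&batch);
      if (batch.empty()) {
        break;  // the queue was closed and fully drained: no more data
      }
      // run the next step on `batch`
    }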