|
|
|
@ -12,7 +12,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
|
limitations under the License. */
|
|
|
|
|
|
|
|
|
|
#include "paddle/framework/channel.h"
|
|
|
|
|
#include "paddle/fluid/framework/channel.h"
|
|
|
|
|
|
|
|
|
|
#include <chrono>
|
|
|
|
|
#include <thread>
|
|
|
|
@ -176,7 +176,7 @@ TEST(Channel, ConcurrentSendNonConcurrentReceiveWithSufficientBufferSize) {
|
|
|
|
|
sum += i;
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.5 sec
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
|
|
|
|
|
EXPECT_EQ(sum, 45U);
|
|
|
|
|
|
|
|
|
|
CloseChannel(ch);
|
|
|
|
@ -194,10 +194,7 @@ TEST(Channel, RecevingOrderEqualToSendingOrderWithBufferedChannel) {
|
|
|
|
|
RecevingOrderEqualToSendingOrder(ch);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This tests that closing a buffered channel also unblocks
|
|
|
|
|
// any receivers waiting on the channel
|
|
|
|
|
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
|
|
|
|
|
auto ch = MakeChannel<int>(1);
|
|
|
|
|
void ChannelCloseUnblocksReceiversTest(Channel<int> *ch) {
|
|
|
|
|
size_t num_threads = 5;
|
|
|
|
|
std::thread t[num_threads];
|
|
|
|
|
bool thread_ended[num_threads];
|
|
|
|
@ -208,15 +205,14 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
|
|
|
|
|
t[i] = std::thread(
|
|
|
|
|
[&](bool *p) {
|
|
|
|
|
int data;
|
|
|
|
|
// All reads should return false
|
|
|
|
|
EXPECT_EQ(ch->Receive(&data), false);
|
|
|
|
|
*p = true;
|
|
|
|
|
},
|
|
|
|
|
&thread_ended[i]);
|
|
|
|
|
}
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
|
|
|
|
|
|
|
|
|
|
// Verify that all threads are blocked
|
|
|
|
|
// Verify that all the threads are blocked
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
EXPECT_EQ(thread_ended[i], false);
|
|
|
|
|
}
|
|
|
|
@ -225,7 +221,7 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
|
|
|
|
|
// This should unblock all receivers
|
|
|
|
|
CloseChannel(ch);
|
|
|
|
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait 0.1 sec
|
|
|
|
|
|
|
|
|
|
// Verify that all threads got unblocked
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
@ -233,13 +229,12 @@ TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) t[i].join();
|
|
|
|
|
delete ch;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This tests that closing a buffered channel also unblocks
|
|
|
|
|
// any senders waiting for channel to have write space
|
|
|
|
|
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
|
|
|
|
|
auto ch = MakeChannel<int>(1);
|
|
|
|
|
void ChannelCloseUnblocksSendersTest(Channel<int> *ch) {
|
|
|
|
|
using paddle::framework::details::Buffered;
|
|
|
|
|
using paddle::framework::details::UnBuffered;
|
|
|
|
|
|
|
|
|
|
size_t num_threads = 5;
|
|
|
|
|
std::thread t[num_threads];
|
|
|
|
|
bool thread_ended[num_threads];
|
|
|
|
@ -259,34 +254,56 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
|
|
|
|
|
}
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
|
|
|
|
|
|
|
|
|
|
// Verify that atleast 4 threads are blocked
|
|
|
|
|
int ct = 0;
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
if (thread_ended[i] == false) ct++;
|
|
|
|
|
if (dynamic_cast<Buffered<int> *>(ch)) {
|
|
|
|
|
// If ch is Buffered, atleast 4 threads must be blocked.
|
|
|
|
|
int ct = 0;
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
if (!thread_ended[i]) ct++;
|
|
|
|
|
}
|
|
|
|
|
EXPECT_GE(ct, 4);
|
|
|
|
|
} else {
|
|
|
|
|
// If ch is UnBuffered, all the threads should be blocked.
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
EXPECT_EQ(thread_ended[i], false);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Atleast 4 threads must be blocked
|
|
|
|
|
EXPECT_GE(ct, 4);
|
|
|
|
|
|
|
|
|
|
// Explicitly close the thread
|
|
|
|
|
// This should unblock all senders
|
|
|
|
|
CloseChannel(ch);
|
|
|
|
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(200)); // wait
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // wait
|
|
|
|
|
|
|
|
|
|
// Verify that all threads got unblocked
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
EXPECT_EQ(thread_ended[i], true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Verify that only 1 send was successful
|
|
|
|
|
ct = 0;
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
if (send_success[i]) ct++;
|
|
|
|
|
if (dynamic_cast<Buffered<int> *>(ch)) {
|
|
|
|
|
// Verify that only 1 send was successful
|
|
|
|
|
int ct = 0;
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
if (send_success[i]) ct++;
|
|
|
|
|
}
|
|
|
|
|
// Only 1 send must be successful
|
|
|
|
|
EXPECT_EQ(ct, 1);
|
|
|
|
|
}
|
|
|
|
|
// Only 1 send must be successful
|
|
|
|
|
EXPECT_EQ(ct, 1);
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) t[i].join();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This tests that closing a buffered channel also unblocks
|
|
|
|
|
// any receivers waiting on the channel
|
|
|
|
|
TEST(Channel, BufferedChannelCloseUnblocksReceiversTest) {
|
|
|
|
|
auto ch = MakeChannel<int>(1);
|
|
|
|
|
ChannelCloseUnblocksReceiversTest(ch);
|
|
|
|
|
delete ch;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// This tests that closing a buffered channel also unblocks
|
|
|
|
|
// any senders waiting for channel to have write space
|
|
|
|
|
TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
|
|
|
|
|
auto ch = MakeChannel<int>(1);
|
|
|
|
|
ChannelCloseUnblocksSendersTest(ch);
|
|
|
|
|
delete ch;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -294,40 +311,7 @@ TEST(Channel, BufferedChannelCloseUnblocksSendersTest) {
|
|
|
|
|
// unblocks any receivers waiting for senders
|
|
|
|
|
TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
|
|
|
|
|
auto ch = MakeChannel<int>(0);
|
|
|
|
|
size_t num_threads = 5;
|
|
|
|
|
std::thread t[num_threads];
|
|
|
|
|
bool thread_ended[num_threads];
|
|
|
|
|
|
|
|
|
|
// Launches threads that try to read and are blocked becausew of no writers
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
thread_ended[i] = false;
|
|
|
|
|
t[i] = std::thread(
|
|
|
|
|
[&](bool *p) {
|
|
|
|
|
int data;
|
|
|
|
|
EXPECT_EQ(ch->Receive(&data), false);
|
|
|
|
|
*p = true;
|
|
|
|
|
},
|
|
|
|
|
&thread_ended[i]);
|
|
|
|
|
}
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
|
|
|
|
|
|
|
|
|
|
// Verify that all the threads are blocked
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
EXPECT_EQ(thread_ended[i], false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Explicitly close the thread
|
|
|
|
|
// This should unblock all receivers
|
|
|
|
|
CloseChannel(ch);
|
|
|
|
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
|
|
|
|
|
|
|
|
|
|
// Verify that all threads got unblocked
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
EXPECT_EQ(thread_ended[i], true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) t[i].join();
|
|
|
|
|
ChannelCloseUnblocksReceiversTest(ch);
|
|
|
|
|
delete ch;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -335,40 +319,7 @@ TEST(Channel, UnbufferedChannelCloseUnblocksReceiversTest) {
|
|
|
|
|
// unblocks any senders waiting for senders
|
|
|
|
|
TEST(Channel, UnbufferedChannelCloseUnblocksSendersTest) {
|
|
|
|
|
auto ch = MakeChannel<int>(0);
|
|
|
|
|
size_t num_threads = 5;
|
|
|
|
|
std::thread t[num_threads];
|
|
|
|
|
bool thread_ended[num_threads];
|
|
|
|
|
|
|
|
|
|
// Launches threads that try to read and are blocked becausew of no writers
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
thread_ended[i] = false;
|
|
|
|
|
t[i] = std::thread(
|
|
|
|
|
[&](bool *p) {
|
|
|
|
|
int data = 10;
|
|
|
|
|
EXPECT_EQ(ch->Send(&data), false);
|
|
|
|
|
*p = true;
|
|
|
|
|
},
|
|
|
|
|
&thread_ended[i]);
|
|
|
|
|
}
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
|
|
|
|
|
|
|
|
|
|
// Verify that all the threads are blocked
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
EXPECT_EQ(thread_ended[i], false);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Explicitly close the thread
|
|
|
|
|
// This should unblock all receivers
|
|
|
|
|
CloseChannel(ch);
|
|
|
|
|
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // wait 0.5 sec
|
|
|
|
|
|
|
|
|
|
// Verify that all threads got unblocked
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) {
|
|
|
|
|
EXPECT_EQ(thread_ended[i], true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (size_t i = 0; i < num_threads; i++) t[i].join();
|
|
|
|
|
ChannelCloseUnblocksReceiversTest(ch);
|
|
|
|
|
delete ch;
|
|
|
|
|
}
|
|
|
|
|
|