|
|
|
@ -177,6 +177,9 @@ int InMemoryDataFeed<T>::Next() {
|
|
|
|
|
}
|
|
|
|
|
CHECK(in_channel != nullptr);
|
|
|
|
|
CHECK(out_channel != nullptr);
|
|
|
|
|
VLOG(3) << "in_channel size=" << in_channel->Size()
|
|
|
|
|
<< ", out_channel size=" << out_channel->Size()
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
int index = 0;
|
|
|
|
|
T instance;
|
|
|
|
|
T ins_vec;
|
|
|
|
@ -259,14 +262,19 @@ void InMemoryDataFeed<T>::FillChannelToMemoryData() {
|
|
|
|
|
channel = shuffled_ins_out_;
|
|
|
|
|
}
|
|
|
|
|
CHECK(channel != nullptr);
|
|
|
|
|
local_vec.reserve(channel->Size());
|
|
|
|
|
local_vec.resize(channel->Size());
|
|
|
|
|
for (int64_t i = 0; i < channel->Size(); ++i) {
|
|
|
|
|
channel->Pop(local_vec[i]);
|
|
|
|
|
}
|
|
|
|
|
std::unique_lock<std::mutex> lock(*mutex_for_update_memory_data_);
|
|
|
|
|
lock.lock();
|
|
|
|
|
VLOG(3) << "local_vec size=" << local_vec.size() <<", thread_id=" << thread_id_;
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> g(*mutex_for_update_memory_data_);
|
|
|
|
|
VLOG(3) << "before insert, memory_data_ size=" << memory_data_->size()
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
memory_data_->insert(memory_data_->end(), local_vec.begin(), local_vec.end());
|
|
|
|
|
lock.unlock();
|
|
|
|
|
VLOG(3) << "after insert memory_data_ size=" << memory_data_->size()
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
}
|
|
|
|
|
std::vector<T>().swap(local_vec);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|