|
|
|
@ -246,8 +246,8 @@ void InMemoryDataFeed<T>::FillMemoryDataToChannel() {
|
|
|
|
|
VLOG(3) << "FillMemoryDataToChannel, thread_id=" << thread_id_;
|
|
|
|
|
auto interval = GetMemoryDataInterval();
|
|
|
|
|
VLOG(3) << "memory data size=" << memory_data_->size()
|
|
|
|
|
<< ", fill data from [" << interval.first << ", "
|
|
|
|
|
<< interval.second << "), thread_id=" << thread_id_;
|
|
|
|
|
<< ", fill data from [" << interval.first << ", " << interval.second
|
|
|
|
|
<< "), thread_id=" << thread_id_;
|
|
|
|
|
for (int64_t i = interval.first; i < interval.second; ++i) {
|
|
|
|
|
T& t = (*memory_data_)[i];
|
|
|
|
|
shuffled_ins_->Push(std::move(t));
|
|
|
|
@ -275,13 +275,13 @@ void InMemoryDataFeed<T>::FillChannelToMemoryData() {
|
|
|
|
|
channel->Pop(&local_vec[i]);
|
|
|
|
|
}
|
|
|
|
|
VLOG(3) << "local_vec size=" << local_vec.size()
|
|
|
|
|
<<", thread_id=" << thread_id_;
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> g(*mutex_for_update_memory_data_);
|
|
|
|
|
VLOG(3) << "before insert, memory_data_ size=" << memory_data_->size()
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
memory_data_->insert(memory_data_->end(), local_vec.begin(),
|
|
|
|
|
local_vec.end());
|
|
|
|
|
local_vec.end());
|
|
|
|
|
VLOG(3) << "after insert memory_data_ size=" << memory_data_->size()
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
}
|
|
|
|
@ -308,8 +308,8 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
|
|
|
|
|
local_vec.push_back(instance);
|
|
|
|
|
}
|
|
|
|
|
timeline.Pause();
|
|
|
|
|
VLOG(3) << "LoadIntoMemory() read all lines, file="
|
|
|
|
|
<< filename << ", cost time=" << timeline.ElapsedSec()
|
|
|
|
|
VLOG(3) << "LoadIntoMemory() read all lines, file=" << filename
|
|
|
|
|
<< ", cost time=" << timeline.ElapsedSec()
|
|
|
|
|
<< " seconds, thread_id=" << thread_id_;
|
|
|
|
|
{
|
|
|
|
|
std::lock_guard<std::mutex> lock(*mutex_for_update_memory_data_);
|
|
|
|
@ -319,8 +319,7 @@ void InMemoryDataFeed<T>::LoadIntoMemory() {
|
|
|
|
|
std::make_move_iterator(local_vec.end()));
|
|
|
|
|
timeline.Pause();
|
|
|
|
|
VLOG(3) << "LoadIntoMemory() memory_data insert, cost time="
|
|
|
|
|
<< timeline.ElapsedSec() << " seconds, thread_id="
|
|
|
|
|
<< thread_id_;
|
|
|
|
|
<< timeline.ElapsedSec() << " seconds, thread_id=" << thread_id_;
|
|
|
|
|
}
|
|
|
|
|
local_vec.clear();
|
|
|
|
|
}
|
|
|
|
@ -358,8 +357,8 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
|
|
|
|
|
std::string send_str;
|
|
|
|
|
SerializeIns(send_vec[j], &send_str);
|
|
|
|
|
VLOG(3) << "send str_length=" << send_str.length()
|
|
|
|
|
<< ", ins num=" << send_vec[j].size() << " to node_id="
|
|
|
|
|
<< j << ", thread_id=" << thread_id_;
|
|
|
|
|
<< ", ins num=" << send_vec[j].size() << " to node_id=" << j
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
auto ret = fleet_ptr->SendClientToClientMsg(0, j, send_str);
|
|
|
|
|
VLOG(3) << "end send, thread_id=" << thread_id_;
|
|
|
|
|
send_vec[j].clear();
|
|
|
|
@ -371,8 +370,8 @@ void InMemoryDataFeed<T>::GlobalShuffle() {
|
|
|
|
|
if (send_vec[j].size() != 0) {
|
|
|
|
|
std::string send_str;
|
|
|
|
|
SerializeIns(send_vec[j], &send_str);
|
|
|
|
|
VLOG(3) << "send str_length=" << send_str.length()
|
|
|
|
|
<< " to node_id=" << j << ", thread_id=" << thread_id_;
|
|
|
|
|
VLOG(3) << "send str_length=" << send_str.length() << " to node_id=" << j
|
|
|
|
|
<< ", thread_id=" << thread_id_;
|
|
|
|
|
auto ret = fleet_ptr->SendClientToClientMsg(0, j, send_str);
|
|
|
|
|
VLOG(3) << "end send, thread_id=" << thread_id_;
|
|
|
|
|
total_status.push_back(std::move(ret));
|
|
|
|
@ -888,15 +887,13 @@ void MultiSlotInMemoryDataFeed::PutToFeedVec(
|
|
|
|
|
|
|
|
|
|
// todo serialize ins in global shuffle
|
|
|
|
|
void MultiSlotInMemoryDataFeed::SerializeIns(
|
|
|
|
|
const std::vector<std::vector<MultiSlotType>*>& ins,
|
|
|
|
|
std::string* str) {
|
|
|
|
|
const std::vector<std::vector<MultiSlotType>*>& ins, std::string* str) {
|
|
|
|
|
auto fleet_ptr = FleetWrapper::GetInstance();
|
|
|
|
|
fleet_ptr->Serialize(ins, str);
|
|
|
|
|
}
|
|
|
|
|
// todo deserialize ins in global shuffle
|
|
|
|
|
void MultiSlotInMemoryDataFeed::DeserializeIns(
|
|
|
|
|
std::vector<std::vector<MultiSlotType>>* ins,
|
|
|
|
|
const std::string& str) {
|
|
|
|
|
std::vector<std::vector<MultiSlotType>>* ins, const std::string& str) {
|
|
|
|
|
auto fleet_ptr = FleetWrapper::GetInstance();
|
|
|
|
|
fleet_ptr->Deserialize(ins, str);
|
|
|
|
|
}
|
|
|
|
|