|
|
|
@ -125,36 +125,36 @@ class MultiGzipReader : public Reader {
|
|
|
|
|
|
|
|
|
|
void MonitorThread(std::vector<ReaderThreadStatus>* thread_status,
|
|
|
|
|
std::shared_ptr<LoDTensorBlockingQueue> queue) {
|
|
|
|
|
VLOG(3) << "monitor thread in";
|
|
|
|
|
VLOG(30) << "monitor thread in";
|
|
|
|
|
bool reader_thread_is_running = true;
|
|
|
|
|
while (reader_thread_is_running) {
|
|
|
|
|
VLOG(3) << "reader_thread_is_running";
|
|
|
|
|
VLOG(30) << "reader_thread_is_running";
|
|
|
|
|
reader_thread_is_running = false;
|
|
|
|
|
for (size_t i = 0; i < (*thread_status).size(); ++i) {
|
|
|
|
|
if ((*thread_status)[i] == Running) {
|
|
|
|
|
VLOG(3) << "reader is running!";
|
|
|
|
|
VLOG(30) << "reader is running!";
|
|
|
|
|
reader_thread_is_running = true;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
|
|
|
|
|
}
|
|
|
|
|
VLOG(3) << "all reader thread is stopped, push empty data into queue";
|
|
|
|
|
VLOG(30) << "all reader thread is stopped, push empty data into queue";
|
|
|
|
|
queue->Push({});
|
|
|
|
|
VLOG(3) << "monitor thread exited";
|
|
|
|
|
VLOG(30) << "monitor thread exited";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void ReadThread(const std::vector<std::string>& file_list,
|
|
|
|
|
const std::vector<std::string>& slots, int batch_size,
|
|
|
|
|
int thread_id, std::vector<ReaderThreadStatus>* thread_status,
|
|
|
|
|
std::shared_ptr<LoDTensorBlockingQueue> queue) {
|
|
|
|
|
VLOG(3) << "[" << thread_id << "]"
|
|
|
|
|
<< " reader thread start! thread_id = " << thread_id;
|
|
|
|
|
VLOG(30) << "[" << thread_id << "]"
|
|
|
|
|
<< " reader thread start! thread_id = " << thread_id;
|
|
|
|
|
for (auto& file : file_list) {
|
|
|
|
|
VLOG(3) << "[" << thread_id << "]"
|
|
|
|
|
<< " file " << file;
|
|
|
|
|
VLOG(30) << "[" << thread_id << "]"
|
|
|
|
|
<< " file " << file;
|
|
|
|
|
}
|
|
|
|
|
(*thread_status)[thread_id] = Running;
|
|
|
|
|
VLOG(3) << "set status to running";
|
|
|
|
|
VLOG(30) << "set status to running";
|
|
|
|
|
|
|
|
|
|
std::unordered_map<std::string, size_t> slot_to_index;
|
|
|
|
|
for (size_t i = 0; i < slots.size(); ++i) {
|
|
|
|
@ -168,7 +168,7 @@ void ReadThread(const std::vector<std::string>& file_list,
|
|
|
|
|
|
|
|
|
|
MultiGzipReader reader(file_list);
|
|
|
|
|
|
|
|
|
|
VLOG(3) << "reader inited";
|
|
|
|
|
VLOG(30) << "reader inited";
|
|
|
|
|
|
|
|
|
|
while (reader.HasNext()) {
|
|
|
|
|
batch_data.clear();
|
|
|
|
@ -226,11 +226,11 @@ void ReadThread(const std::vector<std::string>& file_list,
|
|
|
|
|
lod_datas.push_back(label_tensor);
|
|
|
|
|
|
|
|
|
|
queue->Push(lod_datas);
|
|
|
|
|
VLOG(4) << "push one data, queue_size=" << queue->Size();
|
|
|
|
|
VLOG(40) << "push one data, queue_size=" << queue->Size();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
(*thread_status)[thread_id] = Stopped;
|
|
|
|
|
VLOG(3) << "set status to stopped, thread " << thread_id << " exited";
|
|
|
|
|
VLOG(30) << "set status to stopped, thread " << thread_id << " exited";
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} // namespace reader
|
|
|
|
|