|
|
|
@ -65,9 +65,6 @@ MapOp::MapOp(const std::vector<std::string> &in_col_names, const std::vector<std
|
|
|
|
|
tfuncs_(std::move(tensor_funcs)),
|
|
|
|
|
in_columns_(in_col_names),
|
|
|
|
|
out_columns_(out_col_names),
|
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
|
|
|
eof_worker_id_(0),
|
|
|
|
|
#endif
|
|
|
|
|
perf_mode_(perf_mode) {
|
|
|
|
|
// If caller didn't specify the out_col_names, assume they are same as the in_columns.
|
|
|
|
|
if (out_columns_.empty() || out_columns_[0].empty()) {
|
|
|
|
@ -123,17 +120,6 @@ Status MapOp::operator()() {
|
|
|
|
|
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buff, 0));
|
|
|
|
|
is_eof = buff->eof();
|
|
|
|
|
RETURN_IF_NOT_OK(local_queues_[que_id]->Add(std::move(buff)));
|
|
|
|
|
#if defined(_WIN32) || defined(_WIN64)
|
|
|
|
|
if (is_eof) {
|
|
|
|
|
eof_worker_id_ = que_id;
|
|
|
|
|
for (int32_t id = 0; id < num_workers_; id++) {
|
|
|
|
|
if (id != eof_worker_id_) {
|
|
|
|
|
auto eof_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF);
|
|
|
|
|
RETURN_IF_NOT_OK(local_queues_[id]->Add(std::move(eof_buffer)));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
que_id = (que_id + 1) % num_workers_;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -173,14 +159,6 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
|
|
|
|
|
continue;
|
|
|
|
|
} else if (in_buffer->eof()) {
|
|
|
|
|
// Calling base class EofReceived to forward eof buffer.
|
|
|
|
|
#if defined(_WIN32) || defined(_Win64)
|
|
|
|
|
if (perf_mode_) {
|
|
|
|
|
if (eof_worker_id_ == worker_id) {
|
|
|
|
|
RETURN_IF_NOT_OK(EofReceived(worker_id));
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
RETURN_IF_NOT_OK(EofReceived(worker_id));
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|