|
|
|
@ -101,8 +101,9 @@ Status BatchOp::operator()() {
|
|
|
|
|
table->emplace_back(new_row);
|
|
|
|
|
// if # of rows is enough to make 1 batch (1 batch is buffer), send it to worker_queue
|
|
|
|
|
if (table->size() == static_cast<size_t>(cur_batch_size)) {
|
|
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt++ % num_workers_]->EmplaceBack(
|
|
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt - epoch_num))));
|
|
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack(
|
|
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num))));
|
|
|
|
|
cnt++;
|
|
|
|
|
table = std::make_unique<TensorQTable>();
|
|
|
|
|
RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(epoch_num, batch_num, cnt - epoch_num)));
|
|
|
|
|
}
|
|
|
|
@ -110,8 +111,9 @@ Status BatchOp::operator()() {
|
|
|
|
|
}
|
|
|
|
|
// Reminder logic, execute only when there is a remainder (table is non empty) and don't drop
|
|
|
|
|
if (drop_ == false && table->empty() == false) {
|
|
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt++ % num_workers_]->EmplaceBack(
|
|
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt - epoch_num))));
|
|
|
|
|
RETURN_IF_NOT_OK(worker_queues_[cnt % num_workers_]->EmplaceBack(
|
|
|
|
|
std::make_pair(std::move(table), CBatchInfo(epoch_num, batch_num++, cnt + 1 - epoch_num))));
|
|
|
|
|
cnt++;
|
|
|
|
|
}
|
|
|
|
|
table = std::make_unique<TensorQTable>(); // this drops when drop == true
|
|
|
|
|
// end of the current epoch, batch_num should start from 0 again
|
|
|
|
|