|
|
|
@ -278,11 +278,11 @@ Status StorageOp::init() {
|
|
|
|
|
if (this->num_rows() % rows_per_buffer_ != 0) {
|
|
|
|
|
buffers_needed++;
|
|
|
|
|
}
|
|
|
|
|
MS_LOG(INFO) << "Master: Initializing StorageOp. Dataset files dir: " << dataset_files_dir_ << " Dataset type: "
|
|
|
|
|
<< static_cast<std::underlying_type<DatasetType>::type>(store_client_->schema()->dataset_type())
|
|
|
|
|
<< " Dataset schema file: " << schema_file_ << " Number of rows: " << num_rows_
|
|
|
|
|
<< " Rows per buffer: " << rows_per_buffer_ << " Num buffers (computed): " << buffers_needed
|
|
|
|
|
<< " Number of workers: " << num_workers_ << ".";
|
|
|
|
|
MS_LOG(DEBUG) << "Master: Initializing StorageOp. Dataset files dir: " << dataset_files_dir_ << " Dataset type: "
|
|
|
|
|
<< static_cast<std::underlying_type<DatasetType>::type>(store_client_->schema()->dataset_type())
|
|
|
|
|
<< " Dataset schema file: " << schema_file_ << " Number of rows: " << num_rows_
|
|
|
|
|
<< " Rows per buffer: " << rows_per_buffer_ << " Num buffers (computed): " << buffers_needed
|
|
|
|
|
<< " Number of workers: " << num_workers_ << ".";
|
|
|
|
|
|
|
|
|
|
// Next, create each buffer in a loop.
|
|
|
|
|
int32_t buff_id = 0;
|
|
|
|
@ -344,7 +344,7 @@ void StorageOp::Print(std::ostream &out, bool show_all) const {
|
|
|
|
|
// Private helper method. This one posts a control indicator for each worker thread to consume
|
|
|
|
|
// from the action queue. When the worker pops this msg, it will shut itself down gracefully.
|
|
|
|
|
Status StorageOp::PostEndOfData() {
|
|
|
|
|
MS_LOG(INFO) << "Master: Processed all of the buffers. Send end-of-data message to workers.";
|
|
|
|
|
MS_LOG(DEBUG) << "Master: Processed all of the buffers. Send end-of-data message to workers.";
|
|
|
|
|
|
|
|
|
|
// For each worker we add the message so that they can all get the memo
|
|
|
|
|
for (int32_t i = 0; i < num_workers_; ++i) {
|
|
|
|
@ -462,14 +462,14 @@ Status StorageOp::operator()() {
|
|
|
|
|
// Reduce the shared_ptr ref count of this buffer by removing it from the mDataBuffers
|
|
|
|
|
// table first before we push the buffer to output connector.
|
|
|
|
|
data_buffers_[buffer_id].reset();
|
|
|
|
|
MS_LOG(INFO) << "StorageOp master: Consumed buffer " << buffer_id << " from internal worker connector.";
|
|
|
|
|
MS_LOG(DEBUG) << "StorageOp master: Consumed buffer " << buffer_id << " from internal worker connector.";
|
|
|
|
|
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(fetched_buffer)));
|
|
|
|
|
MS_LOG(INFO) << "StorageOp master: pushed buffer " << buffer_id << " to output connector.";
|
|
|
|
|
MS_LOG(DEBUG) << "StorageOp master: pushed buffer " << buffer_id << " to output connector.";
|
|
|
|
|
|
|
|
|
|
// Now, check our loop exit conditions and perform appropriate end of data handling if
|
|
|
|
|
// we've reached the end of our scan.
|
|
|
|
|
if (buffers_fetched_ == num_buffers_to_fetch) {
|
|
|
|
|
MS_LOG(INFO) << "StorageOp master: Reached end of data.";
|
|
|
|
|
MS_LOG(DEBUG) << "StorageOp master: Reached end of data.";
|
|
|
|
|
|
|
|
|
|
// If we are not inside of a Repeat path in the tree, or we are in a repeat path but
|
|
|
|
|
// this was our last repeat, then we do a full quit here with eof control message.
|
|
|
|
@ -479,17 +479,17 @@ Status StorageOp::operator()() {
|
|
|
|
|
RETURN_IF_NOT_OK(this->PostEndOfData());
|
|
|
|
|
std::unique_ptr<DataBuffer> eoeBuffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE);
|
|
|
|
|
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(eoeBuffer)));
|
|
|
|
|
MS_LOG(INFO) << "StorageOp master: Flow end-of-data eof message.";
|
|
|
|
|
MS_LOG(DEBUG) << "StorageOp master: Flow end-of-data eof message.";
|
|
|
|
|
std::unique_ptr<DataBuffer> eofBuffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF);
|
|
|
|
|
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(eofBuffer)));
|
|
|
|
|
MS_LOG(INFO) << "StorageOp master: Main execution loop complete.";
|
|
|
|
|
MS_LOG(DEBUG) << "StorageOp master: Main execution loop complete.";
|
|
|
|
|
done = true; // while loop exit
|
|
|
|
|
} else {
|
|
|
|
|
// We are in a repeat path and it's not the last repeat.
|
|
|
|
|
// Flow an end-of-epoch control message up the pipeline.
|
|
|
|
|
// RepeatOp above us somewhere in the tree will re-init us with the data to fetch again
|
|
|
|
|
// once it gets the end-of-epoch message.
|
|
|
|
|
MS_LOG(INFO) << "StorageOp master: Flow end-of-epoch eoe message.";
|
|
|
|
|
MS_LOG(DEBUG) << "StorageOp master: Flow end-of-epoch eoe message.";
|
|
|
|
|
std::unique_ptr<DataBuffer> eoe_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE);
|
|
|
|
|
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(eoe_buffer)));
|
|
|
|
|
|
|
|
|
@ -513,7 +513,7 @@ Status StorageOp::operator()() {
|
|
|
|
|
// The entry point code for when workers are launched.
|
|
|
|
|
Status StorageOp::WorkerEntry(int32_t worker_id) {
|
|
|
|
|
int32_t next_action_id = 0;
|
|
|
|
|
MS_LOG(INFO) << "Worker: StorageOp worker entry point.";
|
|
|
|
|
MS_LOG(DEBUG) << "Worker: StorageOp worker entry point.";
|
|
|
|
|
|
|
|
|
|
// Handshake with TaskManager to synchronize the creation
|
|
|
|
|
TaskManager::FindMe()->Post();
|
|
|
|
@ -524,18 +524,18 @@ Status StorageOp::WorkerEntry(int32_t worker_id) {
|
|
|
|
|
// Drive a load of this buffer and get a pointer to the buffer after it's loaded in
|
|
|
|
|
std::unique_ptr<DataBuffer> dB;
|
|
|
|
|
RETURN_IF_NOT_OK(this->GetBuffer(next_action_id, &dB));
|
|
|
|
|
MS_LOG(INFO) << "Worker: Loaded buffer " << next_action_id << ".";
|
|
|
|
|
MS_LOG(DEBUG) << "Worker: Loaded buffer " << next_action_id << ".";
|
|
|
|
|
|
|
|
|
|
// Add the buffer to the internal queue for master to consume from later.
|
|
|
|
|
// This could end up blocking if the queue is full in which case it waits here
|
|
|
|
|
// until the master can drain a buffer off the queue.
|
|
|
|
|
RETURN_IF_NOT_OK(worker_connector_->Add(worker_id, std::move(dB)));
|
|
|
|
|
MS_LOG(INFO) << "Worker: Pushed buffer " << next_action_id << " to internal worker connector.";
|
|
|
|
|
MS_LOG(DEBUG) << "Worker: Pushed buffer " << next_action_id << " to internal worker connector.";
|
|
|
|
|
|
|
|
|
|
// Get the next action id and loop
|
|
|
|
|
RETURN_IF_NOT_OK(action_queue_[worker_id]->PopFront(&next_action_id));
|
|
|
|
|
}
|
|
|
|
|
MS_LOG(INFO) << "Worker: Received end-of-data message. Worker complete.";
|
|
|
|
|
MS_LOG(DEBUG) << "Worker: Received end-of-data message. Worker complete.";
|
|
|
|
|
return Status::OK();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -576,12 +576,12 @@ Status StorageOp::LoadParallelConfig() {
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Invalid deviceNum");
|
|
|
|
|
}
|
|
|
|
|
if (device_id_ > MAX_INTEGER_INT32 || device_id_ >= device_num_) {
|
|
|
|
|
MS_LOG(INFO) << "In parallel config file " << data_distribution_file_ << ", wrong deviceID provided.";
|
|
|
|
|
MS_LOG(DEBUG) << "In parallel config file " << data_distribution_file_ << ", wrong deviceID provided.";
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Invalid deviceId");
|
|
|
|
|
}
|
|
|
|
|
shard_config_ = js.value("shardConfig", "");
|
|
|
|
|
if (shard_config_ != "ALL" && shard_config_ != "UNIQUE" && shard_config_ != "RANDOM") {
|
|
|
|
|
MS_LOG(INFO) << "In parallel config file " << data_distribution_file_ << " wrong mShardConfig provided.";
|
|
|
|
|
MS_LOG(DEBUG) << "In parallel config file " << data_distribution_file_ << " wrong mShardConfig provided.";
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Invalid shardConfig");
|
|
|
|
|
}
|
|
|
|
|
std::string shuffle_str = js.value("shuffle", "");
|
|
|
|
@ -590,8 +590,8 @@ Status StorageOp::LoadParallelConfig() {
|
|
|
|
|
} else if (shuffle_str == "OFF") {
|
|
|
|
|
shuffle_config_ = false;
|
|
|
|
|
} else {
|
|
|
|
|
MS_LOG(INFO) << "In parallel config file " << data_distribution_file_
|
|
|
|
|
<< ", shuffle config is wrong: it's not ON or OFF";
|
|
|
|
|
MS_LOG(DEBUG) << "In parallel config file " << data_distribution_file_
|
|
|
|
|
<< ", shuffle config is wrong: it's not ON or OFF";
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("Invalid shuffle option");
|
|
|
|
|
}
|
|
|
|
|
seed_ = js.value("seed", 0);
|
|
|
|
|