|
|
|
@ -156,14 +156,15 @@ Status MapOp::WorkerEntry(int32_t worker_id) {
|
|
|
|
|
// initializations that happen after the first fetch.
|
|
|
|
|
RETURN_IF_NOT_OK(FetchNextBuffer(&in_buffer, worker_id));
|
|
|
|
|
|
|
|
|
|
// Initialize details related to column selections and column map by calling WorkerEntryInit.
|
|
|
|
|
// WorkerEntryInit contains thread-safe lock to ensure that this init work is only performed once
|
|
|
|
|
// by the first worker to enter the codepath. All other threads will share the const info that
|
|
|
|
|
// gets set up here going forward.
|
|
|
|
|
// Sanity check the databuffer.
|
|
|
|
|
// Special case: if there are more threads than buffers, some threads simply get the final control
|
|
|
|
|
// messages (eoe/eof), and so they will not perform the init work.
|
|
|
|
|
// messages (eoe/eof), and so they will not perform the check.
|
|
|
|
|
if (!in_buffer->eoe() && !in_buffer->eof()) {
|
|
|
|
|
RETURN_IF_NOT_OK(WorkerEntryInit(in_buffer.get()));
|
|
|
|
|
int32_t num_rows = in_buffer->NumRows();
|
|
|
|
|
int32_t num_cols = in_buffer->NumCols();
|
|
|
|
|
if (num_rows == 0 || num_cols == 0) {
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer.");
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Now that init work is done, drop into the main fetching loop.
|
|
|
|
@ -258,26 +259,37 @@ Status MapOp::WorkerCompute(DataBuffer *in_buffer, TensorQTable *new_tensor_tabl
|
|
|
|
|
return Status::OK();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// initialize some internal data structure used by WorkerEntry()
|
|
|
|
|
Status MapOp::WorkerEntryInit(const DataBuffer *in_buf) {
|
|
|
|
|
int32_t num_rows = in_buf->NumRows();
|
|
|
|
|
int32_t num_cols = in_buf->NumCols();
|
|
|
|
|
if (num_rows == 0 || num_cols == 0) {
|
|
|
|
|
RETURN_STATUS_UNEXPECTED("MapOp is getting an empty DataBuffer.");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We can't use AssignColMapFromChild() here since we need to modify the column map. We need to be threadsafe
|
|
|
|
|
// though for saving the final map in the op, so use the lock here.
|
|
|
|
|
if (first_fetch_) {
|
|
|
|
|
std::unique_lock<std::mutex> lock(column_name_map_mutex_);
|
|
|
|
|
// If the map has not been set up yet in the base class, then we are the first one in to set it up
|
|
|
|
|
// (and we are under protection of the mutex lock)
|
|
|
|
|
// Sets up this op's column name-to-index map from the first child's map.
//
// When column_name_id_map_ is still empty, a local copy of child_[0]'s map is taken, passed to
// InitPrivateVariable(), and then handed to CreateFinalColMap(). When the map is already populated,
// nothing is recomputed and a warning is logged instead.
//
// Returns a non-OK Status propagated through RETURN_IF_NOT_OK if InitPrivateVariable() fails,
// otherwise Status::OK().
Status MapOp::ComputeColMap() {
  // If the map has not been set up yet in the base class, then set it up
  if (column_name_id_map_.empty()) {
    // Work on a local copy; both helpers below receive a pointer to this copy.
    std::unordered_map<std::string, int32_t> current_name_id_map = child_[0]->column_name_id_map();
    // Initialize private variables
    RETURN_IF_NOT_OK(InitPrivateVariable(&current_name_id_map));
    // Create the final column name to index mapping in the base class field
    CreateFinalColMap(&current_name_id_map);
    MS_LOG(DEBUG) << "Column name map for map op set: " << this->ColumnNameMapAsString();
  } else {
    MS_LOG(WARNING) << "Column name map is already set!";
  }
  return Status::OK();
}
|
|
|
|
|
|
|
|
|
|
// Validating if each of the input_columns exists in the DataBuffer.
|
|
|
|
|
Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map) {
|
|
|
|
|
for (const auto &inCol : in_columns_) {
|
|
|
|
|
bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? true : false;
|
|
|
|
|
if (!found) {
|
|
|
|
|
std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns.";
|
|
|
|
|
RETURN_STATUS_UNEXPECTED(err_msg);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return Status::OK();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Status MapOp::InitPrivateVariable(std::unordered_map<std::string, int32_t> *col_name_id_map) {
|
|
|
|
|
// If in_columns_ is empty, the column at index 0 will be picked.
|
|
|
|
|
if (in_columns_.empty()) {
|
|
|
|
|
for (const auto &pair : current_name_id_map) {
|
|
|
|
|
for (const auto &pair : *col_name_id_map) {
|
|
|
|
|
if (pair.second == 0) {
|
|
|
|
|
MS_LOG(INFO) << "Input columns empty for map op, will apply to the first column in the current table.";
|
|
|
|
|
in_columns_.push_back(pair.first);
|
|
|
|
@ -294,38 +306,18 @@ Status MapOp::WorkerEntryInit(const DataBuffer *in_buf) {
|
|
|
|
|
|
|
|
|
|
// Before we continue, issue a sanity check to make sure the input columns from user and the incoming
|
|
|
|
|
// columns from child are correct
|
|
|
|
|
RETURN_IF_NOT_OK(this->ValidateInColumns(current_name_id_map));
|
|
|
|
|
RETURN_IF_NOT_OK(this->ValidateInColumns(*col_name_id_map));
|
|
|
|
|
|
|
|
|
|
// initialize keep_input_columns, true means to keep the column.
|
|
|
|
|
keep_input_columns_.resize(num_cols, true);
|
|
|
|
|
keep_input_columns_.resize(col_name_id_map->size(), true);
|
|
|
|
|
for (const auto &col_name : in_columns_) {
|
|
|
|
|
int32_t missed = current_name_id_map[col_name];
|
|
|
|
|
int32_t missed = (*col_name_id_map)[col_name];
|
|
|
|
|
keep_input_columns_[missed] = false;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// initialize to_process_indices.
|
|
|
|
|
for (const auto &col_name : in_columns_) {
|
|
|
|
|
to_process_indices_.push_back(current_name_id_map[col_name]);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Create the final column name to index mapping in the base class field
|
|
|
|
|
CreateFinalColMap(¤t_name_id_map);
|
|
|
|
|
first_fetch_ = false;
|
|
|
|
|
}
|
|
|
|
|
} // mutex lock will release here
|
|
|
|
|
|
|
|
|
|
MS_LOG(DEBUG) << "Column name map for map op set: " << this->ColumnNameMapAsString();
|
|
|
|
|
return Status::OK();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Validating if each of the input_columns exists in the DataBuffer.
|
|
|
|
|
Status MapOp::ValidateInColumns(const std::unordered_map<std::string, int32_t> &col_name_id_map) {
|
|
|
|
|
for (const auto &inCol : in_columns_) {
|
|
|
|
|
bool found = col_name_id_map.find(inCol) != col_name_id_map.end() ? true : false;
|
|
|
|
|
if (!found) {
|
|
|
|
|
std::string err_msg = "input column name: " + inCol + " doesn't exist in the dataset columns.";
|
|
|
|
|
RETURN_STATUS_UNEXPECTED(err_msg);
|
|
|
|
|
}
|
|
|
|
|
to_process_indices_.push_back((*col_name_id_map)[col_name]);
|
|
|
|
|
}
|
|
|
|
|
return Status::OK();
|
|
|
|
|
}
|
|
|
|
|