Replace DataBuffers with TensorRows

pull/12841/head
hesham 4 years ago
parent 0bb4841427
commit a9a80693be

@@ -1,5 +1,5 @@
 /**
- * Copyright 2020 Huawei Technologies Co., Ltd
+ * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -21,15 +21,21 @@
 namespace mindspore {
 namespace dataset {
-TensorRow::TensorRow() noexcept : id_(kDefaultRowId), path_({}) {}
+TensorRow::TensorRow() noexcept : id_(kDefaultRowId), path_({}), tensor_row_flag_(kFlagNone) {}
-TensorRow::TensorRow(size_type n, TensorRow::value_type t) noexcept : id_(kDefaultRowId), path_({}), row_(n, t) {}
+TensorRow::TensorRow(size_type n, TensorRow::value_type t) noexcept
+    : id_(kDefaultRowId), path_({}), row_(n, t), tensor_row_flag_(kFlagNone) {}
-TensorRow::TensorRow(const TensorRow::vector_type &v) : id_(kDefaultRowId), path_({}), row_(v) {}
+TensorRow::TensorRow(const TensorRow::vector_type &v)
+    : id_(kDefaultRowId), path_({}), row_(v), tensor_row_flag_(kFlagNone) {}
-TensorRow::TensorRow(row_id_type id, const std::initializer_list<value_type> &lst) : id_(id), path_({}), row_(lst) {}
+TensorRow::TensorRow(row_id_type id, const std::initializer_list<value_type> &lst)
+    : id_(id), path_({}), row_(lst), tensor_row_flag_(kFlagNone) {}
-TensorRow::TensorRow(const TensorRow &tr) : id_(tr.id_), path_(tr.path_), row_(tr.row_) {}
+TensorRow::TensorRow(const TensorRow &tr)
+    : id_(tr.id_), path_(tr.path_), row_(tr.row_), tensor_row_flag_(tr.tensor_row_flag_) {}
+TensorRow::TensorRow(TensorRow::TensorRowFlags flag) : tensor_row_flag_(flag) {}
 TensorRow &TensorRow::operator=(const TensorRow &tr) {
   if (this == &tr) {
@@ -38,23 +44,27 @@ TensorRow &TensorRow::operator=(const TensorRow &tr) {
   row_ = tr.row_;
   id_ = tr.id_;
   path_ = tr.path_;
+  tensor_row_flag_ = tr.tensor_row_flag_;
   return *this;
 }
 TensorRow &TensorRow::operator=(const std::initializer_list<TensorRow::value_type> &lst) {
   row_ = lst;
+  tensor_row_flag_ = kFlagNone;
   return *this;
 }
-TensorRow::TensorRow(TensorRow::vector_type &&v) noexcept : id_(kDefaultRowId), path_({}), row_(std::move(v)) {}
+TensorRow::TensorRow(TensorRow::vector_type &&v) noexcept
+    : id_(kDefaultRowId), path_({}), row_(std::move(v)), tensor_row_flag_(kFlagNone) {}
 TensorRow::TensorRow(row_id_type id, std::initializer_list<value_type> &&lst) noexcept
-    : id_(id), path_({}), row_(std::move(lst)) {}
+    : id_(id), path_({}), row_(std::move(lst)), tensor_row_flag_(kFlagNone) {}
 TensorRow::TensorRow(TensorRow &&tr) noexcept {
   id_ = tr.id_;
   path_ = std::move(tr.path_);
   row_ = std::move(tr.row_);
+  tensor_row_flag_ = tr.tensor_row_flag_;
 }
 TensorRow &TensorRow::operator=(TensorRow &&tr) noexcept {
@@ -65,11 +75,13 @@ TensorRow &TensorRow::operator=(TensorRow &&tr) noexcept {
   id_ = tr.id_;
   tr.id_ = kDefaultRowId;
   path_ = std::move(tr.path_);
+  tensor_row_flag_ = tr.tensor_row_flag_;
   return *this;
 }
 TensorRow &TensorRow::operator=(std::initializer_list<TensorRow::value_type> &&lst) noexcept {
   row_ = std::move(lst);
+  tensor_row_flag_ = kFlagNone;
   return *this;
 }

@@ -1,5 +1,5 @@
 /**
- * Copyright 2020 Huawei Technologies Co., Ltd
+ * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -35,6 +35,14 @@ class TensorRow {
  public:
   static constexpr row_id_type kDefaultRowId = -1;  // Default row id
+  enum TensorRowFlags : uint32_t {
+    kFlagNone = 0,
+    kFlagEOF = 1,         // The buffer is an eof end-of-data msg
+    kFlagEOE = 1u << 1,   // The buffer is an eoe end-of-epoch msg
+    kFlagWait = 1u << 2,  // The buffer is an control signal for workers to suspend operations
+    kFlagQuit = 1u << 3   // The buffer is a control signal for workers to quit
+  };
   // Type definitions
   using size_type = dsize_t;
   using value_type = std::shared_ptr<Tensor>;
@@ -222,10 +230,25 @@ class TensorRow {
   const_iterator end() const { return row_.end(); }
+  // Convenience getter functions for flag checking
+  bool eof() const { return (static_cast<uint32_t>(tensor_row_flag_) & static_cast<uint32_t>(kFlagEOF)); }
+  bool eoe() const { return (static_cast<uint32_t>(tensor_row_flag_) & static_cast<uint32_t>(kFlagEOE)); }
+  bool wait() const { return (static_cast<uint32_t>(tensor_row_flag_) & static_cast<uint32_t>(kFlagWait)); }
+  bool quit() const { return (static_cast<uint32_t>(tensor_row_flag_) & static_cast<uint32_t>(kFlagQuit)); }
+  TensorRowFlags Flags() { return tensor_row_flag_; }
+  explicit TensorRow(TensorRowFlags);
  protected:
   row_id_type id_;
   std::vector<std::string> path_;
   std::vector<std::shared_ptr<Tensor>> row_;
+  TensorRowFlags tensor_row_flag_;
 };
 }  // namespace dataset
 }  // namespace mindspore
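
The flag plumbing above is the core of the change: instead of special DataBuffer objects, control messages (EOE, EOF, wait, quit) now travel as TensorRows with a flag set. A minimal, self-contained sketch of that pattern, with a hypothetical MiniRow type standing in for the real TensorRow, might look like this:

```cpp
// Sketch only: MiniRow is a simplified stand-in, not the real mindspore::dataset::TensorRow.
#include <cstdint>
#include <iostream>
#include <vector>

class MiniRow {
 public:
  enum Flags : uint32_t { kFlagNone = 0, kFlagEOF = 1, kFlagEOE = 1u << 1 };

  MiniRow() = default;                           // a normal data row
  explicit MiniRow(Flags flag) : flag_(flag) {}  // a control row (eoe/eof marker)

  bool eof() const { return (flag_ & kFlagEOF) != 0; }
  bool eoe() const { return (flag_ & kFlagEOE) != 0; }

  std::vector<int> data;  // stands in for the vector of tensors

 private:
  uint32_t flag_ = kFlagNone;
};

int main() {
  MiniRow data_row;
  data_row.data = {1, 2, 3};
  MiniRow eoe_row(MiniRow::kFlagEOE);  // end-of-epoch marker carries no data
  MiniRow eof_row(MiniRow::kFlagEOF);  // end-of-data marker carries no data

  for (const MiniRow &r : {data_row, eoe_row, eof_row}) {
    std::cout << (r.eof() ? "eof" : r.eoe() ? "eoe" : "data") << "\n";
  }
  return 0;
}
```

A row constructed with a flag carries no tensors; downstream operators simply test eoe()/eof() on the row instead of inspecting buffer types.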

@@ -1,5 +1,5 @@
 /**
- * Copyright 2020 Huawei Technologies Co., Ltd
+ * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -114,27 +114,6 @@ Status CacheClient::WriteRow(const TensorRow &row, row_id_type *row_id_from_serv
   return Status::OK();
 }
-Status CacheClient::WriteBuffer(std::unique_ptr<DataBuffer> &&in) const {
-  std::unique_ptr<DataBuffer> db_ptr = std::move(in);
-  auto num_rows = db_ptr->NumRows();
-  // We will send the requests async first on all rows and do a final wait.
-  if (num_rows > 0) {
-    auto arr = std::make_unique<std::shared_ptr<CacheRowRequest>[]>(num_rows);
-    for (auto i = 0; i < num_rows; ++i) {
-      TensorRow row;
-      RETURN_IF_NOT_OK(db_ptr->PopRow(&row));
-      arr[i] = std::make_shared<CacheRowRequest>(this);
-      RETURN_IF_NOT_OK(arr[i]->SerializeCacheRowRequest(this, row));
-      RETURN_IF_NOT_OK(PushRequest(arr[i]));
-    }
-    // Now we wait for them to come back
-    for (auto i = 0; i < num_rows; ++i) {
-      RETURN_IF_NOT_OK(arr[i]->Wait());
-    }
-  }
-  return Status::OK();
-}
 Status CacheClient::AsyncWriteRow(const TensorRow &row) {
   if (async_buffer_stream_ == nullptr) {
     return Status(StatusCode::kMDNotImplementedYet);
@@ -143,34 +122,6 @@ Status CacheClient::AsyncWriteRow(const TensorRow &row) {
   return Status::OK();
 }
-Status CacheClient::AsyncWriteBuffer(std::unique_ptr<DataBuffer> &&in) {
-  if (async_buffer_stream_ == nullptr) {
-    return Status(StatusCode::kMDNotImplementedYet);
-  } else {
-    Status rc;
-    std::unique_ptr<TensorQTable> tensor_table = std::make_unique<TensorQTable>();
-    auto num_rows = in->NumRows();
-    if (num_rows > 0) {
-      for (auto i = 0; i < num_rows; ++i) {
-        TensorRow row;
-        RETURN_IF_NOT_OK(in->PopRow(&row));
-        rc = AsyncWriteRow(row);
-        if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
-          tensor_table->push_back(row);
-        } else if (rc.IsError()) {
-          return rc;
-        }
-      }
-    }
-    // If not all of them can be sent async, return what's left back to the caller.
-    if (!tensor_table->empty()) {
-      in->set_tensor_table(std::move(tensor_table));
-      return Status(StatusCode::kMDNotImplementedYet);
-    }
-  }
-  return Status::OK();
-}
 Status CacheClient::GetRows(const std::vector<row_id_type> &row_id, TensorTable *out) const {
   RETURN_UNEXPECTED_IF_NULL(out);
   auto rq = std::make_shared<BatchFetchRequest>(this, row_id);

@@ -1,5 +1,5 @@
 /**
- * Copyright 2020 Huawei Technologies Co., Ltd
+ * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -156,11 +156,6 @@ class CacheClient {
   /// \return return code
   Status WriteRow(const TensorRow &row, row_id_type *row_id_from_server = nullptr) const;
-  /// \brief Send a DataBuffer to the cache server
-  /// \param in Unique pointer of the DataBuffer to be cached
-  /// \return return code
-  Status WriteBuffer(std::unique_ptr<DataBuffer> &&in) const;
   /// \brief Fetch a list of rows from the cache server. An empty TensorRow will be returned if there is
   /// any cache miss
   /// \param row_id A vector of row id's
@@ -257,6 +252,9 @@ class CacheClient {
     return false;
   }
+  /// \brief Serialize a Tensor into the async buffer.
+  Status AsyncWriteRow(const TensorRow &row);
   // Default size of the async write buffer
   constexpr static int64_t kAsyncBufferSize = 16 * 1048576L;  // 16M
   constexpr static int32_t kNumAsyncBuffer = 3;
@@ -269,8 +267,6 @@ class CacheClient {
     return Status::OK();
   }
-  Status AsyncWriteBuffer(std::unique_ptr<DataBuffer> &&in);
  private:
   mutable RWLock mux_;
   uint64_t cache_mem_sz_;
@@ -354,9 +350,6 @@ class CacheClient {
     std::atomic<int64_t> next_addr_;
   };
   std::shared_ptr<AsyncBufferStream> async_buffer_stream_;
-  /// \brief Serialize a Tensor into the async buffer.
-  Status AsyncWriteRow(const TensorRow &row);
 };
 }  // namespace dataset
 }  // namespace mindspore
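
With WriteBuffer and AsyncWriteBuffer gone, callers (see the CacheOp::CacheAllRows hunk later in this diff) push rows one at a time, trying the async path first and falling back to the synchronous write when the async buffer stream is unavailable. A self-contained sketch of that fallback loop, using simplified stand-in types (CacheStub, Status and Code are illustrative, not the real API):

```cpp
// Sketch of the per-row write-with-fallback pattern; not the real CacheClient.
#include <iostream>
#include <vector>

enum class Code { kOK, kNotImplementedYet, kError };

struct Status {
  Code code;
  Status() : code(Code::kOK) {}
  explicit Status(Code c) : code(c) {}
  bool IsOk() const { return code == Code::kOK; }
  bool IsError() const { return code == Code::kError; }
};

struct CacheStub {
  bool has_async_stream = false;  // pretend the shared-memory stream is absent
  Status AsyncWriteRow(int /*row*/) {
    return has_async_stream ? Status(Code::kOK) : Status(Code::kNotImplementedYet);
  }
  Status WriteRow(int row) {
    std::cout << "sync write of row " << row << "\n";
    return Status(Code::kOK);
  }
};

Status CacheAll(CacheStub *cc, const std::vector<int> &rows) {
  for (int row : rows) {
    Status rc = cc->AsyncWriteRow(row);
    if (rc.code == Code::kNotImplementedYet) {
      rc = cc->WriteRow(row);  // fall back to the synchronous path
    }
    if (rc.IsError()) return rc;
  }
  return Status();
}

int main() {
  CacheStub cc;
  return CacheAll(&cc, {1, 2, 3}).IsOk() ? 0 : 1;
}
```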

@@ -1,5 +1,5 @@
 /**
- * Copyright 2020 Huawei Technologies Co., Ltd
+ * Copyright 2020-2021 Huawei Technologies Co., Ltd
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -272,7 +272,6 @@ Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
   int64_t min_val = std::numeric_limits<int64_t>::max();
   int64_t max_val = 0;
   int64_t total_val = 0;
-  int64_t cnt = 0;
   std::vector<int64_t> duration;
   duration.reserve(num_rows_ / num_pipelines_ / cfg_.num_parallel_workers());
   bool resource_err = false;
@@ -291,8 +290,6 @@ Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
     }
     // Once we hit resource error, we drain the io block. No point to send anything to the server.
     if (!resource_err) {
-      auto buffer = std::make_unique<DataBuffer>(cnt++, DataBuffer::kDeBFlagNone);
-      auto tensor_table = std::make_unique<TensorQTable>();
       for (auto id : keys) {
         TensorRow row;
         std::shared_ptr<Tensor> element;
@@ -305,29 +302,27 @@ Status CachePipelineRun::WriterWorkerEntry(int32_t worker_id) {
          *it = i;
        }
        row.push_back(std::move(element));
-        tensor_table->push_back(std::move(row));
-      }
-      buffer->set_tensor_table(std::move(tensor_table));
-      // Measure the time to call WriteBuffer
-      auto start_tick = std::chrono::steady_clock::now();
-      rc = cc_->AsyncWriteBuffer(std::move(buffer));
-      auto end_tick = std::chrono::steady_clock::now();
-      if (rc.IsError()) {
-        if (rc == StatusCode::kMDOutOfMemory || rc == StatusCode::kMDNoSpace) {
-          MS_LOG(WARNING) << "Pipeline number " << my_pipeline_ + 1 << " worker id " << worker_id << ": "
-                          << rc.ToString();
-          resource_err = true;
-          cc_->ServerRunningOutOfResources();
-          continue;
-        } else {
-          return rc;
-        }
-      } else {
-        int64_t ms = std::chrono::duration_cast<std::chrono::microseconds>(end_tick - start_tick).count();
-        min_val = std::min(min_val, ms);
-        max_val = std::max(max_val, ms);
-        duration.push_back(ms);
-        total_val += ms;
-      }
+        // Measure the time to call WriteBuffer
+        auto start_tick = std::chrono::steady_clock::now();
+        rc = cc_->AsyncWriteRow(std::move(row));
+        auto end_tick = std::chrono::steady_clock::now();
+        if (rc.IsError()) {
+          if (rc == StatusCode::kMDOutOfMemory || rc == StatusCode::kMDNoSpace) {
+            MS_LOG(WARNING) << "Pipeline number " << my_pipeline_ + 1 << " worker id " << worker_id << ": "
+                            << rc.ToString();
+            resource_err = true;
+            cc_->ServerRunningOutOfResources();
+            continue;
+          } else {
+            return rc;
+          }
+        } else {
+          int64_t ms = std::chrono::duration_cast<std::chrono::microseconds>(end_tick - start_tick).count();
+          min_val = std::min(min_val, ms);
+          max_val = std::max(max_val, ms);
+          duration.push_back(ms);
+          total_val += ms;
+        }
       }
     }
   } while (true);
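
The rewritten writer worker times each AsyncWriteRow call with std::chrono::steady_clock and keeps min/max/total statistics. A standalone sketch of that measurement pattern (note that the diff stores microseconds in a variable named ms; the sketch names it us):

```cpp
// Sketch of the timing/statistics pattern only; the timed call is a sleep stand-in.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <iostream>
#include <limits>
#include <thread>
#include <vector>

int main() {
  int64_t min_val = std::numeric_limits<int64_t>::max();
  int64_t max_val = 0;
  int64_t total_val = 0;
  std::vector<int64_t> duration;

  for (int i = 0; i < 5; ++i) {
    auto start_tick = std::chrono::steady_clock::now();
    std::this_thread::sleep_for(std::chrono::milliseconds(1));  // stands in for AsyncWriteRow
    auto end_tick = std::chrono::steady_clock::now();

    int64_t us = std::chrono::duration_cast<std::chrono::microseconds>(end_tick - start_tick).count();
    min_val = std::min(min_val, us);
    max_val = std::max(max_val, us);
    duration.push_back(us);
    total_val += us;
  }

  std::cout << "min=" << min_val << "us max=" << max_val << "us avg="
            << total_val / static_cast<int64_t>(duration.size()) << "us\n";
  return 0;
}
```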

[File diff suppressed because it is too large]

@@ -1,5 +1,5 @@
 /**
- * Copyright 2019 Huawei Technologies Co., Ltd
+ * Copyright 2019-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -35,18 +35,21 @@ using TensorMap = std::unordered_map<std::string, std::shared_ptr<Tensor>>;
 // forward declare
 class ExecutionTree;
-class DataBuffer;
-// IteratorBase class is used to iterate data from an executionTree one row at a time.
-// The base class provides the general interface, whereas derived classes provide slightly
-// different implementations.
-class IteratorBase {
+// The DatasetIterator derived class is for fetching rows off the end/root of the execution tree.
+class DatasetIterator {
  public:
-  // Constructor of IteratorBase
-  IteratorBase();
+  // Constructor of the DatasetIterator
+  // @param exe_tree The execution tree we want to pull/iterate the data from using it's root node.
+  explicit DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree);
   // Destructor
-  virtual ~IteratorBase();
+  ~DatasetIterator();
+  // Getter
+  // @return The string to column id mapping.
+  std::unordered_map<std::string, int32_t> GetColumnNameMap() const;
+  bool eof_handled() const { return eof_handled_; }
   // Fetches one row of data from the iterator.
   // the base class version simply performs error handling and returns empty row. Actual
@@ -57,63 +60,12 @@ class IteratorBase {
   // @note The position of a Tensor/column might be different from the initial column order
   // in corresponding Dataset Op. User must be aware that MapOp, ZipOps, and others might change
   // the column ordering.
-  virtual Status FetchNextTensorRow(TensorRow *out_row);
+  Status FetchNextTensorRow(TensorRow *out_row);
   // Fetches one row of data from the iterator as a column map.
   // @return A unordered map from column name to shared pointer to Tensor.
   Status GetNextAsMap(TensorMap *out_map);
-  /// \brief return column_name, tensor pair in the order of its column id.
-  /// \param[out] vec
-  /// \return Error code
-  Status GetNextAsOrderedPair(std::vector<std::pair<std::string, std::shared_ptr<Tensor>>> *vec);
-  // Getter
-  // @return T/F if this iterator is completely done after getting an eof
-  bool eof_handled() const { return eof_handled_; }
-  // Getter
-  // @return The string to column id mapping.
-  virtual std::unordered_map<std::string, int32_t> GetColumnNameMap() const = 0;
- protected:
-  std::unique_ptr<DataBuffer> curr_buffer_;  // holds the current buffer
-  bool eof_handled_;  // T/F if this op got an eof
-  std::unordered_map<std::string, int32_t> col_name_id_map_;
-  std::vector<std::pair<std::string, int32_t>> column_order_;  // key: column name, val: column id
-};
-// The DatasetIterator derived class is for fetching rows off the end/root of the execution tree.
-class DatasetIterator : public IteratorBase {
- public:
-  // Constructor of the DatasetIterator
-  // @param exe_tree The execution tree we want to pull/iterate the data from using it's root node.
-  explicit DatasetIterator(std::shared_ptr<ExecutionTree> exe_tree);
-  // Destructor
-  ~DatasetIterator();
-  // Fetches one row of data from the iterator. Overrides the base class. This one fetches
-  // from the tree root node directly.
-  // @param out_row - A TensorRow (vector of shared pointers to Tensors). If any of the of data
-  // messages are encountered (such as eoe or eof), then an empty TensorRow is returned back.
-  // @return Status The status code returned
-  Status FetchNextTensorRow(TensorRow *out_row) override;
-  // Fetches the next tensor row into device row, and returns it's shape.
-  // @param out_shapes - A vector of tensor shapes (one shape per column)
-  // @return Status The status code returned
-  Status GetOutputShapes(std::vector<TensorShape> *out_shapes);
-  // Fetches the next tensor row into device row, and returns it's shape.
-  // @param outShapes - A vector of tensor shapes (one shape per column)
-  // @return Status The status code returned
-  Status GetOutputTypes(std::vector<DataType> *out_types);
-  // Getter
-  // @return The string to column id mapping.
-  std::unordered_map<std::string, int32_t> GetColumnNameMap() const override;
  private:
   std::shared_ptr<DatasetOp> root_;  // saves the root of the executionTree
   TensorRow device_queue_row_;
@@ -121,11 +73,14 @@ class DatasetIterator : public IteratorBase {
   int32_t cur_batch_num_;  // current batch number,used for profiling
   int32_t cur_connector_size_;  // current connector size of root op,used for profiling
   int32_t cur_connector_capacity_;  // current connector capacity of root op, used for profiling
+  bool eof_handled_;  // T/F if this op got an eof
+  std::unordered_map<std::string, int32_t> col_name_id_map_;
+  std::vector<std::pair<std::string, int32_t>> column_order_;  // key: column name, val: column id
 };
 // The ChildIterator derived class is for fetching rows from intermediate nodes of execution tree.
 // This one should only be used by internal Dataset operators, rather than an end-user.
-class ChildIterator : public IteratorBase {
+class ChildIterator {
  public:
   // Constructor of the DatasetIterator
   // @param current_op - The parent op from which we'll fetch from it's children.
@@ -141,7 +96,7 @@ class ChildIterator : public IteratorBase {
   // @param out_row - A TensorRow (vector of shared pointers to Tensors). If any of the of data
   // messages are encountered (such as eoe or eof), then an empty TensorRow is returned back.
   // @return Status The status code returned
-  Status FetchNextTensorRow(TensorRow *out_row) override;
+  Status FetchNextTensorRow(TensorRow *out_row);
   // This function drains buffer until next eoe has been received.
   // It will be a no-op if the previous row returned is empty.
@@ -150,16 +105,21 @@ class ChildIterator : public IteratorBase {
   // Getter
   // @return The string to column id mapping.
-  std::unordered_map<std::string, int32_t> GetColumnNameMap() const override;
+  std::unordered_map<std::string, int32_t> GetColumnNameMap() const;
   // Return T/F if end of epoch
   bool end_of_epoch() { return end_epoch_; }
+  // Getter
+  // @return T/F if this iterator is completely done after getting an eof
+  bool eof_handled() const { return eof_handled_; }
  private:
   DatasetOp *current_op_;  // The parent operator. We consume from it's children.
   int32_t child_idx_;  // The specific child this iterator will fetch from.
   int32_t worker_id_;  // The worker id uses for fetching the child data.
   bool end_epoch_;  // the flag used when an empty row has been returned.
+  bool eof_handled_;  // T/F if this op got an eof
 };
 }  // namespace dataset
 }  // namespace mindspore
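
After IteratorBase is folded into DatasetIterator and ChildIterator, consumers call FetchNextTensorRow in a loop, treating an empty row as an epoch boundary and the eof condition as end of data. A self-contained sketch of that consumption loop, using a hypothetical MockIterator rather than the real classes:

```cpp
// Sketch of the row-based iterator loop; Row and MockIterator are illustrative stand-ins.
#include <iostream>
#include <queue>
#include <vector>

struct Row {
  std::vector<int> data;
  bool eof;
  bool empty() const { return data.empty() && !eof; }  // eoe is modelled as an empty data row
};

class MockIterator {
 public:
  MockIterator() {
    rows_.push(Row{{1, 2}, false});
    rows_.push(Row{{3, 4}, false});
    rows_.push(Row{{}, false});  // eoe
    rows_.push(Row{{}, true});   // eof
  }
  bool FetchNextTensorRow(Row *out) {
    if (rows_.empty()) return false;
    *out = rows_.front();
    rows_.pop();
    return true;
  }

 private:
  std::queue<Row> rows_;
};

int main() {
  MockIterator it;
  Row row{{}, false};
  while (it.FetchNextTensorRow(&row) && !row.eof) {
    if (row.empty()) {
      std::cout << "end of epoch\n";
      continue;
    }
    std::cout << "row with " << row.data.size() << " columns\n";
  }
  std::cout << "end of data\n";
  return 0;
}
```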

@@ -1,5 +1,5 @@
 /**
- * Copyright 2020 Huawei Technologies Co., Ltd
+ * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -71,9 +71,10 @@ Status BarrierOp::operator()() {
   // Loop until eof is true
   while (!eof_) {
-    // Create new table to put the new tensor rows
-    std::unique_ptr<TensorQTable> curr_table = std::make_unique<TensorQTable>();
-    RETURN_IF_NOT_OK(prepare(curr_table.get()));
+    RETURN_IF_NOT_OK(prepare());
+    // read the first row
+    TensorRow new_row;
+    RETURN_IF_NOT_OK(getNextTensorRow(&new_row));
     // If an eof got picked up during the above prepare, then we're done
     if (eof_) {
@@ -82,92 +83,36 @@ Status BarrierOp::operator()() {
     // we have to output new buffer with possibly different buffer size, possibly one row
     while (!clean_up_) {
-      // 1. If a previous loop iteration sent the current table out, then create a new one.
-      if (curr_table == nullptr) {
-        curr_table = std::make_unique<TensorQTable>();
-      }
-      // 2 fill the table.  Note: clean_up mode might get turned on if epoch is finished
-      RETURN_IF_NOT_OK(fillBuffer(curr_table.get()));
-      // 3 create and update buffer and send it to the out connector
-      if (!curr_table->empty()) {
-        std::unique_ptr<DataBuffer> curr_buffer = std::make_unique<DataBuffer>(buffer_id_, DataBuffer::kDeBFlagNone);
-        curr_buffer->set_tensor_table(std::move(curr_table));
-        MS_LOG(DEBUG) << "Barrier operator finished one buffer, pushing, rows " << curr_buffer->NumRows() << ", cols "
-                      << curr_buffer->NumCols() << ", map " << column_name_id_map_.size() << ".";
-        RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(curr_buffer)));
-        buffer_id_++;
-      }
+      // 2 Block
+      RETURN_IF_NOT_OK(blockCond());
+      MS_LOG(DEBUG) << "Barrier operator finished one row, pushing, cols " << new_row.size() << ", map "
+                    << column_name_id_map_.size() << ".";
+      RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row)));
+      RETURN_IF_NOT_OK(getNextTensorRow(&new_row));
     }
-    // 4 handle drain state.
     if (clean_up_) {
       MS_LOG(DEBUG) << "Barrier operator sending epoch ending signal.";
-      // Send the eoe up.
-      RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE))));
+      // 3 Send the eoe up.
+      RETURN_IF_NOT_OK(out_connector_->SendEOE());
     }
   }
-  // 5 handle eof
+  // 4 handle eof
   // propagate eof here.
   MS_LOG(INFO) << "Barrier operator got EOF, propagating.";
-  RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF))));
+  RETURN_IF_NOT_OK(out_connector_->SendEOF());
   return Status::OK();
 }
 // Handles preprocessing of the main loop, used when starting new epoch
-Status BarrierOp::prepare(TensorQTable *const table) {
+Status BarrierOp::prepare() {
   MS_LOG(DEBUG) << "Barrier operator prepares for new epoch.";
   clean_up_ = false;
-  buffer_id_ = 0;
-  if (table == nullptr) {
-    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__,
-                  "BarrierOp prepare phase requires a tensor table.");
-  }
-  // fill initial row
-  TensorRow new_row = {};
-  // use iterator to get next row and invoke pyfunc wait
-  RETURN_IF_NOT_OK(getNextTensorRow(&new_row));
-  // If the first row fetching resulted in eof, then we are done.
-  if (eof_) {
-    return Status::OK();
-  }
-  if (new_row.empty()) {
-    // This epoch is empty
-    return Status::OK();
-  }
-  // Pack this first row into our tensor table
-  // first row we also have to check if we should block
-  RETURN_IF_NOT_OK(blockCond());
-  table->push_back(std::move(new_row));
   // the update code below shouldn't do anything bad if the column name already exists.
   return Status::OK();
 }
-// fillBuffer always expects a new table to fill
-Status BarrierOp::fillBuffer(TensorQTable *const table) {
-  if (table == nullptr) {
-    return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "BarrierOp fillBuffer null table pointer.");
-  }
-  TensorRow new_row = {};
-  while (table->size() < static_cast<size_t>(rows_per_buffer_)) {
-    RETURN_IF_NOT_OK(getNextTensorRow(&new_row));
-    // Early exit the loop if we got empty row from any of our child iterations
-    if (new_row.empty()) {
-      return Status::OK();
-    }
-    // else we got a row so pack it into the tensor table.
-    RETURN_IF_NOT_OK(blockCond());
-    table->push_back(std::move(new_row));
-  }
-  return Status::OK();
-}
 // function executes a py_func and blocks until condition becomes true.
 Status BarrierOp::blockCond() {
   {

@@ -1,5 +1,5 @@
 /**
- * Copyright 2020 Huawei Technologies Co., Ltd
+ * Copyright 2020-2021 Huawei Technologies Co., Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
@@ -139,7 +139,7 @@ class BarrierOp : public PipelineOp {
   // Handles preprocessing of the main loop, used when starting new epoch
   // @param table - a table of tensors to be moved into a buffer
-  Status prepare(TensorQTable *const table);
+  Status prepare();
   // This function calls takes a table repeatedly adds rows to it.
   // @param table - a table of tensors to be moved into a buffer
@@ -152,7 +152,7 @@ class BarrierOp : public PipelineOp {
   Status blockCond();
  private:
-  // clean up variable to return imcomplete buffer
+  // clean up variable to return incomplete buffer
   bool clean_up_;
   // end of file state, we stop reading data and shut down
   bool eof_;
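
BarrierOp no longer accumulates rows_per_buffer_ rows into a TensorQTable before shipping a DataBuffer; it forwards each row to the output connector as soon as the barrier condition clears, then sends EOE/EOF markers. A toy sketch of that per-row producer pattern (Connector below is a stand-in, not the real Connector class):

```cpp
// Sketch of the per-row producer loop with explicit EOE/EOF markers.
#include <iostream>
#include <queue>
#include <string>
#include <vector>

struct Connector {
  std::queue<std::string> out;
  void Add(std::vector<int> row) { out.push("row(" + std::to_string(row.size()) + " cols)"); }
  void SendEOE() { out.push("EOE"); }  // epoch boundary marker
  void SendEOF() { out.push("EOF"); }  // end-of-data marker
};

int main() {
  Connector connector;
  std::vector<std::vector<int>> epoch = {{1, 2}, {3, 4}, {5, 6}};

  for (auto &row : epoch) {
    connector.Add(std::move(row));  // forward each row immediately, no table batching
  }
  connector.SendEOE();
  connector.SendEOF();

  while (!connector.out.empty()) {
    std::cout << connector.out.front() << "\n";
    connector.out.pop();
  }
  return 0;
}
```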

@@ -182,24 +182,21 @@ void BatchOp::Print(std::ostream &out, bool show_all) const {
   }
 }
-Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, const std::unique_ptr<TensorQTable> *dest,
-                          dsize_t batch_size) {
+Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *dest, dsize_t batch_size) {
   if ((*src)->size() != batch_size) {
     RETURN_STATUS_UNEXPECTED("[Internal Batch ERROR] Source table size does not match the batch_size");
   }
   if (batch_size == 1) {
-    TensorRow row = std::move((*src)->front());
-    row.setPath({});
+    *dest = std::move((*src)->front());
     (*src)->pop_front();
-    (*dest)->push_back(row);
-    for (const auto &tensor : (*dest)->front()) {
+    for (const auto &tensor : (*dest)) {
       RETURN_IF_NOT_OK(tensor->ExpandDim(0));
     }
     return Status::OK();
   }
-  TensorRow batched_row;
   auto num_columns = (*src)->front().size();
   for (size_t i = 0; i < num_columns; i++) {
     std::shared_ptr<Tensor> first_tensor = (*src)->at(0).at(i);  // first row, column i
@@ -234,11 +231,9 @@ Status BatchOp::BatchRows(const std::unique_ptr<TensorQTable> *src, const std::u
       }
       RETURN_IF_NOT_OK(Tensor::CreateFromVector(strings, new_shape, &new_tensor));
     }
-    batched_row.emplace_back(new_tensor);
+    dest->emplace_back(new_tensor);
   }
-  (*dest)->emplace_back(batched_row);
   return Status::OK();
 }
@@ -248,30 +243,26 @@ Status BatchOp::WorkerEntry(int32_t workerId) {
   RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
   while (table_pair.second.ctrl_ != batchCtrl::kQuit) {
     if (table_pair.second.ctrl_ == batchCtrl::kEOE) {
-      RETURN_IF_NOT_OK(out_connector_->Add(workerId, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
+      RETURN_IF_NOT_OK(out_connector_->SendEOE(workerId));
     } else if (table_pair.second.ctrl_ == batchCtrl::kEOF) {
-      RETURN_IF_NOT_OK(out_connector_->Add(workerId, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
+      RETURN_IF_NOT_OK(out_connector_->SendEOF(workerId));
    } else if (table_pair.second.ctrl_ == batchCtrl::kNoCtrl) {
-      std::unique_ptr<DataBuffer> db = nullptr;
-      RETURN_IF_NOT_OK(MakeBatchedBuffer(std::move(table_pair), &db));
-      RETURN_IF_NOT_OK(out_connector_->Add(workerId, std::move(db)));
+      TensorRow new_row;
+      RETURN_IF_NOT_OK(MakeBatchedBuffer(std::move(table_pair), &new_row));
+      RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row), workerId));
     }
     RETURN_IF_NOT_OK(worker_queues_[workerId]->PopFront(&table_pair));
   }
   return Status::OK();
 }
-Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair,
-                                  std::unique_ptr<DataBuffer> *db) {
+Status BatchOp::MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair, TensorRow *new_row) {
   RETURN_UNEXPECTED_IF_NULL(table_pair.first);
 #ifdef ENABLE_PYTHON
   if (!in_col_names_.empty()) RETURN_IF_NOT_OK(MapColumns(&table_pair));  // pass it through pyfunc
 #endif
   if (pad_) RETURN_IF_NOT_OK(PadColumns(&table_pair.first, pad_info_, column_name_id_map_));  // do padding if needed
-  (*db) = std::make_unique<DataBuffer>(table_pair.second.batch_num_, DataBuffer::kDeBFlagNone);
-  std::unique_ptr<TensorQTable> dest_table = std::make_unique<TensorQTable>();
-  RETURN_IF_NOT_OK(BatchRows(&table_pair.first, &dest_table, table_pair.first->size()));
-  (*db)->set_tensor_table(std::move(dest_table));
+  RETURN_IF_NOT_OK(BatchRows(&table_pair.first, new_row, table_pair.first->size()));
   return Status::OK();
 }
@@ -575,14 +566,14 @@ int64_t BatchOp::GetTreeBatchSize() {
   return start_batch_size_;
 }
-Status BatchOp::GetNextRow(TensorRow *row) {
+Status BatchOp::GetNextRowPullMode(TensorRow *row) {
   std::unique_ptr<TensorQTable> table = std::make_unique<TensorQTable>();
   child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
   int32_t cur_batch_size = 0;
   RETURN_IF_NOT_OK(GetBatchSize(&cur_batch_size, CBatchInfo(0, batch_num_, batch_cnt_)));
   for (int i = 0; i < cur_batch_size; i++) {
     TensorRow new_row;
-    RETURN_IF_NOT_OK(child_[0]->GetNextRow(&new_row));
+    RETURN_IF_NOT_OK(child_[0]->GetNextRowPullMode(&new_row));
     if (!new_row.empty()) {
       table->emplace_back(new_row);
       if (table->size() == static_cast<size_t>(cur_batch_size)) break;
@@ -592,13 +583,10 @@ Status BatchOp::GetNextRow(TensorRow *row) {
       }
     }
   }
-  std::unique_ptr<TensorQTable> out = std::make_unique<TensorQTable>();
   RETURN_UNEXPECTED_IF_NULL(table);
   if (pad_) RETURN_IF_NOT_OK(PadColumns(&table, pad_info_, column_name_id_map_));  // do padding if needed
   if (!table->empty()) {
-    RETURN_IF_NOT_OK(BatchRows(&table, &out, table->size()));
-    CHECK_FAIL_RETURN_UNEXPECTED(out->size() == 1, "Batch returned 2 rows while 1 row was expected.");
-    *row = out->back();
+    RETURN_IF_NOT_OK(BatchRows(&table, row, table->size()));
     batch_cnt_++;
     batch_num_++;
   }

@@ -203,8 +203,7 @@ class BatchOp : public ParallelOp {
   // @param int32_t size - batch_size
   // @param const std::unordered_map<std::string, int32_t>& column_name_id_map - column names to index mapping
   // @return Status The status code returned
-  static Status BatchRows(const std::unique_ptr<TensorQTable> *src, const std::unique_ptr<TensorQTable> *dest,
-                          dsize_t batch_size);
+  static Status BatchRows(const std::unique_ptr<TensorQTable> *src, TensorRow *dest, dsize_t batch_size);
   // @param table
   // @param const PadInfo &pad_info pad info
@@ -226,8 +225,7 @@ class BatchOp : public ParallelOp {
   // Generate buffer with batched tensors
   // @return Status The status code returned
-  Status MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair,
-                           std::unique_ptr<DataBuffer> *db);
+  Status MakeBatchedBuffer(std::pair<std::unique_ptr<TensorQTable>, CBatchInfo> table_pair, TensorRow *new_row);
 #ifdef ENABLE_PYTHON
   // Function that calls pyfunc to perform map on batch
@@ -259,7 +257,7 @@ class BatchOp : public ParallelOp {
   // @return Status The status code returned
   Status LaunchThreadsAndInitOp();
-  Status GetNextRow(TensorRow *row) override;
+  Status GetNextRowPullMode(TensorRow *row) override;
 #ifdef ENABLE_PYTHON
   // Invoke batch size function with current BatchInfo to generate batch size.
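
The new BatchRows contract is that N source rows collapse into a single output TensorRow in which column i holds the N input tensors stacked along a new leading batch dimension (ExpandDim(0) in the batch_size == 1 case). A standalone sketch of that stacking, with plain vectors standing in for tensors:

```cpp
// Sketch of the stacking semantics only; vectors stand in for tensors.
#include <cassert>
#include <cstddef>
#include <iostream>
#include <vector>

using Row = std::vector<std::vector<int>>;                  // row = list of column "tensors"
using Batch = std::vector<std::vector<std::vector<int>>>;   // batched row: column -> N stacked values

Batch BatchRows(const std::vector<Row> &src) {
  assert(!src.empty());
  size_t num_columns = src.front().size();
  Batch dest(num_columns);
  for (size_t col = 0; col < num_columns; ++col) {
    for (const Row &row : src) {
      dest[col].push_back(row[col]);  // stack along the new leading batch dimension
    }
  }
  return dest;
}

int main() {
  std::vector<Row> rows = {{{1}, {10, 11}}, {{2}, {20, 21}}, {{3}, {30, 31}}};
  Batch batched = BatchRows(rows);
  std::cout << "columns=" << batched.size()
            << " batch_size=" << batched[0].size() << "\n";  // columns=2 batch_size=3
  return 0;
}
```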

@@ -136,11 +136,11 @@ Status BucketBatchByLengthOp::operator()() {
     }
     // need to send EOE manually since we set state to idle in EoeRecieved()
-    std::unique_ptr<DataBuffer> eoe_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE);
-    RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(eoe_buffer)));
+    RETURN_IF_NOT_OK(out_connector_->SendEOE());
     RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
   }
+  RETURN_IF_NOT_OK(out_connector_->SendEOF());
   return Status::OK();
 }
@@ -198,13 +198,11 @@ Status BucketBatchByLengthOp::PadAndBatchBucket(int32_t bucket_index, int32_t ba
   // PadColumns will change the data in bucket
   RETURN_IF_NOT_OK(BatchOp::PadColumns(bucket, pad_info_copy, column_name_id_map_));
-  std::unique_ptr<TensorQTable> batched_bucket = std::make_unique<TensorQTable>();
+  TensorRow batched_bucket;
   RETURN_IF_NOT_OK(BatchOp::BatchRows(bucket, &batched_bucket, batch_size));
   (*bucket)->clear();
-  std::unique_ptr<DataBuffer> batched_buffer = std::make_unique<DataBuffer>(batch_count_, DataBuffer::kDeBFlagNone);
-  batched_buffer->set_tensor_table(std::move(batched_bucket));
-  RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(batched_buffer)));
+  RETURN_IF_NOT_OK(out_connector_->Add(std::move(batched_bucket), 0));
   batch_count_++;

@@ -57,6 +57,7 @@ Status BuildSentencePieceVocabOp::operator()() {
       RETURN_IF_NOT_OK(sentence_queue_->EmplaceBack(new_row));
       RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
     }
+    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
     CHECK_FAIL_RETURN_UNEXPECTED(!eoe_warning, "no op should be after from_dataset (repeat detected)");
     eoe_warning = true;
   }
@@ -91,8 +92,8 @@ Status BuildSentencePieceVocabOp::SentenceThread() {
     }
     vocab_->set_model_proto(model_proto);
   }
-  RETURN_IF_NOT_OK(out_connector_->Add(0, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
-  RETURN_IF_NOT_OK(out_connector_->Add(0, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
+  RETURN_IF_NOT_OK(out_connector_->SendEOE());
+  RETURN_IF_NOT_OK(out_connector_->SendEOF());
   return Status::OK();
 }

@@ -112,6 +112,7 @@ Status BuildVocabOp::operator()() {
       RETURN_IF_NOT_OK(distributor_queue_->EmplaceBack(new_row));
       RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
     }
+    RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&new_row));
     CHECK_FAIL_RETURN_UNEXPECTED(!eoe_warning, "no op should be after from_dataset (repeat detected)");
     eoe_warning = true;
   }
@@ -184,8 +185,8 @@ Status BuildVocabOp::CollectorThread() {
     for (const std::string &sp_tk : special_tokens_) vocab_->append_word(sp_tk);
   }
-  RETURN_IF_NOT_OK(out_connector_->Add(0, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
-  RETURN_IF_NOT_OK(out_connector_->Add(0, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
+  RETURN_IF_NOT_OK(out_connector_->SendEOE());
+  RETURN_IF_NOT_OK(out_connector_->SendEOF());
   // then use std::nth_element to partial sort
   return Status::OK();
 }

@@ -174,7 +174,6 @@ Status CacheBase::FetchSamplesToWorkers() {
 }
 Status CacheBase::FetchFromCache(int32_t worker_id) {
-  int64_t buffer_id = worker_id;
   std::unique_ptr<IOBlock> blk;
   do {
     RETURN_IF_NOT_OK(io_block_queues_[worker_id]->PopFront(&blk));
@@ -185,9 +184,9 @@ Status CacheBase::FetchFromCache(int32_t worker_id) {
         wait_for_workers_post_.Set();
       }
     } else if (blk->eof()) {
-      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF)));
+      RETURN_IF_NOT_OK(out_connector_->SendEOF(worker_id));
     } else if (blk->eoe()) {
-      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE)));
+      RETURN_IF_NOT_OK(out_connector_->SendEOE(worker_id));
     } else {
       std::vector<int64_t> keys;
       RETURN_IF_NOT_OK(blk->GetKeys(&keys));
@@ -195,8 +194,6 @@ Status CacheBase::FetchFromCache(int32_t worker_id) {
         // empty key is a quit signal for workers
        break;
       }
-      std::unique_ptr<DataBuffer> db = std::make_unique<DataBuffer>(buffer_id, DataBuffer::kDeBFlagNone);
-      std::unique_ptr<TensorQTable> que = std::make_unique<TensorQTable>();
       for (auto row_id : keys) {
         TensorRow row;
         // Block until the row shows up in the pool.
@@ -209,11 +206,8 @@ Status CacheBase::FetchFromCache(int32_t worker_id) {
            RETURN_STATUS_UNEXPECTED(errMsg);
          }
        }
-        que->push_back(std::move(row));
+        RETURN_IF_NOT_OK(out_connector_->Add(std::move(row), worker_id));
       }
-      db->set_tensor_table(std::move(que));
-      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db)));
-      buffer_id += num_workers_;
     }
   } while (true);
   return Status::OK();

@@ -76,30 +76,21 @@ Status CacheMergeOp::operator()() {
 // until it shows up in the pool.
 Status CacheMergeOp::WorkerEntry(int32_t worker_id) {
   TaskManager::FindMe()->Post();
-  std::shared_ptr<DatasetOp> cache_hit_stream = child_[kCacheHitChildIdx];
-  std::unique_ptr<DataBuffer> db_ptr;
-  RETURN_IF_NOT_OK(cache_hit_stream->GetNextBuffer(&db_ptr, worker_id));
-  while (!db_ptr->eof()) {
-    if (db_ptr->eoe()) {
+  TensorRow new_row;
+  auto child_iterator = std::make_unique<ChildIterator>(this, worker_id, kCacheHitChildIdx);
+  RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
+  while (!new_row.eof()) {
+    if (new_row.eoe()) {
       RETURN_IF_NOT_OK(EoeReceived(worker_id));
-      db_ptr.reset();
-      RETURN_IF_NOT_OK(cache_hit_stream->GetNextBuffer(&db_ptr, worker_id));
+      RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
     } else {
-      // See if there is any missing row
-      auto tbl = std::make_unique<TensorQTable>();
-      while (db_ptr->NumRows() > 0) {
-        TensorRow row;
-        RETURN_IF_NOT_OK(db_ptr->PopRow(&row));
-        if (row.empty()) {
-          auto row_id = row.getId();
-          // Block until the row shows up in the pool.
-          RETURN_IF_NOT_OK(cache_miss_.PopFront(row_id, &row));
-        }
-        tbl->push_back(std::move(row));
+      if (new_row.empty()) {
+        auto row_id = new_row.getId();
+        // Block until the row shows up in the pool.
+        RETURN_IF_NOT_OK(cache_miss_.PopFront(row_id, &new_row));
       }
-      db_ptr->set_tensor_table(std::move(tbl));
-      RETURN_IF_NOT_OK(out_connector_->Add(worker_id, std::move(db_ptr)));
-      RETURN_IF_NOT_OK(cache_hit_stream->GetNextBuffer(&db_ptr, worker_id));
+      RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row), worker_id));
+      RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
     }
   }
   RETURN_IF_NOT_OK(EofReceived(worker_id));
@@ -111,16 +102,16 @@ Status CacheMergeOp::CacheMissWorkerEntry(int32_t workerId) {
   // We will simply pop TensorRow from the stream and insert them into the pool and
   // wake up any worker that is awaiting on the missing TensorRow.
   // If we see an eoe, ignore it. For eof, we exit.
-  std::shared_ptr<DatasetOp> cache_missing_stream = child_[kCacheMissChildIdx];
   // Before we start, cache the schema at the server. Pick one of the workers
   // do it. The schema should have been done at prepare time.
   if (workerId == 0) {
     RETURN_IF_NOT_OK(cache_client_->CacheSchema(column_name_id_map()));
   }
-  std::unique_ptr<DataBuffer> db_ptr;
-  RETURN_IF_NOT_OK(cache_missing_stream->GetNextBuffer(&db_ptr, workerId));
-  while (!db_ptr->eof()) {
-    if (db_ptr->eoe()) {
+  TensorRow new_row;
+  auto child_iterator = std::make_unique<ChildIterator>(this, workerId, kCacheMissChildIdx);
+  RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
+  while (!new_row.eof()) {
+    if (new_row.eoe()) {
       // Ignore it.
       MS_LOG(DEBUG) << "Ignore eoe";
       // However we need to flush any left over from the async write buffer. But any error
@@ -135,36 +126,32 @@ Status CacheMergeOp::CacheMissWorkerEntry(int32_t workerId) {
        }
      }
     } else {
-      while (db_ptr->NumRows() > 0) {
-        TensorRow row;
-        RETURN_IF_NOT_OK(db_ptr->PopRow(&row));
-        row_id_type row_id = row.getId();
-        if (row_id < 0) {
-          std::string errMsg = "Expect positive row id: " + std::to_string(row_id);
-          RETURN_STATUS_UNEXPECTED(errMsg);
-        }
-        if (cache_missing_rows_) {
-          // Technically number of this row shows up in the cache miss stream is equal to the number
-          // of P() call. However the cleaner wants it too. So we need an extra copy.
-          TensorRowCacheRequest *rq;
-          RETURN_IF_NOT_OK(GetRq(row_id, &rq));
-          if (rq->GetState() == TensorRowCacheRequest::State::kEmpty) {
-            // We will send the request async. But any error we most
-            // likely ignore and continue.
-            Status rc;
-            rc = rq->AsyncSendCacheRequest(cache_client_, row);
-            if (rc.IsOk()) {
-              RETURN_IF_NOT_OK(io_que_->EmplaceBack(row_id));
-            } else if (rc == StatusCode::kMDOutOfMemory || rc == kMDNoSpace) {
-              cache_missing_rows_ = false;
-              cache_client_->ServerRunningOutOfResources();
-            }
+      row_id_type row_id = new_row.getId();
+      if (row_id < 0) {
+        std::string errMsg = "Expect positive row id: " + std::to_string(row_id);
+        RETURN_STATUS_UNEXPECTED(errMsg);
+      }
+      if (cache_missing_rows_) {
+        // Technically number of this row shows up in the cache miss stream is equal to the number
+        // of P() call. However the cleaner wants it too. So we need an extra copy.
+        TensorRowCacheRequest *rq;
+        RETURN_IF_NOT_OK(GetRq(row_id, &rq));
+        if (rq->GetState() == TensorRowCacheRequest::State::kEmpty) {
+          // We will send the request async. But any error we most
+          // likely ignore and continue.
+          Status rc;
+          rc = rq->AsyncSendCacheRequest(cache_client_, new_row);
+          if (rc.IsOk()) {
+            RETURN_IF_NOT_OK(io_que_->EmplaceBack(row_id));
+          } else if (rc == StatusCode::kMDOutOfMemory || rc == kMDNoSpace) {
+            cache_missing_rows_ = false;
+            cache_client_->ServerRunningOutOfResources();
           }
         }
-        RETURN_IF_NOT_OK(cache_miss_.Add(row_id, std::move(row)));
       }
+      RETURN_IF_NOT_OK(cache_miss_.Add(row_id, std::move(new_row)));
     }
-    RETURN_IF_NOT_OK(cache_missing_stream->GetNextBuffer(&db_ptr, workerId));
+    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&new_row));
   }
   return Status::OK();
 }
@@ -265,14 +252,14 @@ Status CacheMergeOp::Builder::Build(std::shared_ptr<CacheMergeOp> *ptr) {
 Status CacheMergeOp::EoeReceived(int32_t worker_id) {
   // Send the eoe up.
   MS_LOG(DEBUG) << "Cache merge sending eoe";
-  return DatasetOp::EoeReceived(worker_id);
+  return out_connector_->SendEOE(worker_id);
 }
 // Base-class override for handling cases when an eof is received.
 Status CacheMergeOp::EofReceived(int32_t worker_id) {
   // Send the eof up.
   MS_LOG(DEBUG) << "Cache merge sending eof";
-  return DatasetOp::EofReceived(worker_id);
+  return out_connector_->SendEOF(worker_id);
 }
 Status CacheMergeOp::GetRq(row_id_type row_id, CacheMergeOp::TensorRowCacheRequest **out) {
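
The merge worker above parks on cache_miss_.PopFront(row_id, ...) until the cache-miss worker deposits the row with Add(row_id, ...). A single-threaded, self-contained sketch of that rendezvous-by-row-id idea (the real pool blocks across threads; MissPool below is only an illustration):

```cpp
// Sketch of the row-id keyed handoff; not the real cache_miss_ pool, which blocks across threads.
#include <iostream>
#include <map>
#include <optional>
#include <utility>
#include <vector>

class MissPool {
 public:
  void Add(int row_id, std::vector<int> row) { pool_[row_id] = std::move(row); }
  std::optional<std::vector<int>> PopFront(int row_id) {
    auto it = pool_.find(row_id);
    if (it == pool_.end()) return std::nullopt;  // the real code would block here instead
    std::vector<int> row = std::move(it->second);
    pool_.erase(it);
    return row;
  }

 private:
  std::map<int, std::vector<int>> pool_;
};

int main() {
  MissPool cache_miss;
  cache_miss.Add(42, {7, 8, 9});             // cache-miss worker deposits row 42
  if (auto row = cache_miss.PopFront(42)) {  // merge worker picks it up by row id
    std::cout << "row 42 has " << row->size() << " columns\n";
  }
  return 0;
}
```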

@@ -21,6 +21,7 @@
 #include "minddata/dataset/include/constants.h"
 #include "minddata/dataset/core/global_context.h"
 #include "minddata/dataset/engine/datasetops/repeat_op.h"
+#include "minddata/dataset/engine/dataset_iterator.h"
 #include "minddata/dataset/engine/data_buffer.h"
 #include "minddata/dataset/engine/execution_tree.h"
 #include "minddata/dataset/util/log_adapter.h"
@@ -104,15 +105,16 @@ Status CacheOp::CacheAllRows(int32_t worker_id) {
   }
   MS_LOG(INFO) << "CacheOp first epoch SAVE mode started. Worker: " << worker_id;
   // SAVE mode loop
-  std::unique_ptr<DataBuffer> db_ptr;
-  RETURN_IF_NOT_OK(this->GetNextInput(&db_ptr, worker_id, 0));
-  while (!db_ptr->eof()) {
-    if (!db_ptr->eoe()) {
+  TensorRow row;
+  auto child_iterator = std::make_unique<ChildIterator>(this, worker_id, 0);
+  RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
+  while (!row.eof()) {
+    if (!row.eoe()) {
       Status rc;
       // Do the Async write if we attach to the shared memory.
-      rc = cache_client_->AsyncWriteBuffer(std::move(db_ptr));
+      rc = cache_client_->AsyncWriteRow(row);
       if (rc.StatusCode() == StatusCode::kMDNotImplementedYet) {
-        RETURN_IF_NOT_OK(cache_client_->WriteBuffer(std::move(db_ptr)));
+        RETURN_IF_NOT_OK(cache_client_->WriteRow(row));
       } else if (rc.IsError()) {
         return rc;
       }
@@ -122,12 +124,13 @@ Status CacheOp::CacheAllRows(int32_t worker_id) {
       // the eoe to indicate the end of the epoch, we should next expect to get the eof.
       // Drain this eof so that we don't leave it sitting there on a connector that we'll never fetch
       // from again.
-      RETURN_IF_NOT_OK(this->GetNextInput(&db_ptr, worker_id, 0));
-      if (!db_ptr->eof()) {
+      RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
+      if (!row.eof()) {
         RETURN_STATUS_UNEXPECTED("Cache op expects to get an eof after eoe from child.");
       }
+      break;
     }
-    RETURN_IF_NOT_OK(this->GetNextInput(&db_ptr, worker_id, 0));
+    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
   }
 }
 // Let the main guy know we are done.

@ -78,9 +78,12 @@ void ConcatOp::Print(std::ostream &out, bool show_all) const {
// Main entry point for Concat // Main entry point for Concat
Status ConcatOp::operator()() { Status ConcatOp::operator()() {
children_num_ = static_cast<int32_t>(child_.size());
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
std::unique_ptr<DataBuffer> buf; children_num_ = static_cast<int32_t>(child_.size());
for (int32_t i = 0; i < children_num_; i++) {
children_iterators_.push_back(std::make_unique<ChildIterator>(this, 0, i));
}
TensorRow new_row;
int eof_count = 0; int eof_count = 0;
int sample_number = 0; int sample_number = 0;
bool is_not_mappable = true; bool is_not_mappable = true;
@ -95,26 +98,26 @@ Status ConcatOp::operator()() {
while (eof_count == 0) { while (eof_count == 0) {
for (int i = 0; i < children_num_; i++) { for (int i = 0; i < children_num_; i++) {
// 1. Read the first buffer // 1. Read the first row
RETURN_IF_NOT_OK(child_[i]->GetNextBuffer(&buf)); RETURN_IF_NOT_OK(children_iterators_[i]->FetchNextTensorRow(&new_row));
if (buf->eof()) { if (new_row.eof()) {
eof_count++; eof_count++;
continue; continue;
} }
// 2. Do verification as for column name, column data type and rank of column data // 2. Do verification as for column name, column data type and rank of column data
if (!buf->eoe()) { if (!new_row.eoe()) {
RETURN_IF_NOT_OK(Verify(i, buf)); RETURN_IF_NOT_OK(Verify(i, new_row));
} }
// 3. Put the data into output_connector // 3. Put the data into output_connector
if (!children_flag_and_nums_.empty()) { if (!children_flag_and_nums_.empty()) {
is_not_mappable = children_flag_and_nums_[i].first; is_not_mappable = children_flag_and_nums_[i].first;
is_not_mappable_or_second_ne_zero = is_not_mappable || (!children_flag_and_nums_[i].second); is_not_mappable_or_second_ne_zero = is_not_mappable || (!children_flag_and_nums_[i].second);
} }
while (!buf->eoe() && !buf->eof()) { while (!new_row.eoe() && !new_row.eof()) {
// if dataset is not mappable or generator dataset which source is yield, cannot get the number of samples in // if dataset is not mappable or generator dataset which source is yield, cannot get the number of samples in
// python layer), we use filtering to get data // python layer), we use filtering to get data
if (sample_number % num_shard == shard_index && is_not_mappable_or_second_ne_zero) { if (sample_number % num_shard == shard_index && is_not_mappable_or_second_ne_zero) {
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(buf))); RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row)));
} else if (!is_not_mappable_or_second_ne_zero) { } else if (!is_not_mappable_or_second_ne_zero) {
// if dataset is mappable or generator dataset which source is not yield, // if dataset is mappable or generator dataset which source is not yield,
// get the start and end subscripts of valid values // get the start and end subscripts of valid values
@ -122,7 +125,7 @@ Status ConcatOp::operator()() {
// determine whether the data allocated to the current shard id is false data // determine whether the data allocated to the current shard id is false data
if (f(fv, sv, shard_index)) { if (f(fv, sv, shard_index)) {
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(buf))); RETURN_IF_NOT_OK(out_connector_->Add(std::move(new_row)));
} }
} }
@ -131,7 +134,7 @@ Status ConcatOp::operator()() {
sample_number++; sample_number++;
} }
RETURN_IF_NOT_OK(child_[i]->GetNextBuffer(&buf)); RETURN_IF_NOT_OK(children_iterators_[i]->FetchNextTensorRow(&new_row));
} }
// if the dataset is mappable, we don't use filtering to pick the data. // if the dataset is mappable, we don't use filtering to pick the data.
@ -143,8 +146,7 @@ Status ConcatOp::operator()() {
// 4. Add an eoe buffer after getting a buffer from each child // 4. Add an eoe after getting rows from all children
if (eof_count == 0) { if (eof_count == 0) {
auto eoe_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE); RETURN_IF_NOT_OK(out_connector_->SendEOE());
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(eoe_buffer)));
} }
UpdateRepeatAndEpochCounter(); UpdateRepeatAndEpochCounter();
} }
@ -152,15 +154,11 @@ Status ConcatOp::operator()() {
"Something went wrong, eof count does not match the number of children."); "Something went wrong, eof count does not match the number of children.");
// 5. Manually add an eof buffer at the end // 5. Manually add an eof at the end
MS_LOG(DEBUG) << "Add the eof buffer manually in the end."; MS_LOG(DEBUG) << "Add the eof buffer manually in the end.";
auto eof_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF); RETURN_IF_NOT_OK(out_connector_->SendEOF());
RETURN_IF_NOT_OK(out_connector_->Add(0, std::move(eof_buffer)));
return Status::OK(); return Status::OK();
} }
Status ConcatOp::Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf) { Status ConcatOp::Verify(int32_t id, const TensorRow &new_row) {
TensorRow new_row;
RETURN_IF_NOT_OK(buf->GetRow(0, &new_row));
if (id == 0) { if (id == 0) {
// Obtain the data type and data rank in child[0] // Obtain the data type and data rank in child[0]
for (auto item : new_row) { for (auto item : new_row) {
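The per-row sharding test used in ConcatOp::operator() above boils down to a round-robin check on the running sample index. A hedged illustration follows; KeepOnThisShard is hypothetical, not part of this change, and num_shard is assumed positive.

namespace mindspore {
namespace dataset {
// Hypothetical illustration of the filter applied to non-mappable children:
// shard k keeps every row whose global sample index is congruent to k modulo num_shard.
inline bool KeepOnThisShard(int64_t sample_number, int32_t num_shard, int32_t shard_index) {
  return (sample_number % num_shard) == shard_index;
}
}  // namespace dataset
}  // namespace mindspore

For example, with num_shard = 4 and shard_index = 1, rows 1, 5, 9, ... pass the check and are added to the output connector; every other row is dropped by this shard.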

@ -21,6 +21,7 @@
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include <utility> #include <utility>
#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/engine/datasetops/pipeline_op.h" #include "minddata/dataset/engine/datasetops/pipeline_op.h"
#include "minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h" #include "minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h"
@ -111,7 +112,7 @@ class ConcatOp : public PipelineOp {
Status GetNumClasses(int64_t *num_classes) override; Status GetNumClasses(int64_t *num_classes) override;
private: private:
Status Verify(int32_t id, const std::unique_ptr<DataBuffer> &buf); Status Verify(int32_t id, const TensorRow &tensor_row);
int32_t children_num_; // The number of children of this node. int32_t children_num_; // The number of children of this node.
std::unordered_map<std::string, int32_t> column_name_id_; // Mapping between col index and col name std::unordered_map<std::string, int32_t> column_name_id_; // Mapping between col index and col name
@ -120,6 +121,8 @@ class ConcatOp : public PipelineOp {
std::shared_ptr<SamplerRT> sampler_; std::shared_ptr<SamplerRT> sampler_;
std::vector<std::pair<int, int>> children_flag_and_nums_; std::vector<std::pair<int, int>> children_flag_and_nums_;
std::vector<std::pair<int, int>> children_start_end_index_; std::vector<std::pair<int, int>> children_start_end_index_;
std::vector<std::unique_ptr<ChildIterator>> children_iterators_; // Iterators for fetching rows from each child.
}; };
} // namespace dataset } // namespace dataset
} // namespace mindspore } // namespace mindspore

@ -252,45 +252,15 @@ void DatasetOp::Print(std::ostream &out, bool show_all) const {
} }
} }
Status DatasetOp::GetNextRow(TensorRow *row) { Status DatasetOp::GetNextRowPullMode(TensorRow *row) {
RETURN_UNEXPECTED_IF_NULL(child_[0]); RETURN_UNEXPECTED_IF_NULL(child_[0]);
return child_[0]->GetNextRow(row); return child_[0]->GetNextRowPullMode(row);
} }
// Gets the next buffer from the given child // Gets the next row from this operator's output connector
Status DatasetOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) { Status DatasetOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
// pop is a blocked call and will throw an interruption if the whole group shuts down. // pop is a blocked call and will throw an interruption if the whole group shuts down.
RETURN_IF_NOT_OK(out_connector_->PopWithRetry(static_cast<int>(worker_id), p_buffer, retry_if_eoe)); RETURN_IF_NOT_OK(out_connector_->PopWithRetry(static_cast<int>(worker_id), row, retry_if_eoe));
return Status::OK();
}
// Gets the next buffer from the given child . This function also has built-in eoe and eof
// message handling so that child classes don't have to manually code pass-through logic when
// those messages are received.
Status DatasetOp::GetNextInput(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, int32_t child_index) {
if (child_.size() == 0) {
return this->GetNextBuffer(p_buffer, worker_id);
}
CHECK_FAIL_RETURN_UNEXPECTED(child_index < child_.size(),
"Invalid data, child index too big : " + std::to_string(child_index));
std::shared_ptr<DatasetOp> child = child_[child_index];
std::unique_ptr<DataBuffer> buf;
RETURN_IF_NOT_OK(child->GetNextBuffer(&buf, worker_id));
// Loop until non EOE is received
while (buf->eoe()) {
UpdateRepeatAndEpochCounter();
RETURN_IF_NOT_OK(EoeReceived(worker_id));
if (state_ == OpState::kDeOpIdle) {
*p_buffer = std::move(buf);
return Status::OK();
}
RETURN_IF_NOT_OK(child->GetNextBuffer(&buf, worker_id));
}
// Check if the last buf is next eof
if (buf->eof()) {
RETURN_IF_NOT_OK(EofReceived(worker_id));
}
*p_buffer = std::move(buf);
return Status::OK(); return Status::OK();
} }
@ -328,18 +298,12 @@ Status DatasetOp::GetClassIndexing(std::vector<std::pair<std::string, std::vecto
// Performs handling for when an eoe message is received. // Performs handling for when an eoe message is received.
// The base class implementation simply flows the eoe message to output. Derived classes // The base class implementation simply flows the eoe message to output. Derived classes
// may override if they need to perform special eoe handling. // may override if they need to perform special eoe handling.
Status DatasetOp::EoeReceived(int32_t worker_id) { Status DatasetOp::EoeReceived(int32_t worker_id) { return out_connector_->SendEOE(worker_id); }
std::unique_ptr<DataBuffer> eoe_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOE);
return (out_connector_->Add(static_cast<int>(worker_id), std::move(eoe_buffer)));
}
// Performs handling for when an eof message is received. // Performs handling for when an eof message is received.
// The base class implementation simply flows the eof message to output. Derived classes // The base class implementation simply flows the eof message to output. Derived classes
// may override if they need to perform special eof handling. // may override if they need to perform special eof handling.
Status DatasetOp::EofReceived(int32_t worker_id) { Status DatasetOp::EofReceived(int32_t worker_id) { return out_connector_->SendEOF(worker_id); }
std::unique_ptr<DataBuffer> eof_buffer = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagEOF);
return (out_connector_->Add(static_cast<int>(worker_id), std::move(eof_buffer)));
}
// During tree prepare phase, operators may have specific post-operations to perform depending on their role. // During tree prepare phase, operators may have specific post-operations to perform depending on their role.
Status DatasetOp::PrepareOperator() { Status DatasetOp::PrepareOperator() {
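With GetNextInput removed, operators that relied on its built-in EOE/EOF pass-through now read their child through a ChildIterator instead. A rough sketch of that consumer pattern, assuming only the FetchNextTensorRow, eoe, eof and empty accessors visible in this diff; CountRowsUntilEof is a hypothetical helper, not code from the change.

#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/util/status.h"

namespace mindspore {
namespace dataset {
// Hypothetical consumer loop: count the data rows produced by one child until EOF.
// EOE rows mark epoch boundaries and are skipped rather than counted.
Status CountRowsUntilEof(ChildIterator *child_iterator, int64_t *count) {
  *count = 0;
  TensorRow row;
  RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
  while (!row.eof()) {
    if (!row.eoe() && !row.empty()) {
      (*count)++;
    }
    RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&row));
  }
  return Status::OK();
}
}  // namespace dataset
}  // namespace mindspore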

@ -129,7 +129,7 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
/// \param show_all - A bool to control if you want to show all info or just a summary /// \param show_all - A bool to control if you want to show all info or just a summary
virtual void Print(std::ostream &out, bool show_all) const; virtual void Print(std::ostream &out, bool show_all) const;
virtual Status GetNextRow(TensorRow *row); virtual Status GetNextRowPullMode(TensorRow *row);
/// \brief << Stream output operator overload /// \brief << Stream output operator overload
/// \notes This allows you to write the debug print info using stream operators /// \notes This allows you to write the debug print info using stream operators
@ -149,35 +149,17 @@ class DatasetOp : public std::enable_shared_from_this<DatasetOp> {
virtual Status operator()() = 0; virtual Status operator()() = 0;
/// \brief Gets the next buffer from the given child /// \brief Gets the next buffer from the given child
/// \notes See GetNextInput for similar function that has built-in message handling /// \param row[out] - Fetched TensorRow
/// \param p_buffer - The shared pointer for the fetched buffer to return (by reference) /// \param worker_id[in] - The worker id, default to 0.
/// \param worker_id - The worker id
/// \return Status The status code returned
virtual Status GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id) {
return GetNextBuffer(p_buffer, worker_id, false);
}
/// \brief Gets the next buffer from the given child
/// \notes See GetNextInput for similar function that has built-in message handling
/// \param p_buffer - The shared pointer for the fetched buffer to return (by reference)
/// \return Status The status code returned /// \return Status The status code returned
virtual Status GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer) { return GetNextBuffer(p_buffer, 0, false); } virtual Status GetNextRow(TensorRow *row, int32_t worker_id = 0) { return GetNextRow(row, worker_id, false); }
/// \brief Gets the next buffer from the given child /// \brief Gets the next buffer from the given child
/// \notes See GetNextInput for similar function that has built-in message handling /// \param row[out] - Fetched TensorRow
/// \param p_buffer - The shared pointer for the fetched buffer to return (by reference) /// \param worker_id[in] - The worker id, default to 0.
/// \param worker_id - The worker id
/// \param retry_if_eoe Set this flag to true to allow calling pop() again after the first pop() returns EOE. /// \param retry_if_eoe Set this flag to true to allow calling pop() again after the first pop() returns EOE.
/// \return Status The status code returned /// \return Status The status code returned
virtual Status GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe); virtual Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe);
/// \brief Gets the next buffer from the given child . This function also has built-in eoe and eof
/// message handling so that child classes don't have to manually code pass-through logic when
/// those messages are received.
/// \param p_buffer - The shared pointer for the fetched buffer to return (by reference)
/// \param worker_id - The worker id
/// \return Status The status code returned
Status GetNextInput(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id = 0, int32_t child_index = 0);
/// \brief Gets the batch size /// \brief Gets the batch size
/// \return Status - The status code return /// \return Status - The status code return
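A hedged usage sketch of the row-based fetch declared above; the driver loop and the child pointer are hypothetical, while GetNextRow with its defaulted worker_id and the eoe/eof flags come from this diff.

// Hypothetical caller: pull rows from a child op until EOF; EOE rows mark epoch
// boundaries, everything else is real data popped from the child's output connector.
TensorRow row;
RETURN_IF_NOT_OK(child->GetNextRow(&row));  // worker_id defaults to 0, no EOE retry
while (!row.eof()) {
  if (row.eoe()) {
    // per-epoch bookkeeping would go here
  } else {
    // ... consume the data row ...
  }
  RETURN_IF_NOT_OK(child->GetNextRow(&row));
}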

@ -93,21 +93,18 @@ Status DeviceQueueOp::EoeReceived(int32_t worker_id) {
return Status::OK(); return Status::OK();
} }
Status DeviceQueueOp::CheckExceptions(const std::unique_ptr<DataBuffer> &buffer) const { Status DeviceQueueOp::CheckExceptions(const TensorRow &row) const {
// this method checks if the buffer meets the conditions to be sent to TDT // this method checks if the row meets the conditions to be sent to TDT
if (buffer->NumRows() != 0) { for (const auto &item : row) {
TensorRow row; CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid data, cannot send string tensor to device.");
buffer->GetRow(0, &row); CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, cannot send tensor with no data to device.");
for (const auto &item : row) {
CHECK_FAIL_RETURN_UNEXPECTED(item->type().IsNumeric(), "Invalid data, cannot send string tensor to device.");
CHECK_FAIL_RETURN_UNEXPECTED(item->HasData(), "Invalid data, cannot send tensor with no data to device.");
}
} }
return Status::OK(); return Status::OK();
} }
Status DeviceQueueOp::operator()() { Status DeviceQueueOp::operator()() {
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
child_iterator_ = std::make_unique<ChildIterator>(this, 0, 0);
#ifdef ENABLE_DUMP_IR #ifdef ENABLE_DUMP_IR
if (md_channel_info_ == nullptr) { if (md_channel_info_ == nullptr) {
@ -163,43 +160,39 @@ Status DeviceQueueOp::SendDataToAscend() {
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
md_channel_info_->RecordPreprocessBatch(0); md_channel_info_->RecordPreprocessBatch(0);
#endif #endif
std::unique_ptr<DataBuffer> current_buffer; TensorRow curr_row;
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
while (!curr_row.eof() && !is_break_loop) {
while (!current_buffer->eof() && !is_break_loop) { while (!curr_row.eoe() && !is_break_loop) {
while (!current_buffer->eoe() && !is_break_loop) { RETURN_IF_NOT_OK(CheckExceptions(curr_row));
RETURN_IF_NOT_OK(CheckExceptions(current_buffer)); WaitContinueSignal();
TensorRow currRow;
for (int row_id = 0; row_id < current_buffer->NumRows(); row_id++) {
RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &currRow));
WaitContinueSignal();
#ifdef ENABLE_DUMP_IR #ifdef ENABLE_DUMP_IR
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
md_channel_info_->RecordPreprocessBatch(send_batch); md_channel_info_->RecordPreprocessBatch(send_batch);
md_channel_info_->RecordPushStartTime(); md_channel_info_->RecordPushStartTime();
#endif #endif
RETURN_IF_NOT_OK(SendRowToTdt(currRow, isProfilingEnable, &tdt_cost)); RETURN_IF_NOT_OK(SendRowToTdt(curr_row, isProfilingEnable, &tdt_cost));
ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time, ProfilingRecorder(isProfilingEnable, profiling_node, send_batch, tdt_cost, &batch_start_time, &end_time,
connector_capacity, connector_size); connector_capacity, connector_size);
send_batch++; send_batch++;
#ifdef ENABLE_DUMP_IR #ifdef ENABLE_DUMP_IR
md_channel_info_->RecordBatchQueue(ChildOpConnectorSize()); md_channel_info_->RecordBatchQueue(ChildOpConnectorSize());
md_channel_info_->RecordPreprocessBatch(send_batch); md_channel_info_->RecordPreprocessBatch(send_batch);
md_channel_info_->RecordPushEndTime(); md_channel_info_->RecordPushEndTime();
#endif #endif
if (total_batch_ > 0 && send_batch >= total_batch_) { if (total_batch_ > 0 && send_batch >= total_batch_) {
is_break_loop = true; is_break_loop = true;
break; break;
}
} }
if (isProfilingEnable) { if (isProfilingEnable) {
connector_size = ChildOpConnectorSize(); connector_size = ChildOpConnectorSize();
connector_capacity = ChildOpConnectorCapacity(); connector_capacity = ChildOpConnectorCapacity();
} }
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
} }
if (current_buffer->eoe() && send_epoch_end_) { if (curr_row.eoe() && send_epoch_end_) {
TensorRow currRow; TensorRow currRow;
auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost, auto status = tdtInstancePtr->hostPush(currRow, true, channel_name_, isProfilingEnable, tdt_cost,
ACL_TENSOR_DATA_END_OF_SEQUENCE); ACL_TENSOR_DATA_END_OF_SEQUENCE);
@ -219,7 +212,7 @@ Status DeviceQueueOp::SendDataToAscend() {
connector_capacity = ChildOpConnectorCapacity(); connector_capacity = ChildOpConnectorCapacity();
tree_->SetEpochEnd(); tree_->SetEpochEnd();
} }
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
} }
// now we use this flag to judge whether exception raised. // now we use this flag to judge whether exception raised.
@ -444,27 +437,23 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
// Every thread use cuda api should SetThreadDevice // Every thread use cuda api should SetThreadDevice
RETURN_IF_NOT_OK(SetThreadDevice()); RETURN_IF_NOT_OK(SetThreadDevice());
TaskManager::FindMe()->Post(); TaskManager::FindMe()->Post();
std::unique_ptr<DataBuffer> current_buffer; TensorRow current_row;
uint32_t batch_num = 0; uint32_t batch_num = 0;
RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer)); RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
while (!current_buffer->quit() && !GpuBufferMgr::GetInstance().IsClosed()) { while (!current_row.quit() && !GpuBufferMgr::GetInstance().IsClosed()) {
TensorRow curr_row; std::vector<device::DataItemGpu> items;
for (int row_id = 0; row_id < current_buffer->NumRows() && !GpuBufferMgr::GetInstance().IsClosed(); row_id++) { for (int i = 0; i < current_row.size(); i++) {
RETURN_IF_NOT_OK(current_buffer->GetRow(row_id, &curr_row)); device::DataItemGpu data_item;
std::vector<device::DataItemGpu> items; data_item.data_len_ = static_cast<size_t>(current_row[i]->SizeInBytes());
for (int i = 0; i < curr_row.size(); i++) { data_item.data_ptr_ = nullptr;
device::DataItemGpu data_item; data_item.worker_id_ = worker_id;
data_item.data_len_ = static_cast<size_t>(curr_row[i]->SizeInBytes()); items.push_back(data_item);
data_item.data_ptr_ = nullptr;
data_item.worker_id_ = worker_id;
items.push_back(data_item);
}
RETURN_IF_NOT_OK(MallocForGPUData(&items, curr_row, worker_id));
RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
batch_num++;
} }
RETURN_IF_NOT_OK(MallocForGPUData(&items, current_row, worker_id));
RETURN_IF_NOT_OK(gpu_item_connector_->Add(worker_id, std::move(items)));
batch_num++;
RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_buffer)); RETURN_IF_NOT_OK(receive_queues_[worker_id]->PopFront(&current_row));
} }
MS_LOG(INFO) << "Device queue worker id " << worker_id << "proc " << batch_num << "batch."; MS_LOG(INFO) << "Device queue worker id " << worker_id << "proc " << batch_num << "batch.";
@ -477,31 +466,31 @@ Status DeviceQueueOp::WorkerEntry(int32_t worker_id) {
Status DeviceQueueOp::SendDataToGPU() { Status DeviceQueueOp::SendDataToGPU() {
RETURN_IF_NOT_OK(LaunchParallelCopyThread()); RETURN_IF_NOT_OK(LaunchParallelCopyThread());
MS_LOG(INFO) << "Device queue, sending data to GPU."; MS_LOG(INFO) << "Device queue, sending data to GPU.";
std::unique_ptr<DataBuffer> current_buffer; TensorRow current_row;
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
int64_t num_buf = 0; int64_t num_buf = 0;
bool is_break_loop = false; bool is_break_loop = false;
while (!current_buffer->eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) { while (!current_row.eof() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
while (!current_buffer->eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) { while (!current_row.eoe() && !is_break_loop && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(CheckExceptions(current_buffer)); RETURN_IF_NOT_OK(CheckExceptions(current_row));
RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_buffer))); RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(current_row)));
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
} else { } else {
is_break_loop = true; is_break_loop = true;
} }
} }
if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) { if (!TaskManager::FindMe()->Interrupted() && !GpuBufferMgr::GetInstance().IsClosed()) {
RETURN_IF_NOT_OK(GetNextInput(&current_buffer)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&current_row));
} else { } else {
is_break_loop = true; is_break_loop = true;
} }
} }
for (uint32_t index = 0; index < num_workers_; index++) { for (uint32_t index = 0; index < num_workers_; index++) {
auto quit = std::make_unique<DataBuffer>(0, DataBuffer::kDeBFlagQuit); TensorRow quit_flag(TensorRow::kFlagQuit);
RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit))); RETURN_IF_NOT_OK(receive_queues_[num_buf++ % num_workers_]->Add(std::move(quit_flag)));
} }
MS_LOG(INFO) << "Device queue receive " << num_buf - num_workers_ << " batch."; MS_LOG(INFO) << "Device queue receive " << num_buf - num_workers_ << " batch.";
@ -537,10 +526,9 @@ Status DeviceQueueOp::SendDataToCPU() {
MS_LOG(INFO) << "Device queue, sending data to CPU."; MS_LOG(INFO) << "Device queue, sending data to CPU.";
int64_t total_batch = 0; int64_t total_batch = 0;
std::unique_ptr<ChildIterator> child_iterator = std::make_unique<ChildIterator>(this, 0, 0); while (!(child_iterator_->eof_handled())) {
while (!(child_iterator->eof_handled())) {
TensorRow curr_row; TensorRow curr_row;
RETURN_IF_NOT_OK(child_iterator->FetchNextTensorRow(&curr_row)); RETURN_IF_NOT_OK(child_iterator_->FetchNextTensorRow(&curr_row));
if (!curr_row.empty()) { if (!curr_row.empty()) {
for (auto &tensor : curr_row) { for (auto &tensor : curr_row) {
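The GPU path above swaps the old kDeBFlagQuit DataBuffer for a flag-constructed TensorRow. A minimal sketch of that shutdown handshake, assuming the TensorRow::kFlagQuit constructor, the quit() accessor and the queue Add/PopFront calls already shown in the hunks; receive_queue and the worker body are illustrative, not code from the change.

// Producer side: after the data rows, push one quit row per worker so every
// consumer loop has a definite stop signal.
TensorRow quit_flag(TensorRow::kFlagQuit);
RETURN_IF_NOT_OK(receive_queue->Add(std::move(quit_flag)));

// Consumer side: a worker exits as soon as the popped row carries the quit flag.
TensorRow current_row;
RETURN_IF_NOT_OK(receive_queue->PopFront(&current_row));
while (!current_row.quit()) {
  // ... copy the row's tensors to device memory ...
  RETURN_IF_NOT_OK(receive_queue->PopFront(&current_row));
}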

@ -23,6 +23,8 @@
#include "minddata/dataset/engine/datasetops/pipeline_op.h" #include "minddata/dataset/engine/datasetops/pipeline_op.h"
#include "minddata/dataset/engine/datasetops/repeat_op.h" #include "minddata/dataset/engine/datasetops/repeat_op.h"
#include "minddata/dataset/engine/dataset_iterator.h"
#include "minddata/dataset/engine/perf/device_queue_tracing.h" #include "minddata/dataset/engine/perf/device_queue_tracing.h"
#include "minddata/dataset/util/status.h" #include "minddata/dataset/util/status.h"
#ifdef ENABLE_DUMP_IR #ifdef ENABLE_DUMP_IR
@ -182,9 +184,9 @@ class DeviceQueueOp : public PipelineOp {
std::string Name() const override { return kDeviceQueueOp; } std::string Name() const override { return kDeviceQueueOp; }
private: private:
// Name: checkExceptions(DataBuffer); // Name: checkExceptions(TensorRow);
// Description: Check whether the dataBuffer meets the condition for performing DeviceQueueOp // Description: Check whether the TensorRow meets the conditions for being sent by DeviceQueueOp
Status CheckExceptions(const std::unique_ptr<DataBuffer> &buffer) const; Status CheckExceptions(const TensorRow &row) const;
private: private:
#ifdef ENABLE_TDTQUE #ifdef ENABLE_TDTQUE
@ -204,7 +206,7 @@ class DeviceQueueOp : public PipelineOp {
Status WorkerEntry(int32_t worker_id); Status WorkerEntry(int32_t worker_id);
Status SetThreadDevice(); Status SetThreadDevice();
QueueList<std::unique_ptr<DataBuffer>> receive_queues_; QueueList<TensorRow> receive_queues_;
std::vector<std::shared_ptr<MemoryPool>> pool_; std::vector<std::shared_ptr<MemoryPool>> pool_;
std::unique_ptr<GpuItemConnector> gpu_item_connector_; std::unique_ptr<GpuItemConnector> gpu_item_connector_;
uint32_t num_workers_; uint32_t num_workers_;
@ -216,6 +218,8 @@ class DeviceQueueOp : public PipelineOp {
#endif #endif
Status SendDataToCPU(); Status SendDataToCPU();
std::unique_ptr<ChildIterator> child_iterator_;
std::string channel_name_; std::string channel_name_;
DeviceType device_type_; DeviceType device_type_;
const int32_t device_id_; const int32_t device_id_;

@ -61,24 +61,21 @@ void EpochCtrlOp::Print(std::ostream &out, bool show_all) const {
} }
} }
Status EpochCtrlOp::GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) { Status EpochCtrlOp::GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) {
if (child_.empty()) { if (child_.empty()) {
RETURN_STATUS_UNEXPECTED("EpochCtrlOp can't be the leaf node."); RETURN_STATUS_UNEXPECTED("EpochCtrlOp can't be the leaf node.");
} }
std::unique_ptr<DataBuffer> buf;
// `retry_if_eoe` is false because EpochCtrlOp does not eat EOE. // `retry_if_eoe` is false because EpochCtrlOp does not eat EOE.
RETURN_IF_NOT_OK(child_[0]->GetNextBuffer(&buf, worker_id, false)); RETURN_IF_NOT_OK(child_[0]->GetNextRow(row, worker_id, false));
// Only intercept EOE for EoeReceived processing, after that the EOE is forwarded to next op. // Only intercept EOE for EoeReceived processing, after that the EOE is forwarded to next op.
// Other databuffers containing data or EOF will simply be forwarded. // Other rows containing data or EOF will simply be forwarded.
// EOF can simply be forwarded because this op does not spawn any thread, thus does not require clean up. // EOF can simply be forwarded because this op does not spawn any thread, thus does not require clean up.
if (buf->eoe()) { if (row->eoe()) {
RETURN_IF_NOT_OK(EoeReceived(worker_id)); RETURN_IF_NOT_OK(EoeReceived(worker_id));
} }
*p_buffer = std::move(buf);
return Status::OK(); return Status::OK();
} }
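Because EpochCtrlOp only inspects the EOE and then forwards it, a caller sitting above it still sees one EOE row per epoch. A hedged driver sketch follows; the loop and the epoch_ctrl pointer are hypothetical, while GetNextRow, eoe() and eof() come from this diff.

// Hypothetical driver on top of EpochCtrlOp: the EOE row is counted here as well,
// even though EpochCtrlOp has already run EoeReceived() on its way through.
TensorRow row;
int32_t epochs_seen = 0;
RETURN_IF_NOT_OK(epoch_ctrl->GetNextRow(&row, /*worker_id=*/0, /*retry_if_eoe=*/false));
while (!row.eof()) {
  if (row.eoe()) {
    epochs_seen++;
  }
  RETURN_IF_NOT_OK(epoch_ctrl->GetNextRow(&row, /*worker_id=*/0, /*retry_if_eoe=*/false));
}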

@ -59,7 +59,7 @@ class EpochCtrlOp : public RepeatOp {
// Since EpochCtrlOp is derived from RepeatOp which is an inlined op, getting a buffer from us // Since EpochCtrlOp is derived from RepeatOp which is an inlined op, getting a row from us
// will simply bounce you to get a buffer from our child. // will simply bounce you to get a row from our child.
// Epoch Control Op does not eat the EOE, it will pass the EOE to the next op. // Epoch Control Op does not eat the EOE, it will pass the EOE to the next op.
Status GetNextBuffer(std::unique_ptr<DataBuffer> *p_buffer, int32_t worker_id, bool retry_if_eoe) override; Status GetNextRow(TensorRow *row, int32_t worker_id, bool retry_if_eoe) override;
// Base-class override for handling cases when an eoe is received. // Base-class override for handling cases when an eoe is received.
// @param worker_id - The worker id // @param worker_id - The worker id
