From 8535e89e6c6566c3ffb4507b6fc4e31fcdc747df Mon Sep 17 00:00:00 2001 From: hesham Date: Mon, 5 Apr 2021 16:15:58 -0400 Subject: [PATCH] Remove DataBuffer --- .../minddata/dataset/engine/CMakeLists.txt | 1 - .../dataset/engine/cache/cache_client.h | 2 +- .../engine/cache/perf/cache_pipeline_run.cc | 2 +- .../dataset/engine/consumers/tree_consumer.cc | 1 - .../minddata/dataset/engine/data_buffer.cc | 89 -------------- .../minddata/dataset/engine/data_buffer.h | 114 ------------------ .../dataset/engine/dataset_iterator.cc | 2 +- .../dataset/engine/datasetops/barrier_op.cc | 2 +- .../dataset/engine/datasetops/barrier_op.h | 1 - .../dataset/engine/datasetops/batch_op.cc | 2 +- .../dataset/engine/datasetops/batch_op.h | 1 - .../datasetops/bucket_batch_by_length_op.h | 1 - .../engine/datasetops/cache_base_op.cc | 10 +- .../engine/datasetops/cache_lookup_op.cc | 9 +- .../engine/datasetops/cache_lookup_op.h | 2 +- .../dataset/engine/datasetops/cache_op.cc | 2 +- .../dataset/engine/datasetops/concat_op.cc | 2 +- .../dataset/engine/datasetops/dataset_op.cc | 2 +- .../dataset/engine/datasetops/dataset_op.h | 2 - .../engine/datasetops/device_queue_op.cc | 2 +- .../engine/datasetops/epoch_ctrl_op.cc | 2 +- .../dataset/engine/datasetops/filter_op.cc | 2 +- .../engine/datasetops/map_op/map_op.cc | 2 +- .../dataset/engine/datasetops/map_op/map_op.h | 1 - .../dataset/engine/datasetops/parallel_op.h | 2 - .../dataset/engine/datasetops/pipeline_op.h | 2 - .../dataset/engine/datasetops/project_op.cc | 2 +- .../dataset/engine/datasetops/rename_op.cc | 2 +- .../dataset/engine/datasetops/rename_op.h | 3 - .../dataset/engine/datasetops/repeat_op.cc | 2 +- .../dataset/engine/datasetops/shuffle_op.cc | 2 +- .../dataset/engine/datasetops/shuffle_op.h | 2 - .../dataset/engine/datasetops/skip_op.cc | 2 +- .../engine/datasetops/source/album_op.cc | 4 +- .../engine/datasetops/source/album_op.h | 2 +- .../engine/datasetops/source/cifar_op.h | 2 +- .../engine/datasetops/source/coco_op.h | 2 +- 
.../engine/datasetops/source/generator_op.cc | 4 +- .../datasetops/source/image_folder_op.h | 2 +- .../engine/datasetops/source/manifest_op.h | 2 +- .../datasetops/source/mappable_leaf_op.cc | 14 +-- .../datasetops/source/mappable_leaf_op.h | 2 +- .../engine/datasetops/source/mindrecord_op.cc | 2 +- .../engine/datasetops/source/mindrecord_op.h | 1 - .../engine/datasetops/source/mnist_op.h | 2 +- .../source/sampler/distributed_sampler.cc | 45 +++---- .../source/sampler/distributed_sampler.h | 2 +- .../datasetops/source/sampler/pk_sampler.cc | 12 +- .../datasetops/source/sampler/pk_sampler.h | 2 +- .../source/sampler/python_sampler.cc | 8 +- .../source/sampler/python_sampler.h | 2 +- .../source/sampler/random_sampler.cc | 12 +- .../source/sampler/random_sampler.h | 2 +- .../datasetops/source/sampler/sampler.cc | 21 ++-- .../datasetops/source/sampler/sampler.h | 10 +- .../source/sampler/sequential_sampler.cc | 16 ++- .../source/sampler/sequential_sampler.h | 2 +- .../source/sampler/subset_sampler.cc | 14 +-- .../source/sampler/subset_sampler.h | 4 +- .../source/sampler/weighted_random_sampler.cc | 16 ++- .../source/sampler/weighted_random_sampler.h | 2 +- .../dataset/engine/datasetops/source/voc_op.h | 2 +- .../dataset/engine/datasetops/take_op.cc | 2 +- .../dataset/engine/datasetops/zip_op.cc | 2 +- .../dataset/engine/datasetops/zip_op.h | 2 - .../minddata/dataset/engine/db_connector.h | 3 +- .../dataset/engine/jagged_connector.h | 2 +- mindspore/lite/minddata/CMakeLists.txt | 1 - .../lite/minddata/wrapper/album_op_android.h | 24 ++-- .../cpp/dataset/distributed_sampler_test.cc | 29 ++--- tests/ut/cpp/dataset/rename_op_test.cc | 2 +- .../cpp/dataset/stand_alone_samplers_test.cc | 29 ++--- .../cpp/dataset/subset_random_sampler_test.cc | 33 ++--- tests/ut/cpp/dataset/subset_sampler_test.cc | 36 +++--- .../dataset/weighted_random_sampler_test.cc | 76 ++++++------ tests/ut/cpp/dataset/zip_op_test.cc | 2 +- tests/ut/python/dataset/test_cache_map.py | 16 +-- 77 files 
changed, 234 insertions(+), 511 deletions(-) delete mode 100644 mindspore/ccsrc/minddata/dataset/engine/data_buffer.cc delete mode 100644 mindspore/ccsrc/minddata/dataset/engine/data_buffer.h diff --git a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt index f329f59813..92b0c5f306 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt +++ b/mindspore/ccsrc/minddata/dataset/engine/CMakeLists.txt @@ -13,7 +13,6 @@ file(GLOB_RECURSE _CURRENT_SRC_FILES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "*.cc" set_property(SOURCE ${_CURRENT_SRC_FILES} PROPERTY COMPILE_DEFINITIONS SUBMODULE_ID=mindspore::SubModuleId::SM_MD) set(SRC_FILES_LIST execution_tree.cc - data_buffer.cc data_schema.cc dataset_iterator.cc tree_adapter.cc diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h index 98d4de247c..ae51df1714 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/cache_client.h @@ -34,7 +34,7 @@ #else #include "minddata/dataset/engine/cache/stub/cache_grpc_client.h" #endif -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/util/lock.h" #include "minddata/dataset/util/cond_var.h" #include "minddata/dataset/util/queue_map.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc index 27c1887625..245a08dd70 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/cache/perf/cache_pipeline_run.cc @@ -22,7 +22,7 @@ #include #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/util/random.h" #include "minddata/dataset/util/services.h" 
diff --git a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc index ac43271aa5..b7920a3ebb 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/consumers/tree_consumer.cc @@ -115,7 +115,6 @@ Status IteratorConsumer::GetNextAsOrderedPair(std::vector d) { return tree_adapter_->Compile(std::move(d), num_epochs_); } Status ToDevice::Send() { - std::unique_ptr db; RETURN_IF_NOT_OK(tree_adapter_->Launch()); std::shared_ptr root = std::shared_ptr(tree_adapter_->GetRoot()); CHECK_FAIL_RETURN_UNEXPECTED(root != nullptr, "Root is a nullptr."); diff --git a/mindspore/ccsrc/minddata/dataset/engine/data_buffer.cc b/mindspore/ccsrc/minddata/dataset/engine/data_buffer.cc deleted file mode 100644 index b36aae6837..0000000000 --- a/mindspore/ccsrc/minddata/dataset/engine/data_buffer.cc +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright 2019 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#include "minddata/dataset/engine/data_buffer.h" -#include "minddata/dataset/util/allocator.h" -#include "minddata/dataset/core/global_context.h" -#include "minddata/dataset/core/tensor.h" - -namespace mindspore { -namespace dataset { -// Name: Constructor #1 -// Description: This is the main constructor that is used for making a buffer -DataBuffer::DataBuffer(int32_t id, BufferFlags flags) : buffer_id_(id), tensor_table_(nullptr), buffer_flags_(flags) {} - -// A method for debug printing of the buffer -void DataBuffer::Print(std::ostream &out, bool show_all) const { - out << "bufferId: " << buffer_id_ << "\nflags: " << std::hex << buffer_flags_ << std::dec << "\n"; - - // If the column counts are set then it means that data has been set into - // the tensor table. Display the tensor table here. - if (this->NumCols() > 0) { - out << "Tensor table:\n"; - for (int32_t row = 0; row < DataBuffer::NumRows(); ++row) { - out << "Row # : " << row << "\n"; - TensorRow currRow = (*tensor_table_)[row]; - for (int32_t col = 0; col < this->NumCols(); ++col) { - out << "Column #: " << col << "\n"; // Should add the column name here as well? - // Call the tensor display - out << *(currRow[col]) << "\n"; - } - } - } -} - -// Remove me!! Callers should fetch rows via pop -Status DataBuffer::GetTensor(std::shared_ptr *ptr, int32_t row_id, int32_t col_id) const { - if (row_id < tensor_table_->size() && col_id < tensor_table_->at(row_id).size()) { - *ptr = (tensor_table_->at(row_id)).at(col_id); - } else { - std::string err_msg = - "indices for mTensorTable out of range: (" + std::to_string(row_id) + "," + std::to_string(col_id) + ")."; - RETURN_STATUS_UNEXPECTED(err_msg); - } - return Status::OK(); -} - -// Remove me!! 
Callers should fetch rows via pop -Status DataBuffer::GetRow(int32_t row_id, TensorRow *ptr) const { - if (tensor_table_ && !tensor_table_->empty() && row_id < tensor_table_->size()) { - *ptr = tensor_table_->at(row_id); - } else { - std::string err_msg = "rowId for mTensorTable out of range: " + std::to_string(row_id); - RETURN_STATUS_UNEXPECTED(err_msg); - } - - return Status::OK(); -} - -Status DataBuffer::PopRow(TensorRow *ptr) { - if (tensor_table_ && !tensor_table_->empty()) { - *ptr = std::move(tensor_table_->front()); - tensor_table_->pop_front(); - } - - return Status::OK(); -} - -Status DataBuffer::SliceOff(int64_t number_of_rows) { - while (number_of_rows > 0) { - tensor_table_->pop_back(); - number_of_rows--; - } - - return Status::OK(); -} -} // namespace dataset -} // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/data_buffer.h b/mindspore/ccsrc/minddata/dataset/engine/data_buffer.h deleted file mode 100644 index 81dcbadd20..0000000000 --- a/mindspore/ccsrc/minddata/dataset/engine/data_buffer.h +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Copyright 2019-2021 Huawei Technologies Co., Ltd - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -#ifndef MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATA_BUFFER_H_ -#define MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATA_BUFFER_H_ - -#include -#include -#include -#include -#include -#include "minddata/dataset/util/allocator.h" -#include "minddata/dataset/util/status.h" -#include "minddata/dataset/include/constants.h" -#include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/core/tensor_row.h" - -namespace mindspore { -namespace dataset { -/// \brief The DataBuffer class is a container of tensor data and is the unit of transmission between -/// connectors of dataset operators. Inside the buffer, tensors are organized into a table-like format -/// where n TensorRows may consist of m tensors (columns). -class DataBuffer { - public: - // Buffer flags - enum BufferFlags : uint32_t { - kDeBFlagNone = 0, - kDeBFlagEOF = 1, // The buffer is an eof end-of-data msg - kDeBFlagEOE = 1u << 1, // The buffer is an eoe end-of-epoch msg - kDeBFlagWait = 1u << 2, // The buffer is an control signal for workers to suspend operations - kDeBFlagQuit = 1u << 3 // The buffer is a control signal for workers to quit - }; - - // Name: Constructor #1 - // Description: This is the main constructor that is used for making a buffer - DataBuffer(int32_t id, BufferFlags flags); - - /// \brief default destructor - ~DataBuffer() = default; - - /// \brief A method for debug printing of the buffer - /// \param[in/out] out The stream to write to - /// \param[in] show_all A boolean to toggle between details and summary printing - void Print(std::ostream &out, bool show_all) const; - - // Provide stream operator for displaying it - friend std::ostream &operator<<(std::ostream &out, const DataBuffer &cb) { - cb.Print(out, false); - return out; - } - - // Convenience getter functions for flag checking - bool eof() const { return (static_cast(buffer_flags_) & static_cast(kDeBFlagEOF)); } - - bool eoe() const { return (static_cast(buffer_flags_) & static_cast(kDeBFlagEOE)); } - - bool wait() 
const { return (static_cast(buffer_flags_) & static_cast(kDeBFlagWait)); } - - bool quit() const { return (static_cast(buffer_flags_) & static_cast(kDeBFlagQuit)); } - - // Simple getter funcs - int32_t id() const { return buffer_id_; } - - void set_id(int32_t id) { buffer_id_ = id; } - - int32_t NumRows() const { return ((tensor_table_) ? tensor_table_->size() : 0); } - - int32_t NumCols() const { - return (tensor_table_ == nullptr || tensor_table_->empty()) ? 0 : tensor_table_->at(0).size(); - } - - BufferFlags buffer_flags() const { return buffer_flags_; } - - // Remove me!! Callers should fetch rows via pop - Status GetTensor(std::shared_ptr *, int32_t row_id, int32_t col_id) const; - - // Remove me!! Callers should drain rows via pop. - Status GetRow(int32_t row_id, TensorRow *) const; - - // Get a row from the TensorTable - Status PopRow(TensorRow *); - - Status SliceOff(int64_t number_of_rows); - - // Replacing mTensorTable, the unique_ptr assignment will release the old TensorTable. - void set_tensor_table(std::unique_ptr new_table) { tensor_table_ = std::move(new_table); } - - void set_flag(BufferFlags in_flag) { - buffer_flags_ = static_cast(static_cast(buffer_flags_) | static_cast(in_flag)); - } - - void Shuffle() {} // does nothing right now. possibly remove later - - protected: - int32_t buffer_id_; // An id for the buffer. 
- std::unique_ptr tensor_table_; // A table (row major) of Tensors - BufferFlags buffer_flags_; // bit mask for various buffer properties -}; -} // namespace dataset -} // namespace mindspore - -#endif // MINDSPORE_CCSRC_MINDDATA_DATASET_ENGINE_DATA_BUFFER_H_ diff --git a/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc b/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc index 4139322b64..234a98ffe7 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/dataset_iterator.cc @@ -19,7 +19,7 @@ #include "minddata/dataset/core/data_type.h" #include "minddata/dataset/core/tensor.h" #include "minddata/dataset/core/tensor_shape.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/status.h" #include "minddata/dataset/engine/datasetops/dataset_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.cc index 935f93152d..44633e11a4 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.cc @@ -17,7 +17,7 @@ #include #include #include "minddata/dataset/include/constants.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/db_connector.h" #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/core/global_context.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.h index 3df5fee858..7eb30f9073 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/barrier_op.h @@ -28,7 +28,6 @@ namespace mindspore { namespace dataset { // Forward declare -class DataBuffer; class ExecutionTree; // BarrierOp class implements 
the Barrier operator. It will block sending of rows until a signal has diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc index c7c19ba5dd..34e42a6965 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.cc @@ -21,7 +21,7 @@ #ifdef ENABLE_PYTHON #include "minddata/dataset/core/pybind_support.h" #endif -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/db_connector.h" #include "minddata/dataset/kernels/data/data_utils.h" #include "minddata/dataset/util/status.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h index 7f261a22d0..54dde54583 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/batch_op.h @@ -34,7 +34,6 @@ namespace mindspore { namespace dataset { -class DataBuffer; using PadInfo = std::map>>; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/bucket_batch_by_length_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/bucket_batch_by_length_op.h index 2c2c515154..90f0cc9513 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/bucket_batch_by_length_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/bucket_batch_by_length_op.h @@ -32,7 +32,6 @@ namespace mindspore { namespace dataset { -class DataBuffer; class BucketBatchByLengthOp : public PipelineOp { public: diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc index 07914ef71f..bab98af432 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_base_op.cc @@ -94,11 +94,9 @@ Status 
CacheBase::FetchSamplesToWorkers() { keys.reserve(1); std::vector prefetch_keys; prefetch_keys.reserve(prefetch_size_); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); - while (!sampler_buffer->eoe()) { - TensorRow sample_row; - RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row)); + TensorRow sample_row; + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); + while (!sample_row.eoe()) { std::shared_ptr sample_ids = sample_row[0]; for (auto itr = sample_ids->begin(); itr != sample_ids->end(); itr++) { ++row_cnt_; @@ -115,7 +113,7 @@ Status CacheBase::FetchSamplesToWorkers() { prefetch_keys.clear(); } } - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); } // Deal with any partial keys left. if (!prefetch_keys.empty()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc index d96b68e2dc..96e48f2476 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.cc @@ -95,7 +95,7 @@ void CacheLookupOp::SamplerPrint(std::ostream &out, bool show_all) const { // Then add our own info if any } } -Status CacheLookupOp::GetNextSample(std::unique_ptr *out_buffer) { +Status CacheLookupOp::GetNextSample(TensorRow *out) { std::vector cache_miss; RETURN_IF_NOT_OK(keys_miss_->Pop(0, &cache_miss)); // Ignore the case we have no cache miss, we can't return empty samples. 
@@ -104,19 +104,16 @@ Status CacheLookupOp::GetNextSample(std::unique_ptr *out_buffer) { } // Special code for eoe if (cache_miss.at(0) == eoe_row_id) { - *out_buffer = std::make_unique(0, DataBuffer::kDeBFlagEOE); + *out = std::move(TensorRow(TensorRow::kFlagEOE)); } else { std::shared_ptr sample_ts; RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ts, cache_miss.size())); - (*out_buffer) = std::make_unique(0, DataBuffer::kDeBFlagNone); auto idPtr = sample_ts->begin(); for (auto i = 0; i < cache_miss.size(); ++i) { *idPtr = cache_miss.at(i); ++idPtr; } - TensorRow row; - row.push_back(sample_ts); - (*out_buffer)->set_tensor_table(std::make_unique(1, row)); + *out = {sample_ts}; } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.h index 7e867d28f5..9ebafed392 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_lookup_op.h @@ -96,7 +96,7 @@ class CacheLookupOp : public CacheBase, public SamplerRT { Status ResetSampler() override; Status HandshakeRandomAccessOp(const RandomAccessOp *op) override; Status InitSampler() override; - Status GetNextSample(std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; void Print(std::ostream &out, bool show_all) const override; void SamplerPrint(std::ostream &out, bool show_all) const override; bool AllowCacheMiss() override { return true; } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_op.cc index c05ac0f961..449b829dda 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/cache_op.cc @@ -22,7 +22,7 @@ #include "minddata/dataset/core/global_context.h" #include "minddata/dataset/engine/datasetops/repeat_op.h" #include 
"minddata/dataset/engine/dataset_iterator.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/log_adapter.h" #include "minddata/dataset/util/task_manager.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc index 557d32a6f7..4f2548688b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/concat_op.cc @@ -19,7 +19,7 @@ #include #include "minddata/dataset/core/config_manager.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/db_connector.h" #include "utils/ms_utils.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc index 9234ae8858..67a508a31b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.cc @@ -26,7 +26,7 @@ #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/engine/datasetops/device_queue_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/db_connector.h" #ifndef ENABLE_ANDROID #include "utils/system/crc32c.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h index 26b5d9f0c9..5b4be48c8c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/dataset_op.h @@ -59,8 +59,6 @@ constexpr char kZipOp[] = "ZipOp"; // Forward declare class ExecutionTree; -class DataBuffer; - class NodePass; class SamplerRT; diff --git 
a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc index 8095c55525..f9edd2807c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/device_queue_op.cc @@ -19,7 +19,7 @@ #include #include #include -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/dataset_iterator.h" #include "minddata/dataset/util/status.h" #include "minddata/dataset/util/task_manager.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc index bf9652c43c..415f1a9170 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/epoch_ctrl_op.cc @@ -18,7 +18,7 @@ #include #include "minddata/dataset/engine/datasetops/epoch_ctrl_op.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/util/log_adapter.h" namespace mindspore { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc index 455ab7c3b1..6a5ad913ee 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/filter_op.cc @@ -22,7 +22,7 @@ #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/core/global_context.h" #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/kernels/tensor_op.h" #include "minddata/dataset/util/log_adapter.h" #include "minddata/dataset/util/task_manager.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc index d49f5344a0..3674fa5b1f 100644 --- 
a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.cc @@ -23,7 +23,7 @@ #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/include/constants.h" #include "minddata/dataset/core/global_context.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/datasetops/map_op/cpu_map_job.h" #include "minddata/dataset/engine/datasetops/map_op/gpu_map_job.h" #include "minddata/dataset/engine/execution_tree.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h index 2ed9fdf972..f690a796d1 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/map_op/map_op.h @@ -34,7 +34,6 @@ namespace mindspore { namespace dataset { // Forward declare -class DataBuffer; class ExecutionTree; // MapOp class implements the Map operator. It will apply a list of operations to each record specified by column names. 
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h index 442b7e7206..3791e80b4c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/parallel_op.h @@ -30,8 +30,6 @@ namespace dataset { constexpr int32_t kEndOfActions = -1; // Forward declares -class DataBuffer; - class DbConnector; // A ParallelOp provides a multi-threaded DatasetOp diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/pipeline_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/pipeline_op.h index 0af0b0d889..5ff03052ef 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/pipeline_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/pipeline_op.h @@ -26,8 +26,6 @@ namespace dataset { // forward declare class ExecutionTree; -class DataBuffer; - class PipelineOp : public DatasetOp { public: // Constructor diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc index 57742b8ead..f3296595b3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/project_op.cc @@ -22,7 +22,7 @@ #include #include #include -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/log_adapter.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc index f5a5eb337b..bcb7b764f6 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.cc @@ -22,7 +22,7 @@ #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/include/constants.h" #include "minddata/dataset/core/global_context.h" 
-#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/db_connector.h" #include "minddata/dataset/util/log_adapter.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.h index 6898026a49..04abbf369a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/rename_op.h @@ -27,9 +27,6 @@ namespace mindspore { namespace dataset { -// forward declare -class DataBuffer; - class RenameOp : public PipelineOp { public: // The nested builder class inside of the RenameOp is used to help manage all of diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc index 190885710c..1b5fd5e6ff 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/repeat_op.cc @@ -19,7 +19,7 @@ #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/engine/datasetops/repeat_op.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/util/log_adapter.h" namespace mindspore { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc index 4ddeed74a7..d6638419cd 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.cc @@ -25,7 +25,7 @@ #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/engine/datasetops/shuffle_op.h" #include "minddata/dataset/engine/dataset_iterator.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/db_connector.h" #include "minddata/dataset/util/log_adapter.h" #include "minddata/dataset/util/random.h" diff --git 
a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h index 253d3d65e9..c1d81e6182 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/shuffle_op.h @@ -37,8 +37,6 @@ class ExecutionTree; class DbConnector; -class DataBuffer; - class ShuffleOp : public PipelineOp { // Shuffle buffer state flags // diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc index 0249408692..c2559fb1ee 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/skip_op.cc @@ -18,7 +18,7 @@ #include #include "minddata/dataset/core/config_manager.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/dataset_iterator.h" #include "minddata/dataset/engine/datasetops/skip_op.h" #include "minddata/dataset/engine/db_connector.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc index 229ef94e20..1fe6ae1e92 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.cc @@ -487,10 +487,8 @@ Status AlbumOp::GetNextRowPullMode(TensorRow *row) { if (image_rows_.empty()) PrescanEntry(); if (sample_ids_ == nullptr) { RETURN_IF_NOT_OK(this->InitSampler()); - std::unique_ptr sample_buffer; TensorRow sample_row; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_buffer)); - RETURN_IF_NOT_OK(sample_buffer->PopRow(&sample_row)); + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); sample_ids_ = sample_row[0]; } if (curr_row_ + 1 > sample_ids_->Size()) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h index 9442ea86bf..91bc88cf9f 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/album_op.h @@ -27,7 +27,7 @@ #include #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h index 993638d95a..335996538b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/cifar_op.h @@ -23,7 +23,7 @@ #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h index 29e04ee396..46dfc41c32 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/coco_op.h @@ -24,7 +24,7 @@ #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc index 08f5a6abc5..ff0b5de6e9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/generator_op.cc @@ -16,7 +16,7 @@ #include "minddata/dataset/engine/datasetops/source/generator_op.h" #include #include "minddata/dataset/core/global_context.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/execution_tree.h" #include "minddata/dataset/util/task_manager.h" @@ -219,12 +219,10 @@ Status GeneratorOp::operator()() { if (eoe) { // Push out EOE upon StopIteration exception from generator MS_LOG(DEBUG) << "Generator operator sends out EOE."; - std::unique_ptr eoe_buffer = std::make_unique(0, DataBuffer::kDeBFlagEOE); RETURN_IF_NOT_OK(out_connector_->SendEOE()); if (IsLastIteration()) { // If last repeat or not repeated, push out EOF and exit master loop MS_LOG(DEBUG) << "Generator operator sends out EOF."; - std::unique_ptr eof_buffer = std::make_unique(0, DataBuffer::kDeBFlagEOF); RETURN_IF_NOT_OK(out_connector_->SendEOF()); MS_LOG(DEBUG) << "Generator operator main execution loop complete."; eof = true; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h index 32f742e47b..2b7b837934 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/image_folder_op.h @@ -26,7 +26,7 @@ #include #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h index 76562b2d0b..6e2146a645 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/manifest_op.h @@ -23,7 +23,7 @@ #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc index b2d63cbc41..0eb89a0795 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.cc @@ -30,13 +30,11 @@ MappableLeafOp::MappableLeafOp(int32_t num_wkrs, int32_t queue_size, std::shared // Main logic, Register Queue with TaskGroup, launch all threads and do the functor's work Status MappableLeafOp::operator()() { RETURN_IF_NOT_OK(LaunchThreadsAndInitOp()); - std::unique_ptr sampler_buffer; - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); + TensorRow sample_row; + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); int64_t row_cnt = 0; while (true) { // each iteration is 1 epoch, breaks when IsLastIteration() is true - while (sampler_buffer->eoe() == false) { - TensorRow sample_row; - RETURN_IF_NOT_OK(sampler_buffer->PopRow(&sample_row)); + while (sample_row.eoe() == false) { std::shared_ptr sample_ids = sample_row[0]; for (auto itr = sample_ids->begin(); itr != sample_ids->end(); ++itr) { if ((*itr) >= num_rows_) { @@ -46,7 +44,7 @@ Status MappableLeafOp::operator()() { RETURN_IF_NOT_OK( io_block_queues_[row_cnt++ % num_workers_]->Add(std::make_unique(*itr, IOBlock::kDeIoBlockNone))); } - 
RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); } if (IsLastIteration()) { std::unique_ptr eoe_block = std::make_unique(IOBlock::kDeIoBlockFlagEoe); @@ -71,7 +69,7 @@ Status MappableLeafOp::operator()() { // If not the last repeat, self-reset and go to loop again. if (!IsLastIteration()) { RETURN_IF_NOT_OK(Reset()); - RETURN_IF_NOT_OK(sampler_->GetNextSample(&sampler_buffer)); + RETURN_IF_NOT_OK(sampler_->GetNextSample(&sample_row)); } UpdateRepeatAndEpochCounter(); } @@ -90,7 +88,7 @@ Status MappableLeafOp::InitSampler() { return Status::OK(); } -// contains the main logic of pulling a IOBlock from IOBlockQueue, load a buffer and push the buffer to out_connector_ +// contains the main logic of pulling a IOBlock from IOBlockQueue, load a row and push the row to out_connector_ // IMPORTANT: 1 IOBlock produces 1 row Status MappableLeafOp::WorkerEntry(int32_t worker_id) { TaskManager::FindMe()->Post(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h index fd1356361b..a53ab15a9d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mappable_leaf_op.h @@ -26,7 +26,7 @@ #include #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc index 246c8714ee..dfde1a304a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc +++ 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.cc @@ -25,7 +25,7 @@ #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/include/constants.h" #include "minddata/dataset/core/global_context.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/datasetops/dataset_op.h" #include "minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h" #include "minddata/dataset/engine/db_connector.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h index 0cf54d9863..c9004196d8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mindrecord_op.h @@ -42,7 +42,6 @@ namespace dataset { // Forward declares template class Queue; -class DataBuffer; using mindrecord::ShardOperator; using mindrecord::ShardReader; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h index 03b200c34d..3b28e61c6d 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/mnist_op.h @@ -24,7 +24,7 @@ #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc index ddd043097d..9b6731648b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc +++ 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.cc @@ -19,7 +19,6 @@ #include #include -#include "minddata/dataset/engine/data_buffer.h" #include "minddata/dataset/util/random.h" namespace mindspore { @@ -63,15 +62,15 @@ Status DistributedSamplerRT::InitSampler() { if (offset_ != -1 || !even_dist_) { if (offset_ == -1) offset_ = 0; - samples_per_buffer_ = (num_rows_ + offset_) / num_devices_; + samples_per_tensor_ = (num_rows_ + offset_) / num_devices_; int64_t remainder = (num_rows_ + offset_) % num_devices_; - if (device_id_ < remainder) samples_per_buffer_++; - if (device_id_ < offset_) samples_per_buffer_--; + if (device_id_ < remainder) samples_per_tensor_++; + if (device_id_ < offset_) samples_per_tensor_--; } else { offset_ = 0; - samples_per_buffer_ = (num_rows_ + num_devices_ - 1) / num_devices_; // equals to ceil(num_rows/num_devices) + samples_per_tensor_ = (num_rows_ + num_devices_ - 1) / num_devices_; // equals to ceil(num_rows/num_devices) } - samples_per_buffer_ = num_samples_ < samples_per_buffer_ ? num_samples_ : samples_per_buffer_; + samples_per_tensor_ = num_samples_ < samples_per_tensor_ ? 
num_samples_ : samples_per_tensor_; if (shuffle_) { shuffle_vec_.reserve(num_rows_); for (int64_t i = 0; i < num_rows_; i++) { @@ -79,51 +78,48 @@ Status DistributedSamplerRT::InitSampler() { } std::shuffle(shuffle_vec_.begin(), shuffle_vec_.end(), rnd_); } - if (!samples_per_buffer_) non_empty_ = false; + if (!samples_per_tensor_) non_empty_ = false; is_initialized = true; return Status::OK(); } -Status DistributedSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { - if (cnt_ > samples_per_buffer_) { +Status DistributedSamplerRT::GetNextSample(TensorRow *out) { + if (cnt_ > samples_per_tensor_) { RETURN_STATUS_UNEXPECTED( "Number of samples(cnt) that have already been filled in to buffer should be less than or " "equal to samples_per_buffer, but got cnt: " + - std::to_string(cnt_) + ", samples_per_buffer: " + std::to_string(samples_per_buffer_)); - } else if (cnt_ == samples_per_buffer_ && (non_empty_ || !even_dist_)) { - (*out_buffer) = std::make_unique(0, DataBuffer::kDeBFlagEOE); - if (!samples_per_buffer_) { + std::to_string(cnt_) + ", samples_per_buffer: " + std::to_string(samples_per_tensor_)); + } else if (cnt_ == samples_per_tensor_ && (non_empty_ || !even_dist_)) { + (*out) = TensorRow(TensorRow::kFlagEOE); + if (!samples_per_tensor_) { non_empty_ = false; } - } else if (!samples_per_buffer_ && !non_empty_) { + } else if (!samples_per_tensor_ && !non_empty_) { // If the buffer is empty, we add samples with subscript 0 in the current dataset. // This step is to make up for the solution that the code default buffer is not empty before. 
// We will remove this value in the concat phase non_empty_ = true; - (*out_buffer) = std::make_unique(cnt_, DataBuffer::kDeBFlagNone); std::shared_ptr sample_ids; RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, 1)); auto id_ptr = sample_ids->begin(); // add index 0 *id_ptr = 0; - TensorRow row(1, sample_ids); - (*out_buffer)->set_tensor_table(std::make_unique(1, row)); + (*out) = {sample_ids}; } else { if (HasChildSampler()) { RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_)); } - (*out_buffer) = std::make_unique(cnt_, DataBuffer::kDeBFlagNone); std::shared_ptr sample_ids; - RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, samples_per_buffer_)); + RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, samples_per_tensor_)); auto id_ptr = sample_ids->begin(); bool flag_add_1 = false; - while (cnt_ < samples_per_buffer_ && id_ptr != sample_ids->end()) { + while (cnt_ < samples_per_tensor_ && id_ptr != sample_ids->end()) { int64_t middle_value = num_devices_ * cnt_ + device_id_ - offset_; // if index < 0, we move back one place if (middle_value < 0) { - samples_per_buffer_++; + samples_per_tensor_++; cnt_++; flag_add_1 = true; middle_value = num_devices_ * cnt_ + device_id_ - offset_; @@ -145,17 +141,16 @@ Status DistributedSamplerRT::GetNextSample(std::unique_ptr *out_buff // If 1 was added before, we will cut off 1 here if (flag_add_1) { - samples_per_buffer_--; + samples_per_tensor_--; cnt_--; } - TensorRow row(1, sample_ids); - (*out_buffer)->set_tensor_table(std::make_unique(1, row)); + (*out) = {sample_ids}; } return Status::OK(); } Status DistributedSamplerRT::ResetSampler() { - CHECK_FAIL_RETURN_UNEXPECTED(cnt_ == samples_per_buffer_, "ERROR Reset() called early/late"); + CHECK_FAIL_RETURN_UNEXPECTED(cnt_ == samples_per_tensor_, "ERROR Reset() called early/late"); cnt_ = 0; if (shuffle_ == true) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h index 90460a2c1b..40b9972391 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h @@ -50,7 +50,7 @@ class DistributedSamplerRT : public SamplerRT { /// \param std::unique_ptr * pBuffer /// \param int32_t workerId /// \return Status code - Status GetNextSample(std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; /// Init sampler, called by base class or python Status InitSampler() override; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc index 43e2b3f0e1..dacac1b136 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.cc @@ -52,7 +52,7 @@ Status PKSamplerRT::InitSampler() { num_samples_ = num_rows_; } - samples_per_buffer_ = (samples_per_buffer_ > num_samples_) ? num_samples_ : samples_per_buffer_; + samples_per_tensor_ = (samples_per_tensor_ > num_samples_) ? 
num_samples_ : samples_per_tensor_; if (shuffle_ == true) { std::shuffle(labels_.begin(), labels_.end(), rnd_); } else { @@ -65,19 +65,18 @@ Status PKSamplerRT::InitSampler() { return Status::OK(); } -Status PKSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { +Status PKSamplerRT::GetNextSample(TensorRow *out) { if (next_id_ > num_samples_ || num_samples_ == 0) { RETURN_STATUS_UNEXPECTED("Index must be less than or equal to num_samples, but got: " + std::to_string(next_id_)); } else if (next_id_ == num_samples_) { - (*out_buffer) = std::make_unique(0, DataBuffer::kDeBFlagEOE); + (*out) = TensorRow(TensorRow::kFlagEOE); } else { if (HasChildSampler()) { RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_)); } - (*out_buffer) = std::make_unique(next_id_, DataBuffer::kDeBFlagNone); std::shared_ptr sample_ids; - int64_t last_id = (samples_per_buffer_ + next_id_ > num_samples_) ? num_samples_ : samples_per_buffer_ + next_id_; + int64_t last_id = (samples_per_tensor_ + next_id_ > num_samples_) ? 
num_samples_ : samples_per_tensor_ + next_id_; RETURN_IF_NOT_OK(CreateSamplerTensor(&sample_ids, last_id - next_id_)); auto id_ptr = sample_ids->begin(); CHECK_FAIL_RETURN_UNEXPECTED(samples_per_class_ != 0, "samples cannot be zero."); @@ -95,8 +94,7 @@ Status PKSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { id_ptr++; } - TensorRow row(1, sample_ids); - (*out_buffer)->set_tensor_table(std::make_unique(1, row)); + (*out) = {sample_ids}; } return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.h index e1163505d9..a8f611784c 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/pk_sampler.h @@ -41,7 +41,7 @@ class PKSamplerRT : public SamplerRT { // NOT YET FINISHED // @param std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; // first handshake between leaf source op and Sampler. This func will determine the amount of data // in the dataset that we can sample from. 
diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc index 4c283ca272..04f5dd3c66 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.cc @@ -23,9 +23,9 @@ namespace dataset { PythonSamplerRT::PythonSamplerRT(int64_t num_samples, py::object py_sampler_instance, int64_t samples_per_buffer) : SamplerRT(num_samples, samples_per_buffer), py_sampler_instance(py_sampler_instance), need_to_reset_(false) {} -Status PythonSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { +Status PythonSamplerRT::GetNextSample(TensorRow *out) { if (need_to_reset_) { - (*out_buffer) = std::make_unique(0, DataBuffer::kDeBFlagEOE); + (*out) = TensorRow(TensorRow::kFlagEOE); } else { if (HasChildSampler()) { RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_)); @@ -34,7 +34,6 @@ Status PythonSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { std::shared_ptr sample_ids; { py::gil_scoped_acquire gil_acquire; - (*out_buffer) = std::make_unique(0, DataBuffer::kDeBFlagNone); if (Py_IsInitialized() == 0) { return Status(StatusCode::kMDPythonInterpreterFailure, "Python Interpreter is finalized"); } @@ -57,8 +56,7 @@ Status PythonSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { "Invalid data, python sampler iterator should return an integer index."); } } - TensorRow row(1, sample_ids); - (*out_buffer)->set_tensor_table(std::make_unique(1, row)); + (*out) = {sample_ids}; need_to_reset_ = true; } return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.h index 5b22d288f5..c4f04abcf3 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.h +++ 
b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/python_sampler.h @@ -48,7 +48,7 @@ class PythonSamplerRT : public SamplerRT { // @param std::unique_ptr pBuffer - Buffer to be returned to corresponding Dataset Op // @param int32_t workerId - not meant to be used // @return Status The status code returned - Status GetNextSample(std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; // Printer for debugging purposes. // @param out - output stream to write to diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc index 489c6cb811..6773fd950a 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.cc @@ -31,19 +31,18 @@ RandomSamplerRT::RandomSamplerRT(int64_t num_samples, bool replacement, bool res dist(nullptr), reshuffle_each_epoch_(reshuffle_each_epoch) {} -Status RandomSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { +Status RandomSamplerRT::GetNextSample(TensorRow *out) { if (next_id_ > num_samples_) { RETURN_STATUS_UNEXPECTED("RandomSampler Internal Error"); } else if (next_id_ == num_samples_) { - (*out_buffer) = std::make_unique(0, DataBuffer::kDeBFlagEOE); + (*out) = TensorRow(TensorRow::kFlagEOE); } else { if (HasChildSampler()) { RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_)); } - (*out_buffer) = std::make_unique(next_id_, DataBuffer::kDeBFlagNone); std::shared_ptr sampleIds; - int64_t last_id = std::min(samples_per_buffer_ + next_id_, num_samples_); + int64_t last_id = std::min(samples_per_tensor_ + next_id_, num_samples_); RETURN_IF_NOT_OK(CreateSamplerTensor(&sampleIds, last_id - next_id_)); auto id_ptr = sampleIds->begin(); @@ -62,8 +61,7 @@ Status RandomSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { *(id_ptr + static_cast(i)) 
= sampled_id; } next_id_ = last_id; - TensorRow row(1, sampleIds); - (*out_buffer)->set_tensor_table(std::make_unique(1, row)); + (*out) = {sampleIds}; } return Status::OK(); } @@ -81,7 +79,7 @@ Status RandomSamplerRT::InitSampler() { num_samples_ > 0 && num_rows_ > 0, "Invalid parameter, num_samples & num_rows must be greater than 0, but got num_samples: " + std::to_string(num_samples_) + ", num_rows: " + std::to_string(num_rows_)); - samples_per_buffer_ = samples_per_buffer_ > num_samples_ ? num_samples_ : samples_per_buffer_; + samples_per_tensor_ = samples_per_tensor_ > num_samples_ ? num_samples_ : samples_per_tensor_; rnd_.seed(seed_); if (!replacement_) { diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.h index 7ff022b3c3..98fa809479 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/random_sampler.h @@ -41,7 +41,7 @@ class RandomSamplerRT : public SamplerRT { // @param std::unique_ptr pBuffer - Buffer to be returned to StorageOp // @param int32_t workerId - not meant to be used // @return Status The status code returned - Status GetNextSample(std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; // meant to be called by base class or python Status InitSampler() override; diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc index ab810511d5..ee4b730ae4 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.cc @@ -36,7 +36,7 @@ Status RandomAccessOp::GetNumRowsInDataset(int64_t *num) const { SamplerRT::SamplerRT(int64_t num_samples, int64_t 
samples_per_buffer) : num_rows_(0), num_samples_(num_samples), - samples_per_buffer_(samples_per_buffer), + samples_per_tensor_(samples_per_buffer), col_desc_(nullptr), is_initialized(false) {} @@ -91,22 +91,19 @@ void SamplerRT::SamplerPrint(std::ostream &out, bool show_all) const { #ifdef ENABLE_PYTHON Status SamplerRT::GetAllIdsThenReset(py::array *data) { - std::unique_ptr db; std::shared_ptr sample_ids; TensorRow sample_row; - // A call to derived class to get sample ids wrapped inside a buffer - RETURN_IF_NOT_OK(GetNextSample(&db)); - // Get the only tensor inside the buffer that contains the actual SampleIds for the entire epoch - RETURN_IF_NOT_OK(db->GetRow(0, &sample_row)); + // Get the only tensor inside the row that contains the actual SampleIds for the entire epoch + RETURN_IF_NOT_OK(GetNextSample(&sample_row)); sample_ids = sample_row[0]; // check this buffer is not a ctrl buffer - CHECK_FAIL_RETURN_UNEXPECTED(db->buffer_flags() == DataBuffer::kDeBFlagNone, "ERROR ctrl buffer received"); + CHECK_FAIL_RETURN_UNEXPECTED(sample_row.Flags() == TensorRow::kFlagNone, "ERROR ctrl row received"); // perform error checking! 
Next buffer supposed to be EOE since last one already contains all ids for current epoch - RETURN_IF_NOT_OK(GetNextSample(&db)); - CHECK_FAIL_RETURN_UNEXPECTED(db->eoe(), "ERROR Non EOE received"); + RETURN_IF_NOT_OK(GetNextSample(&sample_row)); + CHECK_FAIL_RETURN_UNEXPECTED(sample_row.eoe(), "ERROR Non EOE received"); // Reset Sampler since this is the end of the epoch RETURN_IF_NOT_OK(ResetSampler()); @@ -178,13 +175,11 @@ Status SamplerRT::AddChild(std::shared_ptr child) { bool SamplerRT::HasChildSampler() { return !child_.empty(); } Status SamplerRT::GetAssociatedChildId(int64_t *out_associated_id, int64_t id) { - if (child_ids_ == nullptr) { + if (child_ids_.empty()) { RETURN_STATUS_UNEXPECTED("Trying to get associated child id, but there are no child ids!"); } - TensorRow sample_row; - RETURN_IF_NOT_OK(child_ids_->GetRow(0, &sample_row)); - std::shared_ptr sample_ids = sample_row[0]; + std::shared_ptr sample_ids = child_ids_[0]; RETURN_IF_NOT_OK(sample_ids->GetItemAt(out_associated_id, {id})); return Status::OK(); } diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.h index 3a51ffcfc4..b9045dc165 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sampler.h @@ -23,7 +23,7 @@ #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/dataset_op.h" @@ -66,7 +66,7 @@ class SamplerRT { // @param int64_t samplesPerBuffer: Num of Sampler Ids to fetch via 1 GetNextBuffer call SamplerRT(int64_t num_samples, int64_t samples_per_buffer); - SamplerRT(const SamplerRT &s) : SamplerRT(s.num_samples_, s.samples_per_buffer_) {} + SamplerRT(const SamplerRT &s) : SamplerRT(s.num_samples_, s.samples_per_tensor_) {} // 
default destructor ~SamplerRT() = default; @@ -76,7 +76,7 @@ class SamplerRT { // @param std::unique_ptr pBuffer - Buffer to be returned to StorageOp // @param int32_t workerId - not meant to be used // @return Status The status code returned - virtual Status GetNextSample(std::unique_ptr *out_buffer) = 0; + virtual Status GetNextSample(TensorRow *out) = 0; // This function only called by python layer. Not needed by Android. #ifdef ENABLE_PYTHON @@ -170,10 +170,10 @@ class SamplerRT { int64_t num_samples_; bool is_initialized; - int64_t samples_per_buffer_; + int64_t samples_per_tensor_; std::unique_ptr col_desc_; std::vector> child_; // Child nodes - std::unique_ptr child_ids_; + TensorRow child_ids_; }; } // namespace dataset } // namespace mindspore diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc index 7668130007..16cc0ae709 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.cc @@ -24,23 +24,22 @@ namespace dataset { SequentialSamplerRT::SequentialSamplerRT(int64_t num_samples, int64_t start_index, int64_t samples_per_buffer) : SamplerRT(num_samples, samples_per_buffer), current_id_(start_index), start_index_(start_index), id_count_(0) {} -Status SequentialSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { +Status SequentialSamplerRT::GetNextSample(TensorRow *out) { if (id_count_ > num_samples_) { RETURN_STATUS_UNEXPECTED("SequentialSampler Internal Error"); } else if (id_count_ == num_samples_) { - (*out_buffer) = std::make_unique(0, DataBuffer::kDeBFlagEOE); + (*out) = TensorRow(TensorRow::kFlagEOE); } else { if (HasChildSampler()) { RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_)); } - (*out_buffer) = std::make_unique(current_id_, DataBuffer::kDeBFlagNone); 
std::shared_ptr sampleIds; // Compute how many ids are left to pack, and pack this amount into a new buffer. Respect the setting for // samples per buffer though. int64_t remaining_ids = num_samples_ - id_count_; - int64_t num_elements = std::min(remaining_ids, samples_per_buffer_); + int64_t num_elements = std::min(remaining_ids, samples_per_tensor_); RETURN_IF_NOT_OK(CreateSamplerTensor(&sampleIds, num_elements)); auto idPtr = sampleIds->begin(); @@ -57,8 +56,7 @@ Status SequentialSamplerRT::GetNextSample(std::unique_ptr *out_buffe id_count_ += num_elements; // Count the packed ids towards our overall sample count - TensorRow row(1, sampleIds); - (*out_buffer)->set_tensor_table(std::make_unique(1, row)); + (*out) = {sampleIds}; } return Status::OK(); } @@ -83,9 +81,9 @@ Status SequentialSamplerRT::InitSampler() { num_samples_ = available_row_count; } CHECK_FAIL_RETURN_UNEXPECTED( - (num_samples_ > 0 && samples_per_buffer_ > 0) || num_samples_ == 0, - "Invalid parameter, samples_per_buffer must be greater than 0, but got " + std::to_string(samples_per_buffer_)); - samples_per_buffer_ = samples_per_buffer_ > num_samples_ ? num_samples_ : samples_per_buffer_; + (num_samples_ > 0 && samples_per_tensor_ > 0) || num_samples_ == 0, + "Invalid parameter, samples_per_buffer must be greater than 0, but got " + std::to_string(samples_per_tensor_)); + samples_per_tensor_ = samples_per_tensor_ > num_samples_ ? 
num_samples_ : samples_per_tensor_; is_initialized = true; return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h index 3d6d76d7df..b40feb07fd 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/sequential_sampler.h @@ -47,7 +47,7 @@ class SequentialSamplerRT : public SamplerRT { // @param std::unique_ptr pBuffer - Buffer to be returned to corresponding Dataset Op // @param int32_t workerId - not meant to be used // @return Status The status code returned - Status GetNextSample(std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; /// \brief Recursively calls this function on its children to get the actual number of samples on a tree of samplers /// \note This is not a getter for num_samples_. For example, if num_samples_ is 0 or if it's smaller than num_rows, diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc index 1fe703a8d8..4f88332177 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.cc @@ -39,8 +39,8 @@ Status SubsetSamplerRT::InitSampler() { num_samples_ = static_cast(indices_.size()); } - if (samples_per_buffer_ > num_samples_) { - samples_per_buffer_ = num_samples_; + if (samples_per_tensor_ > num_samples_) { + samples_per_tensor_ = num_samples_; } is_initialized = true; @@ -61,19 +61,18 @@ Status SubsetSamplerRT::ResetSampler() { } // Get the sample ids. 
-Status SubsetSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { +Status SubsetSamplerRT::GetNextSample(TensorRow *out) { // All samples have been drawn if (sample_id_ == num_samples_) { - (*out_buffer) = std::make_unique(buffer_id_++, DataBuffer::kDeBFlagEOE); + (*out) = TensorRow(TensorRow::kFlagEOE); } else { if (HasChildSampler()) { RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_)); } - (*out_buffer) = std::make_unique(buffer_id_++, DataBuffer::kDeBFlagNone); std::shared_ptr outputIds; - int64_t last_id = sample_id_ + samples_per_buffer_; + int64_t last_id = sample_id_ + samples_per_tensor_; // Handling the return all samples at once, and when last draw is not a full batch. if (last_id > num_samples_) { last_id = num_samples_; @@ -101,8 +100,7 @@ Status SubsetSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { sample_id_++; } - // Create a TensorTable from that single tensor and push into DataBuffer - (*out_buffer)->set_tensor_table(std::make_unique(1, TensorRow(1, outputIds))); + (*out) = {outputIds}; } return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h index a2cb2cf53a..7533ce4d1b 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h @@ -47,9 +47,9 @@ class SubsetSamplerRT : public SamplerRT { Status ResetSampler() override; /// Get the sample ids. - /// \param[out] out_buffer The address of a unique_ptr to DataBuffer where the sample ids will be placed. + /// \param[out] out The address of a unique_ptr to DataBuffer where the sample ids will be placed. /// @note the sample ids (int64_t) will be placed in one Tensor and be placed into pBuffer. 
- Status GetNextSample(std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; /// Printer for debugging purposes. /// \param out - output stream to write to diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc index 3168645fa0..555db238b8 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.cc @@ -49,9 +49,9 @@ Status WeightedRandomSamplerRT::InitSampler() { num_rows_ > 0 && num_samples_, "Invalid parameter, num_samples and num_rows must be greater than 0, but got num_rows: " + std::to_string(num_rows_) + ", num_samples: " + std::to_string(num_samples_)); - CHECK_FAIL_RETURN_UNEXPECTED(samples_per_buffer_ > 0, + CHECK_FAIL_RETURN_UNEXPECTED(samples_per_tensor_ > 0, "Invalid parameter, samples_per_buffer must be greater than 0, but got " + - std::to_string(samples_per_buffer_) + ".\n"); + std::to_string(samples_per_tensor_) + ".\n"); if (weights_.size() > static_cast(num_rows_)) { return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, @@ -69,7 +69,7 @@ Status WeightedRandomSamplerRT::InitSampler() { // Initialize random generator with seed from config manager rand_gen_.seed(GetSeed()); - samples_per_buffer_ = (samples_per_buffer_ > num_samples_) ? num_samples_ : samples_per_buffer_; + samples_per_tensor_ = (samples_per_tensor_ > num_samples_) ? num_samples_ : samples_per_tensor_; if (!replacement_) { exp_dist_ = std::make_unique>(1); @@ -117,7 +117,7 @@ Status WeightedRandomSamplerRT::ResetSampler() { } // Get the sample ids. 
-Status WeightedRandomSamplerRT::GetNextSample(std::unique_ptr *out_buffer) { +Status WeightedRandomSamplerRT::GetNextSample(TensorRow *out) { if (weights_.size() > static_cast(num_rows_)) { return Status(StatusCode::kMDUnexpectedError, __LINE__, __FILE__, "Invalid parameter, size of sample weights must be less than or equal to num of data, " @@ -133,16 +133,15 @@ Status WeightedRandomSamplerRT::GetNextSample(std::unique_ptr *out_b } if (sample_id_ == num_samples_) { - (*out_buffer) = std::make_unique(buffer_id_++, DataBuffer::kDeBFlagEOE); + (*out) = TensorRow(TensorRow::kFlagEOE); } else { if (HasChildSampler()) { RETURN_IF_NOT_OK(child_[0]->GetNextSample(&child_ids_)); } - (*out_buffer) = std::make_unique(buffer_id_++, DataBuffer::kDeBFlagNone); std::shared_ptr outputIds; - int64_t last_id = sample_id_ + samples_per_buffer_; + int64_t last_id = sample_id_ + samples_per_tensor_; // Handling the return all samples at once, and when last draw is not a full batch. if (last_id > num_samples_) { last_id = num_samples_; @@ -178,8 +177,7 @@ Status WeightedRandomSamplerRT::GetNextSample(std::unique_ptr *out_b sample_id_++; } - // Create a TensorTable from that single tensor and push into DataBuffer - (*out_buffer)->set_tensor_table(std::make_unique(1, TensorRow(1, outputIds))); + (*out) = {outputIds}; } return Status::OK(); diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h index a1b84d0030..f1946e7735 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h @@ -51,7 +51,7 @@ class WeightedRandomSamplerRT : public SamplerRT { // Get the sample ids. // @param[out] out_buffer The address of a unique_ptr to DataBuffer where the sample ids will be placed. 
// @note the sample ids (int64_t) will be placed in one Tensor and be placed into pBuffer. - Status GetNextSample(std::unique_ptr *out_buffer) override; + Status GetNextSample(TensorRow *out) override; // Printer for debugging purposes. // @param out - output stream to write to diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h index 34b34c0a8d..ed16462c21 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/source/voc_op.h @@ -24,7 +24,7 @@ #include "./tinyxml2.h" #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/engine/datasetops/parallel_op.h" #include "minddata/dataset/engine/datasetops/source/mappable_leaf_op.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc index 9c10ee7b16..2567445ac9 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/take_op.cc @@ -19,7 +19,7 @@ #include #include "utils/ms_utils.h" #include "minddata/dataset/core/config_manager.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/dataset_iterator.h" #include "minddata/dataset/engine/datasetops/take_op.h" #include "minddata/dataset/engine/db_connector.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc b/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc index b76b953640..b65d537b94 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.cc @@ -18,7 +18,7 @@ #include #include #include "minddata/dataset/include/constants.h" -#include "minddata/dataset/engine/data_buffer.h" + 
#include "minddata/dataset/engine/db_connector.h" #include "minddata/dataset/core/config_manager.h" #include "minddata/dataset/core/global_context.h" diff --git a/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.h b/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.h index f14adc70fd..9060016f2e 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.h +++ b/mindspore/ccsrc/minddata/dataset/engine/datasetops/zip_op.h @@ -29,8 +29,6 @@ namespace mindspore { namespace dataset { -// forward declare -class DataBuffer; class ZipOp : public PipelineOp { public: diff --git a/mindspore/ccsrc/minddata/dataset/engine/db_connector.h b/mindspore/ccsrc/minddata/dataset/engine/db_connector.h index d2e50733b8..85f15c3535 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/db_connector.h +++ b/mindspore/ccsrc/minddata/dataset/engine/db_connector.h @@ -18,8 +18,9 @@ #include #include +#include "minddata/dataset/core/tensor_row.h" #include "minddata/dataset/engine/connector.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/include/constants.h" namespace mindspore { diff --git a/mindspore/ccsrc/minddata/dataset/engine/jagged_connector.h b/mindspore/ccsrc/minddata/dataset/engine/jagged_connector.h index e181c564bd..1780bf48f5 100644 --- a/mindspore/ccsrc/minddata/dataset/engine/jagged_connector.h +++ b/mindspore/ccsrc/minddata/dataset/engine/jagged_connector.h @@ -21,7 +21,7 @@ #include #include #include "minddata/dataset/engine/connector.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/util/status.h" #include "minddata/dataset/include/constants.h" diff --git a/mindspore/lite/minddata/CMakeLists.txt b/mindspore/lite/minddata/CMakeLists.txt index bc3c573782..ca94b15480 100644 --- a/mindspore/lite/minddata/CMakeLists.txt +++ b/mindspore/lite/minddata/CMakeLists.txt @@ -181,7 +181,6 @@ if(BUILD_MINDDATA STREQUAL "full") 
${MINDDATA_DIR}/engine/datasetops/source/sampler/weighted_random_sampler.cc ${MINDDATA_DIR}/engine/runtime_context.cc ${MINDDATA_DIR}/engine/tree_adapter.cc - ${MINDDATA_DIR}/engine/data_buffer.cc ${MINDDATA_DIR}/engine/execution_tree.cc ${MINDDATA_DIR}/engine/dataset_iterator.cc ${MINDDATA_DIR}/core/tensor_row.cc diff --git a/mindspore/lite/minddata/wrapper/album_op_android.h b/mindspore/lite/minddata/wrapper/album_op_android.h index b63e36b9c7..cb8b3a8d48 100644 --- a/mindspore/lite/minddata/wrapper/album_op_android.h +++ b/mindspore/lite/minddata/wrapper/album_op_android.h @@ -27,7 +27,7 @@ #include #include #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/data_schema.h" #include "minddata/dataset/util/path.h" #include "minddata/dataset/util/status.h" @@ -91,69 +91,69 @@ class AlbumOp { /// \brief Load image to tensor /// \param[in] image_file Image name of file /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadImageTensor(const std::string &image_file, uint32_t col_num, TensorPtr *tensor); /// \brief Load vector of ints to tensor, append tensor to tensor /// \param[in] json_obj Json object containing multi-dimensional label /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadIntArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorPtr *tensor); /// \brief Load vector of floatss to tensor, append tensor to tensor /// \param[in] json_obj Json object containing array data /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadFloatArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorPtr *tensor); 
/// \brief Load string array into a tensor, append tensor to tensor /// \param[in] json_obj Json object containing string tensor /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadStringArrayTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorPtr *tensor); /// \brief Load string into a tensor, append tensor to tensor /// \param[in] json_obj Json object containing string tensor /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadStringTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorPtr *tensor); /// \brief Load float value to tensor /// \param[in] json_obj Json object containing float /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadFloatTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorPtr *tensor); /// \brief Load int value to tensor /// \param[in] json_obj Json object containing int /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadIntTensor(const nlohmann::json &json_obj, uint32_t col_num, TensorPtr *tensor); - /// \brief Load emtpy tensor to tensor + /// \brief Load empty tensor to tensor /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return Status The error code returned Status LoadEmptyTensor(uint32_t col_num, TensorPtr *tensor); /// \brief Load id from file name to tensor /// \param[in] file The file name to get ID from /// \param[in] col_num Column num in schema - /// \param[inout] Tensor to push to + /// \param[in,out] Tensor to push to /// \return 
Status The error code returned Status LoadIDTensor(const std::string &file, uint32_t col_num, TensorPtr *tensor); /// \brief Load a tensor according to a json file /// \param[in] row_id_type row_id - id for this tensor row /// \param[in] ImageColumns file Json file location - /// \param[inout] TensorRow Json content stored into a tensor row + /// \param[in,out] TensorRow Json content stored into a tensor row /// \return Status The error code returned Status LoadTensorRow(row_id_type row_id, const std::string &file, std::unordered_map> *map_row); diff --git a/tests/ut/cpp/dataset/distributed_sampler_test.cc b/tests/ut/cpp/dataset/distributed_sampler_test.cc index 8d36cda72e..73c926552c 100644 --- a/tests/ut/cpp/dataset/distributed_sampler_test.cc +++ b/tests/ut/cpp/dataset/distributed_sampler_test.cc @@ -18,7 +18,7 @@ #include "minddata/dataset/include/constants.h" #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/engine/datasetops/source/sampler/distributed_sampler.h" #include "utils/log_adapter.h" @@ -52,11 +52,9 @@ TEST_F(MindDataTestDistributedSampler, TestTwoShardsOne) { DummyRandomAccessOp dummyRandomAccessOp(num_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -65,8 +63,8 @@ TEST_F(MindDataTestDistributedSampler, TestTwoShardsOne) { ASSERT_EQ(4, out.size()); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } TEST_F(MindDataTestDistributedSampler, TestTwoShardsTwo) { @@ -78,11 +76,10 @@ 
TEST_F(MindDataTestDistributedSampler, TestTwoShardsTwo) { DummyRandomAccessOp dummyRandomAccessOp(num_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -91,8 +88,8 @@ TEST_F(MindDataTestDistributedSampler, TestTwoShardsTwo) { ASSERT_EQ(3, out.size()); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } TEST_F(MindDataTestDistributedSampler, TestThreeShards) { @@ -104,11 +101,10 @@ TEST_F(MindDataTestDistributedSampler, TestThreeShards) { DummyRandomAccessOp dummyRandomAccessOp(num_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -117,7 +113,6 @@ TEST_F(MindDataTestDistributedSampler, TestThreeShards) { ASSERT_EQ(0, out.size()); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } - diff --git a/tests/ut/cpp/dataset/rename_op_test.cc b/tests/ut/cpp/dataset/rename_op_test.cc index bd884ff18e..4d43e8a5a4 100644 --- a/tests/ut/cpp/dataset/rename_op_test.cc +++ b/tests/ut/cpp/dataset/rename_op_test.cc @@ -22,7 +22,7 @@ #include "minddata/dataset/engine/datasetops/rename_op.h" #include "common/common.h" #include "utils/ms_utils.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "gtest/gtest.h" #include 
"minddata/dataset/core/global_context.h" #include "utils/log_adapter.h" diff --git a/tests/ut/cpp/dataset/stand_alone_samplers_test.cc b/tests/ut/cpp/dataset/stand_alone_samplers_test.cc index 27002a2734..2feb0077ca 100644 --- a/tests/ut/cpp/dataset/stand_alone_samplers_test.cc +++ b/tests/ut/cpp/dataset/stand_alone_samplers_test.cc @@ -39,7 +39,7 @@ class MindDataTestStandAloneSampler : public UT::DatasetOpTesting { protected: class MockStorageOp : public RandomAccessOp { public: - MockStorageOp(int64_t val){ + MockStorageOp(int64_t val) { // row count is in base class as protected member // GetNumRowsInDataset does not need an override, the default from base class is fine. num_rows_ = val; @@ -57,17 +57,17 @@ TEST_F(MindDataTestStandAloneSampler, TestDistributedSampler) { row.push_back(t); } MockStorageOp mock(20); - std::unique_ptr db; std::shared_ptr tensor; int64_t num_samples = 0; + TensorRow sample_row; for (int i = 0; i < 6; i++) { std::shared_ptr sampler = std::make_shared(num_samples, 3, i % 3, (i < 3 ? 
false : true)); sampler->HandshakeRandomAccessOp(&mock); - sampler->GetNextSample(&db); - db->GetTensor(&tensor, 0, 0); + sampler->GetNextSample(&sample_row); + tensor = sample_row[0]; MS_LOG(DEBUG) << (*tensor); - if(i < 3) { // This is added due to std::shuffle() + if (i < 3) { // This is added due to std::shuffle() EXPECT_TRUE((*tensor) == (*row[i])); } } @@ -83,20 +83,21 @@ TEST_F(MindDataTestStandAloneSampler, TestStandAoneSequentialSampler) { int64_t num_samples = 0; int64_t start_index = 0; std::shared_ptr sampler = std::make_shared(num_samples, start_index, 3); - std::unique_ptr db; + std::shared_ptr tensor; + TensorRow sample_row; sampler->HandshakeRandomAccessOp(&mock); - sampler->GetNextSample(&db); - db->GetTensor(&tensor, 0, 0); + sampler->GetNextSample(&sample_row); + tensor = sample_row[0]; EXPECT_TRUE((*tensor) == (*label1)); - sampler->GetNextSample(&db); - db->GetTensor(&tensor, 0, 0); + sampler->GetNextSample(&sample_row); + tensor = sample_row[0]; EXPECT_TRUE((*tensor) == (*label2)); sampler->ResetSampler(); - sampler->GetNextSample(&db); - db->GetTensor(&tensor, 0, 0); + sampler->GetNextSample(&sample_row); + tensor = sample_row[0]; EXPECT_TRUE((*tensor) == (*label1)); - sampler->GetNextSample(&db); - db->GetTensor(&tensor, 0, 0); + sampler->GetNextSample(&sample_row); + tensor = sample_row[0]; EXPECT_TRUE((*tensor) == (*label2)); } diff --git a/tests/ut/cpp/dataset/subset_random_sampler_test.cc b/tests/ut/cpp/dataset/subset_random_sampler_test.cc index 332aff96f9..68981ff88d 100644 --- a/tests/ut/cpp/dataset/subset_random_sampler_test.cc +++ b/tests/ut/cpp/dataset/subset_random_sampler_test.cc @@ -18,7 +18,7 @@ #include "minddata/dataset/include/constants.h" #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/engine/datasetops/source/sampler/subset_random_sampler.h" @@ -46,11 +46,10 @@ 
TEST_F(MindDataTestSubsetRandomSampler, TestAllAtOnce) { DummyRandomAccessOp dummyRandomAccessOp(5); sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -61,8 +60,8 @@ TEST_F(MindDataTestSubsetRandomSampler, TestAllAtOnce) { ASSERT_NE(in_set.find(out[i]), in_set.end()); } - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } TEST_F(MindDataTestSubsetRandomSampler, TestGetNextBuffer) { @@ -75,23 +74,20 @@ TEST_F(MindDataTestSubsetRandomSampler, TestGetNextBuffer) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); int epoch = 0; - while (!db->eoe()) { + while (!row.eoe()) { epoch++; - db->PopRow(&row); for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); } } - db.reset(); - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); } ASSERT_EQ(epoch, (total_samples + samples_per_buffer - 1) / samples_per_buffer); @@ -107,12 +103,10 @@ TEST_F(MindDataTestSubsetRandomSampler, TestReset) { DummyRandomAccessOp dummyRandomAccessOp(5); sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { 
out.push_back(*it); @@ -125,9 +119,8 @@ TEST_F(MindDataTestSubsetRandomSampler, TestReset) { sampler.ResetSampler(); - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), false); - db->PopRow(&row); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), false); out.clear(); for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { @@ -139,6 +132,6 @@ TEST_F(MindDataTestSubsetRandomSampler, TestReset) { ASSERT_NE(in_set.find(out[i]), in_set.end()); } - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } diff --git a/tests/ut/cpp/dataset/subset_sampler_test.cc b/tests/ut/cpp/dataset/subset_sampler_test.cc index 4e69cf9079..48d5c52d6f 100644 --- a/tests/ut/cpp/dataset/subset_sampler_test.cc +++ b/tests/ut/cpp/dataset/subset_sampler_test.cc @@ -18,7 +18,7 @@ #include "minddata/dataset/include/constants.h" #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/engine/datasetops/source/sampler/subset_sampler.h" @@ -46,11 +46,10 @@ TEST_F(MindDataTestSubsetSampler, TestAllAtOnce) { DummyRandomAccessOp dummyRandomAccessOp(5); sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -61,8 +60,8 @@ TEST_F(MindDataTestSubsetSampler, TestAllAtOnce) { ASSERT_EQ(in[i], out[i]); } - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } 
TEST_F(MindDataTestSubsetSampler, TestGetNextBuffer) { @@ -75,23 +74,21 @@ TEST_F(MindDataTestSubsetSampler, TestGetNextBuffer) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); int epoch = 0; - while (!db->eoe()) { + while (!row.eoe()) { epoch++; - db->PopRow(&row); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); } } - db.reset(); - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); } ASSERT_EQ(epoch, (total_samples + samples_per_buffer - 1) / samples_per_buffer); @@ -107,12 +104,11 @@ TEST_F(MindDataTestSubsetSampler, TestReset) { DummyRandomAccessOp dummyRandomAccessOp(5); sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -125,9 +121,9 @@ TEST_F(MindDataTestSubsetSampler, TestReset) { sampler.ResetSampler(); - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), false); - db->PopRow(&row); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), false); + out.clear(); for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { @@ -139,6 +135,6 @@ TEST_F(MindDataTestSubsetSampler, TestReset) { ASSERT_EQ(in[i], out[i]); } - ASSERT_EQ(sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } diff --git a/tests/ut/cpp/dataset/weighted_random_sampler_test.cc 
b/tests/ut/cpp/dataset/weighted_random_sampler_test.cc index 349f960f8b..6546ec2ae3 100644 --- a/tests/ut/cpp/dataset/weighted_random_sampler_test.cc +++ b/tests/ut/cpp/dataset/weighted_random_sampler_test.cc @@ -18,7 +18,7 @@ #include "minddata/dataset/include/constants.h" #include "minddata/dataset/core/tensor.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "minddata/dataset/engine/datasetops/source/sampler/sampler.h" #include "minddata/dataset/engine/datasetops/source/sampler/weighted_random_sampler.h" #include "utils/log_adapter.h" @@ -55,11 +55,10 @@ TEST_F(MindDataTestWeightedRandomSampler, TestOneshotReplacement) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -69,8 +68,8 @@ TEST_F(MindDataTestWeightedRandomSampler, TestOneshotReplacement) { ASSERT_EQ(num_samples, out.size()); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } TEST_F(MindDataTestWeightedRandomSampler, TestOneshotNoReplacement) { @@ -85,11 +84,10 @@ TEST_F(MindDataTestWeightedRandomSampler, TestOneshotNoReplacement) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -105,8 +103,8 @@ TEST_F(MindDataTestWeightedRandomSampler, TestOneshotNoReplacement) 
{ } } - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } TEST_F(MindDataTestWeightedRandomSampler, TestGetNextBufferReplacement) { @@ -121,21 +119,20 @@ TEST_F(MindDataTestWeightedRandomSampler, TestGetNextBufferReplacement) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); int epoch = 0; - while (!db->eoe()) { + while (!row.eoe()) { epoch++; - db->PopRow(&row); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); } } - db.reset(); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); + + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); } ASSERT_EQ(epoch, (num_samples + samples_per_buffer - 1) / samples_per_buffer); @@ -157,22 +154,21 @@ TEST_F(MindDataTestWeightedRandomSampler, TestGetNextBufferNoReplacement) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); int epoch = 0; - while (!db->eoe()) { + while (!row.eoe()) { epoch++; - db->PopRow(&row); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); freq[*it]++; } } - db.reset(); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); + + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); } // Without replacement, each sample only drawn once. 
@@ -198,11 +194,10 @@ TEST_F(MindDataTestWeightedRandomSampler, TestResetReplacement) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -211,14 +206,14 @@ TEST_F(MindDataTestWeightedRandomSampler, TestResetReplacement) { } ASSERT_EQ(num_samples, out.size()); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); m_sampler.ResetSampler(); out.clear(); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -227,8 +222,8 @@ TEST_F(MindDataTestWeightedRandomSampler, TestResetReplacement) { } ASSERT_EQ(num_samples, out.size()); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } TEST_F(MindDataTestWeightedRandomSampler, TestResetNoReplacement) { @@ -243,11 +238,10 @@ TEST_F(MindDataTestWeightedRandomSampler, TestResetNoReplacement) { DummyRandomAccessOp dummyRandomAccessOp(total_samples); m_sampler.HandshakeRandomAccessOp(&dummyRandomAccessOp); - std::unique_ptr db; TensorRow row; std::vector out; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -256,8 +250,8 @@ TEST_F(MindDataTestWeightedRandomSampler, 
TestResetNoReplacement) { } ASSERT_EQ(num_samples, out.size()); - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); m_sampler.ResetSampler(); out.clear(); @@ -265,8 +259,8 @@ TEST_F(MindDataTestWeightedRandomSampler, TestResetNoReplacement) { freq.resize(total_samples, 0); MS_LOG(INFO) << "Resetting sampler"; - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - db->PopRow(&row); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + for (const auto &t : row) { for (auto it = t->begin(); it != t->end(); it++) { out.push_back(*it); @@ -282,6 +276,6 @@ TEST_F(MindDataTestWeightedRandomSampler, TestResetNoReplacement) { } } - ASSERT_EQ(m_sampler.GetNextSample(&db), Status::OK()); - ASSERT_EQ(db->eoe(), true); + ASSERT_EQ(m_sampler.GetNextSample(&row), Status::OK()); + ASSERT_EQ(row.eoe(), true); } diff --git a/tests/ut/cpp/dataset/zip_op_test.cc b/tests/ut/cpp/dataset/zip_op_test.cc index 858c6d8179..e130619885 100644 --- a/tests/ut/cpp/dataset/zip_op_test.cc +++ b/tests/ut/cpp/dataset/zip_op_test.cc @@ -28,7 +28,7 @@ #include "minddata/dataset/core/config_manager.h" #include "common/common.h" #include "utils/ms_utils.h" -#include "minddata/dataset/engine/data_buffer.h" + #include "gtest/gtest.h" #include "minddata/dataset/core/global_context.h" #include "utils/log_adapter.h" diff --git a/tests/ut/python/dataset/test_cache_map.py b/tests/ut/python/dataset/test_cache_map.py index 3df178c6d0..0c94c6e069 100644 --- a/tests/ut/python/dataset/test_cache_map.py +++ b/tests/ut/python/dataset/test_cache_map.py @@ -760,11 +760,11 @@ def test_cache_map_parameter_check(): with pytest.raises(TypeError) as info: ds.DatasetCache(session_id="1", size=0) - assert "Argument session_id with value 1 is not of type (<class 'int'>,)" in str(info.value) + assert "Argument session_id with value 1 is not of type [<class 'int'>]" in str(info.value) with pytest.raises(TypeError) as info: 
ds.DatasetCache(session_id=None, size=0) - assert "Argument session_id with value None is not of type (<class 'int'>,)" in str(info.value) + assert "Argument session_id with value None is not of type [<class 'int'>]" in str(info.value) with pytest.raises(ValueError) as info: ds.DatasetCache(session_id=1, size=-1) @@ -772,19 +772,19 @@ def test_cache_map_parameter_check(): with pytest.raises(TypeError) as info: ds.DatasetCache(session_id=1, size="1") - assert "Argument size with value 1 is not of type (<class 'int'>,)" in str(info.value) + assert "Argument size with value 1 is not of type [<class 'int'>]" in str(info.value) with pytest.raises(TypeError) as info: ds.DatasetCache(session_id=1, size=None) - assert "Argument size with value None is not of type (<class 'int'>,)" in str(info.value) + assert "Argument size with value None is not of type [<class 'int'>]" in str(info.value) with pytest.raises(TypeError) as info: ds.DatasetCache(session_id=1, size=0, spilling="illegal") - assert "Argument spilling with value illegal is not of type (<class 'bool'>,)" in str(info.value) + assert "Argument spilling with value illegal is not of type [<class 'bool'>]" in str(info.value) with pytest.raises(TypeError) as err: ds.DatasetCache(session_id=1, size=0, hostname=50052) - assert "Argument hostname with value 50052 is not of type (<class 'str'>,)" in str(err.value) + assert "Argument hostname with value 50052 is not of type [<class 'str'>]" in str(err.value) with pytest.raises(RuntimeError) as err: ds.DatasetCache(session_id=1, size=0, hostname="illegal") @@ -796,11 +796,11 @@ def test_cache_map_parameter_check(): with pytest.raises(TypeError) as info: ds.DatasetCache(session_id=1, size=0, port="illegal") - assert "Argument port with value illegal is not of type (<class 'int'>,)" in str(info.value) + assert "Argument port with value illegal is not of type [<class 'int'>]" in str(info.value) with pytest.raises(TypeError) as info: ds.DatasetCache(session_id=1, size=0, port="50052") - assert "Argument port with value 50052 is not of type (<class 'int'>,)" in str(info.value) + assert "Argument port with value 50052 is not of type [<class 'int'>]" in
str(info.value) with pytest.raises(ValueError) as err: ds.DatasetCache(session_id=1, size=0, port=0)