!9310 [MD] dynamic shape for ascend in sink mode

From: @liyong126
Reviewed-by: 
Signed-off-by:
pull/9310/MERGE
mindspore-ci-bot 5 years ago committed by Gitee
commit 437eac002c

@ -76,6 +76,12 @@ PYBIND_REGISTER(
THROW_IF_ERROR(de.GetOutputTypes(&out));
return out;
})
.def("GetDataInfo",
[](DEPipeline &de) {
py::list types, shapes;
THROW_IF_ERROR(de.GetDataInfo(&types, &shapes));
return py::make_tuple(types, shapes);
})
.def("GetDatasetSize", &DEPipeline::GetDatasetSize)
.def("GetBatchSize", &DEPipeline::GetBatchSize)
.def("GetNumClasses", &DEPipeline::GetNumClasses)

@ -241,6 +241,30 @@ Status DEPipeline::GetNextAsList(py::list *output) {
return Status::OK();
}
// Collects per-column data type and shape information from the DeviceQueueOp
// at the root of the execution tree and converts it into Python lists.
// @param types - output: py::list of numpy dtypes, one entry per tensor column
// @param shapes - output: py::list of py::list dimensions, one entry per tensor column
// @return Status - error when the tree root is not a DeviceQueueOp or the op fails
Status DEPipeline::GetDataInfo(py::list *types, py::list *shapes) {
  Status s;
  DATA_INFO data_info;
  // tree_.root() must be DeviceQueueOp
  DeviceQueueOp *op = dynamic_cast<DeviceQueueOp *>(tree_->root().get());
  if (op == nullptr) {
    return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "GetDataInfo only supported by DeviceQueueOp");
  }
  {
    // Release the GIL while blocking on the op's data-info queue so other
    // Python threads can keep running.
    py::gil_scoped_release gil_release;
    s = op->GetDataInfo(&data_info);
  }
  RETURN_IF_NOT_OK(s);
  // Iterate by const reference: each element is a (DataType, TensorShape)
  // pair and copying TensorShape per iteration is wasted work.
  for (const auto &el : data_info) {
    types->append(el.first.AsNumpyType());
    py::list shape;
    for (const auto &dim : el.second.AsVector()) {
      shape.append(dim);
    }
    shapes->append(shape);
  }
  return Status::OK();
}
Status DEPipeline::GetOutputShapes(py::list *output) {
std::vector<TensorShape> shapes;
Status s;
@ -1052,6 +1076,8 @@ Status DEPipeline::ParseDeviceQueueOp(const py::dict &args, std::shared_ptr<Data
(void)builder->SetDeviceId(ToInt(value));
} else if (key == "send_epoch_end") {
(void)builder->SetSendEpochEnd(ToBool(value));
} else if (key == "create_data_info_queue") {
(void)builder->SetCreateDataInfoQueue(ToBool(value));
}
}
}

@ -111,6 +111,8 @@ class DEPipeline {
Status GetOutputTypes(py::list *output);
Status GetDataInfo(py::list *types, py::list *shapes);
Status SaveDataset(const std::vector<std::string> &file_names, const std::string &file_type);
int GetDatasetSize() const;

@ -32,14 +32,15 @@
namespace mindspore {
namespace dataset {
DeviceQueueOp::DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
bool send_epoch_end)
bool send_epoch_end, bool create_data_info_queue)
: PipelineOp(1),
channel_name_(channel_name),
device_type_(device_type),
device_id_(device_id),
prefetch_size_(prefetch_size),
send_epoch_end_(send_epoch_end),
stop_send_(false) {
stop_send_(false),
create_data_info_queue_(create_data_info_queue) {
#ifdef ENABLE_TDTQUE
ascend_keep_waiting_ = true;
#endif
@ -84,6 +85,10 @@ Status DeviceQueueOp::operator()() {
if (device_type_ == DeviceType::Ascend) {
#ifdef ENABLE_TDTQUE
if (create_data_info_queue_) {
data_info_queue_ptr_ = std::make_unique<DATA_INFO_QUEUE>(kDataInfoQueueCapacity);
RETURN_IF_NOT_OK(data_info_queue_ptr_->Register(tree_->AllTasks()));
}
RETURN_IF_NOT_OK(SendDataToAscend());
#endif
} else if (device_type_ == DeviceType::GPU) {
@ -137,6 +142,13 @@ Status DeviceQueueOp::SendDataToAscend() {
return Status(StatusCode::kTDTPushFailure, "TDT Push Failed");
}
}
if (create_data_info_queue_) {
DATA_INFO data_info;
(void)std::transform(
currRow.begin(), currRow.end(), std::back_inserter(data_info),
[](const std::shared_ptr<Tensor> &ts) { return std::make_pair(ts->type(), ts->shape()); });
RETURN_IF_NOT_OK(data_info_queue_ptr_->Add(data_info));
}
if (isProfilingEnable) {
end_time = ProfilingTime::GetCurMilliSecond();
@ -187,6 +199,21 @@ Status DeviceQueueOp::SendDataToAscend() {
return Status::OK();
}
#endif
#ifdef ENABLE_TDTQUE
// Pops one entry of per-column (type, shape) info off the data-info queue.
// Only valid when the op was constructed with create_data_info_queue enabled.
Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
  if (create_data_info_queue_) {
    RETURN_IF_NOT_OK(data_info_queue_ptr_->PopFront(data_info));
    return Status::OK();
  }
  return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "DataInfo queue is not created.");
}
#else
// Without TDT support there is no data-info queue to query.
Status DeviceQueueOp::GetDataInfo(DATA_INFO *data_info) {
  return Status(StatusCode::kUnexpectedError, __LINE__, __FILE__, "GetDataInfo is not supported yet.");
}
#endif
#ifdef ENABLE_GPUQUE

@ -18,6 +18,7 @@
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "minddata/dataset/engine/datasetops/pipeline_op.h"
@ -25,6 +26,7 @@
#include "minddata/dataset/util/status.h"
#ifdef ENABLE_TDTQUE
#include "minddata/dataset/util/queue.h"
#include "minddata/dataset/engine/tdt/tdt_plugin.h"
#endif
@ -37,6 +39,10 @@ using mindspore::device::GpuBufferMgr;
namespace mindspore {
namespace dataset {
using DATA_INFO = std::vector<std::pair<DataType, TensorShape>>;
using DATA_INFO_QUEUE = Queue<DATA_INFO>;
const int kDataInfoQueueCapacity = 128;
class DeviceQueueOp : public PipelineOp {
public:
static const uint32_t INVALID_HANDLE = 0xffffffffUL;
@ -86,13 +92,18 @@ class DeviceQueueOp : public PipelineOp {
return *this;
}
// Setter method
// @param create_data_info_queue - whether the op should create a queue that
//        stores the (type, shape) info of each row sent to the device
// @return Builder - setter method returns reference to the builder
Builder &SetCreateDataInfoQueue(bool create_data_info_queue) {
  builder_create_data_info_queue_ = create_data_info_queue;
  return *this;
}
// Name: Build()
// Description: The final step for building a DeviceQueueOp via the Builder is
// to call this Build() method. It will instantiate the DeviceQueueOp
// and return it to caller as a shared pointer.
Status Build(std::shared_ptr<DeviceQueueOp> *ptr) {
  // Forward every configured builder field, including the
  // create_data_info_queue flag, to the DeviceQueueOp constructor.
  // (The raw diff text contained both the pre- and post-change argument
  // lines; this is the resolved post-change call.)
  *ptr = std::make_shared<DeviceQueueOp>(builder_channel_name_, builder_device_type_, builder_device_id_,
                                         builder_prefetch_size_, builder_send_epoch_end_,
                                         builder_create_data_info_queue_);
  return Status::OK();
}
@ -102,12 +113,13 @@ class DeviceQueueOp : public PipelineOp {
DeviceType builder_device_type_;
std::string builder_channel_name_;
bool builder_send_epoch_end_;
bool builder_create_data_info_queue_;
};
// Name: constructor
// Description
DeviceQueueOp(std::string channel_name, DeviceType device_type, int32_t device_id, int32_t prefetch_size,
bool send_epoch_end);
bool send_epoch_end, bool create_data_info_queue);
// Name: destructor
// Description
@ -132,6 +144,8 @@ class DeviceQueueOp : public PipelineOp {
void StopWaiting() { ascend_keep_waiting_ = false; }
#endif
Status GetDataInfo(DATA_INFO *data_info);
// Name: Print()
// Description: A function that prints info about the node
void Print(std::ostream &out, // In: The output stream to print to
@ -164,6 +178,7 @@ class DeviceQueueOp : public PipelineOp {
#ifdef ENABLE_TDTQUE
Status SendDataToAscend();
bool ascend_keep_waiting_;
#endif
#ifdef ENABLE_GPUQUE
@ -182,6 +197,8 @@ class DeviceQueueOp : public PipelineOp {
const int32_t prefetch_size_;
const bool send_epoch_end_;
bool stop_send_;
bool create_data_info_queue_;
std::unique_ptr<DATA_INFO_QUEUE> data_info_queue_ptr_;
#ifdef ENABLE_TDTQUE
std::shared_ptr<TdtPlugin> tdtInstancePtr;

@ -988,7 +988,7 @@ class Dataset:
return dataset
@check_device_send
def device_que(self, prefetch_size=None, send_epoch_end=True):
def device_que(self, prefetch_size=None, send_epoch_end=True, create_data_info_queue=False):
"""
Return a transferred Dataset that transfers data through a device.
@ -996,6 +996,8 @@ class Dataset:
prefetch_size (int, optional): Prefetch number of records ahead of the
user's request (default=None).
send_epoch_end (bool, optional): Whether to send end of sequence to device or not (default=True).
create_data_info_queue (bool, optional): Whether to create a queue which stores the
types and shapes of data or not (default=False).
Note:
If device is Ascend, features of data will be transferred one by one. The limitation
@ -1004,15 +1006,17 @@ class Dataset:
Return:
TransferDataset, dataset for transferring.
"""
return self.to_device(send_epoch_end=send_epoch_end)
return self.to_device(send_epoch_end=send_epoch_end, create_data_info_queue=create_data_info_queue)
@check_device_send
def to_device(self, send_epoch_end=True):
def to_device(self, send_epoch_end=True, create_data_info_queue=False):
"""
Transfer data through CPU, GPU or Ascend devices.
Args:
send_epoch_end (bool, optional): Whether to send end of sequence to device or not (default=True).
create_data_info_queue (bool, optional): Whether to create a queue which stores the
types and shapes of data or not (default=False).
Note:
If device is Ascend, features of data will be transferred one by one. The limitation
@ -1061,7 +1065,7 @@ class Dataset:
distribution_path, device_id = get_distribution(self)
if distribution_path == "":
return TransferDataset(self, queue_name, device_id, device_type, send_epoch_end)
return TransferDataset(self, queue_name, device_id, device_type, send_epoch_end, create_data_info_queue)
try:
with open(distribution_path, 'r') as distribution_f:
dist = json.load(distribution_f)
@ -1071,7 +1075,7 @@ class Dataset:
except Exception:
raise RuntimeError("Distribution file failed to read")
return TransferDataset(self, queue_name, device_id, device_type, send_epoch_end)
return TransferDataset(self, queue_name, device_id, device_type, send_epoch_end, create_data_info_queue)
@check_save
def save(self, file_name, num_files=1, file_type='mindrecord'):
@ -1775,6 +1779,25 @@ class BatchDataset(DatasetOp):
for input_dataset in dataset.children:
BatchDataset._update_batch_size_for_syncwait(input_dataset, batch_size)
def __deepcopy__(self, memodict):
    """Custom deepcopy: deep-copies tree links and column metadata while
    sharing scalar settings and the per_batch_map callable by reference."""
    if id(self) in memodict:
        return memodict[id(self)]
    new_op = self.__class__.__new__(self.__class__)
    memodict[id(self)] = new_op
    # Tree links and column metadata must be真 deep copies -- they may be
    # mutated independently on the copy.
    new_op.children = copy.deepcopy(self.children, memodict)
    new_op.parent = copy.deepcopy(self.parent, memodict)
    new_op.input_columns = copy.deepcopy(self.input_columns, memodict)
    new_op.output_columns = copy.deepcopy(self.output_columns, memodict)
    new_op.column_order = copy.deepcopy(self.column_order, memodict)
    new_op.pad_info = copy.deepcopy(self.pad_info, memodict)
    # Plain values and the callable are shared, not copied.
    new_op.num_parallel_workers = self.num_parallel_workers
    new_op.batch_size = self.batch_size
    new_op.drop_remainder = self.drop_remainder
    new_op.per_batch_map = self.per_batch_map
    new_op._input_indexs = self._input_indexs  # pylint: disable=W0212
    return new_op
class BatchInfo(CBatchInfo):
"""
@ -2600,9 +2623,12 @@ class TransferDataset(DatasetOp):
device_id (int): ID of device.
device_type (str): Type of device, including "CPU", "GPU", and "Ascend".
send_epoch_end (bool, optional): Whether to send end of sequence to device or not (default=True).
create_data_info_queue (bool, optional): Whether to create a queue which stores the
types and shapes of data or not (default=False).
"""
def __init__(self, input_dataset, queue_name, device_id, device_type, send_epoch_end=True):
def __init__(self, input_dataset, queue_name, device_id, device_type, send_epoch_end=True,
create_data_info_queue=False):
super().__init__()
self.children.append(input_dataset)
input_dataset.parent.append(self)
@ -2612,6 +2638,7 @@ class TransferDataset(DatasetOp):
self._device_id = device_id
self._send_epoch_end = send_epoch_end
self.iterator = None
self._create_data_info_queue = create_data_info_queue
def get_args(self):
args = super().get_args()
@ -2619,6 +2646,7 @@ class TransferDataset(DatasetOp):
args["device_type"] = self._device_type
args["device_id"] = self._device_id
args["send_epoch_end"] = self._send_epoch_end
args["create_data_info_queue"] = self._create_data_info_queue
return args
def create_dict_iterator(self, num_epochs=-1, output_numpy=False):
@ -2650,6 +2678,27 @@ class TransferDataset(DatasetOp):
def continue_send(self):
self.iterator.depipeline.ContinueSend()
def get_data_info(self):
    """Return the (types, shapes) tuple describing the next batch, fetched
    from the C++ pipeline's data-info queue."""
    return self.iterator.depipeline.GetDataInfo()
def __deepcopy__(self, memodict):
    """Custom deepcopy: deep-copies tree links while sharing the transfer
    settings (queue name, device info, flags) by reference."""
    if id(self) in memodict:
        return memodict[id(self)]
    new_op = self.__class__.__new__(self.__class__)
    memodict[id(self)] = new_op
    new_op.children = copy.deepcopy(self.children, memodict)
    new_op.parent = copy.deepcopy(self.parent, memodict)
    new_op.num_parallel_workers = self.num_parallel_workers
    new_op.queue_name = self.queue_name
    # Private transfer settings are plain values; share them by reference.
    # NOTE(review): `iterator` is not set on the copy -- verify callers
    # assign it before use.
    new_op._device_type = self._device_type  # pylint: disable=W0212
    new_op._device_id = self._device_id  # pylint: disable=W0212
    new_op._input_indexs = self._input_indexs  # pylint: disable=W0212
    new_op._send_epoch_end = self._send_epoch_end  # pylint: disable=W0212
    new_op._create_data_info_queue = self._create_data_info_queue  # pylint: disable=W0212
    return new_op
class RangeDataset(MappableDataset):
"""

@ -304,6 +304,7 @@ class PKSampler(BuiltinSampler):
Args:
num_val (int): Number of elements to sample for each class.
num_class (int, optional): Number of classes to sample (default=None, all classes).
Specifying this parameter is currently not supported.
shuffle (bool, optional): If True, the class IDs are shuffled (default=False).
class_column (str, optional): Name of column with class labels for MindDataset (default='label').
num_samples (int, optional): The number of samples to draw (default=None, all elements).

@ -50,7 +50,7 @@ def _get_types_and_shapes(dataset):
return dataset_types, dataset_shapes
def _exec_datagraph(exec_dataset, dataset_size, phase='dataset'):
def _exec_datagraph(exec_dataset, dataset_size, phase='dataset', create_data_info_queue=False):
"""Initialize and execute the dataset graph."""
batch_size = exec_dataset.get_batch_size()
input_indexs = exec_dataset.input_indexs
@ -58,7 +58,7 @@ def _exec_datagraph(exec_dataset, dataset_size, phase='dataset'):
# transform data format
dataset_types, dataset_shapes = _get_types_and_shapes(exec_dataset)
send_epoch_end = bool(dataset_size == -1)
exec_dataset = exec_dataset.device_que(send_epoch_end=send_epoch_end)
exec_dataset = exec_dataset.device_que(send_epoch_end=send_epoch_end, create_data_info_queue=create_data_info_queue)
_executor.init_dataset(exec_dataset.queue_name,
dataset_size,

@ -17,6 +17,7 @@ import math
import os
from mindspore._checkparam import check_bool, check_int
from mindspore.common.dtype import pytype_to_dtype
from .. import context, nn
from ._utils import _exec_datagraph, _get_types_and_shapes, _construct_tensor_list
from ..nn.wrap import GetNextSingleOp
@ -31,6 +32,7 @@ def _send_data(dataset, epoch_num):
exec_dataset.send(epoch_num)
dataset.__has_sent__ = True
def _send_data_no_flag(dataset, epoch_num):
"""Engine dataset to write data to tdt queue directly."""
exec_dataset = dataset.__TRANSFER_DATASET__
@ -70,6 +72,7 @@ def connect_network_with_dataset(network, dataset_helper):
Wraps the input network with a dataset which automatically fetches data with 'GetNext' function from the
dataset channel 'queue_name' and performs the forward computation.
"""
def __init__(self, network, dataset_types, dataset_shapes, queue_name):
super(_DataWrapper, self).__init__(auto_prefix=False, flags=network.get_flags())
# Also copy the flag in `network` construct
@ -88,16 +91,41 @@ def connect_network_with_dataset(network, dataset_helper):
if isinstance(dataset_iter, _DatasetIterNormal):
raise RuntimeError("Dataset should be connected with network only in sink mode.")
if (hasattr(dataset_iter, "sink_size") and dataset_iter.sink_size == 1) \
and (hasattr(dataset_iter, "sink_count") and dataset_iter.sink_count == 1) \
and context.get_context("device_target") == "Ascend":
if not hasattr(dataset, '__network__'):
dataset.__network__ = network
network = dataset.__network__
dataset_types, dataset_shapes = dataset_helper.get_data_info()
dataset_types = [pytype_to_dtype(x) for x in dataset_types]
key = str(dataset_types) + str(dataset_shapes)
if hasattr(dataset, '__network_manage__') and key in dataset.__network_manage__:
network = dataset.__network_manage__[key]
else:
if _need_to_full():
device_num = _get_device_num()
dataset_shapes = _to_full_shapes(dataset_shapes, device_num)
network = _DataWrapper(network, dataset_types, dataset_shapes, dataset.__TRANSFER_DATASET__.queue_name)
dataset.__network_manage__ = dataset.__network_manage__ if hasattr(
dataset, '__network_manage__') else dict()
dataset.__network_manage__[key] = network
return network
if not hasattr(dataset, '__ME_INITED__') and context.get_context("device_target") == "Ascend" and \
not context.get_context("enable_ge"):
dataset.__ME_INITED__ = True
dataset_types, dataset_shapes = dataset_helper.types_shapes()
queue_name = dataset.__TRANSFER_DATASET__.queue_name
network = _DataWrapper(network, dataset_types, dataset_shapes, queue_name)
return network
class DatasetHelper:
"""
DatasetHelper is a class to process the MindData dataset and it provides the information of dataset.
@ -167,18 +195,25 @@ class DatasetHelper:
"""continue send data to device at the beginning of epoch."""
self.iter.continue_send()
def get_data_info(self):
    """Get the types and shapes of the current batch from the underlying
    dataset iterator."""
    return self.iter.get_data_info()
class _DatasetIter:
"""Base iter for dataset helper"""
def __init__(self, dataset, sink_size, epoch_num):
self.dataset = dataset
self.sink_size = sink_size
self.sink_count = 1
self.sink_count = self.get_sink_count(dataset)
if not hasattr(dataset, '__TRANSFER_DATASET__'):
if hasattr(dataset, '__loop_size__'):
self.sink_size = dataset.__loop_size__
dataset.__TRANSFER_DATASET__ = _exec_datagraph(dataset, self.sink_size)
create_data_info_queue = (sink_size == 1 and self.sink_count == 1 and context.get_context(
"device_target") == "Ascend")
dataset.__TRANSFER_DATASET__ = _exec_datagraph(dataset, self.sink_size,
create_data_info_queue=create_data_info_queue)
if not hasattr(dataset, '__no_send__'):
_send_data(dataset, epoch_num)
@ -187,6 +222,7 @@ class _DatasetIter:
self.stop_send = dataset.__TRANSFER_DATASET__.stop_send
self.continue_send = dataset.__TRANSFER_DATASET__.continue_send
self.get_data_info = dataset.__TRANSFER_DATASET__.get_data_info
self.dataset_types, self.dataset_shapes = _get_types_and_shapes(dataset)
def __iter__(self):
@ -228,6 +264,7 @@ class _DatasetIter:
class _DatasetIterGE(_DatasetIter):
"""Iter for GE."""
def __init__(self, dataset, sink_size, epoch_num):
super().__init__(dataset, sink_size, epoch_num)
self.sink_count = self.get_sink_count(dataset)
@ -244,6 +281,7 @@ class _DatasetIterGE(_DatasetIter):
class _DatasetIterMSLoopSink(_DatasetIter):
"""Iter for context (device_target=Ascend)"""
def __init__(self, dataset, sink_size, epoch_num):
super().__init__(dataset, sink_size, epoch_num)
self.sink_count = self.get_sink_count(dataset)
@ -265,6 +303,7 @@ class _DatasetIterMSLoopSink(_DatasetIter):
class _DatasetIterMS(_DatasetIter):
"""Iter for MS(enable_loop_sink=False)."""
def __init__(self, dataset, sink_size, epoch_num):
super().__init__(dataset, sink_size, epoch_num)
if sink_size > 0:
@ -278,11 +317,13 @@ class _DatasetIterMS(_DatasetIter):
class _DatasetIterPSLite(_DatasetIter):
"""Iter for context (device_target=GPU) on MS_PSERVER or MS_SCHED"""
def __init__(self, dataset, sink_size, epoch_num):
super().__init__(dataset, sink_size, epoch_num)
self.sink_count = 1
self.sink_size = 1
self.op = None
def op():
return _construct_tensor_list(self.dataset_types, self.dataset_shapes, batch_expand_num=1)
self.op = op

@ -250,11 +250,14 @@ class Model:
scaling_sens /= self._device_number
return scaling_sens
def _exec_preprocess(self, network, is_train, phase, dataset, dataset_sink_mode, sink_size=-1, epoch_num=1):
def _exec_preprocess(self, network, is_train, phase, dataset,
dataset_sink_mode, sink_size=-1, epoch_num=1, dataset_helper=None):
"""Initializes dataset."""
if dataset_sink_mode and not is_train:
dataset.__loop_size__ = 1
dataset_helper = DatasetHelper(dataset, dataset_sink_mode, sink_size, epoch_num)
if dataset_helper is None:
dataset_helper = DatasetHelper(dataset, dataset_sink_mode, sink_size, epoch_num)
if dataset_sink_mode:
network = connect_network_with_dataset(network, dataset_helper)
@ -404,15 +407,6 @@ class Model:
else:
epoch_num = math.ceil(epoch * sink_size / train_dataset.get_dataset_size())
dataset_helper, train_network = self._exec_preprocess(self._train_network,
is_train=True,
phase='train',
dataset=train_dataset,
dataset_sink_mode=True,
sink_size=sink_size,
epoch_num=epoch_num)
self._train_network = train_network
cb_params.train_network = self._train_network
cb_params.cur_step_num = 0
run_context = RunContext(cb_params)
@ -420,9 +414,21 @@ class Model:
# used to stop training for early stop, such as stopAtTIme or stopATStep
should_stop = False
dataset_helper = None
for i in range(epoch):
cb_params.cur_epoch_num = i + 1
list_callback.epoch_begin(run_context)
dataset_helper, train_network = self._exec_preprocess(self._train_network,
is_train=True,
phase='train',
dataset=train_dataset,
dataset_sink_mode=True,
sink_size=sink_size,
epoch_num=epoch_num,
dataset_helper=dataset_helper)
self._train_network = train_network
cb_params.train_network = self._train_network
# In data-sink mode dataset_helper iterates only once; otherwise it iterates epoch_size times.
for inputs in dataset_helper:

@ -50,7 +50,7 @@ class MindData:
def input_indexs(self):
return self._input_indexs
def device_que(self, send_epoch_end=True):
def device_que(self, send_epoch_end=True, create_data_info_queue=False):
    # Test mock: record the send flag and a fixed queue name, then return
    # self so the call can be chained. create_data_info_queue is accepted
    # for API compatibility but ignored by the mock.
    self.queue_name = '6ba41974-209e-11ea-88b0-a24efeb2c736'
    self.send_epoch_end = send_epoch_end
    return self
@ -61,6 +61,9 @@ class MindData:
def send(self, num_epochs=-1):
pass
def get_data_info(self):
    # Test mock: no data-info queue exists, so return nothing.
    pass
def stop_send(self):
pass

Loading…
Cancel
Save