From add19a591c30ef219c4859117777b98de7d460ed Mon Sep 17 00:00:00 2001 From: xiefangqi Date: Mon, 13 Jul 2020 15:06:41 +0800 Subject: [PATCH] md support ps-lite --- mindspore/dataset/engine/datasets.py | 14 ++++++- mindspore/dataset/engine/iterators.py | 31 ++++++++++++++++ mindspore/train/dataset_helper.py | 22 ++++++++++- mindspore/train/model.py | 4 ++ tests/ut/python/dataset/test_noop_mode.py | 45 +++++++++++++++++++++++ 5 files changed, 114 insertions(+), 2 deletions(-) create mode 100644 tests/ut/python/dataset/test_noop_mode.py diff --git a/mindspore/dataset/engine/datasets.py b/mindspore/dataset/engine/datasets.py index c1ef6a9922..108ae225d5 100644 --- a/mindspore/dataset/engine/datasets.py +++ b/mindspore/dataset/engine/datasets.py @@ -38,7 +38,7 @@ from mindspore._c_expression import typing from mindspore import log as logger from . import samplers -from .iterators import DictIterator, TupleIterator +from .iterators import DictIterator, TupleIterator, DummyIterator from .validators import check_batch, check_shuffle, check_map, check_filter, check_repeat, check_skip, check_zip, \ check_rename, check_numpyslicesdataset, \ check_take, check_project, check_imagefolderdatasetv2, check_mnist_cifar_dataset, check_manifestdataset, \ @@ -146,6 +146,12 @@ class Dataset: self._num_classes = None self._repeat_count = None self._sync = False + self.ms_role = os.getenv("MS_ROLE") + + def _noop_mode(self): + if self.ms_role in ("MS_PSERVER", "MS_SCHED"): + return True + return False def __add__(self, datasets): return self.concat(datasets) @@ -1062,6 +1068,8 @@ class Dataset: >>> # convert the returned tuple to a list and print >>> print(list(item)) """ + if self._noop_mode(): + return DummyIterator(self, 'tuple') return TupleIterator(self, columns) def create_dict_iterator(self): @@ -1085,6 +1093,8 @@ class Dataset: >>> print(item["column1"]) """ + if self._noop_mode(): + return DummyIterator(self, 'dict') return DictIterator(self) def __iter__(self): @@ -2318,6 
A DummyIterator only works when env MS_ROLE="MS_PSERVER" or MS_ROLE="MS_SCHED"
import context @@ -60,7 +61,11 @@ class DatasetHelper: if context.get_context("device_target") == "Ascend": iterclass = _DatasetIterMSLoopSink elif context.get_context("device_target") == "GPU": - iterclass = _DatasetIterMS + ms_role = os.getenv("MS_ROLE") + if ms_role in ("MS_PSERVER", "MS_SCHED"): + iterclass = _DatasetIterPSLite + else: + iterclass = _DatasetIterMS elif context.get_context("device_target") == "CPU": raise RuntimeError("Currently dataset sink mode is not supported when the device target is CPU.") else: @@ -131,6 +136,9 @@ class _DatasetIterMSLoopSink(_DatasetIter): def __init__(self, dataset): super(_DatasetIterMSLoopSink, self).__init__(dataset) self.loop_count = self.get_loop_count(dataset) + ms_role = os.getenv("MS_ROLE") + if ms_role in ("MS_PSERVER", "MS_SCHED"): + self.loop_count = 1 # for self._parallel_mode equal to semi_auto_parallel or auto_parallel, and not using full_batch, # use a complete tensor to compile, and slice tensor to run. The batch dimension of tensors for # compile is device_number times the batch dimension of tensors for run. Now only support LoopSink. 
@@ -154,6 +162,18 @@ class _DatasetIterMS(_DatasetIter): self.op = GetNextSingleOp(self.dataset_types, self.dataset_shapes, queue_name) +class _DatasetIterPSLite(_DatasetIter): + """Iter for context (device_target=GPU) on MS_PSERVER or MS_SCHED""" + def __init__(self, dataset): + super(_DatasetIterPSLite, self).__init__(dataset) + self.loop_count = 1 + self.loop_size = 1 + self.op = None + def op(): + return _construct_tensor_list(self.dataset_types, self.dataset_shapes, batch_expand_num=1) + self.op = op + + class _DatasetIterGE(_DatasetIter): """Iter for ge""" def __init__(self, dataset): diff --git a/mindspore/train/model.py b/mindspore/train/model.py index 79bd6bc90b..74fd668e82 100755 --- a/mindspore/train/model.py +++ b/mindspore/train/model.py @@ -15,6 +15,7 @@ """Model.""" from collections.abc import Iterable +import os import numpy as np from mindspore import log as logger @@ -350,6 +351,9 @@ class Model: cb_params.train_dataset = train_dataset cb_params.list_callback = self._transform_callbacks(callbacks) cb_params.train_dataset_element = None + ms_role = os.getenv("MS_ROLE") + if ms_role in ("MS_PSERVER", "MS_SCHED"): + epoch = 1 # build callback list with _CallbackManager(callbacks) as list_callback: diff --git a/tests/ut/python/dataset/test_noop_mode.py b/tests/ut/python/dataset/test_noop_mode.py new file mode 100644 index 0000000000..0ea9673200 --- /dev/null +++ b/tests/ut/python/dataset/test_noop_mode.py @@ -0,0 +1,45 @@ +# Copyright 2020 Huawei Technologies Co., Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# ============================================================================== +""" +Test No-op mode support with Dummy Iterator +""" +import os +import mindspore.dataset as ds + +DATA_DIR = "../data/dataset/testVOC2012" + +def test_noop_pserver(): + os.environ['MS_ROLE'] = 'MS_PSERVER' + data1 = ds.VOCDataset(DATA_DIR, task="Segmentation", mode="train", decode=True, shuffle=False) + num = 0 + for _ in data1.create_dict_iterator(): + num += 1 + assert num == 0 + del os.environ['MS_ROLE'] + + +def test_noop_sched(): + os.environ['MS_ROLE'] = 'MS_SCHED' + data1 = ds.VOCDataset(DATA_DIR, task="Segmentation", mode="train", decode=True, shuffle=False) + num = 0 + for _ in data1.create_dict_iterator(): + num += 1 + assert num == 0 + del os.environ['MS_ROLE'] + + +if __name__ == '__main__': + test_noop_pserver() + test_noop_sched()