From a900015c0302bad1654b7d664677fab2313fb7f8 Mon Sep 17 00:00:00 2001 From: Dun Liang Date: Sat, 12 Jan 2019 19:18:59 +0800 Subject: [PATCH 1/7] add async copy and pinned place --- .../fluid/operators/reader/buffered_reader.cc | 36 ++++++++++++++++++- .../fluid/operators/reader/buffered_reader.h | 6 ++++ python/paddle/fluid/layers/io.py | 23 ++++++++++-- 3 files changed, 61 insertions(+), 4 deletions(-) diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index 26ff221dfa..d5a7c50d95 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -14,6 +14,7 @@ #include "paddle/fluid/operators/reader/buffered_reader.h" #include +#include "paddle/fluid/framework/data_type.h" namespace paddle { namespace operators { @@ -24,6 +25,12 @@ BufferedReader::~BufferedReader() { position_.front().wait(); position_.pop(); } +#ifdef PADDLE_WITH_CUDA + if (platform::is_gpu_place(place_)) { + platform::SetDeviceId(boost::get(place_).device); + PADDLE_ENFORCE(cudaStreamDestroy(stream)); + } +#endif } BufferedReader::BufferedReader( @@ -33,6 +40,12 @@ BufferedReader::BufferedReader( thread_pool_(1), place_(place), buffer_size_(buffer_size) { +#ifdef PADDLE_WITH_CUDA + if (platform::is_gpu_place(place_)) { + platform::SetDeviceId(boost::get(place_).device); + PADDLE_ENFORCE(cudaStreamCreate(&stream)); + } +#endif cpu_buffer_.resize(buffer_size); gpu_buffer_.resize(buffer_size); ReadTillBufferFullAsync(); @@ -54,14 +67,35 @@ void BufferedReader::ReadAsync(size_t i) { return -1UL; } +#ifdef PADDLE_WITH_CUDA + // NOTE(liangdun): using async copy instead of TensorCopySync + // TensorCopySync would block other stream if (platform::is_gpu_place(place_)) { TensorVec &gpu = gpu_buffer_[i]; gpu.resize(cpu.size()); for (size_t i = 0; i < cpu.size(); ++i) { - framework::TensorCopySync(cpu[i], place_, &gpu[i]); + gpu[i].Resize(cpu[i].dims()); + gpu[i].set_layout(cpu[i].layout()); + auto cpu_place = cpu[i].place(); + auto cpu_ptr = cpu[i].data(); + auto gpu_ptr = gpu[i].mutable_data(place_, cpu[i].type()); + auto size = + cpu[i].numel() * paddle::framework::SizeOfType(cpu[i].type()); + if (platform::is_cuda_pinned_place(cpu_place)) + memory::Copy(boost::get(place_), gpu_ptr, + boost::get(cpu_place), + cpu_ptr, size, stream); + else + // if cpu place is not pinned, async copy is slower than sync copy, + // so we use sync copy instead. + memory::Copy(boost::get(place_), gpu_ptr, + boost::get(cpu_place), cpu_ptr, size, + 0); gpu[i].set_lod(cpu[i].lod()); } + PADDLE_ENFORCE(cudaStreamSynchronize(stream)); } +#endif return i; })); } diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h index cbe2bc1b5f..e55572177c 100644 --- a/paddle/fluid/operators/reader/buffered_reader.h +++ b/paddle/fluid/operators/reader/buffered_reader.h @@ -19,6 +19,9 @@ #include #include "ThreadPool.h" #include "paddle/fluid/framework/reader.h" +#ifdef PADDLE_WITH_CUDA +#include "paddle/fluid/platform/gpu_info.h" +#endif namespace paddle { namespace operators { @@ -59,6 +62,9 @@ class BufferedReader : public framework::DecoratedReader { std::vector cpu_buffer_; std::vector gpu_buffer_; size_t prev_pos_{-1UL}; +#ifdef PADDLE_WITH_CUDA + cudaStream_t stream; +#endif }; } // namespace reader diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 9a29b25093..a5f91aad79 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -483,6 +483,7 @@ def _py_reader(capacity, lod_levels=None, name=None, use_double_buffer=True, + use_cuda_pinned_place=False, feed_list=None): if feed_list is not None: @@ -565,7 +566,10 @@ def _py_reader(capacity, for item in tensors: if not isinstance(item, core.LoDTensor): tmp = core.LoDTensor() - tmp.set(item, core.CPUPlace()) + if use_cuda_pinned_place: + tmp.set(item, core.CUDAPinnedPlace()) + else: + tmp.set(item, core.CPUPlace()) item = tmp array.append(item) @@ -635,7 +639,8 @@ def py_reader(capacity, dtypes, lod_levels=None, name=None, - use_double_buffer=True): + use_double_buffer=True, + use_cuda_pinned_place=None): """ Create a Python reader for data feeding in Python @@ -659,6 +664,9 @@ def py_reader(capacity, name(basestring): The prefix Python queue name and Reader name. None will be generated automatically. use_double_buffer(bool): Whether use double buffer or not. + use_cuda_pinned_place(bool): Whether use cuda pinned place or not, + this option only works with double buffer and cuda enabled. + None will be enabled when double buffer and cuda are enabled. Returns: Variable: A Reader from which we can get feeding data. @@ -754,13 +762,22 @@ def py_reader(capacity, >>> except fluid.core.EOFException: >>> test_reader.reset() """ + if use_double_buffer and core.is_compiled_with_cuda(): + if use_cuda_pinned_place == None: + use_cuda_pinned_place = True + else: + if use_cuda_pinned_place: + raise RuntimeError( + "use_cuda_pinned_place can only be used with double buffer and cuda enabled." + ) return _py_reader( capacity=capacity, shapes=shapes, dtypes=dtypes, lod_levels=lod_levels, name=name, - use_double_buffer=use_double_buffer) + use_double_buffer=use_double_buffer, + use_cuda_pinned_place=use_cuda_pinned_place) def create_py_reader_by_data(capacity, From 0c5c561bd15a459ed4c1b9a5893d9da7dd1ca65c Mon Sep 17 00:00:00 2001 From: Dun Liang Date: Sat, 12 Jan 2019 22:46:12 +0800 Subject: [PATCH 4/7] test=develop --- paddle/fluid/API.spec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 9872631553..d2a9899ea5 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -218,7 +218,7 @@ paddle.fluid.layers.shuffle ArgSpec(args=['reader', 'buffer_size'], varargs=None paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,)) -paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True)) +paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer', 'use_cuda_pinned_place'], varargs=None, keywords=None, defaults=(None, None, True, None)) paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True)) paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None) From e5004f3c1c142b39b12bc3c88faa22acee859efe Mon Sep 17 00:00:00 2001 From: Dun Liang Date: Sun, 20 Jan 2019 16:52:38 +0800 Subject: [PATCH 5/7] fix ci && test=develop --- paddle/fluid/operators/reader/buffered_reader.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index d5a7c50d95..971db8b37d 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -85,6 +85,10 @@ void BufferedReader::ReadAsync(size_t i) { memory::Copy(boost::get(place_), gpu_ptr, boost::get(cpu_place), cpu_ptr, size, stream); + else if ((platform::is_gpu_place(cpu_place))) + memory::Copy(boost::get(place_), gpu_ptr, + boost::get(cpu_place), cpu_ptr, + size, stream); else // if cpu place is not pinned, async copy is slower than sync copy, // so we use sync copy instead. From db9e700ba1d7fb4a264225439bf66f24fba66ff4 Mon Sep 17 00:00:00 2001 From: Dun Liang Date: Fri, 25 Jan 2019 15:21:06 +0800 Subject: [PATCH 7/7] default use pin place && test=develop --- paddle/fluid/API.spec | 2 +- python/paddle/fluid/layers/io.py | 20 +++----------------- 2 files changed, 4 insertions(+), 18 deletions(-) diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index d2a9899ea5..9872631553 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -218,7 +218,7 @@ paddle.fluid.layers.shuffle ArgSpec(args=['reader', 'buffer_size'], varargs=None paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,)) -paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer', 'use_cuda_pinned_place'], varargs=None, keywords=None, defaults=(None, None, True, None)) +paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True)) paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True)) paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index a5f91aad79..47686eb60a 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -483,9 +483,8 @@ def _py_reader(capacity, lod_levels=None, name=None, use_double_buffer=True, - use_cuda_pinned_place=False, feed_list=None): - + use_cuda_pinned_place = use_double_buffer and core.is_compiled_with_cuda() if feed_list is not None: if not isinstance(feed_list, list): raise TypeError("feed_list should be a list of Variable" @@ -639,8 +638,7 @@ def py_reader(capacity, dtypes, lod_levels=None, name=None, - use_double_buffer=True, - use_cuda_pinned_place=None): + use_double_buffer=True): """ Create a Python reader for data feeding in Python @@ -664,9 +662,6 @@ def py_reader(capacity, name(basestring): The prefix Python queue name and Reader name. None will be generated automatically. use_double_buffer(bool): Whether use double buffer or not. - use_cuda_pinned_place(bool): Whether use cuda pinned place or not, - this option only works with double buffer and cuda enabled. - None will be enabled when double buffer and cuda are enabled. Returns: Variable: A Reader from which we can get feeding data. @@ -762,22 +757,13 @@ def py_reader(capacity, >>> except fluid.core.EOFException: >>> test_reader.reset() """ - if use_double_buffer and core.is_compiled_with_cuda(): - if use_cuda_pinned_place == None: - use_cuda_pinned_place = True - else: - if use_cuda_pinned_place: - raise RuntimeError( - "use_cuda_pinned_place can only be used with double buffer and cuda enabled." - ) return _py_reader( capacity=capacity, shapes=shapes, dtypes=dtypes, lod_levels=lod_levels, name=name, - use_double_buffer=use_double_buffer, - use_cuda_pinned_place=use_cuda_pinned_place) + use_double_buffer=use_double_buffer) def create_py_reader_by_data(capacity,