Clean buffered_allocator

test=develop
panyx0718-patch-1
Yu Yang 6 years ago
parent 26fb34c365
commit b59a9bfb7c

@@ -22,41 +22,6 @@ namespace memory {
 namespace allocation {
 
 BufferedAllocator::BufferedAllocator(std::unique_ptr<Allocator>&& allocator) {
-  std::vector<size_t> division_plan(8 * sizeof(size_t));
-  for (size_t i = 0; i < 8 * sizeof(size_t); ++i) {
-    division_plan[i] = (static_cast<size_t>(1) << i);
-  }
-  InitAndEnforceCheck(std::move(allocator), division_plan);
-}
-
-BufferedAllocator::BufferedAllocator(std::unique_ptr<Allocator>&& allocator,
-                                     const std::vector<size_t>& division_plan) {
-  InitAndEnforceCheck(std::move(allocator), division_plan);
-}
-
-BufferedAllocator::~BufferedAllocator() { FlushImpl(); }
-
-void BufferedAllocator::FlushImpl() {
-  for (auto& v : allocations_) {
-    for (auto& pair : v) {
-      underlying_allocator_->FreeUniquePtr(std::move(pair.second));
-    }
-    v.clear();
-  }
-}
-
-void BufferedAllocator::Flush() {
-  if (mtx_) {
-    std::lock_guard<std::mutex> lock(*mtx_);
-    FlushImpl();
-  } else {
-    FlushImpl();
-  }
-}
-
-void BufferedAllocator::InitAndEnforceCheck(
-    std::unique_ptr<Allocator>&& allocator,
-    const std::vector<size_t>& division_plan) {
   underlying_allocator_.reset(
       dynamic_cast<UnmanagedAllocator*>(allocator.release()));
   PADDLE_ENFORCE_NOT_NULL(
@@ -65,141 +30,54 @@ void BufferedAllocator::InitAndEnforceCheck(
   if (underlying_allocator_->IsAllocThreadSafe()) {
     mtx_.reset(new std::mutex());
   }
-  constexpr size_t kMax = std::numeric_limits<size_t>::max();
-  if (division_plan.empty()) {
-    division_plan_.assign({0, kMax});
-  } else {
-    auto from = division_plan.front() == 0 ? division_plan.begin() + 1
-                                           : division_plan.begin();
-    auto to = division_plan.back() == kMax ? division_plan.end() - 1
-                                           : division_plan.end();
-    division_plan_.reserve(to - from + 2);
-    division_plan_.push_back(0);
-    division_plan_.insert(division_plan_.end(), from, to);
-    division_plan_.push_back(kMax);
-    for (size_t i = 1; i < division_plan_.size(); ++i) {
-      PADDLE_ENFORCE_LT(division_plan_[i - 1], division_plan_[i],
-                        "Division plan must be strictly sorted");
-    }
-  }
-  allocations_.resize(division_plan_.size() - 1);
-}
-
-void BufferedAllocator::InsertAllocationImpl(
-    std::unique_ptr<Allocation>&& allocation) {
-  auto size = allocation->size();
-  auto idx = GetListIndex(size);
-  allocations_[idx].emplace(size, std::move(allocation));
-}
-
-void BufferedAllocator::InsertAllocation(
-    std::unique_ptr<Allocation>&& allocation) {
-  if (mtx_) {
-    std::lock_guard<std::mutex> lock(*mtx_);
-    InsertAllocationImpl(std::move(allocation));
-  } else {
-    InsertAllocationImpl(std::move(allocation));
-  }
-}
-
-bool BufferedAllocator::Match(size_t actual_size, size_t requested_size) {
-  return (actual_size >> 1) < requested_size;
-}
-
-size_t BufferedAllocator::GetListIndex(size_t size) {
-  auto it =
-      std::upper_bound(division_plan_.begin(), division_plan_.end(), size);
-  return static_cast<size_t>(it - division_plan_.begin()) - 1;
-}
-
-std::unique_ptr<Allocation> BufferedAllocator::RemoveAllocationImpl(
-    size_t size) {
-  auto idx = GetListIndex(size);
-  auto& allocation_map = allocations_[idx];
-  auto it = allocation_map.lower_bound(size);
-  // Only remove allocation whose size is not more than twice of requested size
-  if (it != allocation_map.end()) {
-    if (Match(it->second->size(), size)) {
-      auto ret = std::move(it->second);
-      allocation_map.erase(it);
-      return ret;
-    } else {
-      return nullptr;
-    }
-  } else {
-    while (++idx < allocations_.size() && Match(division_plan_[idx], size)) {
-      auto& allocation_map = allocations_[idx];
-      if (!allocation_map.empty()) {
-        auto it = allocation_map.begin();
-        if (Match(it->second->size(), size)) {
-          auto ret = std::move(it->second);
-          allocation_map.erase(it);
-          return ret;
-        } else {
-          return nullptr;
-        }
-      }
-    }
-    return nullptr;
-  }
-}
-
-std::unique_ptr<Allocation> BufferedAllocator::RemoveAllocation(size_t size) {
-  if (mtx_) {
-    std::lock_guard<std::mutex> lock(*mtx_);
-    return RemoveAllocationImpl(size);
-  } else {
-    return RemoveAllocationImpl(size);
-  }
-}
-
-std::unique_ptr<Allocation> BufferedAllocator::Allocate(size_t size,
-                                                        Allocator::Attr attr) {
-  auto ret = RemoveAllocation(size);
-  if (!ret) {
-    try {
-      return underlying_allocator_->Allocate(size, attr);
-    } catch (BadAlloc&) {
-      // if allocation failed, try to free some memorys from buffers
-      FreeAllocations(size);
-      return underlying_allocator_->Allocate(size, attr);
-    }
-  }
-  return ret;
-}
-
-void BufferedAllocator::FreeAllocationsImpl(size_t size) {
-  if (UNLIKELY(size == 0)) return;
-  size_t cur = 0;
-  for (auto& alloc_map : allocations_) {
-    // use reverse iterator to free large allocations first
-    while (!alloc_map.empty()) {
-      auto it = --(alloc_map.end());
-      cur += it->second->size();
-      underlying_allocator_->FreeUniquePtr(std::move(it->second));
-      alloc_map.erase(it);
-      if (cur >= size) return;
-    }
-  }
-}
-
-void BufferedAllocator::FreeAllocations(size_t size) {
-  if (mtx_) {
-    std::lock_guard<std::mutex> lock(*mtx_);
-    FreeAllocationsImpl(size);
-  } else {
-    FreeAllocationsImpl(size);
-  }
-}
-
-void BufferedAllocator::FreeUniquePtr(std::unique_ptr<Allocation> allocation) {
-  InsertAllocation(std::move(allocation));
-}
-
-bool BufferedAllocator::IsAllocThreadSafe() const { return mtx_ != nullptr; }
-
-const std::vector<size_t>& BufferedAllocator::GetDivisionPlan() const {
-  return division_plan_;
-}
+}
+
+BufferedAllocator::~BufferedAllocator() { FreeCache(-1UL); }
+
+std::unique_ptr<Allocation> BufferedAllocator::Allocate(size_t size,
+                                                        Allocator::Attr attr) {
+  std::unique_ptr<Allocation> result;
+  {
+    platform::LockGuardPtr<std::mutex> guard(mtx_);
+    auto it = allocations_.lower_bound(size);
+    if (it != allocations_.end() && it->first < size * 2) {
+      result = std::move(it->second);
+      allocations_.erase(it);
+    }
+  }
+
+  if (result) {
+    return result;
+  }
+
+  try {
+    return underlying_allocator_->Allocate(size, attr);
+  } catch (BadAlloc&) {
+    FreeCache(size);
+    return underlying_allocator_->Allocate(size, attr);
+  }
+}
+
+void BufferedAllocator::FreeCache(size_t size) {
+  platform::LockGuardPtr<std::mutex> guard(mtx_);
+  if (UNLIKELY(size == 0)) return;
+  size_t cur = 0;
+  while (!allocations_.empty()) {  // free the largest
+    auto it = --allocations_.end();
+    cur += it->second->size();
+    underlying_allocator_->FreeUniquePtr(std::move(it->second));
+    allocations_.erase(it);
+    if (cur >= size) return;
+  }
+}
+
+void BufferedAllocator::FreeUniquePtr(std::unique_ptr<Allocation> allocation) {
+  platform::LockGuardPtr<std::mutex> guard(mtx_);
+  allocations_.emplace(allocation->size(), std::move(allocation));
+}
+
+bool BufferedAllocator::IsAllocThreadSafe() const {
+  return this->underlying_allocator_->IsAllocThreadSafe();
+}
 
 }  // namespace allocation
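Note on the change above: the rewrite replaces the per-bucket division plan with a single size-ordered cache that reuses a freed block only when its size is less than twice the request, and evicts largest-first under memory pressure. The following is a minimal, self-contained sketch of that same policy using only standard-library types; Block and SimpleBufferedCache are illustrative stand-ins, not classes from this commit.

#include <cstddef>
#include <map>
#include <memory>
#include <vector>

// Stand-in for an Allocation: just a sized byte buffer.
struct Block {
  explicit Block(size_t n) : data(n) {}
  size_t size() const { return data.size(); }
  std::vector<unsigned char> data;
};

class SimpleBufferedCache {
 public:
  // Reuse a cached block whose size is >= `size` but still < 2 * size.
  std::unique_ptr<Block> Allocate(size_t size) {
    auto it = cache_.lower_bound(size);
    if (it != cache_.end() && it->first < size * 2) {
      auto result = std::move(it->second);
      cache_.erase(it);
      return result;
    }
    return std::unique_ptr<Block>(new Block(size));  // cache miss: allocate fresh
  }

  // Freed blocks go back into the cache instead of the underlying allocator.
  void Free(std::unique_ptr<Block> block) {
    size_t size = block->size();
    cache_.emplace(size, std::move(block));
  }

  // Evict largest-first until at least `bytes` have been released.
  void FreeCache(size_t bytes) {
    size_t cur = 0;
    while (!cache_.empty()) {
      auto it = --cache_.end();
      cur += it->second->size();
      cache_.erase(it);  // destroying the Block returns its memory to the system
      if (cur >= bytes) return;
    }
  }

 private:
  std::multimap<size_t, std::unique_ptr<Block>> cache_;
};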

@@ -19,6 +19,7 @@
 #include <memory>
 #include <vector>
 #include "paddle/fluid/memory/allocation/allocator.h"
+#include "paddle/fluid/platform/lock_guard_ptr.h"
 
 namespace paddle {
 namespace memory {
@@ -32,9 +33,6 @@ class BufferedAllocator : public UnmanagedAllocator {
  public:
   explicit BufferedAllocator(std::unique_ptr<Allocator>&& allocator);
 
-  BufferedAllocator(std::unique_ptr<Allocator>&& allocator,
-                    const std::vector<size_t>& division_plan);
-
   ~BufferedAllocator();
 
   std::unique_ptr<Allocation> Allocate(
@@ -44,31 +42,14 @@ class BufferedAllocator : public UnmanagedAllocator {
   bool IsAllocThreadSafe() const override;
 
-  const std::vector<size_t>& GetDivisionPlan() const;
-
-  void Flush();
+  // only used in unittest
+  inline void ClearCache() { FreeCache(-1UL); }
 
  private:
-  void InitAndEnforceCheck(std::unique_ptr<Allocator>&& allocator,
-                           const std::vector<size_t>& division_plan);
-
-  void InsertAllocation(std::unique_ptr<Allocation>&& allocation);
-  void InsertAllocationImpl(std::unique_ptr<Allocation>&& allocation);
-
-  static bool Match(size_t actual_size, size_t requested_size);
-  std::unique_ptr<Allocation> RemoveAllocation(size_t size);
-  std::unique_ptr<Allocation> RemoveAllocationImpl(size_t size);
-
-  void FreeAllocations(size_t size);
-  void FreeAllocationsImpl(size_t size);
-
-  void FlushImpl();
-
-  size_t GetListIndex(size_t size);
+  void FreeCache(size_t size);
 
   std::unique_ptr<UnmanagedAllocator> underlying_allocator_;
-  std::vector<std::multimap<size_t, std::unique_ptr<Allocation>>> allocations_;
-  std::vector<size_t> division_plan_;
+  std::multimap<size_t, std::unique_ptr<Allocation>> allocations_;
   std::unique_ptr<std::mutex> mtx_;
 };

@@ -124,7 +124,7 @@ TEST(buffered_allocator, lazy_free) {
   {
     underlying_allocator->ResetCounter();
-    allocator->Flush();
+    allocator->ClearCache();
     ASSERT_EQ(underlying_allocator->GetAllocCount(), kZero);
     ASSERT_EQ(underlying_allocator->GetFreeCount(), kTwo);
   }

@@ -30,9 +30,10 @@ DEFINE_bool(init_allocated_mem, false,
             "during unit testing.");
 DECLARE_double(fraction_of_gpu_memory_to_use);
 
-DEFINE_bool(use_legacy_allocator, true,
-            "Whether to use the legacy allocator. If the new allocators have"
-            "been well tested, we should remove these flag.");
+DEFINE_string(
+    allocator_strategy, "legacy",
+    "The allocation strategy. Legacy means the original allocator of Fluid."
+    "New means the experimental allocators of Fluid. in [legacy, new]");
 
 namespace paddle {
 namespace memory {
@@ -274,15 +275,11 @@ size_t Usage::operator()(const platform::CUDAPinnedPlace& cuda_pinned) const {
 #endif
 }
 
-size_t memory_usage(const platform::Place& p) {
-  return boost::apply_visitor(Usage(), p);
-}
-
 class LegacyAllocation : public Allocation {
  public:
   using Allocation::Allocation;
 
-  ~LegacyAllocation() {
+  ~LegacyAllocation() final {
     boost::apply_visitor(FreeVisitor(this->ptr()), this->place());
   }
 };
@@ -291,7 +288,7 @@ class LegacyAllocation : public Allocation {
 
 std::shared_ptr<Allocation> AllocShared(const platform::Place& place,
                                         size_t size, Allocator::Attr attr) {
-  if (FLAGS_use_legacy_allocator) {
+  if (FLAGS_allocator_strategy == "legacy") {
     void* p = boost::apply_visitor(legacy::AllocVisitor(size), place);
     return std::shared_ptr<Allocation>(
         new legacy::LegacyAllocation(p, size, place));
@@ -303,7 +300,7 @@ std::shared_ptr<Allocation> AllocShared(const platform::Place& place,
 
 std::unique_ptr<Allocation> Alloc(const platform::Place& place, size_t size,
                                   Allocator::Attr attr) {
-  if (FLAGS_use_legacy_allocator) {
+  if (FLAGS_allocator_strategy == "legacy") {
     void* p = boost::apply_visitor(legacy::AllocVisitor(size), place);
     return std::unique_ptr<Allocation>(
         new legacy::LegacyAllocation(p, size, place));

@@ -13,6 +13,7 @@
 // limitations under the License.
 
 #include "paddle/fluid/operators/reader/reader_op_registry.h"
+#include "paddle/fluid/platform/lock_guard_ptr.h"
 #include "paddle/fluid/recordio/scanner.h"
 
 namespace paddle {
@@ -33,11 +34,7 @@ class RecordIOFileReader : public framework::FileReader {
  protected:
   void ReadNextImpl(std::vector<framework::LoDTensor>* out) override {
-    std::unique_ptr<std::lock_guard<std::mutex>> guard;
-    if (ThreadSafe) {
-      guard.reset(new std::lock_guard<std::mutex>(*mutex_));
-    }
+    platform::LockGuardPtr<std::mutex> guard(mutex_);
     bool ok = framework::ReadFromRecordIO(&scanner_, dev_ctx_, out);
     if (!ok) {
       out->clear();

@@ -0,0 +1,55 @@
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <stdint.h>
#include <memory>
#include <mutex> // NOLINT
namespace paddle {
namespace platform {
/**
 * LockGuard for std::unique_ptr<LockType>. It does nothing when the guarded
 * pointer is nullptr.
 *
 * The advantage of using `LockGuardPtr` instead of
 * `std::unique_ptr<std::lock_guard<LockType>>` is that it is entirely a stack
 * variable. There is no heap allocation at all.
 */
template <typename LockType>
class LockGuardPtr {
  using LockGuardType = std::lock_guard<LockType>;

 public:
  class LockGuardDeleter {
   public:
    void operator()(LockGuardType* guard) { guard->~LockGuardType(); }
  };

  explicit LockGuardPtr(std::unique_ptr<LockType>& lock_ptr)  // NOLINT
      : guard_ptr_(lock_ptr ? new (guard_buffer_) LockGuardType(*lock_ptr)
                            : nullptr) {}

  LockGuardPtr(const LockGuardPtr&) = delete;
  LockGuardPtr& operator=(const LockGuardPtr&) = delete;
  LockGuardPtr(LockGuardPtr&&) = delete;
  LockGuardPtr& operator=(LockGuardPtr&&) = delete;

 private:
  uint8_t guard_buffer_[sizeof(LockGuardType)];
  std::unique_ptr<LockGuardType, LockGuardDeleter> guard_ptr_;
};
} // namespace platform
} // namespace paddle
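The header comment above describes the intent of LockGuardPtr: it locks only when the guarded unique_ptr actually holds a mutex, and otherwise is a no-op, entirely on the stack. A small usage sketch mirroring how BufferedAllocator and RecordIOFileReader use it in this commit; MaybeThreadSafeCounter is an illustrative name, not part of the commit.

#include <memory>
#include <mutex>
#include "paddle/fluid/platform/lock_guard_ptr.h"

// Illustrative only: a counter that takes a lock only when built thread-safe.
class MaybeThreadSafeCounter {
 public:
  explicit MaybeThreadSafeCounter(bool thread_safe) {
    if (thread_safe) {
      mtx_.reset(new std::mutex());  // same pattern as BufferedAllocator
    }
  }

  void Add(int v) {
    // Locks *mtx_ for this scope when mtx_ is non-null; does nothing otherwise.
    paddle::platform::LockGuardPtr<std::mutex> guard(mtx_);
    value_ += v;
  }

 private:
  int value_{0};
  std::unique_ptr<std::mutex> mtx_;  // null => no locking overhead
};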

@@ -27,10 +27,12 @@ int main(int argc, char** argv) {
     new_argv.push_back(argv[i]);
   }
 
 #ifdef PADDLE_WITH_CUDA
-  new_argv.push_back(strdup("--tryfromenv=fraction_of_gpu_memory_to_use"));
+  new_argv.push_back(
+      strdup("--tryfromenv=fraction_of_gpu_memory_to_use,allocator_strategy"));
 #else
-  new_argv.push_back(strdup(
-      "--tryfromenv=use_pinned_memory,use_mkldnn,initial_cpu_memory_in_mb"));
+  new_argv.push_back(
+      strdup("--tryfromenv=use_pinned_memory,use_mkldnn,initial_cpu_memory_in_"
+             "mb,allocator_strategy"));
   new_argv.push_back(strdup("--undefok=use_mkldnn,initial_cpu_memory_in_mb"));
 #endif
   int new_argc = static_cast<int>(new_argv.size());
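The extra flag name in --tryfromenv matters because gflags reads each listed flag from a FLAGS_<name> environment variable when that variable is set and silently skips it otherwise; that is how FLAGS_allocator_strategy reaches the test binaries. A standalone sketch of the same mechanism, assuming gflags is available; this is not the Paddle test main shown above.

#include <cstring>
#include <iostream>
#include <vector>
#include "gflags/gflags.h"

DEFINE_string(allocator_strategy, "legacy",
              "Which allocator to use; may be supplied via the "
              "FLAGS_allocator_strategy environment variable.");

int main(int argc, char** argv) {
  std::vector<char*> new_argv(argv, argv + argc);
  // Ask gflags to read FLAGS_allocator_strategy from the environment,
  // ignoring it if the variable is not set.
  new_argv.push_back(strdup("--tryfromenv=allocator_strategy"));
  int new_argc = static_cast<int>(new_argv.size());
  char** argv_ptr = new_argv.data();
  gflags::ParseCommandLineFlags(&new_argc, &argv_ptr, true);
  std::cout << "allocator_strategy = " << FLAGS_allocator_strategy << std::endl;
  return 0;
}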

@@ -114,7 +114,7 @@ def __bootstrap__():
         'eager_delete_scope', 'use_mkldnn', 'initial_cpu_memory_in_mb',
         'init_allocated_mem', 'free_idle_memory', 'paddle_num_threads',
         "dist_threadpool_size", 'cpu_deterministic', 'eager_delete_tensor_gb',
-        'use_legacy_allocator', 'reader_queue_speed_test_mode'
+        'allocator_strategy', 'reader_queue_speed_test_mode'
     ]
     if core.is_compiled_with_dist():
         read_env_flags.append('rpc_deadline')
