|
|
|
@ -128,9 +128,6 @@ void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
|
|
|
|
|
PADDLE_THROW("There is no next data!");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (local_buffer_.payloads_.empty()) {
|
|
|
|
|
buffer_->Receive(&local_buffer_);
|
|
|
|
|
}
|
|
|
|
|
*out = local_buffer_.payloads_;
|
|
|
|
|
local_buffer_.payloads_.clear();
|
|
|
|
|
if (local_buffer_.ctx_) {
|
|
|
|
@ -149,21 +146,30 @@ void DoubleBufferReader::ReInit() {
|
|
|
|
|
void DoubleBufferReader::PrefetchThreadFunc() {
|
|
|
|
|
VLOG(5) << "A new prefetch thread starts.";
|
|
|
|
|
size_t gpu_ctx_offset = 0;
|
|
|
|
|
std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache(4);
|
|
|
|
|
std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache(4);
|
|
|
|
|
size_t tensor_cache_id = 0;
|
|
|
|
|
|
|
|
|
|
while (reader_->HasNext()) {
|
|
|
|
|
Item batch;
|
|
|
|
|
reader_->ReadNext(&batch.payloads_);
|
|
|
|
|
if (platform::is_gpu_place(place_)) {
|
|
|
|
|
std::vector<framework::LoDTensor> gpu_batch;
|
|
|
|
|
tensor_cache_id %= 4;
|
|
|
|
|
auto& gpu_batch = gpu_tensor_cache[tensor_cache_id];
|
|
|
|
|
auto& cpu_batch = cpu_tensor_cache[tensor_cache_id];
|
|
|
|
|
cpu_batch = batch.payloads_;
|
|
|
|
|
++tensor_cache_id;
|
|
|
|
|
|
|
|
|
|
auto& gpu_ctx = this->ctxs_[gpu_ctx_offset++];
|
|
|
|
|
gpu_ctx_offset %= this->ctxs_.size();
|
|
|
|
|
|
|
|
|
|
gpu_batch.resize(batch.payloads_.size());
|
|
|
|
|
for (size_t i = 0; i < batch.payloads_.size(); ++i) {
|
|
|
|
|
framework::TensorCopy(batch.payloads_[i], place_, *gpu_ctx,
|
|
|
|
|
&gpu_batch[i]);
|
|
|
|
|
for (size_t i = 0; i < cpu_batch.size(); ++i) {
|
|
|
|
|
framework::TensorCopy(cpu_batch[i], place_, *gpu_ctx, &gpu_batch[i]);
|
|
|
|
|
gpu_batch[i].set_lod(batch.payloads_[i].lod());
|
|
|
|
|
}
|
|
|
|
|
batch.ctx_ = gpu_ctx.get();
|
|
|
|
|
std::swap(gpu_batch, batch.payloads_);
|
|
|
|
|
batch.payloads_ = gpu_batch;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|