Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into some_small_fixes

port
fengjiayi 7 years ago
commit 9dccca963b

@ -23,7 +23,7 @@ repos:
- id: clang-format-with-version-check
name: clang-format
description: Format files with ClangFormat.
entry: bash ./.clang_format.hook -i
entry: bash ./tools/codestyle/clang_format.hook -i
language: system
files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto)$
- repo: local
@ -52,7 +52,7 @@ repos:
hooks:
- id: copyright_checker
name: copyright_checker
entry: python ./.copyright.hook
entry: python ./tools/codestyle/copyright.hook
language: system
files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|py)$
exclude: (?!.*third_party)^.*$ | (?!.*book)^.*$

@ -1,11 +1,18 @@
FROM nvidia/cuda:9.0-cudnn7-devel-ubuntu16.04
# Use UBUNTU_MIRROR can speed up apt-get speed.
# ARG UBUNTU_MIRROR
# RUN /bin/bash -c 'if [[ -n ${UBUNTU_MIRROR} ]]; then sed -i 's#http://archive.ubuntu.com/ubuntu#${UBUNTU_MIRROR}#g' /etc/apt/sources.list; fi'
RUN apt-get update && apt-get install -y python python-pip iputils-ping libgtk2.0-dev wget vim net-tools iftop python-opencv
RUN ln -s /usr/lib/x86_64-linux-gnu/libcudnn.so.7 /usr/lib/libcudnn.so && ln -s /usr/lib/x86_64-linux-gnu/libnccl.so.2 /usr/lib/libnccl.so
RUN pip install -U pip
RUN pip install -U kubernetes paddlepaddle
# IMPORTANT:
# Add "ENV http_proxy=http://ip:port" if your download is slow, and don't forget to unset it at runtime.
# exmaple: unset http_proxy && unset https_proxy && python fluid_benchmark.py ...
RUN pip install -U pip
RUN pip install -U kubernetes paddlepaddle
RUN sh -c 'echo "import paddle.v2 as paddle\npaddle.dataset.cifar.train10()\npaddle.dataset.flowers.fetch()" | python'
RUN sh -c 'echo "import paddle.v2 as paddle\npaddle.dataset.mnist.train()\npaddle.dataset.mnist.test()\npaddle.dataset.imdb.fetch()" | python'
@ -14,9 +21,11 @@ RUN pip uninstall -y paddlepaddle && mkdir /workspace
ADD https://raw.githubusercontent.com/PaddlePaddle/cloud/develop/docker/paddle_k8s /usr/bin
ADD https://raw.githubusercontent.com/PaddlePaddle/cloud/develop/docker/k8s_tools.py /root
RUN chmod +x /usr/bin/paddle_k8s
ADD *.whl /
RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s
RUN pip install /*.whl && rm -f /*.whl
ENV LD_LIBRARY_PATH=/usr/local/lib
ADD fluid_benchmark.py recordio_converter.py models/ /workspace/
ADD fluid_benchmark.py recordio_converter.py args.py recordio_converter.py run.sh run_fluid_benchmark.sh /workspace/
ADD models/ /workspace/models/

@ -97,7 +97,7 @@ def dist_transpile(trainer_id, args):
return train_program, fluid.default_startup_program()
else:
raise ValueError(
'TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
'PADDLE_TRAINING_ROLE environment variable must be either TRAINER or PSERVER'
)
@ -264,8 +264,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
break
else:
loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
if args.update_method == "pserver":
exe.bcast_params()
if args.use_reader_op:
num_samples += args.batch_size * args.gpus
else:
@ -301,9 +299,18 @@ def print_train_time(start_time, end_time, num_samples):
(num_samples, train_elapsed, examples_per_sec))
def print_paddle_envs():
print('----------- Configuration envs -----------')
for k in os.environ:
if "PADDLE_" in k:
print "ENV %s:%s" % (k, os.environ[k])
print('------------------------------------------------')
def main():
args = parse_args()
print_arguments(args)
print_paddle_envs()
# the unique trainer id, starting from 0, needed by trainer
# only

@ -17,6 +17,7 @@ import copy
import argparse
import random
import os
import copy
from kube_templates import pserver, trainer, envs
@ -108,10 +109,9 @@ def gen_job():
tn_container["ports"][0]["containerPort"] = spreadport
envs.append({"name": "PADDLE_JOB_NAME", "value": args.jobname})
envs.append({"name": "TRAINERS", "value": str(args.trainers)})
envs.append({"name": "PSERVERS", "value": str(args.pservers)})
envs.append({"name": "PADDLE_TRAINERS", "value": str(args.trainers)})
envs.append({"name": "PADDLE_PSERVERS", "value": str(args.pservers)})
envs.append({"name": "ENTRY", "value": args.entry})
envs.append({"name": "PADDLE_INIT_PORT", "value": str(args.port)})
envs.append({"name": "PADDLE_PSERVER_PORT", "value": str(args.port)})
# NOTE: these directories below are cluster specific, please modify
# this settings before you run on your own cluster.
@ -166,17 +166,23 @@ def gen_job():
tn["spec"]["template"]["spec"]["volumes"] = volumes
tn_container["volumeMounts"] = volumeMounts
ps_container["env"] = envs
ps_container["env"].append({"name": "TRAINING_ROLE", "value": "PSERVER"})
ps_container["env"] = copy.deepcopy(envs)
ps_container["env"].append({
"name": "PADDLE_TRAINING_ROLE",
"value": "PSERVER"
})
tn_container["env"] = envs
if args.disttype == "pserver":
tn_container["env"].append({
"name": "TRAINING_ROLE",
"name": "PADDLE_TRAINING_ROLE",
"value": "TRAINER"
})
elif args.disttype == "nccl2" or args.disttype == "local":
# NCCL2 have no training role, set to plain WORKER
tn_container["env"].append({"name": "TRAINING_ROLE", "value": "WORKER"})
tn_container["env"].append({
"name": "PADDLE_TRAINING_ROLE",
"value": "WORKER"
})
os.mkdir(args.jobname)
if args.disttype == "pserver":

@ -45,7 +45,8 @@ IF(${CBLAS_PROVIDER} STREQUAL "MKLML")
ELSE()
MESSAGE(FATAL_ERROR "Should enable MKLML when build MKLDNN")
ENDIF()
SET(MKLDNN_FLAG "-Wno-error=strict-overflow -Wno-error=unused-result -Wno-unused-result")
SET(MKLDNN_FLAG "-Wno-error=strict-overflow -Wno-error=unused-result")
SET(MKLDNN_FLAG "${MKLDNN_FLAG} -Wno-unused-result -Wno-unused-value")
SET(MKLDNN_CFLAG "${CMAKE_C_FLAGS} ${MKLDNN_FLAG}")
SET(MKLDNN_CXXFLAG "${CMAKE_CXX_FLAGS} ${MKLDNN_FLAG}")
ExternalProject_Add(

@ -1,7 +1,7 @@
#!/bin/bash
python gen_doc.py layers --submodules control_flow device io nn ops tensor detection learning_rate_scheduler metric > layers.rst
for module in data_feeder clip metrics executor initializer io nets optimizer param_attr profiler regularizer
for module in data_feeder clip metrics executor initializer io nets optimizer param_attr profiler regularizer transpiler
do
python gen_doc.py ${module} > ${module}.rst
done

@ -0,0 +1,46 @@
.. THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
!DO NOT EDIT THIS FILE MANUALLY!
==========
transpiler
==========
DistributeTranspiler
--------------------
.. autoclass:: paddle.fluid.transpiler.DistributeTranspiler
:members:
:noindex:
InferenceTranspiler
-------------------
.. autoclass:: paddle.fluid.transpiler.InferenceTranspiler
:members:
:noindex:
memory_optimize
---------------
.. autofunction:: paddle.fluid.transpiler.memory_optimize
:noindex:
release_memory
--------------
.. autofunction:: paddle.fluid.transpiler.release_memory
:noindex:
HashName
--------
.. autoclass:: paddle.fluid.transpiler.HashName
:members:
:noindex:
RoundRobin
----------
.. autoclass:: paddle.fluid.transpiler.RoundRobin
:members:
:noindex:

@ -168,13 +168,13 @@ cd /paddle/python/paddle/fluid/tests/book
第二步启动Parameter Server
```bash
PADDLE_INIT_PORT=6174 PADDLE_INIT_PSERVERS=192.168.1.2 TRAINERS=2 POD_IP=192.168.1.2 PADDLE_INIT_TRAINER_ID=1 TRAINING_ROLE=PSERVER python test_fit_a_line.py
PADDLE_PSERVER_PORT=6174 PADDLE_PSERVER_IPS=192.168.1.2 PADDLE_TRAINERS=2 PADDLE_CURRENT_IP=192.168.1.2 PADDLE_TRAINER_ID=1 PADDLE_TRAINING_ROLE=PSERVER python test_fit_a_line.py
```
执行命令后请等待出现提示: ```Server listening on 192.168.1.2:6174 ```, 表示Paramter Server已经正常启动。
第三步启动Trainer
```bash
PADDLE_INIT_PORT=6174 PADDLE_INIT_PSERVERS=192.168.1.3 TRAINERS=2 POD_IP=192.168.1.3 PADDLE_INIT_TRAINER_ID=1 TRAINING_ROLE=TRAINER python test_fit_a_line.py
PADDLE_PSERVER_PORT=6174 PADDLE_PSERVER_IPS=192.168.1.3 PADDLE_TRAINERS=2 PADDLE_CURRENT_IPP=192.168.1.3 PADDLE_TRAINER_ID=1 PADDLE_TRAINING_ROLE=TRAINER python test_fit_a_line.py
```
由于我们定义的Trainer的数量是2个因此需要在另外一个计算节点上再启动一个Trainer。

@ -114,8 +114,8 @@ def gen_train_list(file_pattern, trainers, trainer_id):
ret_list.append(f)
return ret_list
trainers = int(os.getenv("TRAINERS"))
trainer_id = int(os.getenv("PADDLE_INIT_TRAINER_ID"))
trainers = int(os.getenv("PADDLE_TRAINERS"))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID"))
data_file = fluid.layers.io.open_files(
filenames=gen_train_list("./mnist-[0-9]*.recordio", 2, 0),
thread_num=1,

@ -13,6 +13,7 @@ cpu_noavx_openblas `fluid.tgz <https://guest:@paddleci.ngrok.io/repository
cuda7.5_cudnn5_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda75cudnn5cp27cp27mu/.lastSuccessful/fluid.tgz>`_
cuda8.0_cudnn5_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda80cudnn5cp27cp27mu/.lastSuccessful/fluid.tgz>`_
cuda8.0_cudnn7_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda8cudnn7cp27cp27mu/.lastSuccessful/fluid.tgz>`_
cuda9.0_cudnn7_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda90cudnn7avxMkl/.lastSuccessful/fluid.tgz>`_
====================== ========================================
从源码编译

@ -40,10 +40,9 @@ void Main(bool use_gpu) {
//# 2. Prepare input.
int64_t data[4] = {1, 2, 3, 4};
PaddleBuf buf{.data = data, .length = sizeof(data)};
PaddleTensor tensor{.name = "",
.shape = std::vector<int>({4, 1}),
.data = buf,
.data = PaddleBuf(data, sizeof(data)),
.dtype = PaddleDType::INT64};
// For simplicity, we set all the slots with the same data.
@ -55,14 +54,12 @@ void Main(bool use_gpu) {
//# 4. Get output.
ASSERT_EQ(outputs.size(), 1UL);
LOG(INFO) << "output buffer size: " << outputs.front().data.length;
const size_t num_elements = outputs.front().data.length / sizeof(float);
LOG(INFO) << "output buffer size: " << outputs.front().data.length();
const size_t num_elements = outputs.front().data.length() / sizeof(float);
// The outputs' buffers are in CPU memory.
for (size_t i = 0; i < std::min(5UL, num_elements); i++) {
LOG(INFO) << static_cast<float*>(outputs.front().data.data)[i];
LOG(INFO) << static_cast<float*>(outputs.front().data.data())[i];
}
// TODO(Superjomn): this is should be free automatically
free(outputs[0].data.data);
}
}
@ -86,10 +83,9 @@ void MainThreads(int num_threads, bool use_gpu) {
for (int batch_id = 0; batch_id < num_batches; ++batch_id) {
// 2. Dummy Input Data
int64_t data[4] = {1, 2, 3, 4};
PaddleBuf buf{.data = data, .length = sizeof(data)};
PaddleTensor tensor{.name = "",
.shape = std::vector<int>({4, 1}),
.data = buf,
.data = PaddleBuf(data, sizeof(data)),
.dtype = PaddleDType::INT64};
std::vector<PaddleTensor> inputs(4, tensor);
std::vector<PaddleTensor> outputs;
@ -99,13 +95,13 @@ void MainThreads(int num_threads, bool use_gpu) {
// 4. Get output.
ASSERT_EQ(outputs.size(), 1UL);
LOG(INFO) << "TID: " << tid << ", "
<< "output buffer size: " << outputs.front().data.length;
const size_t num_elements = outputs.front().data.length / sizeof(float);
<< "output buffer size: " << outputs.front().data.length();
const size_t num_elements =
outputs.front().data.length() / sizeof(float);
// The outputs' buffers are in CPU memory.
for (size_t i = 0; i < std::min(5UL, num_elements); i++) {
LOG(INFO) << static_cast<float*>(outputs.front().data.data)[i];
LOG(INFO) << static_cast<float*>(outputs.front().data.data())[i];
}
free(outputs[0].data.data);
}
});
}

@ -0,0 +1,59 @@
# Inference High-level APIs
This document describes the high-level inference APIs one can use to easily deploy a Paddle model for an application.
The APIs are described in `paddle_inference_api.h`, just one header file, and two libaries `libpaddle_fluid.so` and `libpaddle_fluid_api.so` are needed.
## PaddleTensor
We provide the `PaddleTensor` data structure is to give a general tensor interface.
The definition is
```c++
struct PaddleTensor {
std::string name; // variable name.
std::vector<int> shape;
PaddleBuf data; // blob of data.
PaddleDType dtype;
};
```
The data is stored in a continuous memory `PaddleBuf`, and tensor's data type is specified by a `PaddleDType`.
The `name` field is used to specify the name of input variable,
that is important when there are multiple inputs and need to distiuish which variable to set.
## engine
The inference APIs has two different underlying implementation, currently there are two valid engines:
- the native engine, which is consists of the native operators and framework,
- the Anakin engine, which is a Anakin library embeded.
The native engine takes a native Paddle model as input, and supports any model that trained by Paddle,
but the Anakin engine can only take the Anakin model as input(user need to manully transform the format first) and currently not all Paddle models are supported.
```c++
enum class PaddleEngineKind {
kNative = 0, // Use the native Fluid facility.
kAnakin, // Use Anakin for inference.
};
```
## PaddlePredictor and how to create one
The main interface is `PaddlePredictor`, there are following methods
- `bool Run(const std::vector<PaddleTensor>& inputs, std::vector<PaddleTensor>* output_data)`
- take inputs and output `output_data`
- `Clone` to clone a predictor from an existing one, with model parameter shared.
There is a factory method to help create a predictor, and the user takes the ownership of this object.
```c++
template <typename ConfigT, PaddleEngineKind engine = PaddleEngineKind::kNative>
std::unique_ptr<PaddlePredictor> CreatePaddlePredictor(const ConfigT& config);
```
By specifying the engine kind and config, one can get an specific implementation.
## Reference
- [paddle_inference_api.h](./paddle_inference_api.h)
- [demos](./demo)

@ -13,3 +13,53 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/contrib/inference/paddle_inference_api.h"
namespace paddle {
PaddleBuf::PaddleBuf(PaddleBuf&& other)
: data_(other.data_),
length_(other.length_),
memory_owned_(other.memory_owned_) {
other.memory_owned_ = false;
other.data_ = nullptr;
other.length_ = 0;
}
PaddleBuf::PaddleBuf(const PaddleBuf& other) { *this = other; }
PaddleBuf& PaddleBuf::operator=(const PaddleBuf& other) {
// only the buffer with external memory can be copied
assert(!other.memory_owned_);
data_ = other.data_;
length_ = other.length_;
memory_owned_ = other.memory_owned_;
return *this;
}
void PaddleBuf::Resize(size_t length) {
// Only the owned memory can be reset, the external memory can't be changed.
if (length_ == length) return;
assert(memory_owned_);
Free();
data_ = new char[length];
length_ = length;
memory_owned_ = true;
}
void PaddleBuf::Reset(void* data, size_t length) {
Free();
memory_owned_ = false;
data_ = data;
length_ = length;
}
void PaddleBuf::Free() {
if (memory_owned_ && data_) {
assert(length_ > 0);
delete static_cast<char*>(data_);
data_ = nullptr;
length_ = 0;
}
}
} // namespace paddle

@ -21,6 +21,7 @@ limitations under the License. */
#pragma once
#include <cassert>
#include <memory>
#include <string>
#include <vector>
@ -32,12 +33,38 @@ enum PaddleDType {
INT64,
};
struct PaddleBuf {
void* data; // pointer to the data memory.
size_t length; // number of memory bytes.
class PaddleBuf {
public:
PaddleBuf() = default;
PaddleBuf(PaddleBuf&& other);
// Copy only available when memory is managed externally.
explicit PaddleBuf(const PaddleBuf&);
PaddleBuf& operator=(const PaddleBuf&);
// Do not own the memory.
PaddleBuf(void* data, size_t length)
: data_(data), length_(length), memory_owned_{false} {}
// Own memory.
PaddleBuf(size_t length)
: data_(new char[length]), length_(length), memory_owned_(true) {}
// Resize to `length` bytes.
void Resize(size_t length);
// Reset to external memory.
void Reset(void* data, size_t length);
bool empty() const { return length_ == 0; }
void* data() const { return data_; }
size_t length() const { return length_; }
~PaddleBuf() { Free(); }
private:
void Free();
void* data_{nullptr}; // pointer to the data memory.
size_t length_{0}; // number of memory bytes.
bool memory_owned_{true};
};
struct PaddleTensor {
PaddleTensor() = default;
std::string name; // variable name.
std::vector<int> shape;
// TODO(Superjomn) for LoD support, add a vector<vector<int>> field if needed.
@ -67,8 +94,9 @@ class PaddlePredictor {
// Predict an record.
// The caller should be responsible for allocating and releasing the memory of
// `inputs`. `inputs` should be alive until Run returns. caller should be
// responsible for releasing the memory of `output_data`.
// `inputs`. `inputs` should be available until Run returns. Caller should be
// responsible for the output tensor's buffer, either allocated or passed from
// outside.
virtual bool Run(const std::vector<PaddleTensor>& inputs,
std::vector<PaddleTensor>* output_data) = 0;
@ -81,8 +109,7 @@ class PaddlePredictor {
// The common configs for all the predictors.
struct Config {
std::string model_dir; // path to the model directory.
bool enable_engine{false}; // Enable to execute (part of) the model on
std::string model_dir; // path to the model directory.
};
};

@ -48,7 +48,7 @@ bool PaddleInferenceAnakinPredictor::Run(
auto d_tensor_in_p = executor_.get_in(input.name);
float *d_data_p = d_tensor_in_p->mutable_data();
if (cudaMemcpy(d_data_p,
static_cast<float *>(input.data.data),
static_cast<float *>(input.data.data()),
d_tensor_in_p->valid_size() * sizeof(float),
cudaMemcpyHostToDevice) != 0) {
LOG(ERROR) << "copy data from CPU to GPU error";
@ -65,8 +65,11 @@ bool PaddleInferenceAnakinPredictor::Run(
for (auto &output : *output_data) {
auto *tensor = executor_.get_out(output.name);
output.shape = tensor->shape();
if (output.data.length() < tensor->valid_size() * sizeof(float)) {
output.data.Resize(tensor->valid_size() * sizeof(float));
}
// Copy data from GPU -> CPU
if (cudaMemcpy(output.data.data,
if (cudaMemcpy(output.data.data(),
tensor->mutable_data(),
tensor->valid_size() * sizeof(float),
cudaMemcpyDeviceToHost) != 0) {

@ -37,28 +37,26 @@ TEST(inference, anakin) {
float data[1 * 3 * 224 * 224] = {1.0f};
PaddleBuf buf{.data = data, .length = sizeof(data)};
PaddleTensor tensor{.name = "input_0",
.shape = std::vector<int>({1, 3, 224, 224}),
.data = buf,
.data = PaddleBuf(data, sizeof(data)),
.dtype = PaddleDType::FLOAT32};
// For simplicity, we set all the slots with the same data.
std::vector<PaddleTensor> paddle_tensor_feeds(1, tensor);
std::vector<PaddleTensor> paddle_tensor_feeds;
paddle_tensor_feeds.emplace_back(std::move(tensor));
float data_out[1000];
PaddleBuf buf_out{.data = data_out, .length = sizeof(data)};
PaddleTensor tensor_out{.name = "prob_out",
.shape = std::vector<int>({1000, 1}),
.data = buf_out,
.data = PaddleBuf(),
.dtype = PaddleDType::FLOAT32};
std::vector<PaddleTensor> outputs(1, tensor_out);
std::vector<PaddleTensor> outputs;
outputs.emplace_back(std::move(tensor_out));
ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
float* data_o = static_cast<float*>(outputs[0].data.data);
float* data_o = static_cast<float*>(outputs[0].data.data());
for (size_t j = 0; j < 1000; ++j) {
LOG(INFO) << "output[" << j << "]: " << data_o[j];
}

@ -178,8 +178,8 @@ bool NativePaddlePredictor::SetFeed(const std::vector<PaddleTensor> &inputs,
// TODO(panyx0718): Init LoDTensor from existing memcpy to save a copy.
std::memcpy(static_cast<void *>(input_ptr),
inputs[i].data.data,
inputs[i].data.length);
inputs[i].data.data(),
inputs[i].data.length());
feeds->push_back(input);
}
return true;
@ -241,10 +241,11 @@ bool NativePaddlePredictor::GetFetch(
}
outputs->at(i).shape = shape;
outputs->at(i).data.length = sizeof(float) * data.size();
outputs->at(i).data.data = malloc(outputs->at(i).data.length);
std::memcpy(
outputs->at(i).data.data, data.data(), outputs->at(i).data.length);
auto &buffer = outputs->at(i).data;
if (buffer.empty() || buffer.length() < sizeof(float) * data.size()) {
buffer.Resize(sizeof(float) * data.size());
}
std::memcpy(buffer.data(), data.data(), buffer.length());
outputs->at(i).dtype = PaddleDType::FLOAT32;
// TODO(panyx0718): support other types? fill tensor name? avoid a copy.
}

@ -27,13 +27,12 @@ namespace paddle {
PaddleTensor LodTensorToPaddleTensor(framework::LoDTensor* t) {
PaddleTensor pt;
pt.data.data = t->data<void>();
if (t->type() == typeid(int64_t)) {
pt.data.length = t->numel() * sizeof(int64_t);
pt.data.Reset(t->data<void>(), t->numel() * sizeof(int64_t));
pt.dtype = PaddleDType::INT64;
} else if (t->type() == typeid(float)) {
pt.data.length = t->numel() * sizeof(float);
pt.data.Reset(t->data<void>(), t->numel() * sizeof(float));
pt.dtype = PaddleDType::FLOAT32;
} else {
LOG(FATAL) << "unsupported type.";
@ -79,8 +78,8 @@ void MainWord2Vec(bool use_gpu) {
std::vector<PaddleTensor> outputs;
ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
ASSERT_EQ(outputs.size(), 1UL);
size_t len = outputs[0].data.length;
float* data = static_cast<float*>(outputs[0].data.data);
size_t len = outputs[0].data.length();
float* data = static_cast<float*>(outputs[0].data.data());
for (size_t j = 0; j < len / sizeof(float); ++j) {
ASSERT_LT(data[j], 1.0);
ASSERT_GT(data[j], -1.0);
@ -103,8 +102,6 @@ void MainWord2Vec(bool use_gpu) {
EXPECT_LT(lod_data[i] - data[i], 1e-3);
EXPECT_GT(lod_data[i] - data[i], -1e-3);
}
free(outputs[0].data.data);
}
void MainImageClassification(bool use_gpu) {
@ -143,13 +140,12 @@ void MainImageClassification(bool use_gpu) {
std::vector<PaddleTensor> outputs;
ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
ASSERT_EQ(outputs.size(), 1UL);
size_t len = outputs[0].data.length;
float* data = static_cast<float*>(outputs[0].data.data);
size_t len = outputs[0].data.length();
float* data = static_cast<float*>(outputs[0].data.data());
float* lod_data = output1.data<float>();
for (size_t j = 0; j < len / sizeof(float); ++j) {
EXPECT_NEAR(lod_data[j], data[j], 1e-3);
}
free(data);
}
void MainThreadsWord2Vec(bool use_gpu) {
@ -192,8 +188,8 @@ void MainThreadsWord2Vec(bool use_gpu) {
// check outputs range
ASSERT_EQ(local_outputs.size(), 1UL);
const size_t len = local_outputs[0].data.length;
float* data = static_cast<float*>(local_outputs[0].data.data);
const size_t len = local_outputs[0].data.length();
float* data = static_cast<float*>(local_outputs[0].data.data());
for (size_t j = 0; j < len / sizeof(float); ++j) {
ASSERT_LT(data[j], 1.0);
ASSERT_GT(data[j], -1.0);
@ -205,7 +201,6 @@ void MainThreadsWord2Vec(bool use_gpu) {
for (int i = 0; i < refs[tid].numel(); ++i) {
EXPECT_NEAR(ref_data[i], data[i], 1e-3);
}
free(data);
});
}
for (int i = 0; i < num_jobs; ++i) {
@ -251,14 +246,13 @@ void MainThreadsImageClassification(bool use_gpu) {
// check outputs correctness
ASSERT_EQ(local_outputs.size(), 1UL);
const size_t len = local_outputs[0].data.length;
float* data = static_cast<float*>(local_outputs[0].data.data);
const size_t len = local_outputs[0].data.length();
float* data = static_cast<float*>(local_outputs[0].data.data());
float* ref_data = refs[tid].data<float>();
EXPECT_EQ(refs[tid].numel(), len / sizeof(float));
for (int i = 0; i < refs[tid].numel(); ++i) {
EXPECT_NEAR(ref_data[i], data[i], 1e-3);
}
free(data);
});
}
for (int i = 0; i < num_jobs; ++i) {

@ -47,10 +47,11 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
#endif
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
int GetVarDeviceID(const std::string &varname) const;
private:
void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op,
size_t place_id) const;
size_t device_id) const;
private:
std::string loss_var_name_;
@ -96,21 +97,23 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::string &og,
std::unordered_set<std::string> *og_has_been_broadcast) const;
int GetOpDeviceID(
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const;
int GetOpDeviceID(const OpDesc &op) const;
void InsertAllReduceOp(SSAGraph *result, const std::string &og) const;
void CreateBroadcastOp(SSAGraph *result, const std::string &p_name,
size_t src_dev_id) const;
bool IsSparseGradient(
const std::unordered_map<std::string, VarDesc *> &all_vars,
const std::string &og) const;
bool IsSparseGradient(const std::string &og) const;
size_t GetAppropriateDeviceID(
const std::vector<std::string> &var_names) const;
private:
BuildStrategy strategy_;
mutable std::unordered_map<std::string, VarDesc *> all_vars_;
mutable std::unordered_map<std::string, int> var_name_on_devices_;
mutable std::vector<int64_t> balance_vars_;
void SetCommunicationContext(OpHandleBase *op_handle,
const platform::Place &p) const;

@ -30,6 +30,7 @@ class SSAGraphBuilder {
SSAGraphBuilder() {}
virtual ~SSAGraphBuilder() {}
virtual std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const = 0;
virtual int GetVarDeviceID(const std::string &var_name) const { return -1; }
DISABLE_COPY_AND_ASSIGN(SSAGraphBuilder);

@ -96,6 +96,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto cur_ready_vars = ready_vars.PopAll(1, &timeout);
if (timeout) {
std::lock_guard<std::mutex> l(exception_mu_);
if (exception_) {
auto exp = *exception_;
exception_.reset();
@ -199,6 +200,7 @@ void ThreadedSSAGraphExecutor::RunOp(
ready_var_q->Extend(op->Outputs());
VLOG(10) << op << " " << op->Name() << "Signal posted";
} catch (platform::EnforceNotMet ex) {
std::lock_guard<std::mutex> l(exception_mu_);
exception_.reset(new platform::EnforceNotMet(ex));
} catch (...) {
LOG(FATAL) << "Unknown exception catched";

@ -56,6 +56,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
platform::DeviceContextPool fetch_ctxs_;
std::mutex exception_mu_;
std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_;

@ -21,7 +21,7 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h"
#ifdef PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/distributed/grpc_client.h"
#endif
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
@ -49,8 +49,8 @@ Executor::Executor(const platform::Place& place) : place_(place) {}
#ifdef PADDLE_WITH_DISTRIBUTE
void Executor::Complete() {
::paddle::operators::detail::RPCClient::GetInstance<
::paddle::operators::detail::GRPCClient>()
::paddle::operators::distributed::RPCClient::GetInstance<
::paddle::operators::distributed::GRPCClient>()
->SendComplete();
}
#endif
@ -321,7 +321,8 @@ std::vector<std::shared_ptr<ExecutorPrepareContext>> Executor::Prepare(
}
void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope, bool create_vars) {
bool create_local_scope, bool create_vars,
bool keep_kids) {
Scope* local_scope = scope;
if (create_vars) {
if (create_local_scope) {
@ -344,12 +345,20 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
}
}
platform::DeviceContextPool::Instance().Get(place_)->Wait();
if (create_vars && create_local_scope) {
if (local_scope != scope) {
scope->DeleteScope(local_scope);
} else {
// Delete the local scopes created in operators.
scope->DropKids();
if (!keep_kids) {
// By default, we should delete all kid scopes after run executor because
// some operators may create local scope when running, such as while_op.
// But when while_op also create a local executor to run it's sub block,
// the sub scopes it created should not be dropped immediately, because
// while_grad_op will use some variables created during while_op run, so
// we need to keep the kids and wait for the outer executor to drop them.
scope->DropKids();
}
}
if (FLAGS_benchmark) {
VLOG(2) << "-------------------------------------------------------";
VLOG(2) << "Memory used after deleting local scope: "

@ -78,7 +78,7 @@ class Executor {
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true,
bool create_vars = true);
bool create_vars = true, bool keep_kids = false);
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
std::map<std::string, const LoDTensor*>* feed_targets,

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save