tangwei12 7 years ago
commit e589005229

@ -23,7 +23,7 @@ repos:
- id: clang-format-with-version-check
name: clang-format
description: Format files with ClangFormat.
entry: bash ./.clang_format.hook -i
entry: bash ./tools/codestyle/clang_format.hook -i
language: system
files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto)$
- repo: local
@ -52,7 +52,7 @@ repos:
hooks:
- id: copyright_checker
name: copyright_checker
entry: python ./.copyright.hook
entry: python ./tools/codestyle/copyright.hook
language: system
files: \.(c|cc|cxx|cpp|cu|h|hpp|hxx|proto|py)$
exclude: (?!.*third_party)^.*$ | (?!.*book)^.*$

@ -1,11 +1,18 @@
FROM nvidia/cuda:9.0-cudnn7-devel-ubuntu16.04
# Use UBUNTU_MIRROR can speed up apt-get speed.
# ARG UBUNTU_MIRROR
# RUN /bin/bash -c 'if [[ -n ${UBUNTU_MIRROR} ]]; then sed -i 's#http://archive.ubuntu.com/ubuntu#${UBUNTU_MIRROR}#g' /etc/apt/sources.list; fi'
RUN apt-get update && apt-get install -y python python-pip iputils-ping libgtk2.0-dev wget vim net-tools iftop python-opencv
RUN ln -s /usr/lib/x86_64-linux-gnu/libcudnn.so.7 /usr/lib/libcudnn.so && ln -s /usr/lib/x86_64-linux-gnu/libnccl.so.2 /usr/lib/libnccl.so
RUN pip install -U pip
RUN pip install -U kubernetes paddlepaddle
# IMPORTANT:
# Add "ENV http_proxy=http://ip:port" if your download is slow, and don't forget to unset it at runtime.
# exmaple: unset http_proxy && unset https_proxy && python fluid_benchmark.py ...
RUN pip install -U pip
RUN pip install -U kubernetes paddlepaddle
RUN sh -c 'echo "import paddle.v2 as paddle\npaddle.dataset.cifar.train10()\npaddle.dataset.flowers.fetch()" | python'
RUN sh -c 'echo "import paddle.v2 as paddle\npaddle.dataset.mnist.train()\npaddle.dataset.mnist.test()\npaddle.dataset.imdb.fetch()" | python'
@ -14,9 +21,11 @@ RUN pip uninstall -y paddlepaddle && mkdir /workspace
ADD https://raw.githubusercontent.com/PaddlePaddle/cloud/develop/docker/paddle_k8s /usr/bin
ADD https://raw.githubusercontent.com/PaddlePaddle/cloud/develop/docker/k8s_tools.py /root
RUN chmod +x /usr/bin/paddle_k8s
ADD *.whl /
RUN pip install /*.whl && rm -f /*.whl && chmod +x /usr/bin/paddle_k8s
RUN pip install /*.whl && rm -f /*.whl
ENV LD_LIBRARY_PATH=/usr/local/lib
ADD fluid_benchmark.py recordio_converter.py models/ /workspace/
ADD fluid_benchmark.py recordio_converter.py args.py recordio_converter.py run.sh run_fluid_benchmark.sh /workspace/
ADD models/ /workspace/models/

@ -264,8 +264,6 @@ def train_parallel(avg_loss, infer_prog, optimizer, train_reader, test_reader,
break
else:
loss, = exe.run([avg_loss.name], feed=feeder.feed(data))
if args.update_method == "pserver":
exe.bcast_params()
if args.use_reader_op:
num_samples += args.batch_size * args.gpus
else:
@ -301,9 +299,18 @@ def print_train_time(start_time, end_time, num_samples):
(num_samples, train_elapsed, examples_per_sec))
def print_paddle_envs():
print('----------- Configuration envs -----------')
for k in os.environ:
if "PADDLE_" in k:
print "ENV %s:%s" % (k, os.environ[k])
print('------------------------------------------------')
def main():
args = parse_args()
print_arguments(args)
print_paddle_envs()
# the unique trainer id, starting from 0, needed by trainer
# only

@ -17,6 +17,7 @@ import copy
import argparse
import random
import os
import copy
from kube_templates import pserver, trainer, envs
@ -109,10 +110,9 @@ def gen_job():
envs.append({"name": "PADDLE_JOB_NAME", "value": args.jobname})
envs.append({"name": "PADDLE_TRAINERS", "value": str(args.trainers)})
envs.append({"name": "PSERVERS", "value": str(args.pservers)})
envs.append({"name": "PADDLE_PSERVERS", "value": str(args.pservers)})
envs.append({"name": "ENTRY", "value": args.entry})
envs.append({"name": "PADDLE_PSERVER_PORT", "value": str(args.port)})
envs.append({"name": "PADDLE_PSERVER_PORT", "value": str(args.port)})
# NOTE: these directories below are cluster specific, please modify
# this settings before you run on your own cluster.
envs.append({
@ -166,7 +166,7 @@ def gen_job():
tn["spec"]["template"]["spec"]["volumes"] = volumes
tn_container["volumeMounts"] = volumeMounts
ps_container["env"] = envs
ps_container["env"] = copy.deepcopy(envs)
ps_container["env"].append({
"name": "PADDLE_TRAINING_ROLE",
"value": "PSERVER"

@ -1,7 +1,7 @@
#!/bin/bash
python gen_doc.py layers --submodules control_flow device io nn ops tensor detection learning_rate_scheduler metric > layers.rst
for module in data_feeder clip metrics executor initializer io nets optimizer param_attr profiler regularizer
for module in data_feeder clip metrics executor initializer io nets optimizer param_attr profiler regularizer transpiler
do
python gen_doc.py ${module} > ${module}.rst
done

@ -0,0 +1,46 @@
.. THIS FILE IS GENERATED BY `gen_doc.{py|sh}`
!DO NOT EDIT THIS FILE MANUALLY!
==========
transpiler
==========
DistributeTranspiler
--------------------
.. autoclass:: paddle.fluid.transpiler.DistributeTranspiler
:members:
:noindex:
InferenceTranspiler
-------------------
.. autoclass:: paddle.fluid.transpiler.InferenceTranspiler
:members:
:noindex:
memory_optimize
---------------
.. autofunction:: paddle.fluid.transpiler.memory_optimize
:noindex:
release_memory
--------------
.. autofunction:: paddle.fluid.transpiler.release_memory
:noindex:
HashName
--------
.. autoclass:: paddle.fluid.transpiler.HashName
:members:
:noindex:
RoundRobin
----------
.. autoclass:: paddle.fluid.transpiler.RoundRobin
:members:
:noindex:

@ -13,6 +13,7 @@ cpu_noavx_openblas `fluid.tgz <https://guest:@paddleci.ngrok.io/repository
cuda7.5_cudnn5_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda75cudnn5cp27cp27mu/.lastSuccessful/fluid.tgz>`_
cuda8.0_cudnn5_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda80cudnn5cp27cp27mu/.lastSuccessful/fluid.tgz>`_
cuda8.0_cudnn7_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda8cudnn7cp27cp27mu/.lastSuccessful/fluid.tgz>`_
cuda9.0_cudnn7_avx_mkl `fluid.tgz <https://guest:@paddleci.ngrok.io/repository/download/Manylinux1_Cuda90cudnn7avxMkl/.lastSuccessful/fluid.tgz>`_
====================== ========================================
从源码编译

@ -40,10 +40,9 @@ void Main(bool use_gpu) {
//# 2. Prepare input.
int64_t data[4] = {1, 2, 3, 4};
PaddleBuf buf{.data = data, .length = sizeof(data)};
PaddleTensor tensor{.name = "",
.shape = std::vector<int>({4, 1}),
.data = buf,
.data = PaddleBuf(data, sizeof(data)),
.dtype = PaddleDType::INT64};
// For simplicity, we set all the slots with the same data.
@ -55,14 +54,12 @@ void Main(bool use_gpu) {
//# 4. Get output.
ASSERT_EQ(outputs.size(), 1UL);
LOG(INFO) << "output buffer size: " << outputs.front().data.length;
const size_t num_elements = outputs.front().data.length / sizeof(float);
LOG(INFO) << "output buffer size: " << outputs.front().data.length();
const size_t num_elements = outputs.front().data.length() / sizeof(float);
// The outputs' buffers are in CPU memory.
for (size_t i = 0; i < std::min(5UL, num_elements); i++) {
LOG(INFO) << static_cast<float*>(outputs.front().data.data)[i];
LOG(INFO) << static_cast<float*>(outputs.front().data.data())[i];
}
// TODO(Superjomn): this is should be free automatically
free(outputs[0].data.data);
}
}
@ -86,10 +83,9 @@ void MainThreads(int num_threads, bool use_gpu) {
for (int batch_id = 0; batch_id < num_batches; ++batch_id) {
// 2. Dummy Input Data
int64_t data[4] = {1, 2, 3, 4};
PaddleBuf buf{.data = data, .length = sizeof(data)};
PaddleTensor tensor{.name = "",
.shape = std::vector<int>({4, 1}),
.data = buf,
.data = PaddleBuf(data, sizeof(data)),
.dtype = PaddleDType::INT64};
std::vector<PaddleTensor> inputs(4, tensor);
std::vector<PaddleTensor> outputs;
@ -99,13 +95,13 @@ void MainThreads(int num_threads, bool use_gpu) {
// 4. Get output.
ASSERT_EQ(outputs.size(), 1UL);
LOG(INFO) << "TID: " << tid << ", "
<< "output buffer size: " << outputs.front().data.length;
const size_t num_elements = outputs.front().data.length / sizeof(float);
<< "output buffer size: " << outputs.front().data.length();
const size_t num_elements =
outputs.front().data.length() / sizeof(float);
// The outputs' buffers are in CPU memory.
for (size_t i = 0; i < std::min(5UL, num_elements); i++) {
LOG(INFO) << static_cast<float*>(outputs.front().data.data)[i];
LOG(INFO) << static_cast<float*>(outputs.front().data.data())[i];
}
free(outputs[0].data.data);
}
});
}

@ -13,3 +13,53 @@ See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/contrib/inference/paddle_inference_api.h"
namespace paddle {
PaddleBuf::PaddleBuf(PaddleBuf&& other)
: data_(other.data_),
length_(other.length_),
memory_owned_(other.memory_owned_) {
other.memory_owned_ = false;
other.data_ = nullptr;
other.length_ = 0;
}
PaddleBuf::PaddleBuf(const PaddleBuf& other) { *this = other; }
PaddleBuf& PaddleBuf::operator=(const PaddleBuf& other) {
// only the buffer with external memory can be copied
assert(!other.memory_owned_);
data_ = other.data_;
length_ = other.length_;
memory_owned_ = other.memory_owned_;
return *this;
}
void PaddleBuf::Resize(size_t length) {
// Only the owned memory can be reset, the external memory can't be changed.
if (length_ == length) return;
assert(memory_owned_);
Free();
data_ = new char[length];
length_ = length;
memory_owned_ = true;
}
void PaddleBuf::Reset(void* data, size_t length) {
Free();
memory_owned_ = false;
data_ = data;
length_ = length;
}
void PaddleBuf::Free() {
if (memory_owned_ && data_) {
assert(length_ > 0);
delete static_cast<char*>(data_);
data_ = nullptr;
length_ = 0;
}
}
} // namespace paddle

@ -21,6 +21,7 @@ limitations under the License. */
#pragma once
#include <cassert>
#include <memory>
#include <string>
#include <vector>
@ -32,12 +33,38 @@ enum PaddleDType {
INT64,
};
struct PaddleBuf {
void* data; // pointer to the data memory.
size_t length; // number of memory bytes.
class PaddleBuf {
public:
PaddleBuf() = default;
PaddleBuf(PaddleBuf&& other);
// Copy only available when memory is managed externally.
explicit PaddleBuf(const PaddleBuf&);
PaddleBuf& operator=(const PaddleBuf&);
// Do not own the memory.
PaddleBuf(void* data, size_t length)
: data_(data), length_(length), memory_owned_{false} {}
// Own memory.
PaddleBuf(size_t length)
: data_(new char[length]), length_(length), memory_owned_(true) {}
// Resize to `length` bytes.
void Resize(size_t length);
// Reset to external memory.
void Reset(void* data, size_t length);
bool empty() const { return length_ == 0; }
void* data() const { return data_; }
size_t length() const { return length_; }
~PaddleBuf() { Free(); }
private:
void Free();
void* data_{nullptr}; // pointer to the data memory.
size_t length_{0}; // number of memory bytes.
bool memory_owned_{true};
};
struct PaddleTensor {
PaddleTensor() = default;
std::string name; // variable name.
std::vector<int> shape;
// TODO(Superjomn) for LoD support, add a vector<vector<int>> field if needed.
@ -67,8 +94,9 @@ class PaddlePredictor {
// Predict an record.
// The caller should be responsible for allocating and releasing the memory of
// `inputs`. `inputs` should be alive until Run returns. caller should be
// responsible for releasing the memory of `output_data`.
// `inputs`. `inputs` should be available until Run returns. Caller should be
// responsible for the output tensor's buffer, either allocated or passed from
// outside.
virtual bool Run(const std::vector<PaddleTensor>& inputs,
std::vector<PaddleTensor>* output_data) = 0;

@ -48,7 +48,7 @@ bool PaddleInferenceAnakinPredictor::Run(
auto d_tensor_in_p = executor_.get_in(input.name);
float *d_data_p = d_tensor_in_p->mutable_data();
if (cudaMemcpy(d_data_p,
static_cast<float *>(input.data.data),
static_cast<float *>(input.data.data()),
d_tensor_in_p->valid_size() * sizeof(float),
cudaMemcpyHostToDevice) != 0) {
LOG(ERROR) << "copy data from CPU to GPU error";
@ -65,8 +65,11 @@ bool PaddleInferenceAnakinPredictor::Run(
for (auto &output : *output_data) {
auto *tensor = executor_.get_out(output.name);
output.shape = tensor->shape();
if (output.data.length() < tensor->valid_size() * sizeof(float)) {
output.data.Resize(tensor->valid_size() * sizeof(float));
}
// Copy data from GPU -> CPU
if (cudaMemcpy(output.data.data,
if (cudaMemcpy(output.data.data(),
tensor->mutable_data(),
tensor->valid_size() * sizeof(float),
cudaMemcpyDeviceToHost) != 0) {

@ -37,28 +37,26 @@ TEST(inference, anakin) {
float data[1 * 3 * 224 * 224] = {1.0f};
PaddleBuf buf{.data = data, .length = sizeof(data)};
PaddleTensor tensor{.name = "input_0",
.shape = std::vector<int>({1, 3, 224, 224}),
.data = buf,
.data = PaddleBuf(data, sizeof(data)),
.dtype = PaddleDType::FLOAT32};
// For simplicity, we set all the slots with the same data.
std::vector<PaddleTensor> paddle_tensor_feeds(1, tensor);
std::vector<PaddleTensor> paddle_tensor_feeds;
paddle_tensor_feeds.emplace_back(std::move(tensor));
float data_out[1000];
PaddleBuf buf_out{.data = data_out, .length = sizeof(data)};
PaddleTensor tensor_out{.name = "prob_out",
.shape = std::vector<int>({1000, 1}),
.data = buf_out,
.data = PaddleBuf(),
.dtype = PaddleDType::FLOAT32};
std::vector<PaddleTensor> outputs(1, tensor_out);
std::vector<PaddleTensor> outputs;
outputs.emplace_back(std::move(tensor_out));
ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
float* data_o = static_cast<float*>(outputs[0].data.data);
float* data_o = static_cast<float*>(outputs[0].data.data());
for (size_t j = 0; j < 1000; ++j) {
LOG(INFO) << "output[" << j << "]: " << data_o[j];
}

@ -178,8 +178,8 @@ bool NativePaddlePredictor::SetFeed(const std::vector<PaddleTensor> &inputs,
// TODO(panyx0718): Init LoDTensor from existing memcpy to save a copy.
std::memcpy(static_cast<void *>(input_ptr),
inputs[i].data.data,
inputs[i].data.length);
inputs[i].data.data(),
inputs[i].data.length());
feeds->push_back(input);
}
return true;
@ -241,10 +241,11 @@ bool NativePaddlePredictor::GetFetch(
}
outputs->at(i).shape = shape;
outputs->at(i).data.length = sizeof(float) * data.size();
outputs->at(i).data.data = malloc(outputs->at(i).data.length);
std::memcpy(
outputs->at(i).data.data, data.data(), outputs->at(i).data.length);
auto &buffer = outputs->at(i).data;
if (buffer.empty() || buffer.length() < sizeof(float) * data.size()) {
buffer.Resize(sizeof(float) * data.size());
}
std::memcpy(buffer.data(), data.data(), buffer.length());
outputs->at(i).dtype = PaddleDType::FLOAT32;
// TODO(panyx0718): support other types? fill tensor name? avoid a copy.
}

@ -27,13 +27,12 @@ namespace paddle {
PaddleTensor LodTensorToPaddleTensor(framework::LoDTensor* t) {
PaddleTensor pt;
pt.data.data = t->data<void>();
if (t->type() == typeid(int64_t)) {
pt.data.length = t->numel() * sizeof(int64_t);
pt.data.Reset(t->data<void>(), t->numel() * sizeof(int64_t));
pt.dtype = PaddleDType::INT64;
} else if (t->type() == typeid(float)) {
pt.data.length = t->numel() * sizeof(float);
pt.data.Reset(t->data<void>(), t->numel() * sizeof(float));
pt.dtype = PaddleDType::FLOAT32;
} else {
LOG(FATAL) << "unsupported type.";
@ -79,8 +78,8 @@ void MainWord2Vec(bool use_gpu) {
std::vector<PaddleTensor> outputs;
ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
ASSERT_EQ(outputs.size(), 1UL);
size_t len = outputs[0].data.length;
float* data = static_cast<float*>(outputs[0].data.data);
size_t len = outputs[0].data.length();
float* data = static_cast<float*>(outputs[0].data.data());
for (size_t j = 0; j < len / sizeof(float); ++j) {
ASSERT_LT(data[j], 1.0);
ASSERT_GT(data[j], -1.0);
@ -103,8 +102,6 @@ void MainWord2Vec(bool use_gpu) {
EXPECT_LT(lod_data[i] - data[i], 1e-3);
EXPECT_GT(lod_data[i] - data[i], -1e-3);
}
free(outputs[0].data.data);
}
void MainImageClassification(bool use_gpu) {
@ -143,13 +140,12 @@ void MainImageClassification(bool use_gpu) {
std::vector<PaddleTensor> outputs;
ASSERT_TRUE(predictor->Run(paddle_tensor_feeds, &outputs));
ASSERT_EQ(outputs.size(), 1UL);
size_t len = outputs[0].data.length;
float* data = static_cast<float*>(outputs[0].data.data);
size_t len = outputs[0].data.length();
float* data = static_cast<float*>(outputs[0].data.data());
float* lod_data = output1.data<float>();
for (size_t j = 0; j < len / sizeof(float); ++j) {
EXPECT_NEAR(lod_data[j], data[j], 1e-3);
}
free(data);
}
void MainThreadsWord2Vec(bool use_gpu) {
@ -192,8 +188,8 @@ void MainThreadsWord2Vec(bool use_gpu) {
// check outputs range
ASSERT_EQ(local_outputs.size(), 1UL);
const size_t len = local_outputs[0].data.length;
float* data = static_cast<float*>(local_outputs[0].data.data);
const size_t len = local_outputs[0].data.length();
float* data = static_cast<float*>(local_outputs[0].data.data());
for (size_t j = 0; j < len / sizeof(float); ++j) {
ASSERT_LT(data[j], 1.0);
ASSERT_GT(data[j], -1.0);
@ -205,7 +201,6 @@ void MainThreadsWord2Vec(bool use_gpu) {
for (int i = 0; i < refs[tid].numel(); ++i) {
EXPECT_NEAR(ref_data[i], data[i], 1e-3);
}
free(data);
});
}
for (int i = 0; i < num_jobs; ++i) {
@ -251,14 +246,13 @@ void MainThreadsImageClassification(bool use_gpu) {
// check outputs correctness
ASSERT_EQ(local_outputs.size(), 1UL);
const size_t len = local_outputs[0].data.length;
float* data = static_cast<float*>(local_outputs[0].data.data);
const size_t len = local_outputs[0].data.length();
float* data = static_cast<float*>(local_outputs[0].data.data());
float* ref_data = refs[tid].data<float>();
EXPECT_EQ(refs[tid].numel(), len / sizeof(float));
for (int i = 0; i < refs[tid].numel(); ++i) {
EXPECT_NEAR(ref_data[i], data[i], 1e-3);
}
free(data);
});
}
for (int i = 0; i < num_jobs; ++i) {

@ -47,10 +47,11 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
#endif
std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const override;
int GetVarDeviceID(const std::string &varname) const;
private:
void CreateOpHandleIOs(SSAGraph *result, const OpDesc &op,
size_t place_id) const;
size_t device_id) const;
private:
std::string loss_var_name_;
@ -96,21 +97,23 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder {
const std::string &og,
std::unordered_set<std::string> *og_has_been_broadcast) const;
int GetOpDeviceID(
const std::vector<std::unordered_set<std::string>> &var_name_on_devices,
const OpDesc &op) const;
int GetOpDeviceID(const OpDesc &op) const;
void InsertAllReduceOp(SSAGraph *result, const std::string &og) const;
void CreateBroadcastOp(SSAGraph *result, const std::string &p_name,
size_t src_dev_id) const;
bool IsSparseGradient(
const std::unordered_map<std::string, VarDesc *> &all_vars,
const std::string &og) const;
bool IsSparseGradient(const std::string &og) const;
size_t GetAppropriateDeviceID(
const std::vector<std::string> &var_names) const;
private:
BuildStrategy strategy_;
mutable std::unordered_map<std::string, VarDesc *> all_vars_;
mutable std::unordered_map<std::string, int> var_name_on_devices_;
mutable std::vector<int64_t> balance_vars_;
void SetCommunicationContext(OpHandleBase *op_handle,
const platform::Place &p) const;

@ -30,6 +30,7 @@ class SSAGraphBuilder {
SSAGraphBuilder() {}
virtual ~SSAGraphBuilder() {}
virtual std::unique_ptr<SSAGraph> Build(const ProgramDesc &program) const = 0;
virtual int GetVarDeviceID(const std::string &var_name) const { return -1; }
DISABLE_COPY_AND_ASSIGN(SSAGraphBuilder);

@ -96,6 +96,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
auto cur_ready_vars = ready_vars.PopAll(1, &timeout);
if (timeout) {
std::lock_guard<std::mutex> l(exception_mu_);
if (exception_) {
auto exp = *exception_;
exception_.reset();
@ -199,6 +200,7 @@ void ThreadedSSAGraphExecutor::RunOp(
ready_var_q->Extend(op->Outputs());
VLOG(10) << op << " " << op->Name() << "Signal posted";
} catch (platform::EnforceNotMet ex) {
std::lock_guard<std::mutex> l(exception_mu_);
exception_.reset(new platform::EnforceNotMet(ex));
} catch (...) {
LOG(FATAL) << "Unknown exception catched";

@ -56,6 +56,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
platform::DeviceContextPool fetch_ctxs_;
std::mutex exception_mu_;
std::unique_ptr<platform::EnforceNotMet> exception_;
std::atomic<int> running_ops_;

@ -21,7 +21,7 @@ limitations under the License. */
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/reader.h"
#ifdef PADDLE_WITH_DISTRIBUTE
#include "paddle/fluid/operators/detail/grpc_client.h"
#include "paddle/fluid/operators/distributed/grpc_client.h"
#endif
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/platform/profiler.h"
@ -49,8 +49,8 @@ Executor::Executor(const platform::Place& place) : place_(place) {}
#ifdef PADDLE_WITH_DISTRIBUTE
void Executor::Complete() {
::paddle::operators::detail::RPCClient::GetInstance<
::paddle::operators::detail::GRPCClient>()
::paddle::operators::distributed::RPCClient::GetInstance<
::paddle::operators::distributed::GRPCClient>()
->SendComplete();
}
#endif
@ -321,7 +321,8 @@ std::vector<std::shared_ptr<ExecutorPrepareContext>> Executor::Prepare(
}
void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope, bool create_vars) {
bool create_local_scope, bool create_vars,
bool keep_kids) {
Scope* local_scope = scope;
if (create_vars) {
if (create_local_scope) {
@ -344,12 +345,20 @@ void Executor::RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
}
}
platform::DeviceContextPool::Instance().Get(place_)->Wait();
if (create_vars && create_local_scope) {
if (local_scope != scope) {
scope->DeleteScope(local_scope);
} else {
// Delete the local scopes created in operators.
scope->DropKids();
if (!keep_kids) {
// By default, we should delete all kid scopes after run executor because
// some operators may create local scope when running, such as while_op.
// But when while_op also create a local executor to run it's sub block,
// the sub scopes it created should not be dropped immediately, because
// while_grad_op will use some variables created during while_op run, so
// we need to keep the kids and wait for the outer executor to drop them.
scope->DropKids();
}
}
if (FLAGS_benchmark) {
VLOG(2) << "-------------------------------------------------------";
VLOG(2) << "Memory used after deleting local scope: "

@ -78,7 +78,7 @@ class Executor {
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
bool create_local_scope = true,
bool create_vars = true);
bool create_vars = true, bool keep_kids = false);
void RunPreparedContext(ExecutorPrepareContext* ctx, Scope* scope,
std::map<std::string, const LoDTensor*>* feed_targets,

@ -110,7 +110,6 @@ ParallelExecutor::ParallelExecutor(
// Step 3. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
details::SSAGraphBuilderFactory builder_factory(
member_->places_, loss_var_name, params, member_->local_scopes_,
build_strategy);
@ -122,9 +121,10 @@ ParallelExecutor::ParallelExecutor(
#endif
}
builder_ = std::move(builder_factory.Create());
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places,
builder_factory.Create()->Build(main_program)));
builder_->Build(main_program)));
member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, std::move(var_infos),
@ -133,10 +133,22 @@ ParallelExecutor::ParallelExecutor(
void ParallelExecutor::BCastParamsToGPUs(
const std::unordered_set<std::string> &vars) const {
auto *main_scope = member_->local_scopes_[0];
// the the initialize bcast, all vars would be bcast from device(0), otherwise
// bcast from the specified device.
bool initialize = builder_.get() == nullptr ? true : false;
for (auto &var : vars) {
auto *main_var = main_scope->FindVar(var);
int var_dev_id =
builder_.get() == nullptr ? -1 : builder_->GetVarDeviceID(var);
if (!initialize && var_dev_id == -1) continue;
framework::Variable *main_var = nullptr;
if (initialize) {
main_var = member_->local_scopes_[0]->FindVar(var);
} else {
main_var = member_->local_scopes_[var_dev_id]->FindVar(var);
}
if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
continue;
}
@ -151,7 +163,8 @@ void ParallelExecutor::BCastParamsToGPUs(
for (size_t i = 0; i < member_->places_.size(); ++i) {
auto place = member_->places_[i];
void *buffer;
if (i == 0) {
if ((initialize && i == 0) || (!initialize && i == var_dev_id)) {
buffer = const_cast<void *>(main_tensor.data<void>());
} else {
auto local_scope = member_->local_scopes_[i];

@ -19,12 +19,14 @@ limitations under the License. */
#include <unordered_set>
#include <vector>
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/multi_devices_graph_builder.h"
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/op_info.h"
#include "paddle/fluid/framework/program_desc.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/tensor.h"
#include "paddle/fluid/platform/device_context.h"
namespace paddle {
namespace framework {
@ -68,6 +70,7 @@ class ParallelExecutor {
private:
ParallelExecutorPrivate *member_;
std::unique_ptr<details::SSAGraphBuilder> builder_;
};
} // namespace framework

@ -27,7 +27,7 @@ void TensorRTSubGraphPass::Run(DataFlowGraph *graph) {
SubGraphFuse(graph, node_inside_subgraph_teller_);
}
} // analysis
} // inference
} // namespace analysis
} // namespace inference
} // paddle
} // namespace paddle

@ -184,9 +184,9 @@ else()
set(DEPS_OPS ${DEPS_OPS} nccl_op)
endif()
add_subdirectory(detail)
if(WITH_DISTRIBUTE)
add_subdirectory(distributed)
set(DISTRIBUTE_DEPS "")
if(WITH_GRPC)
set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf)
@ -195,20 +195,11 @@ if(WITH_DISTRIBUTE)
endif()
set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor")
op_library(prefetch_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(prefetch_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(checkpoint_notify_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(checkpoint_notify_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(recv_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(recv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(listen_and_serv_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(listen_and_serv_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(send_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
op_library(send_barrier_op DEPS ${DISTRIBUTE_DEPS})
op_library(fetch_barrier_op DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(send_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
set_source_files_properties(fetch_barrier_op.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
foreach(dist_op "prefetch_op" "checkpoint_notify_op" "listen_and_serv_op" "send_op" "recv_op" "send_barrier_op" "fetch_barrier_op")
op_library(${dist_op} DEPS ${DISTRIBUTE_DEPS})
set_source_files_properties(${dist_op}.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
endforeach()
#set_source_files_properties(send_recv_op_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
#cc_test(test_send_recv SRCS send_recv_op_test.cc DEPS prefetch_op send_op
# listen_and_serv_op sum_op executor SERIAL)

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save