Merge remote-tracking branch 'ups/develop' into refine/jit/gru

test=develop
release/1.1
tensor-tang 7 years ago
commit 83dc689877

@ -261,6 +261,13 @@ function(cc_library TARGET_NAME)
add_dependencies(${TARGET_NAME} mklml)
target_link_libraries(${TARGET_NAME} "-L${MKLML_LIB_DIR} -liomp5 -Wl,--as-needed")
endif()
# remove link to python, see notes at:
# https://github.com/pybind/pybind11/blob/master/docs/compiling.rst#building-manually
if("${cc_library_DEPS};" MATCHES "python;")
list(REMOVE_ITEM cc_library_DEPS python)
add_dependencies(${TARGET_NAME} python)
target_link_libraries(${TARGET_NAME} "-Wl,-undefined,dynamic_lookup")
endif()
target_link_libraries(${TARGET_NAME} ${cc_library_DEPS})
add_dependencies(${TARGET_NAME} ${cc_library_DEPS})
endif()

@ -49,6 +49,8 @@ struct VarHandleBase {
void AddOutput(OpHandleBase* out, ir::Node* node) {
if (pending_ops_.find(out) == pending_ops_.end()) {
PADDLE_ENFORCE(out != nullptr, "The output of %s should not be nullptr",
this->Node()->Name());
pending_ops_.insert(out);
node_->outputs.push_back(node);
}

@ -299,6 +299,12 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
}
ParallelExecutor::~ParallelExecutor() {
const auto dev_ctxs =
platform::DeviceContextPool::Instance().GetAllDeviceContexts();
for (auto &dev_ctx : dev_ctxs) {
dev_ctx->Wait();
}
if (member_->own_local_scope_) {
for (size_t i = 1; i < member_->local_scopes_.size(); ++i) {
Scope *local_scope = member_->local_scopes_[i];

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -86,7 +86,7 @@ VarHandlePtr GRPCClient::AsyncSendVar(const std::string& ep,
// stub context
s->response_call_back_ = nullptr;
platform::RecordEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method, p_ctx);
auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req, &cq_);
@ -143,7 +143,7 @@ VarHandlePtr GRPCClient::AsyncGetVar(const std::string& ep,
// stub context
s->response_call_back_ = ProcGetResponse;
platform::RecordEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method, p_ctx);
auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/GetVariable", buf, &cq_);
@ -191,7 +191,7 @@ VarHandlePtr GRPCClient::AsyncPrefetchVar(const std::string& ep,
// stub context
s->response_call_back_ = ProcGetResponse;
platform::RecordEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method, p_ctx);
auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/PrefetchVariable", req,
@ -221,7 +221,7 @@ VarHandlePtr GRPCClient::AsyncSendBatchBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(BATCH_BARRIER_MESSAGE);
platform::RecordEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method, nullptr);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
@ -246,7 +246,7 @@ VarHandlePtr GRPCClient::AsyncSendFetchBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(FETCH_BARRIER_MESSAGE);
platform::RecordEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method, nullptr);
auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
@ -271,7 +271,7 @@ VarHandlePtr GRPCClient::AsyncSendComplete(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(COMPLETE_MESSAGE);
platform::RecordEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method, nullptr);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
@ -301,7 +301,7 @@ VarHandlePtr GRPCClient::AsyncCheckpointNotify(const std::string& ep,
req.set_varname(CHECKPOINT_SAVE_MESSAGE);
req.set_out_varname(dir);
platform::RecordEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method, nullptr);
auto rpc = s->stub_->AsyncCheckpointNotify(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));

@ -36,7 +36,7 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
const platform::DeviceContext& ctx,
::grpc::ByteBuffer* msg,
const std::string& out_name) {
platform::RecordEvent record_event("serial", &ctx);
platform::RecordRPCEvent record_event("serial", &ctx);
// Default DestroyCallback does nothing, When using GPU
// the CPU buffer need to be freed.
DestroyCallback destroy_callback = [](void* backing) {};
@ -148,7 +148,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
const platform::DeviceContext& ctx,
const framework::Scope* scope,
framework::Variable** var) {
platform::RecordEvent record_event("deserial", &ctx);
platform::RecordRPCEvent record_event("deserial", &ctx);
operators::distributed::GRPCVariableResponse resp(scope, &ctx);
PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!");
*var = resp.GetVar();

@ -39,11 +39,9 @@ void CPUGather(const platform::DeviceContext& ctx, const Tensor& src,
PADDLE_ENFORCE(platform::is_cpu_place(ctx.GetPlace()));
// check index of shape 1-D
PADDLE_ENFORCE(index.dims().size() == 1);
int index_size = index.dims()[0];
int64_t index_size = index.dims()[0];
auto src_dims = src.dims();
framework::DDim output_dims(src_dims);
output_dims[0] = index_size;
const T* p_src = src.data<T>();
const int* p_index = index.data<int>();
@ -55,7 +53,7 @@ void CPUGather(const platform::DeviceContext& ctx, const Tensor& src,
const size_t slice_bytes = slice_size * sizeof(T);
for (int i = 0; i < index_size; ++i) {
for (int64_t i = 0; i < index_size; ++i) {
int index_ = p_index[i];
memcpy(p_output + i * slice_size, p_src + index_ * slice_size, slice_bytes);
}

@ -35,6 +35,16 @@ platform::DeviceContext* DeviceContextPool::Get(const platform::Place& place) {
return it->second.get();
}
const std::vector<const DeviceContext*>
DeviceContextPool::GetAllDeviceContexts() const {
std::vector<const DeviceContext*> all_device_ctx;
all_device_ctx.reserve(device_contexts_.size());
for (auto& dev_ctx : device_contexts_) {
all_device_ctx.emplace_back(dev_ctx.second.get());
}
return all_device_ctx;
}
DeviceContextPool::DeviceContextPool(
const std::vector<platform::Place>& places) {
PADDLE_ENFORCE_GT(places.size(), 0);

@ -217,6 +217,9 @@ class DeviceContextPool {
/*! \brief Return handle of single device context. */
platform::DeviceContext* Get(const platform::Place& place);
/*! \brief Return all the device contexts. */
const std::vector<const DeviceContext*> GetAllDeviceContexts() const;
template <typename Place>
const typename DefaultDeviceContextType<Place>::TYPE* GetByPlace(
const Place& place) {

@ -30,6 +30,8 @@ limitations under the License. */
#include "paddle/fluid/platform/device_tracer.h"
#include "paddle/fluid/string/printf.h"
DEFINE_bool(enable_rpc_profiler, false, "Enable rpc profiler or not.");
namespace paddle {
namespace platform {
@ -193,6 +195,13 @@ RecordEvent::~RecordEvent() {
PopEvent(name_, dev_ctx_);
}
RecordRPCEvent::RecordRPCEvent(const std::string& name,
const DeviceContext* dev_ctx) {
if (FLAGS_enable_rpc_profiler) {
event_.reset(new platform::RecordEvent(name, dev_ctx));
}
}
RecordBlock::RecordBlock(int block_id)
: is_enabled_(false), start_ns_(PosixInNsec()) {
std::lock_guard<std::mutex> l(profiler_mu);

@ -87,6 +87,16 @@ struct RecordEvent {
std::string full_name_;
};
class RecordRPCEvent {
public:
// dev_ctx can be set to nullptr if device is cpu.
RecordRPCEvent(const std::string& name, const DeviceContext* dev_ctx);
~RecordRPCEvent() {}
private:
std::unique_ptr<RecordEvent> event_;
};
struct RecordBlock {
explicit RecordBlock(int block_id);
~RecordBlock();

@ -120,6 +120,7 @@ def __bootstrap__():
read_env_flags.append('rpc_deadline')
read_env_flags.append('rpc_server_profile_period')
read_env_flags.append('rpc_server_profile_path')
read_env_flags.append('enable_rpc_profiler')
if core.is_compiled_with_cuda():
read_env_flags += [

Loading…
Cancel
Save