merge with upstream develop

gangliao-patch-1
gongweibao 8 years ago
commit af5ac2c474

@ -27,6 +27,7 @@ if(NOT CMAKE_CROSSCOMPILING)
endif(NOT CMAKE_CROSSCOMPILING)
find_package(Git REQUIRED)
find_package(Threads REQUIRED)
find_package(Boost QUIET)
include(simd)
@ -48,6 +49,7 @@ option(COVERALLS_UPLOAD "Package code coverage data to coveralls" OFF)
option(ON_TRAVIS "Exclude special unit test on Travis CI" OFF)
option(WITH_C_API "Compile PaddlePaddle with C-API(Prediction)" OFF)
option(WITH_GOLANG "Compile PaddlePaddle with GOLANG" OFF)
option(USE_NNPACK "Compile PaddlePaddle with NNPACK library" OFF)
# CMAKE_BUILD_TYPE
if(NOT CMAKE_BUILD_TYPE)
@ -110,6 +112,7 @@ include_directories("${PROJ_ROOT}")
include_directories("${PROJ_ROOT}/paddle/cuda/include")
include_directories("${CMAKE_CURRENT_BINARY_DIR}/proto")
include_directories("${CMAKE_CURRENT_BINARY_DIR}/go/pserver/cclient")
include_directories(${Boost_INCLUDE_DIRS})
set(EXTERNAL_LIBS
${GFLAGS_LIBRARIES}
@ -127,15 +130,21 @@ if(WITH_GPU)
endif(NOT WITH_DSO)
endif(WITH_GPU)
if(USE_NNPACK)
list(APPEND EXTERNAL_LIBS ${NNPACK_LIB} ${PTHREADPOOL_LIB} "rt")
endif(USE_NNPACK)
add_subdirectory(proto)
add_subdirectory(paddle)
add_subdirectory(python)
# "add_subdirectory(paddle)" and "add_subdirectory(python)" should be
# placed after this block, because they depends on it.
if(WITH_GOLANG)
#TODO (add go/master/c back when fixed)
add_subdirectory(go/pserver/cclient)
add_subdirectory(go/master/c)
add_subdirectory(go/pserver/cclient)
endif(WITH_GOLANG)
add_subdirectory(paddle)
add_subdirectory(python)
if(WITH_DOC)
add_subdirectory(doc)
endif()

@ -69,3 +69,27 @@ endif(NOT WITH_GPU)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SIMD_FLAG}")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SIMD_FLAG}")
if(WITH_GOLANG)
  # Prepare a private GOPATH inside the build tree and define the `go_path`
  # target that populates it.
  #
  # The Paddle source directory is symlinked into GOPATH. Without the
  # symlink, Go code that depends on Paddle would make `go get ./...`
  # download a fresh Paddle repo from Github, without the changes in the
  # current Paddle repo that we want to build.
  set(GOPATH "${CMAKE_CURRENT_BINARY_DIR}/go")
  file(MAKE_DIRECTORY "${GOPATH}")
  set(PADDLE_IN_GOPATH "${GOPATH}/src/github.com/PaddlePaddle/Paddle")

  add_custom_target(go_path)
  # POST_BUILD is spelled out (it is the implicit default) and VERBATIM is
  # set so that argument escaping does not depend on the platform; the
  # expansions are quoted so that they are never split into several
  # arguments.
  add_custom_command(TARGET go_path POST_BUILD
    # Symlink Paddle directory into GOPATH. `mkdir -p` creates every parent
    # directory, `rm -rf` then removes the leaf so that `ln` can create the
    # symlink in its place.
    COMMAND mkdir -p "${PADDLE_IN_GOPATH}"
    COMMAND rm -rf "${PADDLE_IN_GOPATH}"
    COMMAND ln -sf "${CMAKE_SOURCE_DIR}" "${PADDLE_IN_GOPATH}"
    # Automatically get all dependencies specified in the source code.
    # We can't run `go get -d ./...` for every target, because multiple
    # `go get` can not run concurrently, but make needs to be able to run
    # with multiple jobs.
    COMMAND env "GOPATH=${GOPATH}" "${CMAKE_Go_COMPILER}" get -d ./go/...
    WORKING_DIRECTORY "${CMAKE_CURRENT_SOURCE_DIR}"
    VERBATIM
  )
endif(WITH_GOLANG)

@ -7,8 +7,17 @@ INCLUDE_DIRECTORIES(${EIGEN_SOURCE_DIR}/src/eigen3)
ExternalProject_Add(
eigen3
${EXTERNAL_PROJECT_LOG_ARGS}
URL "https://bitbucket.org/eigen/eigen/get/3.3.4.tar.gz"
URL_MD5 "1a47e78efe365a97de0c022d127607c3"
# for latest version, please get from official website
# URL "https://bitbucket.org/eigen/eigen/get/3.3.4.tar.gz"
# URL_MD5 "1a47e78efe365a97de0c022d127607c3"
# for no-ssl http support, please get from bazel's mirror
# URL "http://mirror.bazel.build/bitbucket.org/eigen/eigen/get/f3a22f35b044.tar.gz"
# URL_MD5 "4645c66075982da6fa0bcf6b20f3e8f7"
# get from github mirror
GIT_REPOSITORY "https://github.com/RLovelett/eigen.git"
GIT_TAG "a46d2e7337c4656f00abe54a8115f6d76153a048"
PREFIX ${EIGEN_SOURCE_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""

@ -13,6 +13,10 @@
# limitations under the License.
INCLUDE(ExternalProject)
# Always invoke `FIND_PACKAGE(Protobuf)` for importing function protobuf_generate_cpp
FIND_PACKAGE(Protobuf QUIET)
SET(PROTOBUF_FOUND "OFF")
# Print and set the protobuf library information,
# finish this cmake process and exit from this file.
@ -39,12 +43,19 @@ macro(PROMPT_PROTOBUF_LIB)
ADD_LIBRARY(protobuf_lite ${protobuf_LIBTYPE} IMPORTED GLOBAL)
SET_PROPERTY(TARGET protobuf_lite PROPERTY IMPORTED_LOCATION ${PROTOBUF_LITE_LIBRARY})
ADD_LIBRARY(protoc ${protobuf_LIBTYPE} IMPORTED GLOBAL)
SET_PROPERTY(TARGET protoc PROPERTY IMPORTED_LOCATION ${PROTOC_LIBRARY})
ADD_LIBRARY(libprotoc ${protobuf_LIBTYPE} IMPORTED GLOBAL)
SET_PROPERTY(TARGET libprotoc PROPERTY IMPORTED_LOCATION ${PROTOC_LIBRARY})
ADD_EXECUTABLE(protoc IMPORTED GLOBAL)
SET_PROPERTY(TARGET protoc PROPERTY IMPORTED_LOCATION ${PROTOBUF_PROTOC_EXECUTABLE})
# FIND_Protobuf.cmake uses `Protobuf_PROTOC_EXECUTABLE`.
# make `protobuf_generate_cpp` happy.
SET(Protobuf_PROTOC_EXECUTABLE ${PROTOBUF_PROTOC_EXECUTABLE})
FOREACH(dep ${protobuf_DEPS})
ADD_DEPENDENCIES(protobuf ${dep})
ADD_DEPENDENCIES(protobuf_lite ${dep})
ADD_DEPENDENCIES(libprotoc ${dep})
ADD_DEPENDENCIES(protoc ${dep})
ENDFOREACH()
@ -133,18 +144,7 @@ FUNCTION(build_protobuf TARGET_NAME BUILD_FOR_HOST)
ENDFUNCTION()
SET(PROTOBUF_VERSION 3.1)
IF(NOT CMAKE_CROSSCOMPILING)
FIND_PACKAGE(Protobuf ${PROTOBUF_VERSION})
IF(PROTOBUF_FOUND)
SET_PROTOBUF_VERSION()
IF("${PROTOBUF_VERSION}" VERSION_LESS "3.1.0")
SET(PROTOBUF_FOUND OFF)
ELSE()
PROMPT_PROTOBUF_LIB()
ENDIF()
ENDIF(PROTOBUF_FOUND)
ELSE()
IF(CMAKE_CROSSCOMPILING)
build_protobuf(protobuf_host TRUE)
LIST(APPEND external_project_dependencies protobuf_host)

@ -32,193 +32,6 @@ IF(PYTHONINTERP_FOUND)
MESSAGE(FATAL_ERROR "Found Python Protobuf ${PY_GOOGLE.PROTOBUF_VERSION} < 3.0.0, "
"please use pip to upgrade protobuf. pip install -U protobuf")
ENDIF()
ELSE(PYTHONINTERP_FOUND)
MESSAGE(FATAL_ERROR "Please install python 2.7 before building PaddlePaddle.")
##################################### PYTHON ########################################
SET(PYTHON_SOURCES_DIR ${THIRD_PARTY_PATH}/python)
SET(PYTHON_INSTALL_DIR ${THIRD_PARTY_PATH}/install/python)
SET(_python_DIR ${PYTHON_INSTALL_DIR})
IF(UNIX)
SET(PYTHON_FOUND ON)
SET(PYTHON_INCLUDE_DIR "${PYTHON_INSTALL_DIR}/include/python2.7" CACHE PATH "Python include dir" FORCE)
SET(PYTHON_LIBRARIES "${PYTHON_INSTALL_DIR}/lib/libpython2.7.a" CACHE FILEPATH "Python library" FORCE)
SET(PYTHON_EXECUTABLE ${PYTHON_INSTALL_DIR}/bin/python CACHE FILEPATH "Python executable" FORCE)
SET(PY_SITE_PACKAGES_PATH "${PYTHON_INSTALL_DIR}/lib/python2.7/site-packages" CACHE PATH "Python site-packages path" FORCE)
ELSEIF(WIN32)
SET(PYTHON_FOUND ON)
SET(PYTHON_INCLUDE_DIR "${PYTHON_INSTALL_DIR}/include" CACHE PATH "Python include dir" FORCE)
SET(PYTHON_LIBRARIES "${PYTHON_INSTALL_DIR}/libs/python27.lib" CACHE FILEPATH "Python library" FORCE)
SET(PYTHON_EXECUTABLE "${PYTHON_INSTALL_DIR}/bin/python.exe" CACHE FILEPATH "Python executable" FORCE)
SET(PY_SITE_PACKAGES_PATH "${PYTHON_INSTALL_DIR}/Lib/site-packages" CACHE PATH "Python site-packages path" FORCE)
ELSE()
MESSAGE(FATAL_ERROR "Unknown system !")
ENDIF()
IF(APPLE)
LIST(APPEND EXTERNAL_PROJECT_OPTIONAL_CMAKE_ARGS
-DCMAKE_BUILD_WITH_INSTALL_RPATH:BOOL=ON
)
ENDIF()
SET(EXTERNAL_PROJECT_OPTIONAL_CMAKE_CACHE_ARGS)
# Force Python build to "Release".
IF(CMAKE_CONFIGURATION_TYPES)
SET(SAVED_CMAKE_CFG_INTDIR ${CMAKE_CFG_INTDIR})
SET(CMAKE_CFG_INTDIR "Release")
ELSE()
LIST(APPEND EXTERNAL_PROJECT_OPTIONAL_CMAKE_CACHE_ARGS
-DCMAKE_BUILD_TYPE:STRING=Release
)
ENDIF()
ExternalProject_Add(python
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/python-cmake-buildsystem/python-cmake-buildsystem.git"
PREFIX ${PYTHON_SOURCES_DIR}
UPDATE_COMMAND ""
CMAKE_ARGS -DPYTHON_VERSION=2.7.12
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
CMAKE_ARGS -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
CMAKE_CACHE_ARGS
-DCMAKE_INSTALL_PREFIX:PATH=${PYTHON_INSTALL_DIR}
-DBUILD_LIBPYTHON_SHARED:BOOL=OFF
-DUSE_SYSTEM_LIBRARIES:BOOL=OFF
-DZLIB_ROOT:FILEPATH=${ZLIB_ROOT}
-DZLIB_INCLUDE_DIR:PATH=${ZLIB_INCLUDE_DIR}
-DZLIB_LIBRARY:FILEPATH=${ZLIB_LIBRARIES}
-DDOWNLOAD_SOURCES:BOOL=ON
-DINSTALL_WINDOWS_TRADITIONAL:BOOL=OFF
${EXTERNAL_PROJECT_OPTIONAL_CMAKE_CACHE_ARGS}
${EXTERNAL_PROJECT_OPTIONAL_CMAKE_ARGS}
DEPENDS zlib
)
SET(py_env
PATH=${PYTHON_INSTALL_DIR}/bin
PYTHONHOME=${PYTHON_INSTALL_DIR}
PYTHONPATH=${PYTHON_INSTALL_DIR}/lib:${PYTHON_INSTALL_DIR}/lib/python2.7:${PY_SITE_PACKAGES_PATH})
####################################################################################
##################################### SETUPTOOLS ###################################
SET(SETUPTOOLS_SOURCES_DIR ${PYTHON_SOURCES_DIR}/setuptools)
ExternalProject_Add(setuptools
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${SETUPTOOLS_SOURCES_DIR}
URL "https://pypi.python.org/packages/source/s/setuptools/setuptools-18.3.2.tar.gz"
BUILD_IN_SOURCE 1
PATCH_COMMAND ""
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
INSTALL_COMMAND ""
BUILD_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py install
DEPENDS python zlib
)
#####################################################################################
##################################### SIX ###########################################
SET(SIX_SOURCES_DIR ${PYTHON_SOURCES_DIR}/six)
ExternalProject_Add(six
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${SIX_SOURCES_DIR}
URL https://pypi.python.org/packages/source/s/six/six-1.10.0.tar.gz
BUILD_IN_SOURCE 1
PATCH_COMMAND ""
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
INSTALL_COMMAND ""
BUILD_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py install
DEPENDS python setuptools
)
#####################################################################################
##################################### CYTHON ########################################
SET(CYTHON_SOURCES_DIR ${PYTHON_SOURCES_DIR}/cython)
ExternalProject_Add(cython
${EXTERNAL_PROJECT_LOG_ARGS}
PREFIX ${CYTHON_SOURCES_DIR}
URL https://github.com/cython/cython/archive/0.25.2.tar.gz
GIT_TAG 0.25.2
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ""
PATCH_COMMAND ""
UPDATE_COMMAND ""
INSTALL_COMMAND ""
BUILD_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py install
DEPENDS python
)
####################################################################################
##################################### NUMPY ########################################
SET(NUMPY_SOURCES_DIR ${PYTHON_SOURCES_DIR}/numpy)
SET(NUMPY_TAG_VERSION "v1.11.3")
SET(NUMPY_VERSION "1.11.3")
SET(EGG_NAME "")
SET(PYTHON_NUMPY_INCLUDE_DIR "")
IF(WIN32)
SET(EGG_NAME "numpy-${NUMPY_VERSION}-py2.7-${HOST_SYSTEM}.egg")
ELSE(WIN32)
IF(APPLE)
SET(EGG_NAME "numpy-${NUMPY_VERSION}-py2.7-${HOST_SYSTEM}-${MACOS_VERSION}")
ELSE(APPLE)
SET(EGG_NAME "numpy-${NUMPY_VERSION}-py2.7-linux")
SET(EGG_NAME "numpy-${NUMPY_VERSION}-py2.7-linux")
ENDIF(APPLE)
FOREACH(suffix x86_64 intel fat64 fat32 universal)
LIST(APPEND PYTHON_NUMPY_INCLUDE_DIR ${PY_SITE_PACKAGES_PATH}/${EGG_NAME}-${suffix}.egg/numpy/core/include)
ENDFOREACH()
ENDIF(WIN32)
ExternalProject_Add(numpy
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY https://github.com/numpy/numpy.git
GIT_TAG ${NUMPY_TAG_VERSION}
CONFIGURE_COMMAND ""
UPDATE_COMMAND ""
PREFIX ${NUMPY_SOURCES_DIR}
BUILD_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py build
INSTALL_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py install
BUILD_IN_SOURCE 1
DEPENDS python setuptools cython
)
####################################################################################
##################################### WHEEL ########################################
SET(WHEEL_SOURCES_DIR ${PYTHON_SOURCES_DIR}/wheel)
ExternalProject_Add(wheel
${EXTERNAL_PROJECT_LOG_ARGS}
URL https://pypi.python.org/packages/source/w/wheel/wheel-0.29.0.tar.gz
PREFIX ${WHEEL_SOURCES_DIR}
CONFIGURE_COMMAND ""
UPDATE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py install
BUILD_IN_SOURCE 1
DEPENDS python setuptools
)
####################################################################################
################################### PROTOBUF #######################################
SET(PY_PROTOBUF_SOURCES_DIR ${PYTHON_SOURCES_DIR}/protobuf)
ExternalProject_Add(python-protobuf
${EXTERNAL_PROJECT_LOG_ARGS}
URL https://pypi.python.org/packages/e0/b0/0a1b364fe8a7d177b4b7d4dca5b798500dc57a7273b93cca73931b305a6a/protobuf-3.1.0.post1.tar.gz
URL_MD5 38b5fb160c768d2f8444d0c6d637ff91
PREFIX ${PY_PROTOBUF_SOURCES_DIR}
BUILD_IN_SOURCE 1
PATCH_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py build
INSTALL_COMMAND env ${py_env} ${PYTHON_EXECUTABLE} setup.py install
DEPENDS python setuptools six
)
####################################################################################
LIST(APPEND external_project_dependencies python setuptools six cython wheel python-protobuf numpy)
ENDIF(PYTHONINTERP_FOUND)
IF(WITH_PYTHON)

@ -87,6 +87,9 @@
# go_library(example SHARED)
#
# including binary directory for generated headers.
include_directories(${CMAKE_BINARY_DIR})
if(NOT APPLE)
find_package(Threads REQUIRED)
link_libraries(${CMAKE_THREAD_LIBS_INIT})
@ -98,23 +101,16 @@ function(merge_static_libs TARGET_NAME)
# First get the file names of the libraries to be merged
foreach(lib ${libs})
get_target_property(libtype ${lib} TYPE)
if(NOT libtype STREQUAL "STATIC_LIBRARY")
message(FATAL_ERROR "merge_static_libs can only process static libraries")
endif()
set(libfiles ${libfiles} $<TARGET_FILE:${lib}>)
endforeach()
if(APPLE) # Use OSX's libtool to merge archives
add_custom_target(${TARGET_NAME}_archive
COMMAND libtool -static -o "${CMAKE_CURRENT_BINARY_DIR}/lib${TARGET_NAME}.a" ${libfiles}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}
DEPENDS ${libs}
)
add_library(${TARGET_NAME} STATIC IMPORTED GLOBAL)
set_property(TARGET ${TARGET_NAME} PROPERTY
IMPORTED_LOCATION "${CMAKE_CURRENT_BINARY_DIR}/lib${TARGET_NAME}.a")
add_dependencies(${TARGET_NAME} ${TARGET_NAME}_archive)
set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME}_dummy.c)
file(WRITE ${dummyfile} "const char * dummy = \"${dummyfile}\";")
add_library(${TARGET_NAME} STATIC ${dummyfile})
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
COMMAND rm "${CMAKE_CURRENT_BINARY_DIR}/lib${TARGET_NAME}.a"
COMMAND /usr/bin/libtool -static -o "${CMAKE_CURRENT_BINARY_DIR}/lib${TARGET_NAME}.a" ${libfiles})
else() # general UNIX: use "ar" to extract objects and re-add to a common lib
foreach(lib ${libs})
set(objlistfile ${lib}.objlist) # list of objects in the input library
@ -143,9 +139,9 @@ function(merge_static_libs TARGET_NAME)
set(outlibfile "$<TARGET_FILE:${TARGET_NAME}>")
foreach(lib ${libs})
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
COMMAND ${CMAKE_AR} ru ${outlibfile} @"../${objlistfile}"
WORKING_DIRECTORY ${objdir})
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
COMMAND ${CMAKE_AR} ru ${outlibfile} @"../${lib}.objlist"
WORKING_DIRECTORY ${lib}.objdir)
endforeach()
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
@ -253,10 +249,6 @@ function(nv_test TARGET_NAME)
endif()
endfunction(nv_test)
set(GOPATH "${CMAKE_CURRENT_BINARY_DIR}/go")
file(MAKE_DIRECTORY ${GOPATH})
set(PADDLE_IN_GOPATH "${GOPATH}/src/github.com/PaddlePaddle/Paddle")
function(go_library TARGET_NAME)
set(options STATIC static SHARED shared)
set(oneValueArgs "")
@ -265,10 +257,10 @@ function(go_library TARGET_NAME)
if (go_library_SHARED OR go_library_shared)
set(BUILD_MODE "-buildmode=c-shared")
set(LIB_NAME "${CMAKE_SHARED_LIBRARY_PREFIX}${TARGET_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}")
set(${TARGET_NAME}_LIB_NAME "${CMAKE_SHARED_LIBRARY_PREFIX}${TARGET_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX}" CACHE STRING "output library name for target ${TARGET_NAME}")
else()
set(BUILD_MODE "-buildmode=c-archive")
set(LIB_NAME "${CMAKE_STATIC_LIBRARY_PREFIX}${TARGET_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}")
set(${TARGET_NAME}_LIB_NAME "${CMAKE_STATIC_LIBRARY_PREFIX}${TARGET_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX}" CACHE STRING "output library name for target ${TARGET_NAME}")
endif()
# Add dummy code to support `make target_name` under Terminal Command
@ -283,25 +275,17 @@ function(go_library TARGET_NAME)
add_dependencies(${TARGET_NAME} ${go_library_DEPS})
endif(go_library_DEPS)
# we need to symlink Paddle directory into GOPATH. If we
# don't do it and we have code that depends on Paddle, go
# get ./... will download a new Paddle repo from Github,
# without the changes in our current Paddle repo that we
# want to build.
set(${TARGET_NAME}_LIB_PATH "${CMAKE_CURRENT_BINARY_DIR}/${${TARGET_NAME}_LIB_NAME}" CACHE STRING "output library path for target ${TARGET_NAME}")
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
add_custom_command(TARGET ${TARGET_NAME} POST_BUILD
COMMAND rm "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}"
# Symlink Paddle directory into GOPATH
COMMAND mkdir -p ${PADDLE_IN_GOPATH}
COMMAND rm -rf ${PADDLE_IN_GOPATH}
COMMAND ln -sf ${CMAKE_SOURCE_DIR} ${PADDLE_IN_GOPATH}
# Automatically get all dependencies specified in the source code
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get -d ./...
COMMAND rm "${${TARGET_NAME}_LIB_PATH}"
# Golang build source code
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE}
-o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}"
-o "${${TARGET_NAME}_LIB_PATH}"
${GO_SOURCE}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
add_dependencies(${TARGET_NAME} go_path)
endfunction(go_library)
function(go_binary TARGET_NAME)
@ -331,3 +315,13 @@ function(go_test TARGET_NAME)
add_custom_target(${TARGET_NAME} ALL DEPENDS ${TARGET_NAME}_timestamp ${go_test_DEPS})
add_test(${TARGET_NAME} ${CMAKE_CURRENT_BINARY_DIR}/${TARGET_NAME})
endfunction(go_test)
# proto_library(<TARGET_NAME> SRCS <file.proto>...)
#
# Generates C++ code from the given .proto files with
# `protobuf_generate_cpp` and wraps the generated sources in a library
# target named TARGET_NAME via `cc_library`, depending on `protobuf`.
#
# Arguments:
#   TARGET_NAME - name of the library target to create.
#   SRCS        - list of .proto files to compile.
#
# Example:
#   proto_library(my_proto SRCS a.proto b.proto)
function(proto_library TARGET_NAME)
  # `options` must be defined locally: a function inherits the variables of
  # its caller, so leaving it unset would let a stale `options` value from
  # the calling scope change how the arguments are parsed.
  set(options "")
  set(oneValueArgs "")
  set(multiValueArgs SRCS)
  cmake_parse_arguments(proto_library "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
  # Start from empty output variables so that nothing leaks in from the
  # calling scope.
  set(proto_srcs)
  set(proto_hdrs)
  protobuf_generate_cpp(proto_srcs proto_hdrs ${proto_library_SRCS})
  # Only the generated sources are handed to cc_library; the generated
  # header list is not passed on.
  cc_library(${TARGET_NAME} SRCS ${proto_srcs} DEPS protobuf)
endfunction()

@ -27,10 +27,6 @@ sphinx_add_target(paddle_docs
${CMAKE_CURRENT_SOURCE_DIR}
${SPHINX_HTML_DIR_EN})
add_dependencies(paddle_docs
gen_proto_py)
# configured documentation tools and intermediate build results
set(BINARY_BUILD_DIR_CN "${CMAKE_CURRENT_BINARY_DIR}/cn/_build")
@ -51,6 +47,3 @@ sphinx_add_target(paddle_docs_cn
${SPHINX_CACHE_DIR_CN}
${CMAKE_CURRENT_SOURCE_DIR}
${SPHINX_HTML_DIR_CN})
add_dependencies(paddle_docs_cn
gen_proto_py)

@ -105,3 +105,48 @@ shared_library(api
### Implementation
As the above example CMakeLists.txt executes, each function invocation adds "nodes" to a dependency graph. It also uses this graph to generate CMake commands including `add_executable`, `add_dependencies`, `target_link_libraries`, and `add_test`.
### Using Package Manager For Go
Building Go binaries and libraries requires satisfying their dependencies. Generally,
we can run `go get ./...` to download and compile all external dependencies. The
problems are:
1. `go get` will always get the latest code from the default branch of the
remote repo, so changes in dependencies might break the build. This is very
different from what we already have in `cmake/external`, which downloads a
specific version or commit id of each dependency.
1. Some locations can not access external dependencies through the internet, as mentioned
in https://github.com/PaddlePaddle/Paddle/issues/2605. Using package management
tools can package the dependencies as a "vendor" package, which can be mirrored
on many cloud file hosting services, so users who want to compile Paddle by themselves can
download this "vendor" package from a mirror site.
#### Choose A Suitable Tool
As mentioned by @wangkuiyi, [Here](https://github.com/golang/go/wiki/PackageManagementTools)
list dozens of Go package managers. We choose the tool using following principles:
- Most "active" projects with more stars, more pull requests or commits
- Widely used project
After comparing all these projects, we shall choose between the most popular
tools: Godep and Glide.
Here's a brief comparison between Godep and Glide:
https://github.com/Masterminds/glide/wiki/Go-Package-Manager-Comparison. There are
also many complaints about using `Godep`. A new "official" package
management tool has also been started at https://github.com/golang/dep to resolve
such problems, but it is currently at the Alpha stage. So the best choice for now is
clearly Glide.
#### Manage Go Packages
- Dependencies: `go/glide.yaml` will store the dependencies that are directly
imported by Paddle, along with their versions. `go/glide.lock` will store all dependencies recursively
with their commit ids. Builds will "lock" to these packages unless we `glide up`
them.
- Vendor package: the `go/vendor` directory will be generated when running the `cmake` command. `cmake`
will download the code corresponding to `go/glide.lock`. If we put a vendor folder
under `go/`, cmake will just check the commit ids of the packages under that folder;
if the commit ids match, there will be no download at all.

@ -0,0 +1,110 @@
# Design Doc: Save Model
## Overview
The model is the output of the training process. There are two
ways in which a user can obtain a model:
- Save model triggered by user code: user code asks PaddlePaddle to
save a model.
- Convert model from the checkpoint: model being converted from
pservers' periodic checkpoint. In this way, the user can cancel a
job at any time, and still have a relatively fresh model (we
checkpoint around every 5 minutes).
### Trainer Saving Model vs. Pservers Saving Model
Both trainers and pservers have access to the model. So the model can
be saved from a trainer or pservers. We need to decide where the model
is saved from.
#### Dense Update vs. Sparse Update
There are two types of model update methods: dense update and sparse
update (when the model parameter is configured to be sparse).
- Dense update
Every trainer has its own full copy of the model. Every model
update will update the entire model.
- Sparse update
The training input is sparse, and the trainer does not have the
entire model. It will only download the necessary sub-model related
to the input. When updating the model, only the sub-model related to
the training input is updated.
#### Pservers Saving Model
The benefit of letting pservers save the model is that they have the entire
model all the time. However, since pservers are on different nodes, it
requires a merging process to merge model shards into the same
model. This in turn requires the pservers to write models to a distributed
filesystem, making the checkpoint shards visible to the merge program.
#### Trainer Saving Model
The benefit of letting one trainer save the model is that it does not
require a distributed filesystem. It also reuses the same save model
logic as when training locally - except that when doing sparse update, the
trainer needs to download the entire model during the saving process.
#### Conclusion
Given trainer saving model does not require a distributed filesystem,
and is an intuitive extension to trainer saving model when training
locally, we decide to let the trainer save the model when doing
distributed training.
### Convert Model from Checkpoint
TODO
## Timeline
We will first implement having the trainer save the model. Converting the latest
snapshot to a model is a TODO for the future.
## Trainer Save Model
### Trainer Election
One trainer will be elected as the one to save the model. When using
etcd, the trainer ID is a randomly generated UUID, and we will utilize etcd to
elect one trainer. When not using etcd, unique trainer IDs will be
given by the administrator, and the trainer whose ID is "0" is elected to
save the model.
### Model Save Path
Each trainer will be given the directory to save the model. The
elected trainer will save the model to
`given-directory/trainerID`. Since the trainer ID is unique, this
prevents concurrent saves to the same file if multiple trainers
are elected to save the model when a split-brain problem happens.
### What Happens When Model Is Saving
It takes some time to save the model, so we need to define what will happen
while the model is being saved.
When doing dense update, the trainer uses the local model. Pservers
do not need to pause model update.
When doing sparse update, the trainer needs to download the entire
model while saving. To get the most accurate model, the model update
needs to be paused before the download starts and resumed after the
download finishes. Otherwise, the trainer gets a model that is
"polluted": some parts of the model are old, and some parts of the model are
new.
It's unclear whether the "polluted" model will be inferior, due to the
stochastic nature of deep learning, and pausing the model update will
add more complexity to the system. Since supporting sparse update is a
TODO item, we defer to the future the evaluation of whether to pause the
model update while saving the model.

@ -41,7 +41,7 @@ class Scope {
const Variable* GetVariable(const std::string& name) const;
private:
std::unordered_map<std::string, std::unique_ptr<Vairable>> vars_;
std::unordered_map<std::string, std::unique_ptr<Variable>> vars_;
};
```
@ -59,9 +59,9 @@ class Scope {
Scope(const std::shared_ptr<Scope>& scope): parent_(scope) {}
Variable* GetVariable(const std::string& name) const {
Variable* var = GetVarLocally(name);
if (var != nullptr) {
return var;
auto it = vars_.find(name);
if (it != vars_.end()) {
return it->second.get();
} else if (parent_ != nullptr) {
return parent_->GetVariable(name);
} else {
@ -97,8 +97,8 @@ class Scope {
// return nullptr if not found.
Variable* GetVariable(const std::string& name) const;
// return Error if already contains same name variable.
Error CreateVariable(const std::string& name);
// return if already contains same name variable.
Variable* CreateVariable(const std::string& name);
private:
std::shared_ptr<Scope> parent_;

@ -31,7 +31,7 @@ def event_handler(event):
# define training dataset reader
def train_reader():
train_x = np.array([[1, 1], [1, 2], [3, 4], [5, 2]])
train_y = np.array([-2, -3, -7, -7])
train_y = np.array([[-2], [-3], [-7], [-7]])
def reader():
for i in xrange(train_y.shape[0]):

@ -30,7 +30,13 @@ func main() {
log.SetLevel(level)
timeout := time.Second * time.Duration((*etcdTimeout))
s, err := pserver.NewService(*etcdEndpoint, *numPservers, timeout)
e := pserver.NewEtcdClient(*etcdEndpoint, *numPservers, timeout)
idx, err := e.Register()
if err != nil {
panic(err)
}
s, err := pserver.NewService(idx)
if err != nil {
panic(err)
}

@ -1,21 +1,3 @@
cmake_minimum_required(VERSION 3.0)
get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake")
project(cxx_go C Go)
#include(golang)
include(flags)
set(MASTER_LIB_NAME "paddle_master")
go_library(${MASTER_LIB_NAME} SHARED)
if(PROJ_ROOT)
add_custom_command(OUTPUT ${PROJ_ROOT}/python/paddle/v2/master/lib${MASTER_LIB_NAME}.so
COMMAND rm ${CMAKE_CURRENT_BINARY_DIR}/lib${MASTER_LIB_NAME}.h
COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/lib${MASTER_LIB_NAME}.so ${PROJ_ROOT}/python/paddle/v2/master/
DEPENDS ${MASTER_LIB_NAME})
add_custom_target(paddle_master_shared ALL DEPENDS ${PROJ_ROOT}/python/paddle/v2/master/lib${MASTER_LIB_NAME}.so)
endif(PROJ_ROOT)
go_library(paddle_master SHARED)

@ -13,10 +13,13 @@ typedef int paddle_master_client;
import "C"
import (
"strings"
"sync"
"time"
"unsafe"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
)
@ -48,16 +51,33 @@ func remove(client C.paddle_master_client) *master.Client {
return h
}
type addresser string
func (a addresser) Address() string {
return string(a)
// paddle_new_etcd_master_client creates a master client whose server address
// is discovered through etcd.
//
// etcdEndpoints is a comma-separated list of etcd endpoints. timeout is used,
// in seconds, both as the etcd dial timeout and as the timeout of the initial
// address lookup. bufSize is forwarded to master.NewClient.
//
// The value currently stored under master.DefaultAddrPath is sent to the
// client first; master.WatchKey then runs in its own goroutine and feeds the
// same channel. The function panics if the etcd client cannot be created or
// the initial lookup fails.
//
//export paddle_new_etcd_master_client
func paddle_new_etcd_master_client(etcdEndpoints *C.char, timeout int, bufSize int) C.paddle_master_client {
	endpoints := strings.Split(C.GoString(etcdEndpoints), ",")
	etcdCli, err := clientv3.New(clientv3.Config{
		Endpoints:   endpoints,
		DialTimeout: time.Second * time.Duration(timeout),
	})
	if err != nil {
		panic(err)
	}

	// Capacity 1 lets the initial address be queued before the client
	// starts receiving from the channel.
	addrCh := make(chan string, 1)
	initialAddr, err := master.GetKey(etcdCli, master.DefaultAddrPath, timeout)
	if err != nil {
		panic(err)
	}
	addrCh <- initialAddr

	// Keep forwarding later changes of the address key.
	go master.WatchKey(etcdCli, master.DefaultAddrPath, addrCh)

	return add(master.NewClient(addrCh, bufSize))
}
//export paddle_new_master_client
func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client {
a := C.GoString(addr)
c := master.NewClient(addresser(a), bufSize)
ch := make(chan string, 1)
ch <- a
c := master.NewClient(ch, bufSize)
return add(c)
}

@ -2,18 +2,12 @@ package master
import (
"os"
"time"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
log "github.com/sirupsen/logrus"
)
// Addresser provide the address of the master server.
type Addresser interface {
Address() string
}
// Client is the client of the master server.
type Client struct {
conn *connection.Conn
@ -29,11 +23,11 @@ type record struct {
//
// bufSize is the record buffer size. NextRecord will read from this
// buffer.
func NewClient(addr Addresser, bufSize int) *Client {
func NewClient(addrCh <-chan string, bufSize int) *Client {
c := &Client{}
c.conn = connection.New()
c.ch = make(chan record, bufSize)
go c.monitorMaster(addr)
go c.monitorMaster(addrCh)
go c.getRecords()
return c
}
@ -78,12 +72,10 @@ func (c *Client) getRecords() {
}
}
func (c *Client) monitorMaster(addr Addresser) {
func (c *Client) monitorMaster(addrCh <-chan string) {
lastMaster := ""
monitor := func() {
// get the lastest address of the master server,
for curMaster := range addrCh {
// connect to the new address once address changed.
curMaster := addr.Address()
if curMaster != lastMaster {
if curMaster == "" {
err := c.conn.Close()
@ -100,18 +92,10 @@ func (c *Client) monitorMaster(addr Addresser) {
// to retry next time.
curMaster = lastMaster
}
}
}
lastMaster = curMaster
}
monitor()
ticker := time.NewTicker(10 * time.Second)
for _ = range ticker.C {
monitor()
}
}
// SetDataset set dataset for the master server to dispatch.

@ -26,12 +26,6 @@ func init() {
log.SetLevel(log.ErrorLevel)
}
type TestAddresser string
func (a TestAddresser) Address() string {
return string(a)
}
func TestGetFinishTask(t *testing.T) {
const path = "/tmp/master_client_test_0"
@ -45,7 +39,6 @@ func TestGetFinishTask(t *testing.T) {
if err != nil {
panic(err)
}
go func(l net.Listener) {
s, err := NewService(&InMemStore{}, chunkPerTask, time.Second, 1)
if err != nil {
@ -82,9 +75,11 @@ func TestGetFinishTask(t *testing.T) {
// Manually initialize client to avoid calling c.getRecords()
c := &Client{}
c.conn = connection.New()
go c.monitorMaster(TestAddresser(fmt.Sprintf(":%d", p)))
addr := fmt.Sprintf(":%d", p)
ch := make(chan string, 1)
ch <- addr
go c.monitorMaster(ch)
c.SetDataset([]string{path})
checkOnePass := func(i int) {
var tasks []Task
for idx := 0; idx < totalTask; idx++ {

@ -20,7 +20,6 @@ func TestNextRecord(t *testing.T) {
path = "/tmp/master_client_TestFull"
total = 50
)
l, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
@ -31,7 +30,6 @@ func TestNextRecord(t *testing.T) {
if err != nil {
panic(err)
}
go func(l net.Listener) {
s, err := master.NewService(&master.InMemStore{}, 10, time.Second, 1)
if err != nil {
@ -63,10 +61,10 @@ func TestNextRecord(t *testing.T) {
}
w.Close()
f.Close()
c := master.NewClient(master.TestAddresser(fmt.Sprintf(":%d", p)), 10)
curAddr := make(chan string, 1)
curAddr <- fmt.Sprintf(":%d", p)
c := master.NewClient(curAddr, 10)
c.SetDataset([]string{path})
for pass := 0; pass < 50; pass++ {
received := make(map[byte]bool)
for i := 0; i < total; i++ {

@ -18,8 +18,8 @@ const (
DefaultAddrPath = "/master/addr"
)
// EtcdClient is the etcd client that master uses for fault tolerance
// and service registry.
// EtcdClient is the etcd client that the master uses for fault
// tolerance and service registry.
type EtcdClient struct {
lockPath string
statePath string
@ -142,3 +142,31 @@ func (e *EtcdClient) Load() ([]byte, error) {
state := kvs[0].Value
return state, nil
}
// GetKey returns the value stored under the specified key.
//
// timeout is the request timeout in seconds. If the key has no value, an
// empty string and a nil error are returned. When several key-value pairs
// come back, only the first one is used.
func GetKey(c *clientv3.Client, key string, timeout int) (string, error) {
	limit := time.Duration(timeout) * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), limit)
	defer cancel()

	resp, err := c.Get(ctx, key)
	if err != nil {
		return "", err
	}
	if len(resp.Kvs) == 0 {
		return "", nil
	}
	return string(resp.Kvs[0].Value), nil
}
// WatchKey watches the specified key and sends the value carried by every
// received event to valChan.
//
// The function blocks until the watch channel is closed, so callers normally
// run it in its own goroutine. Sending on valChan blocks when the receiver is
// not keeping up. An error reported on a watch response is logged instead of
// being silently dropped.
func WatchKey(c *clientv3.Client, key string, valChan chan<- string) {
	rch := c.Watch(context.Background(), key)
	for wresp := range rch {
		// Surface watch failures; without this the loop would end (or
		// keep going) with no trace of what went wrong.
		if err := wresp.Err(); err != nil {
			log.Errorf("watch on key %q reported an error: %v", key, err)
		}
		for _, ev := range wresp.Events {
			// if received event is DELETE, the value will be an empty string
			log.Infof("received event %s, %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			valChan <- string(ev.Kv.Value)
		}
	}
}

@ -1,3 +1,3 @@
cc_library(main SRCS main.c DEPS paddle_pserver_cclient)
cc_binary(main SRCS main.c DEPS paddle_pserver_cclient)
cc_test(test_cclient SRCS test_cclient.c DEPS paddle_pserver_cclient)

@ -1,6 +1,7 @@
package pserver
import (
"errors"
"hash/fnv"
"sort"
"time"
@ -123,6 +124,9 @@ func (c *Client) FinishInitParams() error {
// SendGrads sends gradients to parameter servers for updating
// parameters.
func (c *Client) SendGrads(grads []Gradient) error {
if len(grads) == 0 {
return errors.New("no gradient received")
}
errCh := make(chan error, len(grads))
for _, g := range grads {
go func(g Gradient) {

@ -7,7 +7,6 @@ import (
"strconv"
"strings"
"testing"
"time"
"github.com/PaddlePaddle/Paddle/go/pserver"
)
@ -31,7 +30,7 @@ func init() {
port[i] = p
go func(l net.Listener) {
s, err := pserver.NewService("", time.Second*5)
s, err := pserver.NewService(0)
if err != nil {
panic(err)
}

@ -0,0 +1,181 @@
package pserver
import (
"context"
"errors"
"strconv"
"strings"
"time"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
)
// EtcdClient is the pserver-side etcd client. The pserver relies on it
// for fault tolerance, service registry and coordination.
type EtcdClient struct {
	// Settings supplied through NewEtcdClient.

	// etcdEndpoints is the endpoint string; Register splits it on ",".
	etcdEndpoints string
	// etcdTimeout is the dial timeout, and is also used as the retry
	// interval.
	etcdTimeout time.Duration
	// numPservers is the value Register proposes for PsDesired.
	numPservers int

	// State filled in by Register.

	etcdClient *clientv3.Client
	// FIXME: ensure GetExternalIP gets the correct ip for trainers to connect.
	externalIP string
	// desired is the desired number of pservers in the job. It is
	// assumed not to change during one training job.
	desired int
}
// NewEtcdClient returns an EtcdClient holding the given endpoints,
// number of pservers and timeout. It only stores the settings; the
// remaining fields are left at their zero values.
func NewEtcdClient(endpoints string, numPservers int, timeout time.Duration) *EtcdClient {
	client := new(EtcdClient)
	client.etcdEndpoints = endpoints
	client.numPservers = numPservers
	client.etcdTimeout = timeout
	return client
}
// Register registers the pserver on etcd.
//
// It performs, in order:
//  1. look up the external IP of this host,
//  2. connect to the configured etcd endpoints,
//  3. initialize PsDesired with numPservers if it is still unset,
//  4. read PsDesired back into e.desired,
//  5. claim a free pserver slot.
//
// Steps 2-5 are retried forever, sleeping etcdTimeout between attempts.
// Only a failure of step 1 is reported through the returned error.
//
// Register returns the index of the current pserver.
func (e *EtcdClient) Register() (int, error) {
	var err error
	e.externalIP, err = networkhelper.GetExternalIP()
	if err != nil {
		return 0, err
	}

	// initialize connection to etcd.
	ep := strings.Split(e.etcdEndpoints, ",")
	for {
		cli, err := clientv3.New(clientv3.Config{
			Endpoints:   ep,
			DialTimeout: e.etcdTimeout,
		})
		if err != nil {
			log.Errorf("connect to etcd error: %v", err)
			time.Sleep(e.etcdTimeout)
			continue
		}
		e.etcdClient = cli
		log.Debugf("inited client to %s", e.etcdEndpoints)
		break
	}

	// init /ps_desired using transaction, for multiple pservers may want to write
	// it at the same time.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		_, err := e.initDesiredPsercers(ctx, e.numPservers)
		cancel()
		if err != nil {
			log.Warn(err)
			time.Sleep(e.etcdTimeout)
			continue
		}
		break
	}

	// TODO: when implementing extending or reducing pservers, /ps_desired is
	// changed, then we need to watch /ps_desired node for events. For now, just
	// write once when init and read from it.
	// wait and set e.desired init value
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		resp, err := e.etcdClient.Get(ctx, PsDesired)
		cancel()
		if err != nil {
			log.Errorf("getting %s error: %v", PsDesired, err)
			time.Sleep(e.etcdTimeout)
			continue
		}
		if len(resp.Kvs) == 0 {
			// No value yet: back off before asking again instead of
			// spinning on etcd in a tight loop.
			log.Warnf("%s has no value yet, retrying", PsDesired)
			time.Sleep(e.etcdTimeout)
			continue
		}

		e.desired, err = strconv.Atoi(string(resp.Kvs[0].Value))
		if err != nil {
			log.Errorf("value of %s invalid %v\n", PsDesired, err)
			time.Sleep(e.etcdTimeout)
			// NOTE: wait until ps_desired value change
			continue
		}
		break
	}

	var pserverIdx int
	// try register pserver node on etcd
	for {
		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
		var err error
		pserverIdx, err = e.registerPserverEtcd(ctx)
		cancel()
		if err != nil {
			log.Warn(err)
			time.Sleep(e.etcdTimeout)
			continue
		}
		break
	}

	return pserverIdx, nil
}
// initDesiredPsercers stores numPservers under PsDesired inside an STM
// transaction, but only when PsDesired currently reads as an empty
// string. The transaction is aborted through ctx and runs with
// RepeatableReads isolation.
func (e *EtcdClient) initDesiredPsercers(ctx context.Context, numPservers int) (*clientv3.TxnResponse, error) {
	writeIfUnset := func(stm concurrency.STM) error {
		if stm.Get(PsDesired) != "" {
			// Already has a value: leave it alone.
			return nil
		}
		stm.Put(PsDesired, strconv.Itoa(numPservers))
		return nil
	}

	return concurrency.NewSTM(
		e.etcdClient,
		writeIfUnset,
		concurrency.WithAbortContext(ctx),
		concurrency.WithIsolation(concurrency.RepeatableReads),
	)
}
// registerPserverEtcd registers pserver node on etcd using transaction.
//
// It scans the keys "/ps/0" .. "/ps/<desired-1>" and claims the first
// one whose value is empty by writing externalIP to it under a lease
// granted with a TTL of 5. A goroutine keeps draining the keep-alive
// channel of that lease.
//
// It returns the claimed index. An error is returned when no slot is
// free, when the lease cannot be granted or kept alive, or when the
// transaction itself fails; the caller is expected to retry.
func (e *EtcdClient) registerPserverEtcd(ctx context.Context) (int, error) {
	var idx int
	_, err := concurrency.NewSTM(e.etcdClient, func(c concurrency.STM) error {
		for i := 0; i < e.desired; i++ {
			psKey := "/ps/" + strconv.Itoa(i)
			log.Debugf("checking %s", psKey)
			ps := c.Get(psKey)
			log.Debugf("got value (%s) for key: %s", ps, psKey)
			if ps != "" {
				// slot already taken, try the next one
				continue
			}

			resp, err := e.etcdClient.Grant(context.TODO(), 5)
			if err != nil {
				// Report the failure to the caller, which retries,
				// instead of terminating the whole process.
				log.Errorf("grant etcd lease error: %v", err)
				return err
			}
			// find the first id and write info
			c.Put(psKey, e.externalIP, clientv3.WithLease(resp.ID))
			log.Debugf("set pserver node %s with value %s", psKey, e.externalIP)
			ch, kaerr := e.etcdClient.KeepAlive(context.TODO(), resp.ID)
			if kaerr != nil {
				log.Errorf("keepalive etcd node error: %v", kaerr)
				return kaerr
			}

			// Eat the keep alive messages so etcd
			// will not expire the lease. Ranging over the channel
			// consumes every message and ends cleanly (without a
			// nil dereference) once the channel is closed.
			go func(ch <-chan *clientv3.LeaseKeepAliveResponse) {
				for ka := range ch {
					log.Debugf("keepalive: %d\n", ka.TTL)
				}
				log.Warn("keepalive channel closed")
			}(ch)
			log.Debug("register finished")
			idx = i
			return nil
		}
		return errors.New("not registerd, may due to already have enough pservers")
	}, concurrency.WithAbortContext(ctx), concurrency.WithIsolation(concurrency.RepeatableReads))

	if err != nil {
		return 0, err
	}

	return idx, nil
}

@ -1,18 +1,9 @@
package pserver
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
)
// ElementType is the type of elements of a Parameter.
@ -55,160 +46,25 @@ type Gradient Parameter
// Service is the RPC service for pserver.
type Service struct {
initialized chan struct{}
idx int
mu sync.Mutex
opt *optimizer
paramMap map[string]Parameter
etcdEndpoints string
etcdClient *clientv3.Client
// etcdTimeout is also used as retry intervals.
etcdTimeout time.Duration
// desired number of pservers in the job.
// assume desired will not change during one training job.
desired int
// FIXME: ensure GetExternalIP gets the correct ip for trainers to connect.
externalIP string
}
// NewService creates a new service, will bypass etcd registration if no
// endpoints specified.
func NewService(endpoints string, numPservers int, timeout time.Duration) (*Service, error) {
s := &Service{opt: newOptimizer(sgd, 0.005)}
func NewService(idx int) (*Service, error) {
s := &Service{
idx: idx,
opt: newOptimizer(sgd, 0.005),
}
s.paramMap = make(map[string]Parameter)
s.initialized = make(chan struct{})
s.etcdEndpoints = endpoints
s.etcdTimeout = timeout
var err error
s.externalIP, err = networkhelper.GetExternalIP()
if err != nil {
return nil, err
}
if endpoints != "" {
// initialize connection to etcd, try
ep := strings.Split(s.etcdEndpoints, ",")
for {
cli, err := clientv3.New(clientv3.Config{
Endpoints: ep,
DialTimeout: s.etcdTimeout,
})
if err != nil {
log.Errorf("connect to etcd error: %v", err)
time.Sleep(s.etcdTimeout)
continue
}
s.etcdClient = cli
log.Debugf("inited client to %s", s.etcdEndpoints)
break
}
// init /ps_desired using transaction, for multiple pservers may want to write
// it at the same time.
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := s.initDesiredPsercers(ctx, numPservers)
cancel()
if err != nil {
log.Warn(err)
time.Sleep(s.etcdTimeout)
continue
}
break
}
// TODO: when implementing extending or reducing pservers, /ps_desired is
// changed, then we need to watch /ps_desired node for events. For now, just
// write once when init and read from it.
// wait and set s.desired init value
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := s.etcdClient.Get(ctx, PsDesired)
cancel()
if err != nil {
log.Errorf("getting %s error: %v", PsDesired, err)
time.Sleep(s.etcdTimeout)
continue
}
if len(resp.Kvs) != 0 {
s.desired, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err != nil {
log.Errorf("value of %s invalid %v\n", PsDesired, err)
time.Sleep(s.etcdTimeout)
// NOTE: wait util ps_desired value change
continue
}
break
}
}
// try register pserver node on etcd
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := s.registerPserverEtcd(ctx)
cancel()
if err != nil {
log.Warn(err)
time.Sleep(s.etcdTimeout)
continue
}
break
}
} // if endpoints != ""
// Bypass etcd registration if no endpoints specified
return s, nil
}
// initDesiredPsercers stores numPservers under PsDesired inside an STM
// transaction, but only when PsDesired currently reads as an empty
// string. The transaction is aborted through ctx and runs with
// RepeatableReads isolation.
func (s *Service) initDesiredPsercers(ctx context.Context, numPservers int) (*clientv3.TxnResponse, error) {
	writeIfUnset := func(stm concurrency.STM) error {
		if stm.Get(PsDesired) != "" {
			// Already has a value: leave it alone.
			return nil
		}
		stm.Put(PsDesired, strconv.Itoa(numPservers))
		return nil
	}

	return concurrency.NewSTM(
		s.etcdClient,
		writeIfUnset,
		concurrency.WithAbortContext(ctx),
		concurrency.WithIsolation(concurrency.RepeatableReads),
	)
}
// registerPserverEtcd registers pserver node on etcd using transaction.
//
// It scans the keys "/ps/0" .. "/ps/<desired-1>" and claims the first
// one whose value is empty by writing externalIP to it under a lease
// granted with a TTL of 5. A goroutine keeps draining the keep-alive
// channel of that lease.
//
// An error is returned when no slot is free, when the lease cannot be
// granted or kept alive, or when the transaction itself fails; the
// caller is expected to retry.
func (s *Service) registerPserverEtcd(ctx context.Context) (*clientv3.TxnResponse, error) {
	return concurrency.NewSTM(s.etcdClient, func(c concurrency.STM) error {
		for i := 0; i < s.desired; i++ {
			psKey := "/ps/" + strconv.Itoa(i)
			log.Debugf("checking %s", psKey)
			ps := c.Get(psKey)
			log.Debugf("got value (%s) for key: %s", ps, psKey)
			if ps != "" {
				// slot already taken, try the next one
				continue
			}

			resp, err := s.etcdClient.Grant(context.TODO(), 5)
			if err != nil {
				// Report the failure to the caller, which retries,
				// instead of terminating the whole process.
				log.Errorf("grant etcd lease error: %v", err)
				return err
			}
			// find the first id and write info
			c.Put(psKey, s.externalIP, clientv3.WithLease(resp.ID))
			log.Debugf("set pserver node %s with value %s", psKey, s.externalIP)
			ch, kaerr := s.etcdClient.KeepAlive(context.TODO(), resp.ID)
			if kaerr != nil {
				log.Errorf("keepalive etcd node error: %v", kaerr)
				return kaerr
			}

			// Eat the keep alive messages so etcd
			// will not expire the lease. Ranging over the channel
			// consumes every message and ends cleanly (without a
			// nil dereference) once the channel is closed.
			go func(ch <-chan *clientv3.LeaseKeepAliveResponse) {
				for ka := range ch {
					log.Debugf("keepalive: %d\n", ka.TTL)
				}
				log.Warn("keepalive channel closed")
			}(ch)
			log.Debug("register finished")
			return nil
		}
		return errors.New("not registerd, may due to already have enough pservers")
	}, concurrency.WithAbortContext(ctx), concurrency.WithIsolation(concurrency.RepeatableReads))
}
// InitParam initializes a parameter.
func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error {
select {

@ -10,7 +10,7 @@ import (
)
func TestFull(t *testing.T) {
s, err := pserver.NewService("", time.Second*5)
s, err := pserver.NewService(0)
if err != nil {
t.Error(err)
}
@ -75,7 +75,7 @@ func TestFull(t *testing.T) {
}
func TestMultipleInit(t *testing.T) {
s, err := pserver.NewService("", time.Second*5)
s, err := pserver.NewService(0)
if err != nil {
t.Error(err)
}
@ -91,7 +91,7 @@ func TestMultipleInit(t *testing.T) {
}
func TestUninitialized(t *testing.T) {
s, err := pserver.NewService("", time.Second*5)
s, err := pserver.NewService(0)
err = s.SendGrad(pserver.Gradient{}, nil)
if err.Error() != pserver.Uninitialized {
t.FailNow()
@ -99,7 +99,7 @@ func TestUninitialized(t *testing.T) {
}
func TestBlockUntilInitialized(t *testing.T) {
s, err := pserver.NewService("", time.Second*5)
s, err := pserver.NewService(0)
if err != nil {
t.Error(err)
}

@ -9,17 +9,10 @@ add_subdirectory(pserver)
add_subdirectory(trainer)
add_subdirectory(scripts)
add_subdirectory(optimizer)
add_subdirectory(strings)
# Do not build go directory until go cmake is working smoothly.
# if(CMAKE_Go_COMPILER)
# add_subdirectory(go)
# endif()
find_package(Boost QUIET)
add_subdirectory(string)
if(Boost_FOUND)
include_directories(${Boost_INCLUDE_DIRS})
add_subdirectory(memory)
add_subdirectory(platform)
add_subdirectory(framework)
endif()

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save