From 99dc60642d6b94a5ccc92be21917cfa866d6e7f8 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 7 Jun 2017 23:42:11 +0800 Subject: [PATCH 01/12] new parameterupdater use paddle pserver cclient of go --- CMakeLists.txt | 1 + .../cluster_train/remote_parameter_updater.md | 21 ++++ go/cmake/golang.cmake | 8 +- go/pserver/cclient/CMakeLists.txt | 12 +- go/pserver/cclient/test/CMakeLists.txt | 13 ++- go/pserver/cclient/test/main.c | 19 ++-- go/pserver/cclient/test/test_train.py | 60 ++++++++++ paddle/api/CMakeLists.txt | 4 +- paddle/api/Paddle.i | 1 + paddle/api/PaddleAPI.h | 2 + paddle/api/ParameterUpdater.cpp | 9 ++ paddle/trainer/CMakeLists.txt | 11 +- paddle/trainer/NewRemoteParameterUpdater.cpp | 88 +++++++++++++++ paddle/trainer/NewRemoteParameterUpdater.h | 105 ++++++++++++++++++ python/paddle/v2/optimizer.py | 15 ++- python/paddle/v2/trainer.py | 7 +- 16 files changed, 352 insertions(+), 24 deletions(-) create mode 100644 doc/design/cluster_train/remote_parameter_updater.md create mode 100644 go/pserver/cclient/test/test_train.py create mode 100644 paddle/trainer/NewRemoteParameterUpdater.cpp create mode 100644 paddle/trainer/NewRemoteParameterUpdater.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 79210d0436..c2218be5ef 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -127,6 +127,7 @@ endif(WITH_GPU) add_subdirectory(proto) add_subdirectory(paddle) add_subdirectory(python) +add_subdirectory(go/pserver/cclient) if(WITH_DOC) add_subdirectory(doc) diff --git a/doc/design/cluster_train/remote_parameter_updater.md b/doc/design/cluster_train/remote_parameter_updater.md new file mode 100644 index 0000000000..6e8e593845 --- /dev/null +++ b/doc/design/cluster_train/remote_parameter_updater.md @@ -0,0 +1,21 @@ +# Design Doc: Remote Parameter Updater for Cluster Train + +For an overview of distribute training, please refer to [distributed training design doc](README.md). In this design doc, we will discuss the parameter updater that will use parameter server cclient [The Client Library of Parameter Server Design Doc](pserver_client.md) to manage and update parameters. + +## Parameter Updater + +Parameter Updater is used by trainer to manage and update parameter, there are mainly two kind of parameter updater: local and remote, since this design is for cluster train, we will only discuss remote parameter updater here. + +### Remote Parameter Updater + +Remote Parameter Updater manage parameters through remote parameter server with the client that communicate with pserver([The Client Library of Parameter Server Design Doc](pserver_client.md)) + +In PaddlePaddle Python V2 API, trainer is implemented in python, and the trainer will hold a instance of parameter updater and call it's functions directly. In this design, we will also expose the api of RemoteParameterUpdater to python with swig. + +#### Sparse Remote Parameter Updater + +Since we will only implement dense parameter management new, the mechanism for sparse parameter will be discussed in next stage. + +### Interface Design + +TBD diff --git a/go/cmake/golang.cmake b/go/cmake/golang.cmake index d38d06de23..7c85fb6298 100644 --- a/go/cmake/golang.cmake +++ b/go/cmake/golang.cmake @@ -17,7 +17,7 @@ function(GO_LIBRARY NAME BUILD_TYPE) endif() file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") - file(RELATIVE_PATH rel ${CMAKE_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) + file(RELATIVE_PATH rel ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) # find Paddle directory. 
get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) @@ -32,12 +32,14 @@ function(GO_LIBRARY NAME BUILD_TYPE) # will use the local changes in Paddle rather than checkout Paddle # in github. add_custom_target(copyPaddle - COMMAND ln -sf ${PADDLE_DIR} ${PADDLE_IN_GOPATH}) + COMMAND rm -rf ${PADDLE_IN_GOPATH}/Paddle + COMMAND ln -sf ${PADDLE_DIR} ${PADDLE_IN_GOPATH}/Paddle) add_dependencies(goGet copyPaddle) add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} - -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" + -gcflags=-shared -asmflags=-shared -installsuffix=_shared -a + -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" ${CMAKE_GO_FLAGS} ${GO_SOURCE} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) diff --git a/go/pserver/cclient/CMakeLists.txt b/go/pserver/cclient/CMakeLists.txt index c017d74656..e00dd6b14a 100644 --- a/go/pserver/cclient/CMakeLists.txt +++ b/go/pserver/cclient/CMakeLists.txt @@ -9,5 +9,15 @@ project(cxx_go C Go) include(golang) include(flags) -go_library(client STATIC) +go_library(paddle_pserver_cclient STATIC) + +if(PROJ_ROOT) + add_custom_command(OUTPUT ${PROJ_ROOT}/paddle/trainer/libpaddle_pserver_cclient.a + COMMAND cp ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.h ${PROJ_ROOT}/paddle/trainer/ + COMMAND cp ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a ${PROJ_ROOT}/paddle/trainer/ + WORKING_DIRECTORY ${PROJ_ROOT}/paddle + DEPENDS paddle_pserver_cclient) + add_custom_target(paddle_pserver_cclient_lib ALL DEPENDS ${PROJ_ROOT}/paddle/trainer/libpaddle_pserver_cclient.a) +endif(PROJ_ROOT) + add_subdirectory(test) diff --git a/go/pserver/cclient/test/CMakeLists.txt b/go/pserver/cclient/test/CMakeLists.txt index 16f84648c1..762772812f 100644 --- a/go/pserver/cclient/test/CMakeLists.txt +++ b/go/pserver/cclient/test/CMakeLists.txt @@ -1,11 +1,16 @@ cmake_minimum_required(VERSION 3.0) -include_directories(${CMAKE_BINARY_DIR}) - add_executable(main main.c) -add_dependencies(main client) +add_dependencies(main paddle_pserver_cclient) if(APPLE) set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security") endif() -target_link_libraries(main ${CMAKE_BINARY_DIR}/libclient.a) + +if(PROJ_ROOT) + include_directories(${CMAKE_BINARY_DIR}/go/pserver/cclient/) + target_link_libraries(main ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a pthread) +else(PROJ_ROOT) + include_directories(${CMAKE_BINARY_DIR}) + target_link_libraries(main ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread) +endif(PROJ_ROOT) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index f75a2110b9..0ad890daa2 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -1,6 +1,6 @@ #include -#include "libclient.h" +#include "libpaddle_pserver_cclient.h" void fail() { // TODO(helin): fix: gtest using cmake is not working, using this @@ -14,10 +14,11 @@ int main() { client c = paddle_new_pserver_client(addr, 1); retry: if (paddle_begin_init_params(c)) { + paddle_parameter param; char name_a[] = "param_a"; char name_b[] = "param_b"; - unsigned char content[] = {0x00, 0x11, 0x22}; + unsigned char content[] = {0x00, 0x00, 0x00}; param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32; param.name = name_a; param.content = content; @@ -32,6 +33,7 @@ retry: if (paddle_init_param(c, param, NULL, 0) != 0) { goto retry; } + if (paddle_finish_init_params(c) != 0) { goto retry; } @@ -41,30 +43,31 @@ retry: unsigned char content[] = 
{0x00, 0x11, 0x22}; paddle_gradient grads[2] = { - {"param_a", PADDLE_ELEMENT_TYPE_INT32, content, 3}, - {"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, content, 3}}; + {"param_a", PADDLE_ELEMENT_TYPE_FLOAT32, content, 3}, + {"param_b", PADDLE_ELEMENT_TYPE_INT32, content, 3}}; - if (!paddle_send_grads(c, grads, 2)) { + if (paddle_send_grads(c, grads, 2) != 0) { fail(); } paddle_parameter* params[2] = {NULL, NULL}; char* names[] = {"param_a", "param_b"}; - if (!paddle_get_params(c, names, params, 2)) { + if (paddle_get_params(c, names, params, 2) != 0) { fail(); } // get parameters again by reusing the allocated parameter buffers. - if (!paddle_get_params(c, names, params, 2)) { + if (paddle_get_params(c, names, params, 2) != 0) { fail(); } paddle_release_param(params[0]); paddle_release_param(params[1]); - if (!paddle_save_model(c, "/tmp/")) { + if (paddle_save_model(c, "/tmp/") != 0) { fail(); } + printf("test success!\n"); return 0; } diff --git a/go/pserver/cclient/test/test_train.py b/go/pserver/cclient/test/test_train.py new file mode 100644 index 0000000000..ddd6371e0c --- /dev/null +++ b/go/pserver/cclient/test/test_train.py @@ -0,0 +1,60 @@ +import paddle.v2 as paddle +import paddle.v2.dataset.uci_housing as uci_housing + + +def main(): + # init + paddle.init(use_gpu=False, trainer_count=1, trainer_id=1) + + # network config + x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13)) + y_predict = paddle.layer.fc(input=x, + param_attr=paddle.attr.Param(name='w'), + size=1, + act=paddle.activation.Linear(), + bias_attr=paddle.attr.Param(name='b')) + y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1)) + cost = paddle.layer.mse_cost(input=y_predict, label=y) + + # create parameters + parameters = paddle.parameters.create(cost) + + # create optimizer + optimizer = paddle.optimizer.Momentum(momentum=0) + + trainer = paddle.trainer.SGD(cost=cost, + parameters=parameters, + update_equation=optimizer, + is_local=False, + pserver_spec="localhost:3000") + + # event_handler to print training and testing info + def event_handler(event): + if isinstance(event, paddle.event.EndIteration): + if event.batch_id % 100 == 0: + print "Pass %d, Batch %d, Cost %f" % ( + event.pass_id, event.batch_id, event.cost) + + if isinstance(event, paddle.event.EndPass): + if (event.pass_id + 1) % 10 == 0: + result = trainer.test( + reader=paddle.batch( + uci_housing.test(), batch_size=2), + feeding={'x': 0, + 'y': 1}) + print "Test %d, %.2f" % (event.pass_id, result.cost) + + # training + trainer.train( + reader=paddle.batch( + paddle.reader.shuffle( + uci_housing.train(), buf_size=500), + batch_size=2), + feeding={'x': 0, + 'y': 1}, + event_handler=event_handler, + num_passes=30) + + +if __name__ == '__main__': + main() diff --git a/paddle/api/CMakeLists.txt b/paddle/api/CMakeLists.txt index e147659566..c258a15240 100644 --- a/paddle/api/CMakeLists.txt +++ b/paddle/api/CMakeLists.txt @@ -16,7 +16,7 @@ set(API_HEADER Internal.h) add_library(paddle_api STATIC ${API_SOURCES}) -add_dependencies(paddle_api gen_proto_cpp) +add_dependencies(paddle_api gen_proto_cpp paddle_pserver_cclient_lib) INCLUDE(${SWIG_USE_FILE}) INCLUDE_DIRECTORIES(${PROJ_ROOT}/paddle) @@ -44,7 +44,7 @@ SET(SWIG_MODULE_swig_paddle_EXTRA_DEPS ) IF(APPLE) - SET(MACOS_LD_FLAGS "-undefined dynamic_lookup -Wl,-all_load") + SET(MACOS_LD_FLAGS "-undefined dynamic_lookup -Wl,-all_load -framework CoreFoundation -framework Security") ELSE(APPLE) SET(START_GROUP "-Xlinker -start-group") SET(END_GROUP "-Xlinker -end-group") diff 
--git a/paddle/api/Paddle.i b/paddle/api/Paddle.i index 068ba286c0..3237e73745 100644 --- a/paddle/api/Paddle.i +++ b/paddle/api/Paddle.i @@ -179,6 +179,7 @@ namespace std { %newobject ParameterOptimizer::needSpecialTraversal; %newobject ParameterUpdater::createLocalUpdater; %newobject ParameterUpdater::createRemoteUpdater; +%newobject ParameterUpdater::createNewRemoteUpdater; %feature("director") UpdateCallback; %feature("autodoc", 1); // To generate method stub, for code hint in ide diff --git a/paddle/api/PaddleAPI.h b/paddle/api/PaddleAPI.h index da0f157abd..7565ea51fe 100644 --- a/paddle/api/PaddleAPI.h +++ b/paddle/api/PaddleAPI.h @@ -841,6 +841,8 @@ public: static ParameterUpdater* createRemoteUpdater(OptimizationConfig* config, int passCount, bool useSparseUpdater); + static ParameterUpdater* createNewRemoteUpdater( + OptimizationConfig* config, const std::string pserverSpec); ~ParameterUpdater(); /** diff --git a/paddle/api/ParameterUpdater.cpp b/paddle/api/ParameterUpdater.cpp index 79921ea6e7..eaf8518ae2 100644 --- a/paddle/api/ParameterUpdater.cpp +++ b/paddle/api/ParameterUpdater.cpp @@ -15,6 +15,7 @@ limitations under the License. */ #include "PaddleAPI.h" #include "PaddleAPIPrivate.h" +#include "paddle/trainer/NewRemoteParameterUpdater.h" #include "paddle/trainer/RemoteParameterUpdater.h" #include "paddle/trainer/ThreadParameterUpdater.h" @@ -28,6 +29,14 @@ ParameterUpdater *ParameterUpdater::createLocalUpdater( return updater; } +ParameterUpdater *ParameterUpdater::createNewRemoteUpdater( + OptimizationConfig *config, const std::string pserverSpec) { + auto updater = new ParameterUpdater(); + updater->m->updater.reset(new paddle::NewRemoteParameterUpdater( + config->m->getConfig(), pserverSpec)); + return updater; +} + ParameterUpdater *ParameterUpdater::createRemoteUpdater( OptimizationConfig *config, int passCount, bool useSparseUpdater) { auto updater = new ParameterUpdater(); diff --git a/paddle/trainer/CMakeLists.txt b/paddle/trainer/CMakeLists.txt index 06c019f0a9..9d246b6690 100644 --- a/paddle/trainer/CMakeLists.txt +++ b/paddle/trainer/CMakeLists.txt @@ -4,6 +4,7 @@ set(TRAINER_SOURCES ParameterUpdater.cpp ParamUtil.cpp RemoteParameterUpdater.cpp + NewRemoteParameterUpdater.cpp Tester.cpp Trainer.cpp TrainerInternal.cpp @@ -16,6 +17,7 @@ set(TRAINER_HEADERS ParameterUpdater.h ParamUtil.h RemoteParameterUpdater.h + NewRemoteParameterUpdater.h Tester.h TesterConfig.h Trainer.h @@ -32,7 +34,7 @@ add_style_check_target(paddle_trainer_lib add_style_check_target(paddle_trainer_lib ${TRAINER_HEADERS}) add_dependencies(paddle_trainer_lib - gen_proto_cpp) + gen_proto_cpp paddle_pserver_cclient_lib) macro(add_paddle_exe TARGET_NAME) add_executable(${TARGET_NAME} ${ARGN}) @@ -56,3 +58,10 @@ install(TARGETS paddle_trainer paddle_merge_model set_target_properties(paddle_trainer PROPERTIES INSTALL_RPATH_USE_LINK_PATH TRUE) set_target_properties(paddle_merge_model PROPERTIES INSTALL_RPATH_USE_LINK_PATH TRUE) + +if(APPLE) + set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security") +endif() + +target_link_libraries(paddle_trainer ${CMAKE_CURRENT_SOURCE_DIR}/libpaddle_pserver_cclient.a) +target_link_libraries(paddle_trainer_lib ${CMAKE_CURRENT_SOURCE_DIR}/libpaddle_pserver_cclient.a) diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp new file mode 100644 index 0000000000..9060052e11 --- /dev/null +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -0,0 +1,88 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. 
All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "NewRemoteParameterUpdater.h" +#include "Trainer.h" +#include "paddle/utils/Stat.h" + +DECLARE_int32(trainer_id); +DECLARE_string(save_dir); + +namespace paddle { +NewRemoteParameterUpdater::NewRemoteParameterUpdater( + const OptimizationConfig &config, const std::string pserverSpec) + : pserverSpec_(pserverSpec) {} + +void NewRemoteParameterUpdater::init( + const std::vector ¶meters) { + ParameterUpdater::init(parameters); + LOG(INFO) << "NewRemoteParameterUpdater init in"; + + for (auto ¶ : parameters_) { + para->getBuf(PARAMETER_VALUE)->zeroMem(); + para->getBuf(PARAMETER_GRADIENT)->zeroMem(); + } + + // create parameter server client. + parameterClient_ = + paddle_new_pserver_client((char *)pserverSpec_.c_str(), FLAGS_trainer_id); + + // init names_ for get parameter through paddle_cclient + names_ = (char **)malloc(parameterSize() * sizeof(char *)); + for (int i = 0; i < parameterSize(); ++i) { + names_[i] = (char *)parameters_[i]->getName().c_str(); + } + + // init new parameter and gradient. + initNewParameter(newParameters_, PARAMETER_VALUE); + initNewParameter(newGradients_, PARAMETER_GRADIENT); + + // init parameter, one trainer will get the opportunity to int parameter and + // send them to parameter server. Others will get the initialized parameter + // from parameter server + if (paddle_begin_init_params(parameterClient_)) { + LOG(INFO) << "paddle_begin_init_params start"; + for (int i = 0; i < parameterSize(); ++i) { + paddle_init_param(parameterClient_, *newParameters_[i], NULL, 0); + } + paddle_finish_init_params(parameterClient_); + LOG(INFO) << "paddle_begin_init_params done"; + } else { + paddle_get_params( + parameterClient_, names_, newParameters_, (int)parameters_.size()); + } + + LOG(INFO) << "NewRemoteParameterUpdater initialized"; +} + +void NewRemoteParameterUpdater::updateImpl(Parameter *para) {} + +void NewRemoteParameterUpdater::finishBatch(real cost) { + LOG(INFO) << "finishBatch in, cost: " << cost; + + // send gradient to parameter server. + paddle_send_grads(parameterClient_, *newGradients_, parameterSize()); + // get the updated parameter from parameterClient. + paddle_get_params(parameterClient_, names_, newParameters_, parameterSize()); + + // clear gradient after update parameter. + for (auto ¶ : parameters_) { + para->getBuf(PARAMETER_GRADIENT)->zeroMem(); + } +} + +void NewRemoteParameterUpdater::startPass() {} + +bool NewRemoteParameterUpdater::finishPass() { return true; } +} diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h new file mode 100644 index 0000000000..33640bc8a3 --- /dev/null +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -0,0 +1,105 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include +#include "ParameterUpdater.h" +#include "libpaddle_pserver_cclient.h" +#include "paddle/pserver/ParameterClient2.h" +#include "paddle/utils/Queue.h" +#include "paddle/utils/Util.h" + +namespace paddle { + +/** + * New remote parameter updater for dense parameters that use cclient of go. + */ +class NewRemoteParameterUpdater : public ParameterUpdater { +public: + NewRemoteParameterUpdater(const OptimizationConfig& config, + const std::string pserverSpec); + ~NewRemoteParameterUpdater() { + if (newGradients_) { + paddle_pserver_client_release(parameterClient_); + } + } + + /** + * initialize the internal parameter client and itself. + */ + virtual void init(const std::vector& parameters); + /** + * @brief start batch + * + * @note one batch training exhibits stateful feature to help + * to do performance tuning, sgd optimization if necessary. + */ + virtual PassType startBatch(int64_t batchSize) { return PASS_TRAIN; } + + /** + * send parameters to pservers and get returned parameters + * from all pservers if necessary. + */ + virtual void finishBatch(real cost); + virtual void startPass(); + virtual bool finishPass(); + + int parameterSize() { return (int)parameters_.size(); } + + /** + * init parameter of paddle pserver cclient. + * @param new_paras + * @param type + */ + void initNewParameter(paddle_parameter**& new_paras, ParameterType type) { + new_paras = + (paddle_parameter**)malloc(sizeof(paddle_parameter*) * parameterSize()); + for (int i = 0; i < parameterSize(); ++i) { + new_paras[i] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); + memset(new_paras[i], 0, sizeof(paddle_parameter)); + } + + for (int i = 0; i < parameterSize(); ++i) { + ParameterPtr para = parameters_[i]; + new_paras[i]->content_len = 10; + new_paras[i]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; + new_paras[i]->name = (char*)para->getName().c_str(); + new_paras[i]->content = + (unsigned char*)(para->getBuf(type).get()->getData()); + new_paras[i]->content_len = (int)para->getBuf(type).get()->getSize(); + } + } + +protected: + /** + * work need to do after finishBatch + */ + virtual void updateImpl(Parameter* para); + +protected: + /// internal parameter client object for exchanging data with pserver + client parameterClient_ = -1; + /// the parameters for new pserver client + paddle_parameter** newParameters_; + /// the gradinets for new pserver client + paddle_parameter** newGradients_; + /// the names for new parameters. 
+ char** names_; + /// the specification of parameter server "host1:port,host1:port" + std::string pserverSpec_; +}; + +} // namespace paddle diff --git a/python/paddle/v2/optimizer.py b/python/paddle/v2/optimizer.py index 5e99d4a241..1ef2dceca9 100644 --- a/python/paddle/v2/optimizer.py +++ b/python/paddle/v2/optimizer.py @@ -45,7 +45,12 @@ class Optimizer(object): return swig_api.ParameterUpdater.createRemoteUpdater( self.__opt_conf__, pass_num, use_sparse_updater) - def create_updater(self, is_local, num_passes, use_sparse_updater): + def __create_new_remote_updater__(self, pserver_spec): + return swig_api.ParameterUpdater.createNewRemoteUpdater( + self.__opt_conf__, pserver_spec) + + def create_updater(self, is_local, num_passes, use_sparse_updater, + pserver_spec): """ create proper parameter_updater by configuration. :param is_local: create local or remote parameter updater @@ -64,8 +69,12 @@ class Optimizer(object): if is_local: parameter_updater = self.__create_local_updater__() else: - parameter_updater = self.__create_remote_updater__( - num_passes, use_sparse_updater) + if pserver_spec is None: + parameter_updater = self.__create_remote_updater__( + num_passes, use_sparse_updater) + else: + parameter_updater = self.__create_new_remote_updater__( + pserver_spec) return parameter_updater diff --git a/python/paddle/v2/trainer.py b/python/paddle/v2/trainer.py index 8fdb67cc26..f9658a8c5d 100644 --- a/python/paddle/v2/trainer.py +++ b/python/paddle/v2/trainer.py @@ -49,7 +49,8 @@ class SGD(object): parameters, update_equation, extra_layers=None, - is_local=True): + is_local=True, + pserver_spec=None): if not isinstance(parameters, v2_parameters.Parameters): raise TypeError('parameters should be parameters') @@ -63,6 +64,7 @@ class SGD(object): self.__parameters__ = parameters self.__topology_in_proto__ = topology.proto() self.__is_local__ = is_local + self.__pserver_spec__ = pserver_spec self.__use_sparse_updater__ = self.__topology__.use_sparse_updater() # # In local mode, disable sparse_remote_update. @@ -126,7 +128,8 @@ class SGD(object): __check_train_args__(**locals()) self.__parameter_updater__ = self.__optimizer__.create_updater( - self.__is_local__, num_passes, self.__use_sparse_updater__) + self.__is_local__, num_passes, self.__use_sparse_updater__, + self.__pserver_spec__) self.__parameter_updater__.init(self.__gradient_machine__) self.__gradient_machine__.start() From 6f1c91da992b9f7b230633c0ac56db184d4df5c2 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Thu, 8 Jun 2017 22:38:30 +0800 Subject: [PATCH 02/12] refine code --- go/pserver/cclient/test/main.c | 1 - paddle/trainer/NewRemoteParameterUpdater.cpp | 6 +- paddle/trainer/NewRemoteParameterUpdater.h | 68 ++++++++++++-------- 3 files changed, 43 insertions(+), 32 deletions(-) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index 0ad890daa2..b95abf96b1 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -14,7 +14,6 @@ int main() { client c = paddle_new_pserver_client(addr, 1); retry: if (paddle_begin_init_params(c)) { - paddle_parameter param; char name_a[] = "param_a"; char name_b[] = "param_b"; diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index 9060052e11..13110adb45 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -45,8 +45,8 @@ void NewRemoteParameterUpdater::init( } // init new parameter and gradient. 
- initNewParameter(newParameters_, PARAMETER_VALUE); - initNewParameter(newGradients_, PARAMETER_GRADIENT); + newParameters_ = initNewParameter(PARAMETER_VALUE); + newGradients_ = initNewParameter(PARAMETER_GRADIENT); // init parameter, one trainer will get the opportunity to int parameter and // send them to parameter server. Others will get the initialized parameter @@ -60,7 +60,7 @@ void NewRemoteParameterUpdater::init( LOG(INFO) << "paddle_begin_init_params done"; } else { paddle_get_params( - parameterClient_, names_, newParameters_, (int)parameters_.size()); + parameterClient_, names_, newParameters_, parameterSize()); } LOG(INFO) << "NewRemoteParameterUpdater initialized"; diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index 33640bc8a3..5fd404dcf8 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -32,9 +32,9 @@ public: NewRemoteParameterUpdater(const OptimizationConfig& config, const std::string pserverSpec); ~NewRemoteParameterUpdater() { - if (newGradients_) { - paddle_pserver_client_release(parameterClient_); - } + releaseNewParameter(newParameters_); + releaseNewParameter(newGradients_); + if (parameterClient_ >= 0) paddle_pserver_client_release(parameterClient_); } /** @@ -57,37 +57,49 @@ public: virtual void startPass(); virtual bool finishPass(); - int parameterSize() { return (int)parameters_.size(); } - +protected: /** - * init parameter of paddle pserver cclient. - * @param new_paras - * @param type + * work need to do after finishBatch */ - void initNewParameter(paddle_parameter**& new_paras, ParameterType type) { - new_paras = - (paddle_parameter**)malloc(sizeof(paddle_parameter*) * parameterSize()); - for (int i = 0; i < parameterSize(); ++i) { - new_paras[i] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); - memset(new_paras[i], 0, sizeof(paddle_parameter)); + virtual void updateImpl(Parameter* para); + +private: + int parameterSize() { + return (int)parameters_.size(); } - for (int i = 0; i < parameterSize(); ++i) { - ParameterPtr para = parameters_[i]; - new_paras[i]->content_len = 10; - new_paras[i]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; - new_paras[i]->name = (char*)para->getName().c_str(); - new_paras[i]->content = - (unsigned char*)(para->getBuf(type).get()->getData()); - new_paras[i]->content_len = (int)para->getBuf(type).get()->getSize(); + /** + * init parameter of paddle pserver cclient. 
+ * @param new_params + * @param type + */ + paddle_parameter** initNewParameter(ParameterType type) { + paddle_parameter** new_params = + (paddle_parameter**)malloc(sizeof(paddle_parameter*) * parameterSize()); + for (int i = 0; i < parameterSize(); ++i) { + new_params[i] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); + memset(new_params[i], 0, sizeof(paddle_parameter)); + } + + for (int i = 0; i < parameterSize(); ++i) { + ParameterPtr param = parameters_[i]; + new_params[i]->content_len = 10; + new_params[i]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; + new_params[i]->name = (char*)param->getName().c_str(); + new_params[i]->content = + (unsigned char*)(param->getBuf(type).get()->getData()); + new_params[i]->content_len = (int)param->getBuf(type).get()->getSize(); + } + return new_params; } - } -protected: - /** - * work need to do after finishBatch - */ - virtual void updateImpl(Parameter* para); + void releaseNewParameter(paddle_parameter** newParams) { + if (newParams != NULL) { + for (int i = 0; i < parameterSize(); ++i) { + paddle_release_param(newParams[i]); + } + } + } protected: /// internal parameter client object for exchanging data with pserver From 28476f5f6e81a219914cf70f92a6c0fde2a6c203 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Fri, 9 Jun 2017 14:41:22 +0800 Subject: [PATCH 03/12] fix the problem of paddle_send_grad --- go/pserver/cclient/cclient.go | 4 +- go/pserver/cclient/test/main.c | 50 ++++++++++++++-- paddle/trainer/NewRemoteParameterUpdater.cpp | 8 ++- paddle/trainer/NewRemoteParameterUpdater.h | 62 ++++++++++---------- 4 files changed, 85 insertions(+), 39 deletions(-) diff --git a/go/pserver/cclient/cclient.go b/go/pserver/cclient/cclient.go index 0b4aa79806..662e925427 100644 --- a/go/pserver/cclient/cclient.go +++ b/go/pserver/cclient/cclient.go @@ -164,10 +164,10 @@ func paddle_finish_init_params(client C.client) C.int { } //export paddle_send_grads -func paddle_send_grads(client C.client, grads *C.paddle_gradient, total C.int) C.int { +func paddle_send_grads(client C.client, grads **C.paddle_gradient, total C.int) C.int { var gs []pserver.Gradient for i := 0; i < int(total); i++ { - grad := (*C.paddle_gradient)(unsafe.Pointer((uintptr(unsafe.Pointer(grads)) + uintptr(i)*unsafe.Sizeof(*grads)))) + grad := *(**C.paddle_gradient)(unsafe.Pointer((uintptr(unsafe.Pointer(grads)) + uintptr(i)*unsafe.Sizeof(*grads)))) et := pserver.ElementType(grad.element_type) name := C.GoString(grad.name) content := cArrayToSlice(unsafe.Pointer(grad.content), int(grad.content_len)) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index b95abf96b1..1039139307 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -1,4 +1,5 @@ #include +#include #include "libpaddle_pserver_cclient.h" @@ -9,6 +10,21 @@ void fail() { exit(-1); } +void print_parameter(paddle_gradient* param) { + if (param == NULL) { + printf("param is NULL!!\n"); + } else { + printf("==== parameter ====\n"); + printf("name: %s\n", param->name); + printf("content_len: %d\n", param->content_len); + printf("content_type: %d\n", param->element_type); + for (int i = 0; i < param->content_len; ++i) { + printf("0x%x ", param->content[i]); + } + printf("\n"); + } +} + int main() { char addr[] = "localhost:3000"; client c = paddle_new_pserver_client(addr, 1); @@ -40,12 +56,27 @@ retry: fail(); } - unsigned char content[] = {0x00, 0x11, 0x22}; - paddle_gradient grads[2] = { - {"param_a", PADDLE_ELEMENT_TYPE_FLOAT32, content, 3}, - {"param_b", 
PADDLE_ELEMENT_TYPE_INT32, content, 3}}; + unsigned char content1[] = {0x12, 0x23, 0x34}; + unsigned char content2[] = {0x45, 0x56, 0x67}; + + paddle_gradient** new_params = + (paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2); + new_params[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); + new_params[0]->name = "param_a"; + new_params[0]->content = content1; + new_params[0]->content_len = 3; + new_params[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; - if (paddle_send_grads(c, grads, 2) != 0) { + new_params[1] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); + new_params[1]->name = "param_b"; + new_params[1]->content = content2; + new_params[1]->content_len = 3; + new_params[1]->element_type = PADDLE_ELEMENT_TYPE_INT32; + + print_parameter(new_params[0]); + print_parameter(new_params[1]); + + if (paddle_send_grads(c, new_params, 2) != 0) { fail(); } @@ -55,6 +86,15 @@ retry: fail(); } + print_parameter(params[0]); + print_parameter(params[1]); + + /// change name of parameter. + char* names2[] = {"param_1", "param_2"}; + if (paddle_get_params(c, names2, params, 2) == 0) { + fail(); + } + // get parameters again by reusing the allocated parameter buffers. if (paddle_get_params(c, names, params, 2) != 0) { fail(); diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index 13110adb45..35df973897 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -22,7 +22,11 @@ DECLARE_string(save_dir); namespace paddle { NewRemoteParameterUpdater::NewRemoteParameterUpdater( const OptimizationConfig &config, const std::string pserverSpec) - : pserverSpec_(pserverSpec) {} + : parameterClient_(-1), + newParameters_(nullptr), + newGradients_(nullptr), + names_(nullptr), + pserverSpec_(pserverSpec) {} void NewRemoteParameterUpdater::init( const std::vector ¶meters) { @@ -72,7 +76,7 @@ void NewRemoteParameterUpdater::finishBatch(real cost) { LOG(INFO) << "finishBatch in, cost: " << cost; // send gradient to parameter server. - paddle_send_grads(parameterClient_, *newGradients_, parameterSize()); + paddle_send_grads(parameterClient_, newGradients_, parameterSize()); // get the updated parameter from parameterClient. paddle_get_params(parameterClient_, names_, newParameters_, parameterSize()); diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index 5fd404dcf8..ed82de3f99 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -32,6 +32,7 @@ public: NewRemoteParameterUpdater(const OptimizationConfig& config, const std::string pserverSpec); ~NewRemoteParameterUpdater() { + LOG(INFO) << "~NewRemoteParameterUpdater in"; releaseNewParameter(newParameters_); releaseNewParameter(newGradients_); if (parameterClient_ >= 0) paddle_pserver_client_release(parameterClient_); @@ -64,46 +65,47 @@ protected: virtual void updateImpl(Parameter* para); private: - int parameterSize() { - return (int)parameters_.size(); - } + int parameterSize() { return (int)parameters_.size(); } - /** - * init parameter of paddle pserver cclient. 
- * @param new_params - * @param type - */ - paddle_parameter** initNewParameter(ParameterType type) { - paddle_parameter** new_params = - (paddle_parameter**)malloc(sizeof(paddle_parameter*) * parameterSize()); - for (int i = 0; i < parameterSize(); ++i) { - new_params[i] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); - memset(new_params[i], 0, sizeof(paddle_parameter)); - } + /** + * init parameter of go paddle pserver cclient. + * @param new_params + * @param type + */ + paddle_parameter** initNewParameter(ParameterType type) { + paddle_parameter** new_params = + (paddle_parameter**)malloc(sizeof(paddle_parameter*) * parameterSize()); + for (int i = 0; i < parameterSize(); ++i) { + new_params[i] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); + memset(new_params[i], 0, sizeof(paddle_parameter)); + } - for (int i = 0; i < parameterSize(); ++i) { - ParameterPtr param = parameters_[i]; - new_params[i]->content_len = 10; - new_params[i]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; - new_params[i]->name = (char*)param->getName().c_str(); - new_params[i]->content = - (unsigned char*)(param->getBuf(type).get()->getData()); - new_params[i]->content_len = (int)param->getBuf(type).get()->getSize(); - } - return new_params; + for (int i = 0; i < parameterSize(); ++i) { + ParameterPtr param = parameters_[i]; + new_params[i]->content_len = 10; + new_params[i]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; + new_params[i]->name = (char*)param->getName().c_str(); + new_params[i]->content = + (unsigned char*)(param->getBuf(type).get()->getData()); + new_params[i]->content_len = (int)param->getBuf(type).get()->getSize(); } + return new_params; + } - void releaseNewParameter(paddle_parameter** newParams) { - if (newParams != NULL) { - for (int i = 0; i < parameterSize(); ++i) { - paddle_release_param(newParams[i]); + void releaseNewParameter(paddle_parameter** newParams) { + if (newParams != nullptr) { + for (int i = 0; i < parameterSize(); ++i) { + auto param = newParams[i]; + if (param != nullptr) { + paddle_release_param(param); } } } + } protected: /// internal parameter client object for exchanging data with pserver - client parameterClient_ = -1; + client parameterClient_; /// the parameters for new pserver client paddle_parameter** newParameters_; /// the gradinets for new pserver client From 966bf9ae1f090d404f56033e1c9f51c15eb6c2ad Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Fri, 9 Jun 2017 16:34:24 +0800 Subject: [PATCH 04/12] fix the problem in cclient when malloc paddle_parameter --- go/pserver/cclient/cclient.go | 7 +++++-- go/pserver/cclient/test/main.c | 9 +++++---- go/pserver/service.go | 4 ++++ paddle/trainer/NewRemoteParameterUpdater.h | 4 ++-- 4 files changed, 16 insertions(+), 8 deletions(-) diff --git a/go/pserver/cclient/cclient.go b/go/pserver/cclient/cclient.go index 662e925427..be16a143d8 100644 --- a/go/pserver/cclient/cclient.go +++ b/go/pserver/cclient/cclient.go @@ -42,6 +42,7 @@ import ( "strings" "sync" "unsafe" + "fmt" "github.com/PaddlePaddle/Paddle/go/pserver" ) @@ -204,12 +205,14 @@ func paddle_get_params(client C.client, names **C.char, dst **C.paddle_parameter } p := ps[i] - param := *(**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst)))) + paramPtr := (**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst)))) + param := *paramPtr nameReady := false contentAllocated := false if unsafe.Pointer(param) == nullPtr { - param = (*C.paddle_parameter)(C.calloc(1, 
C.size_t(unsafe.Sizeof(*param)))) + *paramPtr = (*C.paddle_parameter)(C.calloc(1, C.size_t(unsafe.Sizeof(*param)))) + param = *paramPtr } else { if unsafe.Pointer(param.name) != nullPtr { if n := C.GoString(param.name); n != p.Name { diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index 1039139307..59cf5756fd 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -21,7 +21,7 @@ void print_parameter(paddle_gradient* param) { for (int i = 0; i < param->content_len; ++i) { printf("0x%x ", param->content[i]); } - printf("\n"); + printf("\n\n"); } } @@ -33,17 +33,18 @@ retry: paddle_parameter param; char name_a[] = "param_a"; char name_b[] = "param_b"; - unsigned char content[] = {0x00, 0x00, 0x00}; + unsigned char content1[] = {0x01, 0x02, 0x03}; param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32; param.name = name_a; - param.content = content; + param.content = content1; param.content_len = 3; if (paddle_init_param(c, param, NULL, 0) != 0) { goto retry; } + unsigned char content2[] = {0x04, 0x05, 0x06}; param.element_type = PADDLE_ELEMENT_TYPE_INT32; param.name = name_b; - param.content = content; + param.content = content2; param.content_len = 3; if (paddle_init_param(c, param, NULL, 0) != 0) { goto retry; diff --git a/go/pserver/service.go b/go/pserver/service.go index d5787b9708..ab814662b6 100644 --- a/go/pserver/service.go +++ b/go/pserver/service.go @@ -29,6 +29,10 @@ type Parameter struct { Content []byte } +func (p *Parameter) toString() { + fmt.Println(p.Name, p.ElementType, p.Content) +} + // ParameterWithConfig contains the parameter and the configuration. type ParameterWithConfig struct { Param Parameter diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index ed82de3f99..1dbb3658fb 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -33,8 +33,8 @@ public: const std::string pserverSpec); ~NewRemoteParameterUpdater() { LOG(INFO) << "~NewRemoteParameterUpdater in"; - releaseNewParameter(newParameters_); - releaseNewParameter(newGradients_); +// releaseNewParameter(newParameters_); +// releaseNewParameter(newGradients_); if (parameterClient_ >= 0) paddle_pserver_client_release(parameterClient_); } From c44f5dd883b49d063d336dda874f1794270c2982 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 10 Jun 2017 20:46:26 +0800 Subject: [PATCH 05/12] add simple updater, this version can train a model --- go/pserver/cclient/cclient.go | 1 - go/pserver/cclient/test/CMakeLists.txt | 4 + go/pserver/cclient/test/main.c | 71 ++++-------- go/pserver/cclient/test/test_cclient.c | 114 +++++++++++++++++++ go/pserver/optimizer.c | 8 +- paddle/trainer/NewRemoteParameterUpdater.cpp | 2 - paddle/trainer/NewRemoteParameterUpdater.h | 2 +- 7 files changed, 146 insertions(+), 56 deletions(-) create mode 100644 go/pserver/cclient/test/test_cclient.c diff --git a/go/pserver/cclient/cclient.go b/go/pserver/cclient/cclient.go index be16a143d8..7fdf9f0ec2 100644 --- a/go/pserver/cclient/cclient.go +++ b/go/pserver/cclient/cclient.go @@ -42,7 +42,6 @@ import ( "strings" "sync" "unsafe" - "fmt" "github.com/PaddlePaddle/Paddle/go/pserver" ) diff --git a/go/pserver/cclient/test/CMakeLists.txt b/go/pserver/cclient/test/CMakeLists.txt index 762772812f..e7d0a74237 100644 --- a/go/pserver/cclient/test/CMakeLists.txt +++ b/go/pserver/cclient/test/CMakeLists.txt @@ -2,6 +2,8 @@ cmake_minimum_required(VERSION 3.0) add_executable(main main.c) add_dependencies(main 
paddle_pserver_cclient) +add_executable(test_cclient test_cclient.c) +add_dependencies(test_cclient paddle_pserver_cclient) if(APPLE) set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security") @@ -10,7 +12,9 @@ endif() if(PROJ_ROOT) include_directories(${CMAKE_BINARY_DIR}/go/pserver/cclient/) target_link_libraries(main ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a pthread) + target_link_libraries(test_cclient ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a pthread) else(PROJ_ROOT) include_directories(${CMAKE_BINARY_DIR}) target_link_libraries(main ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread) + target_link_libraries(test_cclient ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread) endif(PROJ_ROOT) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index 59cf5756fd..a074808b16 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -1,5 +1,4 @@ #include -#include #include "libpaddle_pserver_cclient.h" @@ -10,21 +9,6 @@ void fail() { exit(-1); } -void print_parameter(paddle_gradient* param) { - if (param == NULL) { - printf("param is NULL!!\n"); - } else { - printf("==== parameter ====\n"); - printf("name: %s\n", param->name); - printf("content_len: %d\n", param->content_len); - printf("content_type: %d\n", param->element_type); - for (int i = 0; i < param->content_len; ++i) { - printf("0x%x ", param->content[i]); - } - printf("\n\n"); - } -} - int main() { char addr[] = "localhost:3000"; client c = paddle_new_pserver_client(addr, 1); @@ -33,23 +17,21 @@ retry: paddle_parameter param; char name_a[] = "param_a"; char name_b[] = "param_b"; - unsigned char content1[] = {0x01, 0x02, 0x03}; + unsigned char content[] = {0x00, 0x11, 0x22}; param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32; param.name = name_a; - param.content = content1; + param.content = content; param.content_len = 3; if (paddle_init_param(c, param, NULL, 0) != 0) { goto retry; } - unsigned char content2[] = {0x04, 0x05, 0x06}; param.element_type = PADDLE_ELEMENT_TYPE_INT32; param.name = name_b; - param.content = content2; + param.content = content; param.content_len = 3; if (paddle_init_param(c, param, NULL, 0) != 0) { goto retry; } - if (paddle_finish_init_params(c) != 0) { goto retry; } @@ -57,27 +39,22 @@ retry: fail(); } - unsigned char content1[] = {0x12, 0x23, 0x34}; - unsigned char content2[] = {0x45, 0x56, 0x67}; - - paddle_gradient** new_params = - (paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2); - new_params[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); - new_params[0]->name = "param_a"; - new_params[0]->content = content1; - new_params[0]->content_len = 3; - new_params[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; - - new_params[1] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); - new_params[1]->name = "param_b"; - new_params[1]->content = content2; - new_params[1]->content_len = 3; - new_params[1]->element_type = PADDLE_ELEMENT_TYPE_INT32; - - print_parameter(new_params[0]); - print_parameter(new_params[1]); - - if (paddle_send_grads(c, new_params, 2) != 0) { + unsigned char content[] = {0x00, 0x11, 0x22}; + paddle_gradient** grads = + (paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2); + grads[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); + grads[0]->name = "param_a"; + grads[0]->content = content; + grads[0]->content_len = 3; + grads[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; + + grads[1] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); + 
grads[1]->name = "param_b"; + grads[1]->content = content; + grads[1]->content_len = 3; + grads[1]->element_type = PADDLE_ELEMENT_TYPE_INT32; + + if (paddle_send_grads(c, grads, 2) != 0) { fail(); } @@ -87,15 +64,6 @@ retry: fail(); } - print_parameter(params[0]); - print_parameter(params[1]); - - /// change name of parameter. - char* names2[] = {"param_1", "param_2"}; - if (paddle_get_params(c, names2, params, 2) == 0) { - fail(); - } - // get parameters again by reusing the allocated parameter buffers. if (paddle_get_params(c, names, params, 2) != 0) { fail(); @@ -109,5 +77,6 @@ retry: } printf("test success!\n"); + return 0; } diff --git a/go/pserver/cclient/test/test_cclient.c b/go/pserver/cclient/test/test_cclient.c new file mode 100644 index 0000000000..4d6fca29fb --- /dev/null +++ b/go/pserver/cclient/test/test_cclient.c @@ -0,0 +1,114 @@ +#include +#include + +#include "libpaddle_pserver_cclient.h" + +typedef float real; + +void fail() { + // TODO(helin): fix: gtest using cmake is not working, using this + // hacky way for now. + printf("test failed.\n"); + exit(-1); +} + +void print_parameter(paddle_gradient* param) { + if (param == NULL) { + printf("param is NULL!!\n"); + } else { + printf("==== parameter ====\n"); + printf("name: %s\n", param->name); + printf("content_len: %d\n", param->content_len); + printf("content_type: %d\n", param->element_type); + for (int i = 0; i < param->content_len/sizeof(real); ++i) { + printf("%f ", ((float *)param->content)[i]); + } + printf("\n\n"); + } +} + +int main() { + char addr[] = "localhost:3000"; + client c = paddle_new_pserver_client(addr, 1); + + char* names[] = {"param_a", "param_b"}; +retry: + + if (paddle_begin_init_params(c)) { + paddle_parameter param; + real param_content1[] = {0.1, 0.2, 0.3}; + param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32; + param.name = names[0]; + param.content = (unsigned char*)param_content1; + param.content_len = 3 * sizeof(real); + if (paddle_init_param(c, param, NULL, 0) != 0) { + goto retry; + } + real param_content2[] = {0.4, 0.5, 0.6}; + param.element_type = PADDLE_ELEMENT_TYPE_INT32; + param.name = names[1]; + param.content = (unsigned char*)param_content2; + param.content_len = 3 * sizeof(real); + if (paddle_init_param(c, param, NULL, 0) != 0) { + goto retry; + } + + if (paddle_finish_init_params(c) != 0) { + goto retry; + } + } else { + fail(); + } + + printf("get initialized parameters from pserver:\n"); + paddle_parameter* param_ptrs[2] = {NULL, NULL}; + if (paddle_get_params(c, names, param_ptrs, 2) != 0) { + fail(); + } + print_parameter(param_ptrs[0]); + print_parameter(param_ptrs[1]); + + printf("send gradient to pserver:\n"); + real gradient_content1[] = {0.01, 0.02, 0.03}; + real gradinet_content2[] = {0.04, 0.05, 0.06}; + + paddle_gradient** grads = + (paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2); + grads[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); + grads[0]->name = names[0]; + grads[0]->content = (unsigned char*)gradient_content1; + grads[0]->content_len = 3 * sizeof(real); + grads[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; + + grads[1] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); + grads[1]->name = names[1]; + grads[1]->content = (unsigned char*)gradinet_content2; + grads[1]->content_len = 3 * sizeof(real); + grads[1]->element_type = PADDLE_ELEMENT_TYPE_INT32; + + print_parameter(grads[0]); + print_parameter(grads[1]); + + if (paddle_send_grads(c, grads, 2) != 0) { + fail(); + } + + printf("get updated parameters from pserver:\n"); + // get 
parameters again by reusing the allocated parameter buffers. + if (paddle_get_params(c, names, param_ptrs, 2) != 0) { + fail(); + } + + print_parameter(param_ptrs[0]); + print_parameter(param_ptrs[1]); + + paddle_release_param(param_ptrs[0]); + paddle_release_param(param_ptrs[1]); + + if (paddle_save_model(c, "/tmp/") != 0) { + fail(); + } + + printf("test success!\n"); + return 0; +} diff --git a/go/pserver/optimizer.c b/go/pserver/optimizer.c index b8da3ec959..5d0b1017ce 100644 --- a/go/pserver/optimizer.c +++ b/go/pserver/optimizer.c @@ -32,7 +32,13 @@ int update_SGD(void* optimizer, const void* gradient, int num_bytes) { SGD_optimizer* o = (SGD_optimizer*)optimizer; - // TODO + // TODO(a simple SGD implement) + float* parameter = (float *)buffer; + float* grad = (float *)gradient; + + for(int i = 0; i < num_bytes/sizeof(float); ++i) { + parameter[i] -= o->learning_rate * grad[i]; + } return 0; } diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index 35df973897..0f879dbde0 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -73,8 +73,6 @@ void NewRemoteParameterUpdater::init( void NewRemoteParameterUpdater::updateImpl(Parameter *para) {} void NewRemoteParameterUpdater::finishBatch(real cost) { - LOG(INFO) << "finishBatch in, cost: " << cost; - // send gradient to parameter server. paddle_send_grads(parameterClient_, newGradients_, parameterSize()); // get the updated parameter from parameterClient. diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index 1dbb3658fb..d3a032badc 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -87,7 +87,7 @@ private: new_params[i]->name = (char*)param->getName().c_str(); new_params[i]->content = (unsigned char*)(param->getBuf(type).get()->getData()); - new_params[i]->content_len = (int)param->getBuf(type).get()->getSize(); + new_params[i]->content_len = (int)param->getBuf(type).get()->getSize() * sizeof(real); } return new_params; } From 39d0b3de99112e6b90c22147b6c15917f5a3e1d5 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 10 Jun 2017 21:18:03 +0800 Subject: [PATCH 06/12] add test file mnist_test.py, free resource of newRemoteParameterUpdater properly --- go/pserver/cclient/test/mnist_test.py | 134 +++++++++++++++++++++ paddle/trainer/NewRemoteParameterUpdater.h | 14 +-- 2 files changed, 141 insertions(+), 7 deletions(-) create mode 100644 go/pserver/cclient/test/mnist_test.py diff --git a/go/pserver/cclient/test/mnist_test.py b/go/pserver/cclient/test/mnist_test.py new file mode 100644 index 0000000000..c77af49130 --- /dev/null +++ b/go/pserver/cclient/test/mnist_test.py @@ -0,0 +1,134 @@ +import paddle.v2 as paddle +import gzip + + +def softmax_regression(img): + predict = paddle.layer.fc(input=img, + size=10, + act=paddle.activation.Softmax()) + return predict + + +def multilayer_perceptron(img): + # The first fully-connected layer + hidden1 = paddle.layer.fc(input=img, size=128, act=paddle.activation.Relu()) + # The second fully-connected layer and the according activation function + hidden2 = paddle.layer.fc(input=hidden1, + size=64, + act=paddle.activation.Relu()) + # The thrid fully-connected layer, note that the hidden size should be 10, + # which is the number of unique digits + predict = paddle.layer.fc(input=hidden2, + size=10, + act=paddle.activation.Softmax()) + return predict + + +def 
convolutional_neural_network(img): + # first conv layer + conv_pool_1 = paddle.networks.simple_img_conv_pool( + input=img, + filter_size=5, + num_filters=20, + num_channel=1, + pool_size=2, + pool_stride=2, + act=paddle.activation.Tanh()) + # second conv layer + conv_pool_2 = paddle.networks.simple_img_conv_pool( + input=conv_pool_1, + filter_size=5, + num_filters=50, + num_channel=20, + pool_size=2, + pool_stride=2, + act=paddle.activation.Tanh()) + # The first fully-connected layer + fc1 = paddle.layer.fc(input=conv_pool_2, + size=128, + act=paddle.activation.Tanh()) + # The softmax layer, note that the hidden size should be 10, + # which is the number of unique digits + predict = paddle.layer.fc(input=fc1, + size=10, + act=paddle.activation.Softmax()) + return predict + + +def main(): + paddle.init(use_gpu=False, trainer_count=1, trainer_id=1) + + # define network topology + images = paddle.layer.data( + name='pixel', type=paddle.data_type.dense_vector(784)) + label = paddle.layer.data( + name='label', type=paddle.data_type.integer_value(10)) + + # Here we can build the prediction network in different ways. Please + # choose one by uncomment corresponding line. + predict = softmax_regression(images) + #predict = multilayer_perceptron(images) + #predict = convolutional_neural_network(images) + + cost = paddle.layer.classification_cost(input=predict, label=label) + parameters = paddle.parameters.create(cost) + + optimizer = paddle.optimizer.Momentum( + learning_rate=0.1 / 128.0, + momentum=0.9, + regularization=paddle.optimizer.L2Regularization(rate=0.0005 * 128)) + + trainer = paddle.trainer.SGD(cost=cost, + parameters=parameters, + update_equation=optimizer, + is_local=False, + pserver_spec="localhost:3000") + + lists = [] + + def event_handler(event): + if isinstance(event, paddle.event.EndIteration): + if event.batch_id % 1000 == 0: + print "Pass %d, Batch %d, Cost %f, %s" % ( + event.pass_id, event.batch_id, event.cost, event.metrics) + + with gzip.open('params.tar.gz', 'w') as f: + parameters.to_tar(f) + + elif isinstance(event, paddle.event.EndPass): + result = trainer.test(reader=paddle.batch( + paddle.dataset.mnist.test(), batch_size=128)) + print "Test with Pass %d, Cost %f, %s\n" % ( + event.pass_id, result.cost, result.metrics) + lists.append((event.pass_id, result.cost, + result.metrics['classification_error_evaluator'])) + + trainer.train( + reader=paddle.batch( + paddle.reader.shuffle( + paddle.dataset.mnist.train(), buf_size=8192), + batch_size=128), + event_handler=event_handler, + num_passes=100) + + # find the best pass + best = sorted(lists, key=lambda list: float(list[1]))[0] + print 'Best pass is %s, testing Avgcost is %s' % (best[0], best[1]) + print 'The classification accuracy is %.2f%%' % (100 - float(best[2]) * 100) + + test_creator = paddle.dataset.mnist.test() + test_data = [] + for item in test_creator(): + test_data.append((item[0], )) + if len(test_data) == 100: + break + + # output is a softmax layer. It returns probabilities. 
+ # Shape should be (100, 10) + probs = paddle.infer( + output_layer=predict, parameters=parameters, input=test_data) + print probs.shape + + +if __name__ == '__main__': + main() diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index d3a032badc..db9b44af19 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -32,9 +32,11 @@ public: NewRemoteParameterUpdater(const OptimizationConfig& config, const std::string pserverSpec); ~NewRemoteParameterUpdater() { - LOG(INFO) << "~NewRemoteParameterUpdater in"; -// releaseNewParameter(newParameters_); -// releaseNewParameter(newGradients_); + if (names_ != nullptr) { + free(names_); + } + releaseNewParameter(newParameters_); + releaseNewParameter(newGradients_); if (parameterClient_ >= 0) paddle_pserver_client_release(parameterClient_); } @@ -95,11 +97,9 @@ private: void releaseNewParameter(paddle_parameter** newParams) { if (newParams != nullptr) { for (int i = 0; i < parameterSize(); ++i) { - auto param = newParams[i]; - if (param != nullptr) { - paddle_release_param(param); - } + free(newParams[i]); } + free(newParams); } } From 4f366be4e298db78c7438f60d72fddf39b2b6ccc Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 10 Jun 2017 21:23:02 +0800 Subject: [PATCH 07/12] clang format --- go/pserver/cclient/test/main.c | 2 +- go/pserver/cclient/test/test_cclient.c | 4 ++-- go/pserver/optimizer.c | 6 +++--- paddle/trainer/NewRemoteParameterUpdater.h | 3 ++- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index a074808b16..72ec359076 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -41,7 +41,7 @@ retry: unsigned char content[] = {0x00, 0x11, 0x22}; paddle_gradient** grads = - (paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2); + (paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2); grads[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient)); grads[0]->name = "param_a"; grads[0]->content = content; diff --git a/go/pserver/cclient/test/test_cclient.c b/go/pserver/cclient/test/test_cclient.c index 4d6fca29fb..82cef386d7 100644 --- a/go/pserver/cclient/test/test_cclient.c +++ b/go/pserver/cclient/test/test_cclient.c @@ -20,8 +20,8 @@ void print_parameter(paddle_gradient* param) { printf("name: %s\n", param->name); printf("content_len: %d\n", param->content_len); printf("content_type: %d\n", param->element_type); - for (int i = 0; i < param->content_len/sizeof(real); ++i) { - printf("%f ", ((float *)param->content)[i]); + for (int i = 0; i < param->content_len / sizeof(real); ++i) { + printf("%f ", ((float*)param->content)[i]); } printf("\n\n"); } diff --git a/go/pserver/optimizer.c b/go/pserver/optimizer.c index 5d0b1017ce..47fe1efbf5 100644 --- a/go/pserver/optimizer.c +++ b/go/pserver/optimizer.c @@ -33,10 +33,10 @@ int update_SGD(void* optimizer, int num_bytes) { SGD_optimizer* o = (SGD_optimizer*)optimizer; // TODO(a simple SGD implement) - float* parameter = (float *)buffer; - float* grad = (float *)gradient; + float* parameter = (float*)buffer; + float* grad = (float*)gradient; - for(int i = 0; i < num_bytes/sizeof(float); ++i) { + for (int i = 0; i < num_bytes / sizeof(float); ++i) { parameter[i] -= o->learning_rate * grad[i]; } return 0; diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index db9b44af19..b7c0425982 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h 
+++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -89,7 +89,8 @@ private: new_params[i]->name = (char*)param->getName().c_str(); new_params[i]->content = (unsigned char*)(param->getBuf(type).get()->getData()); - new_params[i]->content_len = (int)param->getBuf(type).get()->getSize() * sizeof(real); + new_params[i]->content_len = + (int)param->getBuf(type).get()->getSize() * sizeof(real); } return new_params; } From da3e84a6d25fe75f63a624e4e523aba7a8c378c6 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Sat, 10 Jun 2017 21:43:59 +0800 Subject: [PATCH 08/12] change trainer_id --- go/pserver/cclient/test/mnist_test.py | 5 +---- go/pserver/cclient/test/test_train.py | 2 +- paddle/trainer/NewRemoteParameterUpdater.cpp | 4 ++-- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/go/pserver/cclient/test/mnist_test.py b/go/pserver/cclient/test/mnist_test.py index c77af49130..c3a3af55e2 100644 --- a/go/pserver/cclient/test/mnist_test.py +++ b/go/pserver/cclient/test/mnist_test.py @@ -56,7 +56,7 @@ def convolutional_neural_network(img): def main(): - paddle.init(use_gpu=False, trainer_count=1, trainer_id=1) + paddle.init(use_gpu=False, trainer_count=1) # define network topology images = paddle.layer.data( @@ -92,9 +92,6 @@ def main(): print "Pass %d, Batch %d, Cost %f, %s" % ( event.pass_id, event.batch_id, event.cost, event.metrics) - with gzip.open('params.tar.gz', 'w') as f: - parameters.to_tar(f) - elif isinstance(event, paddle.event.EndPass): result = trainer.test(reader=paddle.batch( paddle.dataset.mnist.test(), batch_size=128)) diff --git a/go/pserver/cclient/test/test_train.py b/go/pserver/cclient/test/test_train.py index ddd6371e0c..3f8d5d793b 100644 --- a/go/pserver/cclient/test/test_train.py +++ b/go/pserver/cclient/test/test_train.py @@ -4,7 +4,7 @@ import paddle.v2.dataset.uci_housing as uci_housing def main(): # init - paddle.init(use_gpu=False, trainer_count=1, trainer_id=1) + paddle.init(use_gpu=False, trainer_count=1) # network config x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13)) diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index 0f879dbde0..d554e09759 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -39,8 +39,8 @@ void NewRemoteParameterUpdater::init( } // create parameter server client. - parameterClient_ = - paddle_new_pserver_client((char *)pserverSpec_.c_str(), FLAGS_trainer_id); + parameterClient_ = paddle_new_pserver_client((char *)pserverSpec_.c_str(), + FLAGS_trainer_id == 0); // init names_ for get parameter through paddle_cclient names_ = (char **)malloc(parameterSize() * sizeof(char *)); From dc458a0d5ac7d5e88047856b773da1052eed80d8 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Mon, 12 Jun 2017 14:18:05 +0800 Subject: [PATCH 09/12] change go version --- .travis.yml | 1 + go/pserver/cclient/test/test_cclient.c | 3 ++- go/pserver/optimizer.c | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 44b755ee32..f9b4a7e083 100644 --- a/.travis.yml +++ b/.travis.yml @@ -50,6 +50,7 @@ before_install: # protobuf version. 
- pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker - pip install rarfile + - eval "$(GIMME_GO_VERSION=1.8.3 gimme)" - | function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; } script: diff --git a/go/pserver/cclient/test/test_cclient.c b/go/pserver/cclient/test/test_cclient.c index 82cef386d7..50ba2d5597 100644 --- a/go/pserver/cclient/test/test_cclient.c +++ b/go/pserver/cclient/test/test_cclient.c @@ -20,7 +20,8 @@ void print_parameter(paddle_gradient* param) { printf("name: %s\n", param->name); printf("content_len: %d\n", param->content_len); printf("content_type: %d\n", param->element_type); - for (int i = 0; i < param->content_len / sizeof(real); ++i) { + int i; + for (i = 0; i < param->content_len / sizeof(real); ++i) { printf("%f ", ((float*)param->content)[i]); } printf("\n\n"); diff --git a/go/pserver/optimizer.c b/go/pserver/optimizer.c index 47fe1efbf5..48bbceb343 100644 --- a/go/pserver/optimizer.c +++ b/go/pserver/optimizer.c @@ -36,7 +36,8 @@ int update_SGD(void* optimizer, float* parameter = (float*)buffer; float* grad = (float*)gradient; - for (int i = 0; i < num_bytes / sizeof(float); ++i) { + int i; + for (i = 0; i < num_bytes / sizeof(float); ++i) { parameter[i] -= o->learning_rate * grad[i]; } return 0; From 37594eae1737ad7c95016f48a385521ceb0de529 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Tue, 13 Jun 2017 08:15:12 +0800 Subject: [PATCH 10/12] add paramConfig for each parameter --- go/pserver/cclient/test/main.c | 2 -- go/pserver/cclient/test/test_cclient.c | 3 +-- go/pserver/optimizer.c | 1 - go/pserver/service.go | 6 +----- paddle/trainer/NewRemoteParameterUpdater.cpp | 8 ++++++-- paddle/trainer/NewRemoteParameterUpdater.h | 1 - 6 files changed, 8 insertions(+), 13 deletions(-) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index 72ec359076..6adc3c9b53 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -76,7 +76,5 @@ retry: fail(); } - printf("test success!\n"); - return 0; } diff --git a/go/pserver/cclient/test/test_cclient.c b/go/pserver/cclient/test/test_cclient.c index 50ba2d5597..9083064eee 100644 --- a/go/pserver/cclient/test/test_cclient.c +++ b/go/pserver/cclient/test/test_cclient.c @@ -21,7 +21,7 @@ void print_parameter(paddle_gradient* param) { printf("content_len: %d\n", param->content_len); printf("content_type: %d\n", param->element_type); int i; - for (i = 0; i < param->content_len / sizeof(real); ++i) { + for (i = 0; i < param->content_len / (int)sizeof(real); ++i) { printf("%f ", ((float*)param->content)[i]); } printf("\n\n"); @@ -110,6 +110,5 @@ retry: fail(); } - printf("test success!\n"); return 0; } diff --git a/go/pserver/optimizer.c b/go/pserver/optimizer.c index 48bbceb343..f16ba2cbf8 100644 --- a/go/pserver/optimizer.c +++ b/go/pserver/optimizer.c @@ -32,7 +32,6 @@ int update_SGD(void* optimizer, const void* gradient, int num_bytes) { SGD_optimizer* o = (SGD_optimizer*)optimizer; - // TODO(a simple SGD implement) float* parameter = (float*)buffer; float* grad = (float*)gradient; diff --git a/go/pserver/service.go b/go/pserver/service.go index ab814662b6..7d2a1fea86 100644 --- a/go/pserver/service.go +++ b/go/pserver/service.go @@ -29,10 +29,6 @@ type Parameter struct { Content []byte } -func (p *Parameter) toString() { - fmt.Println(p.Name, p.ElementType, p.Content) -} - // ParameterWithConfig contains the parameter and the configuration. 
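 // The configuration is the ParameterConfig proto that the trainer serializes
 // and forwards through paddle_init_param (see the NewRemoteParameterUpdater
 // change in this patch).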
type ParameterWithConfig struct { Param Parameter @@ -53,7 +49,7 @@ type Service struct { // NewService creates a new service. func NewService() *Service { - s := &Service{opt: newOptimizer(sgd, 0.01)} + s := &Service{opt: newOptimizer(sgd, 0.005)} s.paramMap = make(map[string]Parameter) s.initialized = make(chan struct{}) return s diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index d554e09759..b3655d9d02 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -31,7 +31,6 @@ NewRemoteParameterUpdater::NewRemoteParameterUpdater( void NewRemoteParameterUpdater::init( const std::vector ¶meters) { ParameterUpdater::init(parameters); - LOG(INFO) << "NewRemoteParameterUpdater init in"; for (auto ¶ : parameters_) { para->getBuf(PARAMETER_VALUE)->zeroMem(); @@ -58,7 +57,12 @@ void NewRemoteParameterUpdater::init( if (paddle_begin_init_params(parameterClient_)) { LOG(INFO) << "paddle_begin_init_params start"; for (int i = 0; i < parameterSize(); ++i) { - paddle_init_param(parameterClient_, *newParameters_[i], NULL, 0); + auto paramConfig = parameters_[i]->getConfig(); + std::string bytes = paramConfig.SerializeAsString(); + const char *array = bytes.data(); + int size = (int)bytes.size(); + paddle_init_param( + parameterClient_, *newParameters_[i], (void *)array, size); } paddle_finish_init_params(parameterClient_); LOG(INFO) << "paddle_begin_init_params done"; diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index b7c0425982..1f22c15cef 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -84,7 +84,6 @@ private: for (int i = 0; i < parameterSize(); ++i) { ParameterPtr param = parameters_[i]; - new_params[i]->content_len = 10; new_params[i]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; new_params[i]->name = (char*)param->getName().c_str(); new_params[i]->content = From ebba2b139bec7fe44fb4d14032011271b68a3fe2 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 14 Jun 2017 10:08:01 +0800 Subject: [PATCH 11/12] update code with new cclient --- go/pserver/cclient/test/main.c | 7 ++- go/pserver/cclient/test/test_cclient.c | 57 ++++++++++---------- paddle/trainer/NewRemoteParameterUpdater.cpp | 11 +--- paddle/trainer/NewRemoteParameterUpdater.h | 7 +-- 4 files changed, 36 insertions(+), 46 deletions(-) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index c8aed0f2e8..d052f4f5a8 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -11,10 +11,9 @@ void sendGrads(paddle_pserver_client c) { unsigned char grad_a[2000] = {2}; unsigned char grad_b[3000] = {3}; - paddle_gradient grads[2] = { - {"param_a", PADDLE_ELEMENT_TYPE_FLOAT32, grad_a, 2000}, - {"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, grad_b, 3000}}; - + paddle_gradient grad1 = {"param_a", PADDLE_ELEMENT_TYPE_FLOAT32, grad_a, 2000}; + paddle_gradient grad2 = {"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, grad_b, 3000}; + paddle_gradient* grads[2] = {&grad1, &grad2}; if (paddle_send_grads(c, grads, 2)) { fail(); } diff --git a/go/pserver/cclient/test/test_cclient.c b/go/pserver/cclient/test/test_cclient.c index 9083064eee..6830479fe9 100644 --- a/go/pserver/cclient/test/test_cclient.c +++ b/go/pserver/cclient/test/test_cclient.c @@ -30,30 +30,36 @@ void print_parameter(paddle_gradient* param) { int main() { char addr[] = "localhost:3000"; - client c = paddle_new_pserver_client(addr, 
1); + paddle_pserver_client c = paddle_new_pserver_client(addr, 1); char* names[] = {"param_a", "param_b"}; + retry: + printf("init parameter to pserver:\n"); + + real param_content1[] = {0.1, 0.2, 0.3}; + real param_content2[] = {0.4, 0.5, 0.6}; + paddle_parameter** params = + (paddle_parameter**)malloc(sizeof(paddle_parameter*) * 2); + params[0] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); + params[0]->name = names[0]; + params[0]->content = (unsigned char*)param_content1; + params[0]->content_len = 3 * sizeof(real); + params[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32; + + params[1] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); + params[1]->name = names[1]; + params[1]->content = (unsigned char*)param_content2; + params[1]->content_len = 3 * sizeof(real); + params[1]->element_type = PADDLE_ELEMENT_TYPE_INT32; if (paddle_begin_init_params(c)) { - paddle_parameter param; - real param_content1[] = {0.1, 0.2, 0.3}; - param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32; - param.name = names[0]; - param.content = (unsigned char*)param_content1; - param.content_len = 3 * sizeof(real); - if (paddle_init_param(c, param, NULL, 0) != 0) { + if (paddle_init_param(c, *params[0], NULL, 0) != 0) { goto retry; } - real param_content2[] = {0.4, 0.5, 0.6}; - param.element_type = PADDLE_ELEMENT_TYPE_INT32; - param.name = names[1]; - param.content = (unsigned char*)param_content2; - param.content_len = 3 * sizeof(real); - if (paddle_init_param(c, param, NULL, 0) != 0) { + if (paddle_init_param(c, *params[1], NULL, 0) != 0) { goto retry; } - if (paddle_finish_init_params(c) != 0) { goto retry; } @@ -61,13 +67,13 @@ retry: fail(); } - printf("get initialized parameters from pserver:\n"); - paddle_parameter* param_ptrs[2] = {NULL, NULL}; - if (paddle_get_params(c, names, param_ptrs, 2) != 0) { + printf("get inited parameters from pserver:\n"); + // get parameters again by reusing the allocated parameter buffers. + if (paddle_get_params(c, params, 2) != 0) { fail(); } - print_parameter(param_ptrs[0]); - print_parameter(param_ptrs[1]); + print_parameter(params[0]); + print_parameter(params[1]); printf("send gradient to pserver:\n"); real gradient_content1[] = {0.01, 0.02, 0.03}; @@ -87,6 +93,7 @@ retry: grads[1]->content_len = 3 * sizeof(real); grads[1]->element_type = PADDLE_ELEMENT_TYPE_INT32; + printf("print gradient sent to pserver:\n"); print_parameter(grads[0]); print_parameter(grads[1]); @@ -96,15 +103,11 @@ retry: printf("get updated parameters from pserver:\n"); // get parameters again by reusing the allocated parameter buffers. 
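  // paddle_get_params now takes the paddle_parameter** array directly and
  // reads each parameter's name from the structs themselves, so the separate
  // names array and the paddle_release_param calls below are dropped.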
- if (paddle_get_params(c, names, param_ptrs, 2) != 0) { + if (paddle_get_params(c, params, 2) != 0) { fail(); } - - print_parameter(param_ptrs[0]); - print_parameter(param_ptrs[1]); - - paddle_release_param(param_ptrs[0]); - paddle_release_param(param_ptrs[1]); + print_parameter(params[0]); + print_parameter(params[1]); if (paddle_save_model(c, "/tmp/") != 0) { fail(); diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index b3655d9d02..3d4d23afc7 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -25,7 +25,6 @@ NewRemoteParameterUpdater::NewRemoteParameterUpdater( : parameterClient_(-1), newParameters_(nullptr), newGradients_(nullptr), - names_(nullptr), pserverSpec_(pserverSpec) {} void NewRemoteParameterUpdater::init( @@ -41,12 +40,6 @@ void NewRemoteParameterUpdater::init( parameterClient_ = paddle_new_pserver_client((char *)pserverSpec_.c_str(), FLAGS_trainer_id == 0); - // init names_ for get parameter through paddle_cclient - names_ = (char **)malloc(parameterSize() * sizeof(char *)); - for (int i = 0; i < parameterSize(); ++i) { - names_[i] = (char *)parameters_[i]->getName().c_str(); - } - // init new parameter and gradient. newParameters_ = initNewParameter(PARAMETER_VALUE); newGradients_ = initNewParameter(PARAMETER_GRADIENT); @@ -68,7 +61,7 @@ void NewRemoteParameterUpdater::init( LOG(INFO) << "paddle_begin_init_params done"; } else { paddle_get_params( - parameterClient_, names_, newParameters_, parameterSize()); + parameterClient_, newParameters_, parameterSize()); } LOG(INFO) << "NewRemoteParameterUpdater initialized"; @@ -80,7 +73,7 @@ void NewRemoteParameterUpdater::finishBatch(real cost) { // send gradient to parameter server. paddle_send_grads(parameterClient_, newGradients_, parameterSize()); // get the updated parameter from parameterClient. - paddle_get_params(parameterClient_, names_, newParameters_, parameterSize()); + paddle_get_params(parameterClient_, newParameters_, parameterSize()); // clear gradient after update parameter. for (auto ¶ : parameters_) { diff --git a/paddle/trainer/NewRemoteParameterUpdater.h b/paddle/trainer/NewRemoteParameterUpdater.h index 1f22c15cef..f735185f62 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.h +++ b/paddle/trainer/NewRemoteParameterUpdater.h @@ -32,9 +32,6 @@ public: NewRemoteParameterUpdater(const OptimizationConfig& config, const std::string pserverSpec); ~NewRemoteParameterUpdater() { - if (names_ != nullptr) { - free(names_); - } releaseNewParameter(newParameters_); releaseNewParameter(newGradients_); if (parameterClient_ >= 0) paddle_pserver_client_release(parameterClient_); @@ -105,13 +102,11 @@ private: protected: /// internal parameter client object for exchanging data with pserver - client parameterClient_; + paddle_pserver_client parameterClient_; /// the parameters for new pserver client paddle_parameter** newParameters_; /// the gradinets for new pserver client paddle_parameter** newGradients_; - /// the names for new parameters. 
- char** names_; /// the specification of parameter server "host1:port,host1:port" std::string pserverSpec_; }; From c093a2433600b7666fd8f46aca01f4d5e40b02f6 Mon Sep 17 00:00:00 2001 From: qiaolongfei Date: Wed, 14 Jun 2017 11:27:19 +0800 Subject: [PATCH 12/12] clang format check --- go/pserver/cclient/test/main.c | 9 ++++++--- go/pserver/cclient/test/test_cclient.c | 2 +- paddle/trainer/NewRemoteParameterUpdater.cpp | 3 +-- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/go/pserver/cclient/test/main.c b/go/pserver/cclient/test/main.c index d052f4f5a8..07e1b86b43 100644 --- a/go/pserver/cclient/test/main.c +++ b/go/pserver/cclient/test/main.c @@ -11,8 +11,10 @@ void sendGrads(paddle_pserver_client c) { unsigned char grad_a[2000] = {2}; unsigned char grad_b[3000] = {3}; - paddle_gradient grad1 = {"param_a", PADDLE_ELEMENT_TYPE_FLOAT32, grad_a, 2000}; - paddle_gradient grad2 = {"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, grad_b, 3000}; + paddle_gradient grad1 = { + "param_a", PADDLE_ELEMENT_TYPE_FLOAT32, grad_a, 2000}; + paddle_gradient grad2 = { + "param_b", PADDLE_ELEMENT_TYPE_FLOAT32, grad_b, 3000}; paddle_gradient* grads[2] = {&grad1, &grad2}; if (paddle_send_grads(c, grads, 2)) { fail(); @@ -76,7 +78,8 @@ retry: } } - for (int i = 0; i < 100; i++) { + int i; + for (i = 0; i < 100; i++) { sendGrads(c); getParams(c); } diff --git a/go/pserver/cclient/test/test_cclient.c b/go/pserver/cclient/test/test_cclient.c index 6830479fe9..0f9c2ef801 100644 --- a/go/pserver/cclient/test/test_cclient.c +++ b/go/pserver/cclient/test/test_cclient.c @@ -40,7 +40,7 @@ retry: real param_content1[] = {0.1, 0.2, 0.3}; real param_content2[] = {0.4, 0.5, 0.6}; paddle_parameter** params = - (paddle_parameter**)malloc(sizeof(paddle_parameter*) * 2); + (paddle_parameter**)malloc(sizeof(paddle_parameter*) * 2); params[0] = (paddle_parameter*)malloc(sizeof(paddle_parameter)); params[0]->name = names[0]; params[0]->content = (unsigned char*)param_content1; diff --git a/paddle/trainer/NewRemoteParameterUpdater.cpp b/paddle/trainer/NewRemoteParameterUpdater.cpp index 3d4d23afc7..f25ce2f7f0 100644 --- a/paddle/trainer/NewRemoteParameterUpdater.cpp +++ b/paddle/trainer/NewRemoteParameterUpdater.cpp @@ -60,8 +60,7 @@ void NewRemoteParameterUpdater::init( paddle_finish_init_params(parameterClient_); LOG(INFO) << "paddle_begin_init_params done"; } else { - paddle_get_params( - parameterClient_, newParameters_, parameterSize()); + paddle_get_params(parameterClient_, newParameters_, parameterSize()); } LOG(INFO) << "NewRemoteParameterUpdater initialized";
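Taken together, the series above settles the shape of the C client API. The sketch below walks through that lifecycle end to end: create a client, race to initialize parameters, then alternate paddle_send_grads / paddle_get_params before saving the model. It is a minimal sketch assembled from the calls exercised in main.c and test_cclient.c, not taken verbatim from either; the typedef for real and the positional field order of paddle_parameter follow the test code, and error handling is collapsed to early returns.

#include "libpaddle_pserver_cclient.h"

typedef float real;  /* the test programs use "real" for float32 buffers */

int main() {
  char addr[] = "localhost:3000";
  real value[3] = {0.1, 0.2, 0.3};
  real grad_buf[3] = {0.01, 0.02, 0.03};
  /* Field order as in main.c: name, element_type, content, content_len. */
  paddle_parameter param = {
      "param_a", PADDLE_ELEMENT_TYPE_FLOAT32, (unsigned char*)value,
      3 * sizeof(real)};
  paddle_gradient grad = {
      "param_a", PADDLE_ELEMENT_TYPE_FLOAT32, (unsigned char*)grad_buf,
      3 * sizeof(real)};
  paddle_parameter* params[1] = {&param};
  paddle_gradient* grads[1] = {&grad};
  int i;

  /* The second argument marks this trainer as "selected" to initialize
   * parameters; NewRemoteParameterUpdater passes FLAGS_trainer_id == 0. */
  paddle_pserver_client c = paddle_new_pserver_client(addr, 1);

retry:
  if (paddle_begin_init_params(c)) {
    /* The selected trainer pushes initial values; the real updater also
     * attaches a serialized ParameterConfig via the last two arguments. */
    if (paddle_init_param(c, param, NULL, 0) != 0) goto retry;
    if (paddle_finish_init_params(c) != 0) goto retry;
  }

  for (i = 0; i < 100; i++) {
    if (paddle_send_grads(c, grads, 1) != 0) return 1;
    /* Updated values are written back into the caller-owned buffers. */
    if (paddle_get_params(c, params, 1) != 0) return 1;
  }

  if (paddle_save_model(c, "/tmp/") != 0) return 1;
  paddle_pserver_client_release(c);
  return 0;
}

Because paddle_get_params writes into caller-owned buffers, the earlier paddle_release_param calls disappear from the tests over the course of this series.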