Merge pull request #2413 from jacquesqiao/newparameterupdater
new parameter updater using the paddle pserver cclient of Go (gangliao-patch-1)
commit 91f82aba5c
@ -0,0 +1,21 @@
# Design Doc: Remote Parameter Updater for Cluster Train

For an overview of distributed training, please refer to the [distributed training design doc](README.md). In this design doc, we discuss the parameter updater that uses the parameter server cclient ([The Client Library of Parameter Server Design Doc](pserver_client.md)) to manage and update parameters.

## Parameter Updater

A parameter updater is used by the trainer to manage and update parameters. There are mainly two kinds of parameter updaters: local and remote. Since this design is for cluster training, we only discuss the remote parameter updater here.

### Remote Parameter Updater

The remote parameter updater manages parameters through remote parameter servers, using the client that communicates with the pservers ([The Client Library of Parameter Server Design Doc](pserver_client.md)).
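Per mini-batch, the flow mirrors the `NewRemoteParameterUpdater` implementation later in this PR: after the backward pass, the updater pushes gradients to the pservers, pulls back the freshly optimized parameter values, and clears the gradient buffers. A pseudocode sketch (the method names on `client` are illustrative placeholders, not the actual cclient symbols):

```python
# Pseudocode for one mini-batch with the remote parameter updater.
# `client` stands for the Go pserver cclient wrapped by this PR; the
# method names are illustrative, not the real C API.
def finish_batch(client, gradients, parameters):
    client.send_grads(gradients)   # push locally computed gradients
    client.get_params(parameters)  # pull back optimizer-updated values
    for grad in gradients:
        grad.fill(0.0)             # clear gradients for the next batch
```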

In the PaddlePaddle Python V2 API, the trainer is implemented in Python, and it holds an instance of the parameter updater and calls its functions directly. In this design, we will also expose the API of RemoteParameterUpdater to Python with SWIG.
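For example, the training scripts added in this PR select the remote updater through two arguments of the trainer constructor. A minimal sketch, reusing the regression network from the `uci_housing` example below and assuming a pserver is already listening on `localhost:3000`:

```python
import paddle.v2 as paddle

paddle.init(use_gpu=False, trainer_count=1)

# A one-layer regression network, just enough to build a cost.
x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13))
y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
y_predict = paddle.layer.fc(input=x, size=1, act=paddle.activation.Linear())
cost = paddle.layer.mse_cost(input=y_predict, label=y)

parameters = paddle.parameters.create(cost)
optimizer = paddle.optimizer.Momentum(momentum=0)

# is_local=False plus a pserver_spec makes the trainer drive training
# through the remote parameter updater backed by the Go pserver cclient.
trainer = paddle.trainer.SGD(cost=cost,
                             parameters=parameters,
                             update_equation=optimizer,
                             is_local=False,
                             pserver_spec="localhost:3000")
```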

#### Sparse Remote Parameter Updater

Since we will only implement dense parameter management for now, the mechanism for sparse parameters will be discussed in the next stage.

### Interface Design

TBD
@ -1,13 +1,22 @@
 cmake_minimum_required(VERSION 3.0)
 
-include_directories(${CMAKE_BINARY_DIR})
-
 add_executable(main main.c)
-add_dependencies(main client)
+add_dependencies(main paddle_pserver_cclient)
+add_executable(test_cclient test_cclient.c)
+add_dependencies(test_cclient paddle_pserver_cclient)
 
 if(APPLE)
   set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security")
 else()
   set(CMAKE_EXE_LINKER_FLAGS "-pthread")
 endif()
-target_link_libraries(main ${CMAKE_BINARY_DIR}/libclient.a)
+
+if(PROJ_ROOT)
+  include_directories(${CMAKE_BINARY_DIR}/go/pserver/cclient/)
+  target_link_libraries(main ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a pthread)
+  target_link_libraries(test_cclient ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a pthread)
+else(PROJ_ROOT)
+  include_directories(${CMAKE_BINARY_DIR})
+  target_link_libraries(main ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread)
+  target_link_libraries(test_cclient ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread)
+endif(PROJ_ROOT)
@ -0,0 +1,117 @@
#include <stdio.h>
#include <stdlib.h>

#include "libpaddle_pserver_cclient.h"

typedef float real;

void fail() {
  // TODO(helin): fix: gtest using cmake is not working, using this
  // hacky way for now.
  printf("test failed.\n");
  exit(-1);
}

void print_parameter(paddle_gradient* param) {
  if (param == NULL) {
    printf("param is NULL!!\n");
  } else {
    printf("==== parameter ====\n");
    printf("name: %s\n", param->name);
    printf("content_len: %d\n", param->content_len);
    printf("content_type: %d\n", param->element_type);
    int i;
    for (i = 0; i < param->content_len / (int)sizeof(real); ++i) {
      printf("%f ", ((float*)param->content)[i]);
    }
    printf("\n\n");
  }
}

int main() {
  char addr[] = "localhost:3000";
  paddle_pserver_client c = paddle_new_pserver_client(addr, 1);

  char* names[] = {"param_a", "param_b"};

retry:
  printf("init parameter to pserver:\n");

  real param_content1[] = {0.1, 0.2, 0.3};
  real param_content2[] = {0.4, 0.5, 0.6};
  paddle_parameter** params =
      (paddle_parameter**)malloc(sizeof(paddle_parameter*) * 2);
  params[0] = (paddle_parameter*)malloc(sizeof(paddle_parameter));
  params[0]->name = names[0];
  params[0]->content = (unsigned char*)param_content1;
  params[0]->content_len = 3 * sizeof(real);
  params[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;

  params[1] = (paddle_parameter*)malloc(sizeof(paddle_parameter));
  params[1]->name = names[1];
  params[1]->content = (unsigned char*)param_content2;
  params[1]->content_len = 3 * sizeof(real);
  // the content buffer holds floats, so the element type must be FLOAT32.
  params[1]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;

  // only one trainer wins paddle_begin_init_params() and initializes the
  // parameters on the pserver; if any init step fails, start over.
  if (paddle_begin_init_params(c)) {
    if (paddle_init_param(c, *params[0], NULL, 0) != 0) {
      goto retry;
    }
    if (paddle_init_param(c, *params[1], NULL, 0) != 0) {
      goto retry;
    }
    if (paddle_finish_init_params(c) != 0) {
      goto retry;
    }
  } else {
    fail();
  }

  printf("get inited parameters from pserver:\n");
  // get parameters again by reusing the allocated parameter buffers.
  if (paddle_get_params(c, params, 2) != 0) {
    fail();
  }
  print_parameter(params[0]);
  print_parameter(params[1]);

  printf("send gradient to pserver:\n");
  real gradient_content1[] = {0.01, 0.02, 0.03};
  real gradient_content2[] = {0.04, 0.05, 0.06};

  paddle_gradient** grads =
      (paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2);
  grads[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient));
  grads[0]->name = names[0];
  grads[0]->content = (unsigned char*)gradient_content1;
  grads[0]->content_len = 3 * sizeof(real);
  grads[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;

  grads[1] = (paddle_gradient*)malloc(sizeof(paddle_gradient));
  grads[1]->name = names[1];
  grads[1]->content = (unsigned char*)gradient_content2;
  grads[1]->content_len = 3 * sizeof(real);
  grads[1]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;

  printf("print gradient sent to pserver:\n");
  print_parameter(grads[0]);
  print_parameter(grads[1]);

  if (paddle_send_grads(c, grads, 2) != 0) {
    fail();
  }

  printf("get updated parameters from pserver:\n");
  // get parameters again by reusing the allocated parameter buffers.
  if (paddle_get_params(c, params, 2) != 0) {
    fail();
  }
  print_parameter(params[0]);
  print_parameter(params[1]);

  if (paddle_save_model(c, "/tmp/") != 0) {
    fail();
  }

  return 0;
}
@ -0,0 +1,131 @@
import paddle.v2 as paddle
import gzip


def softmax_regression(img):
    predict = paddle.layer.fc(input=img,
                              size=10,
                              act=paddle.activation.Softmax())
    return predict


def multilayer_perceptron(img):
    # The first fully-connected layer
    hidden1 = paddle.layer.fc(input=img, size=128, act=paddle.activation.Relu())
    # The second fully-connected layer and the according activation function
    hidden2 = paddle.layer.fc(input=hidden1,
                              size=64,
                              act=paddle.activation.Relu())
    # The third fully-connected layer, note that the hidden size should be 10,
    # which is the number of unique digits
    predict = paddle.layer.fc(input=hidden2,
                              size=10,
                              act=paddle.activation.Softmax())
    return predict


def convolutional_neural_network(img):
    # first conv layer
    conv_pool_1 = paddle.networks.simple_img_conv_pool(
        input=img,
        filter_size=5,
        num_filters=20,
        num_channel=1,
        pool_size=2,
        pool_stride=2,
        act=paddle.activation.Tanh())
    # second conv layer
    conv_pool_2 = paddle.networks.simple_img_conv_pool(
        input=conv_pool_1,
        filter_size=5,
        num_filters=50,
        num_channel=20,
        pool_size=2,
        pool_stride=2,
        act=paddle.activation.Tanh())
    # The first fully-connected layer
    fc1 = paddle.layer.fc(input=conv_pool_2,
                          size=128,
                          act=paddle.activation.Tanh())
    # The softmax layer, note that the hidden size should be 10,
    # which is the number of unique digits
    predict = paddle.layer.fc(input=fc1,
                              size=10,
                              act=paddle.activation.Softmax())
    return predict


def main():
    paddle.init(use_gpu=False, trainer_count=1)

    # define network topology
    images = paddle.layer.data(
        name='pixel', type=paddle.data_type.dense_vector(784))
    label = paddle.layer.data(
        name='label', type=paddle.data_type.integer_value(10))

    # Here we can build the prediction network in different ways. Please
    # choose one by uncommenting the corresponding line.
    predict = softmax_regression(images)
    # predict = multilayer_perceptron(images)
    # predict = convolutional_neural_network(images)

    cost = paddle.layer.classification_cost(input=predict, label=label)
    parameters = paddle.parameters.create(cost)

    optimizer = paddle.optimizer.Momentum(
        learning_rate=0.1 / 128.0,
        momentum=0.9,
        regularization=paddle.optimizer.L2Regularization(rate=0.0005 * 128))

    trainer = paddle.trainer.SGD(cost=cost,
                                 parameters=parameters,
                                 update_equation=optimizer,
                                 is_local=False,
                                 pserver_spec="localhost:3000")

    lists = []

    def event_handler(event):
        if isinstance(event, paddle.event.EndIteration):
            if event.batch_id % 1000 == 0:
                print "Pass %d, Batch %d, Cost %f, %s" % (
                    event.pass_id, event.batch_id, event.cost, event.metrics)

        elif isinstance(event, paddle.event.EndPass):
            result = trainer.test(reader=paddle.batch(
                paddle.dataset.mnist.test(), batch_size=128))
            print "Test with Pass %d, Cost %f, %s\n" % (
                event.pass_id, result.cost, result.metrics)
            lists.append((event.pass_id, result.cost,
                          result.metrics['classification_error_evaluator']))

    trainer.train(
        reader=paddle.batch(
            paddle.reader.shuffle(
                paddle.dataset.mnist.train(), buf_size=8192),
            batch_size=128),
        event_handler=event_handler,
        num_passes=100)

    # find the best pass
    best = sorted(lists, key=lambda list: float(list[1]))[0]
    print 'Best pass is %s, testing Avgcost is %s' % (best[0], best[1])
    print 'The classification accuracy is %.2f%%' % (100 - float(best[2]) * 100)

    test_creator = paddle.dataset.mnist.test()
    test_data = []
    for item in test_creator():
        test_data.append((item[0], ))
        if len(test_data) == 100:
            break

    # output is a softmax layer. It returns probabilities.
    # Shape should be (100, 10)
    probs = paddle.infer(
        output_layer=predict, parameters=parameters, input=test_data)
    print probs.shape


if __name__ == '__main__':
    main()
@ -0,0 +1,60 @@
import paddle.v2 as paddle
import paddle.v2.dataset.uci_housing as uci_housing


def main():
    # init
    paddle.init(use_gpu=False, trainer_count=1)

    # network config
    x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13))
    y_predict = paddle.layer.fc(input=x,
                                param_attr=paddle.attr.Param(name='w'),
                                size=1,
                                act=paddle.activation.Linear(),
                                bias_attr=paddle.attr.Param(name='b'))
    y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
    cost = paddle.layer.mse_cost(input=y_predict, label=y)

    # create parameters
    parameters = paddle.parameters.create(cost)

    # create optimizer
    optimizer = paddle.optimizer.Momentum(momentum=0)

    trainer = paddle.trainer.SGD(cost=cost,
                                 parameters=parameters,
                                 update_equation=optimizer,
                                 is_local=False,
                                 pserver_spec="localhost:3000")

    # event_handler to print training and testing info
    def event_handler(event):
        if isinstance(event, paddle.event.EndIteration):
            if event.batch_id % 100 == 0:
                print "Pass %d, Batch %d, Cost %f" % (
                    event.pass_id, event.batch_id, event.cost)

        if isinstance(event, paddle.event.EndPass):
            if (event.pass_id + 1) % 10 == 0:
                result = trainer.test(
                    reader=paddle.batch(
                        uci_housing.test(), batch_size=2),
                    feeding={'x': 0,
                             'y': 1})
                print "Test %d, %.2f" % (event.pass_id, result.cost)

    # training
    trainer.train(
        reader=paddle.batch(
            paddle.reader.shuffle(
                uci_housing.train(), buf_size=500),
            batch_size=2),
        feeding={'x': 0,
                 'y': 1},
        event_handler=event_handler,
        num_passes=30)


if __name__ == '__main__':
    main()
@ -0,0 +1,86 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "NewRemoteParameterUpdater.h"
#include "Trainer.h"
#include "paddle/utils/Stat.h"

DECLARE_int32(trainer_id);
DECLARE_string(save_dir);

namespace paddle {
NewRemoteParameterUpdater::NewRemoteParameterUpdater(
    const OptimizationConfig &config, const std::string pserverSpec)
    : parameterClient_(-1),
      newParameters_(nullptr),
      newGradients_(nullptr),
      pserverSpec_(pserverSpec) {}

void NewRemoteParameterUpdater::init(
    const std::vector<ParameterPtr> &parameters) {
  ParameterUpdater::init(parameters);

  for (auto &para : parameters_) {
    para->getBuf(PARAMETER_VALUE)->zeroMem();
    para->getBuf(PARAMETER_GRADIENT)->zeroMem();
  }

  // create parameter server client.
  parameterClient_ = paddle_new_pserver_client((char *)pserverSpec_.c_str(),
                                               FLAGS_trainer_id == 0);

  // init new parameter and gradient.
  newParameters_ = initNewParameter(PARAMETER_VALUE);
  newGradients_ = initNewParameter(PARAMETER_GRADIENT);

  // init parameters: one trainer will get the opportunity to initialize
  // the parameters and send them to the parameter servers. The others
  // will get the initialized parameters from the parameter servers.
  if (paddle_begin_init_params(parameterClient_)) {
    LOG(INFO) << "paddle_begin_init_params start";
    for (int i = 0; i < parameterSize(); ++i) {
      auto paramConfig = parameters_[i]->getConfig();
      std::string bytes = paramConfig.SerializeAsString();
      const char *array = bytes.data();
      int size = (int)bytes.size();
      paddle_init_param(
          parameterClient_, *newParameters_[i], (void *)array, size);
    }
    paddle_finish_init_params(parameterClient_);
    LOG(INFO) << "paddle_begin_init_params done";
  } else {
    paddle_get_params(parameterClient_, newParameters_, parameterSize());
  }

  LOG(INFO) << "NewRemoteParameterUpdater initialized";
}

void NewRemoteParameterUpdater::updateImpl(Parameter *para) {}

void NewRemoteParameterUpdater::finishBatch(real cost) {
  // send gradients to the parameter servers.
  paddle_send_grads(parameterClient_, newGradients_, parameterSize());
  // get the updated parameters from the parameter client.
  paddle_get_params(parameterClient_, newParameters_, parameterSize());

  // clear gradients after updating the parameters.
  for (auto &para : parameters_) {
    para->getBuf(PARAMETER_GRADIENT)->zeroMem();
  }
}

void NewRemoteParameterUpdater::startPass() {}

bool NewRemoteParameterUpdater::finishPass() { return true; }
}  // namespace paddle
@ -0,0 +1,114 @@
/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <functional>
#include <thread>
#include "ParameterUpdater.h"
#include "libpaddle_pserver_cclient.h"
#include "paddle/pserver/ParameterClient2.h"
#include "paddle/utils/Queue.h"
#include "paddle/utils/Util.h"

namespace paddle {

/**
 * New remote parameter updater for dense parameters that uses the cclient
 * written in Go.
 */
class NewRemoteParameterUpdater : public ParameterUpdater {
public:
  NewRemoteParameterUpdater(const OptimizationConfig& config,
                            const std::string pserverSpec);
  ~NewRemoteParameterUpdater() {
    releaseNewParameter(newParameters_);
    releaseNewParameter(newGradients_);
    if (parameterClient_ >= 0) paddle_pserver_client_release(parameterClient_);
  }

  /**
   * initialize the internal parameter client and itself.
   */
  virtual void init(const std::vector<ParameterPtr>& parameters);
  /**
   * @brief start batch
   *
   * @note  one batch training exhibits stateful feature to help
   *        to do performance tuning, sgd optimization if necessary.
   */
  virtual PassType startBatch(int64_t batchSize) { return PASS_TRAIN; }

  /**
   * send parameters to pservers and get returned parameters
   * from all pservers if necessary.
   */
  virtual void finishBatch(real cost);
  virtual void startPass();
  virtual bool finishPass();

protected:
  /**
   * work that needs to be done after finishBatch
   */
  virtual void updateImpl(Parameter* para);

private:
  int parameterSize() { return (int)parameters_.size(); }

  /**
   * init the parameter array passed to the Go paddle pserver cclient.
   * @param type which parameter buffer (value or gradient) to wrap.
   */
  paddle_parameter** initNewParameter(ParameterType type) {
    paddle_parameter** new_params =
        (paddle_parameter**)malloc(sizeof(paddle_parameter*) * parameterSize());
    for (int i = 0; i < parameterSize(); ++i) {
      new_params[i] = (paddle_parameter*)malloc(sizeof(paddle_parameter));
      memset(new_params[i], 0, sizeof(paddle_parameter));
    }

    for (int i = 0; i < parameterSize(); ++i) {
      ParameterPtr param = parameters_[i];
      new_params[i]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
      new_params[i]->name = (char*)param->getName().c_str();
      new_params[i]->content =
          (unsigned char*)(param->getBuf(type).get()->getData());
      new_params[i]->content_len =
          (int)param->getBuf(type).get()->getSize() * sizeof(real);
    }
    return new_params;
  }

  void releaseNewParameter(paddle_parameter** newParams) {
    if (newParams != nullptr) {
      for (int i = 0; i < parameterSize(); ++i) {
        free(newParams[i]);
      }
      free(newParams);
    }
  }

protected:
  /// internal parameter client object for exchanging data with pserver
  paddle_pserver_client parameterClient_;
  /// the parameters for the new pserver client
  paddle_parameter** newParameters_;
  /// the gradients for the new pserver client
  paddle_parameter** newGradients_;
  /// the specification of the parameter servers: "host1:port,host2:port"
  std::string pserverSpec_;
};

}  // namespace paddle