Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into seq_expand_op

fix-typo
wanghaoshuang 7 years ago
commit 35e79448a0

1
.gitignore vendored

@ -28,3 +28,4 @@ cmake_install.cmake
paddle/.timestamp
python/paddlepaddle.egg-info/
paddle/pybind/pybind.h
python/paddle/v2/framework/tests/tmp/*

@ -8,7 +8,7 @@ ExternalProject_Add(
extern_eigen3
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/RLovelett/eigen.git"
GIT_TAG 4e79cb69b9425f5f8c3a84be4350d4ab75b5fd9d
GIT_TAG 70661066beef694cadf6c304d0d07e0758825c10
PREFIX ${EIGEN_SOURCE_DIR}
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""

@ -1,9 +1,8 @@
INCLUDE(ExternalProject)
include(ExternalProject)
SET(NCCL_SOURCE_DIR ${THIRD_PARTY_PATH}/nccl)
INCLUDE_DIRECTORIES(${NCCL_SOURCE_DIR}/src/extern_nccl/src)
set(NCCL_SOURCE_DIR ${THIRD_PARTY_PATH}/nccl)
include_directories(${NCCL_SOURCE_DIR}/src/extern_nccl/src)
if(WITH_DSO)
# If we use DSO, we do not build nccl, just download the dependencies
@ -12,39 +11,39 @@ if(WITH_DSO)
set(NCCL_INSTALL_DIR "")
else()
# otherwise, we build nccl and link it.
set(NCCL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/nccl)
# Note: cuda 8.0 is needed to make nccl
# When cuda is not installed on the system directory, need to set CUDA_HOME to your cuda root
set(NCCL_BUILD_COMMAND "make -j 8")
set(NCCL_INSTALL_COMMAND "make install")
SET(NCCL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/nccl)
set(NCCL_INSTALL_COMMAND "make install PREFIX=${NCCL_INSTALL_DIR}")
endif()
ExternalProject_Add(
extern_nccl
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/NVIDIA/nccl.git"
GIT_TAG "v1.3.4-1"
PREFIX "${NCCL_SOURCE_DIR}"
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND "${NCCL_BUILD_COMMAND}"
INSTALL_COMMAND "${NCCL_INSTALL_COMMAND}"
INSTALL_DIR "${NCCL_INSTALL_DIR}"
TEST_COMMAND ""
extern_nccl
${EXTERNAL_PROJECT_LOG_ARGS}
GIT_REPOSITORY "https://github.com/NVIDIA/nccl.git"
GIT_TAG "v1.3.4-1"
PREFIX "${NCCL_SOURCE_DIR}"
UPDATE_COMMAND ""
CONFIGURE_COMMAND ""
BUILD_COMMAND "${NCCL_BUILD_COMMAND}"
INSTALL_COMMAND "${NCCL_INSTALL_COMMAND}"
INSTALL_DIR "${NCCL_INSTALL_DIR}"
TEST_COMMAND ""
)
if (WITH_DSO)
if (${CMAKE_VERSION} VERSION_LESS "3.3.0")
set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/lib_any_dummy.c)
file(WRITE ${dummyfile} "const char * dummy_any = \"${dummyfile}\";")
if(WITH_DSO)
if(${CMAKE_VERSION} VERSION_LESS "3.3.0")
set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/lib_nccl_dummy.c)
file(WRITE ${dummyfile} "const char * dummy_nccl = \"${dummyfile}\";")
add_library(nccl STATIC ${dummyfile})
else()
add_library(nccl INTERFACE)
endif()
else()
ADD_LIBRARY(nccl STATIC IMPORTED GLOBAL)
SET_PROPERTY(TARGET nccl PROPERTY IMPORTED_LOCATION
${NCCL_INSTALL_DIR}/lib/libnccl.a)
add_library(nccl STATIC IMPORTED GLOBAL)
set_property(TARGET nccl PROPERTY IMPORTED_LOCATION
${NCCL_INSTALL_DIR}/lib/libnccl_static.a)
endif()
add_dependencies(nccl extern_nccl)
LIST(APPEND external_project_dependencies nccl)

@ -0,0 +1,232 @@
## Survey on Graph
Neural network framework often provides symbolic API for users to write network topology conveniently. This doc manily focus on symbolic API in most popular neural network frameworks, and try to find out how to parse symbolic configuration to a portable file, such as protobuf or json.
### Mxnet
The core concept of symbolic API is `Symbol`. Mxnet implements `Symbol` class in C++, and export to Python using C-API. Please refer to the comments in Mxnet:
`Symbol` is help class used to represent the operator node in Graph.
`Symbol` acts as an interface for building graphs from different components like Variable, Functor and Group. `Symbol` is also exported to python front-end (while Graph is not) to enable quick test and deployment. Conceptually, symbol is the final operation of a graph and thus including all the information required (the graph) to evaluate its output value.
A simple network topology wrote by Symbol is as follows:
```python
def get_symbol(num_classes=10, **kwargs):
data = mx.symbol.Variable('data')
data = mx.symbol.Flatten(data=data)
fc1 = mx.symbol.FullyConnected(data = data, name='fc1', num_hidden=128)
act1 = mx.symbol.Activation(data = fc1, name='relu1', act_type="relu")
fc2 = mx.symbol.FullyConnected(data = act1, name = 'fc2', num_hidden = 64)
act2 = mx.symbol.Activation(data = fc2, name='relu2', act_type="relu")
fc3 = mx.symbol.FullyConnected(data = act2, name='fc3', num_hidden=num_classes)
mlp = mx.symbol.SoftmaxOutput(data = fc3, name = 'softmax')
return mlp
```
Varible here is actually a Symbol. Every basic Symbol will correspond to one Node, and every Node has its own NodeAttr. There is a op field in NodeAttr class, when a Symbol represents Variable(often input data), the op field is null.
Symbol contains a data member, std::vector<NodeEntry> outputs, and NodeEntry cantains a poniter to Node. We can follow the Node pointer to get all the Graph.
And Symbol can be saved to a Json file.
Here is a detailed example:
```
>>> import mxnet as mx
>>> data = mx.symbol.Variable('data')
>>> print data.debug_str()
Variable:data
>>> data = mx.symbol.Flatten(data=data)
>>> print data.debug_str()
Symbol Outputs:
output[0]=flatten0(0)
Variable:data
--------------------
Op:Flatten, Name=flatten0
Inputs:
arg[0]=data(0) version=0
>>> fc1 = mx.symbol.FullyConnected(data = data, name='fc1', num_hidden=128)
>>> print fc1.debug_str()
Symbol Outputs:
output[0]=fc1(0)
Variable:data
--------------------
Op:Flatten, Name=flatten0
Inputs:
arg[0]=data(0) version=0
Variable:fc1_weight
Variable:fc1_bias
--------------------
Op:FullyConnected, Name=fc1
Inputs:
arg[0]=flatten0(0)
arg[1]=fc1_weight(0) version=0
arg[2]=fc1_bias(0) version=0
Attrs:
num_hidden=128
```
### TensorFlow
The core concept of symbolic API is `Tensor`. Tensorflow defines `Tensor` in Python. Please refer to the comments in TensorFlow:
A `Tensor` is a symbolic handle to one of the outputs of an `Operation`. It does not hold the values of that operation's output, but instead provides a means of computing those values in a TensorFlow [Session](https://www.tensorflow.org/api_docs/python/tf/Session).
A simple example is as follows:
```python
# Build a dataflow graph.
c = tf.constant([[1.0, 2.0], [3.0, 4.0]])
d = tf.constant([[1.0, 1.0], [0.0, 1.0]])
e = tf.matmul(c, d)
# Construct a `Session` to execute the graph.
sess = tf.Session()
# Execute the graph and store the value that `e` represents in `result`.
result = sess.run(e)
```
The main method of `Tensor` is as follows:
```python
@property
def op(self):
"""The `Operation` that produces this tensor as an output."""
return self._op
@property
def dtype(self):
"""The `DType` of elements in this tensor."""
return self._dtype
@property
def graph(self):
"""The `Graph` that contains this tensor."""
return self._op.graph
@property
def name(self):
"""The string name of this tensor."""
if not self._op.name:
raise ValueError("Operation was not named: %s" % self._op)
return "%s:%d" % (self._op.name, self._value_index)
@property
def device(self):
"""The name of the device on which this tensor will be produced, or None."""
return self._op.device
```
Tensor can be taken as target to run by session. Tensor contains all the information of Graph, and tracks data dependency.
Here is a detailed example:
```
>>> import tensorflow as tf
>>> c = tf.constant([[1.0, 2.0], [3.0, 4.0]])
>>> print c.graph
<tensorflow.python.framework.ops.Graph object at 0x10f256d50>
>>> d = tf.constant([[1.0, 1.0], [0.0, 1.0]])
>>> print d.graph
<tensorflow.python.framework.ops.Graph object at 0x10f256d50>
>>> e = tf.matmul(c, d)
>>> print e.graph
<tensorflow.python.framework.ops.Graph object at 0x10f256d50>
```
### Dynet
The core concept of symbolic API is `Expression`, and Dynet defines `Expression` class in C++.
A simple example is as follows:
```cpp
ComputationGraph cg;
Expression W = parameter(cg, pW);
Expression in = input(cg, xs[i]);
Expression label = input(cg, ys[i]);
Expression pred = W * in;
Expression loss = square(pred - label);
```
The input data and parameter are also represented by Expression. Every basci Expression corresponds to a Node. And input data is also a Node.
Expression has a data member ComputationGraph, and ComputationGraph will be modified in users' configuring process. Expression can be a running target, beacuse Expression contains all dependency.
Here is a detailed example:
write topology in C++
```
ComputationGraph cg;
Expression W = parameter(cg, pW);
cg.print_graphviz();
Expression pred = W * xs[i];
cg.print_graphviz();
Expression loss = square(pred - ys[i]);
cg.print_graphviz();
```
compile and print
```
# first print
digraph G {
rankdir=LR;
nodesep=.05;
N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
}
# second print
digraph G {
rankdir=LR;
nodesep=.05;
N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
N1 [label="v1 = v0 * -0.98"];
N0 -> N1;
}
# third print
digraph G {
rankdir=LR;
nodesep=.05;
N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
N1 [label="v1 = v0 * -0.98"];
N0 -> N1;
N2 [label="v2 = -1.88387 - v1"];
N1 -> N2;
N3 [label="v3 = -v2"];
N2 -> N3;
N4 [label="v4 = square(v3)"];
N3 -> N4;
}
```
### Conclusion
Actually, Symbol/Tensor/Expression in Mxnet/TensorFlow/Dynet are the same level concepts. We use a unified name Expression here, this level concept has following features:
- Users wirte topoloy with symbolic API, and all return value is Expression, including input data and parameter.
- Expression corresponds with a global Graph, and Expression can also be composed.
- Expression tracks all dependency and can be taken as a run target

@ -0,0 +1,36 @@
# Design Doc: Model Format
## Motivation
A model is an output of the training process. One complete model consists of two parts, the **topology** and the **parameters**. In order to support industrial deployment, the model format must be self-complete and must not expose any training source code.
As a result, In PaddlePaddle, the **topology** is represented as a [ProgramDesc](https://github.com/PaddlePaddle/Paddle/blob/1c0a4c901c9fc881d120249c703b15d1c50dae7d/doc/design/program.md), which describes the model structure. The **parameters** contain all the trainable weights in the model. We must support large size parameters and efficient serialization/deserialization of parameters.
## Implementation
The topology is saved as a plain text in a detailed self-contain protobuf file.
The parameters are saved as a binary file. As we all know, the protobuf message has a limit of [64M size](https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.coded_stream#CodedInputStream.SetTotalBytesLimit.details). We have done a [benchmark experiment](https://github.com/PaddlePaddle/Paddle/pull/4610), which shows that protobuf is not fit for the task.
As a result, we design a particular format for tensor serialization. By default, an arbitrary tensor in Paddle is a [LoDTensor](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor.md), and has a description information proto of [LoDTensorDesc](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/framework.proto#L99). We save the DescProto as the byte string header. It contains all the necessary information, such as the `dims`, and the `LoD` information in [LoDTensor](https://github.com/PaddlePaddle/Paddle/blob/1c0a4c901c9fc881d120249c703b15d1c50dae7d/paddle/framework/lod_tensor.md). A tensor stores values in a continuous memory buffer. For speed we dump the raw memory to disk and save it as the byte string content. So, the binary format of one tensor is,
The table below shows a tensor's byte view in detail. Note that all the signed values are written in the little-endian format.
|field name | type | description |
| --- | --- | --- |
| version | uint32_t | Version of saved file. Always 0 now. |
| tensor desc length | uint32_t | TensorDesc(Protobuf message) length in bytes. |
| tensor desc | void* | TensorDesc protobuf binary message |
| tensor data | void* | Tensor's data in binary format. The length of `tensor_data` is decided by `TensorDesc.dims()` and `TensorDesc.data_type()` |
| lod_level | uint64_t | Level of LoD |
| length of lod[0] | uint64_t | [Optional] length of lod[0] in bytes. |
| data of lod[0] | uint64_t* | [Optional] lod[0].data() |
| ... | ... | ... |
## Summary
- We introduce a model format.
- The model represented by its forward-pass computation procedure is saved in a **ProgramDesc** protobuf message.
- A bunch of specified format binary tensors describe the **parameters**.

@ -65,20 +65,6 @@ class Optimizer(object):
def __init__(self):
pass
def create_backward_pass(self, loss, parameter_list=None):
"""
create and add gradient Operators in BlockDesc to Compute gradients of `loss`
for parameters in parameter_list
Args:
loss: an variable generated by cost function.
parameter_list: parameters that need to compute gradient and update to optimize the lost.
Returns:
list of (parameters, gradients) pair.
"""
return None
def create_optimization_pass(self, parameters_and_grads):
"""Add optimization operators to update gradients to variables.
@ -93,7 +79,7 @@ class Optimizer(object):
def minimize(self, loss, parameter_list):
"""Add operations to minimize `loss` by updating `parameter_list`.
This method combines interface `create_backward_pass()` and
This method combines interface `append_backward_ops()` and
`create_optimization_pass()` into one.
"""
params_grads = self.create_backward_pass(loss, parameter_list)

@ -1,7 +1,7 @@
# Regularization in PaddlePaddle
## Introduction to Regularization
A central problem in machine learning is how to design an algorithm that will perform well not just on the training data, but also on new data. Many strategies are used by machine learning practitioners to reduce the test error, possibly at the expense of increased training error. These strategies are collectively known as **regularization**.
A central problem in machine learning is how to design an algorithm that will perform well not just on the training data, but also on new data. A frequently faced problem is the problem of **overfitting**, where the model does not make reliable predictions on new unseen data. **Regularization** is the process of introducing additional information in order to prevent overfitting. This is usually done by adding extra penalties to the loss function that restricts the parameter spaces that an optimization algorithm can explore.
### Parameter Norm Penalties
Most common regularization approaches in deep learning are based on limiting the capacity of the models by adding a parameter norm penalty to the objective function `J`. This is given as follows:
@ -18,52 +18,21 @@ The most commonly used norm penalties are the L2 norm penalty and the L1 norm pe
##### L1 Regularization
<img src="./images/l1_regularization.png" align="center"/><br/>
A much more detailed mathematical background of reguilarization can be found [here](http://www.deeplearningbook.org/contents/regularization.html).
A much more detailed mathematical background of regularization can be found [here](http://www.deeplearningbook.org/contents/regularization.html).
## Regularization Survey
## How to do Regularization in PaddlePaddle
On surveying existing frameworks like Tensorflow, PyTorch, Caffe, etc, it can be seen that there are 2 common approaches of doing regularization:
1. Making regularization a part of the optimizer using an attribute like `weight_decay` that is used to control the scale of the L2 Penalty. This approach is used in PyTorch as follows:
```python
opt = torch.optim.SGD(params, lr=0.2, weight_decay=0.2)
```
At every optimization step, this code will add the gradient of the L2 Norm of the params to the gradient of the params with respect to the loss function. This can seen in the following code snippet:
```python
if weight_decay != 0:
d_p.add_(weight_decay, p.data)
```
This is a very restyrictive way of doing regularization and does not give the users enough flexibility.
**Advantages**:
- It is easy to implement for us.
- Faster execution of backward. However, it can be done manually by advanced users too.
**Disadvantages**:
- Not flexible for other regularizations such as L1/L0 regularization.
- Does not allow for different regularization coefficient for different parameters. For example, in most models, ony the weight matrices are regularized and the bias vectors are unregularized.
- Tightly coupled optimizer and regularization implementation.
2. Adding regularization ops to the graph through Python API. This approach is used by Tensorflow and Caffe. Using this approach, we manually add regularization ops to the graph and then add the regularization loss to the final loss function before sending them to the optimizer.
**Advantages**:
- Allows for greater flexibility to the users of Paddle. Using this approach, the users can put different regularization to different parameters and also choose parameters that are not a part of regularization.
- Makes it easy for the users to customize and extend the framework.
**Disadvantages**:
- Implementation requires comprehensive design and time.
A detailed survey of regularization in various deep learning frameworks can be found [here](https://github.com/PaddlePaddle/Paddle/wiki/Regularization-Survey).
## Proposal for Regularization in PaddlePaddle
### Low-Level implementation
In the new design, we propose to create new operations for regularization. For now, we can add 2 ops thgat correspond to the most frequently used regularizations:
In the new design, we propose to create new operations for regularization. For now, we can add 2 ops that correspond to the most frequently used regularizations:
- L2_regularization_op
- L1_regularization_op
These ops can be like any other ops with their own CPU/GPU implementations either using Eigen or separate Cpu and GPU kernels. As the initial implementation, we can implement their kernels using Eigen following the abstraction pattern implemented for [Activation Ops](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/operators/accuracy_op.h). This abstraction pattern can make it very easy to implement new regularization schemes. other than L1 and L2 norm penalties.
These ops can be like any other ops with their own CPU/GPU implementations either using Eigen or separate CPU and GPU kernels. As the initial implementation, we can implement their kernels using Eigen following the abstraction pattern implemented for [Activation Ops](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/operators/accuracy_op.h). This abstraction pattern can make it very easy to implement new regularization schemes other than L1 and L2 norm penalties.
The idea of building ops for regularization is in sync with the refactored Paddle philosophy of using operators to represent any computation unit. The way these ops will be added to the computation graph, will be decided by the [layer functions](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/python_api.md#layer-function) in Python API.
@ -94,7 +63,7 @@ Since we want to create the regularization ops in a lazy manner, the regularizat
#### High-level API
In PaddlePaddle Python API, users will primarily rely on [layer functions](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/python_api.md#layer-function) to create neural network layers. Hence, we lso need to provide regularization functionality in layer functions. The design of these APIs can be postponed for later right now. A good reference for these APIs can be found in [Keras](https://keras.io/regularizers/) and also by looking at Tensorflow in [`tf.contrib.layers`](https://www.tensorflow.org/api_guides/python/contrib.layers).
In PaddlePaddle Python API, users will primarily rely on [layer functions](https://github.com/PaddlePaddle/Paddle/blob/develop/doc/design/python_api.md#layer-function) to create neural network layers. Hence, we also need to provide regularization functionality in layer functions. The design of these APIs can be postponed for later right now. A good reference for these APIs can be found in [Keras](https://keras.io/regularizers/) and also by looking at Tensorflow in [`tf.contrib.layers`](https://www.tensorflow.org/api_guides/python/contrib.layers).

@ -25,9 +25,8 @@ import (
"strings"
"time"
log "github.com/inconshreveable/log15"
"github.com/namsral/flag"
log "github.com/sirupsen/logrus"
"github.com/topicai/candy"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
@ -41,16 +40,20 @@ func main() {
taskTimeoutMax := flag.Int("task-timeout-max", 3, "max timtout count for each task before it being declared failed task.")
chunkPerTask := flag.Int("chunk-per-task", 10, "chunk per task.")
logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic")
"log level, possible values: debug, info, warn, error, crit")
flag.Parse()
level, e := log.ParseLevel(*logLevel)
candy.Must(e)
lvl, err := log.LvlFromString(*logLevel)
if err != nil {
panic(err)
}
log.SetLevel(level)
log.Root().SetHandler(
log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
)
if *endpoints == "" {
log.Warningln("-endpoints not set, fault tolerance not be enabled.")
log.Warn("-endpoints not set, fault tolerance not be enabled.")
}
var store master.Store
@ -58,23 +61,25 @@ func main() {
eps := strings.Split(*endpoints, ",")
ip, err := networkhelper.GetExternalIP()
if err != nil {
log.Fatal(err)
log.Crit("get external ip error", log.Ctx{"error": err})
panic(err)
}
addr := fmt.Sprintf("%s:%d", ip, *port)
store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec)
if err != nil {
log.Fatal(err)
log.Crit("error creating etcd client.", log.Ctx{"error": err})
panic(err)
}
} else {
store = &master.InMemStore{}
}
shutdown := func() {
log.Infoln("shutting down gracefully")
log.Info("shutting down gracefully")
err := store.Shutdown()
if err != nil {
log.Errorln(err)
log.Error("shutdown error", log.Ctx{"error": err})
}
}
@ -86,24 +91,28 @@ func main() {
s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
if err != nil {
log.Fatal(err)
log.Crit("error creating new service.", log.Ctx{"error": err})
panic(err)
}
err = rpc.Register(s)
if err != nil {
log.Fatal(err)
log.Crit("error registering to etcd.", log.Ctx{"error": err})
panic(err)
}
rpc.HandleHTTP()
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil {
log.Fatal(err)
log.Crit("error listing to port", log.Ctx{"error": err, "port": *port})
panic(err)
}
go func() {
err = http.Serve(l, nil)
if err != nil {
log.Fatal(err)
log.Crit("error serving HTTP", log.Ctx{"error": err})
panic(err)
}
}()

@ -27,11 +27,11 @@ import (
"github.com/topicai/candy"
"github.com/PaddlePaddle/Paddle/go/pserver"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
func main() {
port := flag.Int("port", 0, "port of the pserver")
port := flag.Int("port", 8001, "port of the pserver")
index := flag.Int("index", -1, "index of the pserver, set to -1 if use etcd for auto pserver index registry")
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd")
@ -41,13 +41,17 @@ func main() {
checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path")
checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds")
logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic")
"log level, possible values: debug, info, warn, error, crit")
flag.Parse()
level, err := log.ParseLevel(*logLevel)
candy.Must(err)
lvl, err := log.LvlFromString(*logLevel)
if err != nil {
panic(err)
}
log.SetLevel(level)
log.Root().SetHandler(
log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)),
)
var idx int
@ -63,7 +67,7 @@ func main() {
cp, err = pserver.LoadCheckpoint(e, idx)
if err != nil {
if err == pserver.ErrCheckpointNotFound {
log.Infof("Could not find the pserver checkpoint.")
log.Info("load checkpoint error", "error", err)
} else {
panic(err)
}
@ -71,10 +75,10 @@ func main() {
}
shutdown := func() {
log.Infoln("shutting down gracefully")
log.Info("shutting down gracefully")
sErr := e.Shutdown()
if sErr != nil {
log.Errorln(sErr)
log.Error("error shutting down", log.Ctx{"error": sErr})
}
}
@ -95,7 +99,7 @@ func main() {
candy.Must(err)
go func() {
log.Infof("start pserver at port %d", *port)
log.Info("serving pserver", log.Ctx{"port": *port})
err = http.Serve(l, nil)
candy.Must(err)
}()

16
go/glide.lock generated

@ -1,5 +1,5 @@
hash: 328e7b9b7306b45e7b9879139a9f86698115981f6283032e1312093a6a6ddb04
updated: 2017-10-16T08:00:23.484693528Z
hash: 51d9e2e46d7fd9173ff11ecada40f7b7728756be18d5e2f032535f66465e6e15
updated: 2017-10-24T15:04:09.987751592-07:00
imports:
- name: github.com/alecthomas/gometalinter
version: bae2f1293d092fd8167939d5108d1b025eaef9de
@ -99,6 +99,8 @@ imports:
version: d2709f9f1f31ebcda9651b03077758c1f3a0018c
- name: github.com/ghodss/yaml
version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7
- name: github.com/go-stack/stack
version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf
- name: github.com/gogo/protobuf
version: 909568be09de550ed094403c2bf8a261b5bb730a
subpackages:
@ -120,8 +122,14 @@ imports:
- runtime
- runtime/internal
- utilities
- name: github.com/inconshreveable/log15
version: 0decfc6c20d9ca0ad143b0e89dcaa20f810b4fb3
- name: github.com/jonboulle/clockwork
version: 2eee05ed794112d45db504eb05aa693efd2b8b09
- name: github.com/mattn/go-colorable
version: 5411d3eea5978e6cdc258b30de592b60df6aba96
- name: github.com/mattn/go-isatty
version: 57fdcb988a5c543893cc61bce354a6e24ab70022
- name: github.com/matttproud/golang_protobuf_extensions
version: c12348ce28de40eed0136aa2b644d0ee0650e56c
subpackages:
@ -179,11 +187,12 @@ imports:
- lex/httplex
- trace
- name: golang.org/x/sys
version: 0f826bdd13b500be0f1d4004938ad978fcc6031e
version: e48874b42435b4347fc52bdee0424a52abc974d7
repo: https://github.com/golang/sys.git
vcs: git
subpackages:
- unix
- windows
- name: golang.org/x/text
version: 836efe42bb4aa16aaa17b9c155d8813d336ed720
repo: https://github.com/golang/text.git
@ -222,4 +231,3 @@ testImports:
version: 05e8a0eda380579888eb53c394909df027f06991
subpackages:
- assert

@ -26,3 +26,7 @@ import:
version: v1.1.0
- package: github.com/alecthomas/gometalinter
version: v1.2.1
- package: github.com/inconshreveable/log15
version: v2.13
- package: github.com/go-stack/stack
version: v1.6.0

@ -35,13 +35,19 @@ import (
"unsafe"
"github.com/PaddlePaddle/Paddle/go/master"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
var mu sync.Mutex
var handleMap = make(map[C.paddle_master_client]*master.Client)
var curHandle C.paddle_master_client
func init() {
log.Root().SetHandler(
log.LvlFilterHandler(log.LvlWarn, log.CallerStackHandler("%+v", log.StderrHandler)),
)
}
func add(c *master.Client) C.paddle_master_client {
mu.Lock()
defer mu.Unlock()
@ -117,7 +123,8 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int
}
err := c.SetDataset(paths)
if err != nil {
log.Errorln(err)
log.Error("error set dataset",
log.Ctx{"error": err, "paths": paths})
return C.PADDLE_MASTER_ERROR
}
@ -167,7 +174,7 @@ func paddle_request_save_model(client C.paddle_master_client, trainerID string,
c := get(client)
need, err := c.RequestSaveModel(trainerID, time.Duration(blockMS)*time.Millisecond)
if err != nil {
log.Errorln(err)
log.Error("error request save model", log.Ctx{"error": err})
return C.PADDLE_MASTER_ERROR
}

@ -21,7 +21,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
"github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
// Client is the client of the master server.
@ -75,7 +75,7 @@ func WithEtcd(endpoints []string, timeout time.Duration) func(*Client) error {
for {
err := f()
if err != nil {
log.Warningln(err)
log.Warn("create etcd client error", log.Ctx{"error": err})
} else {
break
}
@ -121,6 +121,7 @@ func (c *Client) StartGetRecords(passID int) {
}
func (c *Client) getRecords(passID int) {
i := 0
for {
t, err := c.getTask(passID)
if err != nil {
@ -130,18 +131,26 @@ func (c *Client) getRecords(passID int) {
c.ch <- record{nil, err}
break
}
if err.Error() == ErrPassAfter.Error() {
// wait util last pass finishes
time.Sleep(time.Second * 3)
continue
if i%60 == 0 {
log.Debug("getTask of passID error.",
log.Ctx{"error": err, "passID": passID})
i = 0
}
log.Errorf("getTask error: %s", err)
// if err.Error() == ErrPassAfter.Error()
// wait util last pass finishes
// if other error such as network error
// wait to reconnect or task time out
time.Sleep(time.Second * 3)
i += 3
continue
}
for _, chunk := range t.Chunks {
f, e := os.Open(chunk.Path)
if e != nil {
log.Errorln(e)
log.Error("error open chunk", log.Ctx{"error": e})
continue
}
@ -152,12 +161,15 @@ func (c *Client) getRecords(passID int) {
if s.Err() != nil {
c.ch <- record{nil, s.Err()}
log.Errorln(err, chunk.Path)
log.Error(
"error scan chunk",
log.Ctx{"error": err, "path": chunk.Path},
)
}
err = f.Close()
if err != nil {
log.Errorln(err)
log.Error("error close record file", log.Ctx{"error": err})
}
}
@ -166,7 +178,7 @@ func (c *Client) getRecords(passID int) {
// correct, but a reasonable approximation.
err = c.taskFinished(t.Meta.ID)
if err != nil {
log.Errorln(err)
log.Error("task finish callback error.", log.Ctx{"error": err})
}
}
}
@ -179,12 +191,12 @@ func (c *Client) monitorMaster(addrCh <-chan string) {
if curMaster == "" {
err := c.conn.Close()
if err != nil {
log.Errorln(err)
log.Error("close old master addr error", log.Ctx{"error": err})
}
} else {
err := c.conn.Connect(curMaster)
if err != nil {
log.Errorln(err)
log.Error("connect to new master addr error", log.Ctx{"error": err})
// connect to addr failed, set
// to last known addr in order

@ -25,8 +25,6 @@ import (
"testing"
"time"
log "github.com/sirupsen/logrus"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
)
@ -36,10 +34,6 @@ const (
chunkPerTask = 10
)
func init() {
log.SetLevel(log.ErrorLevel)
}
func TestGetFinishTask(t *testing.T) {
const path = "/tmp/master_client_test_0"

@ -117,6 +117,7 @@ func TestNextRecord(t *testing.T) {
if e != nil {
panic(e)
}
// test for n passes
for pass := 0; pass < 10; pass++ {
c.StartGetRecords(pass)

@ -20,7 +20,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
@ -44,7 +44,7 @@ type EtcdClient struct {
// NewEtcdClient creates a new EtcdClient.
func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) {
log.Debugf("Connecting to etcd at %v", endpoints)
log.Debug("Connecting to etcd", log.Ctx{"endpoint": endpoints})
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
@ -64,12 +64,12 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
// one master running, but split-brain problem may cause
// multiple master servers running), and the cluster management
// software will kill one of them.
log.Infof("Trying to acquire lock at %s.", lockPath)
log.Info("Trying to acquire lock.", log.Ctx{"path": lockPath})
err = lock.Lock(context.TODO())
if err != nil {
return nil, err
}
log.Infof("Successfully acquired lock at %s.", lockPath)
log.Info("Successfully acquired lock at %s.", log.Ctx{"path": lockPath})
put := clientv3.OpPut(addrPath, addr)
resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit()
@ -78,7 +78,8 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat
}
if !resp.Succeeded {
log.Fatal("No longer owns the master lock. Exiting.")
log.Crit("No longer owns the master lock. Exiting.")
panic("No longer owns the master lock. Exiting.")
}
e := &EtcdClient{
@ -102,7 +103,7 @@ func (e *EtcdClient) Save(state []byte) error {
}
if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock again")
log.Error("No longer owns the lock, trying to lock again")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := e.lock.Lock(ctx)
cancel()
@ -116,9 +117,10 @@ func (e *EtcdClient) Save(state []byte) error {
// to kill current master server. The current
// state is not saved, but the trainer's RPC
// call will fail, so the trainer will retry.
log.Fatalf("Could not acquire the lock at %s: %v. Exiting.", e.lockPath, err)
log.Crit("Could not acquire the lock at %s: %v. Exiting.", log.Ctx{"path": e.lockPath, "error": err})
panic("Could not acquire the lock at %s: %v. Exiting.")
}
log.Infof("Successfully acquired lock at %s.", e.lockPath)
log.Info("Successfully acquired lock at %s.", e.lockPath)
return e.Save(state)
}
@ -136,7 +138,7 @@ func (e *EtcdClient) Load() ([]byte, error) {
}
if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock and load again.")
log.Error("No longer owns the lock, trying to lock and load again.")
err = e.lock.Lock(context.Background())
if err != nil {
return nil, err
@ -163,7 +165,7 @@ func (e *EtcdClient) Shutdown() error {
if err == nil {
err = newErr
} else {
log.Errorln(newErr)
log.Error("shutdown error", log.Ctx{"error": newErr})
}
}
@ -192,7 +194,7 @@ func watchKey(c *clientv3.Client, key string, valChan chan<- string) {
for wresp := range rch {
for _, ev := range wresp.Events {
// if received event is DELETE, the value will be an empty string
log.Infof("received event %s, %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
log.Info("received event.", log.Ctx{"type": ev.Type, "key": ev.Kv.Key, "value": ev.Kv.Value})
valChan <- string(ev.Kv.Value)
}
}

@ -25,7 +25,7 @@ import (
"sync"
"time"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
"github.com/PaddlePaddle/recordio"
)
@ -170,11 +170,11 @@ func (s *Service) recover() (bool, error) {
}
if state == nil {
log.Infoln("No state exists, not recovered.")
log.Info("No state exists, not recovered.")
return false, nil
}
log.Infof("Loaded snapshot of size: %d bytes.", len(state))
log.Info("Loaded snapshot.", log.Ctx{"size": len(state)})
gr, err := gzip.NewReader(bytes.NewReader(state))
if err != nil {
return false, err
@ -191,11 +191,11 @@ func (s *Service) recover() (bool, error) {
if err != nil {
// Only close failed, recover actually succeed, so
// just log error.
log.Errorln(err)
log.Error("error close recover file.", log.Ctx{"error": err})
}
s.state = tqs
log.WithFields(s.logFields()).Infof("Master recovered from snapshot, scheduling pending task timeout check.")
log.Info("Master recovered from snapshot, scheduling pending task timeout check.", s.logCtx())
for _, t := range s.state.Pending {
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch))
}
@ -224,7 +224,7 @@ func (s *Service) snapshot() error {
}
state := buf.Bytes()
log.Infof("Saving snapshot of size: %d bytes.", len(state))
log.Info("Saving snapshot.", log.Ctx{"size bytes": len(state)})
return s.store.Save(state)
}
@ -260,7 +260,7 @@ func readChunks(globPaths []string) ([]Chunk, error) {
}
count := index.NumChunks()
log.Infof("readChunks: file %s has %d chunks", path, count)
log.Info("reading chunks.", log.Ctx{"path": path, "num chunks": count})
for i := 0; i < count; i++ {
chunk := Chunk{
Path: path,
@ -300,7 +300,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error {
err = s.snapshot()
if err != nil {
log.Errorln(err)
log.Error("snapshot error", log.Ctx{"error": err})
return err
}
close(s.ready)
@ -320,7 +320,7 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
defer func() {
err := s.snapshot()
if err != nil {
log.Errorln(err)
log.Error("snapshot error", log.Ctx{"error": err})
}
}()
@ -328,12 +328,12 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
t.NumFailure++
if t.NumFailure > s.failureMax {
log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
log.Warn("Task failed to many times, discard.", log.Ctx{"task": t.Task, "num failed": t.NumFailure})
s.state.Failed = append(s.state.Failed, t)
return
}
log.Warningf("Task %v failed %d times, re-dispatch.", t.Task, t.NumFailure)
log.Warn("Task failed, re-dispatch.", log.Ctx{"task": t.Task, "num failed": t.NumFailure})
s.state.Todo = append(s.state.Todo, t)
return
}
@ -353,8 +353,8 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
}
// must be called with lock held.
func (s *Service) logFields() log.Fields {
return log.Fields{
func (s *Service) logCtx() log.Ctx {
return log.Ctx{
"todoLen": len(s.state.Todo),
"pendingLen": len(s.state.Pending),
"doneLen": len(s.state.Done),
@ -383,10 +383,10 @@ func (s *Service) GetTask(passID int, task *Task) error {
if len(s.state.Todo) == 0 {
if len(s.state.Done) == 0 && len(s.state.Pending) == 0 {
log.WithFields(s.logFields()).Warningln("All tasks failed, may start next pass")
log.Warn("All tasks failed, may start next pass", s.logCtx())
return ErrAllTaskFailed
}
log.WithFields(s.logFields()).Warningln("No more available task.")
log.Warn("No more available task.", s.logCtx())
return ErrNoMoreAvailable
}
@ -400,8 +400,9 @@ func (s *Service) GetTask(passID int, task *Task) error {
}
*task = t.Task
log.WithFields(s.logFields()).Infof("Task #%v dispatched.", t.Task.Meta)
ctx := s.logCtx()
ctx["task meta"] = t.Task.Meta
log.Info("Task dispatched.", ctx)
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch))
return nil
}
@ -417,7 +418,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
t, ok := s.state.Pending[taskID]
if !ok {
log.WithFields(s.logFields()).Warningln("Pending task #%d not found.", taskID)
ctx := s.logCtx()
ctx["task id"] = taskID
log.Warn("Pending task not found.", ctx)
return nil
}
@ -426,7 +429,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s.state.Done = append(s.state.Done, t)
delete(s.state.Pending, taskID)
log.WithFields(s.logFields()).Infof("Task #%d finished.", taskID)
ctx := s.logCtx()
ctx["task id"] = taskID
log.Info("Task finished.", ctx)
if len(s.state.Todo) == 0 && len(s.state.Pending) == 0 {
// increase master side pass count if all tasks finished
s.state.CurPass++
@ -434,12 +439,14 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s.state.Done = []taskEntry{}
// TODO(typhoonzero): deal with failed tasks
s.state.Failed = []taskEntry{}
log.WithFields(s.logFields()).Warningf("all task finished, add new pass data, newpass: %d.", s.state.CurPass)
ctx := s.logCtx()
ctx["new pass"] = s.state.CurPass
log.Warn("all task finished, add new pass data.", ctx)
}
err := s.snapshot()
if err != nil {
log.Errorln(err)
log.Error("snapshot error", log.Ctx{"error": err})
}
return err
}
@ -455,7 +462,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
t, ok := s.state.Pending[meta.ID]
if !ok {
log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", t.Task.Meta)
log.Warn("TaskFailed:Pending task not found.", log.Ctx{"task": t.Task.Meta})
return nil
}

@ -45,9 +45,15 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
func init() {
log.Root().SetHandler(
log.LvlFilterHandler(log.LvlWarn, log.CallerStackHandler("%+v", log.StderrHandler)),
)
}
var mu sync.Mutex
var handleMap = make(map[C.paddle_pserver_client]*client.Client)
var curHandle C.paddle_pserver_client
@ -164,10 +170,13 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter,
if err != nil {
if err.Error() == pserver.AlreadyInitialized {
log.Warningf("parameter %s already initialized, treat paddle_init_param as successful.", name)
log.Warn(
"parameter already initialized, treat paddle_init_param as successful.",
log.Ctx{"parameter": name},
)
return C.PSERVER_OK
}
log.Errorln(err)
log.Error("error init param", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
@ -180,11 +189,11 @@ func paddle_finish_init_params(client C.paddle_pserver_client) C.int {
err := c.FinishInitParams()
if err != nil {
if err.Error() == pserver.AlreadyInitialized {
log.Warningln("parameters already initialized, treat paddle_finish_init_params as successful.")
log.Warn("parameters already initialized, treat paddle_finish_init_params as successful.")
return C.PSERVER_OK
}
log.Errorln(err)
log.Error("error finish init params", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
@ -205,7 +214,7 @@ func paddle_send_grads(client C.paddle_pserver_client, grads **C.paddle_gradient
c := get(client)
err := c.SendGrads(gs)
if err != nil {
log.Errorln(err)
log.Error("error send grads", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
@ -222,7 +231,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
c := get(client)
ps, err := c.GetParams(ns)
if err != nil {
log.Errorln(err)
log.Error("error get params", log.Ctx{"error": err})
return C.PSERVER_ERROR
}
@ -231,7 +240,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for i, p := range ps {
pn[i] = p.Name
}
log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", "))
log.Error(
"pserver returned wrong number of parameters.",
log.Ctx{
"Requested": strings.Join(pn, ", "),
"Returned": strings.Join(ns, ", "),
},
)
return C.PSERVER_ERROR
}
@ -241,7 +256,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for i, p := range ps {
pn[i] = p.Name
}
log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", "))
log.Error(
"pserver returned wrong parameters, or not in requested order.",
log.Ctx{
"Requested": strings.Join(pn, ", "),
"Returned": strings.Join(ns, ", "),
},
)
return C.PSERVER_ERROR
}
}
@ -251,13 +272,19 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
param := *(**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst))))
if unsafe.Pointer(param) == nil {
log.Errorln("must pre-allocate parameter.")
log.Error("must pre-allocate parameter.")
return C.PSERVER_ERROR
}
if unsafe.Pointer(param.content) != nil {
if int(param.content_len) != len(p.Content) {
log.Errorf("the pre-allocated content len does not match parameter content len. Pre-allocated len: %d, returned len: %d", param.content_len, len(p.Content))
log.Error(
"the pre-allocated content len does not match parameter content len.",
log.Ctx{
"Pre-allocated len": param.content_len,
"Returned len": len(p.Content),
},
)
return C.PSERVER_ERROR
}
}

@ -22,7 +22,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/Paddle/go/pserver"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
// TODO(helin): add RPC call retry logic
@ -84,7 +84,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
if curServers[i].Addr == "" {
err := c.pservers[i].Close()
if err != nil {
log.Errorln(err)
log.Error("error closing connection to pserver", log.Ctx{"error": err})
}
continue
@ -92,7 +92,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
err := c.pservers[i].Connect(curServers[i].Addr)
if err != nil {
log.Errorln(err)
log.Error("error connecting to pserver", log.Ctx{"error": err})
// connect to addr failed, set
// to last known addr in order

@ -30,7 +30,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/PaddlePaddle/Paddle/go/pserver/client"
"github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
@ -90,7 +90,7 @@ func initEtcdClient() {
DialTimeout: time.Second * time.Duration(1),
})
if err != nil {
log.Errorf("err %v", err)
log.Error("error init etcd client", log.Ctx{"error": err})
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
_, err = client.Delete(ctx, pserver.PsDesired)

@ -25,7 +25,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/pserver"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
@ -54,26 +54,29 @@ func (e *Etcd) Desired() int {
resp, err := e.client.Get(ctx, pserver.PsDesired)
cancel()
if err != nil {
log.Errorf("Get ps dresire number failed! recnnectiong..., %v", err)
log.Error(
"Get ps dresire number failed! reconnecting...",
log.Ctx{"error": err},
)
time.Sleep(e.timeout)
continue
}
kvs := resp.Kvs
if len(kvs) == 0 {
log.Infoln("Waiting for ps desired registered ...")
log.Info("Waiting for ps desired registered ...")
time.Sleep(e.timeout)
continue
}
psDesired, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err != nil {
log.Errorf("psDesired %d invalid %v", psDesired, err)
log.Error("atoi failed", log.Ctx{"error": err})
time.Sleep(e.timeout)
continue
}
log.Debugf("Get psDesired number: %d", psDesired)
log.Debug("Got psDesired", log.Ctx{"psDesired": psDesired})
break
}
return psDesired
@ -88,17 +91,20 @@ func (e *Etcd) List() []Server {
for i := 0; i < psDesired; i++ {
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
psKey := pserver.PsPath + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
log.Debug("looking for pserver", log.Ctx{"ps key": psKey})
resp, err := e.client.Get(ctx, psKey)
cancel()
if err != nil {
log.Infof("Get psKey= %s error, %v", psKey, err)
log.Info(
"Get psKey error",
log.Ctx{"ps key": psKey, "error": err},
)
time.Sleep(e.timeout)
continue
}
kvs := resp.Kvs
if len(kvs) == 0 {
log.Infof("Waiting for ps addr registered ...")
log.Info("Waiting for ps addr registered ...")
time.Sleep(e.timeout)
continue
}
@ -106,11 +112,17 @@ func (e *Etcd) List() []Server {
psAddr := string(resp.Kvs[0].Value)
// TODO(Longfei) check the ps address
if psAddr == "" {
log.Infof("Get psKey = %s, psAddr is empty", psKey)
log.Info(
"Value under psKey is empty",
log.Ctx{"psKey": psKey},
)
time.Sleep(e.timeout)
continue
}
log.Debugf("got value (%s) for key: %s", psAddr, psKey)
log.Debug(
"got psAddr given psKey",
log.Ctx{"psAddr": psAddr, "psKey": psKey},
)
servers[i].Index = i
servers[i].Addr = psAddr
}
@ -130,13 +142,13 @@ func NewEtcd(endpoints string) *Etcd {
DialTimeout: defaultEtcdTimeout,
})
if err != nil {
log.Errorf("Init etcd connection failed: %v", err)
log.Error("Init etcd connection failed", log.Ctx{"error": err})
time.Sleep(defaultEtcdTimeout)
continue
}
break
}
log.Infof("Connected to etcd: %s\n", endpoints)
log.Info("Connected to etcd endpoint", log.Ctx{"endpoint": endpoints})
client := &Etcd{
client: cli,
timeout: defaultEtcdTimeout,
@ -154,7 +166,7 @@ func (e *Etcd) Select() (bool, error) {
}
lock := concurrency.NewMutex(sess, initLockPath)
log.Infof("Trying to acquire lock at %s.", initLockPath)
log.Info("Trying to acquire lock", log.Ctx{"lock path": initLockPath})
// Do not use timeout context here, since we don't know how
// long does it take for other trainers to initialize the
// parameters.
@ -162,7 +174,7 @@ func (e *Etcd) Select() (bool, error) {
if err != nil {
return false, err
}
log.Infof("Successfully acquired lock at %s.", initLockPath)
log.Info("Successfully acquired lock", log.Ctx{"lock path": initLockPath})
get := clientv3.OpGet(initDonePath)
ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
@ -181,17 +193,17 @@ func (e *Etcd) Select() (bool, error) {
if len(resp.Kvs) == 0 {
// Key value not set, select current trainer.
e.lock = lock
log.Infoln("Trainer selected.")
log.Info("Trainer selected.")
return true, nil
}
if string(resp.Kvs[0].Value) == initDoneVal {
log.Infoln("Initialization is already done.")
log.Info("Initialization is already done.")
ctx, cancel = context.WithTimeout(context.Background(), e.timeout)
err = lock.Unlock(ctx)
cancel()
if err != nil {
log.Errorln(err)
log.Error("error unlocking", log.Ctx{"error": err})
}
return false, nil
}
@ -221,7 +233,7 @@ func (e *Etcd) Done() error {
err = e.lock.Unlock(ctx)
cancel()
if err != nil {
log.Errorln(err)
log.Error("error unlocking", log.Ctx{"error": err})
} else {
e.lock = nil
}
@ -244,7 +256,7 @@ func (e *Etcd) Close() error {
cErr := e.client.Close()
if cErr != nil {
if err != nil {
log.Errorln(cErr)
log.Error("error closing etcd client", log.Ctx{"error": cErr})
return err
}
return cErr

@ -24,7 +24,7 @@ import (
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
const (
@ -82,19 +82,19 @@ func (e *EtcdClient) Register(port int) (int, error) {
DialTimeout: e.dialTimeout,
})
if err != nil {
log.Errorf("connect to etcd error: %v", err)
log.Error("connect to etcd error", log.Ctx{"error": err})
time.Sleep(retryTimeout)
continue
}
e.client = cli
sess, err := concurrency.NewSession(cli, concurrency.WithTTL(e.ttlSec))
if err != nil {
log.Errorf("create etcd session error: %v", err)
log.Error("create etcd session error", log.Ctx{"error": err})
time.Sleep(retryTimeout)
continue
}
e.sess = sess
log.Debugf("inited client to %s", e.endpoints)
log.Debug("connected to etcd", log.Ctx{"endpoint": e.endpoints})
break
}
// init /ps_desired using transaction, for multiple pservers may want to write
@ -104,7 +104,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
_, err := e.initDesiredPservers(ctx, e.numPservers)
cancel()
if err != nil {
log.Warn(err)
log.Warn("pserver init error", log.Ctx{"error": err, "num pservers": e.numPservers})
time.Sleep(retryTimeout)
continue
}
@ -119,14 +119,17 @@ func (e *EtcdClient) Register(port int) (int, error) {
resp, err := e.client.Get(ctx, PsDesired)
cancel()
if err != nil {
log.Errorf("getting %s error: %v", PsDesired, err)
log.Error("get etcd key error", log.Ctx{"key": PsDesired, "error": err})
time.Sleep(retryTimeout)
continue
}
if len(resp.Kvs) != 0 {
e.desired, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err != nil {
log.Errorf("value of %s invalid %v\n", PsDesired, err)
log.Error(
"psDesired atoi error",
log.Ctx{"error": err, "value": string(resp.Kvs[0].Value)},
)
time.Sleep(retryTimeout)
// NOTE: wait util ps_desired value change
continue
@ -143,7 +146,7 @@ func (e *EtcdClient) Register(port int) (int, error) {
pserverIdx, err = e.registerPserverEtcd(ctx, port)
cancel()
if err != nil {
log.Warn(err)
log.Warn("register pserver on etcd error", log.Ctx{"error": err})
time.Sleep(retryTimeout)
continue
}
@ -170,16 +173,17 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context, port int) (int, er
registered := false
for i := 0; i < e.desired; i++ {
psKey := PsPath + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
ps := c.Get(psKey)
log.Debugf("got value (%s) for key: %s", ps, psKey)
log.Debug(
"register pserver got value",
log.Ctx{"value": ps, "key": psKey},
)
if ps == "" {
// find the first id and write info
pserverAddr := e.externalIP + ":" + strconv.Itoa(port)
c.Put(psKey, pserverAddr, clientv3.WithLease(e.sess.Lease()))
log.Debugf("set pserver node %s with value %s", psKey, pserverAddr)
log.Debug("register finished")
log.Debug("register finished", log.Ctx{"key": psKey, "value": pserverAddr})
idx = i
registered = true
break
@ -239,7 +243,7 @@ func (e *EtcdClient) Shutdown() error {
newErr := e.client.Close()
if newErr != nil {
if err != nil {
log.Errorln(newErr)
log.Error("shutdown error", log.Ctx{"error": newErr})
} else {
err = newErr
}

@ -25,7 +25,7 @@ import (
"fmt"
"unsafe"
log "github.com/sirupsen/logrus"
log "github.com/inconshreveable/log15"
)
type optimizer struct {
@ -56,12 +56,12 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer
c := paramWithConfigs.Config
s := State
paramBufferSize := C.size_t(len(p.Content))
log.WithFields(log.Fields{
log.Info("New Optimizer Created with config", log.Ctx{
"ElementType": p.ElementType,
"ParamSize": paramBufferSize,
"ConfigSize": len(c),
"StateSize": len(s),
}).Info("New Optimizer Created with config:")
})
var cbuffer unsafe.Pointer
cbuffer = C.malloc(paramBufferSize)
@ -71,22 +71,41 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer
cstate = unsafe.Pointer(&s[0])
}
var cptr (*C.uchar)
if len(c) > 0 {
cptr = (*C.uchar)(&c[0])
} else {
log.Error("empty config", "param name", paramWithConfigs.Param.Name)
}
o.config = c
o.opt = C.paddle_create_optimizer((*C.uchar)(&c[0]), C.int(len(c)),
C.paddle_element_type(p.ElementType), cbuffer, C.int(paramBufferSize), (*C.char)(cstate), C.int(len(s)))
o.opt = C.paddle_create_optimizer(
cptr,
C.int(len(c)),
C.paddle_element_type(p.ElementType),
cbuffer,
C.int(paramBufferSize),
(*C.char)(cstate),
C.int(len(s)),
)
return o
}
func (o *optimizer) GetWeights() []byte {
var buffer unsafe.Pointer
// we do not own the buffer, no need to free later.
bufferLen := C.paddle_optimizer_get_weights(o.opt, &buffer)
return cArrayToSlice(buffer, int(bufferLen)*C.sizeof_float)
}
func (o *optimizer) GetStates() []byte {
var cbuffer *C.char
// we owns the state buffer, need to free later.
cbufferLen := C.paddle_optimizer_get_state(o.opt, &cbuffer)
return cArrayToSlice(unsafe.Pointer(cbuffer), int(cbufferLen))
buf := cArrayToSlice(unsafe.Pointer(cbuffer), int(cbufferLen))
cpy := make([]byte, len(buf))
copy(cpy, buf)
C.free(unsafe.Pointer(cbuffer))
return cpy
}
func (o *optimizer) UpdateParameter(g Gradient) error {

@ -15,8 +15,12 @@
package pserver
import (
"encoding/binary"
"io/ioutil"
"math"
"testing"
"github.com/stretchr/testify/assert"
)
func TestOptimizerCreateRelease(t *testing.T) {
@ -36,3 +40,39 @@ func TestOptimizerCreateRelease(t *testing.T) {
o := newOptimizer(param, nil)
o.Cleanup()
}
func float32Bytes(float float32) []byte {
bits := math.Float32bits(float)
bytes := make([]byte, 4)
binary.LittleEndian.PutUint32(bytes, bits)
return bytes
}
func TestOptimizerState(t *testing.T) {
p := Parameter{
Name: "a",
ElementType: Int32,
}
weights := float32Bytes(100)
p.Content = weights
config, err := ioutil.ReadFile("./client/c/test/testdata/optimizer.pb")
if err != nil {
t.Fatalf("read optimizer proto failed")
}
param := ParameterWithConfig{
Param: p,
Config: config,
}
o := newOptimizer(param, nil)
s := o.GetStates()
// clear param content and check if the state is restored.
param.Param.Content = float32Bytes(300)
o1 := newOptimizer(param, s)
s1 := o1.GetStates()
assert.Equal(t, s, s1)
assert.Equal(t, weights, o.GetWeights())
assert.Equal(t, weights, o1.GetWeights())
o.Cleanup()
o1.Cleanup()
}

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save