diff --git a/cmake/external/eigen.cmake b/cmake/external/eigen.cmake index bd853d921b..96fc886a34 100644 --- a/cmake/external/eigen.cmake +++ b/cmake/external/eigen.cmake @@ -8,7 +8,7 @@ ExternalProject_Add( extern_eigen3 ${EXTERNAL_PROJECT_LOG_ARGS} GIT_REPOSITORY "https://github.com/RLovelett/eigen.git" - GIT_TAG 4e79cb69b9425f5f8c3a84be4350d4ab75b5fd9d + GIT_TAG 70661066beef694cadf6c304d0d07e0758825c10 PREFIX ${EIGEN_SOURCE_DIR} UPDATE_COMMAND "" CONFIGURE_COMMAND "" diff --git a/cmake/external/nccl.cmake b/cmake/external/nccl.cmake index dfbbed58c9..57d2c0a352 100644 --- a/cmake/external/nccl.cmake +++ b/cmake/external/nccl.cmake @@ -1,9 +1,8 @@ -INCLUDE(ExternalProject) +include(ExternalProject) -SET(NCCL_SOURCE_DIR ${THIRD_PARTY_PATH}/nccl) - -INCLUDE_DIRECTORIES(${NCCL_SOURCE_DIR}/src/extern_nccl/src) +set(NCCL_SOURCE_DIR ${THIRD_PARTY_PATH}/nccl) +include_directories(${NCCL_SOURCE_DIR}/src/extern_nccl/src) if(WITH_DSO) # If we use DSO, we do not build nccl, just download the dependencies @@ -12,39 +11,39 @@ if(WITH_DSO) set(NCCL_INSTALL_DIR "") else() # otherwise, we build nccl and link it. + set(NCCL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/nccl) + # Note: cuda 8.0 is needed to make nccl + # When cuda is not installed on the system directory, need to set CUDA_HOME to your cuda root set(NCCL_BUILD_COMMAND "make -j 8") - set(NCCL_INSTALL_COMMAND "make install") - SET(NCCL_INSTALL_DIR ${THIRD_PARTY_PATH}/install/nccl) + set(NCCL_INSTALL_COMMAND "make install PREFIX=${NCCL_INSTALL_DIR}") endif() ExternalProject_Add( - extern_nccl - ${EXTERNAL_PROJECT_LOG_ARGS} - GIT_REPOSITORY "https://github.com/NVIDIA/nccl.git" - GIT_TAG "v1.3.4-1" - PREFIX "${NCCL_SOURCE_DIR}" - UPDATE_COMMAND "" - CONFIGURE_COMMAND "" - BUILD_COMMAND "${NCCL_BUILD_COMMAND}" - INSTALL_COMMAND "${NCCL_INSTALL_COMMAND}" - INSTALL_DIR "${NCCL_INSTALL_DIR}" - TEST_COMMAND "" + extern_nccl + ${EXTERNAL_PROJECT_LOG_ARGS} + GIT_REPOSITORY "https://github.com/NVIDIA/nccl.git" + GIT_TAG "v1.3.4-1" + PREFIX "${NCCL_SOURCE_DIR}" + UPDATE_COMMAND "" + CONFIGURE_COMMAND "" + BUILD_COMMAND "${NCCL_BUILD_COMMAND}" + INSTALL_COMMAND "${NCCL_INSTALL_COMMAND}" + INSTALL_DIR "${NCCL_INSTALL_DIR}" + TEST_COMMAND "" ) -if (WITH_DSO) - if (${CMAKE_VERSION} VERSION_LESS "3.3.0") - set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/lib_any_dummy.c) - file(WRITE ${dummyfile} "const char * dummy_any = \"${dummyfile}\";") +if(WITH_DSO) + if(${CMAKE_VERSION} VERSION_LESS "3.3.0") + set(dummyfile ${CMAKE_CURRENT_BINARY_DIR}/lib_nccl_dummy.c) + file(WRITE ${dummyfile} "const char * dummy_nccl = \"${dummyfile}\";") add_library(nccl STATIC ${dummyfile}) else() add_library(nccl INTERFACE) endif() else() - ADD_LIBRARY(nccl STATIC IMPORTED GLOBAL) - SET_PROPERTY(TARGET nccl PROPERTY IMPORTED_LOCATION - ${NCCL_INSTALL_DIR}/lib/libnccl.a) + add_library(nccl STATIC IMPORTED GLOBAL) + set_property(TARGET nccl PROPERTY IMPORTED_LOCATION + ${NCCL_INSTALL_DIR}/lib/libnccl_static.a) endif() add_dependencies(nccl extern_nccl) - -LIST(APPEND external_project_dependencies nccl) diff --git a/doc/design/graph_survey.md b/doc/design/graph_survey.md new file mode 100644 index 0000000000..6c6db08f46 --- /dev/null +++ b/doc/design/graph_survey.md @@ -0,0 +1,232 @@ +## Survey on Graph + +Neural network framework often provides symbolic API for users to write network topology conveniently. This doc manily focus on symbolic API in most popular neural network frameworks, and try to find out how to parse symbolic configuration to a portable file, such as protobuf or json. 
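+
+As a quick, minimal illustration of that goal (this sketch is not taken from any of the surveyed docs, and assumes the standard `mxnet` Python package), a Symbol built with Mxnet's symbolic API can be dumped to and restored from a portable JSON string:
+
+```python
+import mxnet as mx
+
+# Build a tiny topology symbolically; nothing is computed yet.
+data = mx.symbol.Variable('data')
+fc = mx.symbol.FullyConnected(data=data, name='fc', num_hidden=10)
+net = mx.symbol.SoftmaxOutput(data=fc, name='softmax')
+
+# Serialize the whole topology to JSON and load it back,
+# without any of the original Python code.
+json_str = net.tojson()
+restored = mx.symbol.load_json(json_str)
+```
+
+The sections below look at how each framework exposes this kind of graph handle.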
+
+### Mxnet
+
+The core concept of the symbolic API is `Symbol`. Mxnet implements the `Symbol` class in C++ and exports it to Python through the C API. Please refer to the comments in Mxnet:
+
+`Symbol` is a helper class used to represent the operator node in a Graph.
+`Symbol` acts as an interface for building graphs from different components like Variable, Functor and Group. `Symbol` is also exported to the Python front-end (while Graph is not) to enable quick testing and deployment. Conceptually, a symbol is the final operation of a graph and thus includes all the information required (the graph) to evaluate its output value.
+
+A simple network topology written with Symbol is as follows:
+
+```python
+def get_symbol(num_classes=10, **kwargs):
+    data = mx.symbol.Variable('data')
+    data = mx.symbol.Flatten(data=data)
+    fc1 = mx.symbol.FullyConnected(data = data, name='fc1', num_hidden=128)
+    act1 = mx.symbol.Activation(data = fc1, name='relu1', act_type="relu")
+    fc2 = mx.symbol.FullyConnected(data = act1, name = 'fc2', num_hidden = 64)
+    act2 = mx.symbol.Activation(data = fc2, name='relu2', act_type="relu")
+    fc3 = mx.symbol.FullyConnected(data = act2, name='fc3', num_hidden=num_classes)
+    mlp = mx.symbol.SoftmaxOutput(data = fc3, name = 'softmax')
+    return mlp
+```
+
+Variable here is actually a Symbol. Every basic Symbol corresponds to one Node, and every Node has its own NodeAttr. There is an op field in the NodeAttr class; when a Symbol represents a Variable (often input data), the op field is null.
+
+Symbol contains a data member, `std::vector<NodeEntry> outputs`, and NodeEntry contains a pointer to Node. We can follow the Node pointers to traverse the whole Graph.
+
+A Symbol can also be saved to a JSON file.
+
+Here is a detailed example:
+
+```
+>>> import mxnet as mx
+>>> data = mx.symbol.Variable('data')
+>>> print data.debug_str()
+Variable:data
+
+>>> data = mx.symbol.Flatten(data=data)
+>>> print data.debug_str()
+Symbol Outputs:
+	output[0]=flatten0(0)
+Variable:data
+--------------------
+Op:Flatten, Name=flatten0
+Inputs:
+	arg[0]=data(0) version=0
+
+>>> fc1 = mx.symbol.FullyConnected(data = data, name='fc1', num_hidden=128)
+>>> print fc1.debug_str()
+Symbol Outputs:
+	output[0]=fc1(0)
+Variable:data
+--------------------
+Op:Flatten, Name=flatten0
+Inputs:
+	arg[0]=data(0) version=0
+Variable:fc1_weight
+Variable:fc1_bias
+--------------------
+Op:FullyConnected, Name=fc1
+Inputs:
+	arg[0]=flatten0(0)
+	arg[1]=fc1_weight(0) version=0
+	arg[2]=fc1_bias(0) version=0
+Attrs:
+	num_hidden=128
+
+```
+
+### TensorFlow
+
+The core concept of the symbolic API is `Tensor`. TensorFlow defines `Tensor` in Python. Please refer to the comments in TensorFlow:
+
+A `Tensor` is a symbolic handle to one of the outputs of an `Operation`. It does not hold the values of that operation's output, but instead provides a means of computing those values in a TensorFlow [Session](https://www.tensorflow.org/api_docs/python/tf/Session).
+
+A simple example is as follows:
+
+```python
+  # Build a dataflow graph.
+  c = tf.constant([[1.0, 2.0], [3.0, 4.0]])
+  d = tf.constant([[1.0, 1.0], [0.0, 1.0]])
+  e = tf.matmul(c, d)
+
+  # Construct a `Session` to execute the graph.
+  sess = tf.Session()
+
+  # Execute the graph and store the value that `e` represents in `result`.
+  result = sess.run(e)
+```
+
+The main methods of `Tensor` are as follows:
+
+```python
+@property
+def op(self):
+  """The `Operation` that produces this tensor as an output."""
+  return self._op
+
+@property
+def dtype(self):
+  """The `DType` of elements in this tensor."""
+  return self._dtype
+
+@property
+def graph(self):
+  """The `Graph` that contains this tensor."""
+  return self._op.graph
+
+@property
+def name(self):
+  """The string name of this tensor."""
+  if not self._op.name:
+    raise ValueError("Operation was not named: %s" % self._op)
+  return "%s:%d" % (self._op.name, self._value_index)
+
+@property
+def device(self):
+  """The name of the device on which this tensor will be produced, or None."""
+  return self._op.device
+```
+
+A Tensor can be taken as a target to run by a session. A Tensor contains all the information of the Graph, and tracks data dependencies.
+
+Here is a detailed example:
+
+```
+>>> import tensorflow as tf
+>>> c = tf.constant([[1.0, 2.0], [3.0, 4.0]])
+>>> print c.graph
+
+>>> d = tf.constant([[1.0, 1.0], [0.0, 1.0]])
+>>> print d.graph
+
+>>> e = tf.matmul(c, d)
+>>> print e.graph
+
+```
+
+### Dynet
+
+The core concept of the symbolic API is `Expression`, and Dynet defines the `Expression` class in C++.
+
+A simple example is as follows:
+
+```cpp
+ComputationGraph cg;
+Expression W = parameter(cg, pW);
+
+Expression in = input(cg, xs[i]);
+Expression label = input(cg, ys[i]);
+Expression pred = W * in;
+Expression loss = square(pred - label);
+```
+
+The input data and parameters are also represented by Expression. Every basic Expression corresponds to a Node, and input data is also a Node.
+
+Expression has a data member ComputationGraph, and the ComputationGraph is modified while the user configures the network. Expression can be a running target, because Expression contains all of its dependencies.
+
+Here is a detailed example:
+
+Write the topology in C++:
+
+```
+ComputationGraph cg;
+Expression W = parameter(cg, pW);
+cg.print_graphviz();
+
+Expression pred = W * xs[i];
+cg.print_graphviz();
+
+Expression loss = square(pred - ys[i]);
+cg.print_graphviz();
+```
+
+Compile and print:
+
+```
+# first print
+digraph G {
+  rankdir=LR;
+  nodesep=.05;
+  N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
+}
+# second print
+digraph G {
+  rankdir=LR;
+  nodesep=.05;
+  N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
+  N1 [label="v1 = v0 * -0.98"];
+  N0 -> N1;
+}
+# third print
+digraph G {
+  rankdir=LR;
+  nodesep=.05;
+  N0 [label="v0 = parameters({1}) @ 0x7ffe4de00110"];
+  N1 [label="v1 = v0 * -0.98"];
+  N0 -> N1;
+  N2 [label="v2 = -1.88387 - v1"];
+  N1 -> N2;
+  N3 [label="v3 = -v2"];
+  N2 -> N3;
+  N4 [label="v4 = square(v3)"];
+  N3 -> N4;
+}
+```
+
+### Conclusion
+
+Symbol/Tensor/Expression in Mxnet/TensorFlow/Dynet are concepts at the same level. We use the unified name Expression here; this concept has the following features:
+
+- Users write the topology with the symbolic API, and every return value is an Expression, including input data and parameters.
+- Each Expression corresponds to a global Graph, and Expressions can also be composed.
+- An Expression tracks all of its dependencies and can be taken as a run target.
diff --git a/doc/design/model_format.md b/doc/design/model_format.md
new file mode 100644
index 0000000000..db8c36e5f5
--- /dev/null
+++ b/doc/design/model_format.md
@@ -0,0 +1,36 @@
+# Design Doc: Model Format
+
+## Motivation
+
+The model is the output of the training process. One complete model consists of two parts, namely, the **topology** and the **parameters**.
+To support industrial deployment, the model format must be self-contained and must not expose any training source code.
+
+As a result, in PaddlePaddle, the **topology** is represented as a [ProgramDesc](https://github.com/PaddlePaddle/Paddle/blob/1c0a4c901c9fc881d120249c703b15d1c50dae7d/doc/design/program.md), which describes the model structure. The **parameters** contain all the trainable weights in the model, so we must support large parameters and efficient serialization/deserialization.
+
+## Implementation
+
+The topology is saved as plain text, specifically as a self-contained protobuf file.
+
+The parameters are saved as a binary file. As we all know, a protobuf message has a [64M size](https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.coded_stream#CodedInputStream.SetTotalBytesLimit.details) limit. We did a [benchmark experiment](https://github.com/PaddlePaddle/Paddle/pull/4610), and its result shows that protobuf is not a good fit for this case.
+
+As a result, we design a particular format for tensor serialization. By default, any tensor in Paddle is a [LoDTensor](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/lod_tensor.md), and has a description proto, [LoDTensorDesc](https://github.com/PaddlePaddle/Paddle/blob/develop/paddle/framework/framework.proto#L99). We save the DescProto as the byte string header; it contains the necessary information, such as the `dims`, the `name` of the tensor, and the `LoD` information in [LoDTensor](https://github.com/PaddlePaddle/Paddle/blob/1c0a4c901c9fc881d120249c703b15d1c50dae7d/paddle/framework/lod_tensor.md). A tensor stores its value in a contiguous memory buffer; for speed we dump the raw memory to disk and save it as the byte string content. So, the binary format of one tensor is:
+
+|HeaderLength|ContentLength|**LoDTensorDesc**|**TensorValue**|
+
+In detail, the tensor's byte view is as the table shows. Note that all the signed values are written in little-endian.
+
+```text
+[offset]  [type]             [description]
+0004      4 bytes integer    HeaderLength, the length of LoDTensorDesc
+0008      4 bytes integer    ContentLength, the length of the LoDTensor Buffer
+0009      1 bytes char       TensorDesc
+00010     1 bytes char       TensorDesc
+...
+00100     1 bytes char       TensorValue
+00101     1 bytes char       TensorValue
+00102     1 bytes char       TensorValue ..
+...
+```
+
+## Summary
+
+We introduce the model format: the `ProgramDesc` describes the **topology**, and a bunch of binary tensors in the particular format described above make up the **parameters**.
diff --git a/doc/design/optimizer.md b/doc/design/optimizer.md
index 17440fae50..202b4b6510 100644
--- a/doc/design/optimizer.md
+++ b/doc/design/optimizer.md
@@ -65,20 +65,6 @@ class Optimizer(object):
     def __init__(self):
         pass
-    def create_backward_pass(self, loss, parameter_list=None):
-        """
-        create and add gradient Operators in BlockDesc to Compute gradients of `loss`
-        for parameters in parameter_list
-
-        Args:
-          loss: an variable generated by cost function.
-          parameter_list: parameters that need to compute gradient and update to optimize the lost.
-
-        Returns:
-          list of (parameters, gradients) pair.
-        """
-        return None
-
     def create_optimization_pass(self, parameters_and_grads):
         """Add optimization operators to update gradients to variables.
@@ -93,7 +79,7 @@ class Optimizer(object):
     def minimize(self, loss, parameter_list):
         """Add operations to minimize `loss` by updating `parameter_list`.
- This method combines interface `create_backward_pass()` and + This method combines interface `append_backward_ops()` and `create_optimization_pass()` into one. """ params_grads = self.create_backward_pass(loss, parameter_list) diff --git a/go/cmd/master/master.go b/go/cmd/master/master.go index 739c4c01e0..f57db1c0a0 100644 --- a/go/cmd/master/master.go +++ b/go/cmd/master/master.go @@ -25,9 +25,8 @@ import ( "strings" "time" + log "github.com/inconshreveable/log15" "github.com/namsral/flag" - log "github.com/sirupsen/logrus" - "github.com/topicai/candy" "github.com/PaddlePaddle/Paddle/go/master" "github.com/PaddlePaddle/Paddle/go/utils/networkhelper" @@ -41,16 +40,20 @@ func main() { taskTimeoutMax := flag.Int("task-timeout-max", 3, "max timtout count for each task before it being declared failed task.") chunkPerTask := flag.Int("chunk-per-task", 10, "chunk per task.") logLevel := flag.String("log-level", "info", - "log level, possible values: debug, info, warning, error, fatal, panic") + "log level, possible values: debug, info, warn, error, crit") flag.Parse() - level, e := log.ParseLevel(*logLevel) - candy.Must(e) + lvl, err := log.LvlFromString(*logLevel) + if err != nil { + panic(err) + } - log.SetLevel(level) + log.Root().SetHandler( + log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)), + ) if *endpoints == "" { - log.Warningln("-endpoints not set, fault tolerance not be enabled.") + log.Warn("-endpoints not set, fault tolerance not be enabled.") } var store master.Store @@ -58,23 +61,25 @@ func main() { eps := strings.Split(*endpoints, ",") ip, err := networkhelper.GetExternalIP() if err != nil { - log.Fatal(err) + log.Crit("get external ip error", log.Ctx{"error": err}) + panic(err) } addr := fmt.Sprintf("%s:%d", ip, *port) store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec) if err != nil { - log.Fatal(err) + log.Crit("error creating etcd client.", log.Ctx{"error": err}) + panic(err) } } else { store = &master.InMemStore{} } shutdown := func() { - log.Infoln("shutting down gracefully") + log.Info("shutting down gracefully") err := store.Shutdown() if err != nil { - log.Errorln(err) + log.Error("shutdown error", log.Ctx{"error": err}) } } @@ -86,24 +91,28 @@ func main() { s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) if err != nil { - log.Fatal(err) + log.Crit("error creating new service.", log.Ctx{"error": err}) + panic(err) } err = rpc.Register(s) if err != nil { - log.Fatal(err) + log.Crit("error registering to etcd.", log.Ctx{"error": err}) + panic(err) } rpc.HandleHTTP() l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) if err != nil { - log.Fatal(err) + log.Crit("error listing to port", log.Ctx{"error": err, "port": *port}) + panic(err) } go func() { err = http.Serve(l, nil) if err != nil { - log.Fatal(err) + log.Crit("error serving HTTP", log.Ctx{"error": err}) + panic(err) } }() diff --git a/go/cmd/pserver/pserver.go b/go/cmd/pserver/pserver.go index bec5775d54..90f9cf3fcf 100644 --- a/go/cmd/pserver/pserver.go +++ b/go/cmd/pserver/pserver.go @@ -27,11 +27,11 @@ import ( "github.com/topicai/candy" "github.com/PaddlePaddle/Paddle/go/pserver" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) func main() { - port := flag.Int("port", 0, "port of the pserver") + port := flag.Int("port", 8001, "port of the pserver") index := flag.Int("index", -1, "index of the pserver, set to -1 if use etcd for auto pserver 
index registry") etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379", "comma separated endpoint string for pserver to connect to etcd") @@ -41,13 +41,17 @@ func main() { checkpointPath := flag.String("checkpoint-path", "/checkpoints/", "save checkpoint path") checkpointInterval := flag.Duration("checkpoint-interval", 600*time.Second, "save checkpoint per interval seconds") logLevel := flag.String("log-level", "info", - "log level, possible values: debug, info, warning, error, fatal, panic") + "log level, possible values: debug, info, warn, error, crit") flag.Parse() - level, err := log.ParseLevel(*logLevel) - candy.Must(err) + lvl, err := log.LvlFromString(*logLevel) + if err != nil { + panic(err) + } - log.SetLevel(level) + log.Root().SetHandler( + log.LvlFilterHandler(lvl, log.CallerStackHandler("%+v", log.StderrHandler)), + ) var idx int @@ -63,7 +67,7 @@ func main() { cp, err = pserver.LoadCheckpoint(e, idx) if err != nil { if err == pserver.ErrCheckpointNotFound { - log.Infof("Could not find the pserver checkpoint.") + log.Info("Could not find the pserver checkpoint.") } else { panic(err) } @@ -71,10 +75,10 @@ func main() { } shutdown := func() { - log.Infoln("shutting down gracefully") + log.Info("shutting down gracefully") sErr := e.Shutdown() if sErr != nil { - log.Errorln(sErr) + log.Error("error shutting down", log.Ctx{"error": sErr}) } } @@ -95,7 +99,7 @@ func main() { candy.Must(err) go func() { - log.Infof("start pserver at port %d", *port) + log.Info("starting pserver", log.Ctx{"port": *port}) err = http.Serve(l, nil) candy.Must(err) }() diff --git a/go/glide.lock b/go/glide.lock index aabc03657f..ce654d3636 100644 --- a/go/glide.lock +++ b/go/glide.lock @@ -1,5 +1,5 @@ -hash: 328e7b9b7306b45e7b9879139a9f86698115981f6283032e1312093a6a6ddb04 -updated: 2017-10-16T08:00:23.484693528Z +hash: 51d9e2e46d7fd9173ff11ecada40f7b7728756be18d5e2f032535f66465e6e15 +updated: 2017-10-24T15:04:09.987751592-07:00 imports: - name: github.com/alecthomas/gometalinter version: bae2f1293d092fd8167939d5108d1b025eaef9de @@ -99,6 +99,8 @@ imports: version: d2709f9f1f31ebcda9651b03077758c1f3a0018c - name: github.com/ghodss/yaml version: 0ca9ea5df5451ffdf184b4428c902747c2c11cd7 +- name: github.com/go-stack/stack + version: 817915b46b97fd7bb80e8ab6b69f01a53ac3eebf - name: github.com/gogo/protobuf version: 909568be09de550ed094403c2bf8a261b5bb730a subpackages: @@ -120,8 +122,14 @@ imports: - runtime - runtime/internal - utilities +- name: github.com/inconshreveable/log15 + version: 0decfc6c20d9ca0ad143b0e89dcaa20f810b4fb3 - name: github.com/jonboulle/clockwork version: 2eee05ed794112d45db504eb05aa693efd2b8b09 +- name: github.com/mattn/go-colorable + version: 5411d3eea5978e6cdc258b30de592b60df6aba96 +- name: github.com/mattn/go-isatty + version: 57fdcb988a5c543893cc61bce354a6e24ab70022 - name: github.com/matttproud/golang_protobuf_extensions version: c12348ce28de40eed0136aa2b644d0ee0650e56c subpackages: @@ -179,11 +187,12 @@ imports: - lex/httplex - trace - name: golang.org/x/sys - version: 0f826bdd13b500be0f1d4004938ad978fcc6031e + version: e48874b42435b4347fc52bdee0424a52abc974d7 repo: https://github.com/golang/sys.git vcs: git subpackages: - unix + - windows - name: golang.org/x/text version: 836efe42bb4aa16aaa17b9c155d8813d336ed720 repo: https://github.com/golang/text.git @@ -222,4 +231,3 @@ testImports: version: 05e8a0eda380579888eb53c394909df027f06991 subpackages: - assert - diff --git a/go/glide.yaml b/go/glide.yaml index 4b22ab2caa..ba253f8beb 100644 --- a/go/glide.yaml +++ 
b/go/glide.yaml @@ -26,3 +26,7 @@ import: version: v1.1.0 - package: github.com/alecthomas/gometalinter version: v1.2.1 +- package: github.com/inconshreveable/log15 + version: v2.13 +- package: github.com/go-stack/stack + version: v1.6.0 diff --git a/go/master/c/client.go b/go/master/c/client.go index b5759c30b1..9a59337108 100644 --- a/go/master/c/client.go +++ b/go/master/c/client.go @@ -35,13 +35,19 @@ import ( "unsafe" "github.com/PaddlePaddle/Paddle/go/master" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) var mu sync.Mutex var handleMap = make(map[C.paddle_master_client]*master.Client) var curHandle C.paddle_master_client +func init() { + log.Root().SetHandler( + log.LvlFilterHandler(log.LvlWarn, log.CallerStackHandler("%+v", log.StderrHandler)), + ) +} + func add(c *master.Client) C.paddle_master_client { mu.Lock() defer mu.Unlock() @@ -117,7 +123,7 @@ func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int } err := c.SetDataset(paths) if err != nil { - log.Errorln(err) + log.Error("error set dataset", log.Ctx{"error": err}) return C.PADDLE_MASTER_ERROR } @@ -167,7 +173,7 @@ func paddle_request_save_model(client C.paddle_master_client, trainerID string, c := get(client) need, err := c.RequestSaveModel(trainerID, time.Duration(blockMS)*time.Millisecond) if err != nil { - log.Errorln(err) + log.Error("error request save model", log.Ctx{"error": err}) return C.PADDLE_MASTER_ERROR } diff --git a/go/master/client.go b/go/master/client.go index f04cf50ce3..5d657548c9 100644 --- a/go/master/client.go +++ b/go/master/client.go @@ -21,7 +21,7 @@ import ( "github.com/PaddlePaddle/Paddle/go/connection" "github.com/PaddlePaddle/recordio" "github.com/coreos/etcd/clientv3" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) // Client is the client of the master server. @@ -75,7 +75,7 @@ func WithEtcd(endpoints []string, timeout time.Duration) func(*Client) error { for { err := f() if err != nil { - log.Warningln(err) + log.Warn("create etcd client error", log.Ctx{"error": err}) } else { break } @@ -135,13 +135,13 @@ func (c *Client) getRecords(passID int) { time.Sleep(time.Second * 3) continue } - log.Errorf("getTask error: %s", err) + log.Error("getTask error.", log.Ctx{"error": err}) } for _, chunk := range t.Chunks { f, e := os.Open(chunk.Path) if e != nil { - log.Errorln(e) + log.Error("error open chunk", log.Ctx{"error": e}) continue } @@ -152,12 +152,15 @@ func (c *Client) getRecords(passID int) { if s.Err() != nil { c.ch <- record{nil, s.Err()} - log.Errorln(err, chunk.Path) + log.Error( + "error scan chunk", + log.Ctx{"error": err, "path": chunk.Path}, + ) } err = f.Close() if err != nil { - log.Errorln(err) + log.Error("error close record file", log.Ctx{"error": err}) } } @@ -166,7 +169,7 @@ func (c *Client) getRecords(passID int) { // correct, but a reasonable approximation. 
err = c.taskFinished(t.Meta.ID) if err != nil { - log.Errorln(err) + log.Error("task finish callback error.", log.Ctx{"error": err}) } } } @@ -179,12 +182,12 @@ func (c *Client) monitorMaster(addrCh <-chan string) { if curMaster == "" { err := c.conn.Close() if err != nil { - log.Errorln(err) + log.Error("close old master addr error", log.Ctx{"error": err}) } } else { err := c.conn.Connect(curMaster) if err != nil { - log.Errorln(err) + log.Error("connect to new master addr error", log.Ctx{"error": err}) // connect to addr failed, set // to last known addr in order diff --git a/go/master/client_internal_test.go b/go/master/client_internal_test.go index d5f3d79464..2f13fd0dcd 100644 --- a/go/master/client_internal_test.go +++ b/go/master/client_internal_test.go @@ -25,8 +25,6 @@ import ( "testing" "time" - log "github.com/sirupsen/logrus" - "github.com/PaddlePaddle/Paddle/go/connection" "github.com/PaddlePaddle/recordio" ) @@ -36,10 +34,6 @@ const ( chunkPerTask = 10 ) -func init() { - log.SetLevel(log.ErrorLevel) -} - func TestGetFinishTask(t *testing.T) { const path = "/tmp/master_client_test_0" diff --git a/go/master/etcd_client.go b/go/master/etcd_client.go index 94848d887e..2a41d36949 100644 --- a/go/master/etcd_client.go +++ b/go/master/etcd_client.go @@ -20,7 +20,7 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) const ( @@ -44,7 +44,7 @@ type EtcdClient struct { // NewEtcdClient creates a new EtcdClient. func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) { - log.Debugf("Connecting to etcd at %v", endpoints) + log.Debug("Connecting to etcd", log.Ctx{"endpoint": endpoints}) cli, err := clientv3.New(clientv3.Config{ Endpoints: endpoints, DialTimeout: dialTimeout, @@ -64,12 +64,12 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat // one master running, but split-brain problem may cause // multiple master servers running), and the cluster management // software will kill one of them. - log.Infof("Trying to acquire lock at %s.", lockPath) + log.Info("Trying to acquire lock.", log.Ctx{"path": lockPath}) err = lock.Lock(context.TODO()) if err != nil { return nil, err } - log.Infof("Successfully acquired lock at %s.", lockPath) + log.Info("Successfully acquired lock at %s.", log.Ctx{"path": lockPath}) put := clientv3.OpPut(addrPath, addr) resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit() @@ -78,7 +78,8 @@ func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePat } if !resp.Succeeded { - log.Fatal("No longer owns the master lock. Exiting.") + log.Crit("No longer owns the master lock. Exiting.") + panic("No longer owns the master lock. Exiting.") } e := &EtcdClient{ @@ -102,7 +103,7 @@ func (e *EtcdClient) Save(state []byte) error { } if !resp.Succeeded { - log.Errorln("No longer owns the lock, trying to lock again") + log.Error("No longer owns the lock, trying to lock again") ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) err := e.lock.Lock(ctx) cancel() @@ -116,9 +117,10 @@ func (e *EtcdClient) Save(state []byte) error { // to kill current master server. The current // state is not saved, but the trainer's RPC // call will fail, so the trainer will retry. - log.Fatalf("Could not acquire the lock at %s: %v. Exiting.", e.lockPath, err) + log.Crit("Could not acquire the lock at %s: %v. 
Exiting.", log.Ctx{"path": e.lockPath, "error": err}) + panic("Could not acquire the lock at %s: %v. Exiting.") } - log.Infof("Successfully acquired lock at %s.", e.lockPath) + log.Info("Successfully acquired lock at %s.", e.lockPath) return e.Save(state) } @@ -136,7 +138,7 @@ func (e *EtcdClient) Load() ([]byte, error) { } if !resp.Succeeded { - log.Errorln("No longer owns the lock, trying to lock and load again.") + log.Error("No longer owns the lock, trying to lock and load again.") err = e.lock.Lock(context.Background()) if err != nil { return nil, err @@ -163,7 +165,7 @@ func (e *EtcdClient) Shutdown() error { if err == nil { err = newErr } else { - log.Errorln(newErr) + log.Error("shutdown error", log.Ctx{"error": newErr}) } } @@ -192,7 +194,7 @@ func watchKey(c *clientv3.Client, key string, valChan chan<- string) { for wresp := range rch { for _, ev := range wresp.Events { // if received event is DELETE, the value will be an empty string - log.Infof("received event %s, %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) + log.Info("received event.", log.Ctx{"type": ev.Type, "key": ev.Kv.Key, "value": ev.Kv.Value}) valChan <- string(ev.Kv.Value) } } diff --git a/go/master/service.go b/go/master/service.go index df7c6860e6..f350102880 100644 --- a/go/master/service.go +++ b/go/master/service.go @@ -25,7 +25,7 @@ import ( "sync" "time" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" "github.com/PaddlePaddle/recordio" ) @@ -170,11 +170,11 @@ func (s *Service) recover() (bool, error) { } if state == nil { - log.Infoln("No state exists, not recovered.") + log.Info("No state exists, not recovered.") return false, nil } - log.Infof("Loaded snapshot of size: %d bytes.", len(state)) + log.Info("Loaded snapshot.", log.Ctx{"size": len(state)}) gr, err := gzip.NewReader(bytes.NewReader(state)) if err != nil { return false, err @@ -191,11 +191,11 @@ func (s *Service) recover() (bool, error) { if err != nil { // Only close failed, recover actually succeed, so // just log error. 
- log.Errorln(err) + log.Error("error close recover file.", log.Ctx{"error": err}) } s.state = tqs - log.WithFields(s.logFields()).Infof("Master recovered from snapshot, scheduling pending task timeout check.") + log.Info("Master recovered from snapshot, scheduling pending task timeout check.", s.logCtx()) for _, t := range s.state.Pending { time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch)) } @@ -224,7 +224,7 @@ func (s *Service) snapshot() error { } state := buf.Bytes() - log.Infof("Saving snapshot of size: %d bytes.", len(state)) + log.Info("Saving snapshot.", log.Ctx{"size bytes": len(state)}) return s.store.Save(state) } @@ -260,7 +260,7 @@ func readChunks(globPaths []string) ([]Chunk, error) { } count := index.NumChunks() - log.Infof("readChunks: file %s has %d chunks", path, count) + log.Info("reading chunks.", log.Ctx{"path": path, "num chunks": count}) for i := 0; i < count; i++ { chunk := Chunk{ Path: path, @@ -300,7 +300,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error { err = s.snapshot() if err != nil { - log.Errorln(err) + log.Error("snapshot error", log.Ctx{"error": err}) return err } close(s.ready) @@ -320,7 +320,7 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) { defer func() { err := s.snapshot() if err != nil { - log.Errorln(err) + log.Error("snapshot error", log.Ctx{"error": err}) } }() @@ -328,12 +328,12 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) { t.NumFailure++ if t.NumFailure > s.failureMax { - log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure) + log.Warn("Task failed to many times, discard.", log.Ctx{"task": t.Task, "num failed": t.NumFailure}) s.state.Failed = append(s.state.Failed, t) return } - log.Warningf("Task %v failed %d times, re-dispatch.", t.Task, t.NumFailure) + log.Warn("Task failed, re-dispatch.", log.Ctx{"task": t.Task, "num failed": t.NumFailure}) s.state.Todo = append(s.state.Todo, t) return } @@ -353,8 +353,8 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() { } // must be called with lock held. 
-func (s *Service) logFields() log.Fields { - return log.Fields{ +func (s *Service) logCtx() log.Ctx { + return log.Ctx{ "todoLen": len(s.state.Todo), "pendingLen": len(s.state.Pending), "doneLen": len(s.state.Done), @@ -383,10 +383,10 @@ func (s *Service) GetTask(passID int, task *Task) error { if len(s.state.Todo) == 0 { if len(s.state.Done) == 0 && len(s.state.Pending) == 0 { - log.WithFields(s.logFields()).Warningln("All tasks failed, may start next pass") + log.Warn("All tasks failed, may start next pass", s.logCtx()) return ErrAllTaskFailed } - log.WithFields(s.logFields()).Warningln("No more available task.") + log.Warn("No more available task.", s.logCtx()) return ErrNoMoreAvailable } @@ -400,8 +400,9 @@ func (s *Service) GetTask(passID int, task *Task) error { } *task = t.Task - log.WithFields(s.logFields()).Infof("Task #%v dispatched.", t.Task.Meta) - + ctx := s.logCtx() + ctx["task meta"] = t.Task.Meta + log.Info("Task dispatched.", ctx) time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch)) return nil } @@ -417,7 +418,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { t, ok := s.state.Pending[taskID] if !ok { - log.WithFields(s.logFields()).Warningln("Pending task #%d not found.", taskID) + ctx := s.logCtx() + ctx["task id"] = taskID + log.Warn("Pending task not found.", ctx) return nil } @@ -426,7 +429,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { s.state.Done = append(s.state.Done, t) delete(s.state.Pending, taskID) - log.WithFields(s.logFields()).Infof("Task #%d finished.", taskID) + ctx := s.logCtx() + ctx["task id"] = taskID + log.Info("Task finished.", ctx) if len(s.state.Todo) == 0 && len(s.state.Pending) == 0 { // increase master side pass count if all tasks finished s.state.CurPass++ @@ -434,12 +439,14 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error { s.state.Done = []taskEntry{} // TODO(typhoonzero): deal with failed tasks s.state.Failed = []taskEntry{} - log.WithFields(s.logFields()).Warningf("all task finished, add new pass data, newpass: %d.", s.state.CurPass) + ctx := s.logCtx() + ctx["new pass"] = s.state.CurPass + log.Warn("all task finished, add new pass data.", ctx) } err := s.snapshot() if err != nil { - log.Errorln(err) + log.Error("snapshot error", log.Ctx{"error": err}) } return err } @@ -455,7 +462,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error { t, ok := s.state.Pending[meta.ID] if !ok { - log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", t.Task.Meta) + log.Warn("TaskFailed:Pending task not found.", log.Ctx{"task": t.Task.Meta}) return nil } diff --git a/go/pserver/client/c/cclient.go b/go/pserver/client/c/cclient.go index a49cd01522..2eeec1b6b3 100644 --- a/go/pserver/client/c/cclient.go +++ b/go/pserver/client/c/cclient.go @@ -45,9 +45,15 @@ import ( "github.com/PaddlePaddle/Paddle/go/pserver" "github.com/PaddlePaddle/Paddle/go/pserver/client" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) +func init() { + log.Root().SetHandler( + log.LvlFilterHandler(log.LvlWarn, log.CallerStackHandler("%+v", log.StderrHandler)), + ) +} + var mu sync.Mutex var handleMap = make(map[C.paddle_pserver_client]*client.Client) var curHandle C.paddle_pserver_client @@ -164,10 +170,13 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter, if err != nil { if err.Error() == pserver.AlreadyInitialized { - log.Warningf("parameter %s already initialized, treat paddle_init_param as 
successful.", name) + log.Warn( + "parameter already initialized, treat paddle_init_param as successful.", + log.Ctx{"parameter": name}, + ) return C.PSERVER_OK } - log.Errorln(err) + log.Error("error init param", log.Ctx{"error": err}) return C.PSERVER_ERROR } @@ -180,11 +189,11 @@ func paddle_finish_init_params(client C.paddle_pserver_client) C.int { err := c.FinishInitParams() if err != nil { if err.Error() == pserver.AlreadyInitialized { - log.Warningln("parameters already initialized, treat paddle_finish_init_params as successful.") + log.Warn("parameters already initialized, treat paddle_finish_init_params as successful.") return C.PSERVER_OK } - log.Errorln(err) + log.Error("error finish init params", log.Ctx{"error": err}) return C.PSERVER_ERROR } @@ -205,7 +214,7 @@ func paddle_send_grads(client C.paddle_pserver_client, grads **C.paddle_gradient c := get(client) err := c.SendGrads(gs) if err != nil { - log.Errorln(err) + log.Error("error send grads", log.Ctx{"error": err}) return C.PSERVER_ERROR } @@ -222,7 +231,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, c := get(client) ps, err := c.GetParams(ns) if err != nil { - log.Errorln(err) + log.Error("error get params", log.Ctx{"error": err}) return C.PSERVER_ERROR } @@ -231,7 +240,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, for i, p := range ps { pn[i] = p.Name } - log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", ")) + log.Error( + "pserver returned wrong number of parameters.", + log.Ctx{ + "Requested": strings.Join(pn, ", "), + "Returned": strings.Join(ns, ", "), + }, + ) return C.PSERVER_ERROR } @@ -241,7 +256,13 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, for i, p := range ps { pn[i] = p.Name } - log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", ")) + log.Error( + "pserver returned wrong parameters, or not in requested order.", + log.Ctx{ + "Requested": strings.Join(pn, ", "), + "Returned": strings.Join(ns, ", "), + }, + ) return C.PSERVER_ERROR } } @@ -251,13 +272,19 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter, param := *(**C.paddle_parameter)(unsafe.Pointer((uintptr(unsafe.Pointer(dst)) + uintptr(i)*unsafe.Sizeof(*dst)))) if unsafe.Pointer(param) == nil { - log.Errorln("must pre-allocate parameter.") + log.Error("must pre-allocate parameter.") return C.PSERVER_ERROR } if unsafe.Pointer(param.content) != nil { if int(param.content_len) != len(p.Content) { - log.Errorf("the pre-allocated content len does not match parameter content len. 
Pre-allocated len: %d, returned len: %d", param.content_len, len(p.Content)) + log.Error( + "the pre-allocated content len does not match parameter content len.", + log.Ctx{ + "Pre-allocated len": param.content_len, + "Returned len": len(p.Content), + }, + ) return C.PSERVER_ERROR } } diff --git a/go/pserver/client/client.go b/go/pserver/client/client.go index e5187ce3df..18fce34b37 100644 --- a/go/pserver/client/client.go +++ b/go/pserver/client/client.go @@ -22,7 +22,7 @@ import ( "github.com/PaddlePaddle/Paddle/go/connection" "github.com/PaddlePaddle/Paddle/go/pserver" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) // TODO(helin): add RPC call retry logic @@ -84,7 +84,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) { if curServers[i].Addr == "" { err := c.pservers[i].Close() if err != nil { - log.Errorln(err) + log.Error("error closing connection to pserver", log.Ctx{"error": err}) } continue @@ -92,7 +92,7 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) { err := c.pservers[i].Connect(curServers[i].Addr) if err != nil { - log.Errorln(err) + log.Error("error connecting to pserver", log.Ctx{"error": err}) // connect to addr failed, set // to last known addr in order diff --git a/go/pserver/client/client_test.go b/go/pserver/client/client_test.go index c3d88e926d..ec832305ee 100644 --- a/go/pserver/client/client_test.go +++ b/go/pserver/client/client_test.go @@ -30,7 +30,7 @@ import ( "github.com/PaddlePaddle/Paddle/go/pserver" "github.com/PaddlePaddle/Paddle/go/pserver/client" "github.com/coreos/etcd/clientv3" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) const ( @@ -90,7 +90,7 @@ func initEtcdClient() { DialTimeout: time.Second * time.Duration(1), }) if err != nil { - log.Errorf("err %v", err) + log.Error("error init etcd client", log.Ctx{"error": err}) } ctx, cancel := context.WithTimeout(context.Background(), timeout) _, err = client.Delete(ctx, pserver.PsDesired) diff --git a/go/pserver/client/etcd_client.go b/go/pserver/client/etcd_client.go index f9071caaa8..16d0c3b943 100644 --- a/go/pserver/client/etcd_client.go +++ b/go/pserver/client/etcd_client.go @@ -25,7 +25,7 @@ import ( "github.com/PaddlePaddle/Paddle/go/pserver" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) const ( @@ -54,26 +54,29 @@ func (e *Etcd) Desired() int { resp, err := e.client.Get(ctx, pserver.PsDesired) cancel() if err != nil { - log.Errorf("Get ps dresire number failed! recnnectiong..., %v", err) + log.Error( + "Get ps dresire number failed! 
reconnecting...", + log.Ctx{"error": err}, + ) time.Sleep(e.timeout) continue } kvs := resp.Kvs if len(kvs) == 0 { - log.Infoln("Waiting for ps desired registered ...") + log.Info("Waiting for ps desired registered ...") time.Sleep(e.timeout) continue } psDesired, err = strconv.Atoi(string(resp.Kvs[0].Value)) if err != nil { - log.Errorf("psDesired %d invalid %v", psDesired, err) + log.Error("atoi failed", log.Ctx{"error": err}) time.Sleep(e.timeout) continue } - log.Debugf("Get psDesired number: %d", psDesired) + log.Debug("Got psDesired", log.Ctx{"psDesired": psDesired}) break } return psDesired @@ -88,17 +91,20 @@ func (e *Etcd) List() []Server { for i := 0; i < psDesired; i++ { ctx, cancel := context.WithTimeout(context.Background(), e.timeout) psKey := pserver.PsPath + strconv.Itoa(i) - log.Debugf("checking %s", psKey) + log.Debug("looking for pserver", log.Ctx{"ps key": psKey}) resp, err := e.client.Get(ctx, psKey) cancel() if err != nil { - log.Infof("Get psKey= %s error, %v", psKey, err) + log.Info( + "Get psKey error", + log.Ctx{"ps key": psKey, "error": err}, + ) time.Sleep(e.timeout) continue } kvs := resp.Kvs if len(kvs) == 0 { - log.Infof("Waiting for ps addr registered ...") + log.Info("Waiting for ps addr registered ...") time.Sleep(e.timeout) continue } @@ -106,11 +112,17 @@ func (e *Etcd) List() []Server { psAddr := string(resp.Kvs[0].Value) // TODO(Longfei) check the ps address if psAddr == "" { - log.Infof("Get psKey = %s, psAddr is empty", psKey) + log.Info( + "Value under psKey is empty", + log.Ctx{"psKey": psKey}, + ) time.Sleep(e.timeout) continue } - log.Debugf("got value (%s) for key: %s", psAddr, psKey) + log.Debug( + "got psAddr given psKey", + log.Ctx{"psAddr": psAddr, "psKey": psKey}, + ) servers[i].Index = i servers[i].Addr = psAddr } @@ -130,13 +142,13 @@ func NewEtcd(endpoints string) *Etcd { DialTimeout: defaultEtcdTimeout, }) if err != nil { - log.Errorf("Init etcd connection failed: %v", err) + log.Error("Init etcd connection failed", log.Ctx{"error": err}) time.Sleep(defaultEtcdTimeout) continue } break } - log.Infof("Connected to etcd: %s\n", endpoints) + log.Info("Connected to etcd endpoint", log.Ctx{"endpoint": endpoints}) client := &Etcd{ client: cli, timeout: defaultEtcdTimeout, @@ -154,7 +166,7 @@ func (e *Etcd) Select() (bool, error) { } lock := concurrency.NewMutex(sess, initLockPath) - log.Infof("Trying to acquire lock at %s.", initLockPath) + log.Info("Trying to acquire lock", log.Ctx{"lock path": initLockPath}) // Do not use timeout context here, since we don't know how // long does it take for other trainers to initialize the // parameters. @@ -162,7 +174,7 @@ func (e *Etcd) Select() (bool, error) { if err != nil { return false, err } - log.Infof("Successfully acquired lock at %s.", initLockPath) + log.Info("Successfully acquired lock", log.Ctx{"lock path": initLockPath}) get := clientv3.OpGet(initDonePath) ctx, cancel := context.WithTimeout(context.Background(), e.timeout) @@ -181,17 +193,17 @@ func (e *Etcd) Select() (bool, error) { if len(resp.Kvs) == 0 { // Key value not set, select current trainer. 
e.lock = lock - log.Infoln("Trainer selected.") + log.Info("Trainer selected.") return true, nil } if string(resp.Kvs[0].Value) == initDoneVal { - log.Infoln("Initialization is already done.") + log.Info("Initialization is already done.") ctx, cancel = context.WithTimeout(context.Background(), e.timeout) err = lock.Unlock(ctx) cancel() if err != nil { - log.Errorln(err) + log.Error("error unlocking", log.Ctx{"error": err}) } return false, nil } @@ -221,7 +233,7 @@ func (e *Etcd) Done() error { err = e.lock.Unlock(ctx) cancel() if err != nil { - log.Errorln(err) + log.Error("error unlocking", log.Ctx{"error": err}) } else { e.lock = nil } @@ -244,7 +256,7 @@ func (e *Etcd) Close() error { cErr := e.client.Close() if cErr != nil { if err != nil { - log.Errorln(cErr) + log.Error("error closing etcd client", log.Ctx{"error": cErr}) return err } return cErr diff --git a/go/pserver/etcd_client.go b/go/pserver/etcd_client.go index 41f0640fc0..08ddb247f2 100644 --- a/go/pserver/etcd_client.go +++ b/go/pserver/etcd_client.go @@ -24,7 +24,7 @@ import ( "github.com/PaddlePaddle/Paddle/go/utils/networkhelper" "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3/concurrency" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) const ( @@ -82,19 +82,19 @@ func (e *EtcdClient) Register(port int) (int, error) { DialTimeout: e.dialTimeout, }) if err != nil { - log.Errorf("connect to etcd error: %v", err) + log.Error("connect to etcd error", log.Ctx{"error": err}) time.Sleep(retryTimeout) continue } e.client = cli sess, err := concurrency.NewSession(cli, concurrency.WithTTL(e.ttlSec)) if err != nil { - log.Errorf("create etcd session error: %v", err) + log.Error("create etcd session error", log.Ctx{"error": err}) time.Sleep(retryTimeout) continue } e.sess = sess - log.Debugf("inited client to %s", e.endpoints) + log.Debug("connected to etcd", log.Ctx{"endpoint": e.endpoints}) break } // init /ps_desired using transaction, for multiple pservers may want to write @@ -104,7 +104,7 @@ func (e *EtcdClient) Register(port int) (int, error) { _, err := e.initDesiredPservers(ctx, e.numPservers) cancel() if err != nil { - log.Warn(err) + log.Warn("pserver init error", log.Ctx{"error": err, "num pservers": e.numPservers}) time.Sleep(retryTimeout) continue } @@ -119,14 +119,17 @@ func (e *EtcdClient) Register(port int) (int, error) { resp, err := e.client.Get(ctx, PsDesired) cancel() if err != nil { - log.Errorf("getting %s error: %v", PsDesired, err) + log.Error("get etcd key error", log.Ctx{"key": PsDesired, "error": err}) time.Sleep(retryTimeout) continue } if len(resp.Kvs) != 0 { e.desired, err = strconv.Atoi(string(resp.Kvs[0].Value)) if err != nil { - log.Errorf("value of %s invalid %v\n", PsDesired, err) + log.Error( + "psDesired atoi error", + log.Ctx{"error": err, "value": string(resp.Kvs[0].Value)}, + ) time.Sleep(retryTimeout) // NOTE: wait util ps_desired value change continue @@ -143,7 +146,7 @@ func (e *EtcdClient) Register(port int) (int, error) { pserverIdx, err = e.registerPserverEtcd(ctx, port) cancel() if err != nil { - log.Warn(err) + log.Warn("register pserver on etcd error", log.Ctx{"error": err}) time.Sleep(retryTimeout) continue } @@ -170,16 +173,17 @@ func (e *EtcdClient) registerPserverEtcd(ctx context.Context, port int) (int, er registered := false for i := 0; i < e.desired; i++ { psKey := PsPath + strconv.Itoa(i) - log.Debugf("checking %s", psKey) ps := c.Get(psKey) - log.Debugf("got value (%s) for key: %s", ps, psKey) + log.Debug( + "register pserver 
got value", + log.Ctx{"value": ps, "key": psKey}, + ) if ps == "" { // find the first id and write info pserverAddr := e.externalIP + ":" + strconv.Itoa(port) c.Put(psKey, pserverAddr, clientv3.WithLease(e.sess.Lease())) - log.Debugf("set pserver node %s with value %s", psKey, pserverAddr) - log.Debug("register finished") + log.Debug("register finished", log.Ctx{"key": psKey, "value": pserverAddr}) idx = i registered = true break @@ -239,7 +243,7 @@ func (e *EtcdClient) Shutdown() error { newErr := e.client.Close() if newErr != nil { if err != nil { - log.Errorln(newErr) + log.Error("shutdown error", log.Ctx{"error": newErr}) } else { err = newErr } diff --git a/go/pserver/optimizer.go b/go/pserver/optimizer.go index ae73590734..e04c86de0a 100644 --- a/go/pserver/optimizer.go +++ b/go/pserver/optimizer.go @@ -25,7 +25,7 @@ import ( "fmt" "unsafe" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) type optimizer struct { @@ -56,12 +56,12 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer c := paramWithConfigs.Config s := State paramBufferSize := C.size_t(len(p.Content)) - log.WithFields(log.Fields{ + log.Info("New Optimizer Created with config", log.Ctx{ "ElementType": p.ElementType, "ParamSize": paramBufferSize, "ConfigSize": len(c), "StateSize": len(s), - }).Info("New Optimizer Created with config:") + }) var cbuffer unsafe.Pointer cbuffer = C.malloc(paramBufferSize) @@ -72,21 +72,34 @@ func newOptimizer(paramWithConfigs ParameterWithConfig, State []byte) *optimizer } o.config = c - o.opt = C.paddle_create_optimizer((*C.uchar)(&c[0]), C.int(len(c)), - C.paddle_element_type(p.ElementType), cbuffer, C.int(paramBufferSize), (*C.char)(cstate), C.int(len(s))) + o.opt = C.paddle_create_optimizer( + (*C.uchar)(&c[0]), + C.int(len(c)), + C.paddle_element_type(p.ElementType), + cbuffer, + C.int(paramBufferSize), + (*C.char)(cstate), + C.int(len(s)), + ) return o } func (o *optimizer) GetWeights() []byte { var buffer unsafe.Pointer + // we do not own the buffer, no need to free later. bufferLen := C.paddle_optimizer_get_weights(o.opt, &buffer) return cArrayToSlice(buffer, int(bufferLen)*C.sizeof_float) } func (o *optimizer) GetStates() []byte { var cbuffer *C.char + // we owns the state buffer, need to free later. 
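+	// The C-allocated state buffer is freed before returning below, so the bytes are first
+	// copied into a Go-managed slice; returning a slice that aliases the freed C memory would be unsafe.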
cbufferLen := C.paddle_optimizer_get_state(o.opt, &cbuffer) - return cArrayToSlice(unsafe.Pointer(cbuffer), int(cbufferLen)) + buf := cArrayToSlice(unsafe.Pointer(cbuffer), int(cbufferLen)) + cpy := make([]byte, len(buf)) + copy(cpy, buf) + C.free(unsafe.Pointer(cbuffer)) + return cpy } func (o *optimizer) UpdateParameter(g Gradient) error { diff --git a/go/pserver/optimizer_test.go b/go/pserver/optimizer_test.go index d001e6993e..565f56dc28 100644 --- a/go/pserver/optimizer_test.go +++ b/go/pserver/optimizer_test.go @@ -15,8 +15,12 @@ package pserver import ( + "encoding/binary" "io/ioutil" + "math" "testing" + + "github.com/stretchr/testify/assert" ) func TestOptimizerCreateRelease(t *testing.T) { @@ -36,3 +40,39 @@ func TestOptimizerCreateRelease(t *testing.T) { o := newOptimizer(param, nil) o.Cleanup() } + +func float32Bytes(float float32) []byte { + bits := math.Float32bits(float) + bytes := make([]byte, 4) + binary.LittleEndian.PutUint32(bytes, bits) + return bytes +} + +func TestOptimizerState(t *testing.T) { + p := Parameter{ + Name: "a", + ElementType: Int32, + } + weights := float32Bytes(100) + p.Content = weights + config, err := ioutil.ReadFile("./client/c/test/testdata/optimizer.pb") + if err != nil { + t.Fatalf("read optimizer proto failed") + } + param := ParameterWithConfig{ + Param: p, + Config: config, + } + o := newOptimizer(param, nil) + s := o.GetStates() + + // clear param content and check if the state is restored. + param.Param.Content = float32Bytes(300) + o1 := newOptimizer(param, s) + s1 := o1.GetStates() + assert.Equal(t, s, s1) + assert.Equal(t, weights, o.GetWeights()) + assert.Equal(t, weights, o1.GetWeights()) + o.Cleanup() + o1.Cleanup() +} diff --git a/go/pserver/service.go b/go/pserver/service.go index 25751540a9..6f66faaf27 100644 --- a/go/pserver/service.go +++ b/go/pserver/service.go @@ -32,7 +32,7 @@ import ( uuid "github.com/satori/go.uuid" - log "github.com/sirupsen/logrus" + log "github.com/inconshreveable/log15" ) // ElementType is the type of elements of a Parameter. @@ -124,6 +124,9 @@ func loadMeta(e *EtcdClient, idx int) (meta checkpointMeta, err error) { // LoadCheckpoint loads checkpoint from file. func LoadCheckpoint(e *EtcdClient, idx int) (Checkpoint, error) { + log.Info("Loading checkpoint", "pserver index", idx) + defer traceTime(time.Now(), "load checkpoint") + cpMeta, err := loadMeta(e, idx) if err != nil { return nil, err @@ -178,6 +181,7 @@ func NewService(idx int, interval time.Duration, path string, client *EtcdClient func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, _ *int) error { select { case <-s.initialized: + log.Warn("init param called but parameters already initialized.") return errors.New(AlreadyInitialized) default: } @@ -191,6 +195,13 @@ func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, _ *int) error // properly memory aligned, if not, make copy to a memory // aligned region. 
s.optMap[paramWithConfigs.Param.Name] = newOptimizer(paramWithConfigs, nil) + log.Info( + "init parameter", + "name", paramWithConfigs.Param.Name, + "config len", len(paramWithConfigs.Config), + "param len", len(paramWithConfigs.Param.Content), + "type", paramWithConfigs.Param.ElementType, + ) return nil } @@ -199,6 +210,7 @@ func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, _ *int) error func (s *Service) FinishInitParams(_ int, _ *int) error { select { case <-s.initialized: + log.Warn("finished init param called but parameters already initialized.") return errors.New(AlreadyInitialized) default: } @@ -209,10 +221,12 @@ func (s *Service) FinishInitParams(_ int, _ *int) error { for range t { err := s.checkpoint() if err != nil { - log.Errorln(err) + log.Error("finish init params error", log.Ctx{"error": err}) } } }() + + log.Info("init parameter finished.") return nil } @@ -222,6 +236,7 @@ func (s *Service) SendGrad(g Gradient, _ *int) error { select { case <-s.initialized: default: + log.Warn("received gradient before initialization.", "name", g.Name, "size", len(g.Content), "type", g.ElementType) return errors.New(Uninitialized) } @@ -233,6 +248,7 @@ func (s *Service) SendGrad(g Gradient, _ *int) error { return fmt.Errorf("parameter: %s does not exist", g.Name) } + log.Info("received gradient from trainer, updating gradient.", "name", g.Name, "size", len(g.Content), "type", g.ElementType) return o.UpdateParameter(g) } @@ -244,6 +260,7 @@ func (s *Service) GetParam(name string, parameter *Parameter) error { opt, ok := s.optMap[name] if !ok { + log.Warn("trainer wants to get a parameter that does not exist.", "name", name) return fmt.Errorf("parameter: %s does not exist", name) } @@ -257,12 +274,13 @@ func (s *Service) GetParam(name string, parameter *Parameter) error { parameter.Name = name parameter.ElementType = opt.elementType parameter.Content = opt.GetWeights() + log.Info("sending parameter to the trainer", "name", parameter.Name, "size", len(parameter.Content), "type", parameter.ElementType) return nil } func traceTime(start time.Time, name string) { elapsed := time.Since(start) - log.Infof("%s took %v", name, elapsed) + log.Info("time elapsed", log.Ctx{"name": name, "elapsed": elapsed}) } // checkpoint saves checkpoint to disk. @@ -270,7 +288,7 @@ func traceTime(start time.Time, name string) { // checkpoint should be only called after the parameters are // initialized. func (s *Service) checkpoint() (err error) { - log.Infoln("Begin save checkpoint.") + log.Info("Begin save checkpoint.") defer traceTime(time.Now(), "save checkpoint") s.mu.Lock() @@ -297,6 +315,13 @@ func (s *Service) checkpoint() (err error) { return } + if _, err = os.Stat(s.checkpointPath); os.IsNotExist(err) { + err = os.MkdirAll(s.checkpointPath, os.ModePerm) + if err != nil { + return + } + } + id := uuid.NewV4().String() p := path.Join(s.checkpointPath, id) f, err := os.Create(p) @@ -308,7 +333,7 @@ func (s *Service) checkpoint() (err error) { closeErr := f.Close() if closeErr != nil { if err != nil { - log.Errorln(closeErr) + log.Error("error close checkpoint file", log.Ctx{"error": closeErr}) } else { // Set closeErr as return value. 
err = closeErr @@ -329,7 +354,7 @@ func (s *Service) checkpoint() (err error) { oldMeta, err := loadMeta(s.client, s.idx) if err == ErrCheckpointNotFound { - log.Infoln("Do not have existing checkpoint.") + log.Info("Do not have existing checkpoint.") err = nil } @@ -361,7 +386,7 @@ func (s *Service) checkpoint() (err error) { if rmErr != nil { // log error, but still treat checkpoint as // successful. - log.Errorln(rmErr) + log.Error("remove old meta file error", log.Ctx{"error": rmErr}) } } diff --git a/paddle/framework/CMakeLists.txt b/paddle/framework/CMakeLists.txt index dbe76a8eaf..85374a476d 100644 --- a/paddle/framework/CMakeLists.txt +++ b/paddle/framework/CMakeLists.txt @@ -1,4 +1,7 @@ # ddim lib +proto_library(framework_proto SRCS framework.proto) +proto_library(saver_proto SRCS framework.proto saver.proto) + cc_library(ddim SRCS ddim.cc DEPS eigen3) cc_test(ddim_test SRCS ddim_test.cc DEPS ddim) nv_test(dim_test SRCS dim_test.cu DEPS ddim) @@ -7,8 +10,8 @@ cc_library(tensor SRCS tensor.cc DEPS ddim place paddle_memory device_context) cc_test(tensor_test SRCS tensor_test.cc DEPS tensor) cc_test(eigen_test SRCS eigen_test.cc DEPS tensor) -cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor) -cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor) +cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor saver_proto framework_proto) +cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor paddle_memory) nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor) cc_test(variable_test SRCS variable_test.cc) @@ -16,7 +19,6 @@ cc_test(variable_test SRCS variable_test.cc) cc_library(scope SRCS scope.cc) cc_test(scope_test SRCS scope_test.cc DEPS scope) -proto_library(framework_proto SRCS framework.proto) cc_library(attribute SRCS attribute.cc DEPS framework_proto) cc_test(program_desc_test SRCS program_desc_test.cc DEPS proto_desc) diff --git a/paddle/framework/framework.proto b/paddle/framework/framework.proto index 3d023535ef..8f2df3dc0e 100644 --- a/paddle/framework/framework.proto +++ b/paddle/framework/framework.proto @@ -115,6 +115,7 @@ message VarDesc { SELECTED_ROWS = 2; FEED_MINIBATCH = 3; FETCH_LIST = 4; + STEP_SCOPES = 5; } required string name = 1; required VarType type = 2; diff --git a/paddle/framework/lod_tensor.cc b/paddle/framework/lod_tensor.cc index 7c0ea0df78..f53dd1c185 100644 --- a/paddle/framework/lod_tensor.cc +++ b/paddle/framework/lod_tensor.cc @@ -13,6 +13,15 @@ limitations under the License. 
*/ #include "paddle/framework/lod_tensor.h" +#include "paddle/framework/saver.pb.h" + +#include "paddle/memory/memcpy.h" +#include "paddle/memory/memory.h" + +#include +#include +#include +#include #include @@ -112,5 +121,140 @@ void LoDTensor::ShrinkInLevel(size_t level, size_t elem_begin, lod_ = new_lod; } +std::string LoDTensor::SerializeToString() const { + LoDTensorProto desc; + + // set data_type + if (this->type() == typeid(int8_t)) desc.set_data_type(DataType::BOOL); + if (this->type() == typeid(int16_t)) desc.set_data_type(DataType::INT16); + if (this->type() == typeid(int32_t)) desc.set_data_type(DataType::INT32); + if (this->type() == typeid(int64_t)) desc.set_data_type(DataType::INT64); + // FIXME(dzh): there is no fp16 in standard c++ + + if (this->type() == typeid(float)) // NOLINT + desc.set_data_type(DataType::FP32); + if (this->type() == typeid(double)) // NOLINT + desc.set_data_type(DataType::FP64); + + for (int i = 0; i < dims().size(); ++i) { + desc.add_dims(dims()[i]); + } + + // set lod information + desc.set_lod_level(this->NumLevels()); + for (size_t i = 0; i < this->NumLevels(); ++i) { + LoDInfo* lod = desc.add_levels(); + for (size_t j = 0; j < lod_[i].size(); ++j) { + lod->add_level(lod_[i][j]); + } + } + + desc.set_version(0); + + std::string desc_bytes = desc.SerializeAsString(); + + // FIXME(dzh) : implement fix chunk size buffer. + size_t DESC_SIZE = desc_bytes.size(); + size_t DATA_SIZE = holder_->size() - offset_; + + const size_t BUFFER_SIZE = DESC_SIZE + DATA_SIZE + 2 * sizeof(size_t); + char* buffer = + static_cast(memory::Alloc(platform::CPUPlace(), BUFFER_SIZE)); + + // format: desc_size data_size, desc_bytes, data_bytes. + platform::CPUPlace src_place; + platform::CPUPlace dst_place; + + memory::Copy(dst_place, buffer, src_place, &BUFFER_SIZE, sizeof(size_t)); + memory::Copy(dst_place, buffer + sizeof(size_t), src_place, &DESC_SIZE, + sizeof(size_t)); + memory::Copy(dst_place, buffer + sizeof(size_t) * 2, src_place, + desc_bytes.c_str(), desc_bytes.size()); + + PADDLE_ENFORCE(this->numel() != 0, "Serialize a empty Tensor!"); + + platform::Place place = holder_->place(); + int element_width = holder_->size() / this->numel(); + + if (platform::is_cpu_place(place)) { + memory::Copy(dst_place, buffer + sizeof(size_t) * 2 + desc_bytes.size(), + boost::get(place), + static_cast(holder_->ptr()) + offset_ / element_width, + DATA_SIZE); + } +#ifdef PADDLE_WITH_GPU + if (platform::is_gpu_place(place)) { + memory::Copy(dst_place, buffer + sizeof(size_t) * 2 + desc_bytes.size(), + boost::get(place), + static_cast(holder_->ptr()) + offset_ / element_width, + DATA_SIZE); + } +#endif + + std::string ret(buffer, BUFFER_SIZE); + memory::Free(platform::CPUPlace(), buffer); + return ret; +} + +void LoDTensor::DeserializeFromString(const std::string& s, + const platform::Place& dst_place) { + size_t DESC_SIZE, BUFFER_SIZE; + platform::CPUPlace src_place; + + memory::Copy(src_place, &BUFFER_SIZE, src_place, s.c_str(), sizeof(size_t)); + memory::Copy(src_place, &DESC_SIZE, src_place, s.c_str() + sizeof(size_t), + sizeof(size_t)); + + const size_t DATA_SIZE = BUFFER_SIZE - DESC_SIZE - sizeof(size_t) * 2; + + // parse LoDTensorDesc + LoDTensorProto desc; + desc.ParseFromArray(s.c_str() + sizeof(size_t) * 2, DESC_SIZE); + + std::vector dims; + std::copy(desc.dims().begin(), desc.dims().end(), std::back_inserter(dims)); + this->Resize(make_ddim(dims)); + + // parse data type + void* ptr = nullptr; + if (desc.data_type() == DataType::BOOL) + ptr = this->mutable_data(dst_place); 
+ if (desc.data_type() == DataType::INT16) + ptr = this->mutable_data(dst_place); + if (desc.data_type() == DataType::INT32) + ptr = this->mutable_data(dst_place); + if (desc.data_type() == DataType::INT64) + ptr = this->mutable_data(dst_place); + // FIXME(dzh): there is no fp16 in standard c++ + + if (desc.data_type() == DataType::FP32) + ptr = this->mutable_data(dst_place); + if (desc.data_type() == DataType::FP64) + ptr = this->mutable_data(dst_place); + + LoD lod; + std::vector levels; + for (int i = 0; i < desc.levels().size(); ++i) { + auto current_level = desc.levels()[i].level(); + std::copy(current_level.begin(), current_level.end(), + std::back_inserter(levels)); + lod.emplace_back(levels); + levels.clear(); + } + + this->set_lod(lod); + + if (platform::is_cpu_place(dst_place)) { + memory::Copy(boost::get(dst_place), ptr, src_place, + s.c_str() + sizeof(size_t) * 2 + DESC_SIZE, DATA_SIZE); + } +#ifdef PADDLE_WITH_GPU + if (platform::is_gpu_place(dst_place)) { + memory::Copy(boost::get(dst_place), ptr, src_place, + s.c_str() + sizeof(size_t) * 2 + DESC_SIZE, DATA_SIZE); + } +#endif +} + } // namespace framework } // namespace paddle diff --git a/paddle/framework/lod_tensor.h b/paddle/framework/lod_tensor.h index dec59a5750..f78a751c53 100644 --- a/paddle/framework/lod_tensor.h +++ b/paddle/framework/lod_tensor.h @@ -25,6 +25,7 @@ #include "paddle/framework/ddim.h" #include "paddle/framework/tensor.h" #include "paddle/platform/enforce.h" +#include "paddle/platform/place.h" namespace paddle { namespace framework { @@ -132,6 +133,27 @@ class LoDTensor : public Tensor { */ void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end); + /** + * @brief Serialize tensor to char bytes. + * Please check model_format.md for the format detail. + * NOTE: GPUTensor will copy data to cpu implicitly. + * @return return string + */ + + // FIXME(dzh) : Currently, this interface should only be used in + // save/restore model and checkpoint. ParameterServer do not use shape + // information to do the optimization, as a result, when we serialize + // parameter/gradient to string, we should serialize the tensor + // to string in the ps trainer instead of LoDTensor. + std::string SerializeToString() const; + + /** + * @brief Deserialize char bytes to tensor. 
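A note on the byte layout produced by `SerializeToString` above: the writer emits two native `size_t` fields (the total buffer size, then the size of the serialized `LoDTensorProto`), followed by the proto bytes and the raw tensor memory; the in-code comment reads "desc_size data_size", but the first field actually holds the whole buffer size. A minimal Python sketch of splitting such a buffer, assuming a 64-bit little-endian `size_t` (illustrative only, not part of this patch):

```python
import struct

SIZE_T = 8  # assumption: 64-bit size_t, matching the writer

def split_lod_tensor_buffer(buf):
    # layout: [buffer_size][desc_size][desc_bytes][data_bytes]
    buffer_size, desc_size = struct.unpack_from("<QQ", buf, 0)
    assert buffer_size == len(buf), "truncated or corrupted buffer"
    desc_start = 2 * SIZE_T
    desc_bytes = buf[desc_start:desc_start + desc_size]   # serialized LoDTensorProto
    data_bytes = buf[desc_start + desc_size:buffer_size]  # raw element data
    return desc_bytes, data_bytes
```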
+ * @return return string + */ + void DeserializeFromString(const std::string& s, + const platform::Place& dst_place); + private: LoD lod_; }; diff --git a/paddle/framework/lod_tensor_test.cc b/paddle/framework/lod_tensor_test.cc index e1e15abecf..b984d62071 100644 --- a/paddle/framework/lod_tensor_test.cc +++ b/paddle/framework/lod_tensor_test.cc @@ -17,10 +17,13 @@ #include #include #include +#include namespace paddle { namespace framework { +const int kLodTensorSize = 20 * 128; + class LoDTensorTester : public ::testing::Test { public: virtual void SetUp() override { @@ -38,7 +41,10 @@ class LoDTensorTester : public ::testing::Test { lod_tensor_.Resize({20 /*batch size*/, 128 /*dim*/}); // malloc memory - lod_tensor_.mutable_data(place); + float* dst_ptr = lod_tensor_.mutable_data(place); + for (int i = 0; i < kLodTensorSize; ++i) { + dst_ptr[i] = i; + } lod_tensor_.set_lod(lod); } @@ -101,5 +107,21 @@ TEST_F(LoDTensorTester, ShrinkInLevel) { ASSERT_EQ(new_lod_tensor.data(), lod_tensor_.data()); } +TEST_F(LoDTensorTester, SerializeDeserialize) { + LoDTensor new_lod_tensor = lod_tensor_; + float* src_ptr = lod_tensor_.data(); + std::string s = lod_tensor_.SerializeToString(); + LoDTensor dst; + dst.DeserializeFromString(s, platform::CPUPlace()); + float* dst_ptr = dst.data(); + for (int i = 0; i < kLodTensorSize; ++i) { + EXPECT_EQ(dst_ptr[i], src_ptr[i]); + } + + ASSERT_EQ(dst.NumElements(0), 2UL); + ASSERT_EQ(dst.NumElements(1), 3UL); + ASSERT_EQ(dst.NumElements(2), 8UL); +} + } // namespace framework } // namespace paddle diff --git a/paddle/framework/lod_tensor_test.cu b/paddle/framework/lod_tensor_test.cu index 25041024cb..11659be02a 100644 --- a/paddle/framework/lod_tensor_test.cu +++ b/paddle/framework/lod_tensor_test.cu @@ -48,3 +48,30 @@ TEST(LoDTensor, LoDInGPU) { CHECK_EQ(lod[0].data()[i], src_lod[0].data()[i] * 2); } } + +TEST(LoDTensor, SerializeDeserialize) { + paddle::framework::LoDTensor lod_tensor; + paddle::platform::GPUPlace place(0); + + paddle::framework::LoD src_lod; + src_lod.push_back(std::vector{0, 2, 4, 6, 8, 10, 12, 14}); + + lod_tensor.Resize({14, 16}); + lod_tensor.mutable_data(place); + + lod_tensor.set_lod(src_lod); + CHECK_EQ(lod_tensor.lod_element(0, 2).first, 4UL); + CHECK_EQ(lod_tensor.lod_element(0, 4).first, 8UL); + + test<<<1, 8>>>(src_lod[0].data(), src_lod[0].size()); + cudaDeviceSynchronize(); + + std::string s = lod_tensor.SerializeToString(); + paddle::framework::LoDTensor dst; + dst.DeserializeFromString(s, place); + paddle::framework::LoD dst_lod = dst.lod(); + + for (size_t i = 0; i < dst_lod[0].size(); ++i) { + CHECK_EQ(src_lod[0].data()[i], dst_lod[0].data()[i] * 2); + } +} diff --git a/paddle/framework/saver.proto b/paddle/framework/saver.proto new file mode 100644 index 0000000000..90a191a6a7 --- /dev/null +++ b/paddle/framework/saver.proto @@ -0,0 +1,39 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +syntax = "proto2"; +option optimize_for = LITE_RUNTIME; +package paddle.framework; + +import "framework.proto"; + +/** + * This file contains necessary information for model, checkpoint. + * etc. + */ + +message LoDInfo { repeated int64 level = 1; } + +/** + * Save the LoDTensorDesc information through LoDTensorProto, its data memory + * is copyed to c buffer immediately. See model_format.md for details. + */ + +message LoDTensorProto { + optional DataType data_type = 1; + repeated int64 dims = 2; // [UNK, 640, 480] is saved as [-1, 640, 480] + repeated LoDInfo levels = 3; + optional int32 lod_level = 4 [ default = 0 ]; + optional int32 version = 5; +} diff --git a/paddle/framework/scope.cc b/paddle/framework/scope.cc index ac3ac649f9..19e25fba05 100644 --- a/paddle/framework/scope.cc +++ b/paddle/framework/scope.cc @@ -65,6 +65,23 @@ void Scope::DropKids() { kids_.clear(); } +std::vector Scope::GetAllNames(bool recursive) const { + std::vector known_vars(vars_.size()); + + if (recursive) { + for (auto& kid : kids_) { + auto kid_vars = kid->GetAllNames(); + for (auto& p : kid_vars) { + known_vars.emplace_back(p); + } + } + } + for (auto& p : vars_) { + known_vars.emplace_back(p.first); + } + return known_vars; +} + void Scope::DeleteScope(Scope* scope) { auto it = std::find(this->kids_.begin(), this->kids_.end(), scope); PADDLE_ENFORCE(it != this->kids_.end(), "Cannot find %p as kid scope", scope); diff --git a/paddle/framework/scope.h b/paddle/framework/scope.h index 7206b53068..ac334da5ef 100644 --- a/paddle/framework/scope.h +++ b/paddle/framework/scope.h @@ -17,6 +17,7 @@ limitations under the License. */ #include #include #include +#include #include "paddle/framework/variable.h" #include "paddle/platform/macros.h" @@ -64,6 +65,9 @@ class Scope { /// Drop all kids scopes belonged to this scope. void DropKids(); + // enumerate all the variables current contains. + std::vector GetAllNames(bool recursive = false) const; + private: // Call Scope::NewScope for a sub-scope. explicit Scope(Scope const* parent) : parent_(parent) {} diff --git a/paddle/framework/scope_test.cc b/paddle/framework/scope_test.cc index 7cc5e3510d..f738d5ba9e 100644 --- a/paddle/framework/scope_test.cc +++ b/paddle/framework/scope_test.cc @@ -13,6 +13,7 @@ See the License for the specific language governing permissions and limitations under the License. */ #include "paddle/framework/scope.h" +#include "glog/logging.h" #include "gtest/gtest.h" using paddle::framework::Scope; @@ -54,3 +55,17 @@ TEST(Scope, FindScope) { EXPECT_EQ(&s, s.FindScope(v)); EXPECT_EQ(&s, ss.FindScope(v)); } + +TEST(Scope, GetAllNames) { + Scope s; + Variable* v = s.Var("a"); + EXPECT_EQ(&s, s.FindScope(v)); + + std::vector ans = s.GetAllNames(); + std::string str; + for (auto& var : ans) { + str += var; + } + + EXPECT_STREQ("a", str.c_str()); +} diff --git a/paddle/framework/tensor.h b/paddle/framework/tensor.h index 3a2bdaf086..e31472327d 100644 --- a/paddle/framework/tensor.h +++ b/paddle/framework/tensor.h @@ -31,6 +31,8 @@ namespace paddle { namespace framework { +class LoDTensor; + class Tensor { public: template @@ -134,6 +136,8 @@ class Tensor { inline void check_memory_size() const; private: + friend class LoDTensor; + /** * @note Placeholder hides type T, so it doesn't appear as a template * parameter of Variable. @@ -181,7 +185,12 @@ class Tensor { /*! holds the memory block if allocated. */ std::shared_ptr holder_; - /*! points to dimensions of memory block. */ + /** + * @brief points to elements dimensions. 
+ * + * @note dims_ do not indicate the memory block size. + */ + DDim dims_; /** diff --git a/paddle/operators/CMakeLists.txt b/paddle/operators/CMakeLists.txt index f97bc837dc..d2d70d8be7 100644 --- a/paddle/operators/CMakeLists.txt +++ b/paddle/operators/CMakeLists.txt @@ -69,6 +69,13 @@ function(op_library TARGET) file(APPEND ${pybind_file} "USE_OP(max_pool2d_with_index);\n") endif() + # save_restore_op contains several operators + if ("${TARGET}" STREQUAL "save_restore_op") + set(pybind_flag 1) + # It's enough to just adding one operator to pybind + file(APPEND ${pybind_file} "USE_NO_KERNEL_OP(save);\n") + endif() + # activation_op contains several operators if ("${TARGET}" STREQUAL "activation_op") set(pybind_flag 1) diff --git a/paddle/operators/batch_norm_op.cc b/paddle/operators/batch_norm_op.cc new file mode 100644 index 0000000000..f7dc990f0d --- /dev/null +++ b/paddle/operators/batch_norm_op.cc @@ -0,0 +1,412 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/operators/batch_norm_op.h" + +namespace paddle { +namespace operators { + +using Tensor = framework::Tensor; +template +using EigenMatrix = framework::EigenMatrix; + +template +using EigenArrayMap = + Eigen::Map>; +template +using ConstEigenArrayMap = + Eigen::Map>; +template +using EigenVectorArrayMap = Eigen::Map>; +template +using ConstEigenVectorArrayMap = + Eigen::Map>; + +class BatchNormOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + + void InferShape(framework::InferShapeContext *ctx) const override { + PADDLE_ENFORCE(ctx->HasInput("X"), ""); + PADDLE_ENFORCE(ctx->HasInput("Scale"), ""); + PADDLE_ENFORCE(ctx->HasInput("Bias"), ""); + PADDLE_ENFORCE(ctx->HasInput("Mean"), ""); + PADDLE_ENFORCE(ctx->HasInput("Variance"), ""); + PADDLE_ENFORCE(ctx->HasOutput("Y"), ""); + PADDLE_ENFORCE(ctx->HasOutput("MeanOut"), ""); + PADDLE_ENFORCE(ctx->HasOutput("VarianceOut"), ""); + PADDLE_ENFORCE(ctx->HasOutput("SavedMean"), ""); + PADDLE_ENFORCE(ctx->HasOutput("SavedVariance"), ""); + + // make sure Mean/MeanOut and Variance/VarianceOut share memory in Python + PADDLE_ENFORCE_EQ(ctx->Inputs("Mean")[0], ctx->Outputs("MeanOut")[0], + "Mean and MeanOut should share the same memory"); + PADDLE_ENFORCE_EQ(ctx->Inputs("Variance")[0], + ctx->Outputs("VarianceOut")[0], + "Variance and VarianceOut should share the same memory"); + + const auto x_dims = ctx->GetInputDim("X"); + const TensorFormat tensor_format = + StringToTensorFormat(ctx->Attrs().Get("tensor_format")); + const int C = + (tensor_format == TensorFormat::NCHW ? 
x_dims[1] + : x_dims[x_dims.size() - 1]); + + PADDLE_ENFORCE_EQ(ctx->GetInputDim("Scale").size(), 1UL); + PADDLE_ENFORCE_EQ(ctx->GetInputDim("Scale")[0], C); + PADDLE_ENFORCE_EQ(ctx->GetInputDim("Bias").size(), 1UL); + PADDLE_ENFORCE_EQ(ctx->GetInputDim("Bias")[0], C); + + ctx->SetOutputDim("Y", x_dims); + ctx->SetOutputDim("MeanOut", {C}); + ctx->SetOutputDim("VarianceOut", {C}); + ctx->SetOutputDim("SavedMean", {C}); + ctx->SetOutputDim("SavedVariance", {C}); + } +}; + +class BatchNormOpMaker : public framework::OpProtoAndCheckerMaker { + public: + BatchNormOpMaker(framework::OpProto *proto, + framework::OpAttrChecker *op_checker) + : OpProtoAndCheckerMaker(proto, op_checker) { + AddAttr("is_test", "").SetDefault(false); + AddAttr("momentum", "").SetDefault(0.9); + AddAttr("epsilon", "").SetDefault(1e-5); + AddAttr("tensor_format", "").SetDefault("NCHW"); + AddInput("X", "The input tensor"); + AddInput("Scale", + "Scale is a 1-dimensional tensor of size C " + "to be applied to the output"); + AddInput("Bias", + "Bias is a 1-dimensional tensor of size C " + "to be applied to the output"); + AddInput("Mean", + "The global mean (for training) or the " + "estimated mean (for testing)"); + AddInput("Variance", + "The global variance (for training) " + "or the estimated Variance (for testing)"); + AddOutput("Y", "result after normalization"); + AddOutput("MeanOut", + "Share memory with Mean. " + "Store the global mean when training"); + AddOutput("VarianceOut", + "Share memory with Variance. " + "Store the global Variance when training"); + AddOutput("SavedMean", + "Mean of the current mini batch, " + "will apply to output when training"); + AddOutput("SavedVariance", + "Variance of the current mini batch, " + "will apply to output when training"); + AddComment(R"DOC( +https://arxiv.org/pdf/1502.03167.pdf + +NHWC `[batch, in_height, in_width, in_channels]` +NCHW `[batch, in_channels, in_height, in_width]` + +)DOC"); + } +}; + +template +class BatchNormKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + const float epsilon = ctx.Attr("epsilon"); + const float momentum = ctx.Attr("momentum"); + const bool is_test = ctx.Attr("is_test"); + const std::string tensor_format_str = + ctx.Attr("tensor_format"); + const TensorFormat tensor_format = StringToTensorFormat(tensor_format_str); + + const auto *x = ctx.Input("X"); + const auto &x_dims = x->dims(); + + PADDLE_ENFORCE(x_dims.size() >= 3 && x_dims.size() <= 5, + "The Input dim size should be between 3 and 5"); + const int N = x_dims[0]; + const int C = + (tensor_format == TensorFormat::NCHW ? 
x_dims[1] + : x_dims[x_dims.size() - 1]); + const int sample_size = x->numel() / N / C; + + auto *y = ctx.Output("Y"); + auto *mean_out = ctx.Output("MeanOut"); + auto *variance_out = ctx.Output("VarianceOut"); + auto *saved_mean = ctx.Output("SavedMean"); + auto *saved_variance = ctx.Output("SavedVariance"); + + // alloc memory + y->mutable_data(ctx.GetPlace()); + mean_out->mutable_data(ctx.GetPlace()); + variance_out->mutable_data(ctx.GetPlace()); + saved_mean->mutable_data(ctx.GetPlace()); + saved_variance->mutable_data(ctx.GetPlace()); + + if (!is_test) { + // saved_xx is use just in this batch of data + EigenVectorArrayMap saved_mean_e( + saved_mean->mutable_data(ctx.GetPlace()), C); + EigenVectorArrayMap saved_variance_e( + saved_variance->mutable_data(ctx.GetPlace()), C); + saved_mean_e.setZero(); + saved_variance_e.setZero(); + + switch (tensor_format) { + case TensorFormat::NCHW: { + ConstEigenArrayMap x_arr(x->data(), sample_size, N * C); + for (int nc = 0; nc < N * C; ++nc) { + saved_mean_e(nc % C) += x_arr.col(nc).sum(); + } + saved_mean_e /= N * sample_size; + for (int nc = 0; nc < N * C; ++nc) { + saved_variance_e(nc % C) += + (x_arr.col(nc) - saved_mean_e(nc % C)).matrix().squaredNorm(); + } + saved_variance_e /= N * sample_size; + break; + } + case TensorFormat::NHWC: { + ConstEigenArrayMap x_arr(x->data(), C, N * sample_size); + for (int i = 0; i < N * sample_size; ++i) { + saved_mean_e += x_arr.col(i); + } + saved_mean_e /= N * sample_size; + for (int i = 0; i < N * sample_size; ++i) { + saved_variance_e += + (x_arr.col(i) - saved_mean_e) * (x_arr.col(i) - saved_mean_e); + } + saved_variance_e /= N * sample_size; + break; + } + default: + PADDLE_THROW("Unknown storage order: %s", tensor_format_str); + } + + EigenVectorArrayMap running_mean_arr( + mean_out->mutable_data(ctx.GetPlace()), C); + EigenVectorArrayMap running_var_arr( + variance_out->mutable_data(ctx.GetPlace()), C); + running_mean_arr = + running_mean_arr * momentum + saved_mean_e * (1. - momentum); + running_var_arr = + running_var_arr * momentum + saved_variance_e * (1. - momentum); + } + + // use SavedMean and SavedVariance to do normalize + Eigen::Array inv_std(C); + if (is_test) { + ConstEigenVectorArrayMap var_arr( + ctx.Input("Variance")->data(), C); + inv_std = (var_arr + epsilon).sqrt().inverse(); + } else { + EigenVectorArrayMap saved_inv_std( + ctx.Output("SavedVariance")->data(), C); + // inverse SavedVariance first, gradient will use it too. + saved_inv_std = (saved_inv_std + epsilon).inverse().sqrt(); + inv_std = saved_inv_std; + } + ConstEigenVectorArrayMap mean_arr( + is_test ? 
ctx.Input("Mean")->data() + : ctx.Output("SavedMean")->data(), + C); + + // ((x - est_mean) * (inv_var) * scale + bias + // formula transform ====> + // (x * inv_var * scale) + (bias - est_mean * inv_var * scale) + const auto *scale = ctx.Input("Scale"); + const auto *bias = ctx.Input("Bias"); + ConstEigenVectorArrayMap scale_arr(scale->data(), C); + ConstEigenVectorArrayMap bias_arr(bias->data(), C); + Eigen::Array new_scale = inv_std * scale_arr; + Eigen::Array new_bias = + bias_arr - mean_arr * inv_std * scale_arr; + + switch (tensor_format) { + case TensorFormat::NCHW: { + EigenArrayMap y_arr(y->mutable_data(ctx.GetPlace()), sample_size, + N * C); + ConstEigenArrayMap x_arr(x->data(), sample_size, N * C); + for (int nc = 0; nc < N * C; ++nc) { + y_arr.col(nc) = x_arr.col(nc) * new_scale(nc % C) + new_bias(nc % C); + } + break; + } + case TensorFormat::NHWC: { + EigenArrayMap(y->mutable_data(ctx.GetPlace()), C, + N * sample_size) = + (ConstEigenArrayMap(x->data(), C, N * sample_size).colwise() * + new_scale) + .colwise() + + new_bias; + break; + } + default: + PADDLE_THROW("Unknown storage order: %d", tensor_format); + } + } +}; + +class BatchNormGradOp : public framework::OperatorWithKernel { + public: + using framework::OperatorWithKernel::OperatorWithKernel; + + void InferShape(framework::InferShapeContext *ctx) const override { + // check input + PADDLE_ENFORCE(ctx->HasInput("X")); + PADDLE_ENFORCE(ctx->HasInput("Scale"), ""); + PADDLE_ENFORCE(ctx->HasInput(framework::GradVarName("Y")), ""); + PADDLE_ENFORCE(ctx->HasInput("SavedMean"), ""); + PADDLE_ENFORCE(ctx->HasInput("SavedVariance"), ""); + + // check output + PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("X")), ""); + PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("Scale")), ""); + PADDLE_ENFORCE(ctx->HasOutput(framework::GradVarName("Bias")), ""); + + const auto x_dims = ctx->GetInputDim("X"); + const TensorFormat tensor_format = + StringToTensorFormat(ctx->Attrs().Get("tensor_format")); + const int C = + (tensor_format == TensorFormat::NCHW ? x_dims[1] + : x_dims[x_dims.size() - 1]); + + ctx->SetOutputDim(framework::GradVarName("X"), x_dims); + ctx->SetOutputDim(framework::GradVarName("Scale"), {C}); + ctx->SetOutputDim(framework::GradVarName("Bias"), {C}); + } +}; + +template +class BatchNormGradKernel + : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext &ctx) const override { + const auto *x = ctx.Input("X"); + const auto *d_y = ctx.Input(framework::GradVarName("Y")); + const auto *scale = ctx.Input("Scale"); + const auto *saved_mean = ctx.Input("SavedMean"); + // SavedVariance have been reverted in forward operator + const auto *saved_inv_variance = ctx.Input("SavedVariance"); + const std::string tensor_format_str = + ctx.Attr("tensor_format"); + const TensorFormat tensor_format = StringToTensorFormat(tensor_format_str); + + // Get the size for each dimension. + // NCHW [batch_size, in_channels, in_height, in_width] + const auto &x_dims = x->dims(); + PADDLE_ENFORCE(x_dims.size() >= 3 && x_dims.size() <= 5, + "The Input dim size should be between 3 and 5"); + const int N = x_dims[0]; + const int C = + (tensor_format == TensorFormat::NCHW ? 
x_dims[1] + : x_dims[x_dims.size() - 1]); + const int sample_size = x->numel() / N / C; + + ConstEigenVectorArrayMap scale_arr(scale->data(), C); + ConstEigenVectorArrayMap mean_arr(saved_mean->data(), C); + ConstEigenVectorArrayMap inv_var_arr(saved_inv_variance->data(), C); + + // init output + auto *d_x = ctx.Output(framework::GradVarName("X")); + auto *d_scale = ctx.Output(framework::GradVarName("Scale")); + auto *d_bias = ctx.Output(framework::GradVarName("Bias")); + + d_x->mutable_data(ctx.GetPlace()); + d_scale->mutable_data(ctx.GetPlace()); + d_bias->mutable_data(ctx.GetPlace()); + + // d_bias = np.sum(d_y, axis=0) + // d_scale = np.sum((X - mean) / inv_std * dy, axis=0) + // d_x = (1. / N) * scale * inv_var * (N * d_y - np.sum(d_y, axis=0) + // - (X - mean) * inv_var * inv_var * np.sum(d_y * (X - mean), axis=0)) + + EigenVectorArrayMap d_bias_arr(d_bias->mutable_data(ctx.GetPlace()), + C); + EigenVectorArrayMap d_scale_arr(d_scale->mutable_data(ctx.GetPlace()), + C); + + d_bias_arr.setZero(); + d_scale_arr.setZero(); + + const auto scale_inv_var_nhw = scale_arr * inv_var_arr / (N * sample_size); + + switch (tensor_format) { + case TensorFormat::NCHW: { + ConstEigenArrayMap x_arr(x->data(), sample_size, N * C); + ConstEigenArrayMap d_y_arr(d_y->data(), sample_size, N * C); + EigenArrayMap d_x_arr(d_x->mutable_data(ctx.GetPlace()), + sample_size, N * C); + d_x_arr.setZero(); + + for (int nc = 0; nc < N * C; ++nc) { + int c = nc % C; + d_bias_arr(c) += d_y_arr.col(nc).sum(); + d_scale_arr(c) += + ((x_arr.col(nc) - mean_arr(c)) * inv_var_arr(c) * d_y_arr.col(nc)) + .sum(); + } + for (int nc = 0; nc < N * C; ++nc) { + int c = nc % C; + d_x_arr.col(nc) += + scale_inv_var_nhw(c) * + (d_y_arr.col(nc) * N * sample_size - d_bias_arr(c) - + (x_arr.col(nc) - mean_arr[c]) * d_scale_arr(c) * inv_var_arr(c)); + } + break; + } + case TensorFormat::NHWC: { + ConstEigenArrayMap x_arr(x->data(), C, N * sample_size); + ConstEigenArrayMap d_y_arr(d_y->data(), C, N * sample_size); + EigenArrayMap d_x_arr(d_x->mutable_data(ctx.GetPlace()), C, + N * sample_size); + d_x_arr.setZero(); + + const auto d_y_row_sum = d_y_arr.rowwise().sum(); + const auto x_minus_mean = x_arr.colwise() - mean_arr; + const auto d_y_mul_x_minus_mean_row_sum = + (d_y_arr * x_minus_mean).rowwise().sum(); + const auto inv_var_sqr = inv_var_arr * inv_var_arr; + for (int nhw = 0; nhw < N * sample_size; ++nhw) { + d_bias_arr += d_y_arr.col(nhw); + d_scale_arr += + (x_arr.col(nhw) - mean_arr) * inv_var_arr * d_y_arr.col(nhw); + d_x_arr.col(nhw) += + scale_inv_var_nhw * + (d_y_arr.col(nhw) * N * sample_size - d_y_row_sum - + x_minus_mean.col(nhw) * inv_var_sqr * + d_y_mul_x_minus_mean_row_sum); + } + break; + } + default: + PADDLE_THROW("Unknown storage order: %s", tensor_format_str); + } + } +}; + +} // namespace operators +} // namespace paddle + +namespace ops = paddle::operators; +REGISTER_OP(batch_norm, ops::BatchNormOp, ops::BatchNormOpMaker, + batch_norm_grad, ops::BatchNormGradOp); +REGISTER_OP_CPU_KERNEL(batch_norm, + ops::BatchNormKernel); +REGISTER_OP_CPU_KERNEL( + batch_norm_grad, + ops::BatchNormGradKernel); diff --git a/paddle/operators/batch_norm_op.h b/paddle/operators/batch_norm_op.h new file mode 100644 index 0000000000..4e80134a1a --- /dev/null +++ b/paddle/operators/batch_norm_op.h @@ -0,0 +1,50 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once +#include "paddle/framework/eigen.h" +#include "paddle/framework/op_registry.h" + +namespace paddle { +namespace operators { + +enum TensorFormat { + NHWC = 0, + NCHW = 1, +}; + +inline TensorFormat StringToTensorFormat(const std::string& str) { + if (str == "NHWC" || str == "nhwc") { + return TensorFormat::NHWC; + } else if (str == "NCHW" || str == "nchw") { + return TensorFormat::NCHW; + } else { + PADDLE_THROW("Unknown storage order string: %s", str); + } +} + +template +class BatchNormKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override; +}; + +template +class BatchNormGradKernel : public framework::OpKernel { + public: + void Compute(const framework::ExecutionContext& ctx) const override; +}; + +} // namespace operators +} // namespace paddle diff --git a/paddle/operators/save_restore_op.cc b/paddle/operators/save_restore_op.cc new file mode 100644 index 0000000000..314e4e9279 --- /dev/null +++ b/paddle/operators/save_restore_op.cc @@ -0,0 +1,147 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve. + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
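The batch-norm code above derives the channel count `C` from the layout string in several places. A small Python model of that selection, mirroring `StringToTensorFormat` plus the ternary used in the kernels (illustrative only):

```python
def channel_count(shape, tensor_format):
    # NCHW keeps channels at dim 1, NHWC at the last dim
    fmt = tensor_format.upper()
    if fmt == "NCHW":
        return shape[1]
    if fmt == "NHWC":
        return shape[-1]
    raise ValueError("Unknown storage order: %s" % tensor_format)

# e.g. channel_count((2, 3, 4, 2), "NHWC") == 2
```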
*/
+
+#include "paddle/framework/eigen.h"
+#include "paddle/framework/op_registry.h"
+
+#include <fstream>
+
+namespace paddle {
+namespace operators {
+
+using framework::Tensor;
+using framework::LoDTensor;
+
+inline static std::string VarToFileName(const std::string& folder_path,
+                                        const std::string& var_name) {
+  return folder_path + "/__" + var_name + "__";
+}
+
+class SaveOp : public framework::OperatorBase {
+ public:
+  SaveOp(const std::string& type, const framework::VariableNameMap& inputs,
+         const framework::VariableNameMap& outputs,
+         const framework::AttributeMap& attrs)
+      : OperatorBase(type, inputs, outputs, attrs) {}
+
+  void Run(const framework::Scope& scope,
+           const platform::DeviceContext& dev_ctx) const override {
+    const auto& var_names = this->Inputs("X");
+    for (const auto& name : var_names) {
+      PADDLE_ENFORCE_NOT_NULL(scope.FindVar(name),
+                              "Cannot find variable '%s' in the scope.", name);
+    }
+    std::string folder_path = this->Attr<std::string>("folderPath");
+    PADDLE_ENFORCE(!folder_path.empty(),
+                   "'folderPath' of SaveOp shouldn't be empty.");
+
+    VLOG(1) << "Save variables to folder: " << folder_path;
+    for (const auto& name : var_names) {
+      std::string file_name = VarToFileName(folder_path, name);
+      std::ofstream fout(file_name, std::ofstream::out);
+      PADDLE_ENFORCE(fout.is_open(), "Failed to create file %s.", file_name);
+      const LoDTensor& tensor = scope.FindVar(name)->Get<LoDTensor>();
+      std::string bytes = tensor.SerializeToString();
+      fout << bytes;
+      fout.close();
+    }
+    VLOG(1) << "Complete saving variables. Items count: " << var_names.size();
+  }
+};
+
+class SaveOpMaker : public framework::OpProtoAndCheckerMaker {
+ public:
+  SaveOpMaker(framework::OpProto* proto, framework::OpAttrChecker* op_checker)
+      : OpProtoAndCheckerMaker(proto, op_checker) {
+    AddInput("X",
+             "(tensor), the tensor count can be 1~INT_MAX, names of the "
+             "tensors whose values will be saved.")
+        .AsDuplicable();
+    AddAttr<std::string>("folderPath", "the folder path for saving the model.");
+    AddComment(R"DOC(
+Save the input tensors to binary files based on the input tensor names and the
+absolute folder path.
+
+All the inputs can carry the LoD (Level of Details) information,
+or not.
+)DOC");
+  }
+};
+
+class RestoreOp : public framework::OperatorBase {
+ public:
+  RestoreOp(const std::string& type, const framework::VariableNameMap& inputs,
+            const framework::VariableNameMap& outputs,
+            const framework::AttributeMap& attrs)
+      : OperatorBase(type, inputs, outputs, attrs) {}
+
+  void Run(const framework::Scope& scope,
+           const platform::DeviceContext& dev_ctx) const override {
+    const auto& var_names = this->Outputs("Out");
+    for (const auto& name : var_names) {
+      PADDLE_ENFORCE_NOT_NULL(scope.FindVar(name),
+                              "Cannot find variable '%s' in the scope.", name);
+    }
+    std::string folder_path = this->Attr<std::string>("folderPath");
+    PADDLE_ENFORCE(!folder_path.empty(),
+                   "'folderPath' of RestoreOp shouldn't be empty.");
+
+    VLOG(1) << "Try loading variables from folder: " << folder_path;
+
+    for (const auto& name : var_names) {
+      std::string file_name = VarToFileName(folder_path, name);
+      std::ifstream fin(file_name, std::ifstream::in);
+      PADDLE_ENFORCE(fin.is_open(), "Failed to open file %s.", file_name);
+      const size_t kBufferSize = 4096;  // equal to the Linux page size
+      char buffer[kBufferSize];
+      std::string cache;
+      while (!fin.eof()) {
+        fin.read(buffer, kBufferSize);
+        cache.append(buffer, fin.gcount());
+      }
+      LoDTensor* tensor = scope.FindVar(name)->GetMutable<LoDTensor>();
+      tensor->DeserializeFromString(cache, dev_ctx.GetPlace());
+      fin.close();
+    }
+    VLOG(1) << "Complete loading variables.";
+  }
+};
+
+class RestoreOpMaker : public framework::OpProtoAndCheckerMaker {
+ public:
+  RestoreOpMaker(framework::OpProto* proto,
+                 framework::OpAttrChecker* op_checker)
+      : OpProtoAndCheckerMaker(proto, op_checker) {
+    AddOutput("Out",
+              "(tensor), the tensor count can be 1~INT_MAX, names of the "
+              "tensors whose values will be restored.")
+        .AsDuplicable();
+    AddAttr<std::string>("folderPath", "the folder path of the model files.");
+    AddAttr<int>("data_type", "output tensor data type")
+        .SetDefault(framework::DataType::FP32);
+    AddComment(R"DOC(
+Restore the tensors from the model files based on the absolute folder path.
+
+All the output tensors may carry the LoD (Level of Details) information,
+or not.
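A hypothetical sketch of how the two operators defined above could be wired from the Python side; the op type names, the `X`/`Out` slots and the `folderPath` attribute follow the C++ definitions, while `program` and `param` are illustrative placeholders:

```python
# Illustrative only: append a save op and a restore op to a block.
block = program.global_block()
block.append_op(
    type="save",
    inputs={"X": [param]},               # one or more LoDTensor variables
    attrs={"folderPath": "/tmp/model"})  # writes /tmp/model/__<var_name>__
block.append_op(
    type="restore",
    outputs={"Out": [param]},            # reads the same files back
    attrs={"folderPath": "/tmp/model"})
```

Both are registered without kernels, which is why the `framework.py` change below adds them to the set of ops that skip `infer_var_type` and `infer_shape`.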
+)DOC"); + } +}; + +} // namespace operators +} // namespace paddle + +REGISTER_OPERATOR(save, paddle::operators::SaveOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::SaveOpMaker); + +REGISTER_OPERATOR(restore, paddle::operators::RestoreOp, + paddle::framework::EmptyGradOpMaker, + paddle::operators::RestoreOpMaker); diff --git a/paddle/optimizer/adadelta_optimizer.cc b/paddle/optimizer/adadelta_optimizer.cc index 6eec5d846f..34913c4050 100644 --- a/paddle/optimizer/adadelta_optimizer.cc +++ b/paddle/optimizer/adadelta_optimizer.cc @@ -25,19 +25,17 @@ void AdadeltaOptimizer::Update(const Tensor* gradient) { } } -const char* AdadeltaOptimizer::SerializeState(int* state_len) { +std::string AdadeltaOptimizer::SerializeState() { AdadeltaOptimizerState state; state.set_num_sample_passed(num_sample_passed_); - std::string lr_str = this->lr_policy_->SerializeState(state_len); + std::string lr_str = this->lr_policy_->SerializeState(); state.mutable_lr_state()->ParseFromString(lr_str); TensorToProto(*parameter_, state.mutable_parameter()); TensorToProto(*accum_gradient_, state.mutable_accum_gradient()); TensorToProto(*accum_delta_, state.mutable_accum_delta()); TensorToProto(*update_delta_, state.mutable_update_delta()); - auto str = state.SerializeAsString(); - *state_len += str.size(); - return str.c_str(); + return state.SerializeAsString(); } void AdadeltaOptimizer::DeserializeState(const std::string& str) { diff --git a/paddle/optimizer/adadelta_optimizer.h b/paddle/optimizer/adadelta_optimizer.h index 1d5eab097f..bc634ee46d 100644 --- a/paddle/optimizer/adadelta_optimizer.h +++ b/paddle/optimizer/adadelta_optimizer.h @@ -23,7 +23,7 @@ public: if (update_delta_) delete update_delta_; } void Update(const Tensor *gradient); - const char *SerializeState(int *state_len); + std::string SerializeState(); void DeserializeState(const std::string &state); private: diff --git a/paddle/optimizer/adagrad_optimizer.cc b/paddle/optimizer/adagrad_optimizer.cc index 5b92610ac5..d915ffb870 100644 --- a/paddle/optimizer/adagrad_optimizer.cc +++ b/paddle/optimizer/adagrad_optimizer.cc @@ -17,17 +17,15 @@ void AdagradOptimizer::Update(const Tensor* gradient) { learning_rate * decay_ * param[i]; } } -const char* AdagradOptimizer::SerializeState(int* state_len) { +std::string AdagradOptimizer::SerializeState() { AdagradOptimizerState state; state.set_num_sample_passed(num_sample_passed_); - std::string lr_str = this->lr_policy_->SerializeState(state_len); + std::string lr_str = this->lr_policy_->SerializeState(); state.mutable_lr_state()->ParseFromString(lr_str); TensorToProto(*parameter_, state.mutable_parameter()); TensorToProto(*accum_gradient_, state.mutable_accum_gradient()); - auto str = state.SerializeAsString(); - *state_len += str.size(); - return str.c_str(); + return state.SerializeAsString(); } void AdagradOptimizer::DeserializeState(const std::string& str) { diff --git a/paddle/optimizer/adagrad_optimizer.h b/paddle/optimizer/adagrad_optimizer.h index 15d0a965ad..b2935f8aff 100644 --- a/paddle/optimizer/adagrad_optimizer.h +++ b/paddle/optimizer/adagrad_optimizer.h @@ -19,7 +19,7 @@ public: if (accum_gradient_) delete accum_gradient_; } void Update(const Tensor *gradient); - const char *SerializeState(int *state_len); + std::string SerializeState(); void DeserializeState(const std::string &state); private: diff --git a/paddle/optimizer/adam_optimizer.cc b/paddle/optimizer/adam_optimizer.cc index 1ebb6b1e0f..18e5896a22 100644 --- a/paddle/optimizer/adam_optimizer.cc +++ 
b/paddle/optimizer/adam_optimizer.cc @@ -22,18 +22,16 @@ void AdamOptimizer::Update(const Tensor *gradient) { } } -const char *AdamOptimizer::SerializeState(int *state_len) { +std::string AdamOptimizer::SerializeState() { AdamOptimizerState state; - std::string lr_str = this->lr_policy_->SerializeState(state_len); + std::string lr_str = this->lr_policy_->SerializeState(); state.mutable_lr_state()->ParseFromString(lr_str); state.set_num_sample_passed(num_sample_passed_); TensorToProto(*parameter_, state.mutable_parameter()); TensorToProto(*momentums_, state.mutable_momentums()); TensorToProto(*velocitys_, state.mutable_velocitys()); - auto str = state.SerializeAsString(); - *state_len += str.size(); - return str.c_str(); + return state.SerializeAsString(); } void AdamOptimizer::DeserializeState(const std::string &str) { diff --git a/paddle/optimizer/adam_optimizer.h b/paddle/optimizer/adam_optimizer.h index 0ea4c8bb84..d25cdc0731 100644 --- a/paddle/optimizer/adam_optimizer.h +++ b/paddle/optimizer/adam_optimizer.h @@ -25,7 +25,7 @@ public: if (velocitys_) delete velocitys_; } void Update(const Tensor *gradient); - const char *SerializeState(int *state_len); + std::string SerializeState(); void DeserializeState(const std::string &state); private: diff --git a/paddle/optimizer/lr_policy.h b/paddle/optimizer/lr_policy.h index 036c376e10..bbb1ee4821 100644 --- a/paddle/optimizer/lr_policy.h +++ b/paddle/optimizer/lr_policy.h @@ -10,7 +10,7 @@ class LrPolicy { public: virtual ~LrPolicy() {} virtual double LearningRate(const uint64_t num_sample_passed) = 0; - virtual const char *SerializeState(int *state_len) = 0; + virtual std::string SerializeState() = 0; virtual void DeserializeState(const std::string &state) = 0; }; @@ -21,12 +21,10 @@ public: double LearningRate(const uint64_t num_sample_passed) { return learning_rate_; } - const char *SerializeState(int *state_len) { + std::string SerializeState() { LrPolicyState state; state.set_learning_rate(learning_rate_); - auto str = state.SerializeAsString(); - *state_len = str.size(); - return str.c_str(); + return state.SerializeAsString(); } void DeserializeState(const std::string &str) { LrPolicyState state; @@ -46,14 +44,12 @@ public: return std::max(learning_rate_ - lr_decay_a_ * num_sample_passed, lr_decay_b_); } - const char *SerializeState(int *state_len) { + std::string SerializeState() { LrPolicyState state; state.set_learning_rate(learning_rate_); state.set_lr_decay_a(lr_decay_a_); state.set_lr_decay_b(lr_decay_b_); - auto str = state.SerializeAsString(); - *state_len = str.size(); - return str.c_str(); + return state.SerializeAsString(); } void DeserializeState(const std::string &str) { LrPolicyState state; diff --git a/paddle/optimizer/optimizer.cc b/paddle/optimizer/optimizer.cc index eb7125adee..a2af139d01 100644 --- a/paddle/optimizer/optimizer.cc +++ b/paddle/optimizer/optimizer.cc @@ -1,4 +1,7 @@ #include "optimizer.h" +#include +#include +#include #include #include "parameter_optimizer.h" @@ -78,7 +81,13 @@ int paddle_optimizer_get_weights(paddle_optimizer* o, void** param_buffer) { } int paddle_optimizer_get_state(paddle_optimizer* o, const char** state) { - int state_len = 0; - *state = o->impl->SerializeState(&state_len); + std::string s = o->impl->SerializeState(); + int state_len = s.size(); + + if (state_len > 0) { + *state = (char*)std::malloc(state_len); + std::memcpy((void*)*state, (const void*)s.c_str(), state_len); + } + return state_len; } diff --git a/paddle/optimizer/parameter_optimizer.cc 
b/paddle/optimizer/parameter_optimizer.cc index f621803792..db0714635f 100644 --- a/paddle/optimizer/parameter_optimizer.cc +++ b/paddle/optimizer/parameter_optimizer.cc @@ -32,6 +32,7 @@ ParameterOptimizer *ParameterOptimizer::Create(const std::string &config_proto, Tensor *parameter, const OptimizerConfig &config) -> ParameterOptimizer * { if (config.optimizer() == OptimizerConfig::SGD) { + LOG(INFO) << "creating SGD optimizer"; return new SGDOptimizer(parameter, lr, config.sgd().momentum(), @@ -39,6 +40,7 @@ ParameterOptimizer *ParameterOptimizer::Create(const std::string &config_proto, config.sgd().nesterov()); } if (config.optimizer() == OptimizerConfig::Adadelta) { + LOG(INFO) << "creating Adadelta optimizer"; return new AdadeltaOptimizer(parameter, lr, config.adadelta().rho(), @@ -46,10 +48,12 @@ ParameterOptimizer *ParameterOptimizer::Create(const std::string &config_proto, config.adadelta().decay()); } if (config.optimizer() == OptimizerConfig::Adagrad) { + LOG(INFO) << "creating Adagrad optimizer"; return new AdagradOptimizer( parameter, lr, config.adagrad().epsilon(), config.adagrad().decay()); } if (config.optimizer() == OptimizerConfig::Adam) { + LOG(INFO) << "creating Adam optimizer"; return new AdamOptimizer(parameter, lr, config.adam().beta_1(), diff --git a/paddle/optimizer/parameter_optimizer.h b/paddle/optimizer/parameter_optimizer.h index d89c9abb79..8319f84e1b 100644 --- a/paddle/optimizer/parameter_optimizer.h +++ b/paddle/optimizer/parameter_optimizer.h @@ -28,7 +28,7 @@ public: Tensor *parameter); virtual void Update(const Tensor *gradient) = 0; virtual float *get_weight(int *param_size) const; - virtual const char *SerializeState(int *state_len) = 0; + virtual std::string SerializeState() = 0; virtual void DeserializeState(const std::string &state) = 0; protected: diff --git a/paddle/optimizer/parameter_optimizer_test.cpp b/paddle/optimizer/parameter_optimizer_test.cpp index edf4ae37a9..c88fa11748 100644 --- a/paddle/optimizer/parameter_optimizer_test.cpp +++ b/paddle/optimizer/parameter_optimizer_test.cpp @@ -85,6 +85,7 @@ public: for (size_t i = 0; i < opts_.size(); ++i) { int s = 0; float* newp = (float*)opts_[i]->get_weight(&s); + EXPECT_EQ(s, kSize); for (size_t j = 0; j < kSize; ++j) { EXPECT_EQ(newp[j], (*p)[j]); } @@ -99,10 +100,20 @@ public: } void TestCheckPoint() { + paddle::optimizer::Tensor* p = FixedTensor(kSize); for (size_t i = 0; i < opts_.size(); ++i) { - int state_len = 0; - std::string state = opts_[i]->SerializeState(&state_len); + auto state = opts_[i]->SerializeState(); + opts_[i]->DeserializeState(state); + auto state1 = opts_[i]->SerializeState(); opts_[i]->DeserializeState(state); + EXPECT_EQ(state, state1); + + int s = 0; + float* newp = (float*)opts_[i]->get_weight(&s); + EXPECT_EQ(s, kSize); + for (size_t j = 0; j < kSize; ++j) { + EXPECT_EQ(newp[j], (*p)[j]); + } } } diff --git a/paddle/optimizer/serialization_test.cpp b/paddle/optimizer/serialization_test.cpp index e4d97cbdba..4c416f55ee 100644 --- a/paddle/optimizer/serialization_test.cpp +++ b/paddle/optimizer/serialization_test.cpp @@ -21,7 +21,22 @@ TEST(TensorToProto, Case1) { paddle::optimizer::Tensor t(3), t1(3); for (size_t i = 0; i < t.size(); ++i) { t[i] = i; - t1[i] = 0; + t1[i] = 10; + } + + paddle::TensorProto proto; + paddle::optimizer::TensorToProto(t, &proto); + paddle::optimizer::ProtoToTensor(proto, &t1); + for (size_t i = 0; i < t1.size(); ++i) { + EXPECT_EQ(t1[i], t[i]); + } +} + +TEST(TensorToProto, Case2) { + paddle::optimizer::Tensor t(1), t1(1); + for (size_t i 
= 0; i < t.size(); ++i) { + t[i] = i; + t1[i] = 10; } paddle::TensorProto proto; diff --git a/paddle/optimizer/sgd_optimizer.cc b/paddle/optimizer/sgd_optimizer.cc index 15418faa84..bf2540ecb0 100644 --- a/paddle/optimizer/sgd_optimizer.cc +++ b/paddle/optimizer/sgd_optimizer.cc @@ -27,16 +27,14 @@ void SGDOptimizer::Update(const Tensor *gradient) { } } -const char *SGDOptimizer::SerializeState(int *state_len) { +std::string SGDOptimizer::SerializeState() { SGDOptimizerState state; state.set_num_sample_passed(num_sample_passed_); - std::string lr_str = this->lr_policy_->SerializeState(state_len); + std::string lr_str = this->lr_policy_->SerializeState(); state.mutable_lr_state()->ParseFromString(lr_str); TensorToProto(*parameter_, state.mutable_parameter()); if (momentum_ != 0.0) TensorToProto(*momentums_, state.mutable_momentums()); - auto str = state.SerializeAsString(); - *state_len += str.size(); - return str.c_str(); + return state.SerializeAsString(); } void SGDOptimizer::DeserializeState(const std::string &str) { diff --git a/paddle/optimizer/sgd_optimizer.h b/paddle/optimizer/sgd_optimizer.h index b74a902e1a..6e1a0f0d3f 100644 --- a/paddle/optimizer/sgd_optimizer.h +++ b/paddle/optimizer/sgd_optimizer.h @@ -23,7 +23,7 @@ public: if (momentums_) delete momentums_; } void Update(const Tensor* gradient); - const char* SerializeState(int* state_len); + std::string SerializeState(); void DeserializeState(const std::string& state); private: diff --git a/paddle/pybind/protobuf.cc b/paddle/pybind/protobuf.cc index 5d43ecea11..6bf6eb9fd4 100644 --- a/paddle/pybind/protobuf.cc +++ b/paddle/pybind/protobuf.cc @@ -224,7 +224,8 @@ void BindVarDsec(py::module &m) { .value("LOD_TENSOR", VarDesc::LOD_TENSOR) .value("SELECTED_ROWS", VarDesc::SELECTED_ROWS) .value("FEED_MINIBATCH", VarDesc::FEED_MINIBATCH) - .value("FETCH_LIST", VarDesc::FETCH_LIST); + .value("FETCH_LIST", VarDesc::FETCH_LIST) + .value("STEP_SCOPES", VarDesc::STEP_SCOPES); } void BindOpDesc(py::module &m) { diff --git a/python/paddle/v2/framework/backward.py b/python/paddle/v2/framework/backward.py new file mode 100644 index 0000000000..6827792cb3 --- /dev/null +++ b/python/paddle/v2/framework/backward.py @@ -0,0 +1,45 @@ +from paddle.v2.framework import framework as framework + +__all__ = ['append_backward_ops'] + + +def append_backward_ops(loss, parameter_list=None, no_grad_set=None): + """ + Create and add gradient Operators in BlockDesc to compute + gradients of `loss` for parameters in parameter_list + + :param loss: an variable generated by cost function. + :type loss: Variable + :param no_grad_set: variable that should not create gradient + :type no_grad_set: set + :param parameter_list: parameters that need to compute gradient and + update to optimize the lost. + :type: list + :return: list of (parameters, gradients) pair. 
+ :rtype: list[Variable] + """ + assert isinstance(loss, framework.Variable) + param_grad_map = loss.block.program.append_backward(loss, no_grad_set or + set()) + if parameter_list is not None: + parameters = parameter_list + else: + params = loss.block.program.global_block().all_parameters() + parameters = [param.name for param in params] + params_and_grads = [] + for param in parameters: + if param not in param_grad_map: + raise ValueError("param %s is not in map" % param) + grad_info = param_grad_map[param] + grad_block = loss.block.program.block(grad_info[1]) + if not grad_block.has_var(grad_info[0]): + raise ValueError("grad block[{0}] did not have grad var {1}".format( + grad_info[1], grad_info[0])) + # Get the param var from the global block + param_var = loss.block.program.global_block().var(param) + grad_var = grad_block.var(grad_info[0]) + if loss.block.has_var(grad_info[0]): + params_and_grads.append((param_var, grad_var)) + else: + params_and_grads.append((param_var, None)) + return params_and_grads diff --git a/python/paddle/v2/framework/framework.py b/python/paddle/v2/framework/framework.py index 40b9008d67..b3f8be8be9 100644 --- a/python/paddle/v2/framework/framework.py +++ b/python/paddle/v2/framework/framework.py @@ -261,7 +261,8 @@ class Operator(object): self.desc.set_attr(attr_name, attrs[attr_name]) self.desc.check_attrs() - if type not in {'feed', 'fetch'}: + no_kernel_op_set = {'feed', 'fetch', 'save', 'restore'} + if type not in no_kernel_op_set: self.desc.infer_var_type(self.block.desc) self.desc.infer_shape(self.block.desc) diff --git a/python/paddle/v2/framework/optimizer.py b/python/paddle/v2/framework/optimizer.py index ba2713e34d..a86908c648 100644 --- a/python/paddle/v2/framework/optimizer.py +++ b/python/paddle/v2/framework/optimizer.py @@ -1,7 +1,11 @@ -import paddle.v2.framework.framework as framework from collections import defaultdict -__all__ = ['SGDOptimizer', 'MomentumOptimizer', 'AdagradOptimizer'] +import paddle.v2.framework.framework as framework +from paddle.v2.framework.backward import append_backward_ops + +__all__ = [ + 'SGDOptimizer', 'MomentumOptimizer', 'AdagradOptimizer', 'AdamOptimizer' +] class Optimizer(object): @@ -43,6 +47,19 @@ class Optimizer(object): """ pass + def _finish_update(self, block): + """Finish any custom updates needed + before completing an optimization step + + Args: + block: the block in which the loss variable is present + parameters: list of parameter variables for the optimizer + + Returns: + list of finish ops or None + """ + pass + def _add_accumulator(self, block, name, param, dtype=None, fill_value=0.0): """Utility function to add an accumulator for a parameter @@ -90,45 +107,6 @@ class Optimizer(object): format(name, param.name)) return self._accumulators[name][param.name] - def create_backward_pass(self, loss, parameter_list=None, no_grad_set=None): - """Create and add gradient Operators in BlockDesc to compute - gradients of `loss` for parameters in parameter_list - - Args: - loss: an variable generated by cost function. - no_grad_set: variable that should not create gradient - parameter_list: parameters that need to compute gradient and - update to optimize the lost. - - Returns: - list of (parameters, gradients) pair. 
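For context, a hedged usage sketch of the relocated helper (`loss` here stands for any cost `Variable`; the wiring is illustrative, not part of this patch):

```python
from paddle.v2.framework.backward import append_backward_ops

# returns [(parameter_variable, gradient_variable_or_None), ...]
params_and_grads = append_backward_ops(loss)
for param, grad in params_and_grads:
    if grad is None:
        continue  # this parameter received no gradient
    # each remaining pair is consumed by Optimizer.create_optimization_pass
```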
- """ - assert isinstance(loss, framework.Variable) - param_grad_map = loss.block.program.append_backward(loss, no_grad_set or - set()) - if parameter_list is not None: - parameters = parameter_list - else: - params = loss.block.program.global_block().all_parameters() - parameters = [param.name for param in params] - params_and_grads = [] - for param in parameters: - if param not in param_grad_map: - raise Exception("param %s is not in map" % param) - grad_info = param_grad_map[param] - grad_block = loss.block.program.block(grad_info[1]) - if not grad_block.has_var(grad_info[0]): - raise Exception("grad block[%d] did not have grad var %s" % - grad_info[1], grad_info[0]) - # Get the param var from the global block - param_var = loss.block.program.global_block().var(param) - grad_var = grad_block.var(grad_info[0]) - if loss.block.has_var(grad_info[0]): - params_and_grads.append((param_var, grad_var)) - else: - params_and_grads.append((param_var, None)) - return params_and_grads - def create_optimization_pass(self, parameters_and_grads, loss): """Add optimization operators to update gradients to variables. @@ -137,15 +115,17 @@ class Optimizer(object): parameters_and_grads: a list of (variable, gradient) pair to update. Returns: - optmization_op_list: a list of optimization operator that will update - parameter using gradient. + return_op_list: a list of operators that will complete one step of + optimization. This will include parameter update ops, global step + update ops and any other custom ops required by subclasses to manage + their internal state. """ # This is a default implementation of create_optimization_pass that # can be shared by most optimizers. This implementation assumes that # the subclass will implement the _append_optimize_op method and the # _initialize_tensors method. The subclass can extend the # _create_accumulators method if it needs to create accumulators - # for parameters. + # for parameters and extend _finish_update method to add custom ops. # Create any accumulators self._create_accumulators(loss.block, @@ -160,16 +140,26 @@ class Optimizer(object): param_and_grad) optimize_ops.append(optimize_op) - return optimize_ops + # Returned list of ops can include more ops in addition + # to optimization ops + return_ops = optimize_ops + + # Get custom finish ops for subclasses + # FIXME: Need to fix this once we figure out how to handle dependencies + finish_ops = self._finish_update(loss.block) + if finish_ops is not None: + return_ops += finish_ops + + return return_ops def minimize(self, loss, parameter_list=None, no_grad_set=None): """Add operations to minimize `loss` by updating `parameter_list`. - This method combines interface `create_backward_pass()` and + This method combines interface `append_backward_ops()` and `create_optimization_pass()` into one. 
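The `AdamOptimizer` added further down pairs the new `adam` op with two `scale` ops from `_finish_update` that advance the beta power accumulators. For reference, a NumPy sketch of the arithmetic this is expected to realize under the standard Adam formulation (not the kernel's actual code; all names are illustrative):

```python
import numpy as np

def adam_step(param, grad, m1, m2, beta1_pow, beta2_pow,
              lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
    # moment estimates maintained by the adam op's accumulators
    m1 = beta1 * m1 + (1.0 - beta1) * grad
    m2 = beta2 * m2 + (1.0 - beta2) * np.square(grad)
    # bias correction uses the running powers beta1^t and beta2^t
    m1_hat = m1 / (1.0 - beta1_pow)
    m2_hat = m2 / (1.0 - beta2_pow)
    param = param - lr * m1_hat / (np.sqrt(m2_hat) + epsilon)
    # the two scale ops multiply the powers by beta1 / beta2 each step
    return param, m1, m2, beta1_pow * beta1, beta2_pow * beta2
```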
""" - params_grads = self.create_backward_pass(loss, parameter_list, - no_grad_set or set()) + params_grads = append_backward_ops(loss, parameter_list, no_grad_set or + set()) optimize_ops = self.create_optimization_pass(params_grads, loss) return optimize_ops @@ -329,3 +319,124 @@ class AdagradOptimizer(Optimizer): attrs={"epsilon": self._epsilon}) return adagrad_op + + +class AdamOptimizer(Optimizer): + """Implements the Adam Optimizer + """ + _moment1_acc_str = "moment1" + _moment2_acc_str = "moment2" + + def __init__(self, + learning_rate=0.001, + beta1=0.9, + beta2=0.999, + epsilon=1e-8): + assert learning_rate is not None + assert beta1 is not None + assert beta2 is not None + assert epsilon is not None + super(AdamOptimizer, self).__init__() + self.type = "adam" + self._learning_rate = learning_rate + self._beta1 = beta1 + self._beta2 = beta2 + self._epsilon = epsilon + + def _initialize_tensors(self, block): + assert isinstance(block, framework.Block) + lr_shape = [1] + # create a variable for learning_rate + self._lr = block.create_var( + dtype="float32", shape=lr_shape, lod_level=0) + + # create an op to init the learning_rate + # FIXME: Fix when Initialization design has been implemented + # https://github.com/PaddlePaddle/Paddle/pull/4852 + block.append_op( + type="fill_constant", + outputs={"Out": self._lr}, + attrs={"shape": lr_shape, + "value": self._learning_rate}) + + def _create_accumulators(self, block, parameters): + assert isinstance(block, framework.Block) + + global_block = block.program.global_block() + # Create beta1 and beta2 power tensors + beta_shape = [1] + # Create variables for beta1 and beta2 powers + self._beta1_pow_acc = global_block.create_var( + dtype="float32", shape=beta_shape, lod_level=0) + self._beta2_pow_acc = global_block.create_var( + dtype="float32", shape=beta_shape, lod_level=0) + + # Initialize beta1 and beta2 power accumulators + # FIXME: Fix when Initialization design has been implemented + # https://github.com/PaddlePaddle/Paddle/pull/4852 + global_block.append_op( + type="fill_constant", + outputs={"Out": self._beta1_pow_acc}, + attrs={"shape": beta_shape, + "value": self._beta1}) + global_block.append_op( + type="fill_constant", + outputs={"Out": self._beta2_pow_acc}, + attrs={"shape": beta_shape, + "value": self._beta2}) + + # Create accumulator tensors for first and second moments + for p in parameters: + self._add_accumulator(block, self._moment1_acc_str, p, 'float32') + self._add_accumulator(block, self._moment2_acc_str, p, 'float32') + + def _append_optimize_op(self, block, param_and_grad): + assert isinstance(block, framework.Block) + + moment1 = self._get_accumulator(self._moment1_acc_str, + param_and_grad[0]) + moment2 = self._get_accumulator(self._moment2_acc_str, + param_and_grad[0]) + # create the momentum optimize op + adam_op = block.append_op( + type=self.type, + inputs={ + "Param": param_and_grad[0], + "Grad": param_and_grad[1], + "LearningRate": self._lr, + "Moment1": moment1, + "Moment2": moment2, + "Beta1Pow": self._beta1_pow_acc, + "Beta2Pow": self._beta2_pow_acc + }, + outputs={ + "ParamOut": param_and_grad[0], + "Moment1Out": moment1, + "Moment2Out": moment2 + }, + attrs={ + "beta1": self._beta1, + "beta2": self._beta2, + "epsilon": self._epsilon + }) + + return adam_op + + def _finish_update(self, block): + """Update Beta1 and Beta2 Power accumulators + """ + assert isinstance(block, framework.Block) + global_block = block.program.global_block() + scale_beta1 = global_block.append_op( + type="scale", + inputs={"X": 
self._beta1_pow_acc}, + outputs={"Out": self._beta1_pow_acc}, + attrs={"scale": self._beta1}) + + scale_beta2 = global_block.append_op( + type="scale", + inputs={"X": self._beta2_pow_acc}, + outputs={"Out": self._beta2_pow_acc}, + attrs={"scale": self._beta2}) + + return [scale_beta1, scale_beta2] diff --git a/python/paddle/v2/framework/tests/op_test.py b/python/paddle/v2/framework/tests/op_test.py index 0f8c61a2ab..a7de01dcdd 100644 --- a/python/paddle/v2/framework/tests/op_test.py +++ b/python/paddle/v2/framework/tests/op_test.py @@ -390,7 +390,8 @@ class OpTest(unittest.TestCase): output_names, no_grad_set=None, in_place=False, - max_relative_error=0.005): + max_relative_error=0.005, + user_defined_grads=None): self.scope = core.Scope() op_inputs = self.inputs if hasattr(self, "inputs") else dict() op_outputs = self.outputs if hasattr(self, "outputs") else dict() @@ -403,7 +404,7 @@ class OpTest(unittest.TestCase): if not type(output_names) is list: output_names = [output_names] - numeric_grads = [ + numeric_grads = user_defined_grads or [ get_numeric_gradient( self.scope, self.op, diff --git a/python/paddle/v2/framework/tests/test_batch_norm_op.py b/python/paddle/v2/framework/tests/test_batch_norm_op.py new file mode 100644 index 0000000000..b7b071c24d --- /dev/null +++ b/python/paddle/v2/framework/tests/test_batch_norm_op.py @@ -0,0 +1,197 @@ +import unittest +import numpy as np +from op_test import OpTest, get_backward_op, grad_var_name +import paddle.v2.framework.core as core +from paddle.v2.framework.op import Operator + + +def _reference_training(x, scale, offset, epsilon, data_format): + if data_format != "NHWC": + raise ValueError("data_format must be NHWC, got %s." % data_format) + x_square = x * x + x_square_sum = np.sum(x_square, (0, 1, 2)) + x_sum = np.sum(x, axis=(0, 1, 2)) + element_count = np.size(x) / int(np.shape(x)[-1]) + mean = x_sum / element_count + var = x_square_sum / element_count - mean * mean + normalized = (x - mean) / np.sqrt(var + epsilon) + return (normalized * scale + offset), mean, var + + +def _reference_grad(x, grad_y, scale, mean, var, epsilon, data_format): + # Use the following formulas to calculate gradients: + # grad_scale = + # sum(grad_y * (x - mean)) * rsqrt(var + epsilon) + # + # grad_offset = sum(output_y) + # + # grad_x = + # 1/N * scale * rsqrt(var + epsilon) * (N * grad_y - sum(grad_y) - + # (x - mean) * sum(grad_y * (x - mean)) / (var + epsilon)) + if data_format != "NHWC": + raise ValueError("data_format must be NHWC, got %s." 
% data_format) + grad_x = scale * (grad_y - np.mean( + grad_y, axis=(0, 1, 2)) - (x - mean) * np.mean( + grad_y * (x - mean), axis=(0, 1, 2)) / + (var + epsilon)) / np.sqrt(var + epsilon) + grad_scale = np.sum(grad_y * (x - mean) / np.sqrt(var + epsilon), + axis=(0, 1, 2)) + grad_offset = np.sum(grad_y, axis=(0, 1, 2)) + return grad_x, grad_scale, grad_offset + + +def create_or_get_tensor(scope, var_name, var, place): + tensor = scope.var(var_name).get_tensor() + if var is not None: + assert isinstance(var, np.ndarray) + tensor.set_lod([[]]) + tensor.set_dims(var.shape) + tensor.set(var, place) + return tensor + + +def set_output_grad(scope, outputs, place): + def __set_tensor__(name): + out_tensor = scope.find_var(name).get_tensor() + grad_tensor = scope.var(grad_var_name(name)).get_tensor() + out_dtype = out_tensor.dtype() + if out_dtype == core.DataType.FP64: + data = np.ones(out_tensor.shape(), dtype=np.float64) + elif out_dtype == core.DataType.FP32: + data = np.ones(out_tensor.shape(), dtype=np.float32) + else: + raise ValueError("Not supported data type " + str(out_dtype)) + + grad_tensor.set(data, place) + + for output in outputs: + __set_tensor__(output) + + +class TestBatchNormOp(OpTest): + def __assert_close(self, tensor, np_array, msg, atol=1e-4): + self.assertTrue(np.allclose(np.array(tensor), np_array, atol=atol), msg) + + def test_forward_backward(self): + # attr + data_format = "NHWC" + epsilon = 0.00001 + momentum = 0.9 + + channel_num = 2 + x_shape = [2, 3, 4, channel_num] + scale_shape = [channel_num] + + # input + x_val = np.random.random_sample(x_shape).astype(np.float32) + scale_val = np.random.random_sample(scale_shape).astype(np.float32) + bias_val = np.random.random_sample(scale_shape).astype(np.float32) + + mean = np.zeros(scale_shape).astype(np.float32) + variance = np.zeros(scale_shape).astype(np.float32) + + # run forward + y_out, saved_mean, var_ref = _reference_training( + x_val, scale_val, bias_val, epsilon, data_format) + + # run backward + mean_out = saved_mean * (1 - momentum) + variance_out = var_ref * (1 - momentum) + saved_variance = 1 / np.sqrt(var_ref + epsilon) + + # for gradient test + y_grad = np.ones(x_shape).astype(np.float32) + x_grad_ref, scale_grad_ref, bias_grad_ref = _reference_grad( + x_val, y_grad, scale_val, saved_mean, var_ref, epsilon, data_format) + + def test_with_place(place): + scope = core.Scope() + + # create input + x_tensor = create_or_get_tensor(scope, "x_val", x_val, place) + scale_tensor = create_or_get_tensor(scope, "scale_val", scale_val, + place) + bias_tensor = create_or_get_tensor(scope, "bias_val", bias_val, + place) + mean_tensor = create_or_get_tensor(scope, "mean", mean, place) + variance_tensor = create_or_get_tensor(scope, "variance", variance, + place) + + # create output + y_tensor = create_or_get_tensor(scope, "y_out", None, place) + saved_mean_tensor = create_or_get_tensor(scope, "saved_mean", None, + place) + saved_variance_tensor = create_or_get_tensor( + scope, "saved_variance", None, place) + mean_out_tensor = mean_tensor + variance_out_tensor = variance_tensor + + batch_norm_op = Operator( + "batch_norm", + # inputs + X="x_val", + Scale="scale_val", + Bias="bias_val", + Mean="mean", + Variance="variance", + # outputs + Y="y_out", + MeanOut="mean", + VarianceOut="variance", + SavedMean="saved_mean", + SavedVariance="saved_variance", + # attrs + is_test=False, + tensor_format=data_format, + momentum=momentum, + epsilon=epsilon) + + ctx = core.DeviceContext.create(place) + batch_norm_op.run(scope, ctx) + + # 
check forward result + self.__assert_close(y_tensor, y_out, "y_out") + self.__assert_close(saved_mean_tensor, saved_mean, "saved_mean") + self.__assert_close(saved_variance_tensor, saved_variance, + "saved_variance") + self.__assert_close(mean_out_tensor, mean_out, "mean_out") + # FIXME(qiao) figure out why with cuDNN variance_out have a higher error rate + if isinstance(place, core.GPUPlace): + atol = 5e-2 + else: + atol = 1e-4 + self.__assert_close(variance_out_tensor, variance_out, + "variance_out", atol) + + # run backward + batch_norm_op_grad = get_backward_op(scope, batch_norm_op, set()) + set_output_grad( + scope, + ["y_out", "mean", "variance", "saved_mean", "saved_variance"], + place) + batch_norm_op_grad.run(scope, ctx) + + x_grad_tensor = create_or_get_tensor(scope, + grad_var_name("x_val"), None, + place) + scale_grad_tensor = create_or_get_tensor(scope, + grad_var_name("scale_val"), + None, place) + bias_grad_tensor = create_or_get_tensor(scope, + grad_var_name("bias_val"), + None, place) + + # check gradient output + self.__assert_close(x_grad_tensor, x_grad_ref, "x_grad") + self.__assert_close(scale_grad_tensor, scale_grad_ref, "scale_grad") + self.__assert_close(bias_grad_tensor, bias_grad_ref, "bias_grad") + + places = [core.CPUPlace()] + if core.is_compile_gpu() and core.op_support_gpu("batch_norm"): + places.append(core.GPUPlace(0)) + for place in places: + test_with_place(place) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/v2/framework/tests/test_optimizer.py b/python/paddle/v2/framework/tests/test_optimizer.py index 3d1715bf62..eb5d49bcba 100644 --- a/python/paddle/v2/framework/tests/test_optimizer.py +++ b/python/paddle/v2/framework/tests/test_optimizer.py @@ -2,6 +2,7 @@ import unittest import paddle.v2.framework.framework as framework import paddle.v2.framework.optimizer as optimizer +from paddle.v2.framework.backward import append_backward_ops class TestOptimizer(unittest.TestCase): @@ -51,7 +52,7 @@ class TestMomentumOptimizer(unittest.TestCase): outputs={"Out": mul_out}, attrs={"x_num_col_dims": 1}) momentum_optimizer = self.MockMomentum(learning_rate=0.01, momentum=0.2) - params_grads = momentum_optimizer.create_backward_pass(mul_out) + params_grads = append_backward_ops(mul_out) self.assertEqual(len(params_grads), 1) self.assertEqual(len(momentum_optimizer.get_accumulators()), 0) opts = momentum_optimizer.create_optimization_pass(params_grads, @@ -93,7 +94,7 @@ class TestAdagradOptimizer(unittest.TestCase): outputs={"Out": mul_out}, attrs={"x_num_col_dims": 1}) adagrad_optimizer = self.MockAdagrad(learning_rate=0.01, epsilon=1.0e-6) - params_grads = adagrad_optimizer.create_backward_pass(mul_out) + params_grads = append_backward_ops(mul_out) self.assertEqual(len(params_grads), 1) self.assertEqual(len(adagrad_optimizer.get_accumulators()), 0) opts = adagrad_optimizer.create_optimization_pass(params_grads, mul_out) @@ -110,5 +111,54 @@ class TestAdagradOptimizer(unittest.TestCase): self.assertTrue(mul_x.name in moment_acc) +class TestAdamOptimizer(unittest.TestCase): + class MockAdam(optimizer.AdamOptimizer): + def get_accumulators(self): + return self._accumulators + + def get_moment1_str(self): + return self._moment1_acc_str + + def get_moment2_str(self): + return self._moment2_acc_str + + def test_adam_optimizer(self): + program = framework.Program() + block = program.global_block() + mul_x = block.create_parameter( + dtype="float32", shape=[5, 10], lod_level=0, name="mul.x") + mul_y = block.create_var( + dtype="float32", 
shape=[10, 8], lod_level=0, name="mul.y") + mul_out = block.create_var( + dtype="float32", shape=[5, 8], lod_level=0, name="mul.out") + block.append_op( + type="mul", + inputs={"X": mul_x, + "Y": mul_y}, + outputs={"Out": mul_out}, + attrs={"x_num_col_dims": 1}) + adam_optimizer = self.MockAdam( + learning_rate=0.01, beta1=0.9, beta2=0.999) + params_grads = append_backward_ops(mul_out) + self.assertEqual(len(params_grads), 1) + self.assertEqual(len(adam_optimizer.get_accumulators()), 0) + opts = adam_optimizer.create_optimization_pass(params_grads, mul_out) + self.assertEqual(len(opts), 3) + adam_op = opts[0] + self.assertEqual(adam_op.type, "adam") + + # Check accumulators + accumulators = adam_optimizer.get_accumulators() + self.assertEqual(len(accumulators), 2) + self.assertTrue(adam_optimizer.get_moment1_str() in accumulators) + self.assertTrue(adam_optimizer.get_moment2_str() in accumulators) + moment1_acc = accumulators[adam_optimizer.get_moment1_str()] + moment2_acc = accumulators[adam_optimizer.get_moment2_str()] + self.assertEqual(len(moment1_acc), 1) + self.assertEqual(len(moment2_acc), 1) + self.assertTrue(mul_x.name in moment1_acc) + self.assertTrue(mul_x.name in moment2_acc) + + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/v2/framework/tests/test_save_restore_op.py b/python/paddle/v2/framework/tests/test_save_restore_op.py new file mode 100644 index 0000000000..3a36d03f62 --- /dev/null +++ b/python/paddle/v2/framework/tests/test_save_restore_op.py @@ -0,0 +1,71 @@ +import paddle.v2.framework.core as core +import paddle.v2.framework.framework as framework +import paddle.v2.framework.executor as executor + +import numpy as np +import unittest +import os +import sys +import shutil + +FOLDER_PATH = "./tmp_test_dir" + + +class TestSaveRestoreOp(unittest.TestCase): + def test_save_restore_op(self): + tensor_1_val = np.random.rand(3, 9).astype("float32") + tensor_2_val = np.random.randint(0, 20, size=(4, 2)).astype("int32") + place = core.CPUPlace() + + program = framework.Program() + block = program.global_block() + v_a = block.create_var( + dtype="float32", shape=[3, 9], lod_level=0, name="tensor_1") + v_b = block.create_var( + dtype="int32", shape=[4, 2], lod_level=0, name="tensor_2") + + t_1 = core.LoDTensor() + t_1.set(tensor_1_val, place) + t_2 = core.LoDTensor() + t_2.set(tensor_2_val, place) + block.append_op( + type="save", + inputs={"X": [v_a, v_b]}, + attrs={"folderPath": FOLDER_PATH}) + block.append_op( + type="fill_constant", + outputs={"Out": [v_a]}, + attrs={"shape": [2, 2], + "value": 0.0}) + block.append_op( + type="fill_constant", + outputs={"Out": [v_b]}, + attrs={"shape": [2, 2], + "value": 0.0}) + block.append_op( + type="restore", + outputs={"Out": [v_a, v_b]}, + attrs={"folderPath": FOLDER_PATH}) + + if os.path.exists(FOLDER_PATH): + shutil.rmtree(FOLDER_PATH) + os.makedirs(FOLDER_PATH) + + exe = executor.Executor(place) + out = exe.run(program, + feed={"tensor_1": t_1, + "tensor_2": t_2}, + fetch_list=[v_a, v_b]) + + self.assertTrue(os.path.isdir(FOLDER_PATH)) + self.assertTrue(os.path.isfile(FOLDER_PATH + "/__tensor_1__")) + self.assertTrue(os.path.isfile(FOLDER_PATH + "/__tensor_2__")) + + self.assertTrue(np.array_equal(np.array(out[0]), tensor_1_val)) + self.assertTrue(np.array_equal(np.array(out[1]), tensor_2_val)) + + shutil.rmtree(FOLDER_PATH) + + +if __name__ == "__main__": + unittest.main() diff --git a/python/paddle/v2/framework/tests/test_variable.py b/python/paddle/v2/framework/tests/test_variable.py index 
6fb934c743..c670ca19af 100644 --- a/python/paddle/v2/framework/tests/test_variable.py +++ b/python/paddle/v2/framework/tests/test_variable.py @@ -1,5 +1,5 @@ import unittest -from paddle.v2.framework.framework import Variable, g_program +from paddle.v2.framework.framework import Variable, g_program, Program import paddle.v2.framework.core as core import numpy as np @@ -36,6 +36,13 @@ class TestVariable(unittest.TestCase): self.assertRaises(ValueError, lambda: b.create_var(name="fc.w", shape=(24, 100))) + def test_step_scopes(self): + prog = Program() + b = prog.current_block() + var = b.create_var( + name='step_scopes', type=core.VarDesc.VarType.STEP_SCOPES) + self.assertEqual(core.VarDesc.VarType.STEP_SCOPES, var.type) + if __name__ == '__main__': unittest.main() diff --git a/python/paddle/v2/model.py b/python/paddle/v2/model.py index 20c3282098..4634db55a9 100644 --- a/python/paddle/v2/model.py +++ b/python/paddle/v2/model.py @@ -49,7 +49,7 @@ def save_model(parameters, path): ' in environment variable.') etcd_ip = os.environ.get(etcd_name) - client = master.client("http://" + etcd_ip + ":2379", 5, 0) + client = paddle.v2.master.client("http://" + etcd_ip + ":2379", 5, 0) r = client.request_save_model(trainer_id, 5000) if r == 0: # do not need to save
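
For reference, the `adam` op that `AdamOptimizer._append_optimize_op` wires up above consumes `Param`, `Grad`, `LearningRate`, `Moment1`, `Moment2`, `Beta1Pow` and `Beta2Pow`, but the kernel itself is not part of this patch. The NumPy sketch below is only an illustration of the standard Adam update under the assumption that the op follows it; the variable names are chosen to mirror the op's input names and the function is hypothetical, not the actual kernel.

```python
import numpy as np


def adam_step(param, grad, moment1, moment2, beta1_pow, beta2_pow,
              learning_rate=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):
    # Biased first and second moment estimates (written back to
    # Moment1Out / Moment2Out by the op).
    moment1_out = beta1 * moment1 + (1.0 - beta1) * grad
    moment2_out = beta2 * moment2 + (1.0 - beta2) * grad * grad

    # Bias correction uses beta1^t and beta2^t, which is what Beta1Pow and
    # Beta2Pow hold; `_finish_update` multiplies them by beta1 and beta2
    # after every pass so they stay equal to beta^t as t grows.
    lr = learning_rate * np.sqrt(1.0 - beta2_pow) / (1.0 - beta1_pow)

    # Parameter update written back to ParamOut.
    param_out = param - lr * moment1_out / (np.sqrt(moment2_out) + epsilon)
    return param_out, moment1_out, moment2_out
```

Under this formulation, one optimization pass over a single parameter emits the adam op plus the two `scale` ops returned by `_finish_update`, which is why `test_adam_optimizer` asserts `len(opts) == 3`.

The `op_test.py` change lets a test skip numeric differentiation when it can supply analytic gradients: `check_grad` now accepts `user_defined_grads`, and falls back to `get_numeric_gradient` only when it is `None`. A hypothetical call, shown only to illustrate the new parameter (the gradient array itself would come from the test's own reference implementation):

```python
# Inside an OpTest subclass; self.analytic_x_grad is assumed to be a NumPy
# array computed by the test's reference implementation.
self.check_grad(
    ['X'], 'Out',
    max_relative_error=0.005,
    user_defined_grads=[self.analytic_x_grad])
```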