Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into improve_pruning

gangliao-patch-1
xzl 8 years ago
commit 23b1a27483

@ -50,6 +50,7 @@ before_install:
# protobuf version.
- pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker
- pip install rarfile
- eval "$(GIMME_GO_VERSION=1.8.3 gimme)"
- |
function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
script:

@ -127,6 +127,7 @@ endif(WITH_GPU)
add_subdirectory(proto)
add_subdirectory(paddle)
add_subdirectory(python)
add_subdirectory(go/pserver/cclient)
if(WITH_DOC)
add_subdirectory(doc)

@ -59,6 +59,11 @@ context_projection
.. autoclass:: paddle.v2.layer.context_projection
:noindex:
row_conv
--------
.. autoclass:: paddle.v2.layer.row_conv
:noindex:
Image Pooling Layer
===================
@ -346,6 +351,12 @@ sampling_id
.. autoclass:: paddle.v2.layer.sampling_id
:noindex:
multiplex
---------
.. autoclass:: paddle.v2.layer.multiplex
:noindex:
Slicing and Joining Layers
==========================

@ -74,14 +74,25 @@ typedef enum {
typedef struct {
char* name;
paddle_element_type element_type;
void* content;
unsigned char* content;
int content_len;
} paddle_parameter, paddle_gradient;
typedef struct paddle_pserver_client paddle_pserver_client;
typedef int paddle_pserver_client;
paddle_pserver_client* paddle_new_pserver_client();
void paddle_pserver_client_release(paddle_pserver_client* client);
/**
* @brief creates a pserver client that talks to etcd for coordination.
*/
paddle_pserver_client paddle_new_etcd_pserver_client(char* etcd_addr);
/**
* @brief creates a pserver client given pserver addresses.
*
* @param pserver_addrs comma-separated pserver addresses.
 * @param selected whether the current trainer is selected to initialize all parameter servers.
*/
paddle_pserver_client paddle_new_pserver_client(char* pserver_addrs, int selected);
void paddle_pserver_client_release(paddle_pserver_client c);
/**
* @brief paddle_begin_init_params begins to initialize parameters on
@ -95,7 +106,7 @@ void paddle_pserver_client_release(paddle_pserver_client* client);
* @return 1 if the trainer is selected to initialize parameter
* servers, otherwise 0.
*/
int paddle_begin_init_params(paddle_pserver_client* client);
int paddle_begin_init_params(paddle_pserver_client client);
/**
* @brief paddle_init_param initializes the parameter on parameter
@ -109,7 +120,7 @@ int paddle_begin_init_params(paddle_pserver_client* client);
* @paddle_begin_init_param). Or simply exit the program and wait for
* the cluster management system to restart the trainer.
*/
int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, const unsigned char* param_config_proto, int config_len);
int paddle_init_param(paddle_pserver_client client, paddle_parameter param, const unsigned char* param_config_proto, int config_len);
/**
* @brief paddle_finish_init_params tells parameter servers client has
@ -120,7 +131,7 @@ int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, con
* @paddle_begin_init_param). Or simply exit the program and wait for
* the cluster management system to restart the trainer.
*/
int paddle_finish_init_params(paddle_pserver_client* client);
int paddle_finish_init_params(paddle_pserver_client client);
/**
* @brief paddle_send_grads sends gradients to parameter servers for
@ -131,7 +142,7 @@ int paddle_finish_init_params(paddle_pserver_client* client);
* @param learning_rate the learning rate for the gradients.
* @return 0 if successful, otherwise -1.
*/
int paddle_send_grads(paddle_pserver_client* client, const paddle_gradient* grads, int len);
int paddle_send_grads(paddle_pserver_client client, const paddle_gradient* grads, int len);
/**
* @brief paddle_get_params gets parameters from parameter servers.
@ -139,13 +150,15 @@ int paddle_send_grads(paddle_pserver_client* client, const paddle_gradient* grad
* paddle_get_params will block until parameters are initialized on
* the parameter servers.
*
* @param names the array of names of the parameters to get.
* @param dst the destination array of parameters to save to.
* @param dst the destination array of parameter pointers to save to.
 * The parameter pointer must be pre-populated with the required parameter name,
 * and the content of the parameter must be pre-allocated to the size of the
 * corresponding parameter on the pserver.
 * @param len the length of the paddle_parameter array.
* @return 0 if successful, otherwise -1.
*/
int paddle_get_params(paddle_pserver_client* client, const char** names, paddle_parameter* dst, int len);
int paddle_get_params(paddle_pserver_client client, paddle_parameter** dst, int len);
/**
* @brief paddle_save_model indicates parameters to save the parameter
@ -154,5 +167,5 @@ int paddle_get_params(paddle_pserver_client* client, const char** names, paddle_
* @param path the path to save parameters.
* @return 0 if successful, otherwise -1.
*/
int paddle_save_model(paddle_pserver_client* client, const char* path);
int paddle_save_model(paddle_pserver_client client, const char* path);
```

@ -0,0 +1,21 @@
# Design Doc: Remote Parameter Updater for Cluster Train
For an overview of distributed training, please refer to the [distributed training design doc](README.md). In this design doc, we will discuss the parameter updater, which uses the parameter server cclient ([The Client Library of Parameter Server Design Doc](pserver_client.md)) to manage and update parameters.
## Parameter Updater
The parameter updater is used by the trainer to manage and update parameters. There are two kinds of parameter updaters: local and remote. Since this design is for cluster training, we will only discuss the remote parameter updater here.
### Remote Parameter Updater
The remote parameter updater manages parameters on remote parameter servers through the client that communicates with the pserver ([The Client Library of Parameter Server Design Doc](pserver_client.md)).
In the PaddlePaddle Python V2 API, the trainer is implemented in Python; it holds an instance of a parameter updater and calls its functions directly. In this design, we will also expose the API of RemoteParameterUpdater to Python via SWIG.
#### Sparse Remote Parameter Updater
Since we will only implement dense parameter management for now, the mechanism for sparse parameters will be discussed in the next stage.
### Interface Design
TBD

@ -17,7 +17,7 @@ function(GO_LIBRARY NAME BUILD_TYPE)
endif()
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
file(RELATIVE_PATH rel ${CMAKE_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
file(RELATIVE_PATH rel ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
# find Paddle directory.
get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
@ -32,12 +32,14 @@ function(GO_LIBRARY NAME BUILD_TYPE)
# will use the local changes in Paddle rather than checking out
# Paddle from GitHub.
add_custom_target(copyPaddle
COMMAND ln -sf ${PADDLE_DIR} ${PADDLE_IN_GOPATH})
COMMAND rm -rf ${PADDLE_IN_GOPATH}/Paddle
COMMAND ln -sf ${PADDLE_DIR} ${PADDLE_IN_GOPATH}/Paddle)
add_dependencies(goGet copyPaddle)
add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE}
-o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}"
-gcflags=-shared -asmflags=-shared -installsuffix=_shared -a
-o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}"
${CMAKE_GO_FLAGS} ${GO_SOURCE}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})

@ -1,80 +1,32 @@
package main
import (
"fmt"
"net"
"net/http"
"net/rpc"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/namsral/flag"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/recordio"
)
func main() {
port := flag.Int("port", 8080, "port of the master server.")
dataset := flag.String("training_dataset", "", "dataset: comma-separated paths to RecordIO files, supports glob patterns.")
faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).")
taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timeout duration.")
taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timeout count for each task before it is declared a failed task.")
chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
flag.Parse()
if *dataset == "" {
panic("no dataset specified.")
}
if *faultTolerance {
panic("fault tolernance not implemented.")
}
var chunks []master.Chunk
var paths []string
ss := strings.Split(*dataset, ",")
fmt.Println(ss)
for _, s := range ss {
match, err := filepath.Glob(s)
if err != nil {
panic(err)
}
paths = append(paths, match...)
}
if len(paths) == 0 {
panic("no valid datset specified.")
}
idx := 0
for _, path := range paths {
f, err := os.Open(path)
if err != nil {
panic(err)
}
index, err := recordio.LoadIndex(f)
if err != nil {
panic(err)
}
f.Close()
count := index.NumChunks()
for i := 0; i < count; i++ {
chunk := master.Chunk{
Idx: idx,
Path: path,
Index: *index.ChunkIndex(i),
}
chunks = append(chunks, chunk)
}
}
s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
s := master.NewService(*chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
err := rpc.Register(s)
if err != nil {
panic(err)

@ -2,6 +2,7 @@ package connection
import (
"errors"
"log"
"net/rpc"
"sync"
)
@ -21,6 +22,18 @@ func New() *Conn {
return c
}
// Close closes the connection.
func (c *Conn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.client == nil {
return nil
}
return c.client.Close()
}
// Connect connects the connection to an address.
func (c *Conn) Connect(addr string) error {
c.mu.Lock()
@ -50,12 +63,20 @@ func (c *Conn) Connect(addr string) error {
c.waitConn = nil
}
} else {
err := client.Close()
if err != nil {
log.Println(err)
}
return errors.New("client already set from a concurrent goroutine")
}
return nil
}
// TODO(helin): refactor Call to be able to perform a given retry
// policy.
// Call makes an RPC call.
//
// Call will be blocked until the connection to remote RPC service

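With the new Close method, a Conn has a full connect/call/close lifecycle. Below is a minimal usage sketch; the address and the `Service.Ping` method are hypothetical, and Call's signature is assumed from the call sites in the master client that follows:

```go
package main

import (
	"log"

	"github.com/PaddlePaddle/Paddle/go/connection"
)

func main() {
	c := connection.New()
	// Connect may be called again later with a new address; Close
	// (added above) lets the owner tear the connection down when the
	// server address goes away.
	if err := c.Connect("localhost:8080"); err != nil {
		log.Fatal(err)
	}
	var reply int
	// Call blocks until the connection to the remote service is
	// established, then performs a net/rpc call.
	if err := c.Call("Service.Ping", 0, &reply); err != nil {
		log.Println(err)
	}
	if err := c.Close(); err != nil {
		log.Println(err)
	}
}
```
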
@ -0,0 +1,82 @@
package master
import (
"log"
"time"
"github.com/PaddlePaddle/Paddle/go/connection"
)
// Addresser provides the address of the master server.
type Addresser interface {
Address() string
}
// Client is the client of the master server.
type Client struct {
conn *connection.Conn
}
// NewClient creates a new Client.
func NewClient(addr Addresser) *Client {
c := &Client{}
c.conn = connection.New()
go c.monitorMaster(addr)
return c
}
func (c *Client) monitorMaster(addr Addresser) {
lastMaster := ""
monitor := func() {
// get the latest address of the master server,
// and reconnect once the address changes.
curMaster := addr.Address()
if curMaster != lastMaster {
if curMaster == "" {
err := c.conn.Close()
if err != nil {
log.Println(err)
}
} else {
err := c.conn.Connect(curMaster)
if err != nil {
log.Println(err)
// connect to addr failed, set
// to last known addr in order
// to retry next time.
curMaster = lastMaster
}
}
}
lastMaster = curMaster
}
monitor()
ticker := time.NewTicker(10 * time.Second)
for range ticker.C {
monitor()
}
}
// SetDataset sets the dataset for the master server to dispatch.
//
// SetDataset can be called multiple times from different nodes. But
// only the first call will be honored.
func (c *Client) SetDataset(globPaths []string) error {
return c.conn.Call("Service.SetDataset", globPaths, nil)
}
// GetTask gets a new task from the master server.
func (c *Client) GetTask() (Task, error) {
var t Task
err := c.conn.Call("Service.GetTask", 0, &t)
return t, err
}
// TaskFinished tells the master server a task is finished.
func (c *Client) TaskFinished(taskID int) error {
return c.conn.Call("Service.TaskFinished", taskID, nil)
}

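Taken together, the client API above suggests the following worker loop. This is a sketch only: the fixed-address `addresser` is the same trick the test below uses, and the dataset path is made up:

```go
package main

import (
	"log"

	"github.com/PaddlePaddle/Paddle/go/master"
)

// addresser serves a fixed master address, as in the test below. A
// production Addresser would resolve the current master from etcd.
type addresser string

func (a addresser) Address() string { return string(a) }

func main() {
	c := master.NewClient(addresser("localhost:8080"))

	// Only the first SetDataset call in the cluster is honored, so
	// every worker can call it unconditionally.
	if err := c.SetDataset([]string{"/data/train-*.recordio"}); err != nil {
		log.Fatal(err)
	}

	for {
		t, err := c.GetTask()
		if err != nil {
			// No pending task (or the master is unreachable).
			break
		}
		// ... train on the chunks described by t ...
		if err := c.TaskFinished(t.ID); err != nil {
			log.Println(err)
		}
	}
}
```
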
@ -0,0 +1,120 @@
package master_test
import (
"fmt"
"net"
"net/http"
"net/rpc"
"os"
"strconv"
"strings"
"testing"
"time"
log "github.com/sirupsen/logrus"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/recordio"
)
const (
totalTask = 20
chunkPerTask = 10
)
var port int
func init() {
log.SetLevel(log.ErrorLevel)
l, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
ss := strings.Split(l.Addr().String(), ":")
p, err := strconv.Atoi(ss[len(ss)-1])
if err != nil {
panic(err)
}
port = p
go func(l net.Listener) {
s := master.NewService(chunkPerTask, time.Second, 1)
server := rpc.NewServer()
err := server.Register(s)
if err != nil {
panic(err)
}
mux := http.NewServeMux()
mux.Handle(rpc.DefaultRPCPath, server)
err = http.Serve(l, mux)
if err != nil {
panic(err)
}
}(l)
}
type addresser string
func (a addresser) Address() string {
return string(a)
}
func TestClientFull(t *testing.T) {
const p = "/tmp/master_client_test_0"
f, err := os.Create(p)
if err != nil {
panic(err)
}
for i := 0; i < totalTask*chunkPerTask; i++ {
w := recordio.NewWriter(f, -1, -1)
w.Write(nil)
// call Close to force RecordIO to write a chunk.
w.Close()
}
f.Close()
c := master.NewClient(addresser(fmt.Sprintf(":%d", port)))
c.SetDataset([]string{p})
checkOnePass := func(i int) {
var tasks []master.Task
for i := 0; i < totalTask; i++ {
task, err := c.GetTask()
if err != nil {
t.Fatal(i, err)
}
tasks = append(tasks, task)
}
_, err = c.GetTask()
if err == nil {
t.Fatal(i, "should get error.")
}
err = c.TaskFinished(tasks[0].ID)
if err != nil {
t.Fatal(err)
}
tasks = tasks[1:]
task, err := c.GetTask()
if err != nil {
t.Fatal(err)
}
tasks = append(tasks, task)
for _, task := range tasks {
err = c.TaskFinished(task.ID)
if err != nil {
t.Fatal(i, err)
}
}
}
for i := 0; i < 10; i++ {
checkOnePass(i)
}
}

File diff suppressed because it is too large.

@ -9,5 +9,15 @@ project(cxx_go C Go)
include(golang)
include(flags)
go_library(client STATIC)
go_library(paddle_pserver_cclient STATIC)
if(PROJ_ROOT)
add_custom_command(OUTPUT ${PROJ_ROOT}/paddle/trainer/libpaddle_pserver_cclient.a
COMMAND cp ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.h ${PROJ_ROOT}/paddle/trainer/
COMMAND cp ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a ${PROJ_ROOT}/paddle/trainer/
WORKING_DIRECTORY ${PROJ_ROOT}/paddle
DEPENDS paddle_pserver_cclient)
add_custom_target(paddle_pserver_cclient_lib ALL DEPENDS ${PROJ_ROOT}/paddle/trainer/libpaddle_pserver_cclient.a)
endif(PROJ_ROOT)
add_subdirectory(test)

File diff suppressed because it is too large.

@ -1,11 +1,22 @@
cmake_minimum_required(VERSION 3.0)
include_directories(${CMAKE_BINARY_DIR})
add_executable(main main.c)
add_dependencies(main client)
add_dependencies(main paddle_pserver_cclient)
add_executable(test_cclient test_cclient.c)
add_dependencies(test_cclient paddle_pserver_cclient)
if(APPLE)
set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security")
else()
set(CMAKE_EXE_LINKER_FLAGS "-pthread")
endif()
target_link_libraries(main ${CMAKE_BINARY_DIR}/libclient.a)
if(PROJ_ROOT)
include_directories(${CMAKE_BINARY_DIR}/go/pserver/cclient/)
target_link_libraries(main ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a pthread)
target_link_libraries(test_cclient ${CMAKE_BINARY_DIR}/go/pserver/cclient/libpaddle_pserver_cclient.a pthread)
else(PROJ_ROOT)
include_directories(${CMAKE_BINARY_DIR})
target_link_libraries(main ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread)
target_link_libraries(test_cclient ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread)
endif(PROJ_ROOT)

@ -1,68 +1,90 @@
#include <stdio.h>
#include "libclient.h"
#include "libpaddle_pserver_cclient.h"
void fail() {
// TODO(helin): fix: gtest using cmake is not working, using this
// hacky way for now.
printf("test failed.\n");
// TODO(helin): Fix: gtest using cmake is not working, using this
// hacky way for now.
#define fail()                                                    \
  do {                                                            \
    fprintf(stderr, "test failed: %s:%d\n", __FILE__, __LINE__);  \
    exit(-1);                                                     \
  } while (0)
void sendGrads(paddle_pserver_client c) {
unsigned char grad_a[2000] = {2};
unsigned char grad_b[3000] = {3};
paddle_gradient grad1 = {
"param_a", PADDLE_ELEMENT_TYPE_FLOAT32, grad_a, 2000};
paddle_gradient grad2 = {
"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, grad_b, 3000};
paddle_gradient* grads[2] = {&grad1, &grad2};
if (paddle_send_grads(c, grads, 2)) {
fail();
}
}
void getParams(paddle_pserver_client c) {
paddle_parameter param_a;
paddle_parameter param_b;
char name_a[] = "param_a";
char name_b[] = "param_b";
// Must pre-allocate the parameter content before calling paddle_get_params.
unsigned char content_a[2000] = {};
unsigned char content_b[3000] = {};
param_a.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param_a.name = name_a;
param_a.content = content_a;
param_a.content_len = 2000;
param_b.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param_b.name = name_b;
param_b.content = content_b;
param_b.content_len = 3000;
paddle_parameter* params[2] = {&param_a, &param_b};
if (paddle_get_params(c, params, 2)) {
fail();
}
}
int main() {
char addr[] = "localhost:3000";
client c = paddle_new_pserver_client(addr, 1);
paddle_pserver_client c = paddle_new_pserver_client(addr, 1);
retry:
if (paddle_begin_init_params(c)) {
paddle_parameter param;
char name_a[] = "param_a";
char name_b[] = "param_b";
unsigned char content[] = {0x00, 0x11, 0x22};
unsigned char content_a[2000] = {1};
unsigned char content_b[3000] = {0};
param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param.name = name_a;
param.content = content;
param.content_len = 3;
if (paddle_init_param(c, param, NULL, 0) != 0) {
param.content = content_a;
param.content_len = 2000;
int error = paddle_init_param(c, param, NULL, 0);
if (error != 0) {
goto retry;
}
param.element_type = PADDLE_ELEMENT_TYPE_INT32;
param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param.name = name_b;
param.content = content;
param.content_len = 3;
if (paddle_init_param(c, param, NULL, 0) != 0) {
param.content = content_b;
param.content_len = 3000;
error = paddle_init_param(c, param, NULL, 0);
if (error != 0) {
goto retry;
}
if (paddle_finish_init_params(c) != 0) {
error = paddle_finish_init_params(c);
if (error != 0) {
goto retry;
}
} else {
fail();
}
unsigned char content[] = {0x00, 0x11, 0x22};
paddle_gradient grads[2] = {
{"param_a", PADDLE_ELEMENT_TYPE_INT32, content, 3},
{"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, content, 3}};
if (!paddle_send_grads(c, grads, 2)) {
fail();
}
paddle_parameter* params[2] = {NULL, NULL};
char* names[] = {"param_a", "param_b"};
if (!paddle_get_params(c, names, params, 2)) {
fail();
int i;
for (i = 0; i < 100; i++) {
sendGrads(c);
getParams(c);
}
// get parameters again by reusing the allocated parameter buffers.
if (!paddle_get_params(c, names, params, 2)) {
fail();
}
paddle_release_param(params[0]);
paddle_release_param(params[1]);
if (!paddle_save_model(c, "/tmp/")) {
if (paddle_save_model(c, "/tmp/")) {
fail();
}

@ -0,0 +1,117 @@
#include <stdio.h>
#include <stdlib.h>
#include "libpaddle_pserver_cclient.h"
typedef float real;
void fail() {
// TODO(helin): fix: gtest using cmake is not working, using this
// hacky way for now.
printf("test failed.\n");
exit(-1);
}
void print_parameter(paddle_gradient* param) {
if (param == NULL) {
printf("param is NULL!!\n");
} else {
printf("==== parameter ====\n");
printf("name: %s\n", param->name);
printf("content_len: %d\n", param->content_len);
printf("content_type: %d\n", param->element_type);
int i;
for (i = 0; i < param->content_len / (int)sizeof(real); ++i) {
printf("%f ", ((float*)param->content)[i]);
}
printf("\n\n");
}
}
int main() {
char addr[] = "localhost:3000";
paddle_pserver_client c = paddle_new_pserver_client(addr, 1);
char* names[] = {"param_a", "param_b"};
retry:
printf("init parameter to pserver:\n");
real param_content1[] = {0.1, 0.2, 0.3};
real param_content2[] = {0.4, 0.5, 0.6};
paddle_parameter** params =
(paddle_parameter**)malloc(sizeof(paddle_parameter*) * 2);
params[0] = (paddle_parameter*)malloc(sizeof(paddle_parameter));
params[0]->name = names[0];
params[0]->content = (unsigned char*)param_content1;
params[0]->content_len = 3 * sizeof(real);
params[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
params[1] = (paddle_parameter*)malloc(sizeof(paddle_parameter));
params[1]->name = names[1];
params[1]->content = (unsigned char*)param_content2;
params[1]->content_len = 3 * sizeof(real);
params[1]->element_type = PADDLE_ELEMENT_TYPE_INT32;
if (paddle_begin_init_params(c)) {
if (paddle_init_param(c, *params[0], NULL, 0) != 0) {
goto retry;
}
if (paddle_init_param(c, *params[1], NULL, 0) != 0) {
goto retry;
}
if (paddle_finish_init_params(c) != 0) {
goto retry;
}
} else {
fail();
}
printf("get inited parameters from pserver:\n");
// get parameters again by reusing the allocated parameter buffers.
if (paddle_get_params(c, params, 2) != 0) {
fail();
}
print_parameter(params[0]);
print_parameter(params[1]);
printf("send gradient to pserver:\n");
real gradient_content1[] = {0.01, 0.02, 0.03};
real gradient_content2[] = {0.04, 0.05, 0.06};
paddle_gradient** grads =
(paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2);
grads[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient));
grads[0]->name = names[0];
grads[0]->content = (unsigned char*)gradient_content1;
grads[0]->content_len = 3 * sizeof(real);
grads[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
grads[1] = (paddle_gradient*)malloc(sizeof(paddle_gradient));
grads[1]->name = names[1];
grads[1]->content = (unsigned char*)gradinet_content2;
grads[1]->content_len = 3 * sizeof(real);
grads[1]->element_type = PADDLE_ELEMENT_TYPE_INT32;
printf("print gradient sent to pserver:\n");
print_parameter(grads[0]);
print_parameter(grads[1]);
if (paddle_send_grads(c, grads, 2) != 0) {
fail();
}
printf("get updated parameters from pserver:\n");
// get parameters again by reusing the allocated parameter buffers.
if (paddle_get_params(c, params, 2) != 0) {
fail();
}
print_parameter(params[0]);
print_parameter(params[1]);
if (paddle_save_model(c, "/tmp/") != 0) {
fail();
}
return 0;
}

@ -0,0 +1,131 @@
import paddle.v2 as paddle
import gzip
def softmax_regression(img):
predict = paddle.layer.fc(input=img,
size=10,
act=paddle.activation.Softmax())
return predict
def multilayer_perceptron(img):
# The first fully-connected layer
hidden1 = paddle.layer.fc(input=img, size=128, act=paddle.activation.Relu())
# The second fully-connected layer and the according activation function
hidden2 = paddle.layer.fc(input=hidden1,
size=64,
act=paddle.activation.Relu())
# The third fully-connected layer, note that the hidden size should be 10,
# which is the number of unique digits
predict = paddle.layer.fc(input=hidden2,
size=10,
act=paddle.activation.Softmax())
return predict
def convolutional_neural_network(img):
# first conv layer
conv_pool_1 = paddle.networks.simple_img_conv_pool(
input=img,
filter_size=5,
num_filters=20,
num_channel=1,
pool_size=2,
pool_stride=2,
act=paddle.activation.Tanh())
# second conv layer
conv_pool_2 = paddle.networks.simple_img_conv_pool(
input=conv_pool_1,
filter_size=5,
num_filters=50,
num_channel=20,
pool_size=2,
pool_stride=2,
act=paddle.activation.Tanh())
# The first fully-connected layer
fc1 = paddle.layer.fc(input=conv_pool_2,
size=128,
act=paddle.activation.Tanh())
# The softmax layer, note that the hidden size should be 10,
# which is the number of unique digits
predict = paddle.layer.fc(input=fc1,
size=10,
act=paddle.activation.Softmax())
return predict
def main():
paddle.init(use_gpu=False, trainer_count=1)
# define network topology
images = paddle.layer.data(
name='pixel', type=paddle.data_type.dense_vector(784))
label = paddle.layer.data(
name='label', type=paddle.data_type.integer_value(10))
# Here we can build the prediction network in different ways. Please
# choose one by uncommenting the corresponding line.
predict = softmax_regression(images)
#predict = multilayer_perceptron(images)
#predict = convolutional_neural_network(images)
cost = paddle.layer.classification_cost(input=predict, label=label)
parameters = paddle.parameters.create(cost)
optimizer = paddle.optimizer.Momentum(
learning_rate=0.1 / 128.0,
momentum=0.9,
regularization=paddle.optimizer.L2Regularization(rate=0.0005 * 128))
trainer = paddle.trainer.SGD(cost=cost,
parameters=parameters,
update_equation=optimizer,
is_local=False,
pserver_spec="localhost:3000")
lists = []
def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 1000 == 0:
print "Pass %d, Batch %d, Cost %f, %s" % (
event.pass_id, event.batch_id, event.cost, event.metrics)
elif isinstance(event, paddle.event.EndPass):
result = trainer.test(reader=paddle.batch(
paddle.dataset.mnist.test(), batch_size=128))
print "Test with Pass %d, Cost %f, %s\n" % (
event.pass_id, result.cost, result.metrics)
lists.append((event.pass_id, result.cost,
result.metrics['classification_error_evaluator']))
trainer.train(
reader=paddle.batch(
paddle.reader.shuffle(
paddle.dataset.mnist.train(), buf_size=8192),
batch_size=128),
event_handler=event_handler,
num_passes=100)
# find the best pass
best = sorted(lists, key=lambda list: float(list[1]))[0]
print 'Best pass is %s, testing Avgcost is %s' % (best[0], best[1])
print 'The classification accuracy is %.2f%%' % (100 - float(best[2]) * 100)
test_creator = paddle.dataset.mnist.test()
test_data = []
for item in test_creator():
test_data.append((item[0], ))
if len(test_data) == 100:
break
# output is a softmax layer. It returns probabilities.
# Shape should be (100, 10)
probs = paddle.infer(
output_layer=predict, parameters=parameters, input=test_data)
print probs.shape
if __name__ == '__main__':
main()

@ -0,0 +1,60 @@
import paddle.v2 as paddle
import paddle.v2.dataset.uci_housing as uci_housing
def main():
# init
paddle.init(use_gpu=False, trainer_count=1)
# network config
x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13))
y_predict = paddle.layer.fc(input=x,
param_attr=paddle.attr.Param(name='w'),
size=1,
act=paddle.activation.Linear(),
bias_attr=paddle.attr.Param(name='b'))
y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
cost = paddle.layer.mse_cost(input=y_predict, label=y)
# create parameters
parameters = paddle.parameters.create(cost)
# create optimizer
optimizer = paddle.optimizer.Momentum(momentum=0)
trainer = paddle.trainer.SGD(cost=cost,
parameters=parameters,
update_equation=optimizer,
is_local=False,
pserver_spec="localhost:3000")
# event_handler to print training and testing info
def event_handler(event):
if isinstance(event, paddle.event.EndIteration):
if event.batch_id % 100 == 0:
print "Pass %d, Batch %d, Cost %f" % (
event.pass_id, event.batch_id, event.cost)
if isinstance(event, paddle.event.EndPass):
if (event.pass_id + 1) % 10 == 0:
result = trainer.test(
reader=paddle.batch(
uci_housing.test(), batch_size=2),
feeding={'x': 0,
'y': 1})
print "Test %d, %.2f" % (event.pass_id, result.cost)
# training
trainer.train(
reader=paddle.batch(
paddle.reader.shuffle(
uci_housing.train(), buf_size=500),
batch_size=2),
feeding={'x': 0,
'y': 1},
event_handler=event_handler,
num_passes=30)
if __name__ == '__main__':
main()

@ -6,7 +6,7 @@ import (
"sort"
"time"
"github.com/PaddlePaddle/Paddle/go/pserver/internal/connection"
"github.com/PaddlePaddle/Paddle/go/connection"
)
// TODO(helin): add RPC call retry logic
@ -47,7 +47,7 @@ func NewClient(l Lister, pserverNum int, sel Selector) *Client {
// monitorPservers monitors pserver addresses, and updates connection
// when the address changes.
func (c *Client) monitorPservers(l Lister, pserverNum int) {
knownServers := make([]Server, pserverNum)
lastServers := make([]Server, pserverNum)
ticker := time.NewTicker(10 * time.Second)
monitor := func() {
curServers := make([]Server, pserverNum)
@ -56,25 +56,37 @@ func (c *Client) monitorPservers(l Lister, pserverNum int) {
curServers[l.Index] = l
}
for i := range knownServers {
if knownServers[i].Addr != curServers[i].Addr {
err := c.pservers[i].Connect(curServers[i].Addr)
for i := range lastServers {
if lastServers[i].Addr == curServers[i].Addr {
continue
}
if curServers[i].Addr == "" {
err := c.pservers[i].Close()
if err != nil {
log.Println(err)
// connect to addr failed, set
// to last known addr in order
// to retry next time.
curServers[i].Addr = knownServers[i].Addr
}
continue
}
err := c.pservers[i].Connect(curServers[i].Addr)
if err != nil {
log.Println(err)
// connect to addr failed, set
// to last known addr in order
// to retry next time.
curServers[i].Addr = lastServers[i].Addr
}
}
knownServers = curServers
lastServers = curServers
}
monitor()
for _ = range ticker.C {
for range ticker.C {
monitor()
}
}
@ -93,16 +105,14 @@ func (c *Client) BeginInitParams() bool {
// InitParam initializes the parameter on parameter servers.
func (c *Client) InitParam(paramWithConfigs ParameterWithConfig) error {
var dummy int
return c.pservers[c.partition(paramWithConfigs.Param.Name)].Call("Service.InitParam", paramWithConfigs, &dummy)
return c.pservers[c.partition(paramWithConfigs.Param.Name)].Call("Service.InitParam", paramWithConfigs, nil)
}
// FinishInitParams tells the parameter servers that the client has
// sent all parameters for initialization.
func (c *Client) FinishInitParams() error {
for _, p := range c.pservers {
var dummy int
err := p.Call("Service.FinishInitParams", dummy, &dummy)
err := p.Call("Service.FinishInitParams", 0, nil)
if err != nil {
return err
}
@ -116,8 +126,7 @@ func (c *Client) SendGrads(grads []Gradient) error {
errCh := make(chan error, len(grads))
for _, g := range grads {
go func(g Gradient) {
var dummy int
err := c.pservers[c.partition(g.Name)].Call("Service.SendGrad", g, &dummy)
err := c.pservers[c.partition(g.Name)].Call("Service.SendGrad", g, nil)
errCh <- err
}(g)
}
@ -196,8 +205,7 @@ func (c *Client) Save(path string) error {
errCh := make(chan error, len(c.pservers))
for _, p := range c.pservers {
var dummy int
err := p.Call("Service.Save", path, &dummy)
err := p.Call("Service.Save", path, nil)
errCh <- err
}

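The Go client mirrors the retry contract spelled out in the C header earlier in this diff. Below is a sketch of the trainer-side initialization protocol, assuming an already-constructed *pserver.Client (the Lister and Selector arguments of NewClient are not shown in this hunk):

```go
package trainer

import (
	"log"

	"github.com/PaddlePaddle/Paddle/go/pserver"
)

// initParams sketches the init protocol: exactly one selected trainer
// initializes the parameters; the others proceed once FinishInitParams
// has completed on every pserver.
func initParams(c *pserver.Client, params []pserver.ParameterWithConfig) {
	if !c.BeginInitParams() {
		return // not selected; another trainer initializes the pservers
	}
	for _, p := range params {
		if err := c.InitParam(p); err != nil {
			// Per the C API doc: retry from BeginInitParams, or exit and
			// let the cluster management system restart the trainer.
			log.Fatal(err)
		}
	}
	if err := c.FinishInitParams(); err != nil {
		log.Fatal(err)
	}
}
```
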
@ -117,7 +117,7 @@ func TestClientFull(t *testing.T) {
for i := range params {
if names[i] != params[i].Name {
t.Fatalf("order of returned parameter does not required: parameter name: %s, required name: %s", names[i], params[i])
t.Fatalf("order of returned parameter does not required: parameter name: %s, required name: %s", names[i], params[i].Name)
}
}
}

@ -32,7 +32,13 @@ int update_SGD(void* optimizer,
const void* gradient,
int num_bytes) {
SGD_optimizer* o = (SGD_optimizer*)optimizer;
// TODO
float* parameter = (float*)buffer;
float* grad = (float*)gradient;
int i;
for (i = 0; i < num_bytes / sizeof(float); ++i) {
parameter[i] -= o->learning_rate * grad[i];
}
return 0;
}

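The new update_SGD body implements plain SGD, param[i] -= learning_rate * grad[i], over the raw float buffer. A tiny Go sketch of the same arithmetic on made-up numbers:

```go
package main

import "fmt"

func main() {
	lr := float32(0.01)
	param := []float32{1.0, -0.5}
	grad := []float32{2.0, 4.0}
	// The same rule as update_SGD: param[i] -= lr * grad[i].
	for i := range param {
		param[i] -= lr * grad[i]
	}
	fmt.Println(param) // prints approximately [0.98 -0.54]
}
```
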
@ -9,8 +9,10 @@ import (
// ElementType is the type of elements of a Parameter.
type ElementType int
var ErrAlreadyInitialized = errors.New("pserver already initialized")
var ErrUninitialized = errors.New("pserver not fully initialized")
const (
AlreadyInitialized = "pserver already initialized"
Uninitialized = "pserver not fully initialized"
)
// Supported element types
const (
@ -49,7 +51,7 @@ type Service struct {
// NewService creates a new service.
func NewService() *Service {
s := &Service{opt: newOptimizer(sgd, 0.01)}
s := &Service{opt: newOptimizer(sgd, 0.005)}
s.paramMap = make(map[string]Parameter)
s.initialized = make(chan struct{})
return s
@ -59,7 +61,7 @@ func NewService() *Service {
func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error {
select {
case <-s.initialized:
return ErrAlreadyInitialized
return errors.New(AlreadyInitialized)
default:
}
@ -80,7 +82,7 @@ func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) er
func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error {
select {
case <-s.initialized:
return ErrAlreadyInitialized
return errors.New(AlreadyInitialized)
default:
}
@ -94,7 +96,7 @@ func (s *Service) SendGrad(g Gradient, dummy *int) error {
select {
case <-s.initialized:
default:
return ErrUninitialized
return errors.New(Uninitialized)
}
s.mu.Lock()

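The sentinel error values (ErrAlreadyInitialized, ErrUninitialized) became exported string constants most likely because net/rpc flattens a server-side error into a plain string on the client side; after a round trip only the message survives, so callers compare err.Error() rather than the error value, as the updated tests below do. A short sketch of the pattern:

```go
package main

import (
	"fmt"

	"github.com/PaddlePaddle/Paddle/go/pserver"
)

func main() {
	s := pserver.NewService()
	if err := s.FinishInitParams(0, nil); err != nil {
		panic(err)
	}
	// A second init attempt is rejected; compare the message, not the
	// error value, since RPC clients only ever see the string.
	err := s.FinishInitParams(0, nil)
	if err != nil && err.Error() == pserver.AlreadyInitialized {
		fmt.Println("second init rejected, as expected")
	}
}
```
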
@ -15,8 +15,7 @@ func TestFull(t *testing.T) {
p.Name = "param_a"
p.Content = []byte{1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0}
p.ElementType = pserver.Int32
var dummy int
err := s.InitParam(pserver.ParameterWithConfig{p, nil}, &dummy)
err := s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil)
if err != nil {
t.FailNow()
}
@ -25,12 +24,12 @@ func TestFull(t *testing.T) {
p1.Name = "param_b"
p1.Content = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
p1.ElementType = pserver.Float32
err = s.InitParam(pserver.ParameterWithConfig{p1, nil}, &dummy)
err = s.InitParam(pserver.ParameterWithConfig{Param: p1, Config: nil}, nil)
if err != nil {
t.FailNow()
}
err = s.FinishInitParams(0, &dummy)
err = s.FinishInitParams(0, nil)
if err != nil {
t.FailNow()
}
@ -46,11 +45,11 @@ func TestFull(t *testing.T) {
}
g1, g2 := pserver.Gradient(p1), pserver.Gradient(p)
err = s.SendGrad(g1, &dummy)
err = s.SendGrad(g1, nil)
if err != nil {
t.FailNow()
}
err = s.SendGrad(g2, &dummy)
err = s.SendGrad(g2, nil)
if err != nil {
t.FailNow()
@ -74,23 +73,21 @@ func TestFull(t *testing.T) {
func TestMultipleInit(t *testing.T) {
s := pserver.NewService()
var dummy int
err := s.FinishInitParams(0, &dummy)
err := s.FinishInitParams(0, nil)
if err != nil {
t.FailNow()
}
err = s.FinishInitParams(0, &dummy)
if err != pserver.ErrAlreadyInitialized {
err = s.FinishInitParams(0, nil)
if err.Error() != pserver.AlreadyInitialized {
t.FailNow()
}
}
func TestUninitialized(t *testing.T) {
s := pserver.NewService()
var dummy int
err := s.SendGrad(pserver.Gradient{}, &dummy)
if err != pserver.ErrUninitialized {
err := s.SendGrad(pserver.Gradient{}, nil)
if err.Error() != pserver.Uninitialized {
t.FailNow()
}
}
@ -98,13 +95,14 @@ func TestUninitialized(t *testing.T) {
func TestBlockUntilInitialized(t *testing.T) {
s := pserver.NewService()
ch := make(chan struct{}, 2)
errCh := make(chan error, 2)
var wg sync.WaitGroup
wg.Add(1)
go func() {
var param pserver.Parameter
err := s.GetParam("param_a", &param)
if err != nil {
t.FailNow()
errCh <- err
}
wg.Done()
ch <- struct{}{}
@ -112,10 +110,9 @@ func TestBlockUntilInitialized(t *testing.T) {
wg.Add(1)
go func() {
var dummy int
err := s.Save("", &dummy)
err := s.Save("", nil)
if err != nil {
t.FailNow()
errCh <- err
}
wg.Done()
ch <- struct{}{}
@ -127,6 +124,8 @@ func TestBlockUntilInitialized(t *testing.T) {
case <-ch:
// some function returned before initialization is completed.
t.FailNow()
case <-errCh:
t.FailNow()
default:
}
@ -134,13 +133,12 @@ func TestBlockUntilInitialized(t *testing.T) {
p.Name = "param_a"
p.Content = []byte{1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0}
p.ElementType = pserver.Int32
var dummy int
err := s.InitParam(pserver.ParameterWithConfig{p, nil}, &dummy)
err := s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil)
if err != nil {
t.FailNow()
}
err = s.FinishInitParams(0, &dummy)
err = s.FinishInitParams(0, nil)
if err != nil {
t.FailNow()
}

@ -16,7 +16,7 @@ set(API_HEADER
Internal.h)
add_library(paddle_api STATIC ${API_SOURCES})
add_dependencies(paddle_api gen_proto_cpp)
add_dependencies(paddle_api gen_proto_cpp paddle_pserver_cclient_lib)
INCLUDE(${SWIG_USE_FILE})
INCLUDE_DIRECTORIES(${PROJ_ROOT}/paddle)
@ -45,7 +45,7 @@ SET(SWIG_MODULE_swig_paddle_EXTRA_DEPS
)
IF(APPLE)
SET(MACOS_LD_FLAGS "-undefined dynamic_lookup -Wl,-all_load")
SET(MACOS_LD_FLAGS "-undefined dynamic_lookup -Wl,-all_load -framework CoreFoundation -framework Security")
ELSE(APPLE)
SET(START_GROUP "-Xlinker -start-group")
SET(END_GROUP "-Xlinker -end-group")

@ -179,6 +179,7 @@ namespace std {
%newobject ParameterOptimizer::needSpecialTraversal;
%newobject ParameterUpdater::createLocalUpdater;
%newobject ParameterUpdater::createRemoteUpdater;
%newobject ParameterUpdater::createNewRemoteUpdater;
%feature("director") UpdateCallback;
%feature("autodoc", 1); // To generate method stub, for code hint in ide

Some files were not shown because too many files have changed in this diff.
