Merge branch 'develop' of https://github.com/baidu/Paddle into ImageExpandFunction

cblas_new
hedaoyuan 8 years ago
commit 69271c92d5

@ -50,6 +50,7 @@ before_install:
# protobuf version. # protobuf version.
- pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker - pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker
- pip install rarfile - pip install rarfile
- eval "$(GIMME_GO_VERSION=1.8.3 gimme)"
- | - |
function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; } function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
script: script:

@ -126,7 +126,9 @@ endif(WITH_GPU)
add_subdirectory(proto) add_subdirectory(proto)
add_subdirectory(paddle) add_subdirectory(paddle)
add_subdirectory(go/master/c)
add_subdirectory(python) add_subdirectory(python)
add_subdirectory(go/pserver/cclient)
if(WITH_DOC) if(WITH_DOC)
add_subdirectory(doc) add_subdirectory(doc)

@ -74,14 +74,25 @@ typedef enum {
typedef struct { typedef struct {
char* name; char* name;
paddle_element_type element_type; paddle_element_type element_type;
void* content; unsigned char* content;
int content_len; int content_len;
} paddle_parameter, paddle_gradient; } paddle_parameter, paddle_gradient;
typedef struct paddle_pserver_client paddle_pserver_client; typedef int paddle_pserver_client;
paddle_pserver_client* paddle_new_pserver_client(); /**
void paddle_pserver_client_release(paddle_pserver_client* client); * @brief creates a pserver client that talks to etcd for coordination.
*/
paddle_pserver_client paddle_new_etcd_pserver_client(char* etcd_addr);
/**
* @brief creates a pserver client given pserver addresses.
*
* @param pserver_addrs comma-separated pserver addresses.
* @param selected if current pserver client is selected to initialize all parameter servers.
*/
paddle_pserver_client paddle_new_pserver_client(char* pserver_addrs, int selected);
void paddle_pserver_client_release(paddle_pserver_client c);
/** /**
* @brief paddle_begin_init_params begins to initialize parameters on * @brief paddle_begin_init_params begins to initialize parameters on
@ -95,7 +106,7 @@ void paddle_pserver_client_release(paddle_pserver_client* client);
* @return 1 if the trainer is selected to initialize parameter * @return 1 if the trainer is selected to initialize parameter
* servers, otherwise 0. * servers, otherwise 0.
*/ */
int paddle_begin_init_params(paddle_pserver_client* client); int paddle_begin_init_params(paddle_pserver_client client);
/** /**
* @brief paddle_init_param initializes the parameter on parameter * @brief paddle_init_param initializes the parameter on parameter
@ -109,7 +120,7 @@ int paddle_begin_init_params(paddle_pserver_client* client);
* @paddle_begin_init_param). Or simply exit the program and wait for * @paddle_begin_init_param). Or simply exit the program and wait for
* the cluster management system to restart the trainer. * the cluster management system to restart the trainer.
*/ */
int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, const unsigned char* param_config_proto, int config_len); int paddle_init_param(paddle_pserver_client client, paddle_parameter param, const unsigned char* param_config_proto, int config_len);
/** /**
* @brief paddle_finish_init_params tells parameter servers client has * @brief paddle_finish_init_params tells parameter servers client has
@ -120,7 +131,7 @@ int paddle_init_param(paddle_pserver_client* client, paddle_parameter param, con
* @paddle_begin_init_param). Or simply exit the program and wait for * @paddle_begin_init_param). Or simply exit the program and wait for
* the cluster management system to restart the trainer. * the cluster management system to restart the trainer.
*/ */
int paddle_finish_init_params(paddle_pserver_client* client); int paddle_finish_init_params(paddle_pserver_client client);
/** /**
* @brief paddle_send_grads sends gradients to parameter servers for * @brief paddle_send_grads sends gradients to parameter servers for
@ -131,7 +142,7 @@ int paddle_finish_init_params(paddle_pserver_client* client);
* @param learning_rate the learning rate for the gradients. * @param learning_rate the learning rate for the gradients.
* @return 0 if successful, otherwise -1. * @return 0 if successful, otherwise -1.
*/ */
int paddle_send_grads(paddle_pserver_client* client, const paddle_gradient* grads, int len); int paddle_send_grads(paddle_pserver_client client, const paddle_gradient* grads, int len);
/** /**
* @brief paddle_get_params gets parameters from parameter servers. * @brief paddle_get_params gets parameters from parameter servers.
@ -139,13 +150,15 @@ int paddle_send_grads(paddle_pserver_client* client, const paddle_gradient* grad
* paddle_get_params will block until parameters are initialized on * paddle_get_params will block until parameters are initialized on
* the parameter servers. * the parameter servers.
* *
* @param names the array of names of the parameters to get. * @param dst the destination array of parameter pointers to save to.
* @param dst the destination array of parameters to save to. * The parameter pointer must be pre-populated with required parameter name,
* and the content of parameter must be pre-allocated of the size of required
* parameter on pserver.
* @param len the length of the names array and the paddle_parameter * @param len the length of the names array and the paddle_parameter
* array. * array.
* @return 0 if successful, otherwise -1. * @return 0 if successful, otherwise -1.
*/ */
int paddle_get_params(paddle_pserver_client* client, const char** names, paddle_parameter* dst, int len); int paddle_get_params(paddle_pserver_client client, paddle_parameter** dst, int len);
/** /**
* @brief paddle_save_model indicates parameters to save the parameter * @brief paddle_save_model indicates parameters to save the parameter
@ -154,5 +167,5 @@ int paddle_get_params(paddle_pserver_client* client, const char** names, paddle_
* @param path the path to save parameters. * @param path the path to save parameters.
* @return 0 if successful, otherwise -1. * @return 0 if successful, otherwise -1.
*/ */
int paddle_save_model(paddle_pserver_client* client, const char* path); int paddle_save_model(paddle_pserver_client client, const char* path);
``` ```

@ -0,0 +1,21 @@
# Design Doc: Remote Parameter Updater for Cluster Train
For an overview of distributed training, please refer to the [distributed training design doc](README.md). In this design doc, we will discuss the parameter updater, which uses the parameter server cclient ([The Client Library of Parameter Server Design Doc](pserver_client.md)) to manage and update parameters.
## Parameter Updater
The Parameter Updater is used by the trainer to manage and update parameters. There are mainly two kinds of parameter updater: local and remote. Since this design is for cluster training, we will only discuss the remote parameter updater here.
### Remote Parameter Updater
The Remote Parameter Updater manages parameters through remote parameter servers, using the client that communicates with the pserver ([The Client Library of Parameter Server Design Doc](pserver_client.md)).
In the PaddlePaddle Python V2 API, the trainer is implemented in Python; it holds an instance of the parameter updater and calls its functions directly. In this design, we will also expose the API of RemoteParameterUpdater to Python with SWIG.
#### Sparse Remote Parameter Updater
Since we will only implement dense parameter management now, the mechanism for sparse parameters will be discussed in the next stage.
### Interface Design
TBD

@ -22,6 +22,7 @@ To compile the source code, your computer must be equipped with the following de
- **CMake**: CMake >= 3.0 (at least CMake 3.4 on Mac OS X) - **CMake**: CMake >= 3.0 (at least CMake 3.4 on Mac OS X)
- **BLAS**: MKL, OpenBlas or ATLAS - **BLAS**: MKL, OpenBlas or ATLAS
- **Python**: only support Python 2.7 - **Python**: only support Python 2.7
- **Go**
**Note:** For CUDA 7.0 and CUDA 7.5, GCC 5.0 and up are not supported! **Note:** For CUDA 7.0 and CUDA 7.5, GCC 5.0 and up are not supported!
For CUDA 8.0, GCC versions later than 5.3 are not supported! For CUDA 8.0, GCC versions later than 5.3 are not supported!
@ -107,6 +108,18 @@ As a simple example, consider the following:
sudo apt-get install -y python python-pip python-numpy libpython-dev bison sudo apt-get install -y python python-pip python-numpy libpython-dev bison
sudo pip install 'protobuf==3.1.0.post1' sudo pip install 'protobuf==3.1.0.post1'
# Install Go
# You can follow https://golang.org/doc/install for a detailed explanation.
wget -O go.tgz https://storage.googleapis.com/golang/go1.8.1.linux-amd64.tar.gz && \
tar -C $HOME -xzf go.tgz && \
mkdir $HOME/gopath && \
rm go.tgz
# Setup environment variables
export GOROOT=$HOME/go
export GOPATH=$HOME/gopath
export PATH=$PATH:$GOROOT/bin
# install cmake 3.4 # install cmake 3.4
curl -sSL https://cmake.org/files/v3.4/cmake-3.4.1.tar.gz | tar -xz && \ curl -sSL https://cmake.org/files/v3.4/cmake-3.4.1.tar.gz | tar -xz && \
cd cmake-3.4.1 && ./bootstrap && make -j4 && sudo make install && \ cd cmake-3.4.1 && ./bootstrap && make -j4 && sudo make install && \

@ -4,6 +4,7 @@ RNN相关模型
.. toctree:: .. toctree::
:maxdepth: 1 :maxdepth: 1
rnn_config_cn.rst
recurrent_group_cn.md recurrent_group_cn.md
hierarchical_layer_cn.rst hierarchical_layer_cn.rst
hrnn_rnn_api_compare_cn.rst hrnn_rnn_api_compare_cn.rst

@ -1,2 +1,7 @@
RNN Models RNN Models
========== ==========
.. toctree::
:maxdepth: 1
rnn_config_en.rst

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -17,7 +17,7 @@ function(GO_LIBRARY NAME BUILD_TYPE)
endif() endif()
file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go") file(GLOB GO_SOURCE RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.go")
file(RELATIVE_PATH rel ${CMAKE_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR}) file(RELATIVE_PATH rel ${CMAKE_CURRENT_BINARY_DIR} ${CMAKE_CURRENT_SOURCE_DIR})
# find Paddle directory. # find Paddle directory.
get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY) get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
@ -26,25 +26,23 @@ function(GO_LIBRARY NAME BUILD_TYPE)
# automatically get all dependencies specified in the source code # automatically get all dependencies specified in the source code
# for given target. # for given target.
add_custom_target(goGet env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get -d ${rel}/...) add_custom_target(${NAME}_goGet env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} get -d ${rel}/...)
# make a symlink that references Paddle inside $GOPATH, so go get # make a symlink that references Paddle inside $GOPATH, so go get
# will use the local changes in Paddle rather than checkout Paddle # will use the local changes in Paddle rather than checkout Paddle
# in github. # in github.
add_custom_target(copyPaddle add_custom_target(${NAME}_copyPaddle
COMMAND ln -sf ${PADDLE_DIR} ${PADDLE_IN_GOPATH}) COMMAND rm -rf ${PADDLE_IN_GOPATH}/Paddle
add_dependencies(goGet copyPaddle) COMMAND ln -sf ${PADDLE_DIR} ${PADDLE_IN_GOPATH}/Paddle)
add_dependencies(${NAME}_goGet ${NAME}_copyPaddle)
add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp add_custom_command(OUTPUT ${OUTPUT_DIR}/.timestamp
COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE} COMMAND env GOPATH=${GOPATH} ${CMAKE_Go_COMPILER} build ${BUILD_MODE}
-o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}" -o "${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME}"
${CMAKE_GO_FLAGS} ${GO_SOURCE} ${CMAKE_GO_FLAGS} ${GO_SOURCE}
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}) WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR})
add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN}) add_custom_target(${NAME} ALL DEPENDS ${OUTPUT_DIR}/.timestamp ${ARGN})
add_dependencies(${NAME} goGet) add_dependencies(${NAME} ${NAME}_goGet)
if(NOT BUILD_TYPE STREQUAL "STATIC")
install(PROGRAMS ${CMAKE_CURRENT_BINARY_DIR}/${LIB_NAME} DESTINATION bin)
endif()
endfunction(GO_LIBRARY) endfunction(GO_LIBRARY)

@ -1,80 +1,32 @@
package main package main
import ( import (
"fmt"
"net" "net"
"net/http" "net/http"
"net/rpc" "net/rpc"
"os"
"path/filepath"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/namsral/flag" "github.com/namsral/flag"
"github.com/PaddlePaddle/Paddle/go/master" "github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/recordio"
) )
func main() { func main() {
port := flag.Int("port", 8080, "port of the master server.") port := flag.Int("port", 8080, "port of the master server.")
dataset := flag.String("training_dataset", "", "dataset: comma separated path to RecordIO paths, supports golb patterns.")
faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).") faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).")
taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.") taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.")
taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.") taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.")
chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.") chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
flag.Parse() flag.Parse()
if *dataset == "" {
panic("no dataset specified.")
}
if *faultTolerance { if *faultTolerance {
panic("fault tolernance not implemented.") panic("fault tolernance not implemented.")
}
var chunks []master.Chunk
var paths []string
ss := strings.Split(*dataset, ",")
fmt.Println(ss)
for _, s := range ss {
match, err := filepath.Glob(s)
if err != nil {
panic(err)
}
paths = append(paths, match...)
}
if len(paths) == 0 {
panic("no valid datset specified.")
}
idx := 0
for _, path := range paths {
f, err := os.Open(path)
if err != nil {
panic(err)
}
index, err := recordio.LoadIndex(f)
if err != nil {
panic(err)
}
f.Close()
count := index.NumChunks()
for i := 0; i < count; i++ {
chunk := master.Chunk{
Idx: idx,
Path: path,
Index: *index.ChunkIndex(i),
}
chunks = append(chunks, chunk)
}
} }
s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) s := master.NewService(*chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
err := rpc.Register(s) err := rpc.Register(s)
if err != nil { if err != nil {
panic(err) panic(err)

@ -4,6 +4,8 @@ import (
"errors" "errors"
"net/rpc" "net/rpc"
"sync" "sync"
log "github.com/sirupsen/logrus"
) )
// TODO(helin): add TCP re-connect logic // TODO(helin): add TCP re-connect logic
@ -21,6 +23,18 @@ func New() *Conn {
return c return c
} }
// Close closes the connection.
func (c *Conn) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
if c.client == nil {
return nil
}
return c.client.Close()
}
// Connect connects the connection to a address. // Connect connects the connection to a address.
func (c *Conn) Connect(addr string) error { func (c *Conn) Connect(addr string) error {
c.mu.Lock() c.mu.Lock()
@ -50,12 +64,20 @@ func (c *Conn) Connect(addr string) error {
c.waitConn = nil c.waitConn = nil
} }
} else { } else {
err := client.Close()
if err != nil {
log.Errorln(err)
}
return errors.New("client already set from a concurrent goroutine") return errors.New("client already set from a concurrent goroutine")
} }
return nil return nil
} }
// TODO(helin): refactor Call to be able to perform given retry
// policy.
// Call make a RPC call. // Call make a RPC call.
// //
// Call will be blocked until the connection to remote RPC service // Call will be blocked until the connection to remote RPC service

@ -0,0 +1,21 @@
# Build script for the Go master client C binding.
cmake_minimum_required(VERSION 3.0)
# PARENT_DIR ends up two directory levels above this file; its cmake/
# sub-directory is added to the module search path so that the
# golang and flags modules below can be found.
get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake")
project(cxx_go C Go)
include(golang)
include(flags)
# Build the Go sources in this directory as a shared library.
set(MASTER_LIB_NAME "paddle_master")
go_library(${MASTER_LIB_NAME} SHARED)
# Only when PROJ_ROOT is set (presumably when built as part of the
# whole Paddle tree - confirm): remove lib<name>.h from the build
# directory and copy lib<name>.so into the Python package directory.
if(PROJ_ROOT)
add_custom_command(OUTPUT ${PROJ_ROOT}/python/paddle/v2/master/lib${MASTER_LIB_NAME}.so
COMMAND rm ${CMAKE_CURRENT_BINARY_DIR}/lib${MASTER_LIB_NAME}.h
COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/lib${MASTER_LIB_NAME}.so ${PROJ_ROOT}/python/paddle/v2/master/
DEPENDS ${MASTER_LIB_NAME})
# Hook the copy step into the default (ALL) build.
add_custom_target(paddle_master_shared ALL DEPENDS ${PROJ_ROOT}/python/paddle/v2/master/lib${MASTER_LIB_NAME}.so)
endif(PROJ_ROOT)

@ -0,0 +1,110 @@
package main
/*
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#define PADDLE_MASTER_OK 0
#define PADDLE_MASTER_ERROR -1
typedef int paddle_master_client;
*/
import "C"
import (
"sync"
"unsafe"
"github.com/PaddlePaddle/Paddle/go/master"
log "github.com/sirupsen/logrus"
)
var nullPtr = unsafe.Pointer(uintptr(0))
var mu sync.Mutex
var handleMap = make(map[C.paddle_master_client]*master.Client)
var curHandle C.paddle_master_client
// add stores c in the handle registry and returns the handle assigned
// to it.
//
// The handle is the current value of curHandle, which is advanced by
// one on every call. The registry is guarded by mu.
func add(c *master.Client) C.paddle_master_client {
	mu.Lock()
	defer mu.Unlock()

	h := curHandle
	handleMap[h] = c
	curHandle++
	return h
}
// get looks up the master client registered under the given handle.
//
// It returns nil when no client is registered under that handle.
func get(client C.paddle_master_client) *master.Client {
	mu.Lock()
	c := handleMap[client]
	mu.Unlock()
	return c
}
// remove drops the given handle from the registry and returns the
// client that was registered under it, or nil if there was none.
func remove(client C.paddle_master_client) *master.Client {
	mu.Lock()
	defer mu.Unlock()

	c, found := handleMap[client]
	if found {
		delete(handleMap, client)
	}
	return c
}
// addresser is a string-backed address provider: it always reports
// the string it was created from.
type addresser string

// Address returns the wrapped address string.
func (addr addresser) Address() string {
	return string(addr)
}
// paddle_new_master_client creates a master client for the given
// address and returns a handle to it.
//
// addr is copied into a Go string; bufSize is passed through to
// master.NewClient as the record buffer size.
//
//export paddle_new_master_client
func paddle_new_master_client(addr *C.char, bufSize int) C.paddle_master_client {
	return add(master.NewClient(addresser(C.GoString(addr)), bufSize))
}
// paddle_release_master_client unregisters the client behind the given
// handle. Unknown handles are ignored.
//
// NOTE(review): this only drops the registry entry; nothing here stops
// the goroutines started by master.NewClient - confirm whether the
// client needs an explicit shutdown.
//
//export paddle_release_master_client
func paddle_release_master_client(client C.paddle_master_client) {
	_ = remove(client)
}
// paddle_set_dataset sends the dataset paths to the master server.
//
// path points to a C array of size NUL-terminated strings; each one is
// copied into a Go string before the RPC is issued.
//
// It returns PADDLE_MASTER_OK on success. PADDLE_MASTER_ERROR is
// returned when the handle is not registered (e.g. already released),
// when path is NULL although size is positive, or when the RPC fails.
//
//export paddle_set_dataset
func paddle_set_dataset(client C.paddle_master_client, path **C.char, size C.int) C.int {
	c := get(client)
	if c == nil {
		// Without this guard an invalid handle would crash the whole
		// host process with a nil pointer dereference.
		log.Errorln("paddle_set_dataset: unknown client handle:", int(client))
		return C.PADDLE_MASTER_ERROR
	}

	n := int(size)
	if n > 0 && path == nil {
		log.Errorln("paddle_set_dataset: path array is NULL")
		return C.PADDLE_MASTER_ERROR
	}

	var paths []string
	base := uintptr(unsafe.Pointer(path))
	stride := unsafe.Sizeof(*path)
	for i := 0; i < n; i++ {
		// Step through the C array one pointer-sized element at a time.
		ptr := (**C.char)(unsafe.Pointer(base + uintptr(i)*stride))
		paths = append(paths, C.GoString(*ptr))
	}

	if err := c.SetDataset(paths); err != nil {
		log.Errorln(err)
		return C.PADDLE_MASTER_ERROR
	}

	return C.PADDLE_MASTER_OK
}
// paddle_next_record fetches the next record from the master client
// and hands it to the caller in C-allocated memory.
//
// On success *record points to a malloc'ed copy of the record and the
// record length in bytes is returned; the buffer is owned by the
// caller (this library exports mem_free for releasing it). For an
// empty record *record is set to NULL and 0 is returned.
//
// PADDLE_MASTER_ERROR is returned when the handle is not registered or
// record is NULL; previously both cases crashed the host process.
//
//export paddle_next_record
func paddle_next_record(client C.paddle_master_client, record **C.uchar) C.int {
	if record == nil {
		log.Errorln("paddle_next_record: record output pointer is NULL")
		return C.PADDLE_MASTER_ERROR
	}

	c := get(client)
	if c == nil {
		log.Errorln("paddle_next_record: unknown client handle:", int(client))
		*record = nil
		return C.PADDLE_MASTER_ERROR
	}

	r := c.NextRecord()
	if len(r) == 0 {
		*record = nil
		return 0
	}

	// Copy into C memory: Go memory must not be retained by C code
	// after this call returns.
	size := C.size_t(len(r))
	*record = (*C.uchar)(C.malloc(size))
	C.memcpy(unsafe.Pointer(*record), unsafe.Pointer(&r[0]), size)
	return C.int(size)
}
// mem_free releases memory with the C allocator's free. It is the
// counterpart for buffers this library allocates with C.malloc, such
// as the record returned by paddle_next_record.
//
//export mem_free
func mem_free(p unsafe.Pointer) {
// "free" may be a better name for this function, but doing so
// will cause calling any function of this library from Python
// ctypes hanging.
C.free(p)
}
// main is intentionally empty: package main must define it, while the
// functionality of this package is exposed through the exported C
// functions above.
func main() {}

@ -0,0 +1,137 @@
package master
import (
"os"
"time"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
log "github.com/sirupsen/logrus"
)
// Addresser provides the address of the master server.
//
// The client polls Address periodically, so the returned value may
// change over time. An empty string makes the client close its
// current connection.
type Addresser interface {
Address() string
}
// Client is the client of the master server.
type Client struct {
// conn is the RPC connection to the master server.
conn *connection.Conn
// ch buffers records read by getRecords until NextRecord takes them.
ch chan []byte
}
// NewClient builds a Client and starts its two background goroutines:
// one that follows the master address reported by addr, and one that
// fetches tasks and fills the record buffer.
//
// bufSize is the capacity of the record buffer that NextRecord reads
// from.
func NewClient(addr Addresser, bufSize int) *Client {
	client := &Client{
		conn: connection.New(),
		ch:   make(chan []byte, bufSize),
	}

	go client.monitorMaster(addr)
	go client.getRecords()

	return client
}
// getRecords loops forever: it asks the master for a task, streams
// every record of every chunk in that task into c.ch, and then reports
// the task as finished.
//
// Failures are logged and never returned. A failed getTask is retried
// immediately, a chunk whose file cannot be opened is skipped, and
// scan, close and taskFinished errors are logged.
func (c *Client) getRecords() {
	for {
		t, err := c.getTask()
		if err != nil {
			// TODO(helin): wait before move on with next
			// getTask call.
			log.Errorln(err)
			continue
		}

		for _, chunk := range t.Chunks {
			f, err := os.Open(chunk.Path)
			if err != nil {
				log.Errorln(err)
				continue
			}

			s := recordio.NewRangeScanner(f, &chunk.Index, -1, -1)
			for s.Scan() {
				c.ch <- s.Record()
			}

			// Log the scanner's own error; the err variable in
			// scope here belongs to os.Open and is always nil.
			if scanErr := s.Err(); scanErr != nil {
				log.Errorln(scanErr, chunk.Path)
			}

			if closeErr := f.Close(); closeErr != nil {
				log.Errorln(closeErr)
			}
		}

		// We treat a task as finished whenever the last data
		// instance of the task is read. This is not exactly
		// correct, but a reasonable approximation.
		if err := c.taskFinished(t.ID); err != nil {
			log.Errorln(err)
		}
	}
}
// monitorMaster keeps the connection pointed at the master address
// reported by addr. It checks once immediately and then every ten
// seconds, and never returns.
func (c *Client) monitorMaster(addr Addresser) {
	known := ""

	// check fetches the latest master address and reacts only when it
	// differs from the last address that was handled successfully.
	check := func() {
		latest := addr.Address()
		if latest == known {
			return
		}

		if latest == "" {
			// The master is gone: drop the current connection.
			if err := c.conn.Close(); err != nil {
				log.Errorln(err)
			}
			known = latest
			return
		}

		if err := c.conn.Connect(latest); err != nil {
			log.Errorln(err)
			// Connecting failed: leave known untouched so the
			// next check sees a difference and retries.
			return
		}
		known = latest
	}

	check()
	ticker := time.NewTicker(10 * time.Second)
	for range ticker.C {
		check()
	}
}
// SetDataset sets the dataset for the master server to dispatch.
//
// globPaths is forwarded to the master through the
// "Service.SetDataset" RPC, and the error of that call is returned.
//
// SetDataset can be called multiple times from different nodes, but
// only the first call will be honored.
func (c *Client) SetDataset(globPaths []string) error {
return c.conn.Call("Service.SetDataset", globPaths, nil)
}
// getTask gets a new task from the master server through the
// "Service.GetTask" RPC.
//
// It returns the task filled in by the RPC together with the RPC
// error; the task should only be used when the error is nil.
func (c *Client) getTask() (Task, error) {
var t Task
// The request argument is a placeholder value of 0.
err := c.conn.Call("Service.GetTask", 0, &t)
return t, err
}
// taskFinished tells the master server that the task with the given
// ID is finished, through the "Service.TaskFinished" RPC, and returns
// the error of that call.
func (c *Client) taskFinished(taskID int) error {
return c.conn.Call("Service.TaskFinished", taskID, nil)
}
// NextRecord returns the next record in the dataset.
//
// Records are received from the client's internal record channel, so
// NextRecord blocks until the next record is available. It is
// thread-safe.
func (c *Client) NextRecord() []byte {
return <-c.ch
}

@ -0,0 +1,121 @@
package master
import (
"fmt"
"net"
"net/http"
"net/rpc"
"os"
"strconv"
"strings"
"testing"
"time"
log "github.com/sirupsen/logrus"
"github.com/PaddlePaddle/Paddle/go/connection"
"github.com/PaddlePaddle/recordio"
)
// The test dataset is written as totalTask*chunkPerTask chunks, and the
// service is created with chunkPerTask chunks per task.
const (
// totalTask is the number of tasks the test fetches in one pass.
totalTask = 20
// chunkPerTask is the number of chunks grouped into one task.
chunkPerTask = 10
)
// init restricts logging to the error level for the tests of this
// package.
func init() {
log.SetLevel(log.ErrorLevel)
}
// TestAddresser is an Addresser for tests that always reports the
// string it was created from.
type TestAddresser string

// Address returns the wrapped address string.
func (addr TestAddresser) Address() string {
	return string(addr)
}
// TestGetFinishTask runs getTask and taskFinished against a master
// service served from this process, repeating the same sequence for
// ten passes.
func TestGetFinishTask(t *testing.T) {
const path = "/tmp/master_client_test_0"
// Listen on a system-chosen port and parse the port number out of
// the listener address.
l, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
ss := strings.Split(l.Addr().String(), ":")
p, err := strconv.Atoi(ss[len(ss)-1])
if err != nil {
panic(err)
}
// Serve the master service over HTTP RPC in the background. The
// service is created with a one second task timeout and a maximum
// timeout count of 1.
go func(l net.Listener) {
s := NewService(chunkPerTask, time.Second, 1)
server := rpc.NewServer()
err := server.Register(s)
if err != nil {
panic(err)
}
mux := http.NewServeMux()
mux.Handle(rpc.DefaultRPCPath, server)
err = http.Serve(l, mux)
if err != nil {
panic(err)
}
}(l)
// Write the dataset file: one writer per iteration, each writing a
// single empty record.
f, err := os.Create(path)
if err != nil {
panic(err)
}
for i := 0; i < totalTask*chunkPerTask; i++ {
w := recordio.NewWriter(f, -1, -1)
w.Write(nil)
// call Close to force RecordIO writing a chunk.
w.Close()
}
f.Close()
// Manually initialize client to avoid calling c.getRecords()
c := &Client{}
c.conn = connection.New()
go c.monitorMaster(TestAddresser(fmt.Sprintf(":%d", p)))
c.SetDataset([]string{path})
// checkOnePass performs one pass; i is only used in failure
// messages.
checkOnePass := func(i int) {
var tasks []Task
// Fetch totalTask tasks; every call must succeed.
for idx := 0; idx < totalTask; idx++ {
task, err := c.getTask()
if err != nil {
t.Fatalf("Error: %v, pass: %d\n", err, i)
}
tasks = append(tasks, task)
}
// One more getTask is expected to fail.
_, err = c.getTask()
if err == nil {
t.Fatalf("Should get error, pass: %d\n", i)
}
// After finishing the first task, getTask is expected to succeed
// again.
err = c.taskFinished(tasks[0].ID)
if err != nil {
t.Fatalf("Error: %v, pass: %d\n", err, i)
}
tasks = tasks[1:]
task, err := c.getTask()
if err != nil {
t.Fatal(err)
}
tasks = append(tasks, task)
// Finish every task that is still held.
for _, task := range tasks {
err = c.taskFinished(task.ID)
if err != nil {
t.Fatalf("Error: %v, pass: %d\n", err, i)
}
}
}
for i := 0; i < 10; i++ {
checkOnePass(i)
}
}

@ -0,0 +1,79 @@
package master_test
import (
"fmt"
"net"
"net/http"
"net/rpc"
"os"
"strconv"
"strings"
"testing"
"time"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/recordio"
)
// TestNextRecord reads records through the public client API from a
// master service served from this process. For 50 passes it reads
// `total` records and checks that each has length 1 and that no first
// byte repeats within a pass.
func TestNextRecord(t *testing.T) {
const (
path = "/tmp/master_client_TestFull"
total = 50
)
// Listen on a system-chosen port and parse the port number out of
// the listener address.
l, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
ss := strings.Split(l.Addr().String(), ":")
p, err := strconv.Atoi(ss[len(ss)-1])
if err != nil {
panic(err)
}
// Serve the master service over HTTP RPC in the background.
go func(l net.Listener) {
s := master.NewService(10, time.Second, 1)
server := rpc.NewServer()
err := server.Register(s)
if err != nil {
panic(err)
}
mux := http.NewServeMux()
mux.Handle(rpc.DefaultRPCPath, server)
err = http.Serve(l, mux)
if err != nil {
panic(err)
}
}(l)
// Write `total` one-byte records, with record i holding byte(i).
f, err := os.Create(path)
if err != nil {
panic(err)
}
w := recordio.NewWriter(f, -1, -1)
for i := 0; i < total; i++ {
w.Write([]byte{byte(i)})
}
w.Close()
f.Close()
// Create a client with a record buffer size of 10.
c := master.NewClient(master.TestAddresser(fmt.Sprintf(":%d", p)), 10)
c.SetDataset([]string{path})
for pass := 0; pass < 50; pass++ {
// received tracks the record bytes seen in the current pass.
received := make(map[byte]bool)
for i := 0; i < total; i++ {
r := c.NextRecord()
if len(r) != 1 {
t.Fatal("Length should be 1.", r)
}
if received[r[0]] {
t.Fatal("Received duplicate.", received, r)
}
received[r[0]] = true
}
}
}

File diff suppressed because it is too large Load Diff

@ -9,5 +9,15 @@ project(cxx_go C Go)
include(golang) include(golang)
include(flags) include(flags)
go_library(client STATIC) go_library(paddle_pserver_cclient STATIC)
if(PROJ_ROOT)
add_custom_command(OUTPUT ${PROJ_ROOT}/paddle/trainer/libpaddle_pserver_cclient.a
COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/libpaddle_pserver_cclient.h ${PROJ_ROOT}/paddle/trainer/
COMMAND cp ${CMAKE_CURRENT_BINARY_DIR}/libpaddle_pserver_cclient.a ${PROJ_ROOT}/paddle/trainer/
WORKING_DIRECTORY ${PROJ_ROOT}/paddle
DEPENDS paddle_pserver_cclient)
add_custom_target(paddle_pserver_cclient_lib ALL DEPENDS ${PROJ_ROOT}/paddle/trainer/libpaddle_pserver_cclient.a)
endif(PROJ_ROOT)
add_subdirectory(test) add_subdirectory(test)

File diff suppressed because it is too large Load Diff

@ -1,11 +1,22 @@
cmake_minimum_required(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
include_directories(${CMAKE_BINARY_DIR})
add_executable(main main.c) add_executable(main main.c)
add_dependencies(main client) add_dependencies(main paddle_pserver_cclient)
add_executable(test_cclient test_cclient.c)
add_dependencies(test_cclient paddle_pserver_cclient)
if(APPLE) if(APPLE)
set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security") set(CMAKE_EXE_LINKER_FLAGS "-framework CoreFoundation -framework Security")
else()
set(CMAKE_EXE_LINKER_FLAGS "-pthread")
endif() endif()
target_link_libraries(main ${CMAKE_BINARY_DIR}/libclient.a)
if(PROJ_ROOT)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/..)
target_link_libraries(main ${CMAKE_CURRENT_BINARY_DIR}/../libpaddle_pserver_cclient.a pthread)
target_link_libraries(test_cclient ${CMAKE_CURRENT_BINARY_DIR}/../libpaddle_pserver_cclient.a pthread)
else(PROJ_ROOT)
include_directories(${CMAKE_BINARY_DIR})
target_link_libraries(main ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread)
target_link_libraries(test_cclient ${CMAKE_BINARY_DIR}/libpaddle_pserver_cclient.a pthread)
endif(PROJ_ROOT)

@ -1,68 +1,91 @@
#include <stdio.h> #include <stdio.h>
#include <stdlib.h>
#include "libclient.h" #include "libpaddle_pserver_cclient.h"
void fail() { // TODO(helin): Fix: gtest using cmake is not working, using this
// TODO(helin): fix: gtest using cmake is not working, using this // hacky way for now.
// hacky way for now. #define fail() \
printf("test failed.\n"); fprintf(stderr, "info: %s:%d: ", __FILE__, __LINE__); \
exit(-1); exit(-1);
void sendGrads(paddle_pserver_client c) {
unsigned char grad_a[2000] = {2};
unsigned char grad_b[3000] = {3};
paddle_gradient grad1 = {
"param_a", PADDLE_ELEMENT_TYPE_FLOAT32, grad_a, 2000};
paddle_gradient grad2 = {
"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, grad_b, 3000};
paddle_gradient* grads[2] = {&grad1, &grad2};
if (paddle_send_grads(c, grads, 2)) {
fail();
}
}
void getParams(paddle_pserver_client c) {
paddle_parameter param_a;
paddle_parameter param_b;
char name_a[] = "param_a";
char name_b[] = "param_b";
// Must pre-allocate the prameter content before calling paddle_get_params.
unsigned char content_a[2000] = {};
unsigned char content_b[3000] = {};
param_a.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param_a.name = name_a;
param_a.content = content_a;
param_a.content_len = 2000;
param_b.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param_b.name = name_b;
param_b.content = content_b;
param_b.content_len = 3000;
paddle_parameter* params[2] = {&param_a, &param_b};
if (paddle_get_params(c, params, 2)) {
fail();
}
} }
int main() { int main() {
char addr[] = "localhost:3000"; char addr[] = "localhost:3000";
client c = paddle_new_pserver_client(addr, 1); paddle_pserver_client c = paddle_new_pserver_client(addr, 1);
retry: retry:
if (paddle_begin_init_params(c)) { if (paddle_begin_init_params(c)) {
paddle_parameter param; paddle_parameter param;
char name_a[] = "param_a"; char name_a[] = "param_a";
char name_b[] = "param_b"; char name_b[] = "param_b";
unsigned char content[] = {0x00, 0x11, 0x22}; unsigned char content_a[2000] = {1};
unsigned char content_b[3000] = {0};
param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32; param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param.name = name_a; param.name = name_a;
param.content = content; param.content = content_a;
param.content_len = 3; param.content_len = 2000;
if (paddle_init_param(c, param, NULL, 0) != 0) { int error = paddle_init_param(c, param, NULL, 0);
if (error != 0) {
goto retry; goto retry;
} }
param.element_type = PADDLE_ELEMENT_TYPE_INT32;
param.element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
param.name = name_b; param.name = name_b;
param.content = content; param.content = content_b;
param.content_len = 3; param.content_len = 3000;
if (paddle_init_param(c, param, NULL, 0) != 0) { error = paddle_init_param(c, param, NULL, 0);
if (error != 0) {
goto retry; goto retry;
} }
if (paddle_finish_init_params(c) != 0) {
error = paddle_finish_init_params(c);
if (error != 0) {
goto retry; goto retry;
} }
} else {
fail();
}
unsigned char content[] = {0x00, 0x11, 0x22};
paddle_gradient grads[2] = {
{"param_a", PADDLE_ELEMENT_TYPE_INT32, content, 3},
{"param_b", PADDLE_ELEMENT_TYPE_FLOAT32, content, 3}};
if (!paddle_send_grads(c, grads, 2)) {
fail();
} }
paddle_parameter* params[2] = {NULL, NULL}; int i;
char* names[] = {"param_a", "param_b"}; for (i = 0; i < 100; i++) {
if (!paddle_get_params(c, names, params, 2)) { sendGrads(c);
fail(); getParams(c);
} }
// get parameters again by reusing the allocated parameter buffers. if (paddle_save_model(c, "/tmp/")) {
if (!paddle_get_params(c, names, params, 2)) {
fail();
}
paddle_release_param(params[0]);
paddle_release_param(params[1]);
if (!paddle_save_model(c, "/tmp/")) {
fail(); fail();
} }

@ -0,0 +1,117 @@
#include <stdio.h>
#include <stdlib.h>
#include "libpaddle_pserver_cclient.h"
typedef float real;
/* Reports a test failure on stdout and terminates the process with
 * exit status -1. */
void fail() {
  // TODO(helin): fix: gtest using cmake is not working, using this
  // hacky way for now.
  fputs("test failed.\n", stdout);
  exit(-1);
}
/**
 * @brief print_parameter dumps a parameter (or gradient) to stdout.
 *
 * Prints the name, content length and element type, followed by the
 * content interpreted as an array of `real` values. Trailing bytes
 * that do not form a whole `real` are not printed.
 *
 * @param param the parameter to print; NULL is reported instead of
 * dereferenced.
 */
void print_parameter(paddle_gradient* param) {
  if (param == NULL) {
    printf("param is NULL!!\n");
    return;
  }

  printf("==== parameter ====\n");
  /* Passing NULL to %s is undefined behavior, so substitute a marker. */
  printf("name: %s\n", param->name != NULL ? param->name : "(null)");
  printf("content_len: %d\n", param->content_len);
  printf("content_type: %d\n", param->element_type);

  /* Read the content with the same type used to compute the element
   * count, so the two stay consistent if `real` is changed.
   * NOTE(review): assumes content is suitably aligned for `real`. */
  const real* values = (const real*)param->content;
  int count = param->content_len / (int)sizeof(real);
  int i;
  for (i = 0; i < count; ++i) {
    printf("%f ", values[i]);
  }
  printf("\n\n");
}
int main() {
char addr[] = "localhost:3000";
paddle_pserver_client c = paddle_new_pserver_client(addr, 1);
char* names[] = {"param_a", "param_b"};
retry:
printf("init parameter to pserver:\n");
real param_content1[] = {0.1, 0.2, 0.3};
real param_content2[] = {0.4, 0.5, 0.6};
paddle_parameter** params =
(paddle_parameter**)malloc(sizeof(paddle_parameter*) * 2);
params[0] = (paddle_parameter*)malloc(sizeof(paddle_parameter));
params[0]->name = names[0];
params[0]->content = (unsigned char*)param_content1;
params[0]->content_len = 3 * sizeof(real);
params[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
params[1] = (paddle_parameter*)malloc(sizeof(paddle_parameter));
params[1]->name = names[1];
params[1]->content = (unsigned char*)param_content2;
params[1]->content_len = 3 * sizeof(real);
params[1]->element_type = PADDLE_ELEMENT_TYPE_INT32;
if (paddle_begin_init_params(c)) {
if (paddle_init_param(c, *params[0], NULL, 0) != 0) {
goto retry;
}
if (paddle_init_param(c, *params[1], NULL, 0) != 0) {
goto retry;
}
if (paddle_finish_init_params(c) != 0) {
goto retry;
}
} else {
fail();
}
printf("get inited parameters from pserver:\n");
// get parameters again by reusing the allocated parameter buffers.
if (paddle_get_params(c, params, 2) != 0) {
fail();
}
print_parameter(params[0]);
print_parameter(params[1]);
printf("send gradient to pserver:\n");
real gradient_content1[] = {0.01, 0.02, 0.03};
real gradinet_content2[] = {0.04, 0.05, 0.06};
paddle_gradient** grads =
(paddle_gradient**)malloc(sizeof(paddle_gradient*) * 2);
grads[0] = (paddle_gradient*)malloc(sizeof(paddle_gradient));
grads[0]->name = names[0];
grads[0]->content = (unsigned char*)gradient_content1;
grads[0]->content_len = 3 * sizeof(real);
grads[0]->element_type = PADDLE_ELEMENT_TYPE_FLOAT32;
grads[1] = (paddle_gradient*)malloc(sizeof(paddle_gradient));
grads[1]->name = names[1];
grads[1]->content = (unsigned char*)gradinet_content2;
grads[1]->content_len = 3 * sizeof(real);
grads[1]->element_type = PADDLE_ELEMENT_TYPE_INT32;
printf("print gradient sent to pserver:\n");
print_parameter(grads[0]);
print_parameter(grads[1]);
if (paddle_send_grads(c, grads, 2) != 0) {
fail();
}
printf("get updated parameters from pserver:\n");
// get parameters again by reusing the allocated parameter buffers.
if (paddle_get_params(c, params, 2) != 0) {
fail();
}
print_parameter(params[0]);
print_parameter(params[1]);
if (paddle_save_model(c, "/tmp/") != 0) {
fail();
}
return 0;
}

@ -0,0 +1,131 @@
import paddle.v2 as paddle
import gzip
def softmax_regression(img):
    """Build a softmax-regression classifier on top of `img`.

    The network is one fully-connected layer with 10 outputs and a softmax
    activation; that layer is returned.
    """
    return paddle.layer.fc(
        input=img, size=10, act=paddle.activation.Softmax())
def multilayer_perceptron(img):
    """Build a three-layer perceptron classifier on top of `img`.

    Two ReLU fully-connected layers (128 and 64 units) are followed by a
    10-unit softmax layer, which is returned.
    """
    # First fully-connected layer.
    fc_128 = paddle.layer.fc(
        input=img, size=128, act=paddle.activation.Relu())
    # Second fully-connected layer with its activation function.
    fc_64 = paddle.layer.fc(
        input=fc_128, size=64, act=paddle.activation.Relu())
    # Third fully-connected layer; its size must be 10, the number of
    # unique digits.
    return paddle.layer.fc(
        input=fc_64, size=10, act=paddle.activation.Softmax())
def convolutional_neural_network(img):
    """Build a convolutional classifier on top of `img`.

    Two conv+pool stages (20 then 50 filters, 5x5 filters, pool size 2 with
    stride 2, Tanh) feed a 128-unit Tanh fully-connected layer and a final
    10-unit softmax layer, which is returned.
    """
    # First conv + pool stage: 1 input channel -> 20 filters.
    stage_one = paddle.networks.simple_img_conv_pool(
        input=img,
        filter_size=5,
        num_filters=20,
        num_channel=1,
        pool_size=2,
        pool_stride=2,
        act=paddle.activation.Tanh())
    # Second conv + pool stage: 20 input channels -> 50 filters.
    stage_two = paddle.networks.simple_img_conv_pool(
        input=stage_one,
        filter_size=5,
        num_filters=50,
        num_channel=20,
        pool_size=2,
        pool_stride=2,
        act=paddle.activation.Tanh())
    # Fully-connected layer on top of the convolutional features.
    dense = paddle.layer.fc(
        input=stage_two, size=128, act=paddle.activation.Tanh())
    # Softmax layer; its size must be 10, the number of unique digits.
    return paddle.layer.fc(
        input=dense, size=10, act=paddle.activation.Softmax())
def main():
    """Train an MNIST classifier with a remote parameter server, then infer.

    Builds the network, trains for 100 passes with `is_local=False` and
    `pserver_spec="localhost:3000"`, tests after every pass, reports the pass
    with the lowest test cost, and finally prints the shape of the
    predictions for the first 100 test samples.
    """
    paddle.init(use_gpu=False, trainer_count=1)

    # Network topology: a 784-dimensional dense input and a 10-class label.
    images = paddle.layer.data(
        name='pixel', type=paddle.data_type.dense_vector(784))
    label = paddle.layer.data(
        name='label', type=paddle.data_type.integer_value(10))

    # The prediction network can be built in different ways; pick one by
    # uncommenting the corresponding line.
    predict = softmax_regression(images)
    #predict = multilayer_perceptron(images)
    #predict = convolutional_neural_network(images)

    cost = paddle.layer.classification_cost(input=predict, label=label)

    parameters = paddle.parameters.create(cost)

    l2_penalty = paddle.optimizer.L2Regularization(rate=0.0005 * 128)
    optimizer = paddle.optimizer.Momentum(
        learning_rate=0.1 / 128.0, momentum=0.9, regularization=l2_penalty)

    trainer = paddle.trainer.SGD(
        cost=cost,
        parameters=parameters,
        update_equation=optimizer,
        is_local=False,
        pserver_spec="localhost:3000")

    # One (pass_id, test cost, classification error) tuple per finished pass.
    pass_results = []

    def event_handler(event):
        if isinstance(event, paddle.event.EndIteration):
            if event.batch_id % 1000 == 0:
                print("Pass %d, Batch %d, Cost %f, %s" % (
                    event.pass_id, event.batch_id, event.cost, event.metrics))
        elif isinstance(event, paddle.event.EndPass):
            test_reader = paddle.batch(
                paddle.dataset.mnist.test(), batch_size=128)
            result = trainer.test(reader=test_reader)
            print("Test with Pass %d, Cost %f, %s\n" % (
                event.pass_id, result.cost, result.metrics))
            pass_results.append(
                (event.pass_id, result.cost,
                 result.metrics['classification_error_evaluator']))

    train_reader = paddle.batch(
        paddle.reader.shuffle(
            paddle.dataset.mnist.train(), buf_size=8192),
        batch_size=128)
    trainer.train(
        reader=train_reader, event_handler=event_handler, num_passes=100)

    # Find the best pass, i.e. the one with the lowest test cost.
    best_pass, best_cost, best_error = sorted(
        pass_results, key=lambda entry: float(entry[1]))[0]
    print('Best pass is %s, testing Avgcost is %s' % (best_pass, best_cost))
    print('The classification accuracy is %.2f%%' %
          (100 - float(best_error) * 100))

    # Collect the images (without labels) of the first 100 test samples.
    test_creator = paddle.dataset.mnist.test()
    test_data = []
    for count, sample in enumerate(test_creator(), 1):
        test_data.append((sample[0], ))
        if count == 100:
            break

    # The output is a softmax layer, so it returns probabilities.
    # Shape should be (100, 10).
    probs = paddle.infer(
        output_layer=predict, parameters=parameters, input=test_data)
    print(probs.shape)
# Run the MNIST example only when this file is executed as a script.
if __name__ == "__main__":
    main()

@ -0,0 +1,60 @@
import paddle.v2 as paddle
import paddle.v2.dataset.uci_housing as uci_housing
def main():
    """Fit a linear regression on UCI housing with a remote parameter server.

    Builds a single linear fully-connected layer over a 13-dimensional input,
    trains it for 30 passes with `is_local=False` and
    `pserver_spec="localhost:3000"`, and prints the test cost after every
    tenth pass.
    """
    # init
    paddle.init(use_gpu=False, trainer_count=1)

    # network config
    x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(13))
    y_predict = paddle.layer.fc(
        input=x,
        param_attr=paddle.attr.Param(name='w'),
        size=1,
        act=paddle.activation.Linear(),
        bias_attr=paddle.attr.Param(name='b'))
    y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
    cost = paddle.layer.mse_cost(input=y_predict, label=y)

    # create parameters
    parameters = paddle.parameters.create(cost)

    # create optimizer
    optimizer = paddle.optimizer.Momentum(momentum=0)

    trainer = paddle.trainer.SGD(
        cost=cost,
        parameters=parameters,
        update_equation=optimizer,
        is_local=False,
        pserver_spec="localhost:3000")

    # event_handler to print training and testing info
    def event_handler(event):
        if isinstance(event, paddle.event.EndIteration):
            if event.batch_id % 100 == 0:
                print("Pass %d, Batch %d, Cost %f" % (
                    event.pass_id, event.batch_id, event.cost))

        if isinstance(event, paddle.event.EndPass):
            if (event.pass_id + 1) % 10 == 0:
                test_reader = paddle.batch(uci_housing.test(), batch_size=2)
                result = trainer.test(
                    reader=test_reader, feeding={'x': 0,
                                                 'y': 1})
                print("Test %d, %.2f" % (event.pass_id, result.cost))

    # training
    train_reader = paddle.batch(
        paddle.reader.shuffle(
            uci_housing.train(), buf_size=500),
        batch_size=2)
    trainer.train(
        reader=train_reader,
        feeding={'x': 0,
                 'y': 1},
        event_handler=event_handler,
        num_passes=30)
# Run the UCI housing example only when this file is executed as a script.
if __name__ == "__main__":
    main()

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save