From 55217c962d127e72ee88e042d2dd95cfe7375a65 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Tue, 16 May 2017 18:41:01 -0400 Subject: [PATCH 01/15] Implement Pserver RPC, gradient update logic, cgo part --- paddle/go/cmd/pserver/.gitignore | 1 + paddle/go/cmd/pserver/pserver.go | 33 ++++++ paddle/go/pserver/optimizer.c | 22 ++++ paddle/go/pserver/optimizer.go | 51 +++++++++ paddle/go/pserver/optimizer.h | 19 ++++ paddle/go/pserver/service.go | 165 ++++++++++++++++++++++++++++++ paddle/go/pserver/service_test.go | 154 ++++++++++++++++++++++++++++ 7 files changed, 445 insertions(+) create mode 100644 paddle/go/cmd/pserver/.gitignore create mode 100644 paddle/go/cmd/pserver/pserver.go create mode 100644 paddle/go/pserver/optimizer.c create mode 100644 paddle/go/pserver/optimizer.go create mode 100644 paddle/go/pserver/optimizer.h create mode 100644 paddle/go/pserver/service.go create mode 100644 paddle/go/pserver/service_test.go diff --git a/paddle/go/cmd/pserver/.gitignore b/paddle/go/cmd/pserver/.gitignore new file mode 100644 index 0000000000..fffd9adc4f --- /dev/null +++ b/paddle/go/cmd/pserver/.gitignore @@ -0,0 +1 @@ +pserver diff --git a/paddle/go/cmd/pserver/pserver.go b/paddle/go/cmd/pserver/pserver.go new file mode 100644 index 0000000000..41417875fb --- /dev/null +++ b/paddle/go/cmd/pserver/pserver.go @@ -0,0 +1,33 @@ +package main + +import ( + "flag" + "net" + "net/http" + "net/rpc" + "strconv" + + "github.com/PaddlePaddle/Paddle/paddle/go/pserver" +) + +func main() { + port := flag.Int("p", 0, "port of the pserver") + flag.Parse() + + s := pserver.NewService() + err := rpc.Register(s) + if err != nil { + panic(err) + } + + rpc.HandleHTTP() + l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) + if err != nil { + panic(err) + } + + err = http.Serve(l, nil) + if err != nil { + panic(err) + } +} diff --git a/paddle/go/pserver/optimizer.c b/paddle/go/pserver/optimizer.c new file mode 100644 index 0000000000..d83409297b --- /dev/null +++ b/paddle/go/pserver/optimizer.c @@ -0,0 +1,22 @@ +#include + +#include "optimizer.h" + +typedef struct { + double learning_rate; +} SGD_optimizer; + +paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate) { + SGD_optimizer* o = (SGD_optimizer*)malloc(sizeof(SGD_optimizer)); + o->learning_rate = learning_rate; + return (paddle_optimizer*)o; +} + +void paddle_release_optimizer(paddle_optimizer* o) { + free(o); +} + +int paddle_update_parameter(paddle_optimizer* o, void *buffer, paddle_element_type datatype, const void* gradient, int num_bytes) { + // TODO + return 0; +} diff --git a/paddle/go/pserver/optimizer.go b/paddle/go/pserver/optimizer.go new file mode 100644 index 0000000000..aa02bed3e0 --- /dev/null +++ b/paddle/go/pserver/optimizer.go @@ -0,0 +1,51 @@ +package pserver + +/* +#include "optimizer.h" +*/ +import "C" +import ( + "fmt" + "unsafe" +) + +type optimizerType int + +const ( + sgd optimizerType = iota +) + +var nullPtr = unsafe.Pointer(uintptr(0)) + +type optimizer struct { + opt *C.paddle_optimizer +} + +func newOptimizer(t optimizerType, learning_rate float64) *optimizer { + o := &optimizer{} + o.opt = C.paddle_create_SGD_optimizer(C.double(learning_rate)) + return o +} + +func (o *optimizer) UpdateParameter(p Parameter, g Gradient) error { + if len(p.Content) != len(g.Content) { + return fmt.Errorf("parameter and gradient length not match, parameter: %d, gradient: %d", len(p.Content), len(g.Content)) + } + + if p.ElementType != g.ElementType { + return fmt.Errorf("parameter and gradient element type not match, 
parameter: %v, gradient: %v", p.ElementType, g.ElementType) + } + + r := C.paddle_update_parameter(o.opt, unsafe.Pointer(&p.Content[0]), C.paddle_element_type(p.ElementType), unsafe.Pointer(&g.Content[0]), C.int(len(g.Content))) + if r != 0 { + return fmt.Errorf("optimier returned error code: %d", r) + } + return nil +} + +func (o *optimizer) Cleanup() { + if unsafe.Pointer(o.opt) != nullPtr { + C.paddle_release_optimizer(o.opt) + o.opt = (*C.paddle_optimizer)(nullPtr) + } +} diff --git a/paddle/go/pserver/optimizer.h b/paddle/go/pserver/optimizer.h new file mode 100644 index 0000000000..e1750ca608 --- /dev/null +++ b/paddle/go/pserver/optimizer.h @@ -0,0 +1,19 @@ +#ifndef PADDLE_PSERVER_OPTIMIZER_H +#define PADDLE_PSERVER_OPTIMIZER_H + +typedef enum { + PADDLE_ELEMENT_TYPE_INT32 = 0, + PADDLE_ELEMENT_TYPE_UINT32 = 1, + PADDLE_ELEMENT_TYPE_INT64 = 2, + PADDLE_ELEMENT_TYPE_UINT64 = 3, + PADDLE_ELEMENT_TYPE_FLOAT32 = 4, + PADDLE_ELEMENT_TYPE_FLOAT64 = 5, +} paddle_element_type; + +typedef struct paddle_optimizer paddle_optimizer; + +paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate); +void paddle_release_optimizer(paddle_optimizer* o); +int paddle_update_parameter(paddle_optimizer* o, void *buffer, paddle_element_type datatype, const void* gradient, int num_bytes); + +#endif /* PADDLE_PSERVER_OPTIMIZER_H */ diff --git a/paddle/go/pserver/service.go b/paddle/go/pserver/service.go new file mode 100644 index 0000000000..0d10da9880 --- /dev/null +++ b/paddle/go/pserver/service.go @@ -0,0 +1,165 @@ +package pserver + +import ( + "errors" + "fmt" + "sync" +) + +// ElementType is the type of elements of a Parameter. +type ElementType int + +var ErrUnintialized = errors.New("pserver not initialized") +var ErrAlreadyIntialized = errors.New("pserver already initialized") + +// Supported element types +const ( + Int32 ElementType = iota + UInt32 + Int64 + UInt64 + Float32 + Float64 +) + +// Parameter is a piece of data to sync with the parameter server. +type Parameter struct { + Name string + ElementType ElementType + Content []byte +} + +// ParameterWithConfig contains the parameter and the configuration. +type ParameterWithConfig struct { + Param Parameter + Config []byte // parameter configuration in Proto Buffer format +} + +// Gradient is the gradient of the parameter. +type Gradient Parameter + +type Service struct { + initialized chan struct{} + + mu sync.Mutex + opt *optimizer + paramMap map[string]Parameter +} + +func NewService() *Service { + s := &Service{} + s.paramMap = make(map[string]Parameter) + s.initialized = make(chan struct{}) + return s +} + +func (s *Service) BeginInitParams(config []byte, dummy *int) error { + select { + case <-s.initialized: + return ErrAlreadyIntialized + default: + } + + s.mu.Lock() + defer s.mu.Unlock() + + if s.opt != nil { + s.opt.Cleanup() + } + + // TODO(helin): parse learning rate from config + s.opt = newOptimizer(sgd, 0.01) + return nil +} + +func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error { + select { + case <-s.initialized: + return ErrAlreadyIntialized + default: + } + + // TODO(helin): parse parameter config + + s.mu.Lock() + defer s.mu.Unlock() + + // TODO(helin): check if paramWithConfigs.Param.Content is + // properly memory aligned, if not, make copy to a memory + // aligned region. 
+ s.paramMap[paramWithConfigs.Param.Name] = paramWithConfigs.Param + return nil +} + +func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error { + select { + case <-s.initialized: + return ErrAlreadyIntialized + default: + } + + close(s.initialized) + return nil +} + +func (s *Service) SendGrads(grads []Gradient, dummy *int) error { + select { + case <-s.initialized: + default: + return ErrUnintialized + } + + s.mu.Lock() + s.mu.Unlock() + + for _, g := range grads { + if _, ok := s.paramMap[g.Name]; !ok { + return fmt.Errorf("parameter: %s does not exist", g.Name) + } + } + + var wg sync.WaitGroup + for _, g := range grads { + wg.Add(1) + go func(p Parameter, g Gradient) { + s.opt.UpdateParameter(p, g) + wg.Done() + }(s.paramMap[g.Name], g) + } + + wg.Wait() + return nil +} + +func (s *Service) GetParams(names []string, parameters *[]Parameter) error { + <-s.initialized + s.mu.Lock() + s.mu.Unlock() + + for _, n := range names { + if _, ok := s.paramMap[n]; !ok { + return fmt.Errorf("parameter: %s does not exist", n) + } + } + + *parameters = make([]Parameter, len(names)) + for i, n := range names { + // The parameter content (a byte slice) may change + // during RPC serialization due to write from other + // goroutine, we allow it since mini-batch based deep + // learning optimization methods are stochastic in + // nature. This race condition is allowed deliberately + // to save the program from making a copy of the + // paramter content. + (*parameters)[i] = s.paramMap[n] + } + + return nil +} + +func (s *Service) SaveModel(path string, dummy *int) error { + <-s.initialized + + // TODO + return nil +} diff --git a/paddle/go/pserver/service_test.go b/paddle/go/pserver/service_test.go new file mode 100644 index 0000000000..ebeff1fb89 --- /dev/null +++ b/paddle/go/pserver/service_test.go @@ -0,0 +1,154 @@ +package pserver_test + +import ( + "reflect" + "sync" + "testing" + + "github.com/PaddlePaddle/Paddle/paddle/go/pserver" +) + +func TestFull(t *testing.T) { + s := pserver.NewService() + var dummy int + err := s.BeginInitParams(nil, &dummy) + if err != nil { + t.FailNow() + } + + var p pserver.Parameter + p.Name = "param_a" + p.Content = []byte{1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0} + p.ElementType = pserver.Int32 + err = s.InitParam(pserver.ParameterWithConfig{p, nil}, &dummy) + if err != nil { + t.FailNow() + } + + var p1 pserver.Parameter + p1.Name = "param_b" + p1.Content = []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} + p1.ElementType = pserver.Float32 + err = s.InitParam(pserver.ParameterWithConfig{p1, nil}, &dummy) + if err != nil { + t.FailNow() + } + + err = s.FinishInitParams(0, &dummy) + if err != nil { + t.FailNow() + } + + var params []pserver.Parameter + err = s.GetParams([]string{"param_b", "param_a"}, ¶ms) + if err != nil { + t.FailNow() + } + + if len(params) != 2 || !reflect.DeepEqual(params[0], p1) || !reflect.DeepEqual(params[0], p1) { + t.FailNow() + } + + grads := []pserver.Gradient{pserver.Gradient(p1), pserver.Gradient(p)} + err = s.SendGrads(grads, &dummy) + if err != nil { + t.FailNow() + } + + var params1 []pserver.Parameter + err = s.GetParams([]string{"param_b", "param_a"}, ¶ms1) + if err != nil { + t.FailNow() + } + + if len(params) != 2 { + t.FailNow() + } + + // we don't care the content, since it's already optimized with gradient + params1[0].Content = nil + params1[0].Content = nil + p.Content = nil + p1.Content = nil + + if !reflect.DeepEqual(params1[0], p1) || !reflect.DeepEqual(params1[0], p1) { + t.FailNow() + } +} + +func TestMultipleInit(t 
*testing.T) { + s := pserver.NewService() + var dummy int + err := s.BeginInitParams(nil, &dummy) + if err != nil { + t.FailNow() + } + + // this is fine, it's possible for client to call init + // multiple times. + err = s.BeginInitParams(nil, &dummy) + if err != nil { + t.FailNow() + } + + err = s.FinishInitParams(0, &dummy) + if err != nil { + t.FailNow() + } + + err = s.FinishInitParams(0, &dummy) + if err != pserver.ErrAlreadyIntialized { + t.FailNow() + } + + err = s.BeginInitParams(nil, &dummy) + if err != pserver.ErrAlreadyIntialized { + t.FailNow() + } +} + +func TestUninitialized(t *testing.T) { + s := pserver.NewService() + var dummy int + err := s.SendGrads(nil, &dummy) + if err != pserver.ErrUnintialized { + t.FailNow() + } +} + +func TestBlockUntilInitialized(t *testing.T) { + s := pserver.NewService() + var wg sync.WaitGroup + wg.Add(1) + go func() { + var params []pserver.Parameter + err := s.GetParams(nil, ¶ms) + if err != nil { + t.FailNow() + } + wg.Done() + }() + + wg.Add(1) + go func() { + var dummy int + err := s.SaveModel("", &dummy) + if err != nil { + t.FailNow() + } + wg.Done() + }() + + var dummy int + err := s.BeginInitParams(nil, &dummy) + if err != nil { + t.FailNow() + } + + err = s.FinishInitParams(0, &dummy) + if err != nil { + t.FailNow() + } + + wg.Wait() +} From 6ee5bc81c021e063369d1e9ba9333d534219a2cb Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 17 May 2017 19:51:36 -0400 Subject: [PATCH 02/15] use function pointer for updater dispatching --- paddle/go/pserver/optimizer.c | 32 +++++++++++++++++++++++--------- paddle/go/pserver/optimizer.go | 4 ++-- paddle/go/pserver/optimizer.h | 9 ++++----- 3 files changed, 29 insertions(+), 16 deletions(-) diff --git a/paddle/go/pserver/optimizer.c b/paddle/go/pserver/optimizer.c index d83409297b..123684970f 100644 --- a/paddle/go/pserver/optimizer.c +++ b/paddle/go/pserver/optimizer.c @@ -2,21 +2,35 @@ #include "optimizer.h" -typedef struct { - double learning_rate; -} SGD_optimizer; +typedef int (*update_func)(void*, void *, paddle_element_type, const void*, int); -paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate) { - SGD_optimizer* o = (SGD_optimizer*)malloc(sizeof(SGD_optimizer)); - o->learning_rate = learning_rate; - return (paddle_optimizer*)o; -} +typedef struct paddle_optimizer{ + update_func func; + void* optimizer; +} paddle_optimizer; void paddle_release_optimizer(paddle_optimizer* o) { free(o); } -int paddle_update_parameter(paddle_optimizer* o, void *buffer, paddle_element_type datatype, const void* gradient, int num_bytes) { +int paddle_update_parameter(paddle_optimizer* o, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes) { + return o->func(o->optimizer, buffer, element_type, gradient, num_bytes); +} + +typedef struct { + double learning_rate; +} SGD_optimizer; + +int paddle_SGD_update_parameter(void* optimizer, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes) { // TODO return 0; } + +paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate) { + SGD_optimizer* o = (SGD_optimizer*)malloc(sizeof(SGD_optimizer)); + o->learning_rate = learning_rate; + paddle_optimizer* container = (paddle_optimizer*)malloc(sizeof(paddle_optimizer)); + container->func = paddle_SGD_update_parameter; + container->optimizer = o; + return container; +} diff --git a/paddle/go/pserver/optimizer.go b/paddle/go/pserver/optimizer.go index aa02bed3e0..8c6450bca0 100644 --- a/paddle/go/pserver/optimizer.go +++ 
b/paddle/go/pserver/optimizer.go @@ -18,7 +18,7 @@ const ( var nullPtr = unsafe.Pointer(uintptr(0)) type optimizer struct { - opt *C.paddle_optimizer + opt *C.struct_paddle_optimizer } func newOptimizer(t optimizerType, learning_rate float64) *optimizer { @@ -46,6 +46,6 @@ func (o *optimizer) UpdateParameter(p Parameter, g Gradient) error { func (o *optimizer) Cleanup() { if unsafe.Pointer(o.opt) != nullPtr { C.paddle_release_optimizer(o.opt) - o.opt = (*C.paddle_optimizer)(nullPtr) + o.opt = (*C.struct_paddle_optimizer)(nullPtr) } } diff --git a/paddle/go/pserver/optimizer.h b/paddle/go/pserver/optimizer.h index e1750ca608..cde8da70cc 100644 --- a/paddle/go/pserver/optimizer.h +++ b/paddle/go/pserver/optimizer.h @@ -10,10 +10,9 @@ typedef enum { PADDLE_ELEMENT_TYPE_FLOAT64 = 5, } paddle_element_type; -typedef struct paddle_optimizer paddle_optimizer; - -paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate); -void paddle_release_optimizer(paddle_optimizer* o); -int paddle_update_parameter(paddle_optimizer* o, void *buffer, paddle_element_type datatype, const void* gradient, int num_bytes); +struct paddle_optimizer; +struct paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate); +void paddle_release_optimizer(struct paddle_optimizer* o); +int paddle_update_parameter(struct paddle_optimizer* o, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes); #endif /* PADDLE_PSERVER_OPTIMIZER_H */ From bd2469f21c663c192a7ab73b2b2fdfafffdf5edb Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 17 May 2017 20:01:29 -0400 Subject: [PATCH 03/15] correct optimizer release, add test --- paddle/go/pserver/optimizer.c | 18 ++++++++++++++---- paddle/go/pserver/optimizer_test.go | 8 ++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) create mode 100644 paddle/go/pserver/optimizer_test.go diff --git a/paddle/go/pserver/optimizer.c b/paddle/go/pserver/optimizer.c index 123684970f..36a612a56f 100644 --- a/paddle/go/pserver/optimizer.c +++ b/paddle/go/pserver/optimizer.c @@ -3,34 +3,44 @@ #include "optimizer.h" typedef int (*update_func)(void*, void *, paddle_element_type, const void*, int); +typedef void (*release_func)(void*); typedef struct paddle_optimizer{ - update_func func; + update_func update; + release_func release; void* optimizer; } paddle_optimizer; void paddle_release_optimizer(paddle_optimizer* o) { + o->release(o->optimizer); free(o); } int paddle_update_parameter(paddle_optimizer* o, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes) { - return o->func(o->optimizer, buffer, element_type, gradient, num_bytes); + return o->update(o->optimizer, buffer, element_type, gradient, num_bytes); } typedef struct { double learning_rate; } SGD_optimizer; -int paddle_SGD_update_parameter(void* optimizer, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes) { +int update_SGD(void* optimizer, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes) { + SGD_optimizer* o = (SGD_optimizer*)optimizer; // TODO return 0; } +void release_SGD(void *optimizer) { + SGD_optimizer* o = (SGD_optimizer*)optimizer; + // nothing allocated on heap +} + paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate) { SGD_optimizer* o = (SGD_optimizer*)malloc(sizeof(SGD_optimizer)); o->learning_rate = learning_rate; paddle_optimizer* container = (paddle_optimizer*)malloc(sizeof(paddle_optimizer)); - container->func = paddle_SGD_update_parameter; + container->update = 
update_SGD; + container->release = release_SGD; container->optimizer = o; return container; } diff --git a/paddle/go/pserver/optimizer_test.go b/paddle/go/pserver/optimizer_test.go new file mode 100644 index 0000000000..64d6d092aa --- /dev/null +++ b/paddle/go/pserver/optimizer_test.go @@ -0,0 +1,8 @@ +package pserver + +import "testing" + +func TestSGDCreateRelease(t *testing.T) { + o := newOptimizer(sgd, 1) + o.Cleanup() +} From 4808e22e04c2448f6c65933197716a9bbb037766 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 17 May 2017 20:15:50 -0400 Subject: [PATCH 04/15] fix typo --- paddle/go/pserver/optimizer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paddle/go/pserver/optimizer.go b/paddle/go/pserver/optimizer.go index 8c6450bca0..64bdefe660 100644 --- a/paddle/go/pserver/optimizer.go +++ b/paddle/go/pserver/optimizer.go @@ -38,7 +38,7 @@ func (o *optimizer) UpdateParameter(p Parameter, g Gradient) error { r := C.paddle_update_parameter(o.opt, unsafe.Pointer(&p.Content[0]), C.paddle_element_type(p.ElementType), unsafe.Pointer(&g.Content[0]), C.int(len(g.Content))) if r != 0 { - return fmt.Errorf("optimier returned error code: %d", r) + return fmt.Errorf("optimizer update returned error code: %d", r) } return nil } From bc33f9b165d15254a434b3175f465dd2e4e7f70f Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 17 May 2017 20:19:14 -0400 Subject: [PATCH 05/15] fix bug lock is released too soon --- paddle/go/pserver/service.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/go/pserver/service.go b/paddle/go/pserver/service.go index 0d10da9880..22f6cdf40d 100644 --- a/paddle/go/pserver/service.go +++ b/paddle/go/pserver/service.go @@ -110,7 +110,7 @@ func (s *Service) SendGrads(grads []Gradient, dummy *int) error { } s.mu.Lock() - s.mu.Unlock() + defer s.mu.Unlock() for _, g := range grads { if _, ok := s.paramMap[g.Name]; !ok { @@ -134,7 +134,7 @@ func (s *Service) SendGrads(grads []Gradient, dummy *int) error { func (s *Service) GetParams(names []string, parameters *[]Parameter) error { <-s.initialized s.mu.Lock() - s.mu.Unlock() + defer s.mu.Unlock() for _, n := range names { if _, ok := s.paramMap[n]; !ok { From e39e14d1572be690a513dff435b639221c35311d Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 17 May 2017 20:25:47 -0400 Subject: [PATCH 06/15] handle error from s.opt.UpdateParameter --- paddle/go/pserver/service.go | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/paddle/go/pserver/service.go b/paddle/go/pserver/service.go index 22f6cdf40d..47a862c5ad 100644 --- a/paddle/go/pserver/service.go +++ b/paddle/go/pserver/service.go @@ -109,6 +109,11 @@ func (s *Service) SendGrads(grads []Gradient, dummy *int) error { return ErrUnintialized } + count := len(grads) + if count == 0 { + return nil + } + s.mu.Lock() defer s.mu.Unlock() @@ -118,16 +123,25 @@ func (s *Service) SendGrads(grads []Gradient, dummy *int) error { } } - var wg sync.WaitGroup + errCh := make(chan error, count) for _, g := range grads { - wg.Add(1) go func(p Parameter, g Gradient) { - s.opt.UpdateParameter(p, g) - wg.Done() + err := s.opt.UpdateParameter(p, g) + errCh <- err }(s.paramMap[g.Name], g) } - wg.Wait() + recv := 0 + for err := range errCh { + if err != nil { + return err + } + + recv++ + if recv == count { + break + } + } return nil } From f4bc10daac82d8e43406f19faac673ae972b9152 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 17 May 2017 20:27:39 -0400 Subject: [PATCH 07/15] update comment --- 
paddle/go/pserver/service_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/paddle/go/pserver/service_test.go b/paddle/go/pserver/service_test.go index ebeff1fb89..78dd4d6b58 100644 --- a/paddle/go/pserver/service_test.go +++ b/paddle/go/pserver/service_test.go @@ -65,7 +65,8 @@ func TestFull(t *testing.T) { t.FailNow() } - // we don't care the content, since it's already optimized with gradient + // don't compare content, since it's already changed by + // gradient update. params1[0].Content = nil params1[0].Content = nil p.Content = nil From 9920a06cc6a4a8987b85cd2ad0d0898c74a2bacf Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 17 May 2017 20:32:40 -0400 Subject: [PATCH 08/15] rename local variable --- paddle/go/pserver/optimizer.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/paddle/go/pserver/optimizer.c b/paddle/go/pserver/optimizer.c index 36a612a56f..8d63089b4c 100644 --- a/paddle/go/pserver/optimizer.c +++ b/paddle/go/pserver/optimizer.c @@ -36,11 +36,11 @@ void release_SGD(void *optimizer) { } paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate) { - SGD_optimizer* o = (SGD_optimizer*)malloc(sizeof(SGD_optimizer)); - o->learning_rate = learning_rate; - paddle_optimizer* container = (paddle_optimizer*)malloc(sizeof(paddle_optimizer)); - container->update = update_SGD; - container->release = release_SGD; - container->optimizer = o; - return container; + SGD_optimizer* impl = (SGD_optimizer*)malloc(sizeof(SGD_optimizer)); + impl->learning_rate = learning_rate; + paddle_optimizer* opt = (paddle_optimizer*)malloc(sizeof(paddle_optimizer)); + opt->update = update_SGD; + opt->release = release_SGD; + opt->optimizer = impl; + return opt; } From 27fdccc38040349125f0ab601870649a4c5d4e3e Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Fri, 19 May 2017 15:04:45 -0400 Subject: [PATCH 09/15] fix according to comments --- paddle/go/pserver/service.go | 15 +++++---------- paddle/go/pserver/service_test.go | 23 ++++++++++++----------- 2 files changed, 17 insertions(+), 21 deletions(-) diff --git a/paddle/go/pserver/service.go b/paddle/go/pserver/service.go index 47a862c5ad..3bf26b7651 100644 --- a/paddle/go/pserver/service.go +++ b/paddle/go/pserver/service.go @@ -9,8 +9,7 @@ import ( // ElementType is the type of elements of a Parameter. 
type ElementType int -var ErrUnintialized = errors.New("pserver not initialized") -var ErrAlreadyIntialized = errors.New("pserver already initialized") +var ErrAlreadyInitialized = errors.New("pserver already initialized") // Supported element types const ( @@ -56,7 +55,7 @@ func NewService() *Service { func (s *Service) BeginInitParams(config []byte, dummy *int) error { select { case <-s.initialized: - return ErrAlreadyIntialized + return ErrAlreadyInitialized default: } @@ -75,7 +74,7 @@ func (s *Service) BeginInitParams(config []byte, dummy *int) error { func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error { select { case <-s.initialized: - return ErrAlreadyIntialized + return ErrAlreadyInitialized default: } @@ -94,7 +93,7 @@ func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) er func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error { select { case <-s.initialized: - return ErrAlreadyIntialized + return ErrAlreadyInitialized default: } @@ -103,11 +102,7 @@ func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error { } func (s *Service) SendGrads(grads []Gradient, dummy *int) error { - select { - case <-s.initialized: - default: - return ErrUnintialized - } + <-s.initialized count := len(grads) if count == 0 { diff --git a/paddle/go/pserver/service_test.go b/paddle/go/pserver/service_test.go index 78dd4d6b58..437d14b28c 100644 --- a/paddle/go/pserver/service_test.go +++ b/paddle/go/pserver/service_test.go @@ -98,21 +98,12 @@ func TestMultipleInit(t *testing.T) { } err = s.FinishInitParams(0, &dummy) - if err != pserver.ErrAlreadyIntialized { + if err != pserver.ErrAlreadyInitialized { t.FailNow() } err = s.BeginInitParams(nil, &dummy) - if err != pserver.ErrAlreadyIntialized { - t.FailNow() - } -} - -func TestUninitialized(t *testing.T) { - s := pserver.NewService() - var dummy int - err := s.SendGrads(nil, &dummy) - if err != pserver.ErrUnintialized { + if err != pserver.ErrAlreadyInitialized { t.FailNow() } } @@ -140,6 +131,16 @@ func TestBlockUntilInitialized(t *testing.T) { wg.Done() }() + wg.Add(1) + go func() { + var dummy int + err := s.SendGrads(nil, &dummy) + if err != nil { + t.FailNow() + } + wg.Done() + }() + var dummy int err := s.BeginInitParams(nil, &dummy) if err != nil { From 44d60bd91eb696a617f50cacc6257b9af76accc1 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Fri, 19 May 2017 15:11:23 -0400 Subject: [PATCH 10/15] add comments for exported functions --- paddle/go/pserver/service.go | 13 ++++++++++++- paddle/go/pserver/service_test.go | 2 +- 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/paddle/go/pserver/service.go b/paddle/go/pserver/service.go index 3bf26b7651..a009b45633 100644 --- a/paddle/go/pserver/service.go +++ b/paddle/go/pserver/service.go @@ -37,6 +37,7 @@ type ParameterWithConfig struct { // Gradient is the gradient of the parameter. type Gradient Parameter +// Service is the RPC service for pserver. type Service struct { initialized chan struct{} @@ -45,6 +46,7 @@ type Service struct { paramMap map[string]Parameter } +// NewService creates a new service. func NewService() *Service { s := &Service{} s.paramMap = make(map[string]Parameter) @@ -52,6 +54,8 @@ func NewService() *Service { return s } +// BeginInitParams tells the parameter server that the parameter +// initialization has begun. 
func (s *Service) BeginInitParams(config []byte, dummy *int) error { select { case <-s.initialized: @@ -71,6 +75,7 @@ func (s *Service) BeginInitParams(config []byte, dummy *int) error { return nil } +// InitParam initializes a parameter. func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) error { select { case <-s.initialized: @@ -90,6 +95,8 @@ func (s *Service) InitParam(paramWithConfigs ParameterWithConfig, dummy *int) er return nil } +// FinishInitParams tells the parameter server that the parameter +// initialization has finished. func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error { select { case <-s.initialized: @@ -101,6 +108,8 @@ func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error { return nil } +// SendGrads sends gradients to parameter servers for parameter +// optimization. func (s *Service) SendGrads(grads []Gradient, dummy *int) error { <-s.initialized @@ -140,6 +149,7 @@ func (s *Service) SendGrads(grads []Gradient, dummy *int) error { return nil } +// GetParams gets parameters from the parameter server. func (s *Service) GetParams(names []string, parameters *[]Parameter) error { <-s.initialized s.mu.Lock() @@ -166,7 +176,8 @@ func (s *Service) GetParams(names []string, parameters *[]Parameter) error { return nil } -func (s *Service) SaveModel(path string, dummy *int) error { +// Save tells the parameter server to save parameters. +func (s *Service) Save(path string, dummy *int) error { <-s.initialized // TODO diff --git a/paddle/go/pserver/service_test.go b/paddle/go/pserver/service_test.go index 437d14b28c..23b2d17dc7 100644 --- a/paddle/go/pserver/service_test.go +++ b/paddle/go/pserver/service_test.go @@ -124,7 +124,7 @@ func TestBlockUntilInitialized(t *testing.T) { wg.Add(1) go func() { var dummy int - err := s.SaveModel("", &dummy) + err := s.Save("", &dummy) if err != nil { t.FailNow() } From ea18f2eeb6bd609be321d997a3c21ded52801fa7 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Fri, 19 May 2017 15:15:46 -0400 Subject: [PATCH 11/15] make test more precise --- paddle/go/pserver/service_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/paddle/go/pserver/service_test.go b/paddle/go/pserver/service_test.go index 23b2d17dc7..6aa7f47c74 100644 --- a/paddle/go/pserver/service_test.go +++ b/paddle/go/pserver/service_test.go @@ -110,6 +110,7 @@ func TestMultipleInit(t *testing.T) { func TestBlockUntilInitialized(t *testing.T) { s := pserver.NewService() + ch := make(chan struct{}, 3) var wg sync.WaitGroup wg.Add(1) go func() { @@ -119,6 +120,7 @@ func TestBlockUntilInitialized(t *testing.T) { t.FailNow() } wg.Done() + ch <- struct{}{} }() wg.Add(1) @@ -129,6 +131,7 @@ func TestBlockUntilInitialized(t *testing.T) { t.FailNow() } wg.Done() + ch <- struct{}{} }() wg.Add(1) @@ -139,6 +142,7 @@ func TestBlockUntilInitialized(t *testing.T) { t.FailNow() } wg.Done() + ch <- struct{}{} }() var dummy int @@ -147,6 +151,13 @@ func TestBlockUntilInitialized(t *testing.T) { t.FailNow() } + select { + case <-ch: + // some function returned before initialization is completed. + t.FailNow() + default: + } + err = s.FinishInitParams(0, &dummy) if err != nil { t.FailNow() From e2fae1685de73772fb2b46ed67b4ddc0b897c83c Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Fri, 19 May 2017 15:30:53 -0400 Subject: [PATCH 12/15] SendGrad will return error if pserver is not initialized. 
--- paddle/go/pserver/service.go | 7 ++++++- paddle/go/pserver/service_test.go | 22 ++++++++++------------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/paddle/go/pserver/service.go b/paddle/go/pserver/service.go index a009b45633..f43e59403a 100644 --- a/paddle/go/pserver/service.go +++ b/paddle/go/pserver/service.go @@ -10,6 +10,7 @@ import ( type ElementType int var ErrAlreadyInitialized = errors.New("pserver already initialized") +var ErrUninitialized = errors.New("pserver not fully initialized") // Supported element types const ( @@ -111,7 +112,11 @@ func (s *Service) FinishInitParams(dummy0 int, dummy1 *int) error { // SendGrads sends gradients to parameter servers for parameter // optimization. func (s *Service) SendGrads(grads []Gradient, dummy *int) error { - <-s.initialized + select { + case <-s.initialized: + default: + return ErrUninitialized + } count := len(grads) if count == 0 { diff --git a/paddle/go/pserver/service_test.go b/paddle/go/pserver/service_test.go index 6aa7f47c74..10185bd0f2 100644 --- a/paddle/go/pserver/service_test.go +++ b/paddle/go/pserver/service_test.go @@ -108,9 +108,18 @@ func TestMultipleInit(t *testing.T) { } } +func TestUninitialized(t *testing.T) { + s := pserver.NewService() + var dummy int + err := s.SendGrads(nil, &dummy) + if err != pserver.ErrUninitialized { + t.FailNow() + } +} + func TestBlockUntilInitialized(t *testing.T) { s := pserver.NewService() - ch := make(chan struct{}, 3) + ch := make(chan struct{}, 2) var wg sync.WaitGroup wg.Add(1) go func() { @@ -134,17 +143,6 @@ func TestBlockUntilInitialized(t *testing.T) { ch <- struct{}{} }() - wg.Add(1) - go func() { - var dummy int - err := s.SendGrads(nil, &dummy) - if err != nil { - t.FailNow() - } - wg.Done() - ch <- struct{}{} - }() - var dummy int err := s.BeginInitParams(nil, &dummy) if err != nil { From 599eb3663e3124f976dfa3d487e47534df61a899 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Sun, 21 May 2017 19:29:34 -0400 Subject: [PATCH 13/15] do clang-format --- paddle/go/pserver/optimizer.c | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/paddle/go/pserver/optimizer.c b/paddle/go/pserver/optimizer.c index 8d63089b4c..b8da3ec959 100644 --- a/paddle/go/pserver/optimizer.c +++ b/paddle/go/pserver/optimizer.c @@ -2,10 +2,10 @@ #include "optimizer.h" -typedef int (*update_func)(void*, void *, paddle_element_type, const void*, int); +typedef int (*update_func)(void*, void*, paddle_element_type, const void*, int); typedef void (*release_func)(void*); -typedef struct paddle_optimizer{ +typedef struct paddle_optimizer { update_func update; release_func release; void* optimizer; @@ -16,23 +16,29 @@ void paddle_release_optimizer(paddle_optimizer* o) { free(o); } -int paddle_update_parameter(paddle_optimizer* o, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes) { +int paddle_update_parameter(paddle_optimizer* o, + void* buffer, + paddle_element_type element_type, + const void* gradient, + int num_bytes) { return o->update(o->optimizer, buffer, element_type, gradient, num_bytes); } -typedef struct { - double learning_rate; -} SGD_optimizer; +typedef struct { double learning_rate; } SGD_optimizer; -int update_SGD(void* optimizer, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes) { +int update_SGD(void* optimizer, + void* buffer, + paddle_element_type element_type, + const void* gradient, + int num_bytes) { SGD_optimizer* o = (SGD_optimizer*)optimizer; // TODO 
return 0; } -void release_SGD(void *optimizer) { - SGD_optimizer* o = (SGD_optimizer*)optimizer; - // nothing allocated on heap +void release_SGD(void* optimizer) { + SGD_optimizer* o = (SGD_optimizer*)optimizer; + // nothing allocated on heap } paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate) { From 97dbb7609c80eb6195f18f28ddf8c7f9134a9c4b Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Sun, 21 May 2017 19:30:52 -0400 Subject: [PATCH 14/15] resolve compile caused by merging branches --- paddle/go/pserver/client.go | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/paddle/go/pserver/client.go b/paddle/go/pserver/client.go index 5b110af648..1c98aea6d1 100644 --- a/paddle/go/pserver/client.go +++ b/paddle/go/pserver/client.go @@ -1,34 +1,5 @@ package pserver -// ElementType is the type of elements of a Parameter. -type ElementType int - -// Supported element types -const ( - Int32 ElementType = iota - UInt32 - Int64 - UInt64 - Float32 - Float64 -) - -// Parameter is a piece of data to sync with the parameter server. -type Parameter struct { - Name string - ElementType ElementType - Content []byte -} - -// ParameterWithConfig contains the parameter and the configuration. -type ParameterWithConfig struct { - Param Parameter - Config []byte // parameter configuration in Proto Buffer format -} - -// Gradient is the gradient of the parameter. -type Gradient Parameter - // Client is the client to parameter servers. type Client struct { } From 25c3f118f00b601fa64d3984a7eb44c572bf287e Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Sun, 21 May 2017 20:32:55 -0400 Subject: [PATCH 15/15] apply clang-format --- paddle/go/pserver/optimizer.h | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/paddle/go/pserver/optimizer.h b/paddle/go/pserver/optimizer.h index cde8da70cc..a7e3ff0530 100644 --- a/paddle/go/pserver/optimizer.h +++ b/paddle/go/pserver/optimizer.h @@ -2,10 +2,10 @@ #define PADDLE_PSERVER_OPTIMIZER_H typedef enum { - PADDLE_ELEMENT_TYPE_INT32 = 0, - PADDLE_ELEMENT_TYPE_UINT32 = 1, - PADDLE_ELEMENT_TYPE_INT64 = 2, - PADDLE_ELEMENT_TYPE_UINT64 = 3, + PADDLE_ELEMENT_TYPE_INT32 = 0, + PADDLE_ELEMENT_TYPE_UINT32 = 1, + PADDLE_ELEMENT_TYPE_INT64 = 2, + PADDLE_ELEMENT_TYPE_UINT64 = 3, PADDLE_ELEMENT_TYPE_FLOAT32 = 4, PADDLE_ELEMENT_TYPE_FLOAT64 = 5, } paddle_element_type; @@ -13,6 +13,10 @@ typedef enum { struct paddle_optimizer; struct paddle_optimizer* paddle_create_SGD_optimizer(double learning_rate); void paddle_release_optimizer(struct paddle_optimizer* o); -int paddle_update_parameter(struct paddle_optimizer* o, void *buffer, paddle_element_type element_type, const void* gradient, int num_bytes); +int paddle_update_parameter(struct paddle_optimizer* o, + void* buffer, + paddle_element_type element_type, + const void* gradient, + int num_bytes); #endif /* PADDLE_PSERVER_OPTIMIZER_H */
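
For reference, below is a minimal, illustrative sketch of how a trainer-side client could drive the Service implemented in this series over Go's standard net/rpc package. It is not part of the patch series itself (the Client struct introduced in client.go is still an empty stub at this point); the RPC method names and argument shapes follow service.go and cmd/pserver/pserver.go, but the host, port, parameter contents, and error handling are assumptions made purely for this example.

package main

import (
	"log"
	"net/rpc"

	"github.com/PaddlePaddle/Paddle/paddle/go/pserver"
)

func main() {
	// Assumption: a pserver started as "pserver -p 8000" is listening locally.
	// rpc.DialHTTP pairs with the rpc.HandleHTTP/http.Serve setup in cmd/pserver.
	c, err := rpc.DialHTTP("tcp", "localhost:8000")
	if err != nil {
		log.Fatal(err)
	}

	var dummy int

	// Initialization handshake: BeginInitParams, then InitParam once per
	// parameter, then FinishInitParams. GetParams and Save block until this
	// completes; SendGrads returns an error if called before it.
	if err := c.Call("Service.BeginInitParams", []byte{}, &dummy); err != nil {
		log.Fatal(err)
	}

	p := pserver.Parameter{
		Name:        "param_a",
		ElementType: pserver.Float32,
		Content:     make([]byte, 12), // three float32 values, zero-initialized
	}
	if err := c.Call("Service.InitParam", pserver.ParameterWithConfig{Param: p}, &dummy); err != nil {
		log.Fatal(err)
	}
	if err := c.Call("Service.FinishInitParams", 0, &dummy); err != nil {
		log.Fatal(err)
	}

	// Send a gradient for optimization, then read the updated parameter back.
	grads := []pserver.Gradient{pserver.Gradient(p)}
	if err := c.Call("Service.SendGrads", grads, &dummy); err != nil {
		log.Fatal(err)
	}

	var params []pserver.Parameter
	if err := c.Call("Service.GetParams", []string{"param_a"}, &params); err != nil {
		log.Fatal(err)
	}
	log.Printf("got %d parameter(s) back", len(params))
}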