From d7b5a136caecf2dbf6c17742deae70429dc43a14 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Tue, 23 May 2017 21:56:51 -0400 Subject: [PATCH 1/4] implement task handling for master server's service --- doc/design/cluster_train/master_server.md | 16 +- paddle/go/master/service.go | 179 ++++++++++++++++++++++ paddle/go/master/service_internal_test.go | 32 ++++ 3 files changed, 219 insertions(+), 8 deletions(-) create mode 100644 paddle/go/master/service.go create mode 100644 paddle/go/master/service_internal_test.go diff --git a/doc/design/cluster_train/master_server.md b/doc/design/cluster_train/master_server.md index bb83076525..4bf3c506f1 100644 --- a/doc/design/cluster_train/master_server.md +++ b/doc/design/cluster_train/master_server.md @@ -10,7 +10,7 @@ A dataset is a list of files in *RecordIO* format. A RecordIO file consists of c ## Task Queue -As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *blocks* from one or multiple files. The master server maintains *task queues* to track the training progress. +As mentioned in [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to the trainer process to train on. A task consists of one or multiple *chunks* from one or multiple files. The master server maintains *task queues* to track the training progress. ### Task Queue Creation @@ -21,23 +21,23 @@ As mentioned in [distributed training design doc](./README.md), a *task* is a da func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error { } ``` -1. The master server will scan through each RecordIO file to generate the *block index* and know how many blocks does each file have. A block can be referenced by the file path and the index of the block within the file. 
The block index is in memory data structure that enables fast access to each block, and the index of the block with the file is an integer start from 0, representing the n-th block within the file. +1. The master server will scan through each RecordIO file to generate the *chunk index* and determine how many chunks each file has. A chunk can be referenced by the file path and the index of the chunk within the file. The chunk index is an in-memory data structure that enables fast access to each chunk, and the index of the chunk within the file is an integer starting from 0, representing the n-th chunk within the file. - The definition of the block is: + The definition of the chunk is: ```go - type Block struct { - Idx int // index of the block within the file + type Chunk struct { + Idx int // index of the chunk within the file Path string - Index recordio.Index // block index + Index recordio.Index // chunk index } ``` -1. Blocks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element. +1. Chunks are grouped into tasks, and tasks are filled into the todo queue. The pending queue and the done queue are initialized with no element. The definition of the task is: ```go type Task struct { Index int - Blocks []Block + Chunks []Chunk } ``` diff --git a/paddle/go/master/service.go b/paddle/go/master/service.go new file mode 100644 index 0000000000..ae7f9687a5 --- /dev/null +++ b/paddle/go/master/service.go @@ -0,0 +1,179 @@ +package master + +import ( + "errors" + "log" + "sync" + "time" + + "github.com/wangkuiyi/recordio" +) + +const ( + targetTaskCount = 300 +) + +// errors +var ( + ErrNoMoreTask = errors.New("no more task for current pass") + ErrPendingTaskNotFound = errors.New("pending task not found") +) + +// Service is the master server service. 
+type Service struct { + timeoutDur time.Duration + timeoutMax int + + mu sync.Mutex + taskQueues taskQueues +} + +// Recover recovers service state from etcd. +func Recover() (*Service, error) { + // TODO(helin): recover from snapshot state from etcd. + return nil, nil +} + +func partition(chunks []Chunk, targetTaskCount int) []taskEntry { + id := 0 + chunkPerTask := len(chunks) / targetTaskCount + if chunkPerTask <= 0 { + chunkPerTask = 1 + } + + var result []taskEntry + var cur taskEntry + for i, c := range chunks { + if i%chunkPerTask == 0 && len(cur.Task.Chunks) > 0 { + cur.Task.ID = id + id++ + result = append(result, cur) + cur.Task.Chunks = nil + } + + cur.Task.Chunks = append(cur.Task.Chunks, c) + } + + if len(cur.Task.Chunks) > 0 { + cur.Task.ID = id + id++ + result = append(result, cur) + } + + return result +} + +// NewService creates a new service. +func NewService(chunks []Chunk, timeoutDur time.Duration, timeoutMax int) (*Service, error) { + s := &Service{} + s.timeoutDur = timeoutDur + s.timeoutMax = timeoutMax + s.taskQueues = taskQueues{} + s.taskQueues.Pending = make(map[int]taskEntry) + s.taskQueues.Todo = partition(chunks, targetTaskCount) + return s, nil +} + +// Chunk is a chunk of data consisted of several data instances. +type Chunk struct { + Idx int // index of the chunk within the file + Path string + Index recordio.Index // chunk index +} + +// Task is the basic unit of data instances assigned to trainers. +type Task struct { + ID int + Chunks []Chunk +} + +type taskEntry struct { + Epoch int + NumTimeout int + Task Task +} + +type taskQueues struct { + Todo []taskEntry + Pending map[int]taskEntry // map from task ID to task entry + Done []taskEntry + Failed []Task +} + +// *must* be called with s.mu being held. +func (s *Service) snapshot() error { + // TODO(helin): snapshot state on etcd. + return nil +} + +// GetTask gets a new task from the service. 
+func (s *Service) GetTask(dummy int, task *Task) error { + s.mu.Lock() + defer s.mu.Unlock() + + if len(s.taskQueues.Todo) == 0 { + return ErrNoMoreTask + } + + t := s.taskQueues.Todo[0] + t.Epoch++ + s.taskQueues.Todo = s.taskQueues.Todo[1:] + s.taskQueues.Pending[t.Task.ID] = t + err := s.snapshot() + if err != nil { + return err + } + + time.AfterFunc(s.timeoutDur, func(taskID int, epoch int) func() { + return func() { + s.mu.Lock() + defer s.mu.Unlock() + + t, ok := s.taskQueues.Pending[taskID] + if !ok { + return + } + + if t.Epoch != epoch { + // new epoch, task launched after the + // schedule of this timeout check. + return + } + + defer func() { + err := s.snapshot() + if err != nil { + log.Println(err) + } + }() + + delete(s.taskQueues.Pending, t.Task.ID) + + t.NumTimeout++ + if t.NumTimeout > s.timeoutMax { + s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task) + return + } + + s.taskQueues.Todo = append(s.taskQueues.Todo, t) + } + }(t.Task.ID, t.Epoch)) + return nil +} + +// TaskFinished tell the service that a task is finished. 
+func (s *Service) TaskFinished(taskID int, dummy *int) error { + s.mu.Lock() + defer s.mu.Unlock() + + t, ok := s.taskQueues.Pending[taskID] + if !ok { + return ErrPendingTaskNotFound + } + + // task finished, reset timeout + t.NumTimeout = 0 + s.taskQueues.Done = append(s.taskQueues.Done, t) + delete(s.taskQueues.Pending, taskID) + return s.snapshot() +} diff --git a/paddle/go/master/service_internal_test.go b/paddle/go/master/service_internal_test.go new file mode 100644 index 0000000000..1e6197d241 --- /dev/null +++ b/paddle/go/master/service_internal_test.go @@ -0,0 +1,32 @@ +package master + +import "testing" + +func TestPartitionCount(t *testing.T) { + cs := make([]Chunk, 100) + ts := partition(cs, 20) + if len(ts) != 20 { + t.Error(len(ts)) + } + + cs = make([]Chunk, 101) + ts = partition(cs, 20) + if len(ts) != 21 { + t.Error(len(ts)) + } + + ts = partition(cs, 200) + if len(ts) != 101 { + t.Error(len(ts)) + } +} + +func TestPartionIndex(t *testing.T) { + cs := make([]Chunk, 100) + ts := partition(cs, 20) + for i := range ts { + if ts[i].Task.ID != i { + t.Error(ts[i], i) + } + } +} From 025e7f9cb6c4865f18f66d4c6a19b60ddbea3da2 Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Wed, 24 May 2017 20:08:55 -0400 Subject: [PATCH 2/4] implement basic master server --- paddle/go/cmd/master/master.go | 78 ++++++++++++++++++++++++++++++++++ paddle/go/master/service.go | 4 +- 2 files changed, 80 insertions(+), 2 deletions(-) create mode 100644 paddle/go/cmd/master/master.go diff --git a/paddle/go/cmd/master/master.go b/paddle/go/cmd/master/master.go new file mode 100644 index 0000000000..8346b42a32 --- /dev/null +++ b/paddle/go/cmd/master/master.go @@ -0,0 +1,78 @@ +package main + +import ( + "flag" + "net" + "net/http" + "net/rpc" + "os" + "strconv" + "strings" + "time" + + "github.com/PaddlePaddle/Paddle/paddle/go/master" + "github.com/wangkuiyi/recordio" +) + +const ( + taskTimeoutDur = 20 * time.Minute + taskTimeoutMax = 3 +) + +func main() { + port := 
flag.Int("p", 0, "port of the master server") + dataset := flag.String("d", "", "dataset: comma separated path to RecordIO files") + faultTolerant := flag.Bool("fault-tolerance", false, "enable fault tolerance (requires etcd).") + flag.Parse() + + if *dataset == "" { + panic("no dataset specified.") + } + + if *faultTolerant { + panic("fault tolernat not implemented.") + } + + var chunks []master.Chunk + paths := strings.Split(*dataset, ",") + idx := 0 + for _, path := range paths { + f, err := os.Open(path) + if err != nil { + panic(err) + } + + index, err := recordio.LoadIndex(f) + if err != nil { + panic(err) + } + f.Close() + + count := index.NumChunks() + for i := 0; i < count; i++ { + chunk := master.Chunk{ + Idx: idx, + Path: path, + Index: *index.ChunkIndex(i), + } + chunks = append(chunks, chunk) + } + } + + s := master.NewService(chunks, taskTimeoutDur, taskTimeoutMax) + err := rpc.Register(s) + if err != nil { + panic(err) + } + + rpc.HandleHTTP() + l, err := net.Listen("tcp", ":"+strconv.Itoa(*port)) + if err != nil { + panic(err) + } + + err = http.Serve(l, nil) + if err != nil { + panic(err) + } +} diff --git a/paddle/go/master/service.go b/paddle/go/master/service.go index ae7f9687a5..652d345e01 100644 --- a/paddle/go/master/service.go +++ b/paddle/go/master/service.go @@ -64,14 +64,14 @@ func partition(chunks []Chunk, targetTaskCount int) []taskEntry { } // NewService creates a new service. -func NewService(chunks []Chunk, timeoutDur time.Duration, timeoutMax int) (*Service, error) { +func NewService(chunks []Chunk, timeoutDur time.Duration, timeoutMax int) *Service { s := &Service{} s.timeoutDur = timeoutDur s.timeoutMax = timeoutMax s.taskQueues = taskQueues{} s.taskQueues.Pending = make(map[int]taskEntry) s.taskQueues.Todo = partition(chunks, targetTaskCount) - return s, nil + return s } // Chunk is a chunk of data consisted of several data instances. 
From 6ce7c8bc873b73a7cd7a5cdb1d0b861d8d3ef23a Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 25 May 2017 16:05:29 -0400 Subject: [PATCH 3/4] update recordIO include path --- paddle/go/cmd/master/master.go | 2 +- paddle/go/master/service.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/paddle/go/cmd/master/master.go b/paddle/go/cmd/master/master.go index 8346b42a32..16052fd75c 100644 --- a/paddle/go/cmd/master/master.go +++ b/paddle/go/cmd/master/master.go @@ -11,7 +11,7 @@ import ( "time" "github.com/PaddlePaddle/Paddle/paddle/go/master" - "github.com/wangkuiyi/recordio" + "github.com/PaddlePaddle/Paddle/paddle/go/recordio" ) const ( diff --git a/paddle/go/master/service.go b/paddle/go/master/service.go index 652d345e01..cf15f28cc7 100644 --- a/paddle/go/master/service.go +++ b/paddle/go/master/service.go @@ -6,7 +6,7 @@ import ( "sync" "time" - "github.com/wangkuiyi/recordio" + "github.com/PaddlePaddle/Paddle/paddle/go/recordio" ) const ( From 9b11e17d8d38e83c25be358193bb66b778cbc31c Mon Sep 17 00:00:00 2001 From: Helin Wang Date: Thu, 25 May 2017 17:11:29 -0400 Subject: [PATCH 4/4] fix according to comments --- paddle/go/cmd/master/master.go | 41 ++++++++++++++++------- paddle/go/cmd/pserver/pserver.go | 5 +-- paddle/go/master/service.go | 13 ++++--- paddle/go/master/service_internal_test.go | 11 ++++-- 4 files changed, 45 insertions(+), 25 deletions(-) diff --git a/paddle/go/cmd/master/master.go b/paddle/go/cmd/master/master.go index 16052fd75c..ef1f87c2dd 100644 --- a/paddle/go/cmd/master/master.go +++ b/paddle/go/cmd/master/master.go @@ -1,40 +1,55 @@ package main import ( - "flag" + "fmt" "net" "net/http" "net/rpc" "os" + "path/filepath" "strconv" "strings" "time" + "github.com/namsral/flag" + "github.com/PaddlePaddle/Paddle/paddle/go/master" "github.com/PaddlePaddle/Paddle/paddle/go/recordio" ) -const ( - taskTimeoutDur = 20 * time.Minute - taskTimeoutMax = 3 -) - func main() { - port := flag.Int("p", 0, "port of the master 
server") - dataset := flag.String("d", "", "dataset: comma separated path to RecordIO files") - faultTolerant := flag.Bool("fault-tolerance", false, "enable fault tolerance (requires etcd).") + port := flag.Int("port", 8080, "port of the master server.") + dataset := flag.String("training_dataset", "", "dataset: comma separated path to RecordIO paths, supports golb patterns.") + faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).") + taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.") + taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.") + chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.") flag.Parse() if *dataset == "" { panic("no dataset specified.") } - if *faultTolerant { - panic("fault tolernat not implemented.") + if *faultTolerance { + panic("fault tolernance not implemented.") } var chunks []master.Chunk - paths := strings.Split(*dataset, ",") + var paths []string + ss := strings.Split(*dataset, ",") + fmt.Println(ss) + for _, s := range ss { + match, err := filepath.Glob(s) + if err != nil { + panic(err) + } + paths = append(paths, match...) 
+ } + + if len(paths) == 0 { + panic("no valid datset specified.") + } + idx := 0 for _, path := range paths { f, err := os.Open(path) @@ -59,7 +74,7 @@ func main() { } } - s := master.NewService(chunks, taskTimeoutDur, taskTimeoutMax) + s := master.NewService(chunks, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax) err := rpc.Register(s) if err != nil { panic(err) diff --git a/paddle/go/cmd/pserver/pserver.go b/paddle/go/cmd/pserver/pserver.go index 41417875fb..bd4bfc7028 100644 --- a/paddle/go/cmd/pserver/pserver.go +++ b/paddle/go/cmd/pserver/pserver.go @@ -1,17 +1,18 @@ package main import ( - "flag" "net" "net/http" "net/rpc" "strconv" + "github.com/namsral/flag" + "github.com/PaddlePaddle/Paddle/paddle/go/pserver" ) func main() { - port := flag.Int("p", 0, "port of the pserver") + port := flag.Int("port", 0, "port of the pserver") flag.Parse() s := pserver.NewService() diff --git a/paddle/go/master/service.go b/paddle/go/master/service.go index cf15f28cc7..7526648287 100644 --- a/paddle/go/master/service.go +++ b/paddle/go/master/service.go @@ -34,17 +34,16 @@ func Recover() (*Service, error) { return nil, nil } -func partition(chunks []Chunk, targetTaskCount int) []taskEntry { +func partition(chunks []Chunk, chunksPerTask int) []taskEntry { id := 0 - chunkPerTask := len(chunks) / targetTaskCount - if chunkPerTask <= 0 { - chunkPerTask = 1 + if chunksPerTask <= 0 { + chunksPerTask = 1 } var result []taskEntry var cur taskEntry for i, c := range chunks { - if i%chunkPerTask == 0 && len(cur.Task.Chunks) > 0 { + if i%chunksPerTask == 0 && len(cur.Task.Chunks) > 0 { cur.Task.ID = id id++ result = append(result, cur) @@ -64,13 +63,13 @@ func partition(chunks []Chunk, targetTaskCount int) []taskEntry { } // NewService creates a new service. 
-func NewService(chunks []Chunk, timeoutDur time.Duration, timeoutMax int) *Service { +func NewService(chunks []Chunk, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Service { s := &Service{} s.timeoutDur = timeoutDur s.timeoutMax = timeoutMax s.taskQueues = taskQueues{} s.taskQueues.Pending = make(map[int]taskEntry) - s.taskQueues.Todo = partition(chunks, targetTaskCount) + s.taskQueues.Todo = partition(chunks, chunksPerTask) return s } diff --git a/paddle/go/master/service_internal_test.go b/paddle/go/master/service_internal_test.go index 1e6197d241..bc435b505c 100644 --- a/paddle/go/master/service_internal_test.go +++ b/paddle/go/master/service_internal_test.go @@ -4,18 +4,23 @@ import "testing" func TestPartitionCount(t *testing.T) { cs := make([]Chunk, 100) - ts := partition(cs, 20) + ts := partition(cs, 5) if len(ts) != 20 { t.Error(len(ts)) } cs = make([]Chunk, 101) - ts = partition(cs, 20) + ts = partition(cs, 5) if len(ts) != 21 { t.Error(len(ts)) } - ts = partition(cs, 200) + ts = partition(cs, 1) + if len(ts) != 101 { + t.Error(len(ts)) + } + + ts = partition(cs, 0) if len(ts) != 101 { t.Error(len(ts)) }