|
|
|
@ -31,30 +31,36 @@ type Chunk struct {
|
|
|
|
|
Index recordio.Index // chunk index
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TaskMeta carries the identifying metadata of a task: which task it is and
// which dispatch of that task is being referred to.
type TaskMeta struct {
	// ID identifies the task. partition assigns it, and the service keys
	// its Pending map by it.
	ID int
	// Epoch is bumped by GetTask each time the task is dispatched.
	// processFailedTask compares it with the reported epoch so that a
	// timeout check or failure report from an earlier dispatch is ignored.
	Epoch int
}
|
|
|
|
|
|
|
|
|
|
// Task is the basic unit of data instances assigned to trainers.
//
// NOTE(review): this text appears to be a diff rendered without its +/-
// markers, so removed and added lines sit side by side. ID and Meta.ID both
// receive the same value in partition; presumably the bare ID field is the
// removed side and Meta is its replacement. Confirm against the real file.
|
|
|
|
|
type Task struct {
|
|
|
|
|
// ID is the task ID assigned by partition.
ID int
|
|
|
|
|
// Meta holds the task's ID and dispatch epoch. It is the value trainers
// pass back to TaskFailed.
Meta TaskMeta
|
|
|
|
|
// Chunks are the data chunks grouped into this task by partition.
Chunks []Chunk
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// taskEntry wraps a Task with the bookkeeping the service keeps while the
// task moves between the queues in taskQueues.
//
// NOTE(review): this text appears to be a diff rendered without its +/-
// markers. Epoch and NumTimeout look like the removed side (superseded by
// Task.Meta.Epoch and NumFailure). Confirm against the real file.
type taskEntry struct {
|
|
|
|
|
// Epoch is incremented by GetTask each time the task is dispatched.
Epoch int
|
|
|
|
|
// NumTimeout counts how many times the task has timed out. TaskFinished
// resets it to 0.
NumTimeout int
|
|
|
|
|
// Task is the unit of work handed to a trainer.
Task Task
|
|
|
|
|
// A task fails if it times out or the trainer reports that it exited
// abnormally. NumFailure counts those failures: processFailedTask
// increments it and discards the task once it exceeds Service.failureMax,
// and TaskFinished resets it to 0.
|
|
|
|
|
NumFailure int
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// taskQueues groups the service's tasks by lifecycle state.
//
// NOTE(review): Failed is declared twice below with different element types.
// A duplicate field does not compile, so this is presumably the old and new
// side of a diff shown without +/- markers. processFailedTask appends a
// taskEntry to Failed, which suggests []taskEntry is the current declaration.
// Confirm against the real file.
type taskQueues struct {
|
|
|
|
|
// Todo holds tasks waiting to be dispatched. GetTask takes from the front,
// and processFailedTask appends tasks that are to be retried.
Todo []taskEntry
|
|
|
|
|
// Pending holds tasks that have been dispatched and have not yet been
// reported finished or failed.
Pending map[int]taskEntry // map from task ID to task entry
|
|
|
|
|
// Done holds tasks reported complete through TaskFinished.
Done []taskEntry
|
|
|
|
|
// Failed holds tasks discarded after too many timeouts or failures.
Failed []Task
|
|
|
|
|
// Failed holds task entries discarded after exceeding the failure limit.
Failed []taskEntry
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Service is the master server service.
|
|
|
|
|
type Service struct {
|
|
|
|
|
chunksPerTask int
|
|
|
|
|
timeoutDur time.Duration
|
|
|
|
|
timeoutMax int
|
|
|
|
|
failureMax int
|
|
|
|
|
ready chan struct{}
|
|
|
|
|
store Store
|
|
|
|
|
|
|
|
|
@ -73,7 +79,7 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
|
|
|
|
|
var cur taskEntry
|
|
|
|
|
for i, c := range chunks {
|
|
|
|
|
if i%chunksPerTask == 0 && len(cur.Task.Chunks) > 0 {
|
|
|
|
|
cur.Task.ID = id
|
|
|
|
|
cur.Task.Meta.ID = id
|
|
|
|
|
id++
|
|
|
|
|
result = append(result, cur)
|
|
|
|
|
cur.Task.Chunks = nil
|
|
|
|
@ -83,7 +89,7 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if len(cur.Task.Chunks) > 0 {
|
|
|
|
|
cur.Task.ID = id
|
|
|
|
|
cur.Task.Meta.ID = id
|
|
|
|
|
result = append(result, cur)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -91,11 +97,11 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// NewService creates a new service.
|
|
|
|
|
func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) (*Service, error) {
|
|
|
|
|
func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, failureMax int) (*Service, error) {
|
|
|
|
|
s := &Service{}
|
|
|
|
|
s.chunksPerTask = chunksPerTask
|
|
|
|
|
s.timeoutDur = timeoutDur
|
|
|
|
|
s.timeoutMax = timeoutMax
|
|
|
|
|
s.failureMax = failureMax
|
|
|
|
|
s.taskQueues = taskQueues{}
|
|
|
|
|
s.taskQueues.Pending = make(map[int]taskEntry)
|
|
|
|
|
s.ready = make(chan struct{})
|
|
|
|
@ -257,19 +263,10 @@ func (s *Service) SetDataset(globPaths []string, dummy *int) error {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
|
|
|
|
|
return func() {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
t, ok := s.taskQueues.Pending[taskID]
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if t.Epoch != epoch {
|
|
|
|
|
func (s *Service) processFailedTask(t taskEntry, epoch int) {
|
|
|
|
|
if t.Task.Meta.Epoch != epoch {
|
|
|
|
|
// new epoch, task launched after the
|
|
|
|
|
// schedule of this timeout check.
|
|
|
|
|
// schedule of this timeout check or failed status report.
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -280,17 +277,31 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
|
|
|
|
|
delete(s.taskQueues.Pending, t.Task.ID)
|
|
|
|
|
delete(s.taskQueues.Pending, t.Task.Meta.ID)
|
|
|
|
|
|
|
|
|
|
t.NumTimeout++
|
|
|
|
|
if t.NumTimeout > s.timeoutMax {
|
|
|
|
|
log.Warningf("Task %v timed out %d times, discard.", t.Task, t.NumTimeout)
|
|
|
|
|
s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task)
|
|
|
|
|
t.NumFailure++
|
|
|
|
|
if t.NumFailure > s.failureMax {
|
|
|
|
|
log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
|
|
|
|
|
s.taskQueues.Failed = append(s.taskQueues.Failed, t)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
log.Warningf("Task %v timed out %d times, retry.", t.Task, t.NumTimeout)
|
|
|
|
|
log.Warningf("Task %v failed %d times, discard.", t.Task, t.NumFailure)
|
|
|
|
|
s.taskQueues.Todo = append(s.taskQueues.Todo, t)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
|
|
|
|
|
return func() {
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
t, ok := s.taskQueues.Pending[taskID]
|
|
|
|
|
if !ok {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.processFailedTask(t, epoch)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -339,18 +350,18 @@ func (s *Service) GetTask(dummy int, task *Task) error {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
t := s.taskQueues.Todo[0]
|
|
|
|
|
t.Epoch++
|
|
|
|
|
t.Task.Meta.Epoch++
|
|
|
|
|
s.taskQueues.Todo = s.taskQueues.Todo[1:]
|
|
|
|
|
s.taskQueues.Pending[t.Task.ID] = t
|
|
|
|
|
s.taskQueues.Pending[t.Task.Meta.ID] = t
|
|
|
|
|
err := s.snapshot()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
*task = t.Task
|
|
|
|
|
log.WithFields(s.logFields()).Infof("Task #%d dispatched.", task.ID)
|
|
|
|
|
log.WithFields(s.logFields()).Infof("Task #%v dispatched.", t.Task.Meta)
|
|
|
|
|
|
|
|
|
|
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.ID, t.Epoch))
|
|
|
|
|
time.AfterFunc(s.timeoutDur, s.checkTimeoutFunc(t.Task.Meta.ID, t.Task.Meta.Epoch))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -365,13 +376,12 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
|
|
|
|
|
|
|
|
|
|
t, ok := s.taskQueues.Pending[taskID]
|
|
|
|
|
if !ok {
|
|
|
|
|
err := errors.New("pending task not found")
|
|
|
|
|
log.WithFields(s.logFields()).Warningln("Pending task #%d not found.", taskID)
|
|
|
|
|
return err
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// task finished, reset timeout
|
|
|
|
|
t.NumTimeout = 0
|
|
|
|
|
t.NumFailure = 0
|
|
|
|
|
s.taskQueues.Done = append(s.taskQueues.Done, t)
|
|
|
|
|
delete(s.taskQueues.Pending, taskID)
|
|
|
|
|
|
|
|
|
@ -389,3 +399,22 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
|
|
|
|
|
}
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TaskFailed tells the service that a task is failed.
|
|
|
|
|
func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
|
|
|
|
|
select {
|
|
|
|
|
case <-s.ready:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.mu.Lock()
|
|
|
|
|
defer s.mu.Unlock()
|
|
|
|
|
|
|
|
|
|
t, ok := s.taskQueues.Pending[meta.ID]
|
|
|
|
|
if !ok {
|
|
|
|
|
log.WithFields(s.logFields()).Warningln("TaskFailed:Pending task #%v not found.", t.Task.Meta)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
s.processFailedTask(t, meta.Epoch)
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|