@ -25,7 +25,7 @@ import (
"sync"
"time"
log "github.com/ sirupsen/logrus "
log "github.com/ inconshreveable/log15 "
"github.com/PaddlePaddle/recordio"
)
@ -170,11 +170,11 @@ func (s *Service) recover() (bool, error) {
}
if state == nil {
log . Info ln ( "No state exists, not recovered." )
log . Info ( "No state exists, not recovered." )
return false , nil
}
log . Info f( "Loaded snapshot of size: %d bytes." , len ( state ) )
log . Info ( "Loaded snapshot." , log . Ctx { "size" : len ( state ) } )
gr , err := gzip . NewReader ( bytes . NewReader ( state ) )
if err != nil {
return false , err
@ -191,11 +191,11 @@ func (s *Service) recover() (bool, error) {
if err != nil {
// Only the close failed; recovery actually succeeded, so
// just log the error.
log . Error ln( err )
log . Error ( "error close recover file." , log . Ctx { "error" : err } )
}
s . state = tqs
log . WithFields( s . logFields ( ) ) . Infof ( "Master recovered from snapshot, scheduling pending task timeout check." )
log . Info( "Master recovered from snapshot, scheduling pending task timeout check." , s . logCtx ( ) )
for _ , t := range s . state . Pending {
time . AfterFunc ( s . timeoutDur , s . checkTimeoutFunc ( t . Task . Meta . ID , t . Task . Meta . Epoch ) )
}
@ -224,7 +224,7 @@ func (s *Service) snapshot() error {
}
state := buf . Bytes ( )
log . Info f( "Saving snapshot of size: %d bytes." , len ( state ) )
log . Info ( "Saving snapshot." , log . Ctx { "size bytes" : len ( state ) } )
return s . store . Save ( state )
}
@ -260,7 +260,7 @@ func readChunks(globPaths []string) ([]Chunk, error) {
}
count := index . NumChunks ( )
log . Info f( "readChunks: file %s has %d chunks" , path , count )
log . Info ( "reading chunks." , log . Ctx { "path" : path , "num chunks" : count } )
for i := 0 ; i < count ; i ++ {
chunk := Chunk {
Path : path ,
@ -300,7 +300,7 @@ func (s *Service) SetDataset(globPaths []string, _ *int) error {
err = s . snapshot ( )
if err != nil {
log . Error ln ( err)
log . Error ( "snapshot error" , log. Ctx { " error": err } )
return err
}
close ( s . ready )
@ -320,7 +320,7 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
defer func ( ) {
err := s . snapshot ( )
if err != nil {
log . Error ln ( err)
log . Error ( "snapshot error" , log. Ctx { " error": err } )
}
} ( )
@ -328,12 +328,12 @@ func (s *Service) processFailedTask(t taskEntry, epoch int) {
t . NumFailure ++
if t . NumFailure > s . failureMax {
log . Warn ingf( "Task %v failed %d times, discard." , t . Task , t . NumFailure )
log . Warn ( "Task failed to many times, discard." , log . Ctx { "task" : t . Task , "num failed" : t . NumFailure } )
s . state . Failed = append ( s . state . Failed , t )
return
}
log . Warn ingf ( "Task %v failed %d times , re-dispatch.", t. Task , t . NumFailure )
log . Warn ( "Task failed, re-dispatch.", log. Ctx { "task" : t. Task , "num failed" : t . NumFailure } )
s . state . Todo = append ( s . state . Todo , t )
return
}
@ -353,8 +353,8 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
}
// must be called with lock held.
func ( s * Service ) log Fields( ) log . Fields {
return log . Fields {
func ( s * Service ) log Ctx( ) log . Ctx {
return log . Ctx {
"todoLen" : len ( s . state . Todo ) ,
"pendingLen" : len ( s . state . Pending ) ,
"doneLen" : len ( s . state . Done ) ,
@ -383,10 +383,10 @@ func (s *Service) GetTask(passID int, task *Task) error {
if len ( s . state . Todo ) == 0 {
if len ( s . state . Done ) == 0 && len ( s . state . Pending ) == 0 {
log . W ithFields( s . logFields ( ) ) . W arningl n( "All tasks failed, may start next pass" )
log . W arn( "All tasks failed, may start next pass" , s . logCtx ( ) )
return ErrAllTaskFailed
}
log . W ithFields( s . logFields ( ) ) . W arningl n( "No more available task." )
log . W arn( "No more available task." , s . logCtx ( ) )
return ErrNoMoreAvailable
}
@ -400,8 +400,9 @@ func (s *Service) GetTask(passID int, task *Task) error {
}
* task = t . Task
log . WithFields ( s . logFields ( ) ) . Infof ( "Task #%v dispatched." , t . Task . Meta )
ctx := s . logCtx ( )
ctx [ "task meta" ] = t . Task . Meta
log . Info ( "Task dispatched." , ctx )
time . AfterFunc ( s . timeoutDur , s . checkTimeoutFunc ( t . Task . Meta . ID , t . Task . Meta . Epoch ) )
return nil
}
@ -417,7 +418,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
t , ok := s . state . Pending [ taskID ]
if ! ok {
log . WithFields ( s . logFields ( ) ) . Warningln ( "Pending task #%d not found." , taskID )
ctx := s . logCtx ( )
ctx [ "task id" ] = taskID
log . Warn ( "Pending task not found." , ctx )
return nil
}
@ -426,7 +429,9 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s . state . Done = append ( s . state . Done , t )
delete ( s . state . Pending , taskID )
log . WithFields ( s . logFields ( ) ) . Infof ( "Task #%d finished." , taskID )
ctx := s . logCtx ( )
ctx [ "task id" ] = taskID
log . Info ( "Task finished." , ctx )
if len ( s . state . Todo ) == 0 && len ( s . state . Pending ) == 0 {
// increase master side pass count if all tasks finished
s . state . CurPass ++
@ -434,12 +439,14 @@ func (s *Service) TaskFinished(taskID int, dummy *int) error {
s . state . Done = [ ] taskEntry { }
// TODO(typhoonzero): deal with failed tasks
s . state . Failed = [ ] taskEntry { }
log . WithFields ( s . logFields ( ) ) . Warningf ( "all task finished, add new pass data, newpass: %d." , s . state . CurPass )
ctx := s . logCtx ( )
ctx [ "new pass" ] = s . state . CurPass
log . Warn ( "all task finished, add new pass data." , ctx )
}
err := s . snapshot ( )
if err != nil {
log . Error ln ( err)
log . Error ( "snapshot error" , log. Ctx { " error": err } )
}
return err
}
@ -455,7 +462,7 @@ func (s *Service) TaskFailed(meta TaskMeta, dummy *int) error {
t , ok := s . state . Pending [ meta . ID ]
if ! ok {
log . W ithFields( s . logFields ( ) ) . W arningl n( "TaskFailed:Pending task #%v not found.", t. Task . Meta )
log . W arn( "TaskFailed:Pending task not found.", log. Ctx { "task" : t. Task . Meta } )
return nil
}