Merge branch 'develop' into recordio

gangliao-patch-1
gongweibao 8 years ago committed by GitHub
commit e4c7d8cc2b

@ -1,23 +1,22 @@
group: deprecated-2017Q2
language: cpp
cache:
directories:
- $HOME/third_party
- $HOME/.ccache
- $HOME/.cache/pip
- $TRAVIS_BUILD_DIR/build/third_party
sudo: required
dist: trusty
os:
- linux
env:
- JOB=DOCS
- JOB=BUILD_AND_TEST
- JOB=PRE_COMMIT
- JOB=build_doc
- JOB=check_style
addons:
apt:
packages:
- gcc-4.8
- g++-4.8
- gfortran-4.8
- git
- build-essential
- python
@ -34,18 +33,7 @@ addons:
- libtool
- ccache
before_install:
- |
if [ ${JOB} == "BUILD_AND_TEST" ]; then
local change_list=`git diff --name-only $TRAVIS_COMMIT_RANGE`
if [ $? -eq 0 ]; then # if git diff return no zero, then rerun unit test.
if ! echo ${change_list} | grep -qvE '(\.md$)|(\.rst$)|(\.jpg$)|(\.png$)'
then
echo "Only markdown docs were updated, stopping build process."
exit
fi
fi
fi
- if [[ "$JOB" == "PRE_COMMIT" ]]; then sudo ln -s /usr/bin/clang-format-3.8 /usr/bin/clang-format; fi
- if [[ "$JOB" == "check_style" ]]; then sudo ln -s /usr/bin/clang-format-3.8 /usr/bin/clang-format; fi
# Paddle is using protobuf 3.1 currently. Protobuf 3.2 breaks the compatibility. So we specify the python
# protobuf version.
- pip install numpy wheel 'protobuf==3.1' sphinx==1.5.6 recommonmark sphinx-rtd-theme==0.1.9 virtualenv pre-commit requests==2.9.2 LinkChecker
@ -55,7 +43,7 @@ before_install:
function timeout() { perl -e 'alarm shift; exec @ARGV' "$@"; }
script:
- |
timeout 2580 paddle/scripts/travis/main.sh # 43min timeout
timeout 2580 paddle/scripts/travis/${JOB}.sh # 43min timeout
RESULT=$?; if [ $RESULT -eq 0 ] || [ $RESULT -eq 142 ]; then true; else false; fi;
notifications:
email:

@ -71,7 +71,7 @@ if(ANDROID)
"Disable RDMA when cross-compiling for Android" FORCE)
endif(ANDROID)
set(THIRD_PARTY_PATH "${PROJ_ROOT}/third_party" CACHE STRING
set(THIRD_PARTY_PATH "${CMAKE_BINARY_DIR}/third_party" CACHE STRING
"A path setting third party libraries download & build directories.")
if (WITH_C_API AND WITH_PYTHON)

@ -25,7 +25,7 @@ COPY ./paddle/scripts/docker/root/ /root/
RUN apt-get update && \
apt-get install -y \
git python-pip python-dev openssh-server bison \
wget unzip tar xz-utils bzip2 gzip coreutils \
wget unzip tar xz-utils bzip2 gzip coreutils ntp \
curl sed grep graphviz libjpeg-dev zlib1g-dev \
python-numpy python-matplotlib gcc g++ \
automake locales clang-format-3.8 swig doxygen cmake \

@ -21,7 +21,8 @@ IF(NOT ${CBLAS_FOUND})
SET(CBLAS_INSTALL_DIR ${THIRD_PARTY_PATH}/install/openblas)
SET(CBLAS_INC_DIR "${CBLAS_INSTALL_DIR}/include" CACHE PATH "openblas include directory." FORCE)
SET(CBLAS_LIBRARIES "${CBLAS_INSTALL_DIR}/lib/${LIBRARY_PREFIX}openblas${STATIC_LIBRARY_SUFFIX}"
SET(CBLAS_LIBRARIES
"${CBLAS_INSTALL_DIR}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}openblas${CMAKE_STATIC_LIBRARY_SUFFIX}"
CACHE FILEPATH "openblas library." FORCE)
SET(COMMON_ARGS CC=${CMAKE_C_COMPILER} NO_SHARED=1 NO_LAPACK=1 libs)

@ -14,11 +14,41 @@
INCLUDE(ExternalProject)
# Print and set the protobuf library information,
# finish this cmake process and exit from this file.
macro(PROMPT_PROTOBUF_LIB)
SET(protobuf_DEPS ${ARGN})
MESSAGE(STATUS "Protobuf protoc executable: ${PROTOBUF_PROTOC_EXECUTABLE}")
MESSAGE(STATUS "Protobuf library: ${PROTOBUF_LIBRARY}")
MESSAGE(STATUS "Protobuf version: ${PROTOBUF_VERSION}")
INCLUDE_DIRECTORIES(${PROTOBUF_INCLUDE_DIR})
# Assuming that all the protobuf libraries are of the same type.
IF(${PROTOBUF_LIBRARY} MATCHES "${CMAKE_STATIC_LIBRARY_SUFFIX}$")
SET(protobuf_LIBTYPE STATIC)
ELSEIF(${PROTOBUF_LIBRARY} MATCHES "${CMAKE_SHARED_LIBRARY_SUFFIX}$")
SET(protobuf_LIBTYPE SHARED)
ELSE()
MESSAGE(FATAL_ERROR "Unknown library type: ${PROTOBUF_LIBRARY}")
ENDIF()
ADD_LIBRARY(protobuf ${protobuf_LIBTYPE} IMPORTED GLOBAL)
SET_PROPERTY(TARGET protobuf PROPERTY IMPORTED_LOCATION ${PROTOBUF_LIBRARY})
ADD_LIBRARY(protobuf_lite ${protobuf_LIBTYPE} IMPORTED GLOBAL)
SET_PROPERTY(TARGET protobuf_lite PROPERTY IMPORTED_LOCATION ${PROTOBUF_LITE_LIBRARY})
ADD_LIBRARY(protoc ${protobuf_LIBTYPE} IMPORTED GLOBAL)
SET_PROPERTY(TARGET protoc PROPERTY IMPORTED_LOCATION ${PROTOC_LIBRARY})
FOREACH(dep ${protobuf_DEPS})
ADD_DEPENDENCIES(protobuf ${dep})
ADD_DEPENDENCIES(protobuf_lite ${dep})
ADD_DEPENDENCIES(protoc ${dep})
ENDFOREACH()
LIST(APPEND external_project_dependencies protobuf)
RETURN()
endmacro()
macro(SET_PROTOBUF_VERSION)
@ -43,22 +73,23 @@ if (NOT "${PROTOBUF_ROOT}" STREQUAL "")
endif()
FUNCTION(build_protobuf TARGET_NAME BUILD_FOR_HOST)
SET(PROTOBUF_SOURCES_DIR ${THIRD_PARTY_PATH}/${TARGET_NAME})
SET(PROTOBUF_INSTALL_DIR ${THIRD_PARTY_PATH}/install/${TARGET_NAME})
STRING(REPLACE "extern_" "" TARGET_DIR_NAME "${TARGET_NAME}")
SET(PROTOBUF_SOURCES_DIR ${THIRD_PARTY_PATH}/${TARGET_DIR_NAME})
SET(PROTOBUF_INSTALL_DIR ${THIRD_PARTY_PATH}/install/${TARGET_DIR_NAME})
SET(${TARGET_NAME}_INCLUDE_DIR "${PROTOBUF_INSTALL_DIR}/include" PARENT_SCOPE)
SET(PROTOBUF_INCLUDE_DIR "${PROTOBUF_INSTALL_DIR}/include" PARENT_SCOPE)
SET(${TARGET_NAME}_LITE_LIBRARY
"${PROTOBUF_INSTALL_DIR}/lib/libprotobuf-lite${STATIC_LIBRARY_SUFFIX}"
"${PROTOBUF_INSTALL_DIR}/lib/libprotobuf-lite${CMAKE_STATIC_LIBRARY_SUFFIX}"
PARENT_SCOPE)
SET(${TARGET_NAME}_LIBRARY
"${PROTOBUF_INSTALL_DIR}/lib/libprotobuf${STATIC_LIBRARY_SUFFIX}"
"${PROTOBUF_INSTALL_DIR}/lib/libprotobuf${CMAKE_STATIC_LIBRARY_SUFFIX}"
PARENT_SCOPE)
SET(${TARGET_NAME}_PROTOC_LIBRARY
"${PROTOBUF_INSTALL_DIR}/lib/libprotoc${STATIC_LIBRARY_SUFFIX}"
"${PROTOBUF_INSTALL_DIR}/lib/libprotoc${CMAKE_STATIC_LIBRARY_SUFFIX}"
PARENT_SCOPE)
SET(${TARGET_NAME}_PROTOC_EXECUTABLE
"${PROTOBUF_INSTALL_DIR}/bin/protoc${EXECUTABLE_SUFFIX}"
"${PROTOBUF_INSTALL_DIR}/bin/protoc${CMAKE_EXECUTABLE_SUFFIX}"
PARENT_SCOPE)
SET(OPTIONAL_CACHE_ARGS "")
@ -109,6 +140,8 @@ IF(NOT CMAKE_CROSSCOMPILING)
SET_PROTOBUF_VERSION()
IF("${PROTOBUF_VERSION}" VERSION_LESS "3.1.0")
SET(PROTOBUF_FOUND OFF)
ELSE()
PROMPT_PROTOBUF_LIB()
ENDIF()
ENDIF(PROTOBUF_FOUND)
ELSE()
@ -120,18 +153,22 @@ ELSE()
ENDIF()
IF(NOT PROTOBUF_FOUND)
build_protobuf(protobuf FALSE)
LIST(APPEND external_project_dependencies protobuf)
build_protobuf(extern_protobuf FALSE)
SET(PROTOBUF_INCLUDE_DIR ${protobuf_INCLUDE_DIR}
SET(PROTOBUF_INCLUDE_DIR ${extern_protobuf_INCLUDE_DIR}
CACHE PATH "protobuf include directory." FORCE)
IF(NOT CMAKE_CROSSCOMPILING)
SET(PROTOBUF_PROTOC_EXECUTABLE ${protobuf_PROTOC_EXECUTABLE}
SET(PROTOBUF_LITE_LIBRARY ${extern_protobuf_LITE_LIBRARY}
CACHE FILEPATH "protobuf lite library." FORCE)
SET(PROTOBUF_LIBRARY ${extern_protobuf_LIBRARY}
CACHE FILEPATH "protobuf library." FORCE)
SET(PROTOBUF_PROTOC_LIBRARY ${extern_protobuf_PROTOC_LIBRARY}
CACHE FILEPATH "protoc library." FORCE)
IF(CMAKE_CROSSCOMPILING)
PROMPT_PROTOBUF_LIB(protobuf_host extern_protobuf)
ELSE()
SET(PROTOBUF_PROTOC_EXECUTABLE ${extern_protobuf_PROTOC_EXECUTABLE}
CACHE FILEPATH "protobuf executable." FORCE)
PROMPT_PROTOBUF_LIB(extern_protobuf)
ENDIF()
SET(PROTOBUF_LITE_LIBRARY ${protobuf_LITE_LIBRARY} CACHE FILEPATH "protobuf lite library." FORCE)
SET(PROTOBUF_LIBRARY ${protobuf_LIBRARY} CACHE FILEPATH "protobuf library." FORCE)
SET(PROTOBUF_PROTOC_LIBRARY ${protobuf_PROTOC_LIBRARY} CACHE FILEPATH "protoc library." FORCE)
ENDIF(NOT PROTOBUF_FOUND)
PROMPT_PROTOBUF_LIB()

@ -84,24 +84,6 @@ IF(DEFINED CMAKE_SYSTEM_NAME)
ENDIF()
ENDIF()
# prefix and suffix on different os
IF(WIN32)
SET(LIBRARY_PREFIX "")
SET(SHARED_LIBRARY_SUFFIX ".dll")
SET(STATIC_LIBRARY_SUFFIX ".lib")
SET(EXECUTABLE_SUFFIX ".exe")
ELSE(WIN32)
SET(LIBRARY_PREFIX "lib")
IF(APPLE)
SET(SHARED_LIBRARY_SUFFIX ".dylib")
ELSE(APPLE)
SET(SHARED_LIBRARY_SUFFIX ".so")
ENDIF(APPLE)
SET(STATIC_LIBRARY_SUFFIX ".a")
SET(EXECUTABLE_SUFFIX "")
ENDIF(WIN32)
# external dependencies log output
SET(EXTERNAL_PROJECT_LOG_ARGS
LOG_DOWNLOAD 0 # Wrap download in script to log output

@ -99,3 +99,12 @@ value_printer
.. automodule:: paddle.v2.evaluator
:members: value_printer
:noindex:
Detection
=====
detection_map
-------------
.. automodule:: paddle.v2.evaluator
:members: detection_map
:noindex:

@ -1,45 +1,69 @@
package main
import (
"fmt"
"net"
"net/http"
"net/rpc"
"strconv"
"strings"
"time"
"github.com/namsral/flag"
log "github.com/sirupsen/logrus"
"github.com/PaddlePaddle/Paddle/go/master"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
)
func main() {
port := flag.Int("port", 8080, "port of the master server.")
faultTolerance := flag.Bool("fault_tolerance", false, "enable fault tolerance (requires etcd).")
ttlSec := flag.Int("ttl", 60, "etcd lease TTL in seconds.")
endpoints := flag.String("endpoints", "http://127.0.0.1:2379", "comma separated etcd endpoints. If empty, fault tolerance will not be enabled.")
taskTimeoutDur := flag.Duration("task_timout_dur", 20*time.Minute, "task timout duration.")
taskTimeoutMax := flag.Int("task_timeout_max", 3, "max timtout count for each task before it being declared failed task.")
chunkPerTask := flag.Int("chunk_per_task", 10, "chunk per task.")
flag.Parse()
if *faultTolerance {
panic("fault tolernance not implemented.")
if *endpoints == "" {
log.Warningln("-endpoints not set, fault tolerance not be enabled.")
}
var store master.Store
if *endpoints != "" {
eps := strings.Split(*endpoints, ",")
ip, err := networkhelper.GetExternalIP()
if err != nil {
log.Fatal(err)
}
addr := fmt.Sprintf("%s:%d", ip, *port)
store, err = master.NewEtcdClient(eps, addr, master.DefaultLockPath, master.DefaultAddrPath, master.DefaultStatePath, *ttlSec)
if err != nil {
log.Fatal(err)
}
} else {
store = &master.InMemStore{}
}
s, err := master.NewService(store, *chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
if err != nil {
log.Fatal(err)
}
s := master.NewService(*chunkPerTask, *taskTimeoutDur, *taskTimeoutMax)
err := rpc.Register(s)
err = rpc.Register(s)
if err != nil {
panic(err)
log.Fatal(err)
}
rpc.HandleHTTP()
l, err := net.Listen("tcp", ":"+strconv.Itoa(*port))
if err != nil {
panic(err)
log.Fatal(err)
}
err = http.Serve(l, nil)
if err != nil {
panic(err)
log.Fatal(err)
}
}

@ -5,18 +5,36 @@ import (
"net/http"
"net/rpc"
"strconv"
"time"
"github.com/namsral/flag"
"github.com/PaddlePaddle/Paddle/go/pserver"
log "github.com/sirupsen/logrus"
)
func main() {
port := flag.Int("port", 0, "port of the pserver")
etcdEndpoint := flag.String("etcd-endpoint", "http://127.0.0.1:2379",
"comma separated endpoint string for pserver to connect to etcd")
etcdTimeout := flag.Int("etcd-timeout", 5, "timeout for etcd calls")
numPservers := flag.Int("num-pservers", 1, "total pserver count in a training job")
logLevel := flag.String("log-level", "info",
"log level, possible values: debug, info, warning, error, fatal, panic")
flag.Parse()
s := pserver.NewService()
err := rpc.Register(s)
level, err := log.ParseLevel(*logLevel)
if err != nil {
panic(err)
}
log.SetLevel(level)
timeout := time.Second * time.Duration((*etcdTimeout))
s, err := pserver.NewService(*etcdEndpoint, *numPservers, timeout)
if err != nil {
panic(err)
}
err = rpc.Register(s)
if err != nil {
panic(err)
}
@ -27,7 +45,9 @@ func main() {
panic(err)
}
log.Infof("start pserver at port %d", *port)
err = http.Serve(l, nil)
if err != nil {
panic(err)
}

@ -47,9 +47,13 @@ func TestGetFinishTask(t *testing.T) {
}
go func(l net.Listener) {
s := NewService(chunkPerTask, time.Second, 1)
s, err := NewService(&InMemStore{}, chunkPerTask, time.Second, 1)
if err != nil {
panic(err)
}
server := rpc.NewServer()
err := server.Register(s)
err = server.Register(s)
if err != nil {
panic(err)
}

@ -33,9 +33,13 @@ func TestNextRecord(t *testing.T) {
}
go func(l net.Listener) {
s := master.NewService(10, time.Second, 1)
s, err := master.NewService(&master.InMemStore{}, 10, time.Second, 1)
if err != nil {
panic(err)
}
server := rpc.NewServer()
err := server.Register(s)
err = server.Register(s)
if err != nil {
panic(err)
}

@ -0,0 +1,144 @@
package master
import (
"context"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
)
const (
// DefaultLockPath is the default etcd master lock path.
DefaultLockPath = "/master/lock"
// DefaultStatePath is the default etcd key for master state.
DefaultStatePath = "/master/state"
// DefaultAddrPath is the default etcd key for master address.
DefaultAddrPath = "/master/addr"
)
// EtcdClient is the etcd client that master uses for fault tolerance
// and service registry.
type EtcdClient struct {
lockPath string
statePath string
client *clientv3.Client
lock *concurrency.Mutex
}
// NewEtcdClient creates a new EtcdClient.
func NewEtcdClient(endpoints []string, addr string, lockPath, addrPath, statePath string, ttlSec int) (*EtcdClient, error) {
log.Debugf("Connecting to etcd at %v", endpoints)
// TODO(helin): gracefully shutdown etcd store. Becuase etcd
// store holds a etcd lock, even though the lock will expire
// when the lease timeout, we need to implement graceful
// shutdown to release the lock.
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: dialTimeout,
})
if err != nil {
return nil, err
}
sess, err := concurrency.NewSession(cli, concurrency.WithTTL(ttlSec))
if err != nil {
return nil, err
}
lock := concurrency.NewMutex(sess, lockPath)
// It's fine for the lock to get stuck, in this case we have
// multiple master servers running (only configured to have
// one master running, but split-brain problem may cuase
// multiple master servers running), and the cluster management
// software will kill one of them.
log.Debugf("Trying to acquire lock at %s.", lockPath)
err = lock.Lock(context.TODO())
if err != nil {
return nil, err
}
log.Debugf("Successfully acquired lock at %s.", lockPath)
put := clientv3.OpPut(addrPath, string(addr))
resp, err := cli.Txn(context.Background()).If(lock.IsOwner()).Then(put).Commit()
if err != nil {
return nil, err
}
if !resp.Succeeded {
log.Fatal("No longer owns the master lock. Exiting.")
}
e := &EtcdClient{
lockPath: lockPath,
statePath: statePath,
client: cli,
lock: lock,
}
return e, nil
}
// Save saves the state into the etcd.
func (e *EtcdClient) Save(state []byte) error {
ctx := context.TODO()
put := clientv3.OpPut(e.statePath, string(state))
resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(put).Commit()
if err != nil {
return err
}
if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock again")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
err := e.lock.Lock(ctx)
cancel()
if err != nil {
// We lost the master lock and can not acquire
// it back, it means some other master is
// already started. We don't want cluster
// managment system to kill the master server
// who is holding the lock and running
// correctly. So the most feasible solution is
// to kill current master server. The current
// state is not saved, but the trainer's RPC
// call will fail, so the trainer will retry.
log.Fatalf("Could not acquire the lock at %s: %v. Exiting.", e.lockPath, err)
}
log.Infof("Successfully acquired lock at %s.", e.lockPath)
return e.Save(state)
}
return nil
}
// Load loads the state from etcd.
func (e *EtcdClient) Load() ([]byte, error) {
ctx := context.TODO()
get := clientv3.OpGet(e.statePath)
resp, err := e.client.Txn(ctx).If(e.lock.IsOwner()).Then(get).Commit()
if err != nil {
return nil, err
}
if !resp.Succeeded {
log.Errorln("No longer owns the lock, trying to lock and load again.")
err = e.lock.Lock(context.Background())
if err != nil {
return nil, err
}
return e.Load()
}
kvs := resp.Responses[0].GetResponseRange().Kvs
if len(kvs) == 0 {
// No state exists
return nil, nil
}
state := kvs[0].Value
return state, nil
}

@ -0,0 +1,28 @@
package master
import "sync"
// InMemStore is an in memory implementation of Store interface.
//
// It does not tolerate the fault that casues the program to crash.
type InMemStore struct {
mu sync.Mutex
buf []byte
}
// Save saves the state into the in-memory store.
func (m *InMemStore) Save(state []byte) error {
m.mu.Lock()
defer m.mu.Unlock()
m.buf = state
return nil
}
// Load loads the state from the in-memory store.
func (m *InMemStore) Load() ([]byte, error) {
m.mu.Lock()
defer m.mu.Unlock()
return m.buf, nil
}

@ -1,6 +1,9 @@
package master
import (
"bytes"
"compress/gzip"
"encoding/gob"
"errors"
"os"
"path/filepath"
@ -12,24 +15,54 @@ import (
"github.com/PaddlePaddle/recordio"
)
const (
dialTimeout = 5 * time.Second
)
// Store is the interface for save and load the master state.
type Store interface {
Save([]byte) error
Load() ([]byte, error)
}
// Chunk is a chunk of data consisted of several data instances.
type Chunk struct {
Path string
Index recordio.Index // chunk index
}
// Task is the basic unit of data instances assigned to trainers.
type Task struct {
ID int
Chunks []Chunk
}
type taskEntry struct {
Epoch int
NumTimeout int
Task Task
}
type taskQueues struct {
Todo []taskEntry
Pending map[int]taskEntry // map from task ID to task entry
Done []taskEntry
Failed []Task
}
// Service is the master server service.
type Service struct {
chunksPerTask int
timeoutDur time.Duration
timeoutMax int
ready chan struct{}
store Store
mu sync.Mutex
initDone bool
taskQueues taskQueues
}
// Recover recovers service state from etcd.
func Recover() (*Service, error) {
// TODO(helin): recover from snapshot state from etcd.
return nil, nil
}
func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
id := 0
if chunksPerTask <= 0 {
@ -58,7 +91,7 @@ func partition(chunks []Chunk, chunksPerTask int) []taskEntry {
}
// NewService creates a new service.
func NewService(chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Service {
func NewService(store Store, chunksPerTask int, timeoutDur time.Duration, timeoutMax int) (*Service, error) {
s := &Service{}
s.chunksPerTask = chunksPerTask
s.timeoutDur = timeoutDur
@ -66,38 +99,82 @@ func NewService(chunksPerTask int, timeoutDur time.Duration, timeoutMax int) *Se
s.taskQueues = taskQueues{}
s.taskQueues.Pending = make(map[int]taskEntry)
s.ready = make(chan struct{})
return s
}
s.store = store
recovered, err := s.recover()
if err != nil {
return nil, err
}
// Chunk is a chunk of data consisted of several data instances.
type Chunk struct {
Path string
Index recordio.Index // chunk index
}
if recovered {
// Recovered. Now the state is already initialized,
// and the master is ready.
s.initDone = true
close(s.ready)
log.Info("Master recovered from saved state.")
}
// Task is the basic unit of data instances assigned to trainers.
type Task struct {
ID int
Chunks []Chunk
return s, nil
}
type taskEntry struct {
Epoch int
NumTimeout int
Task Task
}
// recover recovers service state from etcd.
func (s *Service) recover() (bool, error) {
state, err := s.store.Load()
if err != nil {
return false, err
}
type taskQueues struct {
Todo []taskEntry
Pending map[int]taskEntry // map from task ID to task entry
Done []taskEntry
Failed []Task
if state == nil {
log.Infoln("No state exists, not recovered.")
return false, nil
}
log.Infof("Loaded snapshot of size: %d bytes.", len(state))
gr, err := gzip.NewReader(bytes.NewReader(state))
if err != nil {
return false, err
}
dec := gob.NewDecoder(gr)
var tqs taskQueues
err = dec.Decode(&tqs)
if err != nil {
return false, err
}
err = gr.Close()
if err != nil {
// Only close failed, recover actually succeed, so
// just log error.
log.Errorln(err)
}
s.taskQueues = tqs
return true, nil
}
// *must* be called with s.mu being held.
// snapshot *must* be called with s.mu being held.
func (s *Service) snapshot() error {
// TODO(helin): snapshot state on etcd.
return nil
// TOOD(helin): etcd request has a size limit, so the snapshot
// size is limited by the max request size. We should either
// divide the snapshot into smaller chunks and save under
// different keys, or configure the request size to be big
// enough:
// https://github.com/coreos/etcd/blob/2f84f3d8d8ed8f9537ab6ffa44a3a1c7eddfa9b1/embed/config.go#L44
var buf bytes.Buffer
gw := gzip.NewWriter(&buf)
enc := gob.NewEncoder(gw)
err := enc.Encode(s.taskQueues)
if err != nil {
return err
}
err = gw.Close()
if err != nil {
return err
}
state := buf.Bytes()
log.Infof("Saving snapshot of size: %d bytes.", len(state))
return s.store.Save(state)
}
func readChunks(globPaths []string) ([]Chunk, error) {
@ -207,12 +284,12 @@ func (s *Service) checkTimeoutFunc(taskID int, epoch int) func() {
t.NumTimeout++
if t.NumTimeout > s.timeoutMax {
log.Warningf("Task %v timed out %d times, discard.\n", t.Task, t.NumTimeout)
log.Warningf("Task %v timed out %d times, discard.", t.Task, t.NumTimeout)
s.taskQueues.Failed = append(s.taskQueues.Failed, t.Task)
return
}
log.Warningf("Task %v timed out %d times, retry.\n", t.Task, t.NumTimeout)
log.Warningf("Task %v timed out %d times, retry.", t.Task, t.NumTimeout)
s.taskQueues.Todo = append(s.taskQueues.Todo, t)
}
}

@ -133,7 +133,7 @@ func paddle_init_param(client C.paddle_pserver_client, param C.paddle_parameter,
if err != nil {
if err.Error() == pserver.AlreadyInitialized {
log.Warningf("parameter %s already initialized, treat paddle_init_param as sucessful.\n", name)
log.Warningf("parameter %s already initialized, treat paddle_init_param as sucessful.", name)
return C.PSERVER_OK
}
log.Errorln(err)
@ -200,7 +200,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for i, p := range ps {
pn[i] = p.Name
}
log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", "))
log.Errorf("pserver returned wrong number of parameters. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", "))
return C.PSERVER_ERROR
}
@ -210,7 +210,7 @@ func paddle_get_params(client C.paddle_pserver_client, dst **C.paddle_parameter,
for i, p := range ps {
pn[i] = p.Name
}
log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.\n", strings.Join(pn, ", "), strings.Join(ns, ", "))
log.Errorf("pserver returned wrong parameters, or not in requested order. Requested: %s, returned: %s.", strings.Join(pn, ", "), strings.Join(ns, ", "))
return C.PSERVER_ERROR
}
}

@ -7,6 +7,7 @@ import (
"strconv"
"strings"
"testing"
"time"
"github.com/PaddlePaddle/Paddle/go/pserver"
)
@ -30,9 +31,12 @@ func init() {
port[i] = p
go func(l net.Listener) {
s := pserver.NewService()
s, err := pserver.NewService("", time.Second*5)
if err != nil {
panic(err)
}
server := rpc.NewServer()
err := server.Register(s)
err = server.Register(s)
if err != nil {
panic(err)
}

@ -1,9 +1,18 @@
package pserver
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"
"github.com/PaddlePaddle/Paddle/go/utils/networkhelper"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
log "github.com/sirupsen/logrus"
)
// ElementType is the type of elements of a Parameter.
@ -24,6 +33,9 @@ const (
Float64
)
// PsDesired is etcd path for store desired pserver count
const PsDesired = "/ps_desired"
// Parameter is a piece of data to sync with the parameter server.
type Parameter struct {
Name string
@ -47,14 +59,154 @@ type Service struct {
mu sync.Mutex
opt *optimizer
paramMap map[string]Parameter
etcdEndpoints string
etcdClient *clientv3.Client
// etcdTimeout is also used as retry intervals.
etcdTimeout time.Duration
// desired number of pservers in the job.
// assume desired will not change during one training job.
desired int
// FIXME: ensure GetExternalIP gets the correct ip for trainers to connect.
externalIP string
}
// NewService creates a new service.
func NewService() *Service {
// NewService creates a new service, will bypass etcd registration if no
// endpoints specified.
func NewService(endpoints string, numPservers int, timeout time.Duration) (*Service, error) {
s := &Service{opt: newOptimizer(sgd, 0.005)}
s.paramMap = make(map[string]Parameter)
s.initialized = make(chan struct{})
return s
s.etcdEndpoints = endpoints
s.etcdTimeout = timeout
var err error
s.externalIP, err = networkhelper.GetExternalIP()
if err != nil {
return nil, err
}
if endpoints != "" {
// initialize connection to etcd, try
ep := strings.Split(s.etcdEndpoints, ",")
for {
cli, err := clientv3.New(clientv3.Config{
Endpoints: ep,
DialTimeout: s.etcdTimeout,
})
if err != nil {
log.Errorf("connect to etcd error: %v", err)
time.Sleep(s.etcdTimeout)
continue
}
s.etcdClient = cli
log.Debugf("inited client to %s", s.etcdEndpoints)
break
}
// init /ps_desired using transaction, for multiple pservers may want to write
// it at the same time.
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := s.initDesiredPsercers(ctx, numPservers)
cancel()
if err != nil {
log.Warn(err)
time.Sleep(s.etcdTimeout)
continue
}
break
}
// TODO: when implementing extending or reducing pservers, /ps_desired is
// changed, then we need to watch /ps_desired node for events. For now, just
// write once when init and read from it.
// wait and set s.desired init value
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
resp, err := s.etcdClient.Get(ctx, PsDesired)
cancel()
if err != nil {
log.Errorf("getting %s error: %v", PsDesired, err)
time.Sleep(s.etcdTimeout)
continue
}
if len(resp.Kvs) != 0 {
s.desired, err = strconv.Atoi(string(resp.Kvs[0].Value))
if err != nil {
log.Errorf("value of %s invalid %v\n", PsDesired, err)
time.Sleep(s.etcdTimeout)
// NOTE: wait util ps_desired value change
continue
}
break
}
}
// try register pserver node on etcd
for {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err := s.registerPserverEtcd(ctx)
cancel()
if err != nil {
log.Warn(err)
time.Sleep(s.etcdTimeout)
continue
}
break
}
} // if endpoints != ""
// Bypass etcd registration if no endpoints specified
return s, nil
}
func (s *Service) initDesiredPsercers(ctx context.Context, numPservers int) (*clientv3.TxnResponse, error) {
return concurrency.NewSTM(s.etcdClient, func(c concurrency.STM) error {
dsStr := c.Get(PsDesired)
if dsStr == "" {
c.Put(PsDesired, strconv.Itoa(numPservers))
}
return nil
}, concurrency.WithAbortContext(ctx), concurrency.WithIsolation(concurrency.RepeatableReads))
}
// registerPserverEtcd registers pserver node on etcd using transaction.
func (s *Service) registerPserverEtcd(ctx context.Context) (*clientv3.TxnResponse, error) {
return concurrency.NewSTM(s.etcdClient, func(c concurrency.STM) error {
registered := false
for i := 0; i < s.desired; i++ {
psKey := "/ps/" + strconv.Itoa(i)
log.Debugf("checking %s", psKey)
ps := c.Get(psKey)
log.Debugf("got value (%s) for key: %s", ps, psKey)
if ps == "" {
resp, err := s.etcdClient.Grant(context.TODO(), 5)
if err != nil {
log.Fatal(err)
}
// find the first id and write info
c.Put(psKey, s.externalIP, clientv3.WithLease(resp.ID))
log.Debugf("set pserver node %s with value %s", psKey, s.externalIP)
ch, kaerr := s.etcdClient.KeepAlive(context.TODO(), resp.ID)
if kaerr != nil {
log.Errorf("keepalive etcd node error: %v", kaerr)
return kaerr
}
// Eat the keep alive message so etcd
// will not expire the lease.
go func(ch <-chan *clientv3.LeaseKeepAliveResponse) {
ka := <-ch
log.Debugf("keepalive: %d\n", ka.TTL)
}(ch)
log.Debug("register finished")
registered = true
break
}
}
if registered == true {
return nil
}
return errors.New("not registerd, may due to already have enough pservers")
}, concurrency.WithAbortContext(ctx), concurrency.WithIsolation(concurrency.RepeatableReads))
}
// InitParam initializes a parameter.

@ -10,12 +10,15 @@ import (
)
func TestFull(t *testing.T) {
s := pserver.NewService()
s, err := pserver.NewService("", time.Second*5)
if err != nil {
t.Error(err)
}
var p pserver.Parameter
p.Name = "param_a"
p.Content = []byte{1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0}
p.ElementType = pserver.Int32
err := s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil)
err = s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil)
if err != nil {
t.FailNow()
}
@ -72,8 +75,11 @@ func TestFull(t *testing.T) {
}
func TestMultipleInit(t *testing.T) {
s := pserver.NewService()
err := s.FinishInitParams(0, nil)
s, err := pserver.NewService("", time.Second*5)
if err != nil {
t.Error(err)
}
err = s.FinishInitParams(0, nil)
if err != nil {
t.FailNow()
}
@ -85,15 +91,18 @@ func TestMultipleInit(t *testing.T) {
}
func TestUninitialized(t *testing.T) {
s := pserver.NewService()
err := s.SendGrad(pserver.Gradient{}, nil)
s, err := pserver.NewService("", time.Second*5)
err = s.SendGrad(pserver.Gradient{}, nil)
if err.Error() != pserver.Uninitialized {
t.FailNow()
}
}
func TestBlockUntilInitialized(t *testing.T) {
s := pserver.NewService()
s, err := pserver.NewService("", time.Second*5)
if err != nil {
t.Error(err)
}
ch := make(chan struct{}, 2)
errCh := make(chan error, 2)
var wg sync.WaitGroup
@ -133,7 +142,7 @@ func TestBlockUntilInitialized(t *testing.T) {
p.Name = "param_a"
p.Content = []byte{1, 0, 0, 0, 2, 0, 0, 0, 3, 0, 0, 0}
p.ElementType = pserver.Int32
err := s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil)
err = s.InitParam(pserver.ParameterWithConfig{Param: p, Config: nil}, nil)
if err != nil {
t.FailNow()
}

@ -0,0 +1,45 @@
package networkhelper
import (
"errors"
"net"
)
// GetExternalIP returns the ip address of local network interface, not the
// loopback device.
func GetExternalIP() (string, error) {
ifaces, err := net.Interfaces()
if err != nil {
return "", err
}
for _, iface := range ifaces {
if iface.Flags&net.FlagUp == 0 {
continue // interface down
}
if iface.Flags&net.FlagLoopback != 0 {
continue // loopback interface
}
addrs, err := iface.Addrs()
if err != nil {
return "", err
}
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip == nil || ip.IsLoopback() {
continue
}
ip = ip.To4()
if ip == nil {
continue // not an ipv4 address
}
return ip.String(), nil
}
}
return "", errors.New("are you connected to the network?")
}

@ -0,0 +1,10 @@
package networkhelper
import "testing"
func TestGetIP(t *testing.T) {
_, err := GetExternalIP()
if err != nil {
t.Errorf("GetExternalIP returns error : %v\n", err)
}
}

@ -2,3 +2,5 @@ cc_library(ddim SRCS ddim.cc)
cc_test(ddim_test SRCS ddim_test.cc DEPS ddim)
nv_test(dim_test SRCS dim_test.cu DEPS ddim)
cc_test(variable_test SRCS variable_test.cc)

@ -1,5 +1,3 @@
//#include <stdexcept>
//#include <unittest/unittest.h>
#include <sstream>
#include <vector>

@ -0,0 +1,68 @@
/*
Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include <memory>
#include <typeindex>
#include <typeinfo>
#include "paddle/platform/assert.h"
namespace paddle {
namespace framework {
class Variable {
public:
template <typename T>
const T& Get() const {
PADDLE_ASSERT(holder_ != nullptr);
PADDLE_ASSERT(std::type_index(typeid(T)) ==
std::type_index(holder_->Type()));
return *static_cast<const T*>(holder_->Ptr());
}
template <typename T>
T* GetMutable() {
if (holder_ == nullptr ||
std::type_index(typeid(T)) != std::type_index(holder_->Type())) {
holder_.reset(new PlaceholderImpl<T>(new T()));
}
return static_cast<T*>(holder_->Ptr());
}
private:
struct Placeholder {
virtual ~Placeholder() {}
virtual const std::type_info& Type() const = 0;
virtual void* Ptr() const = 0;
};
// Placeholder hides type T, so it doesn't appear as a template
// parameter of Variable.
template <typename T>
struct PlaceholderImpl : public Placeholder {
PlaceholderImpl(T* ptr) : ptr_(ptr), type_(typeid(T)) {}
virtual const std::type_info& Type() const { return type_; }
virtual void* Ptr() const { return static_cast<void*>(ptr_.get()); }
std::unique_ptr<T> ptr_;
const std::type_info& type_;
};
std::unique_ptr<Placeholder>
holder_; // pointers to a PlaceholderImpl object indeed.
};
} // namespace framework
} // namespace paddle

@ -0,0 +1,52 @@
# Design Doc: Variable
Variable is also known as *blob* in MxNet and Caffe2. It is the input and output type of operators, where a neural network is a graph of operators.
## Requirements: Lazy Memory Allocation
For the flexibility of a DL system, a variable should be able to contain any typed value -- a tensor in most cases, but could also be some integer IDs or a scope of other variables in the case of RNN.
To use the minimum amount of memory, we'd like that a variable to allocate memory when it has to, or, lazy memory allocation. Let's take the following example:
```cpp
Variable vr, v1, v2;
Tensor* t1 = new Tensor();
Tensor* t2 = new Tensor();
Randomize(
/* malloc */ v1.GetMutable<Tensor>().mutable_data<float16>(DDim(100,200)),
/* size */ t1.Size());
Randomize(
/* malloc */ v2.GetMutable<Tensor>().mutable_data<float16>(DDim(200,300)),
/* size */ t2.Size());
Mult(
/*result*/ vr.GetMutable<Tensor>().mutable_data<v1.Type()>(SizeOfMult(v1, v2)),
/*input1*/ v1.Get<Tensor>().data(),
/*input2*/ v2.Get<Tensor>().data());
```
We see that a variable holds nothing until `Variable::GetMutable<Tensor>()` allocates a tensor and puts it in the variable. Similarly, a tensor gets its memory until `Tensor::mutable_data()`.
This syntax for lazy memory allocation when we call `Randomize` and `Mult`, those functions that mutate the variable, so it saves us some line of C++ code.
## Implementation: Type Hiding
To make memory allocation lazy, we cannot assume that we know the type held by a variable at definition time. In other words, `class Variable` cannot be a template `template <T> class Variable`.
Because we don't know the type `T`, we cannot save a `T*` as `Variable's` data member. Instead, we save an interface object `Placeholder`, who can return the pointer to the saved object via `Placeholder::Ptr()` as `void*`.
But anyway, Variable needs to know `T` so could it `delete<T>(ptr)` and so could `Variable::Get` checks the expected type and the saved object's type.
We save `T` in `PlaceholderImpl`, the implementation of `Placeholder`. Please be aware that `PlaceholderImpl` is a class template and `T` is passed in as a template parameter.
Because `PlaceholderImpl` knows `T`, it can save and return `typeid(T)` for the type comparison in `Variable::Get` and `Variable::GetMutable`.
## Conclusion
The technique type hiding utilizes C++ class templates, interface and derivation, and C++ RTTI (typeid). This combination saves us from definition something like `caffe2::TypeMata`, which takes hundreds of lines of C++ code.

@ -0,0 +1,40 @@
/*
Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include <memory>
#include <string>
#include "gtest/gtest.h"
#include "paddle/framework/variable.h"
TEST(Variable, GetMutable) {
using paddle::framework::Variable;
struct Tensor {
int content_;
};
std::unique_ptr<Variable> v(new Variable());
Tensor* t = v->GetMutable<Tensor>();
t->content_ = 1234;
const Tensor& tt = v->Get<Tensor>();
EXPECT_EQ(1234, tt.content_);
std::string* s = v->GetMutable<std::string>();
*s = "hello";
const std::string& ss = v->Get<std::string>();
EXPECT_EQ("hello", ss);
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save