commit
85880b87ad
@@ -0,0 +1,91 @@

# Design Doc: Master Server

For an overview of the master server's role, please refer to the [distributed training design doc](./README.md). In this design doc we discuss the master server in more detail. The master server will be implemented in [Go](https://golang.org/).

## Dataset

<img src="src/dataset.png"/>

A dataset is a list of files in *RecordIO* format. A RecordIO file consists of chunks, and each chunk consists of some records.

## Task Queue

As mentioned in the [distributed training design doc](./README.md), a *task* is a data shard that the master server assigns to a trainer process to train on. A task consists of one or multiple *blocks* from one or multiple files. The master server maintains *task queues* to track the training progress.

### Task Queue Creation

1. Each trainer will make an RPC call (using Go's [rpc](https://golang.org/pkg/net/rpc/) package) to the master server, telling it the RecordIO files representing the dataset specified by the user. Since every trainer will tell the master server the same dataset, only the first RPC call will be honored.

   The RPC interface is:

   ```go
   func (m *RPCServer) ReportDataset(Paths []string, dummy *int) error {
   }
   ```

1. The master server will scan through each RecordIO file to generate the *block index* and learn how many blocks each file has. A block can be referenced by the file path and the index of the block within the file. The block index is an in-memory data structure that enables fast access to each block, and the index of the block within the file is an integer starting from 0, representing the n-th block within the file.

   The definition of the block is:

   ```go
   type Block struct {
   	Idx   int // index of the block within the file
   	Path  string
   	Index recordio.Index // block index
   }
   ```

1. Blocks are grouped into tasks, and the tasks are filled into the todo queue. The pending queue and the done queue are initialized as empty (see the sketch after these definitions).

   The definition of the task is:

   ```go
   type Task struct {
   	Index  int
   	Blocks []Block
   }
   ```

   The elements in the task queues are of type `TaskEntry`, containing a timeout counter (described in [task retry logic](#task-retry-logic)) and a task:

   ```go
   type TaskEntry struct {
   	NumTimeout int
   	Task       Task
   }
   ```

   The definition of the task queues is:

   ```go
   type TaskQueues struct {
   	Todo    []TaskEntry
   	Pending map[int]TaskEntry // map from task index to task entry
   	Done    []TaskEntry
   }
   ```
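
The following is a minimal sketch of how the blocks could be grouped into tasks and the queues initialized; the helper name `partition` and the `blocksPerTask` parameter are illustrative assumptions, not part of the actual implementation.

```go
// partition groups blocks into tasks of at most blocksPerTask blocks each and
// returns freshly initialized task queues: all tasks in Todo, Pending and Done empty.
// This is an illustrative sketch; the name and signature are assumptions.
func partition(blocks []Block, blocksPerTask int) TaskQueues {
	if blocksPerTask <= 0 {
		blocksPerTask = 1
	}
	queues := TaskQueues{Pending: make(map[int]TaskEntry)}
	var cur Task
	for _, b := range blocks {
		cur.Blocks = append(cur.Blocks, b)
		if len(cur.Blocks) == blocksPerTask {
			queues.Todo = append(queues.Todo, TaskEntry{Task: cur})
			cur = Task{Index: cur.Index + 1}
		}
	}
	if len(cur.Blocks) > 0 { // the last, possibly smaller, task
		queues.Todo = append(queues.Todo, TaskEntry{Task: cur})
	}
	return queues
}
```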

### Task Queue Persistence

The task queues need to be persisted on [etcd](https://github.com/coreos/etcd) for fault recovery. Since the task queues only change when a task is completed or timed out, which is not very frequent, we can afford to synchronize with etcd every time the task queues change.

We will serialize the task queues data structure with [gob encoding](https://golang.org/pkg/encoding/gob/), compress it with gzip, and save it into etcd synchronously under the key `/task_queues`.
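
A minimal sketch of the serialization step is shown below; the function name `snapshot` is an illustrative assumption, and the actual etcd write is left as a comment rather than pinned to a specific client API.

```go
import (
	"bytes"
	"compress/gzip"
	"encoding/gob"
)

// snapshot gob-encodes the task queues and gzip-compresses the result.
// The returned bytes would then be written synchronously to etcd under
// "/task_queues" using the etcd client (omitted here). Name and signature
// are assumptions made for illustration.
func snapshot(queues TaskQueues) ([]byte, error) {
	var buf bytes.Buffer
	gz := gzip.NewWriter(&buf)
	if err := gob.NewEncoder(gz).Encode(queues); err != nil {
		return nil, err
	}
	if err := gz.Close(); err != nil { // flush the gzip stream
		return nil, err
	}
	return buf.Bytes(), nil
}
```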

### Task Dispatch

The trainer will make an RPC call to the master to get a new task when:

- the trainer first starts, or
- the trainer finishes a task.

The RPC interface is:

```go
func (m *RPCServer) GetTask(finished *Task, result *Task) error {
}
```

Argument `finished` will be `nil` when the trainer has just started.

During the RPC call the master will do the following (see the sketch below):

- Make a copy of the task queues, and update the copy to reflect the finished tasks and the new pending tasks.
- Synchronize the copy of the task queues with etcd using a transaction conditioned on holding the master lock.
- If the synchronization succeeded, replace the task queues with the copy and report the new task to the trainer; if it failed, discard the copy and report the error to the trainer.
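
The sketch below illustrates that flow on an in-memory copy of the queues; the helper names (`dispatch`, `clone`, `syncWithEtcd`) and the surrounding `RPCServer` fields are assumptions made for illustration, not the actual implementation.

```go
import "errors"

// dispatch moves the finished task from pending to done, pops the next task
// from todo into pending, and commits the updated copy only after etcd has
// accepted it. syncWithEtcd is assumed to perform the lock-conditioned
// transaction described above.
func (m *RPCServer) dispatch(finished *Task) (Task, error) {
	copyQ := m.queues.clone() // deep copy of TaskQueues (assumed helper)

	if finished != nil { // move the finished task from pending to done
		if e, ok := copyQ.Pending[finished.Index]; ok {
			delete(copyQ.Pending, finished.Index)
			copyQ.Done = append(copyQ.Done, e)
		}
	}

	if len(copyQ.Todo) == 0 {
		return Task{}, errors.New("no task available")
	}
	next := copyQ.Todo[0] // pop the next task and mark it pending
	copyQ.Todo = copyQ.Todo[1:]
	copyQ.Pending[next.Task.Index] = next

	if err := m.syncWithEtcd(copyQ); err != nil {
		return Task{}, err // discard the copy on failure
	}
	m.queues = copyQ // commit the copy only after etcd accepted it
	return next.Task, nil
}
```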

### Task Retry Logic

When a task is dispatched to the trainer, the master will schedule a function for execution after the timeout duration (based on the moving average of task completion time). If the task entry is still in the pending queue, its timeout counter will be increased by one and the task will be moved back to the todo queue. If the timeout counter is above the threshold, the master will log the error and discard the task.

Please note that a timed-out task could be completed after it has been dispatched for retry, so it is possible for a task to be processed multiple times. We do not try to prevent this, since it is fine to train on the same task multiple times given the stochastic nature of the stochastic gradient descent algorithm.
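
A minimal sketch of this timeout scheduling using the standard library's `time.AfterFunc` follows; the field and helper names (`onTimeout`, `maxTimeouts`, the mutex `mu`) are illustrative assumptions.

```go
import (
	"log"
	"time"
)

// scheduleTimeout is called right after task idx is dispatched; timeout is
// derived from the moving average of task completion time.
func (m *RPCServer) scheduleTimeout(idx int, timeout time.Duration) {
	time.AfterFunc(timeout, func() { m.onTimeout(idx) })
}

func (m *RPCServer) onTimeout(idx int) {
	m.mu.Lock()
	defer m.mu.Unlock()

	e, ok := m.queues.Pending[idx]
	if !ok {
		return // the task already finished; nothing to do
	}
	delete(m.queues.Pending, idx)

	e.NumTimeout++
	if e.NumTimeout > m.maxTimeouts {
		log.Printf("task %d timed out %d times, discarding", idx, e.NumTimeout)
		return
	}
	m.queues.Todo = append(m.queues.Todo, e) // retry: move back to the todo queue
}
```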
Binary file not shown.

@@ -0,0 +1,52 @@
import paddle.v2 as paddle
import numpy as np

# init paddle
paddle.init(use_gpu=False)

# network config
x = paddle.layer.data(name='x', type=paddle.data_type.dense_vector(2))
y_predict = paddle.layer.fc(input=x, size=1, act=paddle.activation.Linear())
y = paddle.layer.data(name='y', type=paddle.data_type.dense_vector(1))
cost = paddle.layer.mse_cost(input=y_predict, label=y)

# create parameters
parameters = paddle.parameters.create(cost)
# create optimizer
optimizer = paddle.optimizer.Momentum(momentum=0)
# create trainer
trainer = paddle.trainer.SGD(cost=cost,
                             parameters=parameters,
                             update_equation=optimizer)


# event_handler to print training info
def event_handler(event):
    if isinstance(event, paddle.event.EndIteration):
        if event.batch_id % 1 == 0:
            print "Pass %d, Batch %d, Cost %f" % (event.pass_id, event.batch_id,
                                                  event.cost)


# define training dataset reader
def train_reader():
    train_x = np.array([[1, 1], [1, 2], [3, 4], [5, 2]])
    train_y = np.array([-2, -3, -7, -7])

    def reader():
        for i in xrange(train_y.shape[0]):
            yield train_x[i], train_y[i]

    return reader


# define feeding map
feeding = {'x': 0, 'y': 1}

# training
trainer.train(
    reader=paddle.batch(
        train_reader(), batch_size=1),
    feeding=feeding,
    event_handler=event_handler,
    num_passes=100)
@ -1,68 +0,0 @@
|
||||
graph pp_topology {
|
||||
rankdir=BT;
|
||||
subgraph cluster_node0 {
|
||||
style=filled;
|
||||
color=lightgrey;
|
||||
node [style=filled, color=white, shape=box];
|
||||
label = "机器0"
|
||||
|
||||
pserver0 [label="Parameter \n Server 0"]
|
||||
trainer0 [label="Trainer 0"]
|
||||
}
|
||||
subgraph cluster_node1 {
|
||||
style=filled;
|
||||
color=lightgrey;
|
||||
node [style=filled, color=white, shape=box];
|
||||
label = "机器1"
|
||||
|
||||
pserver1 [label="Parameter \n Server 1"]
|
||||
trainer1 [label="Trainer 1"]
|
||||
}
|
||||
|
||||
subgraph cluster_node2 {
|
||||
style=filled;
|
||||
color=lightgrey;
|
||||
node [style=filled, color=white, shape=box];
|
||||
label = "机器2"
|
||||
|
||||
pserver2 [label="Parameter \n Server 2"]
|
||||
trainer2 [label="Trainer 2"]
|
||||
}
|
||||
|
||||
subgraph cluster_node3 {
|
||||
style=filled;
|
||||
color=lightgrey;
|
||||
node [style=filled, color=white, shape=box];
|
||||
label = "机器3"
|
||||
|
||||
pserver3 [label="Parameter \n Server 3"]
|
||||
trainer3 [label="Trainer 3"]
|
||||
}
|
||||
|
||||
data [label="数据", shape=hexagon]
|
||||
|
||||
trainer0 -- pserver0
|
||||
trainer0 -- pserver1
|
||||
trainer0 -- pserver2
|
||||
trainer0 -- pserver3
|
||||
|
||||
trainer1 -- pserver0
|
||||
trainer1 -- pserver1
|
||||
trainer1 -- pserver2
|
||||
trainer1 -- pserver3
|
||||
|
||||
trainer2 -- pserver0
|
||||
trainer2 -- pserver1
|
||||
trainer2 -- pserver2
|
||||
trainer2 -- pserver3
|
||||
|
||||
trainer3 -- pserver0
|
||||
trainer3 -- pserver1
|
||||
trainer3 -- pserver2
|
||||
trainer3 -- pserver3
|
||||
|
||||
data -- trainer0
|
||||
data -- trainer1
|
||||
data -- trainer2
|
||||
data -- trainer3
|
||||
}
|
@ -1,29 +0,0 @@
|
||||
from paddle.trainer_config_helpers import *
|
||||
|
||||
define_py_data_sources2(
|
||||
train_list='train.list',
|
||||
test_list='test.list',
|
||||
module='provider',
|
||||
obj='process')
|
||||
settings(
|
||||
batch_size=128,
|
||||
learning_rate=1e-3,
|
||||
learning_method=AdamOptimizer(),
|
||||
regularization=L2Regularization(0.5))
|
||||
|
||||
img = data_layer(name='pixel', size=28 * 28)
|
||||
|
||||
hidden1 = simple_img_conv_pool(
|
||||
input=img, filter_size=3, num_filters=32, pool_size=3, num_channel=1)
|
||||
|
||||
hidden2 = fc_layer(
|
||||
input=hidden1,
|
||||
size=200,
|
||||
act=TanhActivation(),
|
||||
layer_attr=ExtraAttr(drop_rate=0.5))
|
||||
predict = fc_layer(input=hidden2, size=10, act=SoftmaxActivation())
|
||||
|
||||
outputs(
|
||||
classification_cost(
|
||||
input=predict, label=data_layer(
|
||||
name='label', size=10)))
|

@@ -0,0 +1,2 @@
build
third-party

@@ -0,0 +1,34 @@
cmake_minimum_required(VERSION 3.0)

if(GTEST_INCLUDE_DIR AND GTEST_LIBRARIES)
    message("-- Found gtest (include: ${GTEST_INCLUDE_DIR}, library: ${GTEST_LIBRARIES})")
else()
    # find #include <majel/xx.h>
    get_filename_component(PARENT_DIR ${CMAKE_CURRENT_SOURCE_DIR} DIRECTORY)
    include_directories(${PARENT_DIR})

    # find cmake directory modules
    get_filename_component(PARENT_DIR ${PARENT_DIR} DIRECTORY)
    set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${PARENT_DIR}/cmake")

    # enable c++11
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11")

    # enable gtest
    set(THIRD_PARTY_PATH ${CMAKE_CURRENT_SOURCE_DIR}/third_party)
    set(WITH_TESTING ON)
    include(external/gtest)
endif()

########################### Build Majel #############################
set(MAJEL_CXX_FILES place.cpp)
set(MAJEL_CUDA_FILES "")

if(CUDA_FOUND)
    cuda_add_library(majel ${MAJEL_CUDA_FILES} ${MAJEL_CXX_FILES})
else()
    add_library(majel ${MAJEL_CXX_FILES})
endif()
#####################################################################

add_subdirectory(test)

@@ -0,0 +1,126 @@

# Tensor: A Unified Data Type in PaddlePaddle

## Pain Point

This week, we discussed several potential weaknesses of PaddlePaddle caused by the rapid iteration and development to promote new business products over the last four years. For instance, the current Matrix/Vector implementation in PaddlePaddle is long and tedious to read, which seriously discourages contributions from both new and experienced engineers. Worse, it will also become too challenging to maintain over time.


## Learn from Majel

Consequently, we decided to refactor PaddlePaddle step by step. First, refactor and replace Matrix/Vector with Tensor, the modern term in deep learning systems. Fortunately, we can learn from Majel how to define a Tensor.

To simplify heterogeneous resource allocation over any number of dimensions (1-9) and element types (double, float, float16), Majel consists of several primitives such as `Dim`, `Place` and `Array`, all of which are standard C++ class templates.

1. `Place`: memory location [i.e. CPU/GPU].
2. `Allocation`: heterogeneous resource allocator [i.e. 20MB in GPU].
3. `Dim`: size of each dimension. [i.e. Dim<4>({10, 2, 5, 1})]
4. `Array`: dynamic array consisting of a `Place`, a `Dim`, and a pointer to memory.

If you dig deeper into the Majel source code, you will find that Majel heavily uses `boost.variant`. The variant class template is a safe, generic, stack-based discriminated union container, **offering a simple solution for manipulating an object from a heterogeneous set of types in a uniform manner**. Whereas standard containers such as std::vector may be thought of as "multi-value, single type," variant is "multi-type, single value."

As a simple example, consider the following:

```c++
#include "boost/variant.hpp"
#include <iostream>
#include <string>

class my_visitor : public boost::static_visitor<int>
{
public:
    int operator()(int i) const
    {
        return i;
    }

    int operator()(const std::string & str) const
    {
        return str.length();
    }
};

int main()
{
    boost::variant< int, std::string > u("hello world");
    std::cout << u; // output: hello world

    int result = boost::apply_visitor( my_visitor(), u );
    std::cout << result; // output: 11 (i.e., length of "hello world")
}
```

In Majel, `DDimVar` is a variant built from `Dim`, and `DArrayVar` from `Array`:

```c++
template<int i>
struct Dim {
    ...
    int head;
    Dim<i-1> tail;
};
```

```c++
template<typename T, int D>
class Array : public Buffer {
    ...
private:
    Dim<D> size_;
    Dim<D> stride_;
    T* ptr_;
};
```

```c++
typedef boost::variant<GpuPlace, CpuPlace> Place;
typedef boost::variant<Dim<1>, Dim<2>, Dim<3>, Dim<4>, Dim<5>,
                       Dim<6>, Dim<7>, Dim<8>, Dim<9>> DDimVar;
typedef boost::variant<
    Array<float, 1>,
    Array<float, 2>,
    Array<float, 3>,
    Array<float, 4>,

    Array<double, 1>,
    Array<double, 2>,
    Array<double, 3>,
    Array<double, 4>,

    Array<float16, 1>,
    Array<float16, 2>,
    Array<float16, 3>,
    Array<float16, 4> > DArrayVar;
```

Because `variant` may be thought of as "multi-type, single value", we can utilize it to implement unified interfaces for PaddlePaddle (a sketch follows).
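
For illustration, here is a minimal, hypothetical sketch (not Majel code): a single visitor serves every alternative stored in a `DArrayVar`, so callers get one uniform interface regardless of the concrete element type and rank. The names `RankVisitor` and `rank` are assumptions made for this example.

```c++
// Hypothetical example: report the rank of whatever Array<T, D> is currently
// stored in a DArrayVar, dispatching through one templated visitor.
struct RankVisitor : public boost::static_visitor<int> {
    template <typename T, int D>
    int operator()(const Array<T, D>&) const {
        return D;
    }
};

int rank(const DArrayVar& a) {
    return boost::apply_visitor(RankVisitor(), a);
}
```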

## Implement Tensor in Paddle

Before writing code, please make sure you have already looked through the Majel source code and grasped the design philosophy of `DArray` in Majel.

To assign subtasks to our colleagues, we have to discuss how to divide the work into independent subtasks.

- [ ] 1. First, we need to consider the third-party dependencies in Majel.

    Majel heavily uses `boost.variant`, but we don't want to integrate `boost` into PaddlePaddle. It's better to replace boost with a lightweight implementation: https://github.com/mapbox/variant. Mapbox variant has the same speedy performance as `boost::variant`, but is faster to compile, results in smaller binaries, and has no dependencies (see the sketch after this list).

    > @gangliao

- [ ] 2. Re-implement `Place` and `Allocation/Memory`.

    I found that @wangkuiyi submitted a pull request that includes `Place`. @gangliao and @qijun could re-implement `Allocation`, because we had GPU development experience before joining the Paddle team.

    > @wangkuiyi @gangliao @qijun

- [ ] 3. Re-implement `Dim`.

    `Dim` is an excellent implementation in Majel.

    > ???

- [ ] 4. Re-implement `Array/Tensor`.

    > Prerequisites: 1 - 3

- [ ] 5. Re-implement fundamental operators for `Array/Tensor`.

    > Prerequisites: 1 - 4
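
To get a feel for the proposed replacement in subtask 1, the following is a minimal sketch using Mapbox variant, assuming the header path `<mapbox/variant.hpp>`; unlike boost, visitors are plain callables and no `static_visitor` base is required. It mirrors the boost example shown earlier.

```c++
#include <mapbox/variant.hpp>

#include <iostream>
#include <string>

// Same example as the boost one above, rewritten against mapbox::util::variant.
struct LengthVisitor {
    int operator()(int i) const { return i; }
    int operator()(const std::string& s) const { return static_cast<int>(s.length()); }
};

int main() {
    mapbox::util::variant<int, std::string> u(std::string("hello world"));
    int result = mapbox::util::apply_visitor(LengthVisitor(), u);
    std::cout << result << std::endl;  // output: 11
    return 0;
}
```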

@@ -0,0 +1,49 @@
#include <majel/place.h>

namespace majel {

namespace detail {

class PlacePrinter : public boost::static_visitor<> {
private:
  std::ostream& os_;

public:
  PlacePrinter(std::ostream& os) : os_(os) {}

  void operator()(const CpuPlace&) { os_ << "CpuPlace"; }

  void operator()(const GpuPlace& p) { os_ << "GpuPlace(" << p.device << ")"; }
};

}  // namespace detail

static Place the_default_place;

void set_place(const Place& place) { the_default_place = place; }

const Place& get_place() { return the_default_place; }

const GpuPlace default_gpu() { return GpuPlace(0); }

const CpuPlace default_cpu() { return CpuPlace(); }

bool is_gpu_place(const Place& p) {
  return boost::apply_visitor(IsGpuPlace(), p);
}

bool is_cpu_place(const Place& p) {
  return !boost::apply_visitor(IsGpuPlace(), p);
}

bool places_are_same_class(const Place& p1, const Place& p2) {
  return is_gpu_place(p1) == is_gpu_place(p2);
}

std::ostream& operator<<(std::ostream& os, const majel::Place& p) {
  majel::detail::PlacePrinter printer(os);
  boost::apply_visitor(printer, p);
  return os;
}

}  // namespace majel

@@ -0,0 +1,50 @@
#pragma once
#include <boost/variant.hpp>
#include <iostream>

namespace majel {

struct CpuPlace {
  CpuPlace() {}  // WORKAROUND: for some reason, omitting this constructor
                 // causes errors with boost 1.59 and OSX
  // needed for variant equality comparison
  inline bool operator==(const CpuPlace&) const { return true; }

  inline bool operator!=(const CpuPlace&) const { return false; }
};

struct GpuPlace {
  GpuPlace(int d) : device(d) {}

  // needed for variant equality comparison
  inline bool operator==(const GpuPlace& o) const { return device == o.device; }

  inline bool operator!=(const GpuPlace& o) const { return !(*this == o); }

  GpuPlace() : GpuPlace(0) {}
  int device;
};

class IsGpuPlace : public boost::static_visitor<bool> {
public:
  bool operator()(const CpuPlace&) const { return false; }

  bool operator()(const GpuPlace& gpu) const { return true; }
};

typedef boost::variant<GpuPlace, CpuPlace> Place;

void set_place(const Place&);

const Place& get_place();

const GpuPlace default_gpu();
const CpuPlace default_cpu();

bool is_gpu_place(const Place&);
bool is_cpu_place(const Place&);
bool places_are_same_class(const Place&, const Place&);

std::ostream& operator<<(std::ostream&, const majel::Place&);

}  // namespace majel

@@ -0,0 +1,10 @@
file(GLOB_RECURSE ALL_TEST_FILES RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "*.cpp" "*.cc")

add_executable(majel_tests ${ALL_TEST_FILES})
add_dependencies(majel_tests majel)
target_link_libraries(majel_tests
        ${Boost_LIBRARIES}
        ${GTEST_LIBRARIES}
        majel
        )
add_test(majel_tests majel_tests)

@@ -0,0 +1,40 @@
#include "majel/place.h"
#include <sstream>
#include "gtest/gtest.h"

TEST(Place, Equality) {
  majel::CpuPlace cpu;
  majel::GpuPlace g0(0), g1(1), gg0(0);

  EXPECT_EQ(cpu, cpu);
  EXPECT_EQ(g0, g0);
  EXPECT_EQ(g1, g1);
  EXPECT_EQ(g0, gg0);

  EXPECT_NE(g0, g1);

  EXPECT_TRUE(majel::places_are_same_class(g0, gg0));
  EXPECT_FALSE(majel::places_are_same_class(g0, cpu));
}

TEST(Place, Default) {
  EXPECT_TRUE(majel::is_gpu_place(majel::get_place()));
  EXPECT_TRUE(majel::is_gpu_place(majel::default_gpu()));
  EXPECT_TRUE(majel::is_cpu_place(majel::default_cpu()));

  majel::set_place(majel::CpuPlace());
  EXPECT_TRUE(majel::is_cpu_place(majel::get_place()));
}

TEST(Place, Print) {
  {
    std::stringstream ss;
    ss << majel::GpuPlace(1);
    EXPECT_EQ("GpuPlace(1)", ss.str());
  }
  {
    std::stringstream ss;
    ss << majel::CpuPlace();
    EXPECT_EQ("CpuPlace", ss.str());
  }
}

@@ -0,0 +1,6 @@
#include "gtest/gtest.h"

int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  return RUN_ALL_TESTS();
}