# Design Doc: Distributed Training

## Objective

In [these slides](https://www.slideshare.net/cxwangyi/paddlepaddle-a-complete-solution-for-businesses), we explained that we would like PaddlePaddle to run on general-purpose clusters, such as those managed by Kubernetes, to address demands for AI from both Internet and non-Internet industries.

This poses technical challenges to PaddlePaddle:

1. Support fault-recovery.
1. Support both offline and online training.
1. [Serverless computing](https://en.wikipedia.org/wiki/Serverless_computing) of distributed training.

## Training Job

A training job will be created once a user asks the Paddle cloud to train a model. The training job is made up of different processes that collaboratively consume data and produce a trained model. There are three kinds of processes:

1. the *master process*, which dispatches tasks to
1. one or more *trainer processes*, which run distributed training and synchronize gradients/models via
1. one or more *parameter server processes*, where each holds a shard of the global model.

Their relationship is illustrated in the following graph:

<img src="src/paddle-model-sharding.png"/>

### Master Process

The master process will:

- Partition a dataset into [tasks](#task) and dispatch tasks to trainers.
- Keep track of training progress on the dataset with a [task queue](#task-queue). A training job iterates over the dataset one full pass at a time before starting the next pass.

#### Task

A task is a data shard to be trained. The total number of tasks will be much larger than the total number of trainers. The number of data instances inside a task will be much larger than the mini-batch size.

#### Task Queue

The master process has three task queues to track training progress. As illustrated in the graph below, Job A and Job B each have one master process, and each master process has three task queues.

<img src="src/paddle-task-queues.png"/>

- The todo queue holds tasks to be dispatched. When a job starts, the master process fills the todo queue with all tasks.
- The pending queue holds tasks that are currently being trained by trainers.
- The done queue holds tasks that have already been trained.

The life cycle of a single task is illustrated below (a code sketch of the queues follows the list):

<img src="src/paddle-task-states.png"/>

1. When a new pass of training starts, all tasks will be placed in the todo queue.
1. The master process will dispatch a few tasks to each trainer at a time, put them in the pending queue, and wait for completion.
1. The trainer will work on its tasks and tell the master process once a task is completed. The master process will then dispatch a new task to that trainer.
1. If a task times out, the master process will move it back to the todo queue and increase its timeout count by one. If the timeout count is above a threshold, the task is likely to cause a trainer to crash, so it will be discarded.
1. The master process will move completed tasks to the done queue. When the todo queue is empty, the master process will start a new pass by moving all tasks in the done queue back to the todo queue and resetting the timeout counters of all tasks to zero.
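
The bookkeeping above can be made concrete with a small sketch. The following Go snippet is illustrative only; the `Task` fields, the timeout threshold, and the queue layout are assumptions for this document, not the actual PaddlePaddle implementation:

```go
package master

// Task is one dispatchable shard of the dataset (fields are illustrative).
type Task struct {
	ID       int
	Chunks   []string // data chunks that make up this shard
	Timeouts int      // how many times this task has timed out
}

// taskQueues mirrors the three queues described above.
type taskQueues struct {
	Todo    []Task
	Pending map[int]Task // keyed by task ID; currently being trained
	Done    []Task
}

const timeoutThreshold = 3 // assumed value for the discard threshold

// onTimeout moves a timed-out task back to todo, or discards it once it has
// timed out too many times (it is likely to crash trainers).
func (q *taskQueues) onTimeout(id int) {
	t, ok := q.Pending[id]
	if !ok {
		return
	}
	delete(q.Pending, id)
	t.Timeouts++
	if t.Timeouts > timeoutThreshold {
		return // discard the task
	}
	q.Todo = append(q.Todo, t)
}

// onDone moves a completed task to the done queue. When todo and pending are
// both empty, a new pass starts: done tasks return to todo with their
// timeout counters reset.
func (q *taskQueues) onDone(id int) {
	t, ok := q.Pending[id]
	if !ok {
		return
	}
	delete(q.Pending, id)
	q.Done = append(q.Done, t)
	if len(q.Todo) == 0 && len(q.Pending) == 0 {
		for _, d := range q.Done {
			d.Timeouts = 0
			q.Todo = append(q.Todo, d)
		}
		q.Done = nil
	}
}
```

In the real system these queues also have to be persisted to etcd so that a restarted master can recover them; see [fault recovery](#fault-recovery).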

### Trainer Process

The trainer process will:

- Receive tasks from the master.
- Work on the tasks: calculate and upload gradients to the parameter servers, and update the local model by downloading new parameters from the parameter servers.

### Parameter Server Process

Parameter server processes hold the parameters collaboratively. The parameters are partitioned across the parameter servers.

The parameter server will:

- Receive gradients from the trainers, update its parameters, and send the trainers the latest parameters.
- Periodically save its parameters to a distributed file system, overwriting the previous save.
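
As a rough illustration of these responsibilities, the sketch below shows one shard of parameters with a plain SGD update and a periodic checkpoint loop. The type, method names, serialization, and save path are assumptions for this document, not the actual parameter server API:

```go
package pserver

import (
	"os"
	"sync"
	"time"
)

// Shard holds this server's partition of the global model (illustrative).
type Shard struct {
	mu     sync.Mutex
	params map[string][]float32 // parameter name -> values on this shard
	lr     float32              // learning rate for the simplified SGD update
}

// SendGrad applies a gradient to the shard (plain SGD here; the real server
// would delegate to the configured optimizer).
func (s *Shard) SendGrad(name string, grad []float32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	p := s.params[name]
	if len(p) != len(grad) {
		return // unknown parameter or mismatched shape; ignore in this sketch
	}
	for i := range grad {
		p[i] -= s.lr * grad[i]
	}
}

// GetParams returns a copy of the latest values for the trainers.
func (s *Shard) GetParams(name string) []float32 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return append([]float32(nil), s.params[name]...)
}

// checkpointLoop periodically saves the shard, overwriting the previous save.
// encode and path stand in for a real serialization format and a location on
// the distributed file system derived from the shard's index.
func (s *Shard) checkpointLoop(path string, interval time.Duration, encode func(map[string][]float32) []byte) {
	for range time.Tick(interval) {
		s.mu.Lock()
		data := encode(s.params)
		s.mu.Unlock()
		_ = os.WriteFile(path, data, 0o644) // overwrite the previous save
	}
}
```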

### Optimization Algorithms

The communication pattern between the trainers and the parameter servers depends on the category of optimization algorithm:

- Synchronous Stochastic Gradient Descent (sync-SGD)

  The parameter servers wait for all trainers to finish the n-th mini-batch calculation and send their gradients before broadcasting the new parameters to every trainer. Every trainer waits for the new parameters before starting the (n+1)-th mini-batch.

- Asynchronous Stochastic Gradient Descent (async-SGD)

  There is no synchronization between different trainers, and a parameter server updates its parameters as soon as it receives a new gradient (a sketch of the trainer's loop follows this list):

  - Each trainer uploads its accumulated gradient every n mini-batches.
  - Every m mini-batches, the trainer downloads new parameters from the parameter server.
  - n and m do not have to be equal.
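
For async-SGD, the trainer's main loop might look like the sketch below. The `pserverClient` interface, `backprop`, and `accumulate` are hypothetical placeholders standing in for the real RPC client and the forward/backward pass; only the n/m cadence is the point here:

```go
package trainer

// pserverClient is a hypothetical stand-in for the RPC client that talks to
// the parameter servers.
type pserverClient interface {
	SendGrads(grads map[string][]float32)
	GetParams() map[string][]float32
}

// asyncSGDLoop uploads accumulated gradients every n mini-batches and
// refreshes the local model every m mini-batches; n and m may differ.
func asyncSGDLoop(ps pserverClient, n, m int, nextBatch func() []float32) {
	accum := map[string][]float32{} // gradients accumulated since the last upload
	model := ps.GetParams()
	for step := 1; ; step++ {
		grads := backprop(model, nextBatch()) // compute gradients for one mini-batch
		accumulate(accum, grads)

		if step%n == 0 { // upload every n mini-batches
			ps.SendGrads(accum)
			accum = map[string][]float32{}
		}
		if step%m == 0 { // download every m mini-batches
			model = ps.GetParams()
		}
	}
}

// Placeholders for the actual forward/backward pass and gradient accumulation.
func backprop(model map[string][]float32, batch []float32) map[string][]float32 { return nil }
func accumulate(dst, src map[string][]float32)                                  {}
```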

## Fault Tolerance

The training job will pause if the master process is dead, or if any of the parameter server processes is dead. They will be restarted by [Kubernetes](https://kubernetes.io/) and recover in a few minutes. Please refer to [fault recovery](#fault-recovery).

The training job will continue to make progress as long as there is at least one trainer process running. The strategy depends on the type of optimization algorithm:

- sync-SGD

  TODO

- async-SGD

  Since async-SGD does not require synchronization between mini-batches, the system will, by definition, make progress as long as at least one trainer is running.

## Fault Recovery

PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed, reliable key-value store, a restarted process can recover its state from etcd. The model parameters are periodically saved to a distributed file system, so a restarted parameter server can recover its parameters from the saved files.

The following sections describe how each process recovers from a failure; the graph below shows how etcd is used:

<img src="src/paddle-etcd.png"/>

### Master Process

When the master is started by Kubernetes, it executes the following steps at startup (a code sketch using the etcd client follows the list):

1. Grabs a unique *master* lock in etcd, which prevents concurrent master instantiations.
1. Recovers the task queues from etcd if they already exist; otherwise, the master will create them.
1. Watches the trainer prefix keys `/trainer/` on etcd to find the live trainers.
1. Starts dispatching the tasks to the trainers, and updates the task queues using an etcd transaction to ensure the lock is held during the update.
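
A minimal sketch of this startup sequence, assuming the etcd v3 Go client (import paths as of etcd v3.5) and illustrative key names such as `/task_queues`:

```go
package main

import (
	"context"
	"log"
	"time"

	clientv3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 1. Grab the unique master lock; the session's lease keeps it alive.
	sess, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	lock := concurrency.NewMutex(sess, "/master")
	if err := lock.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}

	// 2. Recover the task queues if they already exist, otherwise create them.
	resp, err := cli.Get(context.TODO(), "/task_queues") // key name is illustrative
	if err != nil {
		log.Fatal(err)
	}
	if len(resp.Kvs) == 0 {
		// create fresh queues and write them back (omitted)
	}

	// 3. Watch the trainer prefix to learn which trainers are alive.
	trainerEvents := cli.Watch(context.TODO(), "/trainer/", clientv3.WithPrefix())
	go func() {
		for ev := range trainerEvents {
			_ = ev // add/remove trainers from the dispatch loop (omitted)
		}
	}()

	// 4. Dispatch tasks; queue updates would go through etcd transactions so
	//    they only succeed while the lock is still held (omitted).

	// If the session (lease) expires, the master kills itself.
	<-sess.Done()
	log.Fatal("etcd lease expired, exiting")
}
```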

The master process will kill itself if its etcd lease expires.

When the master process dies for any reason, Kubernetes will restart it. It will be online again with all states recovered from etcd in a few minutes.

### Trainer Process

When the trainer is started by Kubernetes, it executes the following steps at startup:

1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until the count of parameter servers reaches the desired count.
1. Generates a unique ID, and sets the key `/trainer/<unique ID>` with its contact address as the value. The key will be deleted when the lease expires, so the master will be aware of the trainer going online and offline.
1. Waits for tasks from the master to start training.

If the trainer's etcd lease expires, it will try to set the key `/trainer/<unique ID>` again so that the master process can rediscover it, as sketched below.
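
A sketch of step 2 and the re-registration behavior, again assuming the etcd v3 Go client; the TTL value and the use of `github.com/google/uuid` for the unique ID are assumptions:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/google/uuid"
	clientv3 "go.etcd.io/etcd/client/v3"
)

// registerTrainer keeps /trainer/<unique ID> alive under a lease. Waiting for
// the desired number of parameter servers (step 1) is omitted here.
func registerTrainer(cli *clientv3.Client, addr string) {
	for {
		// Put the key under a lease; the key disappears when the lease
		// expires, which is how the master notices the trainer is gone.
		lease, err := cli.Grant(context.TODO(), 10) // 10-second TTL (assumed)
		if err != nil {
			log.Fatal(err)
		}
		key := fmt.Sprintf("/trainer/%s", uuid.NewString())
		if _, err := cli.Put(context.TODO(), key, addr, clientv3.WithLease(lease.ID)); err != nil {
			log.Fatal(err)
		}

		// Keep the lease alive; if keep-alives stop and the lease expires,
		// fall through and register again so the master can rediscover us.
		ch, err := cli.KeepAlive(context.TODO(), lease.ID)
		if err != nil {
			log.Fatal(err)
		}
		for range ch {
			// still alive
		}
		time.Sleep(time.Second)
	}
}
```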

### Parameter Server Process

When the parameter server is started by Kubernetes, it executes the following steps at startup (the index-claiming transaction is sketched after the list):

1. Read the desired total number of parameter servers from the etcd key `/ps_desired`.

1. Search through the etcd keys `/ps/<index>` (`/ps/0`, `/ps/1`, ...) to find the first non-existent key whose index is smaller than the total number of parameter servers. Set the key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name.

    The desired number of parameter servers is 3:

    <img src="src/paddle-ps-0.png"/>

    The third parameter server joined:

    <img src="src/paddle-ps-1.png"/>

1. The parameter server can load parameters if there are already saved parameters in the save path (inferred from its index).

1. Now the parameter server is ready for the trainers' requests.
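
Step 2 is essentially a compare-and-set on the first free index. A sketch with the etcd v3 Go client (key names from this document; everything else is illustrative):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"strconv"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// claimIndex claims the first free /ps/<index> key and returns the index.
func claimIndex(cli *clientv3.Client, addr string) int {
	// 1. Read the desired number of parameter servers from /ps_desired.
	resp, err := cli.Get(context.TODO(), "/ps_desired")
	if err != nil || len(resp.Kvs) == 0 {
		log.Fatal("cannot read /ps_desired: ", err)
	}
	desired, _ := strconv.Atoi(string(resp.Kvs[0].Value))

	// 2. Try /ps/0, /ps/1, ... and claim the first key that does not exist
	//    yet. The transaction only writes if the key was never created, so
	//    two servers cannot claim the same index.
	for i := 0; i < desired; i++ {
		key := fmt.Sprintf("/ps/%d", i)
		txn, err := cli.Txn(context.TODO()).
			If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
			Then(clientv3.OpPut(key, addr)).
			Commit()
		if err != nil {
			log.Fatal(err)
		}
		if txn.Succeeded {
			return i
		}
	}
	log.Fatal("no free parameter-server index")
	return -1
}
```

The claimed index then determines both the shard of parameters this server is responsible for and the save path it loads from in step 3.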

If the parameter server's etcd lease expires, the parameter server will kill itself.

## Dynamic Scaling

### Trainer Scaling

TODO

### Parameter Server Scaling

Not planned for v1.

## Training Dataset Format

TODO

## User Interface

TODO