In [these slides](https://www.slideshare.net/cxwangyi/paddlepaddle-a-complete-solution-for-businesses), we explained that we want PaddlePaddle to run on general-purpose clusters, such as those managed by Kubernetes, to address the demand for AI from both Internet and non-Internet industries.
A training job is created once a user asks Paddle Cloud to train a model. The training job is made up of different processes that collaboratively consume data and produce a trained model. There are three kinds of processes:
1. one or more *parameter server processes*, each of which holds a shard of the global model and receives the gradients uploaded by every *trainer process*, so it can run the optimization functions to update its shard of the parameters.
By coordinating these processes, PaddlePaddle can complete the procedure of training neural networks using SGD. PaddlePaddle supports both synchronous SGD ("sync SGD") and asynchronous SGD ("async SGD").
When training with "sync SGD", paddle parameter servers use barriers to wait for all trainers to finish gradients update. When using "async SGD", parameter servers would not wait for all trainers, so training and parameter optimize will run in parallel. parameter servers will not depend on each other, they will receive the gradients update in parrallel; Also trainers will not depend on each other, run training jobs in parrallel. Using asyc SGD will be faster when training, but parameters on one of the parameter server will be newer than the other, but this will introduce more Randomness.
- Keep track of training progress on the dataset with the [task queue](#task-queue). A training job finishes a full pass over the dataset before it goes into the next pass.
A task is a data shard to be trained. The total number of tasks will be much bigger than the total number of trainers. The number of data instances inside a task will be much bigger than the mini-batch size.
The master process uses three task queues to track training progress. As illustrated in the graph below, Job A and Job B each have one master process, and each master process has its own three task queues.
1. The trainer will work on its tasks and tell the master process once a task is completed. The master process will dispatch a new task to that trainer.
1. If a task times out, the master process will move it back to the todo queue, and the task's timeout count will increase by one. If the timeout count is above a threshold, the task is likely to cause a trainer to crash, so it will be discarded.
1. The master process will move a completed task to the done queue. When the todo queue is empty, the master process will start a new pass by moving all tasks in the done queue to the todo queue and resetting the timeout counter of every task to zero (a sketch follows this list).
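A minimal Go sketch of this bookkeeping, with hypothetical types and an illustrative timeout threshold (the real master's data structures may differ):

```go
package master

// Task is one shard of the dataset; it contains many more data instances than
// a mini-batch.
type Task struct {
	ID      int
	Shard   string // e.g. a file path or byte range (illustrative)
	Timeout int    // how many times this task has timed out
}

// taskQueues tracks training progress for one job.
type taskQueues struct {
	Todo    []Task
	Pending map[int]Task // dispatched to trainers, keyed by task ID
	Done    []Task
}

const timeoutThreshold = 3 // illustrative value

// onTaskTimeout moves a timed-out task back to the todo queue, or discards it
// when it has timed out too many times (it likely crashes trainers).
func (q *taskQueues) onTaskTimeout(id int) {
	t, ok := q.Pending[id]
	if !ok {
		return
	}
	delete(q.Pending, id)
	t.Timeout++
	if t.Timeout > timeoutThreshold {
		return // discard the task
	}
	q.Todo = append(q.Todo, t)
}

// onTaskFinished moves a completed task to the done queue. When the todo queue
// becomes empty, it starts a new pass by recycling the done queue and
// resetting every task's timeout counter.
func (q *taskQueues) onTaskFinished(id int) {
	t, ok := q.Pending[id]
	if !ok {
		return
	}
	delete(q.Pending, id)
	q.Done = append(q.Done, t)
	if len(q.Todo) == 0 {
		for i := range q.Done {
			q.Done[i].Timeout = 0
		}
		q.Todo, q.Done = q.Done, nil
	}
}
```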
Parameter servers will wait for all trainers to finish the n-th mini-batch computation and send their gradients before broadcasting the new parameters to every trainer. Every trainer will wait for the new parameters before starting the (n+1)-th mini-batch.
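From the trainer's side, the same barrier can be sketched like this (all function names below are placeholders for the trainer's actual compute and RPC calls, not a real API):

```go
package trainer

// Placeholders for the trainer's actual compute and RPC calls; the names and
// signatures are illustrative only.
func computeGradients(batch int) []float32 { return nil }
func sendGradients(grad []float32)         {}
func waitForNewParameters() []float32      { return nil }
func loadParameters(params []float32)      {}

// trainSyncSGD uploads the gradients of mini-batch n, then blocks until the
// parameter servers broadcast the updated parameters before starting
// mini-batch n+1.
func trainSyncSGD(numBatches int) {
	for n := 0; n < numBatches; n++ {
		grad := computeGradients(n)
		sendGradients(grad)
		params := waitForNewParameters() // barrier shared by all trainers
		loadParameters(params)
	}
}
```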
The training job will pause if the master process is dead, or if any of the parameter server processes is dead. They will be restarted by [Kubernetes](https://kubernetes.io/) and recover within a few minutes. Please refer to [fault recovery](#fault-recovery).
The training job will continue to make progress if there is at least one training process running. The strategy depends on the type of optimization algorithm:
- sync-SGD
TODO
- async-SGD
Since async-SGD does not require synchronization between mini-batches, the system will, by definition, make progress as long as at least one trainer is running.
PaddlePaddle uses [etcd](https://github.com/coreos/etcd) to keep track of the states of processes. Because etcd is a distributed, reliable key-value store, a restarted process can recover its state from etcd. The model parameters are periodically saved into a distributed file system, so a restarted parameter server can recover its parameters from the saved files.
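For example, a parameter server could checkpoint its shard with a loop like the one below; the mount point, interval, and snapshot format here are assumptions for illustration, not the actual implementation:

```go
package pserver

import (
	"fmt"
	"os"
	"time"
)

// checkpointLoop periodically serializes this server's parameter shard to a
// distributed file system mount, so a restarted server can reload it.
func checkpointLoop(shardIndex int, interval time.Duration, snapshot func() []byte) {
	path := fmt.Sprintf("/mnt/dfs/checkpoints/ps-%d.bin", shardIndex) // hypothetical DFS mount
	for range time.Tick(interval) {
		tmp := path + ".tmp"
		if err := os.WriteFile(tmp, snapshot(), 0644); err != nil {
			continue // keep the last good checkpoint; retry on the next tick
		}
		// Rename atomically so a recovering server never reads a half-written file.
		_ = os.Rename(tmp, path)
	}
}
```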
When the master process is dead for any reason, Kubernetes will restart it. It will be online again with all states recovered from etcd within a few minutes.
1. Watches the available parameter server prefix keys `/ps/` on etcd and waits until the count of parameter servers reaches the desired count.
1. Generates a unique ID, and sets key `/trainer/<unique ID>` with its contact address as value. The key will be deleted when the lease expires, so the master will be aware of the trainer going online and offline.
1. Search through etcd keys `/ps/<index>` (`/ps/0`, `/ps/1`, ...) to find the first non-existent key whose index is smaller than the total number of parameter servers. Set the key using a transaction to avoid concurrent writes. The parameter server's index is inferred from the key name.
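A sketch of that transaction using etcd's `clientv3` API; the function name, the desired-count parameter, and the address value stored under the key are assumptions for illustration:

```go
package pserver

import (
	"context"
	"fmt"

	"github.com/coreos/etcd/clientv3"
)

// claimIndex scans /ps/0, /ps/1, ... and claims the first key that does not
// exist yet. The transaction guarantees that two servers cannot claim the
// same index concurrently; the claimed index is returned.
func claimIndex(cli *clientv3.Client, desiredCount int, addr string) (int, error) {
	for idx := 0; idx < desiredCount; idx++ {
		key := fmt.Sprintf("/ps/%d", idx)
		// The key is free only if it has never been created (CreateRevision == 0).
		resp, err := cli.Txn(context.Background()).
			If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
			Then(clientv3.OpPut(key, addr)).
			Commit()
		if err != nil {
			return -1, err
		}
		if resp.Succeeded {
			return idx, nil
		}
	}
	return -1, fmt.Errorf("all %d parameter server slots are taken", desiredCount)
}
```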