update design

feature/design_of_v2_layer_converter
typhoonzero 8 years ago
parent 39d6d1c2e9
commit 1fa58b005a

@ -17,12 +17,16 @@ A training job will be created once user asks Paddle cloud to train a model. The
1. the *master process*, which dispatches tasks to 1. the *master process*, which dispatches tasks to
1. one or more *trainer processes*, which run distributed training and synchronize gradients/models via 1. one or more *trainer processes*, which run distributed training and synchronize gradients/models via
1. one or more *parameter server processes*, where each holds a shard of the global model. 1. one or more *parameter server processes*, where each holds a shard of the global model, and receive the uploaded gradients from every *trainer process*, so they can run the optimize functions to update their parameters.
Their relation is illustrated in the following graph: Their relation is illustrated in the following graph:
<img src="src/paddle-model-sharding.png"/> <img src="src/paddle-model-sharding.png"/>
By coordinate these processes, paddle can complete the procedure of training neural networks using SGD. Paddle can support both "synchronize SGD" and "asynchronize SGD".
When training with "sync SGD", paddle parameter servers use barriers to wait for all trainers to finish gradients update. When using "async SGD", parameter servers would not wait for all trainers, so training and parameter optimize will run in parallel. parameter servers will not depend on each other, they will receive the gradients update in parrallel; Also trainers will not depend on each other, run training jobs in parrallel. Using asyc SGD will be faster when training, but parameters on one of the parameter server will be newer than the other, but this will introduce more Randomness.
### Master Process ### Master Process
The master process will: The master process will:
@ -130,7 +134,7 @@ When the trainer is started by the Kubernetes, it executes the following steps a
If trainer's etcd lease expires, it will try set key `/trainer/<unique ID>` again so that the master process can discover the trainer again. If trainer's etcd lease expires, it will try set key `/trainer/<unique ID>` again so that the master process can discover the trainer again.
Whenever a trainer fails, the master process is responsible to schedule the failed task back to "todo queue". then kubernetes will try to start the trainer somewhere else, then the recovered trainer will try to fetch new task to continue the training. When a trainer fails, Kuberentes would try to restart it. The recovered trainer would fetch tasks from the TODO queue and go on training.
### Parameter Server Process ### Parameter Server Process

@ -1,33 +1,5 @@
# Paddle大规模分布式训练设计
## 概览
参考[这里](./README.md)
## 分布式训练架构
常见的深度学习分布式训练的架构如图:
<img src="src/trainer.png" width="500"/>
为了完成一个深度学习的训练任务集群中会运行多个trainer和parameter server每个trainer启动时会先尝试从parameter server集群下载最新的参数然后以mini-batch为单位读取训练数据集中的一部分数据(Data shard)。trainer会在训练过程中持续与parameter server通讯上传计算出来的梯度以及下载最新的模型。
每个parameter server保存所有parameter的一个分片(Global model shard)并负责接受所有trainer发送的梯度完成SGD和优化算法然后发送更新后的parameter到每个trainer。
这样通过trainer和parameter server的分布式协作可以完成神经网络的SGD方法的训练。Paddle可以同时支持同步SGD(synchronize SGD)和异步SGD(asynchronize SGD)。
在使用同步SGD训练神经网络时Paddle使用同步屏障(barrier)使梯度的提交和参数的更新按照顺序方式执行。在异步SGD中则并不会等待所有trainer提交梯度才更新参数这样极大的提高了计算的并行性parameter server之间不相互依赖并行的接收梯度和更新参数parameter server也不会等待trainer全部都提交梯度之后才开始下一步trainer之间也不会相互依赖并行的执行模型的训练。可以看出虽然异步SGD方式会提高参数更新并行度, 但是并不能保证参数同步更新在任意时间某一台parameter server上保存的参数可能比另一台要更新与同步SGD相比梯度会有噪声。
在上面的分布式计算模型中使用异步SGD比同步SGD可以一定程度的提供训练任务的容灾性。假设在某一时刻一个trainer进程停止工作其他的trainer仍然可以完成对部分数据的训练。
参考上面所描述的Paddle实现细节可以进一步的优化以下方面
1. 目前模型的参数是保存在parameter server进程的内存中的。在同步SGD或异步SGD训练过程中任意一台parameter server不能异常退出否则参数丢失训练不能继续执行。需要考虑每个模型分片(model shard)保存多个副本(replica)防止parameter server单点故障。
1. 不能在一个训练任务中动态的增加或减少Trainer个数或parameter个数异步SGD是否可以增加Trainer?
1. 在同步SGD训练过程中需要保证参数更新满足事务性操作。即可能在更新参数过程中存放这个参数的shard所在的服务器故障就需要rollback并重新更新这个参数shard的其他存活副本。
1. 为了支持大量的训练任务和使用模型的应用在一个集群上,需要支持训练任务节点的伸缩。
1. 支持训练任务的前置任务和后置任务,支持训练任务的定时调度和对在线流式数据的处理
## 模型参数检查点(Checkpointing) ## 模型参数检查点(Checkpointing)
模型数据检查点的实现可以有效的避免parameter server的单点或多点同时故障。模型参数检查点通过定期向磁盘上保存一份存储在parameter server内存中的模型数据的完整镜像来保证训练过程可以从中间状态重新启动。在一个不可中断并缺少备份的训练任务中可以通过阶段性的保存每个parameter server的数据快照(snapshot)到 ***分布式存储服务/分布式存储挂载点*** 达到容灾的目的比如每隔10分钟或1小时保存最新的快照,并删除更早的快照。在出现单点故障时,只需要恢复这台节点,或者将这台节点迁移到另一个节点并启动即可恢复训练任务。 模型数据检查点的实现可以有效的避免parameter server的单点或多点同时故障。模型参数检查点通过定期向磁盘上保存一份存储在parameter server内存中的模型数据的完整镜像来保证训练过程可以从中间状态重新启动。在一个不可中断并缺少备份的训练任务中可以通过阶段性的保存每个parameter server的数据快照(snapshot)到 ***分布式存储服务*** 达到容灾的目的比如每隔10分钟最新的快照并删除更早的快照。在出现单点故障时只需要恢复这台节点或者将这台节点迁移到另一个节点并启动即可恢复训练任务。
<img src="src/checkpointing.png" width="500"/> <img src="src/checkpointing.png" width="500"/>
@ -36,18 +8,15 @@
说明: 说明:
* parameter server在集群中启动后自动挂载分布式存储目录并把快照保存到这个目录下。 * parameter server在集群中启动后自动挂载分布式存储目录并把快照保存到这个目录下。
* 所有parameter server和trainer在etcd上注册自己的id节点为TTL节点`/ps/[id]`和`/trainer/[id]`,并保持心跳。
* ***注trainer在故障恢复后master会将失败的task重新分配给恢复的trainer执行。这样会引入更大的随机性。***
* ***注parameter server在保存检查点时利用了Linux内核的“写时复制”技术在fork的进程中保存检查点原进程可以继续接收trainer的梯度更新请求而不影响检查点数据的保存。*** * ***注parameter server在保存检查点时利用了Linux内核的“写时复制”技术在fork的进程中保存检查点原进程可以继续接收trainer的梯度更新请求而不影响检查点数据的保存。***
* ***注每个parameter server的检查点各自独立保存暂时不考虑多个parameter server同步的保存一个特定时间点的全局检查点同样会引入随机性。*** * ***注每个parameter server的检查点各自独立保存暂时不考虑多个parameter server同步的保存一个特定时间点的全局检查点因为这样做也没法保证消除随机性。***
检查点保存程序流程: 检查点保存程序流程:
1. 如果满足条件""每个pass或每n个mini-batch"时parameter server会`fork`自己,子进程中执行保存检查点任务,父进程继续工作。如果已经有子进程在进行保存检查点工作,则忽略。 1. 如果满足条件""每个pass或每n个mini-batch"时parameter server会锁住保存parameter的内存开始保存检查点。如果已经正在执行保存检查点的任务,则忽略。
2. parameter server生成一个UUID向指定的目录中一个新的文件文件名为此UUID写入快照数据。在快照写入完成后计算这个文件的MD5 sum。然后在etcd的`/checkpoints/[pserver_id]`中写入json内容`{"uuid": [UUID], "md5", "MD5 sum", "timestamp": xxxx}`。 2. parameter server生成一个UUID向指定的目录中一个新的文件文件名为此UUID写入快照数据。在快照写入完成后计算这个文件的MD5 sum。然后在etcd的`/checkpoints/[pserver_id]`中写入json内容`{"uuid": [UUID], "md5", "MD5 sum", "timestamp": xxxx}`。
3. 删除磁盘目录中不是当前uuid的快照文件。 3. 删除磁盘目录中不是当前uuid的快照文件。
4. 关闭fork出来的进程 4. 释放对paramters内存的锁定
这里需要用户额外注意在您的实际环境中训练任务的运行可能会占满trainer和parameter server之间的网络带宽如果parameter server此时还需要通过网络访问分布式存储以保存快照可能会造成网络拥塞而出现阶段性的运行停滞。 这里需要用户额外注意在您的实际环境中训练任务的运行可能会占满trainer和parameter server之间的网络带宽如果parameter server此时还需要通过网络访问分布式存储以保存快照可能会造成网络拥塞而出现阶段性的运行停滞。

@ -7,7 +7,9 @@
* Paddle训练任务 * Paddle训练任务
* 在线模型预测服务 * 在线模型预测服务
<img src="src/data_dispatch.png" width="500"/> <img src="src/paddle-cloud-in-data-center.png" width="500"/>
在上图中显示了在一个实际生产环境中的应用人脸识别的数据流图。生产环境的日志数据会通过实时流的方式Kafka和离线数据的方式(HDFS)存储,并在集群中运行多个分布式数据处理任务,比如流式数据处理(online data process),离线批处理(offline data process)完成数据的预处理提供给paddle作为训练数据。用于也可以上传labeled data到分布式存储补充训练数据。在paddle之上运行的深度学习训练输出的模型会提供给在线人脸识别的应用使用。
### 训练数据的存储 ### 训练数据的存储
@ -15,7 +17,7 @@
在Kubernetes上运行的不同的计算框架可以通过Volume或PersistentVolume挂载存储空间到每个容器中。 在Kubernetes上运行的不同的计算框架可以通过Volume或PersistentVolume挂载存储空间到每个容器中。
存储中的共享位置需要保存PaddlePaddle book中的所有dataset数据并且可以被提交的job直接使用。 GlusterFS存储系统中的公开目录需要保存一些预置的公开数据集比如MNIST, BOW, imagenet数据集等并且可以被提交的job直接使用。
### 上传训练文件 ### 上传训练文件
@ -25,15 +27,15 @@
paddle upload train_data.list paddle upload train_data.list
``` ```
其中`.list`文件描述了训练数据的文件和对应的label对于图像类数据`.list文件`样例如下每一行包含了图片文件的路径和其label 其中`.list`文件描述了训练数据的文件和对应的label对于图像类数据`.list文件`样例如下每一行包含了图片文件的路径和其label用tab分隔开
``` ```
/data/image1.jpg 1 ./data/image1.jpg 1
/data/image1.jpg 5 ./data/image2.jpg 5
/data/image1.jpg 2 ./data/image3.jpg 2
/data/image1.jpg 5 ./data/image4.jpg 5
/data/image1.jpg 1 ./data/image5.jpg 1
/data/image1.jpg 8 ./data/image6.jpg 8
... ...
``` ```
@ -48,20 +50,26 @@ L&apos; inflation accélérée , mesurée dans la zone euro , est due principale
### 使用reader ### 使用reader
使用v2 API编写训练任务是可以编写如下简单的reader,返回文件中的各列,然后在调用`trainer.train()`时传入,完成训练数据的读取: 用户在使用v2 API编写训练任务时可以使用paddle内置的reader完成对GlusterFS存储中的训练数据的读取,返回文件中的各列,然后在调用`trainer.train()`时传入,完成训练数据的读取:
```python ```python
def train(): reader = paddle.dist.reader("dataset-name")
fp = open("/glusterfs/mount/dir/yourfile_%d.list" % TRAINER_ID, "r") batch_reader = paddle.batch(paddle.dataset.mnist.train(), 128)
trainer.train(batch_reader, ...)
```
def reader(): trainer.train内部会获取reader的内容
for l in fp:
yield l[:-1].split("\t")
return reader
``` ```
def paddle.train(batch_reader):
r = batch_reader() # create a interator for one pass of data
for batch in r:
# train
```
这里面batch是含有128个data instance的mini-batch。每一个data instance会是一个tupletuple元素的顺序与`.list`文件文件中每一列的顺序是一致的。每一个data instance会是(raw_image_file_binary_data, label)。其中raw_image_file_binary_data是对应图像文件的没有解码的原始二进制数据用户需要自己解码。label是文本类型比如“1“”2“这里用户需要的其实是整形用户需要自己转换成整形。
## TODO ## TODO
### 支持将数据合并成内部的文件格式(key-value)方便sharding与顺序读取
### 支持用户自定义的数据预处理job ### 支持用户自定义的数据预处理job
### 支持SSTable格式的key-value数据

Loading…
Cancel
Save