Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into srl_api_v2

avx_docs
dangqingqing 8 years ago
commit db6dc9e2e6

@ -10,6 +10,7 @@
usage/cmd_parameter/index_cn.rst
usage/concepts/use_concepts_cn.rst
usage/cluster/cluster_train_cn.md
usage/k8s/k8s_basis_cn.md
usage/k8s/k8s_cn.md
usage/k8s/k8s_distributed_cn.md

@ -0,0 +1,75 @@
# Kubernetes 简介
[*Kubernetes*](http://kubernetes.io/)是Google开源的容器集群管理系统其提供应用部署、维护、扩展机制等功能利用Kubernetes能方便地管理跨机器运行容器化的应用。Kubernetes可以在物理机或虚拟机上运行且支持部署到[AWS](http://kubernetes.io/docs/getting-started-guides/aws)[Azure](http://kubernetes.io/docs/getting-started-guides/azure/)[GCE](http://kubernetes.io/docs/getting-started-guides/gce)等多种公有云环境。介绍分布式训练之前,需要对[Kubernetes](http://kubernetes.io/)有一个基本的认识下面先简要介绍一下本文用到的几个Kubernetes概念。
- [*Node*](http://kubernetes.io/docs/admin/node/) 表示一个Kubernetes集群中的一个工作节点这个节点可以是物理机或者虚拟机Kubernetes集群就是由node节点与master节点组成的。
- [*Pod*](http://kubernetes.io/docs/user-guide/pods/) 是一组(一个或多个)容器pod是Kubernetes的最小调度单元一个pod中的所有容器会被调度到同一个node上。Pod中的容器共享NETPIDIPCUTS等Linux namespace。由于容器之间共享NET namespace所以它们使用同一个IP地址可以通过*localhost*互相通信。不同pod之间可以通过IP地址访问。
- [*Job*](http://kubernetes.io/docs/user-guide/jobs/) 描述Kubernetes上运行的作业一次作业称为一个job通常每个job包括一个或者多个podsjob启动后会创建这些pod并开始执行一个程序等待这个程序执行成功并返回0则成功退出如果执行失败也可以配置不同的重试机制。
- [*Volume*](http://kubernetes.io/docs/user-guide/volumes/) 存储卷是pod内的容器都可以访问的共享目录也是容器与node之间共享文件的方式因为容器内的文件都是暂时存在的当容器因为各种原因被销毁时其内部的文件也会随之消失。通过volume就可以将这些文件持久化存储。Kubernetes支持多种volume例如hostPath(宿主机目录)gcePersistentDiskawsElasticBlockStore等。
- [*Namespaces*](https://kubernetes.io/docs/user-guide/namespaces/) 命名空间在kubernetes中创建的所有资源对象(例如上文的podjob)等都属于一个命名空间,在同一个命名空间中,资源对象的名字是唯一的,不同空间的资源名可以重复,命名空间主要为了对象进行逻辑上的分组便于管理。本文只使用了默认命名空间。
- [*PersistentVolume*](https://kubernetes.io/docs/user-guide/persistent-volumes/): 和[*PersistentVolumeClaim*](https://kubernetes.io/docs/user-guide/persistent-volumes/#persistentvolumeclaims)结合将外部的存储服务在Kubernetes中描述成为统一的资源形式便于存储资源管理和Pod引用。
# 部署Kubernetes集群
Kubernetes提供了多种集群部署的方案本文档内不重复介绍。这里给出集中常见的部署方法
- [*minikube*](https://kubernetes.io/docs/getting-started-guides/minikube/): 快速在本地启动一个单机的kubernetes服务器便于本地验证和测试。
- [*kubeadm*](http://kubernetes.io/docs/getting-started-guides/kubeadm/): 在不同操作系统,不同主机(Bare-Metal, AWS, GCE)条件下,快速部署集群。
- [*AWS EC2*](https://kubernetes.io/docs/getting-started-guides/aws/): 在aws上快速部署集群。
- [*Bare-Metal*](https://kubernetes.io/docs/getting-started-guides/centos/centos_manual_config/): 在物理机上手动部署。
可以参考[这个表格](https://kubernetes.io/docs/getting-started-guides/#table-of-solutions)选择适合您的场景的合适方案。
# 选择存储方案
容器不会保留在运行时生成的数据job或者应用程序在容器中运行时生成的数据会在容器销毁时消失。为了完成分布式机器学习训练任务需要有一个外部的存储服务来保存训练所需数据和训练输出。
常见的可选存储服务包括:
- [*NFS*](https://github.com/kubernetes/kubernetes/tree/master/examples/volumes/nfs): 可以将磁盘上某个目录共享给网络中其他机器访问。部署和配置比较简单可以用于小量数据的验证。不提供分布式存储高可用冗余等功能。NFS的部署方法可以参考[这里](http://www.tecmint.com/how-to-setup-nfs-server-in-linux/)。
- [*GlusterFS*](http://gluster.readthedocs.io/en/latest/Quick-Start-Guide/Quickstart/): 网络分布式文件系统可以在Kubernetes中按照[这个](https://github.com/kubernetes/kubernetes/tree/master/examples/volumes/glusterfs)例子使用。
- [*Ceph*](http://docs.ceph.com/docs/master/): 分布式文件系统支持rbdPOSIX API接口(ceph fs)和对象存储API参考[这里](https://kubernetes.io/docs/user-guide/volumes/#rbd)。
- [*MooseFS*](https://moosefs.com/documentation.html): 一个分布式的存储系统。需要先挂载到服务器Node上再通过kubernetes hostPath Volume挂载到容器中。
# 配置kubectl
## 安装kubectl
```
# OS X
curl -LO https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/darwin/amd64/kubectl
# Linux
curl -LO https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/linux/amd64/kubectl
# Windows
curl -LO https://storage.googleapis.com/kubernetes-release/release/$(curl -s https://storage.googleapis.com/kubernetes-release/release/stable.txt)/bin/windows/amd64/kubectl.exe
```
## 配置kubectl访问你的kubernetes集群
编辑`~/.kube/config`这个配置文件,修改`Master-IP`的地址。如果使用SSL认证则需要配置`certificate-authority`和`users`中的用户证书。如果是使用非SSL方式访问比如通过8080端口也可以去掉这些证书的配置。
```
apiVersion: v1
clusters:
- cluster:
certificate-authority: /path/to/ca.crt
server: https://[Master-IP]:443
name: minikube
contexts:
- context:
cluster: minikube
user: minikube
name: minikube
current-context: minikube
kind: Config
preferences: {}
users:
- name: minikube
user:
client-certificate: /path/to/apiserver.crt
client-key: /Users/wuyi/.minikube/apiserver.key
```

File diff suppressed because it is too large Load Diff

Binary file not shown.

Before

Width:  |  Height:  |  Size: 22 KiB

After

Width:  |  Height:  |  Size: 501 KiB

@ -47,6 +47,9 @@ void setUseGpu(bool useGpu);
/// Return true if this py_paddle is compiled in GPU Version
bool isGpuVersion();
/// Return FLAGS_trainer_count
int getTrainerCount();
/// The Error of IO Operation. Such as file not found, etc.
class IOError {};

@ -54,5 +54,7 @@ bool isGpuVersion() {
#endif
}
int getTrainerCount() { return FLAGS_trainer_count; }
static_assert(NUM_PARAMETER_TYPES == paddle::NUM_PARAMETER_TYPES,
"The Parameter Type should be same in core/api and core/common");

@ -26,6 +26,15 @@ class IScanner(object):
if not isinstance(self.input_type, dp2.InputType):
raise ValueError("input type should be dataprovider2.InputType")
self.pos = pos
# data_in_gpu is used to indicate whether to create argument on GPU
# or not in GPU mode. Now if using one thread (trainer_count=1),
# trainer uses NeuralNetwork which needs to create argument on GPU
# before calling forward function. So, set data_in_gpu to True.
# Otherwise, trainer uses MultiGradientMachine which will transfer
# data from CPU to GPU in the forward function, set data_in_gpu to
# False in this case.
self.data_in_gpu = swig_paddle.isUsingGpu(
) and swig_paddle.getTrainerCount() == 1
def scan(self, dat):
pass
@ -53,7 +62,8 @@ class DenseScanner(IScanner):
assert isinstance(argument, swig_paddle.Arguments)
if self.__mat__.dtype != numpy.float32:
self.__mat__ = self.__mat__.astype(numpy.float32)
m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True, False)
m = swig_paddle.Matrix.createDenseFromNumpy(self.__mat__, True,
self.data_in_gpu)
argument.setSlotValue(self.pos, m)
@ -75,10 +85,13 @@ class SparseBinaryScanner(IScanner):
def finish_scan(self, argument):
assert isinstance(argument, swig_paddle.Arguments)
m = swig_paddle.Matrix.createSparse(self.__height__,
self.input_type.dim,
len(self.__cols__),
len(self.__value__) == 0)
m = swig_paddle.Matrix.createSparse(
self.__height__,
self.input_type.dim,
len(self.__cols__),
len(self.__value__) == 0,
False, # trans
False) # TODO supoort GPU
assert isinstance(m, swig_paddle.Matrix)
m.sparseCopyFrom(self.__rows__, self.__cols__, self.__value__)
argument.setSlotValue(self.pos, m)
@ -102,7 +115,7 @@ class IndexScanner(IScanner):
self.__ids__.append(dat)
def finish_scan(self, argument):
ids = swig_paddle.IVector.create(self.__ids__)
ids = swig_paddle.IVector.create(self.__ids__, self.data_in_gpu)
assert isinstance(argument, swig_paddle.Arguments)
argument.setSlotIds(self.pos, ids)

@ -235,4 +235,8 @@ class DataFeederTest(unittest.TestCase):
if __name__ == '__main__':
api.initPaddle("--use_gpu=0")
unittest.main()
suite = unittest.TestLoader().loadTestsFromTestCase(DataFeederTest)
unittest.TextTestRunner().run(suite)
if api.isGpuVersion():
api.setUseGpu(True)
unittest.main()

@ -21,7 +21,21 @@ import layer as v2_layer
__all__ = ['Topology']
def __flatten__(lis):
"""
Given a list, possibly nested to any level, return it flattened.
"""
new_lis = []
for item in lis:
if isinstance(item, collections.Sequence):
new_lis.extend(__flatten__(item))
else:
new_lis.append(item)
return new_lis
def __bfs_travel__(callback, *layers):
layers = __flatten__(layers)
for each_layer in layers:
__break__ = callback(each_layer)
if __break__:

Loading…
Cancel
Save