@ -0,0 +1,42 @@

# Cluster Training Using Fabric

## Prepare a Linux Cluster

Under the directory `paddle/scripts/cluster_train_v2/fabric/docker_cluster`, run `kubectl create -f ssh_servers.yaml` to launch a test cluster, then use `kubectl get po -o wide` to get the IP addresses of these nodes.

## Launch the Cluster Job

`paddle.py` provides automated scripts to start all the PaddlePaddle cluster processes on the different nodes. By default, any command line option can be passed as a `paddle.py` command option, and `paddle.py` will transparently and automatically forward these options to the underlying PaddlePaddle processes.

`paddle.py` provides two special command options to make job launching easier.

- `job_dispatch_package`: set it to a local `workspace` directory; it will be dispatched to all the nodes configured in `conf.py`. This reduces the burden on users who frequently modify and access workspace files, for whom repeated multi-node workspace deployment would otherwise be tedious.
- `job_workspace`: set it to an already deployed workspace directory; `paddle.py` will skip the dispatch stage and directly launch the cluster job on all nodes. This helps reduce dispatch latency.

`cluster_train/run.sh` provides a sample command for running the `doc/howto/usage/cluster/src/word2vec` cluster job. Just change `job_dispatch_package` and `job_workspace` to your own directories, then:

```bash
sh run.sh
```

The cluster job will start in a few seconds.

## Kill the Cluster Job

`paddle.py` captures the `Ctrl + C` (SIGINT) signal to automatically terminate all the processes it launched, so just interrupt `paddle.py` to kill the cluster job. If the program crashes, you can also kill the processes manually.

## Check the Cluster Training Result

Check the logs in `$workspace/log` for details; every node has the same log structure.

`paddle_trainer.INFO`

Provides almost all of the training's internal log output, the same as in local training. Check runtime model convergence here.

`paddle_pserver2.INFO`

Provides the parameter server's running log, which helps diagnose distributed errors.

`server.log`

Provides the stderr and stdout of the parameter server process. Check this error log if training crashes.

`train.log`

Provides the stderr and stdout of the trainer process. Check this error log if training crashes.

## Check the Model Output

After the run finishes, model files are written to the `output` directory on node 0.

The `nodefile` in the workspace indicates the node ID of the current cluster job.

@ -0,0 +1,43 @@

# Cluster Training Using Fabric

## Prepare a Linux cluster

Running `kubectl create -f ssh_servers.yaml` under the directory `paddle/scripts/cluster_train_v2/fabric/docker_cluster` will launch a demo cluster. Run `kubectl get po -o wide` to get the IP addresses of these nodes.

## Launching Cluster Job

`paddle.py` provides automated scripts to start all PaddlePaddle cluster processes on different nodes. By default, any command line option can be set as a `paddle.py` command option, and `paddle.py` will transparently and automatically forward these options to the underlying PaddlePaddle processes.

`paddle.py` provides two special command options for easy job launching.

- `job_dispatch_package`: set it to a local `workspace` directory; it will be dispatched to all nodes configured in `conf.py`. This helps users who frequently manipulate workspace files, for whom repeated multi-node workspace deployment would otherwise be tedious.
- `job_workspace`: set it to an already deployed workspace directory; `paddle.py` will skip the dispatch stage and directly launch the cluster job on all nodes. This helps reduce heavy dispatch latency.

`cluster_train/run.sh` provides a sample command to run the `demo/recommendation` cluster job. Just modify `job_dispatch_package` and `job_workspace` to your own directories, then:

```bash
sh run.sh
```

The cluster job will start in several seconds.

## Kill Cluster Job

`paddle.py` captures the `Ctrl + C` (SIGINT) signal to automatically kill all processes it launched, so just interrupt `paddle.py` to kill the cluster job. You should kill the job manually if the program crashes.

## Check Cluster Training Result

Check the logs in `$workspace/log` for details; each node has the same log structure.

`paddle_trainer.INFO`

Provides almost all of the training's internal log output, the same as in local training. Check runtime model convergence here.

`paddle_pserver2.INFO`

Provides the parameter server's running log, which helps diagnose distributed errors.

`server.log`

Provides the stderr and stdout of the parameter server process. Check this error log if training crashes.

`train.log`

Provides the stderr and stdout of the trainer process. Check this error log if training crashes.

## Check Model Output

After one pass finishes, model files will be written to the `output` directory on node 0.

The `nodefile` in the workspace indicates the node ID of the current cluster job.

@ -0,0 +1,41 @@

# Cluster Training Using OpenMPI

## Prepare an OpenMPI cluster

Run the following commands to start a 3-node MPI cluster plus one "head" node.

```bash
cd paddle/scripts/cluster_train_v2/openmpi/docker_cluster
kubectl create -f head.yaml
kubectl create -f mpi-nodes.yaml
```

Then you can log in to every OpenMPI node via ssh without entering a password.

## Launching Cluster Job

Follow these steps to launch a PaddlePaddle training job on the OpenMPI cluster:
```bash
# find out node IP addresses
kubectl get po -o wide
# generate a "machines" file containing node IP addresses
kubectl get po -o wide | grep nodes | awk '{print $6}' > machines
# copy necessary files onto the "head" node
scp -i ssh/id_rsa.mpi.pub machines prepare.py train.py start_mpi_train.sh tutorial@[headIP]:~
# log in to the head node using ssh
ssh -i ssh/id_rsa.mpi.pub tutorial@[headIP]
# --------------- on the head node ---------------
# prepare training data
python prepare.py
# copy training data and dict file to MPI nodes
cat machines | xargs -i scp word_dict.pickle train.py start_mpi_train.sh machines {}:/home/tutorial
# create a directory for storing log files
mpirun -hostfile machines -n 3 mkdir /home/tutorial/logs
# copy training data to every node
scp train.txt-00000 test.txt-00000 [node1IP]:/home/tutorial
scp train.txt-00001 test.txt-00001 [node2IP]:/home/tutorial
scp train.txt-00002 test.txt-00002 [node3IP]:/home/tutorial
# start the job
mpirun -hostfile machines -n 3 /home/tutorial/start_mpi_train.sh
```
@ -0,0 +1,7 @@

FROM paddledev/paddle:cpu-latest

MAINTAINER zjsxzong89@gmail.com

COPY start.sh /root/
COPY start_paddle.py /root/
CMD ["bash", "-c", "/root/start.sh"]
@ -0,0 +1,7 @@

FROM alpine

RUN apk update && apk upgrade && apk add coreutils
ADD quick_start /quick_start
ADD get_data.sh /bin/
RUN chmod +x /bin/get_data.sh
ENTRYPOINT ["/bin/get_data.sh"]
@ -0,0 +1,6 @@

To build the PaddlePaddle data preparation image for the tutorial [Distributed PaddlePaddle Training on AWS with Kubernetes](../../k8s_aws_en.md), run the following commands:

```bash
cp -r ../../../../../../demo/quick_start .
docker build . -t prepare-data-image-name
```
@ -0,0 +1,26 @@

#!/bin/sh

# OUT_DIR and SPLIT_COUNT are supplied via the container environment.
out_dir=$OUT_DIR
split_count=$SPLIT_COUNT

set -e

mkdir -p "$out_dir"
cp -r /quick_start "$out_dir"/

# Download the preprocessed quick-start data into node 0's data directory.
mkdir -p "$out_dir"/0/data
cd "$out_dir"/0/data
wget http://paddlepaddle.bj.bcebos.com/demo/quick_start_preprocessed_data/preprocessed_data.tar.gz
tar zxvf preprocessed_data.tar.gz
rm preprocessed_data.tar.gz

# Split train.txt into $split_count line-aligned chunks: train.00000, train.00001, ...
split -d --number=l/"$split_count" -a 5 train.txt train.
mv train.00000 train.txt

# Give every other node a copy of the data, with its own chunk as train.txt.
cd "$out_dir"
end=$(expr "$split_count" - 1)
for i in $(seq 1 "$end"); do
    mkdir -p "$i"/data
    cp -r 0/data/* "$i"/data
    mv "$i"/data/train.$(printf %05d "$i") "$i"/data/train.txt
done
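
The `split -d --number=l/$split_count` invocation above chunks `train.txt` into `$split_count` pieces without breaking any line apart (GNU split balances the chunks by bytes, then rounds to line boundaries). As a rough illustration of the effect, here is a Python sketch that balances by line count instead; `split_lines` is a made-up helper, not part of the repository's scripts.

```python
def split_lines(lines, n):
    """Partition `lines` into n contiguous chunks, sized as evenly as possible."""
    base, extra = divmod(len(lines), n)
    chunks, start = [], 0
    for i in range(n):
        size = base + (1 if i < extra else 0)  # first `extra` chunks get one more line
        chunks.append(lines[start:start + size])
        start += size
    return chunks

# 10 lines into 3 chunks: every line lands in exactly one chunk, in order
print(split_lines(list(range(10)), 3))  # → [[0, 1, 2, 3], [4, 5, 6], [7, 8, 9]]
```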
@ -0,0 +1,6 @@

FROM paddledev/paddle:cpu-latest

COPY start.sh /root/
COPY start_paddle.py /root/
RUN chmod +x /root/start.sh
CMD ["bash", "-c", "/root/start.sh"]
@ -0,0 +1,5 @@

To build the PaddlePaddle training image for the tutorial [Distributed PaddlePaddle Training on AWS with Kubernetes](../../k8s_aws_en.md), run the following command:

```bash
docker build . -t train-image-name
```
@ -0,0 +1,19 @@

#!/bin/sh

set -eu

# Copy the job's training config from the shared job path into /root.
jobconfig=${JOB_PATH}"/"${JOB_NAME}"/"${TRAIN_CONFIG_DIR}
cd /root
cp -rf $jobconfig/* .

python /root/start_paddle.py \
    --dot_period=10 \
    --ports_num=$CONF_PADDLE_PORTS_NUM \
    --ports_num_for_sparse=$CONF_PADDLE_PORTS_NUM_SPARSE \
    --log_period=50 \
    --num_passes=10 \
    --trainer_count=$TRAINER_COUNT \
    --saving_period=1 \
    --local=0 \
    --config=trainer_config.lr.py \
    --use_gpu=0
@ -0,0 +1,170 @@

#!/usr/bin/python
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import requests
import time
import socket
import os
import argparse

# configuration for the cluster, taken from the pod environment
API = "/api/v1/namespaces/"
JOBSELECTOR = "labelSelector=job-name="
JOB_PATH = os.getenv("JOB_PATH") + "/" + os.getenv("JOB_NAME")
JOB_PATH_OUTPUT = JOB_PATH + "/output"
JOBNAME = os.getenv("JOB_NAME")
NAMESPACE = os.getenv("JOB_NAMESPACE")
PADDLE_NIC = os.getenv("CONF_PADDLE_NIC")
PADDLE_PORT = os.getenv("CONF_PADDLE_PORT")
PADDLE_PORTS_NUM = os.getenv("CONF_PADDLE_PORTS_NUM")
PADDLE_PORTS_NUM_SPARSE = os.getenv("CONF_PADDLE_PORTS_NUM_SPARSE")
PADDLE_SERVER_NUM = os.getenv("CONF_PADDLE_GRADIENT_NUM")

# service-account token used to authenticate with the Kubernetes API server
tokenpath = '/var/run/secrets/kubernetes.io/serviceaccount/token'


def refine_unknown_args(cmd_args):
    '''
    Refine unknown parameters: split "--key=value" into "key value" pairs
    and strip leading dashes, so they can be zipped into a dict.
    '''
    new_args = []
    for arg in cmd_args:
        if arg.startswith("--") and arg.find("=") != -1:
            equal_pos = arg.find("=")  # find first = pos
            arglist = list(arg)
            arglist[equal_pos] = " "
            arg = "".join(arglist)
            arg = arg.lstrip("-")
            new_args += arg.split(" ")
        elif arg.startswith("--") and arg.find("=") == -1:
            arg = arg.lstrip("-")
            new_args.append(arg)
        else:
            new_args.append(arg)
    return new_args


def isPodAllRunning(podlist):
    '''
    Check whether all pods of the job are in the "Running" phase.
    '''
    require = len(podlist["items"])
    running = 0
    for pod in podlist["items"]:
        if pod["status"]["phase"] == "Running":
            running += 1
    print "waiting for pods running, require:", require, "running:", running
    if require == running:
        return True
    return False


def getPodList():
    '''
    Get the status of all pods (containers) belonging to the job.
    '''
    apiserver = "https://" + \
        os.getenv("KUBERNETES_SERVICE_HOST") + ":" + \
        os.getenv("KUBERNETES_SERVICE_PORT_HTTPS")

    pod = API + NAMESPACE + "/pods?"
    job = JOBNAME
    if os.path.isfile(tokenpath):
        tokenfile = open(tokenpath, mode='r')
        token = tokenfile.read()
        Bearer = "Bearer " + token
        headers = {"Authorization": Bearer}
        return requests.get(apiserver + pod + JOBSELECTOR + job,
                            headers=headers,
                            verify=False).json()
    else:
        return requests.get(apiserver + pod + JOBSELECTOR + job,
                            verify=False).json()


def getIdMap(podlist):
    '''
    Assign a trainer_id to each pod by sorting the pod IPs.
    '''
    ips = []
    for pod in podlist["items"]:
        ips.append(pod["status"]["podIP"])
    ips.sort()
    idMap = {}
    for i in range(len(ips)):
        idMap[ips[i]] = i
    return idMap


def startPaddle(idMap={}, train_args_dict=None):
    '''
    Start the paddle pserver and trainer on this node.
    '''
    program = 'paddle train'
    args = " --nics=" + PADDLE_NIC
    args += " --port=" + str(PADDLE_PORT)
    args += " --ports_num=" + str(PADDLE_PORTS_NUM)
    args += " --comment=" + "paddle_process_by_paddle"
    ip_string = ""
    for ip in idMap.keys():
        ip_string += (ip + ",")
    ip_string = ip_string.rstrip(",")
    args += " --pservers=" + ip_string
    args_ext = ""
    for key, value in train_args_dict.items():
        args_ext += (' --' + key + '=' + value)
    localIP = socket.gethostbyname(socket.gethostname())
    trainerId = idMap[localIP]
    args += " " + args_ext + " --trainer_id=" + \
        str(trainerId) + " --save_dir=" + JOB_PATH_OUTPUT
    logDir = JOB_PATH_OUTPUT + "/node_" + str(trainerId)
    if not os.path.exists(JOB_PATH_OUTPUT):
        os.makedirs(JOB_PATH_OUTPUT)
    if not os.path.exists(logDir):
        os.mkdir(logDir)
    # fetch this trainer's shard of the training data
    copyCommand = 'cp -rf ' + JOB_PATH + \
        "/" + str(trainerId) + "/data/*" + " ./data/"
    os.system(copyCommand)
    startPserver = 'nohup paddle pserver' + \
        " --port=" + str(PADDLE_PORT) + \
        " --ports_num=" + str(PADDLE_PORTS_NUM) + \
        " --ports_num_for_sparse=" + str(PADDLE_PORTS_NUM_SPARSE) + \
        " --nics=" + PADDLE_NIC + \
        " --comment=" + "paddle_process_by_paddle" + \
        " --num_gradient_servers=" + str(PADDLE_SERVER_NUM) + \
        " > " + logDir + "/server.log 2>&1 &"
    print startPserver
    os.system(startPserver)
    # wait until pservers completely start
    time.sleep(20)
    startTrainer = program + args + " 2>&1 | tee " + \
        logDir + "/train.log"
    print startTrainer
    os.system(startTrainer)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        prog="start_paddle.py", description='simple tool for k8s')
    args, train_args_list = parser.parse_known_args()
    train_args = refine_unknown_args(train_args_list)
    train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2]))
    podlist = getPodList()
    # need to wait until all pods are running
    while not isPodAllRunning(podlist):
        time.sleep(20)
        podlist = getPodList()
    idMap = getIdMap(podlist)
    startPaddle(idMap, train_args_dict)
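
The unknown-argument handling above can be exercised on its own. The following is a Python 3 transcription of `refine_unknown_args` plus the `dict(zip(...))` step from `__main__`, showing how extra `--key=value` / `--key value` flags become the trainer-argument dict:

```python
def refine_unknown_args(cmd_args):
    """Normalize --key=value and --key value pairs into [key, value, ...]."""
    new_args = []
    for arg in cmd_args:
        if arg.startswith("--") and "=" in arg:
            key, _, value = arg.partition("=")
            new_args += [key.lstrip("-"), value]
        elif arg.startswith("--"):
            new_args.append(arg.lstrip("-"))
        else:
            new_args.append(arg)
    return new_args

train_args = refine_unknown_args(["--use_gpu=0", "--num_passes", "10"])
train_args_dict = dict(zip(train_args[:-1:2], train_args[1::2]))
print(train_args_dict)  # → {'use_gpu': '0', 'num_passes': '10'}
```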