Save checkpoint automatically (#25917)

gongweibao committed 5 years ago via GitHub
parent e853ece0a2
commit 0067a2e4ec

@ -25,11 +25,13 @@ import six
from .data_feeder import convert_dtype
from .framework import Program, default_main_program, Variable, Operator, convert_np_dtype_to_dtype_
from . import core
from . import unique_name
from . import compiler
from .. import compat as cpt
from .trainer_factory import TrainerFactory
from .trainer_factory import FetchHandlerMonitor
import copy
from .incubate.checkpoint import auto_checkpoint as acp
__all__ = ['Executor', 'global_scope', 'scope_guard']
@ -559,6 +561,9 @@ class Executor(object):
self._closed = False
self.pruned_program_scope_caches = dict()
self._auto_checkpoint_name = unique_name.generate(
"__auto_checkpoint_executor__")
def _get_scope_cache(self, program_cache_key):
return self.scope_caches.get(program_cache_key, None)
@ -1152,6 +1157,8 @@ class Executor(object):
compiled = isinstance(program, compiler.CompiledProgram)
acp._auto_checkpoint(self, program)
# For backward compatibility, run directly.
if not compiled:
# In distributed training, the compiled program is saved in Program._graph
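
The acp._auto_checkpoint hook above runs at the start of every Executor.run call. As a hedged illustration only (the logic that decides when a checkpoint is actually written lives in the auto_checkpoint module, whose diff is suppressed below), here is a minimal training script that this hook would cover, using the activation environment variables that appear in the new unit tests; all names and values are illustrative:

import os
import numpy as np
import paddle.fluid as fluid

# Illustrative activation switches, copied from the new unit tests below.
os.environ["PADDLE_RUNNING_ENV"] = "PADDLE_EDL_AUTO_CHECKPOINT"
os.environ["PADDLE_TRAINER_ID"] = "0"
os.environ["PADDLE_JOB_ID"] = "demo_job"
os.environ["PADDLE_EDL_HDFS_CHECKPOINT_PATH"] = "auto_checkpoint_demo"

image = fluid.data(name='image', shape=[-1, 16, 16], dtype='float32')
label = fluid.data(name='label', shape=[-1, 1], dtype='int64')
loss = fluid.layers.reduce_mean(
    fluid.layers.softmax_with_cross_entropy(
        fluid.layers.fc(image, size=10), label))
fluid.optimizer.SGD(learning_rate=1e-3).minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())

img = np.random.random(size=(4, 16, 16)).astype('float32')
lbl = np.random.randint(0, 10, size=(4, 1)).astype('int64')
# Every run() call now goes through acp._auto_checkpoint(self, program) first,
# which decides whether a checkpoint should be saved or restored.
exe.run(fluid.default_main_program(),
        feed={'image': img, 'label': lbl},
        fetch_list=[loss.name])
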

@ -2385,12 +2385,29 @@ class Operator(object):
def _is_optimize_op(self):
op_maker = core.op_proto_and_checker_maker
OPTIMIZE = core.op_proto_and_checker_maker.OpRole.Optimize
if not self.desc.has_attr(op_maker.kOpRoleAttrName()):
return False
op_role = self.desc.attr(op_maker.kOpRoleAttrName())
if op_role & int(OPTIMIZE):
return True
else:
return False
def _is_backward_op(self):
op_maker = core.op_proto_and_checker_maker
BACKWARD = core.op_proto_and_checker_maker.OpRole.Backward
if not self.desc.has_attr(op_maker.kOpRoleAttrName()):
return False
op_role = self.desc.attr(op_maker.kOpRoleAttrName())
if op_role & int(BACKWARD):
return True
return False
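
The op role is stored as a bitmask attribute, so the two helpers above test membership with a bitwise AND against OpRole.Optimize and OpRole.Backward. A small sketch, not part of the patch, that uses them to count the op kinds appended by minimize():

import paddle.fluid as fluid

prog, startup = fluid.Program(), fluid.Program()
with fluid.program_guard(prog, startup):
    x = fluid.data(name='x', shape=[-1, 4], dtype='float32')
    loss = fluid.layers.reduce_mean(fluid.layers.fc(x, size=1))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

ops = prog.global_block().ops
print("optimize ops:", sum(1 for op in ops if op._is_optimize_op()))
print("backward ops:", sum(1 for op in ops if op._is_backward_op()))
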
class Block(object):
"""
@ -3942,6 +3959,10 @@ class Program(object):
# number of times gradients have been appended
self._appending_grad_times = 0
# identifier for auto checkpoint
self._auto_checkpoint_name = unique_name.generate(
"__auto_checkpoint_program__")
# compiled program, i.e. Graph
self._graph = None

@ -0,0 +1,13 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

File diff suppressed because it is too large.

@ -0,0 +1,223 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..fleet.utils.fs import FS, LocalFS
from ..fleet.utils.hdfs import HDFSClient
from ...compiler import CompiledProgram
class SerializableBase(object):
def serialize(self, path):
raise NotImplementedError
def deserialize(self, path):
raise NotImplementedError
class PaddleModel(SerializableBase):
def __init__(self, exe, program):
self._exe = exe
self._origin_program = program
self._program = program
if isinstance(program, CompiledProgram):
self._program = program._program
self._file_name = "_paddle_fleet_param__"
def serialize(self, path):
from ...io import save_persistables
save_persistables(
executor=self._exe,
dirname=path,
main_program=self._program,
filename=self._file_name)
def deserialize(self, path):
from ...io import load_persistables
load_persistables(
executor=self._exe,
dirname=path,
main_program=self._program,
filename=self._file_name)
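
PaddleModel adapts an executor/program pair to the SerializableBase interface, so CheckpointSaver can treat it like any other serializable object; a CompiledProgram is unwrapped to its underlying Program first. A minimal standalone sketch, with a toy network and an illustrative path:

import paddle.fluid as fluid
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel

main_prog, startup_prog = fluid.Program(), fluid.Program()
with fluid.program_guard(main_prog, startup_prog):
    x = fluid.data(name='x', shape=[-1, 4], dtype='float32')
    fluid.layers.fc(x, size=2)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_prog)

model = PaddleModel(exe, main_prog)
model.serialize("./paddle_model_demo")    # writes _paddle_fleet_param__ under the directory
model.deserialize("./paddle_model_demo")  # reads the parameters back
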
class CheckpointSaver(object):
def __init__(self, fs):
self._fs = fs
self._checkpoint_prefix = "__paddle_checkpoint__"
def save_checkpoint(self,
path,
slists,
trainer_id=None,
local_cache_path=".cache"):
"""
Serialize the objects in slists to path.
Return the actual saved path and checkpoint_no.
"""
if not self._fs.is_exist(path):
self._fs.mkdirs(path)
else:
assert self._fs.is_dir(path), "path:{} must be a directory".format(
path)
max_no = self._get_last_checkpoint_no(path)
if max_no < 0:
max_no = -1
max_no += 1
real_path = "{}/{}.{}".format(path, self._checkpoint_prefix, max_no)
tmp_path = "{}.tmp".format(real_path)
saved_path = tmp_path
local_fs = LocalFS()
cache_path = None
if self._fs.need_upload_download():
cache_path = "{}/{}.{}.saved_cache".format(
local_cache_path, self._checkpoint_prefix, max_no)
if trainer_id is not None:
cache_path = "{}.{}".format(cache_path, trainer_id)
if not local_fs.is_exist(cache_path):
local_fs.mkdirs(cache_path)
else:
assert local_fs.is_dir(cache_path), \
"cache path:{} must be a directory".format(cache_path)
saved_path = cache_path
for s in slists:
s.serialize(saved_path)
if self._fs.need_upload_download():
self._fs.delete(tmp_path)
self._fs.upload(cache_path, tmp_path)
local_fs.delete(cache_path)
self._fs.mv(tmp_path, real_path)
return real_path, max_no
def load_checkpoint(self,
path,
slists,
trainer_id,
local_cache_path=".cache",
checkpoint_no=None,
ignore_empty=True):
"""
Deserialize the objects in slists from path.
Return the actual load path.
"""
if checkpoint_no is None:
max_no = self._get_last_checkpoint_no(path)
if not ignore_empty:
assert max_no >= 0, "Can't find checkpoint"
if max_no < 0:
return None
checkpoint_no = max_no
else:
assert isinstance(checkpoint_no, int)
assert checkpoint_no >= 0
local_fs = LocalFS()
if self._fs.need_upload_download():
cache_path = "{}/{}.{}.load_cache".format(
local_cache_path, self._checkpoint_prefix, checkpoint_no)
if trainer_id is not None:
cache_path = "{}.{}".format(cache_path, trainer_id)
if not local_fs.is_exist(local_cache_path):
local_fs.mkdirs(local_cache_path)
if local_fs.is_exist(cache_path):
local_fs.delete(cache_path)
real_path = "{}/{}.{}".format(path, self._checkpoint_prefix,
checkpoint_no)
load_path = real_path
if self._fs.need_upload_download():
self._fs.download(real_path, cache_path)
load_path = cache_path
for s in slists:
s.deserialize(load_path)
if self._fs.need_upload_download() and cache_path:
local_fs.delete(cache_path)
return real_path
def get_checkpoint_no(self, root_path):
a = []
dirs = self._fs.list_dirs(root_path)
for d in dirs:
g = d.split(".")
if len(g) != 2:
continue
if g[0] != self._checkpoint_prefix:
continue
try:
n = int(g[1])
a.append(n)
except:
continue
a.sort()
return a
def _get_last_checkpoint_no(self, root_path):
"""
Only scan the first directory depth under root_path.
"""
a = self.get_checkpoint_no(root_path)
if len(a) > 0:
return a[-1]
return -1
def clean_redundant_checkpoints(self, root_path, reserved=[]):
max_no = self._get_last_checkpoint_no(root_path)
if max_no < 0:
return
s = set(reserved)
if len(s) == 0:
s.add(max_no)
dirs = self._fs.list_dirs(root_path)
for d in dirs:
g = d.split(".")
if len(g) != 2:
continue
if g[0] != self._checkpoint_prefix:
continue
try:
n = int(g[1])
if n not in s:
path = "{}/{}.{}".format(root_path, self._checkpoint_prefix,
n)
self._fs.delete(path)
except Exception as e:
print(e)
continue
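
Taken together, CheckpointSaver serializes every object in slists into <path>/__paddle_checkpoint__.<N>, staging through a .tmp directory (or a local cache when the filesystem needs upload/download) and renaming at the end. A hedged end-to-end sketch with a LocalFS backend; the directory and the toy network are illustrative only:

import paddle.fluid as fluid
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.checkpoint.checkpoint_saver import CheckpointSaver, PaddleModel

main_prog, startup_prog = fluid.Program(), fluid.Program()
with fluid.program_guard(main_prog, startup_prog):
    x = fluid.data(name='x', shape=[-1, 4], dtype='float32')
    fluid.layers.fc(x, size=2)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_prog)

saver = CheckpointSaver(LocalFS())
model = PaddleModel(exe, main_prog)

# Each call creates ./cp_demo/__paddle_checkpoint__.<N>, with N = last number + 1.
_, no0 = saver.save_checkpoint("./cp_demo", [model], trainer_id=0)
_, no1 = saver.save_checkpoint("./cp_demo", [model], trainer_id=0)
assert no1 == no0 + 1

# Reload the newest checkpoint into the program's persistables.
saver.load_checkpoint("./cp_demo", [model], trainer_id=0)

# Keep only the newest checkpoint number; older ones are deleted.
saver.clean_redundant_checkpoints("./cp_demo")
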

@ -27,6 +27,7 @@ from paddle.fluid.incubate.fleet.base.fleet_base import DistributedOptimizer
from paddle.fluid import compiler
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel, CheckpointSaver
import os
import sys
@ -46,21 +47,6 @@ class DistFCConfig(object):
pass
class TrainStatus(object):
def __init__(self, epoch_no=-1):
# completed epoch
self._epoch_no = epoch_no
def next(self):
return self._epoch_no + 1
def __eq__(self, t):
return self._epoch_no == t._epoch_no
def __ne__(self, t):
return not self == t
class Collective(Fleet):
def __init__(self):
super(Collective, self).__init__(Mode.COLLECTIVE)
@ -152,90 +138,10 @@ class Collective(Fleet):
io.save_persistables(executor, dirname, main_program, filename=filename)
def _save_train_status(self, path, train_status):
d = {}
d["epoch_no"] = train_status._epoch_no
file_name = "{}/fleet_train_status".format(path)
with open(file_name, 'w') as f:
json.dump(d, f)
def _load_train_status(self, path):
file_name = "{}/fleet_train_status".format(path)
r = TrainStatus()
if not os.path.isfile(file_name):
return r
d = {}
with open(file_name, 'r') as f:
d = json.load(f)
assert "epoch_no" in d, "Can't find epoch_no in dict from train_status file:{}".format(
d)
r._epoch_no = d["epoch_no"]
assert r._epoch_no >= 0, "Data in checkpoint file is not valid:{}".format(
d)
return r
def _get_last_checkpoint_no(self, root_path, fs):
"""
only get the first depth
"""
max_no = -1
d = {}
dirs = fs.list_dirs(root_path)
for d in dirs:
g = d.split(".")
if len(g) != 2:
continue
if g[0] != "__paddle_fleet_checkpoint__":
continue
try:
n = int(g[1])
if n > max_no:
max_no = n
except:
continue
return max_no
def clean_redundant_checkpoints(self,
root_path,
fs=LocalFS(),
checkpoint_num=1):
max_no = self._get_last_checkpoint_no(root_path, fs)
if max_no < 0:
return
if checkpoint_num < 1:
checkpoint_num = 1
dirs = fs.list_dirs(root_path)
for d in dirs:
g = d.split(".")
if len(g) != 2:
continue
if g[0] != self._checkpoint_prefix:
continue
try:
n = int(g[1])
if n <= max_no - checkpoint_num:
path = "{}/{}.{}".format(root_path, self._checkpoint_prefix,
n)
fs.delete(path)
except Exception as e:
print(e)
continue
def save_checkpoint(self,
executor,
path,
trainer_id,
train_status,
main_program=None,
fs=LocalFS(),
@ -248,53 +154,25 @@ class Collective(Fleet):
if main_program == None:
main_program = self._transpiled_program
if not fs.is_exist(path):
fs.mkdirs(path)
else:
assert fs.is_dir(path), "path:%s must be a directory".format(path)
max_no = self._get_last_checkpoint_no(path, fs=fs)
if max_no < 0:
max_no = -1
real_path = "{}/{}.{}".format(path, self._checkpoint_prefix, max_no + 1)
tmp_path = "{}.tmp".format(real_path)
saved_path = tmp_path
local_fs = LocalFS()
cache_path = None
if fs.need_upload_download():
cache_path = "{}/{}.{}.saved_cache".format(
local_cache_path, self._checkpoint_prefix, max_no + 1)
if not local_fs.is_exist(cache_path):
local_fs.mkdirs(cache_path)
else:
assert fs.is_dir(
path), "cache path:{} must be a directory".format(
cache_path)
saved_path = cache_path
self.save_persistables(
executor=executor,
dirname=saved_path,
main_program=main_program,
filename=self._param_file_name)
self._save_train_status(path=saved_path, train_status=train_status)
if fs.need_upload_download():
fs.delete(tmp_path)
fs.upload(cache_path, tmp_path)
fs.mv(tmp_path, real_path)
m = PaddleModel(executor, main_program)
t = train_status
c = CheckpointSaver(fs)
real_path, checkpoint_no = c.save_checkpoint(
path=path,
slists=[m, t],
trainer_id=trainer_id,
local_cache_path=local_cache_path)
if not remain_all_checkpoint:
self.clean_redundant_checkpoints(path)
c.clean_redundant_checkpoints(path)
return real_path, checkpoint_no
def load_checkpoint(self,
executor,
path,
trainer_id,
train_status,
main_program=None,
fs=LocalFS(),
local_cache_path=".cache",
@ -302,39 +180,17 @@ class Collective(Fleet):
"""
This function loads persistables and the current epoch number from path.
"""
max_no = self._get_last_checkpoint_no(path, fs)
if not ignore_empty:
assert max_no >= 0, "Can't find checkpoint"
if max_no < 0:
return None
local_fs = LocalFS()
if fs.need_upload_download():
cache_path = "{}/{}.{}.load_cache.{}".format(
local_cache_path, self._checkpoint_prefix, max_no, trainer_id)
if not local_fs.is_exist(local_cache_path):
local_fs.mkdirs(local_cache_path)
if local_fs.is_exist(cache_path):
local_fs.delete(cache_path)
real_path = "{}/{}.{}".format(path, self._checkpoint_prefix, max_no)
load_path = real_path
if fs.need_upload_download():
fs.download(real_path, cache_path)
load_path = cache_path
if main_program == None:
main_program = self._transpiled_program
io.load_persistables(
executor=executor,
dirname=load_path,
main_program=main_program,
filename=self._param_file_name)
return self._load_train_status(load_path)
m = PaddleModel(executor, main_program)
c = CheckpointSaver(fs)
return c.load_checkpoint(
path, [m, train_status],
trainer_id=trainer_id,
ignore_empty=ignore_empty,
local_cache_path=local_cache_path)
fleet = Collective()

@ -45,6 +45,10 @@ class FSTimeOut(Exception):
pass
class FSShellCmdAborted(ExecuteError):
pass
class FS(object):
@abc.abstractmethod
def ls_dir(self, fs_path):
@ -87,7 +91,7 @@ class FS(object):
raise NotImplementedError
@abc.abstractmethod
def mv(self, fs_src_path, fs_dst_path):
def mv(self, fs_src_path, fs_dst_path, overwrite=False, test_exists=False):
raise NotImplementedError
@abc.abstractmethod
@ -98,6 +102,10 @@ class FS(object):
def list_dirs(self, fs_path):
raise NotImplementedError
@abc.abstractmethod
def touch(self, fs_path, exist_ok=True):
raise NotImplementedError
class LocalFS(FS):
def ls_dir(self, fs_path):
@ -138,13 +146,21 @@ class LocalFS(FS):
def is_exist(self, fs_path):
return os.path.exists(fs_path)
def touch(self, fs_path):
return Path(fs_path).touch()
def touch(self, fs_path, exist_ok=True):
if self.is_exist(fs_path):
if exist_ok:
return
raise FSFileExistsError
return Path(fs_path).touch(exist_ok=True)
def mv(self, src_path, dst_path):
def mv(self, src_path, dst_path, overwrite=False, test_exists=False):
if not self.is_exist(src_path):
raise FSFileNotExistsError
if overwrite and self.is_exist(dst_path):
self.delete(dst_path)
if self.is_exist(dst_path):
raise FSFileExistsError
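
The new exist_ok and overwrite switches change the failure modes of touch and mv. A short sketch of the LocalFS behaviour implemented above (paths are illustrative; FSFileExistsError is assumed to be the exception class defined alongside the others in this module):

from paddle.fluid.incubate.fleet.utils.fs import LocalFS, FSFileExistsError

fs = LocalFS()
fs.touch("./demo.flag", exist_ok=True)       # creates the file
fs.touch("./demo.flag", exist_ok=True)       # no-op, the file already exists
try:
    fs.touch("./demo.flag", exist_ok=False)  # now raises
except FSFileExistsError:
    pass

fs.mkdirs("./demo_src")
fs.mkdirs("./demo_dst")
try:
    fs.mv("./demo_src", "./demo_dst")        # dst exists and overwrite=False
except FSFileExistsError:
    pass
fs.mv("./demo_src", "./demo_dst", overwrite=True)  # deletes dst, then moves

fs.delete("./demo_dst")
fs.delete("./demo.flag")
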

File diff suppressed because it is too large.

@ -86,6 +86,10 @@ if(WIN32)
LIST(REMOVE_ITEM TEST_OPS test_ref_by_trainer_id_op)
endif()
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint)
LIST(REMOVE_ITEM TEST_OPS test_auto_checkpoint2)
LIST(REMOVE_ITEM TEST_OPS test_checkpoint_saver)
if(APPLE OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_hdfs)
LIST(REMOVE_ITEM TEST_OPS test_fs_interface)
@ -190,10 +194,11 @@ function(bash_test_modules TARGET_NAME)
endif()
set(options SERIAL)
set(oneValueArgs "")
set(multiValueArgs MODULES DEPS ENVS LABELS)
set(oneValueArgs TIMEOUT START_BASH)
set(multiValueArgs DEPS ENVS LABELS)
cmake_parse_arguments(bash_test_modules "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
set(timeout 350)
if(${bash_test_modules_TIMEOUT})
set(timeout ${bash_test_modules_TIMEOUT})
@ -204,13 +209,13 @@ function(bash_test_modules TARGET_NAME)
COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python
TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${bash_test_modules_ENVS}
WITH_COVERAGE=ON COVERAGE_FILE=${PADDLE_BINARY_DIR}/python-coverage.data
bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_MODULES}
bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_START_BASH}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
else()
add_test(NAME ${TARGET_NAME}
COMMAND ${CMAKE_COMMAND} -E env PYTHONPATH=${PADDLE_BINARY_DIR}/python
TEST_TARGET_NAME=${TARGET_NAME} TEST_TIMEOUT=${timeout} ${bash_test_modules_ENVS}
bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_MODULES}
bash ${CMAKE_CURRENT_BINARY_DIR}/${bash_test_modules_START_BASH}
WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
endif()
@ -397,15 +402,16 @@ if(WITH_DISTRIBUTE)
if(NOT APPLE)
if(WITH_GPU)
# NOTE: test_launch only works in GPU collective mode
bash_test_modules(test_launch MODULES test_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_launch START_BASH test_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
py_test_modules(test_fleet_checkpoint MODULES test_fleet_checkpoint)
endif()
bash_test_modules(test_launch_ps MODULES test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch MODULES test_fleet_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_launch_ps START_BASH test_launch_ps.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
bash_test_modules(test_fleet_launch START_BASH test_fleet_launch.sh ENVS PADDLE_BINARY_DIR=${PADDLE_BINARY_DIR})
set(dist_ut_port 20001)
foreach(TEST_OP ${DIST_TEST_OPS})
bash_test_modules(${TEST_OP} MODULES dist_test.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}")
bash_test_modules(${TEST_OP} START_BASH dist_test.sh SERIAL LABELS "RUN_TYPE=EXCLUSIVE" ENVS "PADDLE_DIST_UT_PORT=${dist_ut_port}")
MATH(EXPR dist_ut_port "${dist_ut_port}+50")
endforeach(TEST_OP)
endif(NOT APPLE)
@ -441,6 +447,12 @@ if(NOT WIN32)
set_tests_properties(test_parallel_executor_fetch_feed PROPERTIES TIMEOUT 450)
endif()
if(NOT APPLE AND NOT WIN32)
bash_test_modules(test_auto_checkpoint START_BASH dist_test.sh TIMEOUT 600)
bash_test_modules(test_auto_checkpoint2 START_BASH dist_test.sh TIMEOUT 600)
bash_test_modules(test_checkpoint_saver START_BASH dist_test.sh TIMEOUT 600)
endif()
add_subdirectory(sequence)
add_subdirectory(dygraph_to_static)

@ -0,0 +1,131 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard
from paddle.fluid import unique_name
import numpy as np
from paddle.io import Dataset, BatchSampler, DataLoader
BATCH_NUM = 20
BATCH_SIZE = 16
#IMAGE_SIZE = 128
CLASS_NUM = 10
USE_GPU = False  # whether to use GPU to run the model
places = fluid.cuda_places() if USE_GPU else fluid.cpu_places()
logger = None
def get_logger():
global logger
logger = acp._get_logger(20)
return logger
def get_random_images_and_labels(image_shape, label_shape):
image = np.random.random(size=image_shape).astype('float32')
label = np.random.random(size=label_shape).astype('int64')
return image, label
def sample_list_generator_creator():
def __reader__():
for _ in range(BATCH_NUM):
sample_list = []
for _ in range(BATCH_SIZE):
image, label = get_random_images_and_labels([16, 16], [1])
sample_list.append([image, label])
yield sample_list
return __reader__
class AutoCheckpointBase(unittest.TestCase):
def _init_env(self,
exe,
main_prog,
startup_prog,
minimize=True,
iterable=True):
def simple_net():
image = fluid.data(
name='image', shape=[-1, 16, 16], dtype='float32')
label = fluid.data(name='label', shape=[-1, 1], dtype='int64')
fc_tmp = fluid.layers.fc(image, size=CLASS_NUM)
cross_entropy = fluid.layers.softmax_with_cross_entropy(fc_tmp,
label)
loss = fluid.layers.reduce_mean(cross_entropy)
sgd = fluid.optimizer.SGD(learning_rate=1e-3)
if minimize:
sgd.minimize(loss)
return sgd, loss, image, label
with program_guard(main_prog, startup_prog):
sgd, loss, image, label = simple_net()
if minimize:
compiled = fluid.CompiledProgram(main_prog).with_data_parallel(
loss_name=loss.name)
else:
compiled = None
loader = fluid.io.DataLoader.from_generator(
feed_list=[image, label],
capacity=64,
use_double_buffer=True,
iterable=iterable)
loader.set_sample_list_generator(sample_list_generator_creator(),
places[0])
if minimize:
exe.run(startup_prog)
return compiled, loader, sgd, loss, image, label
def _generate(self):
main_prog = fluid.Program()
startup_prog = fluid.Program()
exe = fluid.Executor(places[0])
return exe, main_prog, startup_prog
def _reset_generator(self):
unique_name.generator = fluid.unique_name.UniqueNameGenerator()
acp.generator = fluid.unique_name.UniqueNameGenerator()
acp.g_acp_type = None
acp.g_checker = acp.AutoCheckpointChecker()
acp.g_program_attr = {}
def _clear_envs(self):
os.environ.pop("PADDLE_RUNNING_ENV", None)
def _readd_envs(self):
os.environ["PADDLE_RUNNING_ENV"] = "PADDLE_EDL_AUTO_CHECKPOINT"
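
A hedged sketch of how a test might drive this helper class; the class name and loop are illustrative, and the real checkpoint assertions live in test_auto_checkpoint.py, whose diff is suppressed below:

import os
import unittest
import paddle.fluid as fluid
from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase

class DemoAutoCheckpointTest(AutoCheckpointBase):
    def test_one_pass(self):
        os.environ['CPU_NUM'] = '1'  # single place, matching the loader binding above
        exe, main_prog, startup_prog = self._generate()
        compiled, loader, sgd, loss, image, label = self._init_env(
            exe, main_prog, startup_prog)
        for data in loader():
            exe.run(compiled, feed=data, fetch_list=[loss])

if __name__ == '__main__':
    unittest.main()
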

File diff suppressed because it is too large.

@ -0,0 +1,77 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
import paddle.fluid.incubate.checkpoint.auto_checkpoint as acp
from paddle.fluid.incubate.checkpoint.checkpoint_saver import PaddleModel
from paddle.fluid.framework import program_guard
from paddle.fluid import unique_name
import numpy as np
from paddle.io import Dataset, BatchSampler, DataLoader
from paddle.fluid.tests.unittests.auto_checkpoint_utils import AutoCheckpointBase, get_logger
from paddle.fluid.tests.unittests.test_auto_checkpoint import AutoCheckPointACLBase
logger = get_logger()
class AutoCheckpointTest2(AutoCheckPointACLBase):
def setUp(self):
get_logger()
logger.info("enter tests")
self._old_environ = dict(os.environ)
proc_env = {
"PADDLE_RUNNING_ENV": "PADDLE_EDL_AUTO_CHECKPOINT",
"PADDLE_TRAINER_ID": "0",
"PADDLE_RUNNING_PLATFORM": "PADDLE_CLOUD",
"PADDLE_JOB_ID": "test_job_auto_2",
"PADDLE_EDL_HDFS_HOME": "/usr/local/hadoop-2.7.7",
"PADDLE_EDL_HDFS_NAME": "",
"PADDLE_EDL_HDFS_UGI": "",
"PADDLE_EDL_HDFS_CHECKPOINT_PATH": "auto_checkpoint_2",
"PADDLE_EDL_ONLY_FOR_CE_TEST": "1",
"PADDLE_EDL_FS_CACHE": ".auto_checkpoint_test_2",
"PADDLE_EDL_SAVE_CHECKPOINT_INTER": "0"
}
os.environ.update(proc_env)
def test_corner_epoch_no(self):
logger.info("begin test_corener_epoch_no")
checker = acp._get_checker()
fs = HDFSClient(checker.hdfs_home, None)
for i in range(3):
fs.delete(checker.hdfs_checkpoint_path)
self._reset_generator()
self._run_save_0(break_epoch_no=i)
self._reset_generator()
self._run_load_0(break_epoch_no=i)
fs.delete(checker.hdfs_checkpoint_path)
logger.info("end test_corener_epoch_no")
if __name__ == '__main__':
unittest.main()

@ -0,0 +1,57 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
from paddle.fluid.incubate.checkpoint.auto_checkpoint import ExeTrainStatus
from paddle.fluid.incubate.checkpoint.checkpoint_saver import CheckpointSaver
import os
import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
from paddle.fluid.incubate.checkpoint.checkpoint_saver import CheckpointSaver
class CheckpointerSaverTest(unittest.TestCase):
def test(self):
fs = HDFSClient("/usr/local/hadoop-2.7.7", None)
dir_path = "./checkpointsaver_test"
fs.delete(dir_path)
s = CheckpointSaver(fs)
fs.mkdirs("{}/exe.exe".format(dir_path))
fs.mkdirs("{}/exe.1".format(dir_path))
fs.mkdirs("{}/exe".format(dir_path))
a = s.get_checkpoint_no(dir_path)
self.assertEqual(len(a), 0)
fs.mkdirs("{}/__paddle_checkpoint__.0".format(dir_path))
fs.mkdirs("{}/__paddle_checkpoint__.exe".format(dir_path))
a = s.get_checkpoint_no(dir_path)
self.assertEqual(len(a), 1)
s.clean_redundant_checkpoints(dir_path)
s.clean_redundant_checkpoints(dir_path)
fs.delete(dir_path)
if __name__ == '__main__':
unittest.main()

@ -170,7 +170,8 @@ def program_equal(a, b):
k))
return False
assert (len(a.blocks) == len(b.blocks))
elif k == '_auto_checkpoint_name':
continue
elif (v != b.__dict__[k]):
raise ValueError("In program_equal not equal:{0}\n".format(k))

@ -15,12 +15,15 @@
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
from paddle.fluid.incubate.checkpoint.auto_checkpoint import ExeTrainStatus
from paddle.fluid.incubate.checkpoint.checkpoint_saver import CheckpointSaver
import os
import sys
from paddle.fluid.incubate.fleet.utils.fs import LocalFS
from paddle.fluid.incubate.fleet.utils.hdfs import HDFSClient
from paddle.fluid.incubate.checkpoint.checkpoint_saver import CheckpointSaver
class FleetTest(unittest.TestCase):
@ -49,24 +52,35 @@ class FleetTest(unittest.TestCase):
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
status = TrainStatus(2)
fleet.save_checkpoint(exe, dir_path, train_status=status, fs=fs)
n1 = fleet._get_last_checkpoint_no(dir_path, fs=fs)
status = ExeTrainStatus()
status.epoch_no = 2
_, n1 = fleet.save_checkpoint(
exe, dir_path, trainer_id=0, train_status=status, fs=fs)
status2 = fleet.load_checkpoint(exe, dir_path, trainer_id=0, fs=fs)
status2 = ExeTrainStatus()
fleet.load_checkpoint(
exe, dir_path, trainer_id=0, fs=fs, train_status=status2)
self.assertEqual(status2, status)
fleet.save_checkpoint(exe, dir_path, train_status=status, fs=fs)
n2 = fleet._get_last_checkpoint_no(dir_path, fs=fs)
_, n2 = fleet.save_checkpoint(
exe,
dir_path,
trainer_id=0,
train_status=status,
fs=fs,
remain_all_checkpoint=False)
self.assertEqual(n2, n1 + 1)
fleet.clean_redundant_checkpoints(dir_path, fs=fs)
c = CheckpointSaver(fs)
cp_nos = c.get_checkpoint_no(dir_path)
assert len(cp_nos) == 1 # cleanup all others
# unnormal
# test remain_all_checkpoint
fleet.save_checkpoint(
exe,
dir_path,
trainer_id=0,
train_status=status,
fs=fs,
remain_all_checkpoint=False)
@ -79,6 +93,7 @@ class FleetTest(unittest.TestCase):
fleet.save_checkpoint(
exe,
dir_path,
trainer_id=0,
train_status=status,
fs=fs,
cache_path=cache_path)
@ -88,8 +103,13 @@ class FleetTest(unittest.TestCase):
# can't load under a file
try:
status2 = fleet.load_checkpoint(
exe, dir_path, trainer_id=0, fs=fs, cache_path=cache_path)
fleet.load_checkpoint(
exe,
dir_path,
trainer_id=0,
train_status=status2,
fs=fs,
cache_path=cache_path)
self.assertFalse(True)
except:
pass

@ -15,7 +15,7 @@
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
import inspect
@ -38,6 +38,8 @@ class FSTest(unittest.TestCase):
func(a)
elif len(args) == 3:
func(a, a)
elif len(args) == 5:
func(a, a, a, a)
print("args:", args, len(args), "func:", func)
self.assertFalse(True)
except NotImplementedError as e:

@ -15,7 +15,7 @@
import unittest
import paddle.fluid as fluid
import paddle.fluid.incubate.fleet.base.role_maker as role_maker
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet, TrainStatus
from paddle.fluid.incubate.fleet.collective import CollectiveOptimizer, fleet
import os
import sys
@ -57,6 +57,12 @@ class FSTest(unittest.TestCase):
fs.delete(dir_path)
self.assertTrue(not fs.is_exist(dir_path))
fs.mkdirs(dir_path)
fs.mkdirs(new_dir_path)
fs.mv(dir_path, new_dir_path, overwrite=True)
self.assertTrue(not fs.is_exist(dir_path))
self.assertTrue(fs.is_exist(new_dir_path))
def _test_touch_file(self, fs):
file_path = os.path.abspath("./test_file")
@ -104,6 +110,35 @@ class FSTest(unittest.TestCase):
fs.delete(dst_file)
fs.delete(src_file)
def _test_try_download(self, fs):
src_file = os.path.abspath("./test_try_download.src")
dst_file = os.path.abspath("./test_try_download.dst")
fs.delete(dst_file)
fs.delete(src_file)
try:
fs._try_download(src_file, dst_file)
self.assertFalse(True)
except Exception as e:
pass
fs.delete(dst_file)
fs.delete(src_file)
def _test_try_upload(self, fs):
src_file = os.path.abspath("./test_try_upload.src")
dst_file = os.path.abspath("./test_try_upload.dst")
try:
fs._try_upload(src_file, dst_file)
self.assertFalse(True)
except Exception as e:
pass
fs.delete(dst_file)
fs.delete(src_file)
def _test_download(self, fs):
src_file = os.path.abspath("./test_download.src")
dst_file = os.path.abspath("./test_download.dst")
@ -138,8 +173,27 @@ class FSTest(unittest.TestCase):
fs.mkdirs(dir_name)
fs.mkdirs(dir_name)
def _test_rm(self, fs):
dir_name = "./test_rm_no_exist.flag"
fs.delete(dir_name)
try:
fs._rmr(dir_name)
self.assertFalse(True)
except Exception as e:
pass
try:
fs._rm(dir_name)
self.assertFalse(True)
except Exception as e:
pass
def test_exists(self):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=15 * 1000,
sleep_inter=100)
self.assertFalse(fs.is_exist(os.path.abspath("./xxxx")))
self.assertFalse(fs.is_dir(os.path.abspath("./xxxx")))
self.assertTrue(fs.is_dir(os.path.abspath("./xxx/..")))
@ -149,27 +203,39 @@ class FSTest(unittest.TestCase):
dirs, files = fs.ls_dir(os.path.abspath("./xxx/.."))
def test_hdfs(self):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=15 * 1000,
sleep_inter=100)
self._test_rm(fs)
self._test_touch(fs)
self._test_dirs(fs)
self._test_upload(fs)
self._test_download(fs)
self._test_mkdirs(fs)
self._test_list_dir(fs)
self._test_try_upload(fs)
self._test_try_download(fs)
def test_local(self):
fs = LocalFS()
self._test_rm(fs)
self._test_touch(fs)
self._test_dirs(fs)
self._test_touch_file(fs)
self._test_mkdirs(fs)
self._test_list_dir(fs)
self._test_try_upload(fs)
self._test_try_download(fs)
def test_timeout(self):
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=6 * 1000,
sleep_inter=2000)
sleep_inter=100)
src = "hdfs_test_timeout"
dst = "new_hdfs_test_timeout"
fs.delete(dst)
@ -190,7 +256,11 @@ class FSTest(unittest.TestCase):
print("second mv ret:{} output:{}".format(ret, output))
def test_is_dir(self):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=15 * 1000,
sleep_inter=100)
self.assertFalse(fs.is_dir("./test_hdfs.py"))
s = """
java.io.IOException: Input/output error
@ -212,12 +282,38 @@ java.io.IOException: Input/output error
def test_config(self):
config = {"fs.default.name": "hdfs://xxx", "hadoop.job.ugi": "ugi"}
fs = HDFSClient("/usr/local/hadoop-2.7.7/", config, time_out=15 * 1000)
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
config,
time_out=15 * 1000,
sleep_inter=100)
def _test_list_dir(self, fs):
fs = HDFSClient("/usr/local/hadoop-2.7.7/", None, time_out=15 * 1000)
fs = HDFSClient(
"/usr/local/hadoop-2.7.7/",
None,
time_out=15 * 1000,
sleep_inter=100)
fs.ls_dir("test_not_exists")
def _test_touch(self, fs):
path = "./touch.flag"
fs.touch(path, exist_ok=True)
try:
fs.touch("./touch.flag", exist_ok=False)
self.assertFalse(True, "can't reach here")
except FSFileExistsError as e:
pass
try:
fs._touchz("./touch.flag")
self.assertFalse(True, "can't reach here")
except Exception as e:
pass
self.assertFalse(fs.is_dir(path))
fs.delete(path)
if __name__ == '__main__':
unittest.main()

@ -178,6 +178,7 @@ packages=['paddle',
'paddle.fluid.incubate',
'paddle.fluid.incubate.data_generator',
'paddle.fluid.incubate.fleet',
'paddle.fluid.incubate.checkpoint',
'paddle.fluid.incubate.fleet.base',
'paddle.fluid.incubate.fleet.parameter_server',
'paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler',
