Modify the pipeline optimizer to support only synchronous pipeline training (#25065)

* modify pipeline optimizer, test=develop
fix_copy_if_different
lilong12 5 years ago committed by GitHub
parent b555378e18
commit 3d96601b82
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -0,0 +1,3 @@
paddle.fluid.optimizer.PipelineOptimizer (paddle.fluid.optimizer.PipelineOptimizer, ('document', '2e55a29dbeb874934f7a1a1af3a22b8c'))
paddle.fluid.optimizer.PipelineOptimizer.__init__ (ArgSpec(args=['self', 'optimizer', 'num_microbatches', 'start_cpu_core_id'], varargs=None, keywords=None, defaults=(1, 0)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.optimizer.PipelineOptimizer.minimize (ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))

File diff suppressed because it is too large Load Diff

@ -49,7 +49,6 @@ endif()
LIST(REMOVE_ITEM TEST_OPS test_conv2d_transpose_op)
if(WIN32)
LIST(REMOVE_ITEM TEST_OPS test_boxps)
LIST(REMOVE_ITEM TEST_OPS test_paddlebox_datafeed)
LIST(REMOVE_ITEM TEST_OPS test_trainer_desc)
LIST(REMOVE_ITEM TEST_OPS test_multiprocess_reader_exception)
LIST(REMOVE_ITEM TEST_OPS test_avoid_twice_initialization)
@ -89,7 +88,6 @@ endif()
# When building without GPU support, or on Windows, drop the tests listed
# below from TEST_OPS.
if(NOT WITH_GPU OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_pipeline)
LIST(REMOVE_ITEM TEST_OPS test_boxps)
LIST(REMOVE_ITEM TEST_OPS test_paddlebox_datafeed)
endif()
list(REMOVE_ITEM TEST_OPS test_seq_concat_op) # FIXME(helin): https://github.com/PaddlePaddle/Paddle/issues/8290
list(REMOVE_ITEM TEST_OPS test_lstm_unit_op) # FIXME(qijun): https://github.com/PaddlePaddle/Paddle/issues/5185

@ -87,117 +87,5 @@ class TestRunCmd(unittest.TestCase):
self.assertTrue(ret2 == 0)
# Trains a small program wrapped by PipelineOptimizer on two "BoxPSDataset"
# instances, preloading the second dataset while the first one is being
# trained on, with field/parameter dumping configured through
# program._pipeline_opt.
# NOTE(review): leading indentation is missing in this view of the file, so
# the extent of nested blocks (notably the program_guard `with` block) cannot
# be confirmed from here -- verify against the original source.
class TestBoxPSPreload(unittest.TestCase):
""" TestCases for BoxPS Preload """
# CPU variant: is_cpu=True, with and without random_with_lineid.
def test_boxps_cpu(self):
self.run_boxps_preload(True, True)
self.run_boxps_preload(True, False)
# GPU variant: is_cpu=False, with and without random_with_lineid.
def test_boxps_gpu(self):
self.run_boxps_preload(False, True)
self.run_boxps_preload(False, False)
# Shared scenario.
#   is_cpu: when True (or when Paddle is not compiled with CUDA) the executor
#     runs on CPUPlace, otherwise on CUDAPlace(0); it also selects the
#     "cpu"/"gpu" part of the generated input file names.
#   random_with_lineid: stored as-is in program._pipeline_opt.
def run_boxps_preload(self, is_cpu=True, random_with_lineid=False):
program = fluid.Program()
with fluid.program_guard(program):
x = fluid.layers.data(
name='x', shape=[1], dtype='int64', lod_level=0)
y = fluid.layers.data(
name='y', shape=[1], dtype='int64', lod_level=0)
# `z` is only referenced by name in the dump_fields list further down.
z = layers.data(name='z', shape=[1], dtype='int64')
emb_x, emb_y = _pull_box_sparse([x, y], size=2)
# NOTE(review): emb_xp is not used afterwards in this method.
emb_xp = _pull_box_sparse(x, size=2)
concat = layers.concat([emb_x, emb_y], axis=1)
fc = layers.fc(input=concat,
name="fc",
size=1,
num_flatten_dims=1,
bias_attr=False)
loss = layers.reduce_mean(fc)
# Fall back to CPU when CUDA is unavailable, even if is_cpu is False.
place = fluid.CPUPlace(
) if is_cpu or not core.is_compiled_with_cuda(
) else fluid.CUDAPlace(0)
exe = fluid.Executor(place)
batch_size = 100
# Writes one slot as text: its length followed by each element, every token
# followed by a single space.
def binary_print(slot, fout):
fout.write(str(len(slot)) + " ")
for e in slot:
fout.write(str(e) + " ")
# All-ones int64 input of shape (batch_size, 2, 1).
batch1 = np.ones(
(batch_size, 2, 1)).astype("int64").reshape(batch_size, 2, 1)
filelist = []
place_str = "cpu" if is_cpu else "gpu"
for i in range(2):
filelist.append("test_hdfs_" + place_str + "_" + str(i))
# Write the same content to every input file: the slots of one instance go
# on one line.
for f in filelist:
with open(f, "w") as fout:
for ins in batch1:
for slot in ins:
binary_print(slot, fout)
fout.write("\n")
# Builds a BoxPSDataset over `filelist` that feeds variables x and y.
def create_dataset():
dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
dataset.set_date("20190930")
dataset.set_use_var([x, y])
dataset.set_batch_size(2)
dataset.set_thread(1)
dataset.set_filelist(filelist)
return dataset
# Two identically configured datasets: the second one is preloaded while the
# first one is used for training.
datasets = []
datasets.append(create_dataset())
datasets.append(create_dataset())
optimizer = fluid.optimizer.SGD(learning_rate=0.5)
optimizer = fluid.optimizer.PipelineOptimizer(
optimizer,
cut_list=[],
place_list=[place],
concurrency_list=[1],
queue_size=1,
sync_steps=-1)
optimizer.minimize(loss)
program._pipeline_opt["dump_fields"] = [
"fc.tmp_0", "fc.tmp_0@GRAD", "fake_var", "z",
"reduce_mean_3.tmp_0"
]
# fake_var: not in scope
# z: in scope, but not initialized
# reduce_mean_0.tmp_0: dimension is not right
# NOTE(review): the list above names "reduce_mean_3.tmp_0" while this note
# says "reduce_mean_0.tmp_0" -- confirm which one is intended.
program._pipeline_opt["dump_fields_path"] = "./dump_log/"
program._pipeline_opt["dump_param"] = ["fc.w_0"]
program._pipeline_opt["enable_random_dump"] = True
program._pipeline_opt["dump_interval"] = 10
program._pipeline_opt["random_with_lineid"] = random_with_lineid
exe.run(fluid.default_startup_program())
# First pass: train on datasets[0] while datasets[1] is being preloaded.
datasets[0].load_into_memory()
datasets[0].begin_pass()
datasets[0].slots_shuffle([])
datasets[1].preload_into_memory()
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=datasets[0],
print_period=1)
datasets[0].end_pass(True)
# Second pass: wait for the preload to finish, then train on datasets[1]
# with debug=True.
datasets[1].wait_preload_done()
datasets[1].begin_pass()
exe.train_from_dataset(
program=fluid.default_main_program(),
dataset=datasets[1],
print_period=1,
debug=True)
datasets[1].end_pass(False)
# Remove the generated input files and the dump directory, if it exists.
for f in filelist:
os.remove(f)
if os.path.isdir("dump_log"):
shutil.rmtree("dump_log")
# Run this module's test cases through unittest.main() when executed as a
# script.
if __name__ == '__main__':
unittest.main()

@ -437,129 +437,6 @@ class TestDataNormOpWithSlotDim(OpTest):
self.check_grad(['X'], 'Y', no_grad_set=set([]))
class TestDataNormOpWithSyncStats(unittest.TestCase):
    """
    Test class for the data norm op with ``sync_stats=True``.

    Builds an embedding -> data_norm -> mean network, wraps its SGD optimizer
    in a PipelineOptimizer, trains once from generated input files and then
    checks the summary values stored by the data_norm layer.
    """

    def test_sync_stats(self):
        """Train once and verify the data_norm summary statistics.

        Returns early, without asserting anything, when Paddle is not
        compiled with CUDA or when running on Windows. The generated input
        files are removed even when the test fails part-way through.
        """
        if not core.is_compiled_with_cuda():
            return
        if os.name == 'nt':
            print(
                'Skip TestDataNormOpWithSyncStats because nccl is not supported on windows'
            )
            return

        x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
        emb = layers.embedding(
            input=x,
            param_attr=fluid.ParamAttr(name="embx"),
            size=[10, 2],
            is_sparse=False)
        dn = layers.data_norm(
            input=emb,
            name="hehe",
            epsilon=1e-4,
            param_attr={
                "batch_size": 1e4,
                "batch_sum": 1e5,
                "batch_square": 1e4
            },
            summary_decay_rate=1,
            sync_stats=True)  # [-1,3]
        loss = layers.mean(dn)

        optimizer = fluid.optimizer.SGD(learning_rate=0.5)
        optimizer = fluid.optimizer.PipelineOptimizer(
            optimizer,
            cut_list=[[emb], [loss]],
            place_list=[
                fluid.CUDAPlace(0), fluid.CUDAPlace(0), fluid.CPUPlace()
            ],
            concurrency_list=[1, 1, 1],
            queue_size=1,
            sync_steps=10000000, )

        # Hand the optimizer every parameter except the data_norm summaries,
        # which are recognised by these substrings of the parameter name.
        all_p = fluid.default_main_program().global_block().all_parameters()
        datanorm_markers = ("batch_size", "batch_sq", "batch_sum")
        parameter_without_datanorm = [
            p.name for p in all_p
            if not any(marker in p.name for marker in datanorm_markers)
        ]
        optimizer.minimize(loss, parameter_list=parameter_without_datanorm)

        place = fluid.CUDAPlace(0)
        exe = fluid.Executor(place)

        # Prepare the input data.
        batch_size = 1

        def binary_print(slot, fout):
            """Write one slot in binary form to ``fout``.

            Layout: int16 ``len(slot) + 1``, int64 ``batch_size``, then the
            raw slot values.
            """
            np.int16(len(slot) + 1).tofile(fout)
            np.int64(batch_size).tofile(fout)
            slot.tofile(fout)

        batch1 = np.ones(
            (batch_size, 1)).astype("int64").reshape(batch_size, 1, 1)
        data = [batch1]

        filelist = ["test_pipeline_input_" + str(i) for i in range(2)]
        try:
            for f in filelist:
                with open(f, "wb") as fout:
                    for batch_data in data:
                        for ins in batch_data:
                            for slot in ins:
                                binary_print(slot, fout)

            dataset = fluid.DatasetFactory().create_dataset(
                "FileInstantDataset")
            dataset.set_use_var([x])
            dataset.set_batch_size(batch_size)
            dataset.set_filelist(filelist)

            # Append the communicator-init op to the startup program.
            block = fluid.default_startup_program().global_block()
            block.append_op(
                type='c_comm_init_all',
                attrs={'ring_id': 0,
                       'devices': [0, 1]})

            # Textual dumps of both programs; left on disk as before.
            with open("main_program", "w") as fout:
                fout.write(str(fluid.default_main_program()))
            with open("startup_program", "w") as fout:
                fout.write(str(fluid.default_startup_program()))

            exe.run(fluid.default_startup_program())

            # Overwrite the embedding table with ones so the expected
            # summary values below are deterministic.
            emb_t = fluid.global_scope().find_var("embx").get_tensor()
            para = np.ones((10, 2)).astype("float32")
            emb_t.set(para, place)

            exe.train_from_dataset(
                fluid.default_main_program(),
                dataset,
                thread=2,
                debug=False,
                fetch_list=[],
                fetch_info=[],
                print_period=1)

            summary_batch_size = np.array(fluid.global_scope().find_var(
                "hehe.batch_size").get_tensor())
            self.assertEqual(summary_batch_size[0], 10002)
            summary_batch_sum = np.array(fluid.global_scope().find_var(
                "hehe.batch_sum").get_tensor())
            self.assertEqual(summary_batch_sum[0], 100002)
            summary_batch_square_sum = np.array(fluid.global_scope().find_var(
                "hehe.batch_square_sum").get_tensor())
            self.assertEqual(summary_batch_square_sum[0], 10162)
        finally:
            # Clean up the generated input files even if an assertion or the
            # training run failed; a file may be missing if creation failed.
            for f in filelist:
                if os.path.exists(f):
                    os.remove(f)
class TestDataNormOpErrorr(unittest.TestCase):
def test_errors(self):
with program_guard(Program(), Program()):

@ -1,146 +0,0 @@
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import paddle.fluid as fluid
import paddle.fluid.core as core
import os
import unittest
import paddle.fluid.layers as layers
class TestDataFeed(unittest.TestCase):
    """ TestBaseCase(Merge PV)

    Trains a small pipeline-optimized network from a "BoxPSDataset" that uses
    the "PaddleBoxDataFeed" feed type. Subclasses override ``setUp`` to vary
    the PV-merge configuration.
    """

    def setUp(self):
        """Configure the base case: PV merge enabled, merged by sid."""
        self.batch_size = 10
        self.pv_batch_size = 10
        self.enable_pv_merge = True
        self.merge_by_sid = True

    def set_data_config(self):
        """Create ``self.dataset`` and apply the settings chosen in setUp."""
        self.dataset = fluid.DatasetFactory().create_dataset("BoxPSDataset")
        self.dataset.set_feed_type("PaddleBoxDataFeed")
        self.dataset.set_parse_logkey(True)
        self.dataset.set_thread(1)
        self.dataset.set_enable_pv_merge(self.enable_pv_merge)
        self.dataset.set_batch_size(self.batch_size)
        # The merge-specific options are only applied when PV merge is on.
        if self.enable_pv_merge:
            self.dataset.set_merge_by_sid(self.merge_by_sid)
            self.dataset.set_rank_offset("rank_offset")
            self.dataset.set_pv_batch_size(self.pv_batch_size)

    # The two test methods used to share the name ``test_pboxdatafeed``, so
    # the second definition replaced the first and only the CPU case ran.
    # The names now mirror the *_cpu / *_gpu convention.
    def test_pboxdatafeed_gpu(self):
        """Run the scenario with is_cpu=False."""
        self.run_dataset(False)

    def test_pboxdatafeed_cpu(self):
        """Run the scenario with is_cpu=True."""
        self.run_dataset(True)

    def run_dataset(self, is_cpu):
        """Build the network, write the input files and train on them.

        Args:
            is_cpu: when True, or when Paddle is not compiled with CUDA, the
                executor runs on CPUPlace; otherwise on CUDAPlace(0).

        The two generated data files are removed afterwards, including when
        the run fails part-way through.
        """
        x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
        y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
        # Created for its name: set_data_config passes "rank_offset" to the
        # dataset when PV merge is enabled.
        rank_offset = fluid.layers.data(
            name="rank_offset",
            shape=[-1, 7],
            dtype="int32",
            lod_level=0,
            append_batch_size=False)

        emb_x, emb_y = fluid.contrib.layers._pull_box_extended_sparse(
            [x, y], size=2, extend_size=128)
        concat = layers.concat(
            [emb_x[0], emb_x[1], emb_y[0], emb_y[1]], axis=1)
        fc = layers.fc(input=concat,
                       name="fc",
                       size=1,
                       num_flatten_dims=1,
                       bias_attr=False)
        loss = layers.reduce_mean(fc)

        place = fluid.CPUPlace() if is_cpu or not core.is_compiled_with_cuda(
        ) else fluid.CUDAPlace(0)
        exe = fluid.Executor(place)

        data_files = ["test_run_with_dump_a.txt", "test_run_with_dump_b.txt"]
        try:
            with open(data_files[0], "w") as f:
                data = "1 1702f830eee19501ad7429505f714c1d 1 1 1 9\n"
                data += "1 1702f830eee19502ad7429505f714c1d 1 2 1 8\n"
                data += "1 1702f830eee19503ad7429505f714c1d 1 3 1 7\n"
                data += "1 1702f830eee0de01ad7429505f714c2d 1 4 1 6\n"
                data += "1 1702f830eee0df01ad7429505f714c3d 1 5 1 5\n"
                data += "1 1702f830eee0df02ad7429505f714c3d 1 6 1 4\n"
                f.write(data)
            with open(data_files[1], "w") as f:
                data = "1 1702f830fff22201ad7429505f715c1d 1 1 1 1\n"
                data += "1 1702f830fff22202ad7429505f715c1d 1 2 1 2\n"
                data += "1 1702f830fff22203ad7429505f715c1d 1 3 1 3\n"
                data += "1 1702f830fff22101ad7429505f714ccd 1 4 1 4\n"
                data += "1 1702f830fff22102ad7429505f714ccd 1 5 1 5\n"
                data += "1 1702f830fff22103ad7429505f714ccd 1 6 1 6\n"
                data += "1 1702f830fff22104ad7429505f714ccd 1 6 1 7\n"
                f.write(data)

            self.set_data_config()
            self.dataset.set_use_var([x, y])
            self.dataset.set_filelist(data_files)

            optimizer = fluid.optimizer.SGD(learning_rate=0.5)
            optimizer = fluid.optimizer.PipelineOptimizer(
                optimizer,
                cut_list=[],
                place_list=[place],
                concurrency_list=[1],
                queue_size=1,
                sync_steps=-1)
            optimizer.minimize(loss)
            exe.run(fluid.default_startup_program())

            # First training run, with the dataset's current phase set to 1.
            self.dataset.set_current_phase(1)
            self.dataset.load_into_memory()
            self.dataset.preprocess_instance()
            self.dataset.begin_pass()
            # The call is kept so the accessor is still exercised; its
            # result is not needed.
            self.dataset.get_pv_data_size()
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=self.dataset,
                print_period=1)

            # Second training run, with the current phase set to 0.
            self.dataset.set_current_phase(0)
            self.dataset.postprocess_instance()
            exe.train_from_dataset(
                program=fluid.default_main_program(),
                dataset=self.dataset,
                print_period=1)
            self.dataset.end_pass(True)
        finally:
            # Remove the generated files even if the run failed; a file may
            # be missing if writing it never happened.
            for path in data_files:
                if os.path.exists(path):
                    os.remove(path)
class TestDataFeed2(TestDataFeed):
    """Variant of the base case: PV merge enabled, but not merged by sid."""

    def setUp(self):
        """Same batch sizes as the base case, with merge_by_sid switched off."""
        self.batch_size = self.pv_batch_size = 10
        self.enable_pv_merge, self.merge_by_sid = True, False
class TestDataFeed3(TestDataFeed):
    """Variant of the base case with PV merge disabled."""

    def setUp(self):
        """Same batch sizes as the base case; merge_by_sid is left unset."""
        self.batch_size = self.pv_batch_size = 10
        self.enable_pv_merge = False
# Run this module's test cases through unittest.main() when executed as a
# script.
if __name__ == '__main__':
unittest.main()

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save