@ -14,13 +14,11 @@
from __future__ import print_function
import paddle . dataset . flowers as flowers
import math
import paddle . fluid as fluid
import paddle . fluid . core as core
import unittest
import numpy as np
import paddle
import os
@ -38,101 +36,82 @@ def Lenet(data, class_dim):
return fc2
class TestFetch Op ( unittest . TestCase ) :
def parallel_exe ( self , train_inputs, seed , use_cuda ) :
main = fluid . Program ( )
class TestFetch AndFeed ( unittest . TestCase ) :
def parallel_exe ( self , use_cuda, run_parallel_exe , seed = 1 ) :
main _program = fluid . Program ( )
startup = fluid . Program ( )
startup . random_seed = seed
with fluid . program_guard ( main , startup ) :
with fluid . program_guard ( main _program , startup ) :
data = fluid . layers . data (
name = ' image ' , shape = [ 3 , 224 , 224 ] , dtype = ' float32 ' )
label = fluid . layers . data ( name = ' label ' , shape = [ 1 ] , dtype = ' int64 ' )
out = Lenet ( data , class_dim = 102 )
loss = fluid . layers . cross_entropy ( input = out , label = label )
loss = fluid . layers . mean ( loss )
opt = fluid . optimizer . Momentum (
learning_rate = 0.1 ,
momentum = 0.9 ,
regularization = fluid . regularizer . L2Decay ( 1e-4 ) )
opt . minimize ( loss )
# TODO(zcd): I found that onece the memory optimizer is open,
# parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD,
# conv2d_1.b_0@GRAD. Those variables should not be pruned.
# fluid.memory_optimize(main)
place = fluid . CUDAPlace ( 0 ) if use_cuda else fluid . CPUPlace ( )
exe = fluid . Executor ( place )
exe . run ( startup )
feeder = fluid . DataFeeder ( place = place , feed_list = [ data , label ] )
pe = fluid . ParallelExecutor (
use_cuda = use_cuda , loss_name = loss . name , main_program = main )
fetch_list = [ ]
all_vars = main . global_block ( ) . vars
for k , v in all_vars . items ( ) :
if ' tmp ' not in k and k [ 0 ] is not ' _ ' or v . persistable :
fetch_list . append ( k )
for data in train_inputs :
ret = pe . run ( fetch_list ,
feed = feeder . feed ( data ) ,
return_numpy = True )
for i in range ( len ( fetch_list ) ) :
assert not math . isnan ( np . sum ( ret [ i ] ) ) and \
not math . isinf ( np . sum ( ret [ i ] ) )
@unittest.skip ( reason = " CI timeout " )
def test_fetch_op ( self ) :
tst_reader = paddle . batch ( flowers . test ( use_xmap = False ) , batch_size = 16 )
tst_reader_iter = tst_reader ( )
iters = 3
train_inputs = [ ]
for i in range ( iters ) :
train_inputs . append ( next ( tst_reader_iter ) )
os . environ [ ' CPU_NUM ' ] = str ( 4 )
if core . is_compiled_with_cuda ( ) :
self . parallel_exe ( train_inputs , seed = 1 , use_cuda = True )
self . parallel_exe ( train_inputs , seed = 1 , use_cuda = False )
class TestFeedParallel ( unittest . TestCase ) :
def parallel_exe ( self , use_cuda , seed ) :
main = fluid . Program ( )
startup = fluid . Program ( )
startup . random_seed = seed
with fluid . scope_guard ( fluid . core . Scope ( ) ) :
with fluid . program_guard ( main , startup ) :
data = fluid . layers . data (
name = ' image ' , shape = [ 3 , 224 , 224 ] , dtype = ' float32 ' )
label = fluid . layers . data (
name = ' label ' , shape = [ 1 ] , dtype = ' int64 ' )
out = Lenet ( data , class_dim = 102 )
loss = fluid . layers . cross_entropy ( input = out , label = label )
loss = fluid . layers . mean ( loss )
opt = fluid . optimizer . Momentum (
learning_rate = 0.1 ,
momentum = 0.9 ,
regularization = fluid . regularizer . L2Decay ( 1e-4 ) )
opt . minimize ( loss )
place = fluid . CUDAPlace ( 0 ) if use_cuda else fluid . CPUPlace ( )
feeder = fluid . DataFeeder ( place = place , feed_list = [ data , label ] )
reader = feeder . decorate_reader (
paddle . batch (
flowers . train ( ) , batch_size = 16 ) , multi_devices = True )
exe = fluid . Executor ( place )
exe . run ( startup )
pe = fluid . ParallelExecutor (
use_cuda = use_cuda , loss_name = loss . name , main_program = main )
use_cuda = use_cuda , loss_name = loss . name , main_program = main_program )
run_parallel_exe ( main_program , pe , use_cuda , data , label , loss )
def run_parallel_exe_with_fetch ( self , main , pe , use_cuda , data , label ,
loss ) :
def get_data ( batch_size = 8 ) :
np . random . seed ( 5 )
while True :
img = np . random . random (
size = [ batch_size , 3 , 224 , 224 ] ) . astype ( np . float32 )
l = ( np . random . random ( size = [ batch_size , 1 ] ) *
10 ) . astype ( np . int64 )
yield img , l
# TODO(zcd): I found that onece the memory optimizer is open,
# parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD,
# conv2d_1.b_0@GRAD. Those variables should not be pruned.
# fluid.memory_optimize(main)
fetch_list = [ ]
all_vars = main . global_block ( ) . vars
for k , v in all_vars . items ( ) :
if ( ' tmp ' not in k ) and (
k [ 0 ] is not ' _ ' or v . persistable
) and v . type == core . VarDesc . VarType . LOD_TENSOR :
fetch_list . append ( k )
for batch_id , img_label in enumerate ( get_data ( ) ) :
img , l = img_label
train_inputs = { data . name : img , label . name : l }
ret = pe . run ( fetch_list , feed = train_inputs , return_numpy = True )
for i in range ( len ( fetch_list ) ) :
assert not math . isnan ( np . sum ( ret [ i ] ) ) and \
not math . isinf ( np . sum ( ret [ i ] ) )
if batch_id == 2 :
break
def run_parallel_exe_with_feed ( self , main , pe , use_cuda , data , label , loss ) :
def get_data ( batch_size = 8 ) :
np . random . seed ( 5 )
while True :
train_data = [ ]
for _ in range ( batch_size ) :
img = np . random . random (
size = [ 1 , 3 , 224 , 224 ] ) . astype ( np . float32 )
label = ( np . random . random ( size = [ 1 , 1 ] ) *
10 ) . astype ( np . int64 )
train_data . append ( [ img , label ] )
yield train_data
place = fluid . CUDAPlace ( 0 ) if use_cuda else fluid . CPUPlace ( )
feeder = fluid . DataFeeder ( place = place , feed_list = [ data , label ] )
reader = feeder . decorate_reader ( get_data , multi_devices = True )
for batch_id , data in enumerate ( reader ( ) ) :
loss_np = pe . run ( feed = data , fetch_list = [ loss . name ] ) [ 0 ]
@ -140,12 +119,22 @@ class TestFeedParallel(unittest.TestCase):
if batch_id == 2 :
break
@unittest.skip ( reason = " CI timeout " )
def test_feed_op ( self ) :
def test_fetch ( self ) :
os . environ [ ' CPU_NUM ' ] = str ( 4 )
if core . is_compiled_with_cuda ( ) :
self . parallel_exe (
use_cuda = True ,
run_parallel_exe = self . run_parallel_exe_with_fetch )
self . parallel_exe (
use_cuda = False , run_parallel_exe = self . run_parallel_exe_with_fetch )
def test_feed ( self ) :
os . environ [ ' CPU_NUM ' ] = str ( 4 )
if core . is_compiled_with_cuda ( ) :
self . parallel_exe ( use_cuda = True , seed = 1 )
self . parallel_exe ( use_cuda = False , seed = 1 )
self . parallel_exe (
use_cuda = True , run_parallel_exe = self . run_parallel_exe_with_feed )
self . parallel_exe (
use_cuda = False , run_parallel_exe = self . run_parallel_exe_with_feed )
if __name__ == ' __main__ ' :