|
|
|
@ -17,7 +17,6 @@ import os
|
|
|
|
|
import six
|
|
|
|
|
import sys
|
|
|
|
|
from .. import compat as cpt
|
|
|
|
|
from . import framework
|
|
|
|
|
|
|
|
|
|
from . import core
|
|
|
|
|
from . import framework
|
|
|
|
@ -36,6 +35,30 @@ def _place_obj(place):
|
|
|
|
|
return p
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _is_pserver_mode(main_program):
|
|
|
|
|
main = main_program if main_program \
|
|
|
|
|
else default_main_program()
|
|
|
|
|
for op in main.global_block().ops:
|
|
|
|
|
if op.type in ["send", "recv"]:
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_available_places(use_cuda):
|
|
|
|
|
if use_cuda:
|
|
|
|
|
gpus_env = os.getenv("FLAGS_selected_gpus")
|
|
|
|
|
if gpus_env:
|
|
|
|
|
gpus = [int(s) for s in gpus_env.split(",")]
|
|
|
|
|
else:
|
|
|
|
|
gpus = [i for i in six.moves.range(core.get_cuda_device_count())]
|
|
|
|
|
places = [core.CUDAPlace(i) for i in gpus]
|
|
|
|
|
else:
|
|
|
|
|
cpu_num = int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
|
|
|
|
|
places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
|
|
|
|
|
assert places, "no place for execution"
|
|
|
|
|
return places
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CompiledProgram(object):
|
|
|
|
|
"""
|
|
|
|
|
Compiles to Graph for execution.
|
|
|
|
@ -127,8 +150,7 @@ class CompiledProgram(object):
|
|
|
|
|
self._exec_strategy = ExecutionStrategy()
|
|
|
|
|
if self._build_strategy is None:
|
|
|
|
|
self._build_strategy = BuildStrategy()
|
|
|
|
|
self._build_strategy.is_distribution = framework.is_pserver_mode(
|
|
|
|
|
self._program)
|
|
|
|
|
self._build_strategy.is_distribution = _is_pserver_mode(self._program)
|
|
|
|
|
return self
|
|
|
|
|
|
|
|
|
|
def with_inference_optimize(self, config):
|
|
|
|
@ -153,9 +175,9 @@ class CompiledProgram(object):
|
|
|
|
|
def _with_distributed(self):
|
|
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
|
|
|
|
def _compile_data_parallel(self):
|
|
|
|
|
def _compile_data_parallel(self, use_cuda=False, scope=None):
|
|
|
|
|
if self._share_vars_from:
|
|
|
|
|
if self._scope:
|
|
|
|
|
if scope:
|
|
|
|
|
sys.stderr.write("share_vars_from is set, scope is ignored.\n")
|
|
|
|
|
if not self._share_vars_from._is_data_parallel:
|
|
|
|
|
raise ValueError("share_vars_from is not data parallel. Cannot "
|
|
|
|
@ -166,23 +188,11 @@ class CompiledProgram(object):
|
|
|
|
|
"var to share.")
|
|
|
|
|
self._local_scopes = self._share_vars_from._executor.local_scopes()
|
|
|
|
|
else:
|
|
|
|
|
assert scope is not None, ""
|
|
|
|
|
self._local_scopes = []
|
|
|
|
|
|
|
|
|
|
self._exec_strategy.use_cuda = isinstance(self._place, core.CUDAPlace)
|
|
|
|
|
if self._exec_strategy.use_cuda:
|
|
|
|
|
gpus_env = os.getenv("FLAGS_selected_gpus")
|
|
|
|
|
if gpus_env:
|
|
|
|
|
gpus = [int(s) for s in gpus_env.split(",")]
|
|
|
|
|
else:
|
|
|
|
|
gpus = [
|
|
|
|
|
i for i in six.moves.range(core.get_cuda_device_count())
|
|
|
|
|
]
|
|
|
|
|
self._places = [core.CUDAPlace(i) for i in gpus]
|
|
|
|
|
else:
|
|
|
|
|
cpu_num = int(
|
|
|
|
|
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
|
|
|
|
|
self._places = [core.CPUPlace() for _ in six.moves.range(cpu_num)]
|
|
|
|
|
assert self._places, "no place for execution"
|
|
|
|
|
self._exec_strategy.use_cuda = use_cuda
|
|
|
|
|
self._places = get_available_places(self._exec_strategy.use_cuda)
|
|
|
|
|
|
|
|
|
|
if self._exec_strategy.num_threads == 0:
|
|
|
|
|
if self._exec_strategy.use_cuda:
|
|
|
|
@ -197,9 +207,11 @@ class CompiledProgram(object):
|
|
|
|
|
# FIXME(dzhwinter): enable_inplace should be after memory_optimize
|
|
|
|
|
# if turn on python memory optimize, turn off the inplace_pass.
|
|
|
|
|
if self._build_strategy.memory_optimize is None:
|
|
|
|
|
self._build_strategy.memory_optimize = False if self._program and self._program._is_mem_optimized else True
|
|
|
|
|
self._build_strategy.memory_optimize = False \
|
|
|
|
|
if self._program and self._program._is_mem_optimized else True
|
|
|
|
|
if self._build_strategy.enable_inplace is None:
|
|
|
|
|
self._build_strategy.enable_inplace = False if self._program and self._program._is_mem_optimized else True
|
|
|
|
|
self._build_strategy.enable_inplace = False \
|
|
|
|
|
if self._program and self._program._is_mem_optimized else True
|
|
|
|
|
|
|
|
|
|
# TODO(wuyi): trainer endpoings should be passed in through
|
|
|
|
|
# build_strategy, not program.xxx.
|
|
|
|
@ -221,12 +233,12 @@ class CompiledProgram(object):
|
|
|
|
|
|
|
|
|
|
places = list(map(_place_obj, self._places))
|
|
|
|
|
|
|
|
|
|
return core.ParallelExecutor(
|
|
|
|
|
places,
|
|
|
|
|
set(self._persistable_vars),
|
|
|
|
|
cpt.to_text(self._loss_name)
|
|
|
|
|
if self._loss_name else six.u(''), self._scope, self._local_scopes,
|
|
|
|
|
self._exec_strategy, self._build_strategy, self._graph)
|
|
|
|
|
return core.ParallelExecutor(places,
|
|
|
|
|
set(self._persistable_vars),
|
|
|
|
|
cpt.to_text(self._loss_name)
|
|
|
|
|
if self._loss_name else six.u(''), scope,
|
|
|
|
|
self._local_scopes, self._exec_strategy,
|
|
|
|
|
self._build_strategy, self._graph)
|
|
|
|
|
|
|
|
|
|
def _compile_inference(self):
|
|
|
|
|
return core.create_paddle_predictor(self._infer_config)
|
|
|
|
@ -253,7 +265,9 @@ class CompiledProgram(object):
|
|
|
|
|
self._scope = scope
|
|
|
|
|
self._place = place
|
|
|
|
|
if self._is_data_parallel:
|
|
|
|
|
self._executor = self._compile_data_parallel()
|
|
|
|
|
self._executor = self._compile_data_parallel(
|
|
|
|
|
use_cuda=isinstance(self._place, core.CUDAPlace),
|
|
|
|
|
scope=self._scope)
|
|
|
|
|
elif self._is_inference:
|
|
|
|
|
self._executor = self._compile_inference()
|
|
|
|
|
else:
|
|
|
|
|