You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
434 lines
18 KiB
434 lines
18 KiB
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Definition of device workers."""
|
|
|
|
from __future__ import print_function
|
|
|
|
# Public device worker classes exported by this module.
__all__ = [
    'DeviceWorker',
    'Hogwild',
    'DownpourSGD',
    'Section',
    'DownpourSGDOPT',
]
|
|
|
|
|
|
class DeviceWorker(object):
    """
    DeviceWorker is an abstract class, which generates worker desc.

    This class is an inner class that we do computation logics within
    the implementation. For example, execution of a program or a graph.
    Subclasses must override _gen_worker_desc.
    """

    def __init__(self):
        """Init."""
        self._program = None
        self._infer = None
        # Filled in by _set_fleet_desc(); declared here so the attribute
        # always exists, even before a fleet desc has been configured.
        self._fleet_desc = None

    def _set_infer(self, infer=False):
        """
        Set the inference flag for the current device worker.

        Args:
            infer(bool): whether to do inference
        """
        self._infer = infer

    def _set_fleet_desc(self, fleet_desc):
        """
        Set fleet desc.

        Args:
            fleet_desc(PSParameter): pslib.PSParameter object
        """
        self._fleet_desc = fleet_desc

    def _set_program(self, program):
        """
        Set program.

        Args:
            program(Program): a Program object
        """
        self._program = program

    def _gen_worker_desc(self, trainer_desc):
        """
        Generate the worker desc into trainer_desc.

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object

        Raises:
            NotImplementedError: always; subclasses provide the logic.
        """
        raise NotImplementedError(
            "DeviceWorker does not implement gen_worker_desc, "
            "please use Hogwild or DownpourSGD, etc.")
|
|
|
|
|
|
class Hogwild(DeviceWorker):
    """
    Hogwild is a kind of SGD algorithm.
    """

    def __init__(self):
        """Init."""
        super(Hogwild, self).__init__()

    def _gen_worker_desc(self, trainer_desc):
        """
        Generate the worker desc, whose device worker is HogwildWorker.

        When the program carries fleet options (program._fleet_opt), the
        program config, pull-dense thread config, sparse/dense table configs,
        stat var names and skip ops of the matching fleet worker are also
        written into trainer_desc.

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object

        Raises:
            ValueError: if opt_info has no "program_id_to_worker" entry, or
                that mapping has no worker for this program.
        """
        trainer_desc.device_worker_name = "HogwildWorker"
        if self._infer:
            # just ignore feed op for inference model
            trainer_desc.hogwild_param.skip_ops.extend(["feed"])

        if self._program is None:
            import sys
            print("program of current device worker is not configured")
            sys.exit(-1)
        program_id = str(id(self._program))
        opt_info = self._program._fleet_opt
        # when opt_info is None or empty dict, it should return
        if not opt_info:
            return

        from paddle.fluid.incubate.fleet.parameter_server import version

        if version.is_transpiler() and "fleet_desc" not in opt_info:
            return

        program_configs = opt_info["program_configs"]
        downpour = trainer_desc.downpour_param
        hogwild = trainer_desc.hogwild_param

        # Dense table ids this program pushes or pulls; only these dense
        # tables are copied into the pull thread and downpour configs below.
        dense_table_set = set()
        if program_id in program_configs:
            config = program_configs[program_id]
            pc = downpour.program_config.add()
            pc.program_id = program_id
            for table_id in config["push_sparse"]:
                pc.push_sparse_table_id.append(table_id)
            for table_id in config["push_dense"]:
                pc.push_dense_table_id.append(table_id)
                dense_table_set.add(table_id)
            for table_id in config["pull_sparse"]:
                pc.pull_sparse_table_id.append(table_id)
            for table_id in config["pull_dense"]:
                pc.pull_dense_table_id.append(table_id)
                dense_table_set.add(table_id)

        pull_thread = trainer_desc.pull_dense_param
        pull_thread.device_num = trainer_desc.thread_num
        prog_id_to_worker = opt_info.get("program_id_to_worker")
        if prog_id_to_worker is None:
            raise ValueError("opt_info must have program_id_to_worker")
        worker = prog_id_to_worker.get(program_id)
        if worker is None:
            raise ValueError("%s not found in program_id_to_worker" %
                             program_id)
        worker_desc = worker.get_desc()

        for table in worker_desc.dense_table:
            if table.table_id in dense_table_set:
                dense_table = pull_thread.dense_table.add()
                dense_table.dense_value_name.extend(table.dense_variable_name)
                dense_table.table_id = table.table_id

        for i, worker_table in enumerate(worker_desc.sparse_table):
            sparse_table = downpour.sparse_table.add()
            sparse_table.table_id = worker_table.table_id
            sparse_table.sparse_key_name.extend(worker_table.slot_key)
            sparse_table.sparse_value_name.extend(worker_table.slot_value)
            sparse_table.sparse_grad_name.extend(worker_table.slot_gradient)
            # NOTE(review): assumes the i-th server table param describes the
            # i-th worker sparse table — confirm against the fleet desc builder.
            table_param = (self._fleet_desc.server_param.downpour_server_param
                           .downpour_table_param[i])
            sparse_table.fea_dim = table_param.accessor.fea_dim
            # not use emb_dim
            sparse_table.emb_dim = -1
            # not use hard code click
            sparse_table.label_var_name = ""

        stat_var_names = opt_info["stat_var_names"]
        if stat_var_names:
            for var_name in stat_var_names:
                hogwild.stat_var_names.append(var_name)
                downpour.stat_var_names.append(var_name)

        for table in worker_desc.dense_table:
            if table.table_id in dense_table_set:
                dense_table = downpour.dense_table.add()
                dense_table.table_id = table.table_id
                dense_table.dense_value_name.extend(table.dense_variable_name)
                dense_table.dense_grad_name.extend(
                    table.dense_gradient_variable_name)
        hogwild.skip_ops.extend(worker_desc.skip_op)
        if self._infer:
            # inference must not push anything back to the servers
            hogwild.skip_ops.extend(
                ["push_sparse", "push_sparse_v2", "push_dense"])
|
|
|
|
|
|
class DownpourSGD(DeviceWorker):
    """
    DownpourSGD is a kind of distributed SGD algorithm.
    """

    def __init__(self):
        """
        Init.
        initialize downpourSGD device worker
        """
        super(DownpourSGD, self).__init__()

    def _gen_worker_desc(self, trainer_desc):
        """
        Generate the worker desc, whose device worker is DownpourWorker
        (or opt_info["worker_class"] when that key is present).

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object

        Raises:
            ValueError: if opt_info has no "program_id_to_worker" entry, or
                that mapping has no worker for this program.
        """
        if self._program is None:
            import sys
            print("program of current device worker is not configured")
            sys.exit(-1)
        program_id = str(id(self._program))
        opt_info = self._program._fleet_opt
        program_configs = opt_info["program_configs"]
        downpour = trainer_desc.downpour_param

        # Dense table ids this program pushes or pulls; only these dense
        # tables are copied into the pull thread and downpour configs below.
        dense_table_set = set()
        if program_id in program_configs:
            config = program_configs[program_id]
            pc = downpour.program_config.add()
            pc.program_id = program_id
            for table_id in config["push_sparse"]:
                pc.push_sparse_table_id.append(table_id)
            for table_id in config["push_dense"]:
                pc.push_dense_table_id.append(table_id)
                dense_table_set.add(table_id)
            for table_id in config["pull_sparse"]:
                pc.pull_sparse_table_id.append(table_id)
            for table_id in config["pull_dense"]:
                pc.pull_dense_table_id.append(table_id)
                dense_table_set.add(table_id)

        trainer_desc.device_worker_name = opt_info.get("worker_class",
                                                       "DownpourWorker")
        pull_thread = trainer_desc.pull_dense_param
        pull_thread.device_num = trainer_desc.thread_num
        prog_id_to_worker = opt_info.get("program_id_to_worker")
        if prog_id_to_worker is None:
            raise ValueError("opt_info must have program_id_to_worker")
        worker = prog_id_to_worker.get(program_id)
        if worker is None:
            raise ValueError("%s not found in program_id_to_worker" %
                             program_id)
        worker_desc = worker.get_desc()

        for table in worker_desc.dense_table:
            if table.table_id in dense_table_set:
                dense_table = pull_thread.dense_table.add()
                dense_table.dense_value_name.extend(table.dense_variable_name)
                dense_table.table_id = table.table_id

        for i, worker_table in enumerate(worker_desc.sparse_table):
            sparse_table = downpour.sparse_table.add()
            sparse_table.table_id = worker_table.table_id
            sparse_table.sparse_key_name.extend(worker_table.slot_key)
            sparse_table.sparse_value_name.extend(worker_table.slot_value)
            sparse_table.sparse_grad_name.extend(worker_table.slot_gradient)
            # `== True` (not `is True`) keeps accepting truthy-equal values
            # such as 1, exactly like the original check.
            use_full_dim = (opt_info["use_cvm"] or
                            opt_info.get("no_cvm") == True)
            # NOTE(review): assumes the i-th server table param describes the
            # i-th worker sparse table — confirm against the fleet desc builder.
            table_param = (self._fleet_desc.server_param.downpour_server_param
                           .downpour_table_param[i])
            accessor_fea_dim = table_param.accessor.fea_dim
            # fea_dim always equals the accessor's fea_dim; without cvm the
            # embedding is 2 dims shorter (presumably the show/click slots —
            # confirm against the accessor definition).
            if use_full_dim:
                sparse_table.emb_dim = accessor_fea_dim
            else:
                sparse_table.emb_dim = accessor_fea_dim - 2
            sparse_table.fea_dim = accessor_fea_dim
            # TODO(guru4elephant): hard code here, need to improve
            sparse_table.label_var_name = "click"

        stat_var_names = opt_info["stat_var_names"]
        if stat_var_names:
            downpour.stat_var_names.extend(stat_var_names)

        for table in worker_desc.dense_table:
            if table.table_id in dense_table_set:
                dense_table = downpour.dense_table.add()
                dense_table.table_id = table.table_id
                dense_table.dense_value_name.extend(table.dense_variable_name)
                dense_table.dense_grad_name.extend(
                    table.dense_gradient_variable_name)
        downpour.skip_ops.extend(worker_desc.skip_op)
        if self._infer:
            # inference must not push anything back to the servers
            downpour.push_dense = False
            downpour.push_sparse = False
|
|
|
|
|
|
class DownpourSGDOPT(DeviceWorker):
    """
    DownpourSGDOPT is a kind of distributed SGD algorithm.
    """

    def __init__(self):
        """
        Init.
        initialize downpourSGDOPT device worker
        """
        super(DownpourSGDOPT, self).__init__()

    def _gen_worker_desc(self, trainer_desc):
        """
        Generate the worker desc, whose device worker is DownpourWorkerOpt.

        Besides the regular downpour configuration, sparse tables listed in
        opt_info["local_tables"] / opt_info["async_tables"] are flagged as
        local / async.

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object

        Raises:
            ValueError: if opt_info has no "program_id_to_worker" entry, or
                that mapping has no worker for this program.
        """
        if self._program is None:
            import sys
            print("program of current device worker is not configured")
            sys.exit(-1)
        program_id = str(id(self._program))
        opt_info = self._program._fleet_opt
        program_configs = opt_info["program_configs"]
        downpour = trainer_desc.downpour_param

        # Dense table ids this program pushes or pulls; only these dense
        # tables are copied into the pull thread and downpour configs below.
        dense_table_set = set()
        if program_id in program_configs:
            config = program_configs[program_id]
            pc = downpour.program_config.add()
            pc.program_id = program_id
            for table_id in config["push_sparse"]:
                pc.push_sparse_table_id.append(table_id)
            for table_id in config["push_dense"]:
                pc.push_dense_table_id.append(table_id)
                dense_table_set.add(table_id)
            for table_id in config["pull_sparse"]:
                pc.pull_sparse_table_id.append(table_id)
            for table_id in config["pull_dense"]:
                pc.pull_dense_table_id.append(table_id)
                dense_table_set.add(table_id)

        trainer_desc.device_worker_name = "DownpourWorkerOpt"
        pull_thread = trainer_desc.pull_dense_param
        pull_thread.device_num = trainer_desc.thread_num
        prog_id_to_worker = opt_info.get("program_id_to_worker")
        if prog_id_to_worker is None:
            raise ValueError("opt_info must have program_id_to_worker")
        worker = prog_id_to_worker.get(program_id)
        if worker is None:
            raise ValueError("%s not found in program_id_to_worker" %
                             program_id)
        worker_desc = worker.get_desc()

        for table in worker_desc.dense_table:
            if table.table_id in dense_table_set:
                dense_table = pull_thread.dense_table.add()
                dense_table.dense_value_name.extend(table.dense_variable_name)
                dense_table.table_id = table.table_id

        for i, worker_table in enumerate(worker_desc.sparse_table):
            sparse_table = downpour.sparse_table.add()
            sparse_table.table_id = worker_table.table_id
            sparse_table.sparse_key_name.extend(worker_table.slot_key)
            sparse_table.sparse_value_name.extend(worker_table.slot_value)
            sparse_table.sparse_grad_name.extend(worker_table.slot_gradient)
            # `== True` (not `is True`) keeps accepting truthy-equal values
            # such as 1, exactly like the original check.
            use_full_dim = (opt_info["use_cvm"] or
                            opt_info.get("no_cvm") == True)
            # NOTE(review): assumes the i-th server table param describes the
            # i-th worker sparse table — confirm against the fleet desc builder.
            table_param = (self._fleet_desc.server_param.downpour_server_param
                           .downpour_table_param[i])
            accessor_fea_dim = table_param.accessor.fea_dim
            # fea_dim always equals the accessor's fea_dim; without cvm the
            # embedding is 2 dims shorter (presumably the show/click slots —
            # confirm against the accessor definition).
            if use_full_dim:
                sparse_table.emb_dim = accessor_fea_dim
            else:
                sparse_table.emb_dim = accessor_fea_dim - 2
            sparse_table.fea_dim = accessor_fea_dim
            # TODO(guru4elephant): hard code here, need to improve
            sparse_table.label_var_name = "click"
            if sparse_table.table_id in opt_info.get("local_tables", ()):
                sparse_table.is_local = True
            if sparse_table.table_id in opt_info.get("async_tables", ()):
                sparse_table.is_async = True

        stat_var_names = opt_info["stat_var_names"]
        if stat_var_names:
            downpour.stat_var_names.extend(stat_var_names)

        for table in worker_desc.dense_table:
            if table.table_id in dense_table_set:
                dense_table = downpour.dense_table.add()
                dense_table.table_id = table.table_id
                dense_table.dense_value_name.extend(table.dense_variable_name)
                dense_table.dense_grad_name.extend(
                    table.dense_gradient_variable_name)
        downpour.skip_ops.extend(worker_desc.skip_op)
        if self._infer:
            # inference must not push anything back to the servers
            downpour.push_dense = False
            downpour.push_sparse = False
|
|
|
|
|
|
class Section(DeviceWorker):
    """SectionWorker."""

    def __init__(self):
        """Init."""
        super(Section, self).__init__()

    def _gen_worker_desc(self, trainer_desc):
        """
        Generate the worker desc, whose device worker is SectionWorker.

        Each program in pipeline_opt["section_program_list"] becomes one
        section config, paired with the place and place id found at the same
        index of pipeline_opt["place_list"] and pipeline_opt["place_id_list"].

        Args:
            trainer_desc(TrainerDesc): a TrainerDesc object

        Raises:
            NotImplementedError: if a section's place is not a CPUPlace,
                CUDAPlace or CUDAPinnedPlace.
        """
        from . import core
        trainer_desc.device_worker_name = "SectionWorker"
        pipeline_opt = self._program._pipeline_opt
        section_param = trainer_desc.section_param
        section_param.num_microbatches = pipeline_opt["num_microbatches"]
        section_param.start_cpu_core_id = pipeline_opt["start_cpu_core_id"]
        for i, program in enumerate(pipeline_opt["section_program_list"]):
            cfg = section_param.section_config.add()
            # The program desc is copied via a serialize/parse round trip.
            # TODO: copying it directly with cfg.program_desc.CopyFrom(...)
            # did not work; investigate why.
            cfg.program_desc.ParseFromString(
                program["program"]._get_desc().serialize_to_string())
            place = pipeline_opt["place_list"][i]
            place_id = pipeline_opt["place_id_list"][i]
            if isinstance(place, core.CPUPlace):
                cfg.place = cfg.CPUPlace
            elif isinstance(place, core.CUDAPlace):
                cfg.place = cfg.CUDAPlace
            elif isinstance(place, core.CUDAPinnedPlace):
                cfg.place = cfg.CUDAPinnedPlace
            else:
                raise NotImplementedError(
                    "SectionWorker only supports CPUPlace, CUDAPlace and CUDAPinnedPlace now."
                )
            cfg.place_id = place_id
|
|
|
|
|
|
class DeviceWorkerFactory(object):
    """Create device worker instances from a worker type name."""

    def _create_device_worker(self, worker_type):
        """
        Create a device worker from its type name.

        The name is first resolved as ``worker_type.capitalize()``, so
        "hogwild" and "section" resolve exactly as before. capitalize()
        mangles names such as "DownpourSGD" into "Downpoursgd". A
        case-insensitive match against the exported worker class names is
        therefore tried next, which makes DownpourSGD and DownpourSGDOPT
        reachable.

        Args:
            worker_type(str): device worker name, e.g. "Hogwild",
                "DownpourSGD", "DownpourSGDOPT" or "Section".

        Returns:
            DeviceWorker: a new instance of the matching worker class.

        Raises:
            KeyError: if no device worker class matches worker_type.
        """
        module_globals = globals()
        classname = worker_type.capitalize()
        if classname in module_globals:
            return module_globals[classname]()
        wanted = worker_type.lower()
        for name in __all__:
            # the abstract base class is not a usable worker
            if name != "DeviceWorker" and name.lower() == wanted:
                return module_globals[name]()
        raise KeyError(classname)
|