Paddle/python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py

# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Definition of Server and Worker."""
from functools import reduce  # reduce is not a builtin in Python 3; needed by add_dense_table
from . import ps_pb2 as pslib


class Server(object):
"""
    A basic Server class.
    It is a base class and does not have an implementation.
"""
def __init__(self):
pass


class Worker(object):
"""
    A basic Worker class.
    It is a base class and does not have an implementation.
"""
def __init__(self):
pass


class DownpourServer(Server):
"""
DownpourServer class is used to generate server program_desc
    Attributes:
        _server: the wrapped pslib.ServerParameter() message
Examples:
server = DownpourServer()
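        # illustrative follow-up calls (the table id and strategy value are
        # example settings, not required defaults):
        server.add_sparse_table(0, {"sparse_learning_rate": 0.05})
        desc = server.get_desc()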
"""
def __init__(self):
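        # stock service configuration: brpc-based server/client/service classes
        # plus the default start port and server thread count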
self._server = pslib.ServerParameter()
self._server.downpour_server_param.service_param.server_class = "DownpourBrpcPsServer"
self._server.downpour_server_param.service_param.client_class = "DownpourBrpcPsClient"
self._server.downpour_server_param.service_param.service_class = "DownpourPsService"
self._server.downpour_server_param.service_param.start_server_port = 0
self._server.downpour_server_param.service_param.server_thread_num = 12
def add_sparse_table(self, table_id, strategy):
"""
Args:
table_id(int): id of sparse params table
strategy(dict): the config dict.
Returns:
return None
"""
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_SPARSE_TABLE:
return
else:
raise ValueError("expect table %s type=%s, but actual type=%s" \
%(table_id, pslib.PS_SPARSE_TABLE, table.type))
if strategy is None:
strategy = dict()
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.type = pslib.PS_SPARSE_TABLE
support_sparse_key_list = ['sparse_table_class', 'sparse_compress_in_save', 'sparse_shard_num', \
'sparse_accessor_class', 'sparse_learning_rate', 'sparse_initial_g2sum', 'sparse_initial_range', \
'sparse_weight_bounds', 'sparse_embedx_dim', 'sparse_embedx_threshold', 'sparse_nonclk_coeff', \
'sparse_click_coeff', 'sparse_base_threshold', 'sparse_delta_threshold', 'sparse_delta_keep_days', \
'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
'sparse_cache_file_num', 'sparse_beta1_decay_rate', 'sparse_beta2_decay_rate', \
'sparse_ada_epsilon', 'sparse_optimizer', 'sparse_ssd_unseenday_threshold']
for key in strategy:
if key not in support_sparse_key_list:
raise ValueError("strategy key '%s' not support" % (key))
        support_table_class = ['DownpourSparseTable', 'DownpourSparseSSDTable']
        if strategy.get('sparse_table_class') is not None:
            table_class = strategy.get('sparse_table_class')
            if table_class not in support_table_class:
                raise ValueError(
                    "support sparse_table_class: ['DownpourSparseTable', "
                    "'DownpourSparseSSDTable'], but actual %s" % (table_class))
else:
table_class = 'DownpourSparseTable'
table.table_class = table_class
        if table_class in ('DownpourSparseTable', 'DownpourSparseSSDTable'):
table.enable_sparse_table_cache = strategy.get(
'sparse_enable_cache', True)
table.sparse_table_cache_rate = strategy.get('sparse_cache_rate',
0.00055)
table.sparse_table_cache_file_num = strategy.get(
'sparse_cache_file_num', 16)
table.compress_in_save = strategy.get('sparse_compress_in_save',
True)
table.shard_num = strategy.get('sparse_shard_num', 1000)
        # DownpourFeatureValueAccessor: for ctr tasks; holds cvm, embedding and sgd info
        # DownpourCtrAccessor:          for ctr tasks; holds cvm, slot, embedding and sgd info
        # DownpourSparseValueAccessor:  for general tasks; holds embedding and sgd info
        # DownpourCtrDoubleAccessor:    for ctr tasks whose show/click counters are stored as double
support_accessor_class = [
'DownpourFeatureValueAccessor', 'DownpourCtrAccessor',
'DownpourSparseValueAccessor', 'DownpourCtrDoubleAccessor'
]
if strategy.get('sparse_accessor_class') is not None:
accessor_class = strategy.get('sparse_accessor_class')
if accessor_class not in support_accessor_class:
                raise ValueError(
                    "support sparse_accessor_class: ['DownpourFeatureValueAccessor', "
                    "'DownpourCtrAccessor', 'DownpourSparseValueAccessor', "
                    "'DownpourCtrDoubleAccessor'], but actual %s" % (accessor_class))
else:
accessor_class = 'DownpourCtrAccessor'
table.accessor.accessor_class = accessor_class
        if accessor_class in ('DownpourFeatureValueAccessor',
                              'DownpourCtrAccessor',
                              'DownpourCtrDoubleAccessor'):
table.accessor.sparse_sgd_param.learning_rate = strategy.get(
'sparse_learning_rate', 0.05)
table.accessor.sparse_sgd_param.initial_g2sum = strategy.get(
'sparse_initial_g2sum', 3)
table.accessor.sparse_sgd_param.initial_range = strategy.get(
'sparse_initial_range', 1e-4)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_sgd_param.weight_bounds.extend(
[-10, 10])
else:
table.accessor.sparse_sgd_param.weight_bounds.extend(
strategy.get('sparse_weight_bounds'))
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
table.accessor.embedx_threshold = strategy.get(
'sparse_embedx_threshold', 10)
table.accessor.fea_dim = int(table.accessor.embedx_dim) + 3
table.accessor.downpour_accessor_param.nonclk_coeff = strategy.get(
'sparse_nonclk_coeff', 0.1)
table.accessor.downpour_accessor_param.click_coeff = strategy.get(
'sparse_click_coeff', 1)
table.accessor.downpour_accessor_param.base_threshold = strategy.get(
'sparse_base_threshold', 1.5)
table.accessor.downpour_accessor_param.delta_threshold = strategy.get(
'sparse_delta_threshold', 0.25)
table.accessor.downpour_accessor_param.delta_keep_days = strategy.get(
'sparse_delta_keep_days', 16)
table.accessor.downpour_accessor_param.delete_after_unseen_days = strategy.get(
'sparse_delete_after_unseen_days', 30)
table.accessor.downpour_accessor_param.ssd_unseenday_threshold = strategy.get(
'sparse_ssd_unseenday_threshold', 1)
table.accessor.downpour_accessor_param.show_click_decay_rate = strategy.get(
'sparse_show_click_decay_rate', 0.98)
table.accessor.downpour_accessor_param.delete_threshold = strategy.get(
'sparse_delete_threshold', 0.8)
converter = strategy.get(
'sparse_converter',
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)")
deconverter = strategy.get(
'sparse_deconverter',
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)"
)
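            # register two save-parameter entries (param=1 and param=2); both reuse
            # the converter/deconverter shell pipelines configured above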
table1 = table.accessor.table_accessor_save_param.add()
table1.param = 1
table1.converter = converter
table1.deconverter = deconverter
table2 = table.accessor.table_accessor_save_param.add()
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
elif accessor_class == 'DownpourSparseValueAccessor':
optimizer_name = strategy.get("sparse_optimizer", "adam")
table.accessor.sparse_commonsgd_param.name = optimizer_name
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
table.accessor.fea_dim = int(table.accessor.embedx_dim)
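            # configure the selected optimizer; an unrecognized optimizer name
            # leaves the per-optimizer fields of sparse_commonsgd_param at their
            # protobuf defaults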
if optimizer_name == "naive":
table.accessor.sparse_commonsgd_param.naive.learning_rate = \
strategy.get('sparse_learning_rate', 0.05)
table.accessor.sparse_commonsgd_param.naive.initial_range = \
strategy.get('sparse_initial_range', 1e-4)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend(
[-10, 10])
else:
table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend(
strategy.get('sparse_weight_bounds'))
elif optimizer_name == "adagrad":
table.accessor.sparse_commonsgd_param.adagrad.learning_rate = \
strategy.get('sparse_learning_rate', 0.05)
table.accessor.sparse_commonsgd_param.adagrad.initial_range = \
strategy.get('sparse_initial_range', 1e-4)
table.accessor.sparse_commonsgd_param.adagrad.initial_g2sum = strategy.get(
'sparse_initial_g2sum', 3)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend(
[-10, 10])
else:
table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend(
strategy.get('sparse_weight_bounds'))
elif optimizer_name == "adam":
table.accessor.sparse_commonsgd_param.adam.learning_rate = \
strategy.get('sparse_learning_rate', 0.001)
table.accessor.sparse_commonsgd_param.adam.initial_range = \
strategy.get('sparse_initial_range', 1e-4)
table.accessor.sparse_commonsgd_param.adam.beta1_decay_rate = strategy.get(
'sparse_beta1_decay_rate', 0.9)
table.accessor.sparse_commonsgd_param.adam.beta2_decay_rate = strategy.get(
'sparse_beta2_decay_rate', 0.999)
table.accessor.sparse_commonsgd_param.adam.ada_epsilon = strategy.get(
'sparse_ada_epsilon', 1e-8)
if strategy.get('sparse_weight_bounds') is None:
table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend(
[-10, 10])
else:
table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend(
strategy.get('sparse_weight_bounds'))
converter = strategy.get(
'sparse_converter',
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)")
deconverter = strategy.get(
'sparse_deconverter',
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)"
)
table1 = table.accessor.table_accessor_save_param.add()
table1.param = 1
table1.converter = converter
table1.deconverter = deconverter
table2 = table.accessor.table_accessor_save_param.add()
table2.param = 2
table2.converter = converter
table2.deconverter = deconverter
def add_dense_table(self, table_id, param_var, grad_var, strategy,
sparse_table_names):
"""
Args:
            table_id(int): id of dense params table
param_var(list): param vars
grad_var(list): param grad vars
strategy(dict): the dense config dict
sparse_table_names(list): sparse table names
Returns:
return None
"""
fea_dim = 0
dense_param_vars = []
for p in param_var:
if p.name not in sparse_table_names:
dense_param_vars.append(p)
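        # fea_dim is the total number of elements across all dense parameters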
for param in dense_param_vars:
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_DENSE_TABLE:
table.accessor.fea_dim = fea_dim
return
else:
raise ValueError("expect table %s type=%s, but actual type=%s" \
%(table_id, pslib.PS_DENSE_TABLE, table.type))
if strategy is None:
strategy = dict()
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
support_dense_key_list = ['dense_table_class', 'dense_compress_in_save', 'dense_accessor_class', \
'dense_optimizer', 'dense_learning_rate', 'dense_avg_decay', 'dense_ada_decay', \
'dense_ada_epsilon', 'dense_mom_decay', 'dense_naive_lr']
for key in strategy:
if key not in support_dense_key_list:
raise ValueError("strategy key '%s' not support" % (key))
table.table_class = strategy.get('dense_table_class',
"DownpourDenseTable")
table.type = pslib.PS_DENSE_TABLE
table.compress_in_save = strategy.get('dense_compress_in_save', True)
table.accessor.accessor_class = strategy.get(
'dense_accessor_class', "DownpourDenseValueAccessor")
table.accessor.dense_sgd_param.name = strategy.get('dense_optimizer',
"adam")
table.accessor.dense_sgd_param.adam.learning_rate = strategy.get(
'dense_learning_rate', 5e-06)
table.accessor.dense_sgd_param.adam.avg_decay_rate = strategy.get(
'dense_avg_decay', 0.999993)
table.accessor.dense_sgd_param.adam.ada_decay_rate = strategy.get(
'dense_ada_decay', 0.9999)
table.accessor.dense_sgd_param.adam.ada_epsilon = strategy.get(
'dense_ada_epsilon', 1e-8)
table.accessor.dense_sgd_param.adam.mom_decay_rate = strategy.get(
'dense_mom_decay', 0.99)
table.accessor.dense_sgd_param.naive.learning_rate = strategy.get(
'dense_naive_lr', 0.0002)
table.accessor.fea_dim = fea_dim
def add_data_norm_table(self, table_id, learning_rate, param_var, grad_var,
strategy, sparse_table_names):
"""
Args:
table_id(int): id of datanorm table
learning_rate(float): the learning rate used to update parameters
param_var(list): param vars
grad_var(list): param grad vars
strategy(dict): the datanorm config dict
sparse_table_names(list): sparse table names
Returns:
return None
"""
fea_dim = 0
dense_param_vars = []
for p in param_var:
if p.name not in sparse_table_names:
dense_param_vars.append(p)
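        # same fea_dim computation as add_dense_table: the total element count of
        # every parameter that is not backed by a sparse table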
for param in dense_param_vars:
fea_dim += reduce(lambda x, y: x * y, param.shape, 1)
for table in self._server.downpour_server_param.downpour_table_param:
if table.table_id == table_id:
if table.type == pslib.PS_DENSE_TABLE:
table.accessor.fea_dim = fea_dim
return
else:
raise ValueError("expect table %s type=%s, but actual type=%s" \
%(table_id, pslib.PS_DENSE_TABLE, table.type))
if strategy is None:
strategy = dict()
support_datanorm_key_list = ['datanorm_table_class', 'datanorm_compress_in_save',\
'datanorm_accessor_class', 'datanorm_operation', 'datanorm_decay_rate']
for key in strategy:
if key not in support_datanorm_key_list:
raise ValueError("strategy key '%s' not support" % (key))
table = self._server.downpour_server_param.downpour_table_param.add()
table.table_id = table_id
table.table_class = strategy.get('datanorm_table_class',
'DownpourDenseTable')
table.type = pslib.PS_DENSE_TABLE
table.compress_in_save = strategy.get('datanorm_compress_in_save', True)
table.accessor.accessor_class = strategy.get(
'datanorm_accessor_class', 'DownpourDenseValueAccessor')
table.accessor.dense_sgd_param.name = strategy.get('datanorm_operation',
'summary')
table.accessor.dense_sgd_param.summary.summary_decay_rate = strategy.get(
'datanorm_decay_rate', 0.999999)
table.accessor.fea_dim = fea_dim
def get_desc(self):
"""
Return downpour server program_desc
"""
return self._server


class DownpourWorker(Worker):
"""
DownpourWorker class is used to generate worker program_desc
Args:
window (int): push params frequency
    Attributes:
        _worker: the wrapped pslib.DownpourTrainerParameter message
Examples:
worker = DownpourWorker(1)
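        # illustrative follow-up calls; slot_key_vars and slot_value_vars stand
        # for the sparse input and embedding variables of the training program:
        # worker.add_sparse_table(0, slot_key_vars, slot_value_vars)
        # desc = worker.get_desc()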
"""
def __init__(self, window):
self.window = window
self._worker = pslib.DownpourTrainerParameter()
def add_sparse_table(self,
table_id,
slot_key_vars,
slot_value_vars,
slot_value_grads=None):
"""
Args:
table_id(int): id of sparse params table
slot_key_vars(list): slot key id
slot_value_vars(list): slot key value after embedding
slot_value_grads(list): grad of all params, default is None
Returns:
return None
"""
        if slot_value_grads is None:
            slot_value_grad_names = \
                [var.name + "@GRAD" for var in slot_value_vars]
            # no explicit gradients: keep the original key/value ordering
            sorted_slot_key_vars = slot_key_vars
            sorted_slot_value_vars = slot_value_vars
else:
value_to_key = {}
for i in range(len(slot_key_vars)):
value_to_key[slot_value_vars[i].name] = slot_key_vars[i]
slot_value_grad_names = []
all_grad_names = [var.name for var in slot_value_grads]
for var in slot_value_vars:
if var.name + "@GRAD" in all_grad_names:
slot_value_grad_names.append(var.name + "@GRAD")
sorted_slot_value_vars = [i for i in slot_value_vars if \
i.name + "@GRAD" in slot_value_grad_names]
sorted_slot_value_vars += [i for i in slot_value_vars if \
i.name + "@GRAD" not in slot_value_grad_names]
sorted_slot_key_vars = \
[value_to_key[v.name] for v in sorted_slot_value_vars]
target_table = None
for table in self._worker.sparse_table:
if table.table_id == table_id:
keys = table.slot_key
key_names = [var.name for var in sorted_slot_key_vars]
for key_name in key_names:
if key_name not in keys:
raise ValueError("sparse table %s slot_key error" %
table_id)
target_table = table
break
table = target_table
if table is not None:
self._worker.sparse_table.remove(table)
table = self._worker.sparse_table.add()
table.table_id = table_id
table.slot_key.extend([var.name for var in sorted_slot_key_vars])
table.slot_value.extend([var.name for var in sorted_slot_value_vars])
table.slot_gradient.extend(slot_value_grad_names)
def add_dense_table(self, table_id, learning_rate, param_vars, grad_vars,
dense_start_table_id, sparse_table_names):
"""
Args:
            table_id(int): id of dense params table
            learning_rate(float): the learning rate used to update parameters
            param_vars(list): all dense param variables
            grad_vars(list): all dense grad variables
dense_start_table_id(int): dense table start index
sparse_table_names(list): sparse table names
Returns:
return None
"""
sparse_table_name_grad = []
for name in sparse_table_names:
sparse_table_name_grad.append(name + "@GRAD")
dense_param_name = []
for p in param_vars:
if p.name not in sparse_table_names:
dense_param_name.append(p.name)
dense_grad_name = []
for g in grad_vars:
if g.name not in sparse_table_name_grad:
dense_grad_name.append(g.name)
dense_param_name.sort()
dense_grad_name.sort()
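        # a table with this id may already exist; it must then cover exactly the
        # same params and grads, otherwise the configuration is rejected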
for table in self._worker.dense_table:
if table.table_id == table_id:
desc_dense_param_name = list(table.dense_variable_name)
desc_dense_param_name.sort()
if dense_param_name == desc_dense_param_name:
desc_dense_grad_name = list(
table.dense_gradient_variable_name)
desc_dense_grad_name.sort()
if dense_grad_name == desc_dense_grad_name:
return
else:
raise ValueError(
"dense table %s dense_gradient_variable_name "
"error" % table_id)
else:
raise ValueError(
"dense table %s dense_variable_name error" % table_id)
table = self._worker.dense_table.add()
table.table_id = table_id
#def cmp_fc(x, y):
# if x.startswith("fc_") and y.startswith("fc_"):
# index_x = x.find('.')
# index_y = y.find('.')
# if index_x > 0 and index_y > 0:
# num_x = x[3:index_x]
# num_y = y[3:index_y]
# if num_x.isdigit() and num_y.isdigit():
# if int(num_x) < int(num_y):
# return -1
# if int(num_x) > int(num_y):
# return 1
# if x[index_x + 1] == 'w' and y[index_y + 1] == 'b':
# return -1
# if x[index_x + 1] == 'b' and y[index_y + 1] == 'w':
# return 1
# if x < y:
# return -1
# else:
# return 1
#table.dense_variable_name.extend(sorted(dense_param_name, cmp_fc))
#table.dense_gradient_variable_name.extend(
# sorted(dense_grad_name, cmp_fc))
table.dense_variable_name.extend(dense_param_name)
table.dense_gradient_variable_name.extend(dense_grad_name)
def get_desc(self):
"""
Return downpour worker program_desc
"""
return self._worker