# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from paddle.fluid.proto import data_feed_pb2
from google.protobuf import text_format
from . import core

__all__ = ['DatasetFactory', 'InMemoryDataset', 'QueueDataset']


class DatasetFactory(object):
    """
    DatasetFactory is a factory which creates a dataset by its name.
    You can create "QueueDataset", "InMemoryDataset" or "FileInstantDataset";
    the default is "QueueDataset".

    Example:
        .. code-block:: python

          import paddle.fluid as fluid
          dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")

    """

    def __init__(self):
        """ Init. """
        pass

    def create_dataset(self, datafeed_class="QueueDataset"):
        """
        Create "QueueDataset", "InMemoryDataset" or "FileInstantDataset";
        the default is "QueueDataset".

        Args:
            datafeed_class(str): datafeed class name, QueueDataset or InMemoryDataset.
                                 Default is QueueDataset.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()

        """
        try:
            dataset = globals()[datafeed_class]()
            return dataset
        except:
            raise ValueError("datafeed class %s does not exist" %
                             datafeed_class)
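
    # Note (illustrative): create_dataset looks the requested class name up in
    # this module's globals(), so any dataset class defined here can be created
    # by name, e.g. fluid.DatasetFactory().create_dataset("FileInstantDataset").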


class DatasetBase(object):
    """ Base dataset class. """

    def __init__(self):
        """ Init. """
        # define class name here
        # to decide whether we need to create an in-memory instance
        self.proto_desc = data_feed_pb2.DataFeedDesc()
        self.proto_desc.pipe_command = "cat"
        self.dataset = core.Dataset("MultiSlotDataset")
        self.thread_num = 0
        self.filelist = []
        # slots shuffle (fea eval) stays disabled until set_fea_eval() enables it
        self.fea_eval = False

    def set_pipe_command(self, pipe_command):
        """
        Set the pipe command of the current dataset. A pipe command is a UNIX
        pipeline command used to preprocess the input data before it is
        parsed; the default is "cat".

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_pipe_command("python my_script.py")
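              # Illustrative assumption: my_script.py reads raw records from
              # stdin and writes records in the DataFeed format to stdout.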

        Args:
            pipe_command(str): pipe command

        """
        self.proto_desc.pipe_command = pipe_command

    def set_fea_eval(self, record_candidate_size, fea_eval=True):
        """
        Set fea eval mode, which is used by slots shuffle to evaluate the
        importance level of slots (features). fea_eval needs to be set to
        True before slots shuffle can be used.

        Args:
            record_candidate_size(int): size of the candidate pool of
                                        instances used to shuffle one slot
            fea_eval(bool): whether to enable fea eval mode, i.e. enable
                            slots shuffle. Default is True.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_fea_eval(1000000, True)

        """
        if fea_eval:
            self.dataset.set_fea_eval(fea_eval, record_candidate_size)
        self.fea_eval = fea_eval

    def slots_shuffle(self, slots):
        """
        Slots Shuffle
        Slots Shuffle is a shuffle method at slot (feature) level, which is
        usually used for sparse features with a large number of instances.
        By comparing a metric such as AUC with and without slots shuffle on
        one or several slots against the baseline, the importance level of
        those slots (features) can be evaluated.

        Args:
            slots(list[string]): the set of slots (string) to do slots shuffle.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_fea_eval(1000000, True)
              # suppose there is a slot 0
              dataset.slots_shuffle(['0'])
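              # Illustrative fea-eval flow with a hypothetical `exe` and
              # `main_program`: evaluate once for a baseline metric, shuffle
              # the slot, then evaluate again; the metric drop reflects the
              # importance of the shuffled slot.
              # exe.train_from_dataset(main_program, dataset)  # baseline
              # dataset.slots_shuffle(['0'])
              # exe.train_from_dataset(main_program, dataset)  # slot 0 shuffled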

        """
        if self.fea_eval:
            slots_set = set(slots)
            self.dataset.slots_shuffle(slots_set)

    def set_batch_size(self, batch_size):
        """
        Set batch size. It will take effect during training.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_batch_size(128)

        Args:
            batch_size(int): batch size

        """
        self.proto_desc.batch_size = batch_size

    def set_thread(self, thread_num):
        """
        Set the thread num, i.e. the number of readers.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_thread(12)

        Args:
            thread_num(int): thread num

        """
        self.dataset.set_thread_num(thread_num)
        self.thread_num = thread_num

    def set_filelist(self, filelist):
        """
        Set the file list to be processed by the current worker.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_filelist(['a.txt', 'b.txt'])

        Args:
            filelist(list): file list

        """
        self.dataset.set_filelist(filelist)
        self.filelist = filelist

    def set_use_var(self, var_list):
        """
        Set the Variables which you will use.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
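              # `data` and `label` are hypothetical Variables, assumed to be
              # created beforehand; lod_level == 0 maps to a dense slot and
              # only float32 / int64 dtypes are accepted by set_use_var:
              data = fluid.layers.data(name='data', shape=[1], dtype='int64', lod_level=1)
              label = fluid.layers.data(name='label', shape=[1], dtype='int64')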
              dataset.set_use_var([data, label])

        Args:
            var_list(list): variable list

        """
        multi_slot = self.proto_desc.multi_slot_desc
        for var in var_list:
            slot_var = multi_slot.slots.add()
            slot_var.is_used = True
            slot_var.name = var.name
            if var.lod_level == 0:
                slot_var.is_dense = True
                slot_var.shape.extend(var.shape)
            if var.dtype == core.VarDesc.VarType.FP32:
                slot_var.type = "float"
            elif var.dtype == core.VarDesc.VarType.INT64:
                slot_var.type = "uint64"
            else:
                raise ValueError(
                    "Currently, fluid.dataset only supports dtype=float32 and dtype=int64"
                )

    def set_hdfs_config(self, fs_name, fs_ugi):
        """
        Set hdfs config: fs name and ugi.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              dataset.set_hdfs_config("my_fs_name", "my_fs_ugi")

        Args:
            fs_name(str): fs name
            fs_ugi(str): fs ugi

        """
        self.dataset.set_hdfs_config(fs_name, fs_ugi)

    def _prepare_to_run(self):
        """
        Set data_feed_desc before loading or shuffling;
        users do not need to call this function.
        """
        if self.thread_num > len(self.filelist):
            self.thread_num = len(self.filelist)
        self.dataset.set_thread_num(self.thread_num)
        self.dataset.set_data_feed_desc(self.desc())
        self.dataset.create_readers()

    def _finish_to_run(self):
        self.dataset.destroy_readers()

    def desc(self):
        """
        Returns a protobuf message for this DataFeedDesc.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset()
              print(dataset.desc())
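              # Illustrative note: the returned string is the DataFeedDesc in
              # protobuf text format; with the defaults above it contains, for
              # example, a line like `pipe_command: "cat"`.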

        Returns:
            A string message
        """
        return text_format.MessageToString(self.proto_desc)


class InMemoryDataset(DatasetBase):
    """
    InMemoryDataset loads all data into memory and can shuffle the data
    before training. Instances of this class should be created by
    DatasetFactory.

    Example:
        .. code-block:: python

          dataset = paddle.fluid.DatasetFactory().create_dataset("InMemoryDataset")
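          # Illustrative continuation of the example above; `data` and `label`
          # are assumed to be Variables created elsewhere, and the listed
          # files are assumed to exist:
          dataset.set_use_var([data, label])
          dataset.set_filelist(["a.txt", "b.txt"])
          dataset.load_into_memory()
          dataset.local_shuffle()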

    """

    def __init__(self):
        """ Init. """
        super(InMemoryDataset, self).__init__()
        self.proto_desc.name = "MultiSlotInMemoryDataFeed"
        self.fleet_send_batch_size = None
        self.queue_num = None
        self.parse_ins_id = False
        self.parse_content = False
        self.merge_by_lineid = False

    def _prepare_to_run(self):
        """
        Set data_feed_desc before loading or shuffling;
        users do not need to call this function.
        """
        if self.thread_num > len(self.filelist):
            self.thread_num = len(self.filelist)
        if self.thread_num == 0:
            self.thread_num = 1
        self.dataset.set_thread_num(self.thread_num)
        if self.queue_num is None:
            self.queue_num = self.thread_num
        self.dataset.set_queue_num(self.queue_num)
        self.dataset.set_parse_ins_id(self.parse_ins_id)
        self.dataset.set_parse_content(self.parse_content)
        self.dataset.set_data_feed_desc(self.desc())
        self.dataset.create_channel()
        self.dataset.create_readers()

    def set_queue_num(self, queue_num):
        """
        Set the number of Dataset output queues; training threads fetch data
        from these queues.

        Args:
            queue_num(int): dataset output queue num

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_queue_num(12)

        """
        self.queue_num = queue_num

    def set_parse_ins_id(self, parse_ins_id):
        """
        Set whether the Dataset needs to parse the instance id (ins_id).

        Args:
            parse_ins_id(bool): whether to parse ins_id or not

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_parse_ins_id(True)

        """
        self.parse_ins_id = parse_ins_id

    def set_parse_content(self, parse_content):
        """
        Set whether the Dataset needs to parse the content field.

        Args:
            parse_content(bool): whether to parse content or not

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_parse_content(True)

        """
        self.parse_content = parse_content

    def set_fleet_send_batch_size(self, fleet_send_batch_size):
        """
        Set the fleet send batch size. If it is not set, it defaults to
        800 * trainer_num when global_shuffle is called.

        Args:
            fleet_send_batch_size(int): fleet send batch size

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              dataset.set_fleet_send_batch_size(800)

        """
        self.fleet_send_batch_size = fleet_send_batch_size

    def set_merge_by_lineid(self,
                            var_list,
                            erase_duplicate_feas=True,
                            min_merge_size=2,
                            keep_unmerged_ins=True):
        """
        Set merge by line id: instances with the same line id will be merged
        after shuffle. You should parse the line id in your data generator.

        Args:
            var_list(list): slots that can be merged. Each element in var_list
                            is a Variable. Some slots, such as show and click,
                            are usually not merged for the same line id, so the
                            user should specify which slots can be merged.
            erase_duplicate_feas(bool): whether to erase duplicate feasigns
                                        when merging. Default is True.
            min_merge_size(int): minimal size to merge. Default is 2.
            keep_unmerged_ins(bool): whether to keep unmerged instances, such as
                                     instances with a unique id, or instances whose
                                     count for the same id is less than min_merge_size.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              # `data` and `label` are Variables created beforehand
              dataset.set_merge_by_lineid([data, label])
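              # Illustrative follow-up (assumes a file list and a fleet have
              # been set up): the actual merge is performed inside
              # global_shuffle() once this flag is set.
              # dataset.set_filelist(["a.txt", "b.txt"])
              # dataset.load_into_memory()
              # dataset.global_shuffle(fleet)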

        """
        var_name_list = [i.name for i in var_list]
        self.dataset.set_merge_by_lineid(var_name_list, erase_duplicate_feas,
                                         min_merge_size, keep_unmerged_ins)
        self.merge_by_lineid = True

    def load_into_memory(self):
        """
        Load data into memory.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
        """
        self._prepare_to_run()
        self.dataset.load_into_memory()

    def preload_into_memory(self):
        """
        Load data into memory in async mode.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.preload_into_memory()
              dataset.wait_preload_done()
        """
        self._prepare_to_run()
        self.dataset.preload_into_memory()

    def wait_preload_done(self):
        """
        Wait for preload_into_memory to finish.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.preload_into_memory()
              dataset.wait_preload_done()
        """
        self.dataset.wait_preload_done()

    def local_shuffle(self):
        """
        Local shuffle.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.local_shuffle()
        """
        self.dataset.local_shuffle()

    def global_shuffle(self, fleet=None):
        """
        Global shuffle.
        Global shuffle can be used only in distributed mode, i.e. multiple
        processes on a single machine or multiple machines training together.
        If you run in distributed mode, you should pass fleet instead of None.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.global_shuffle(fleet)

        Args:
            fleet(Fleet): fleet singleton. Default None.

        """
        trainer_num = 1
        if fleet is not None:
            fleet._role_maker._barrier_worker()
            trainer_num = fleet.worker_num()
        if self.fleet_send_batch_size is None:
            self.fleet_send_batch_size = 800 * trainer_num
        self.dataset.register_client2client_msg_handler()
        self.dataset.set_trainer_num(trainer_num)
        self.dataset.set_fleet_send_batch_size(self.fleet_send_batch_size)
        if fleet is not None:
            fleet._role_maker._barrier_worker()
        self.dataset.global_shuffle()
        if fleet is not None:
            fleet._role_maker._barrier_worker()
        if self.merge_by_lineid:
            self.dataset.merge_by_lineid()
        if fleet is not None:
            fleet._role_maker._barrier_worker()

    def release_memory(self):
        """
        Release InMemoryDataset memory data, when the data will not be used again.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.global_shuffle(fleet)
              exe = fluid.Executor(fluid.CPUPlace())
              exe.run(fluid.default_startup_program())
              exe.train_from_dataset(fluid.default_main_program(), dataset)
              dataset.release_memory()

        """
        self.dataset.release_memory()

    def get_memory_data_size(self, fleet=None):
        """
        Get the memory data size. Users can call this function to get the
        number of instances across all workers after load_into_memory.

        Note:
            This function may hurt performance because it uses a barrier.

        Args:
            fleet(Fleet): Fleet Object.

        Returns:
            The size of memory data.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              print(dataset.get_memory_data_size(fleet))

        """
        import numpy as np
        local_data_size = self.dataset.get_memory_data_size()
        local_data_size = np.array([local_data_size])
        if fleet is not None:
            global_data_size = local_data_size * 0
            fleet._role_maker._node_type_comm.Allreduce(local_data_size,
                                                        global_data_size)
            return global_data_size[0]
        return local_data_size[0]

    def get_shuffle_data_size(self, fleet=None):
        """
        Get the shuffle data size. Users can call this function to get the
        number of instances across all workers after local/global shuffle.

        Note:
            This function may hurt the performance of local shuffle because
            it uses a barrier. It does not affect global shuffle.

        Args:
            fleet(Fleet): Fleet Object.

        Returns:
            The size of shuffle data.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("InMemoryDataset")
              filelist = ["a.txt", "b.txt"]
              dataset.set_filelist(filelist)
              dataset.load_into_memory()
              dataset.global_shuffle(fleet)
              print(dataset.get_shuffle_data_size(fleet))

        """
        import numpy as np
        local_data_size = self.dataset.get_shuffle_data_size()
        local_data_size = np.array([local_data_size])
        if fleet is not None:
            global_data_size = local_data_size * 0
            fleet._role_maker._node_type_comm.Allreduce(local_data_size,
                                                        global_data_size)
            return global_data_size[0]
        return local_data_size[0]


class QueueDataset(DatasetBase):
    """
    QueueDataset processes data in a streaming fashion.

    Examples:
        .. code-block:: python

          import paddle.fluid as fluid
          dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
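          # Illustrative continuation; `data` and `label` are assumed to be
          # Variables created elsewhere, and the listed files to exist. A
          # QueueDataset is consumed directly by training, e.g. via
          # exe.train_from_dataset(...):
          dataset.set_use_var([data, label])
          dataset.set_filelist(["a.txt", "b.txt"])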

    """

    def __init__(self):
        """
        Initialize QueueDataset.
        This class should be created by DatasetFactory.
        """
        super(QueueDataset, self).__init__()
        self.proto_desc.name = "MultiSlotDataFeed"

    def _prepare_to_run(self):
        """
        Set data_feed_desc/thread_num/filelist before running;
        users do not need to call this function.
        """
        if self.thread_num > len(self.filelist):
            self.thread_num = len(self.filelist)
        if self.thread_num == 0:
            self.thread_num = 1
        self.dataset.set_thread_num(self.thread_num)
        self.dataset.set_filelist(self.filelist)
        self.dataset.set_data_feed_desc(self.desc())
        self.dataset.create_readers()

    def local_shuffle(self):
        """
        Local shuffle data.

        Local shuffle is not supported in QueueDataset;
        NotImplementedError will be raised.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
              dataset.local_shuffle()

        """
        raise NotImplementedError(
            "QueueDataset does not support local shuffle, "
            "please use InMemoryDataset for local_shuffle")

    def global_shuffle(self, fleet=None):
        """
        Global shuffle data.

        Global shuffle is not supported in QueueDataset;
        NotImplementedError will be raised.

        Args:
            fleet(Fleet): fleet singleton. Default None.

        Examples:
            .. code-block:: python

              import paddle.fluid as fluid
              from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet
              dataset = fluid.DatasetFactory().create_dataset("QueueDataset")
              dataset.global_shuffle(fleet)

        """
        raise NotImplementedError(
            "QueueDataset does not support global shuffle, "
            "please use InMemoryDataset for global_shuffle")


class FileInstantDataset(DatasetBase):
    """
    FileInstantDataset processes data in a streaming fashion.

    Examples:
        .. code-block:: python

          import paddle.fluid as fluid
          dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")

    """

    def __init__(self):
        """
        Init.
        """
        super(FileInstantDataset, self).__init__()
        self.proto_desc.name = "MultiSlotFileInstantDataFeed"

    def local_shuffle(self):
        """
        Local shuffle is not supported in FileInstantDataset.
        """
        raise NotImplementedError(
            "FileInstantDataset does not support local shuffle, "
            "please use InMemoryDataset for local_shuffle")

    def global_shuffle(self, fleet=None):
        """
        Global shuffle is not supported in FileInstantDataset.
        """
        raise NotImplementedError(
            "FileInstantDataset does not support global shuffle, "
            "please use InMemoryDataset for global_shuffle")