|
|
|
@ -18,6 +18,7 @@
|
|
|
|
|
#include "ThreadPool.h"
|
|
|
|
|
#include "paddle/fluid/framework/blocking_queue.h"
|
|
|
|
|
#include "paddle/fluid/operators/reader/blocking_queue.h"
|
|
|
|
|
#include "paddle/fluid/operators/reader/buffered_reader.h"
|
|
|
|
|
#include "paddle/fluid/operators/reader/reader_op_registry.h"
|
|
|
|
|
|
|
|
|
|
namespace paddle {
|
|
|
|
@ -232,12 +233,17 @@ class OpenFilesOp : public framework::OperatorBase {
|
|
|
|
|
container.reset(new OrderedReaderContainer());
|
|
|
|
|
} else {
|
|
|
|
|
container.reset(new PreemptiveReaderContainer(
|
|
|
|
|
std::min(file_names.size(),
|
|
|
|
|
static_cast<size_t>(std::thread::hardware_concurrency()))));
|
|
|
|
|
static_cast<size_t>(Attr<int>("thread_num"))));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
out->Reset(
|
|
|
|
|
std::make_shared<MultiFileReader>(file_names, std::move(container)));
|
|
|
|
|
std::shared_ptr<framework::ReaderBase> reader(
|
|
|
|
|
new MultiFileReader(file_names, std::move(container)));
|
|
|
|
|
auto buffer_size = Attr<int>("buffer_size");
|
|
|
|
|
if (buffer_size > 1) {
|
|
|
|
|
reader = framework::MakeDecoratedReader<BufferedReader>(
|
|
|
|
|
reader, platform::CPUPlace(), buffer_size);
|
|
|
|
|
}
|
|
|
|
|
out->Reset(reader);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -253,6 +259,8 @@ class OpenFilesOpMaker : public FileReaderMakerBase {
|
|
|
|
|
An OpenFilesOp creates a MultiFileReader, which is able to
|
|
|
|
|
read data multi-threaded from multiple files.
|
|
|
|
|
)DOC");
|
|
|
|
|
AddAttr<int>("thread_num", "Number of thread to read files.");
|
|
|
|
|
AddAttr<int>("buffer_size", "The reading buffer of these files.");
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|