|
|
|
@ -42,13 +42,6 @@ bool DataFeed::SetFileList(const std::vector<std::string>& files) {
|
|
|
|
|
CheckInit();
|
|
|
|
|
// Do not set finish_set_filelist_ flag,
|
|
|
|
|
// since a user may set file many times after init reader
|
|
|
|
|
/*
|
|
|
|
|
if (finish_set_filelist_) {
|
|
|
|
|
VLOG(3) << "info: you have set the filelist.";
|
|
|
|
|
return false;
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
// PADDLE_ENFORCE(files.size(), "You have set an empty filelist.");
|
|
|
|
|
filelist_.assign(files.begin(), files.end());
|
|
|
|
|
|
|
|
|
|
finish_set_filelist_ = true;
|
|
|
|
@ -113,7 +106,6 @@ void PrivateQueueDataFeed<T>::ReadThread() {
|
|
|
|
|
int err_no = 0;
|
|
|
|
|
fp_ = fs_open_read(filename, &err_no, pipe_command_);
|
|
|
|
|
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
|
|
|
|
|
thread_local string::LineFileReader reader;
|
|
|
|
|
T instance;
|
|
|
|
|
while (ParseOneInstanceFromPipe(&instance)) {
|
|
|
|
|
queue_->Send(instance);
|
|
|
|
@ -149,7 +141,7 @@ InMemoryDataFeed<T>::InMemoryDataFeed() {
|
|
|
|
|
cur_channel_ = 0;
|
|
|
|
|
shuffled_ins_ = std::make_shared<paddle::framework::BlockingQueue<T>>();
|
|
|
|
|
shuffled_ins_out_ = std::make_shared<paddle::framework::BlockingQueue<T>>();
|
|
|
|
|
fleet_send_batch_size_ = 80000;
|
|
|
|
|
fleet_send_batch_size_ = 80000; // hard code here
|
|
|
|
|
memory_data_ = nullptr;
|
|
|
|
|
mutex_for_update_memory_data_ = nullptr;
|
|
|
|
|
this->file_idx_ = nullptr;
|
|
|
|
@ -441,7 +433,6 @@ void MultiSlotDataFeed::ReadThread() {
|
|
|
|
|
fp_ = fs_open_read(filename, &err_no, pipe_command_);
|
|
|
|
|
CHECK(fp_ != nullptr);
|
|
|
|
|
__fsetlocking(&*fp_, FSETLOCKING_BYCALLER);
|
|
|
|
|
thread_local string::LineFileReader reader;
|
|
|
|
|
std::vector<MultiSlotType> instance;
|
|
|
|
|
int ins_num = 0;
|
|
|
|
|
while (ParseOneInstanceFromPipe(&instance)) {
|
|
|
|
|