|
|
|
@ -18,15 +18,14 @@
|
|
|
|
|
namespace paddle {
|
|
|
|
|
namespace framework {
|
|
|
|
|
|
|
|
|
|
Dataset::Dataset() {
|
|
|
|
|
thread_num_ = 1;
|
|
|
|
|
}
|
|
|
|
|
Dataset::Dataset() { thread_num_ = 1; }
|
|
|
|
|
|
|
|
|
|
void Dataset::SetFileList(const std::vector<std::string>& filelist) {
|
|
|
|
|
filelist_ = filelist;
|
|
|
|
|
int file_cnt = filelist_.size();
|
|
|
|
|
if (thread_num_ > file_cnt) {
|
|
|
|
|
VLOG(1) << "DataSet thread num = " << thread_num_ << ", file num = " << file_cnt
|
|
|
|
|
VLOG(1) << "DataSet thread num = " << thread_num_
|
|
|
|
|
<< ", file num = " << file_cnt
|
|
|
|
|
<< ". Changing DataSet thread num = " << file_cnt;
|
|
|
|
|
thread_num_ = file_cnt;
|
|
|
|
|
}
|
|
|
|
@ -35,22 +34,23 @@ void Dataset::SetFileList(const std::vector<std::string>& filelist) {
|
|
|
|
|
void Dataset::SetThreadNum(int thread_num) {
|
|
|
|
|
int file_cnt = filelist_.size();
|
|
|
|
|
if (file_cnt != 0 && thread_num > file_cnt) {
|
|
|
|
|
VLOG(1) << "DataSet thread num = " << thread_num << ", file num = " << file_cnt
|
|
|
|
|
VLOG(1) << "DataSet thread num = " << thread_num
|
|
|
|
|
<< ", file num = " << file_cnt
|
|
|
|
|
<< ". Changing DataSet thread num = " << file_cnt;
|
|
|
|
|
thread_num = file_cnt;
|
|
|
|
|
}
|
|
|
|
|
thread_num_ = thread_num;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void Dataset::SetTrainerNum(int trainer_num) {
|
|
|
|
|
trainer_num_ = trainer_num;
|
|
|
|
|
}
|
|
|
|
|
void Dataset::SetTrainerNum(int trainer_num) { trainer_num_ = trainer_num; }
|
|
|
|
|
|
|
|
|
|
void Dataset::SetDataFeedDesc(const paddle::framework::DataFeedDesc& data_feed_desc) {
|
|
|
|
|
void Dataset::SetDataFeedDesc(
|
|
|
|
|
const paddle::framework::DataFeedDesc& data_feed_desc) {
|
|
|
|
|
data_feed_desc_ = data_feed_desc;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
std::vector<std::shared_ptr<paddle::framework::DataFeed>> Dataset::GetReaders() {
|
|
|
|
|
std::vector<std::shared_ptr<paddle::framework::DataFeed>>
|
|
|
|
|
Dataset::GetReaders() {
|
|
|
|
|
return readers_;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -60,8 +60,8 @@ void Dataset::LoadIntoMemory() {
|
|
|
|
|
}
|
|
|
|
|
std::vector<std::thread> load_threads;
|
|
|
|
|
for (int64_t i = 0; i < thread_num_; ++i) {
|
|
|
|
|
load_threads.push_back(std::thread(&paddle::framework::DataFeed::LoadIntoMemory,
|
|
|
|
|
readers_[i].get()));
|
|
|
|
|
load_threads.push_back(std::thread(
|
|
|
|
|
&paddle::framework::DataFeed::LoadIntoMemory, readers_[i].get()));
|
|
|
|
|
}
|
|
|
|
|
for (std::thread& t : load_threads) {
|
|
|
|
|
t.join();
|
|
|
|
@ -74,8 +74,8 @@ void Dataset::LocalShuffle() {
|
|
|
|
|
}
|
|
|
|
|
std::vector<std::thread> local_shuffle_threads;
|
|
|
|
|
for (int64_t i = 0; i < thread_num_; ++i) {
|
|
|
|
|
local_shuffle_threads.push_back(std::thread(&paddle::framework::DataFeed::LocalShuffle,
|
|
|
|
|
readers_[i].get()));
|
|
|
|
|
local_shuffle_threads.push_back(std::thread(
|
|
|
|
|
&paddle::framework::DataFeed::LocalShuffle, readers_[i].get()));
|
|
|
|
|
}
|
|
|
|
|
for (std::thread& t : local_shuffle_threads) {
|
|
|
|
|
t.join();
|
|
|
|
@ -115,14 +115,14 @@ void Dataset::CreateReaders() {
|
|
|
|
|
readers_[0]->SetFileList(filelist_);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int Dataset::ReceiveFromClient(int msg_type, int client_id, const std::string& msg) {
|
|
|
|
|
int Dataset::ReceiveFromClient(int msg_type, int client_id,
|
|
|
|
|
const std::string& msg) {
|
|
|
|
|
// can also use hash
|
|
|
|
|
// int64_t index = paddle::ps::local_random_engine()() % thread_num_;
|
|
|
|
|
// todo
|
|
|
|
|
int64_t index = 0;
|
|
|
|
|
readers_[index]->PutInsToChannel(msg);
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} // end namespace framework
|
|
|
|
|
} // end namespace paddle
|
|
|
|
|