|
|
|
@ -14,6 +14,8 @@
|
|
|
|
|
|
|
|
|
|
#include "paddle/fluid/operators/reader/ctr_reader.h"
|
|
|
|
|
|
|
|
|
|
#include <gzstream.h>
|
|
|
|
|
|
|
|
|
|
#include <cstdlib>
|
|
|
|
|
#include <fstream>
|
|
|
|
|
#include <iostream>
|
|
|
|
@ -24,10 +26,6 @@
|
|
|
|
|
#include <algorithm>
|
|
|
|
|
#include <random>
|
|
|
|
|
|
|
|
|
|
#include <boost/iostreams/copy.hpp>
|
|
|
|
|
#include <boost/iostreams/filter/gzip.hpp>
|
|
|
|
|
#include <boost/iostreams/filtering_streambuf.hpp>
|
|
|
|
|
|
|
|
|
|
namespace paddle {
|
|
|
|
|
namespace operators {
|
|
|
|
|
namespace reader {
|
|
|
|
@ -75,23 +73,19 @@ static inline void parse_line(
|
|
|
|
|
|
|
|
|
|
class GzipReader {
|
|
|
|
|
public:
|
|
|
|
|
explicit GzipReader(const std::string& file_name) : instream_(&inbuf_) {
|
|
|
|
|
file_ = std::ifstream(file_name, std::ios_base::in | std::ios_base::binary);
|
|
|
|
|
inbuf_.push(boost::iostreams::gzip_decompressor());
|
|
|
|
|
inbuf_.push(file_);
|
|
|
|
|
// Convert streambuf to istream
|
|
|
|
|
}
|
|
|
|
|
explicit GzipReader(const std::string& file_name)
|
|
|
|
|
: gzstream_(file_name.c_str()) {}
|
|
|
|
|
|
|
|
|
|
~GzipReader() { file_.close(); }
|
|
|
|
|
~GzipReader() {}
|
|
|
|
|
|
|
|
|
|
bool HasNext() { return instream_.peek() != EOF; }
|
|
|
|
|
bool HasNext() { return gzstream_.peek() != EOF; }
|
|
|
|
|
|
|
|
|
|
void NextLine(std::string& line) { std::getline(instream_, line); } // NOLINT
|
|
|
|
|
void NextLine(std::string* line) { // NOLINT
|
|
|
|
|
std::getline(gzstream_, line);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
boost::iostreams::filtering_streambuf<boost::iostreams::input> inbuf_;
|
|
|
|
|
std::ifstream file_;
|
|
|
|
|
std::istream instream_;
|
|
|
|
|
igzstream gzstream_;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
class MultiGzipReader {
|
|
|
|
@ -113,8 +107,8 @@ class MultiGzipReader {
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Reads the next line from the reader selected by current_reader_index_
// into `*line`. `line` must not be null.
void NextLine(std::string* line) {
  // Forward the pointer unchanged so this resolves to the per-file reader's
  // NextLine(std::string*) overload instead of dereferencing and relying on a
  // reference-taking overload.
  readers_[current_reader_index_]->NextLine(line);
}
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
@ -122,12 +116,6 @@ class MultiGzipReader {
|
|
|
|
|
size_t current_reader_index_ = 0;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// void CTRReader::ReadThread(
|
|
|
|
|
// const std::vector<std::string> &file_list,
|
|
|
|
|
// const std::vector<std::string>& slots,
|
|
|
|
|
// int batch_size,
|
|
|
|
|
// std::shared_ptr<LoDTensorBlockingQueue>& queue) {}
|
|
|
|
|
|
|
|
|
|
void CTRReader::ReadThread(const std::vector<std::string>& file_list,
|
|
|
|
|
const std::vector<std::string>& slots,
|
|
|
|
|
int batch_size,
|
|
|
|
@ -135,14 +123,12 @@ void CTRReader::ReadThread(const std::vector<std::string>& file_list,
|
|
|
|
|
std::string line;
|
|
|
|
|
|
|
|
|
|
// read all files
|
|
|
|
|
std::vector<std::string> all_lines;
|
|
|
|
|
MultiGzipReader reader(file_list);
|
|
|
|
|
reader.NextLine(&line);
|
|
|
|
|
|
|
|
|
|
for (int j = 0; j < all_lines.size(); ++j) {
|
|
|
|
|
std::unordered_map<std::string, std::vector<int64_t>> slots_to_data;
|
|
|
|
|
int64_t label;
|
|
|
|
|
parse_line(all_lines[j], slots, &label, &slots_to_data);
|
|
|
|
|
}
|
|
|
|
|
std::unordered_map<std::string, std::vector<int64_t>> slots_to_data;
|
|
|
|
|
int64_t label;
|
|
|
|
|
parse_line(line, slots, &label, &slots_to_data);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} // namespace reader
|
|
|
|
|