|
|
|
@ -110,12 +110,14 @@ Status CsvOp::Init() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CsvOp::CsvParser::CsvParser(int32_t worker_id, std::shared_ptr<JaggedConnector> connector, int64_t rows_per_buffer,
|
|
|
|
|
char field_delim, std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default)
|
|
|
|
|
char field_delim, std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default,
|
|
|
|
|
std::string file_path)
|
|
|
|
|
: worker_id_(worker_id),
|
|
|
|
|
buffer_connector_(connector),
|
|
|
|
|
csv_rows_per_buffer_(rows_per_buffer),
|
|
|
|
|
csv_field_delim_(field_delim),
|
|
|
|
|
column_default_(column_default),
|
|
|
|
|
file_path_(file_path),
|
|
|
|
|
cur_state_(START_OF_FILE),
|
|
|
|
|
pos_(0),
|
|
|
|
|
cur_row_(0),
|
|
|
|
@ -358,8 +360,11 @@ Status CsvOp::CsvParser::InitCsvParser() {
|
|
|
|
|
{{State::START_OF_FILE, Message::MS_NORMAL},
|
|
|
|
|
{State::UNQUOTE,
|
|
|
|
|
[this](CsvParser &, char c) -> int {
|
|
|
|
|
TensorRow row(column_default_.size(), nullptr);
|
|
|
|
|
std::vector<std::string> file_path(column_default_.size(), file_path_);
|
|
|
|
|
row.setPath(file_path);
|
|
|
|
|
this->tensor_table_ = std::make_unique<TensorQTable>();
|
|
|
|
|
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
|
|
|
|
|
this->tensor_table_->push_back(row);
|
|
|
|
|
this->str_buf_[0] = c;
|
|
|
|
|
this->pos_ = 1;
|
|
|
|
|
return 0;
|
|
|
|
@ -367,15 +372,21 @@ Status CsvOp::CsvParser::InitCsvParser() {
|
|
|
|
|
{{State::START_OF_FILE, Message::MS_DELIM},
|
|
|
|
|
{State::DELIM,
|
|
|
|
|
[this](CsvParser &, char c) -> int {
|
|
|
|
|
TensorRow row(column_default_.size(), nullptr);
|
|
|
|
|
std::vector<std::string> file_path(column_default_.size(), file_path_);
|
|
|
|
|
row.setPath(file_path);
|
|
|
|
|
this->tensor_table_ = std::make_unique<TensorQTable>();
|
|
|
|
|
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
|
|
|
|
|
this->tensor_table_->push_back(row);
|
|
|
|
|
return this->PutRecord(c);
|
|
|
|
|
}}},
|
|
|
|
|
{{State::START_OF_FILE, Message::MS_QUOTE},
|
|
|
|
|
{State::QUOTE,
|
|
|
|
|
[this](CsvParser &, char c) -> int {
|
|
|
|
|
TensorRow row(column_default_.size(), nullptr);
|
|
|
|
|
std::vector<std::string> file_path(column_default_.size(), file_path_);
|
|
|
|
|
row.setPath(file_path);
|
|
|
|
|
this->tensor_table_ = std::make_unique<TensorQTable>();
|
|
|
|
|
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
|
|
|
|
|
this->tensor_table_->push_back(row);
|
|
|
|
|
this->pos_ = 0;
|
|
|
|
|
return 0;
|
|
|
|
|
}}},
|
|
|
|
@ -458,7 +469,10 @@ Status CsvOp::CsvParser::InitCsvParser() {
|
|
|
|
|
{State::UNQUOTE,
|
|
|
|
|
[this](CsvParser &, char c) -> int {
|
|
|
|
|
if (this->total_rows_ > this->start_offset_ && this->total_rows_ <= this->end_offset_) {
|
|
|
|
|
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
|
|
|
|
|
TensorRow row(column_default_.size(), nullptr);
|
|
|
|
|
std::vector<std::string> file_path(column_default_.size(), file_path_);
|
|
|
|
|
row.setPath(file_path);
|
|
|
|
|
this->tensor_table_->push_back(row);
|
|
|
|
|
}
|
|
|
|
|
this->str_buf_[0] = c;
|
|
|
|
|
this->pos_ = 1;
|
|
|
|
@ -468,7 +482,10 @@ Status CsvOp::CsvParser::InitCsvParser() {
|
|
|
|
|
{State::DELIM,
|
|
|
|
|
[this](CsvParser &, char c) -> int {
|
|
|
|
|
if (this->total_rows_ > this->start_offset_ && this->total_rows_ <= this->end_offset_) {
|
|
|
|
|
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
|
|
|
|
|
TensorRow row(column_default_.size(), nullptr);
|
|
|
|
|
std::vector<std::string> file_path(column_default_.size(), file_path_);
|
|
|
|
|
row.setPath(file_path);
|
|
|
|
|
this->tensor_table_->push_back(row);
|
|
|
|
|
}
|
|
|
|
|
return this->PutRecord(c);
|
|
|
|
|
}}},
|
|
|
|
@ -476,7 +493,10 @@ Status CsvOp::CsvParser::InitCsvParser() {
|
|
|
|
|
{State::QUOTE,
|
|
|
|
|
[this](CsvParser &, char c) -> int {
|
|
|
|
|
if (this->total_rows_ > this->start_offset_ && this->total_rows_ <= this->end_offset_) {
|
|
|
|
|
this->tensor_table_->push_back(TensorRow(column_default_.size(), nullptr));
|
|
|
|
|
TensorRow row(column_default_.size(), nullptr);
|
|
|
|
|
std::vector<std::string> file_path(column_default_.size(), file_path_);
|
|
|
|
|
row.setPath(file_path);
|
|
|
|
|
this->tensor_table_->push_back(row);
|
|
|
|
|
}
|
|
|
|
|
return 0;
|
|
|
|
|
}}},
|
|
|
|
@ -497,7 +517,7 @@ Status CsvOp::Reset() {
|
|
|
|
|
|
|
|
|
|
Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, const int64_t end_offset,
|
|
|
|
|
const int32_t worker_id) {
|
|
|
|
|
CsvParser csv_parser(worker_id, jagged_buffer_connector_, rows_per_buffer_, field_delim_, column_default_list_);
|
|
|
|
|
CsvParser csv_parser(worker_id, jagged_buffer_connector_, rows_per_buffer_, field_delim_, column_default_list_, file);
|
|
|
|
|
csv_parser.SetStartOffset(start_offset);
|
|
|
|
|
csv_parser.SetEndOffset(end_offset);
|
|
|
|
|
std::ifstream ifs;
|
|
|
|
@ -512,7 +532,7 @@ Status CsvOp::LoadFile(const std::string &file, const int64_t start_offset, cons
|
|
|
|
|
csv_parser.Reset();
|
|
|
|
|
try {
|
|
|
|
|
while (ifs.good()) {
|
|
|
|
|
// when ifstream reachs the end of file, the function get() return std::char_traits<char>::eof()
|
|
|
|
|
// when ifstream reaches the end of file, the function get() returns std::char_traits<char>::eof()
|
|
|
|
|
// which is a 32-bit -1, it's not equal to the 8-bit -1 on Euler OS. So instead of char, we use
|
|
|
|
|
// int to receive its return value.
|
|
|
|
|
int chr = ifs.get();
|
|
|
|
@ -799,7 +819,7 @@ Status CsvOp::CalculateNumRowsPerShard() {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int64_t CsvOp::CountTotalRows(const std::string &file) {
|
|
|
|
|
CsvParser csv_parser(0, jagged_buffer_connector_, rows_per_buffer_, field_delim_, column_default_list_);
|
|
|
|
|
CsvParser csv_parser(0, jagged_buffer_connector_, rows_per_buffer_, field_delim_, column_default_list_, file);
|
|
|
|
|
std::ifstream ifs;
|
|
|
|
|
ifs.open(file, std::ifstream::in);
|
|
|
|
|
if (!ifs.is_open()) {
|
|
|
|
|