|
|
|
@ -64,52 +64,27 @@ class CsvOp : public ParallelOp {
|
|
|
|
|
CsvParser() = delete;
|
|
|
|
|
|
|
|
|
|
CsvParser(int32_t worker_id, std::shared_ptr<JaggedConnector> connector, int64_t rows_per_buffer, char field_delim,
|
|
|
|
|
std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default)
|
|
|
|
|
: worker_id_(worker_id),
|
|
|
|
|
buffer_connector_(connector),
|
|
|
|
|
csv_rows_per_buffer_(rows_per_buffer),
|
|
|
|
|
csv_field_delim_(field_delim),
|
|
|
|
|
column_default_(column_default),
|
|
|
|
|
cur_state_(START_OF_FILE),
|
|
|
|
|
pos_(0),
|
|
|
|
|
cur_row_(0),
|
|
|
|
|
cur_col_(0),
|
|
|
|
|
total_rows_(0),
|
|
|
|
|
start_offset_(0),
|
|
|
|
|
end_offset_(std::numeric_limits<int64_t>::max()),
|
|
|
|
|
err_message_("unknown") {
|
|
|
|
|
cur_buffer_ = std::make_unique<DataBuffer>(0, DataBuffer::BufferFlags::kDeBFlagNone);
|
|
|
|
|
initCsvParser();
|
|
|
|
|
}
|
|
|
|
|
std::vector<std::shared_ptr<CsvOp::BaseRecord>> column_default);
|
|
|
|
|
|
|
|
|
|
~CsvParser() = default;
|
|
|
|
|
|
|
|
|
|
void Reset() {
|
|
|
|
|
cur_state_ = START_OF_FILE;
|
|
|
|
|
pos_ = 0;
|
|
|
|
|
cur_row_ = 0;
|
|
|
|
|
cur_col_ = 0;
|
|
|
|
|
}
|
|
|
|
|
void Reset();
|
|
|
|
|
|
|
|
|
|
void setStartOffset(int64_t start_offset) { start_offset_ = start_offset; }
|
|
|
|
|
void SetStartOffset(int64_t start_offset) { start_offset_ = start_offset; }
|
|
|
|
|
|
|
|
|
|
void setEndOffset(int64_t end_offset) { end_offset_ = end_offset; }
|
|
|
|
|
void SetEndOffset(int64_t end_offset) { end_offset_ = end_offset; }
|
|
|
|
|
|
|
|
|
|
int processMessage(int c) {
|
|
|
|
|
Message m = getMessage(c);
|
|
|
|
|
StateDiagram::iterator it = sd.find({cur_state_, m});
|
|
|
|
|
if (it == sd.end()) {
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
int ret = it->second.second(*this, static_cast<char>(c));
|
|
|
|
|
cur_state_ = it->second.first;
|
|
|
|
|
return ret;
|
|
|
|
|
}
|
|
|
|
|
int ProcessMessage(int c);
|
|
|
|
|
|
|
|
|
|
int countRows(int c);
|
|
|
|
|
int CountRows(int c);
|
|
|
|
|
|
|
|
|
|
Status initCsvParser();
|
|
|
|
|
Status InitCsvParser();
|
|
|
|
|
|
|
|
|
|
int64_t GetTotalRows() { return total_rows_; }
|
|
|
|
|
|
|
|
|
|
std::string GetErrorMessage() { return err_message_; }
|
|
|
|
|
|
|
|
|
|
private:
|
|
|
|
|
enum State : uint8_t {
|
|
|
|
|
START_OF_FILE = 0,
|
|
|
|
|
UNQUOTE,
|
|
|
|
@ -130,55 +105,24 @@ class CsvOp : public ParallelOp {
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
typedef std::pair<State, Message> StateMessagePair;
|
|
|
|
|
typedef std::pair<State, std::function<int(CsvParser &, char)>> StateActionPair;
|
|
|
|
|
typedef std::pair<State, std::function<int(CsvParser &, int)>> StateActionPair;
|
|
|
|
|
typedef std::map<StateMessagePair, StateActionPair> StateDiagram;
|
|
|
|
|
|
|
|
|
|
Message getMessage(int c) {
|
|
|
|
|
if (c == csv_field_delim_) {
|
|
|
|
|
return Message::MS_DELIM;
|
|
|
|
|
} else if (c == '"') {
|
|
|
|
|
return Message::MS_QUOTE;
|
|
|
|
|
} else if (c == '\r' || c == '\n') {
|
|
|
|
|
return Message::MS_END_OF_LINE;
|
|
|
|
|
} else if (c == std::char_traits<char>::eof()) {
|
|
|
|
|
return Message::MS_END_OF_FILE;
|
|
|
|
|
} else {
|
|
|
|
|
return Message::MS_NORMAL;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
Message GetMessage(int c);
|
|
|
|
|
|
|
|
|
|
int null_func(char c) { return 0; }
|
|
|
|
|
int NullFunc(int c) { return 0; }
|
|
|
|
|
|
|
|
|
|
int put_char(char c) {
|
|
|
|
|
if (pos_ >= str_buf_.size()) {
|
|
|
|
|
str_buf_.resize(str_buf_.size() * 2);
|
|
|
|
|
}
|
|
|
|
|
str_buf_[pos_] = c;
|
|
|
|
|
pos_++;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
int PutChar(int c);
|
|
|
|
|
|
|
|
|
|
int put_record(char c);
|
|
|
|
|
int PutRecord(int c);
|
|
|
|
|
|
|
|
|
|
int put_row(char c);
|
|
|
|
|
int PutRow(int c);
|
|
|
|
|
|
|
|
|
|
int end_file(char c);
|
|
|
|
|
int EndFile(int c);
|
|
|
|
|
|
|
|
|
|
int add_row(char c) {
|
|
|
|
|
total_rows_++;
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
int AddRow(int c);
|
|
|
|
|
|
|
|
|
|
int catch_exception(char c) {
|
|
|
|
|
if (getMessage(c) == Message::MS_QUOTE && cur_state_ == State::UNQUOTE) {
|
|
|
|
|
err_message_ = "Invalid quote in unquote field.";
|
|
|
|
|
} else if (getMessage(c) == Message::MS_END_OF_FILE && cur_state_ == State::QUOTE) {
|
|
|
|
|
err_message_ = "Reach the end of file in quote field.";
|
|
|
|
|
} else if (getMessage(c) == Message::MS_NORMAL && cur_state_ == State::SECOND_QUOTE) {
|
|
|
|
|
err_message_ = "Receive unquote char in quote field.";
|
|
|
|
|
}
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
int CatchException(int c);
|
|
|
|
|
|
|
|
|
|
int32_t worker_id_;
|
|
|
|
|
std::shared_ptr<JaggedConnector> buffer_connector_;
|
|
|
|
@ -401,8 +345,8 @@ class CsvOp : public ParallelOp {
|
|
|
|
|
|
|
|
|
|
// Select file and push it to the block queue.
|
|
|
|
|
// @param file_name - File name.
|
|
|
|
|
// @param start_file - If file contains the first sample of data.
|
|
|
|
|
// @param end_file - If file contains the end sample of data.
|
|
|
|
|
// @param start_offset - If file contains the first sample of data.
|
|
|
|
|
// @param end_offset - If file contains the end sample of data.
|
|
|
|
|
// @param pre_count - Total rows of previous files.
|
|
|
|
|
// @return Status - the error code returned.
|
|
|
|
|
bool NeedPushFileToBlockQueue(const std::string &file_name, int64_t *start_offset, int64_t *end_offset,
|
|
|
|
|