parent
a67cebaf5f
commit
313454dfae
@ -0,0 +1,2 @@
|
||||
# Library target `header`, built from header.cc.
cc_library(header SRCS header.cc)
|
||||
# Test target `header_test`, built from header_test.cc and depending on `header`.
cc_test(header_test SRCS header_test.cc DEPS header)
|
@ -0,0 +1,119 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
// Chunk
|
||||
// a chunk contains the Header and optionally compressed records.
|
||||
class Chunk {
|
||||
public:
|
||||
Chunk() = default;
|
||||
void Add(const char* record, size_t length);
|
||||
void Add(const std::string&);
|
||||
|
||||
bool Dump(std::ostream& os, Compressor ct);
|
||||
void Parse(std::istream& iss, int64_t offset);
|
||||
const std::string Record(int i) { return records_[i]; }
|
||||
|
||||
private:
|
||||
std::vector<std::string> records_;
|
||||
size_t num_bytes_;
|
||||
};
|
||||
|
||||
// Declaration only; no definition is visible in this file.
// Presumably compresses the contents of `ss` with compressor `ct` into
// `buffer` and returns the number of bytes written — TODO confirm.
// NOTE(review): Chunk::Dump calls this with a std::string (oss.str()) as the
// first argument, which does not match the std::stringstream parameter
// declared here — verify the intended signature.
size_t CompressData(const std::stringstream& ss, Compressor ct, char* buffer);
|
||||
|
||||
// Returns the number of valid bytes in `buffer` after undoing compressor `c`
// on its first `size` bytes. For Compressor::kNoCompress the data is left
// untouched and `size` is returned; the definition below returns 0 for
// Compressor::kSnappy because the snappy path is commented out.
uint32_t DeflateData(char* buffer, uint32_t size, Compressor c);
|
||||
|
||||
// implementation
|
||||
// Appends a copy of `s` to the chunk and adds its size to the byte total.
// Marked `inline` because this definition lives in a header: without it,
// including the header from two translation units yields duplicate symbols.
inline void Chunk::Add(const std::string& s) {
  num_bytes_ += s.size();
  // `s` is a const reference, so std::move could never actually move from
  // it; copy explicitly rather than suggesting a move that cannot happen.
  records_.push_back(s);
}
|
||||
|
||||
void Chunk::Add(const char* record, size_t length) {
|
||||
Add(std::string(record, length));
|
||||
}
|
||||
|
||||
bool Chunk::Dump(std::ostream& os, Compressor ct) {
|
||||
if (records_.size() == 0) return false;
|
||||
|
||||
// TODO(dzhwinter):
|
||||
// we pack the string with same size buffer,
|
||||
// then compress with another buffer.
|
||||
// Here can be optimized if it is the bottle-neck.
|
||||
std::ostringstream oss;
|
||||
for (auto& record : records_) {
|
||||
unsigned len = record.size();
|
||||
oss << len;
|
||||
oss << record;
|
||||
// os.write(std::to_string(len).c_str(), sizeof(unsigned));
|
||||
// os.write(record.c_str(), record.size());
|
||||
}
|
||||
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
|
||||
size_t compressed = CompressData(oss.str(), ct, buffer.get());
|
||||
|
||||
// TODO(dzhwinter): crc32 checksum
|
||||
size_t checksum = compressed;
|
||||
|
||||
Header hdr(records_.size(), checksum, ct, compressed);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void Chunk::Parse(std::istream& iss, int64_t offset) {
|
||||
iss.seekg(offset, iss.beg);
|
||||
Header hdr;
|
||||
hdr.Parse(iss);
|
||||
|
||||
std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
|
||||
iss.read(buffer.get(), static_cast<size_t>(hdr.CompressSize()));
|
||||
// TODO(dzhwinter): checksum
|
||||
uint32_t deflated_size =
|
||||
DeflateData(buffer.get(), hdr.CompressSize(), hdr.CompressType());
|
||||
std::istringstream deflated(std::string(buffer.get(), deflated_size));
|
||||
for (size_t i = 0; i < hdr.NumRecords(); ++i) {
|
||||
uint32_t rs;
|
||||
deflated >> rs;
|
||||
std::string record(rs, '\0');
|
||||
deflated.read(&record[0], rs);
|
||||
records_.emplace_back(record);
|
||||
num_bytes_ += record.size();
|
||||
}
|
||||
}
|
||||
|
||||
// Undoes compressor `c` on the first `size` bytes of `buffer` and reports how
// many bytes are valid afterwards. Uncompressed data passes through
// untouched; every other compressor currently yields 0.
uint32_t DeflateData(char* buffer, uint32_t size, Compressor c) {
  switch (c) {
    case Compressor::kNoCompress:
      // Raw data: nothing to inflate.
      return size;
    case Compressor::kSnappy:
      // Snappy decoding is not wired in yet:
      // snappy::Uncompress(buffer, size, &uncompressed);
      // deflated_size = uncompressed.size();
      // memcpy(buffer, uncompressed.data(), uncompressed.size() *
      // sizeof(char));
      return 0;
  }
  return 0;
}
|
@ -0,0 +1,81 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "paddle/fluid/recordio/header.h"
|
||||
|
||||
namespace paddle {
|
||||
namespace recordio {
|
||||
|
||||
// Default header: no records, zero checksum, no compression, empty payload.
Header::Header() : Header(0, 0, Compressor::kNoCompress, 0) {}
|
||||
|
||||
Header::Header(uint32_t num, uint32_t sum, Compressor c, uint32_t cs)
|
||||
: num_records_(num), checksum_(sum), compressor_(c), compress_size_(cs) {}
|
||||
|
||||
void Header::Parse(std::istream& iss) {
|
||||
iss.read(reinterpret_cast<char*>(&num_records_), sizeof(uint32_t));
|
||||
iss.read(reinterpret_cast<char*>(&checksum_), sizeof(uint32_t));
|
||||
iss.read(reinterpret_cast<char*>(&compressor_), sizeof(uint32_t));
|
||||
iss.read(reinterpret_cast<char*>(&compress_size_), sizeof(uint32_t));
|
||||
}
|
||||
|
||||
// Emits the header to `os` as four consecutive raw 32-bit fields:
// num_records, checksum, compressor, compress_size.
void Header::Write(std::ostream& os) {
  char* const fields[] = {
      reinterpret_cast<char*>(&num_records_),
      reinterpret_cast<char*>(&checksum_),
      reinterpret_cast<char*>(&compressor_),
      reinterpret_cast<char*>(&compress_size_),
  };
  for (char* field : fields) {
    os.write(field, sizeof(uint32_t));
  }
}
|
||||
|
||||
// std::ostream& operator << (std::ostream& os, Header h) {
|
||||
// os << h.num_records_
|
||||
// << h.checksum_
|
||||
// << static_cast<uint32_t>(h.compressor_)
|
||||
// << h.compress_size_;
|
||||
// return os;
|
||||
// }
|
||||
|
||||
// Text-formats the header fields into `os` in the order num_records,
// checksum, compressor (as an integer), compress_size.
// NOTE(review): the fields are written back to back with no separator, so
// the output is ambiguous to read; confirm whether that is intended.
std::ostream& operator<<(std::ostream& os, Header h) {
  const uint32_t compressor_code = static_cast<uint32_t>(h.CompressType());
  os << h.NumRecords();
  os << h.Checksum();
  os << compressor_code;
  os << h.CompressSize();
  return os;
}
|
||||
|
||||
// bool operator==(Header l, Header r) {
|
||||
// return num_records_ == rhs.NumRecords() &&
|
||||
// checksum_ == rhs.Checksum() &&
|
||||
// compressor_ == rhs.CompressType() &&
|
||||
// compress_size_ == rhs.CompressSize();
|
||||
// }
|
||||
|
||||
// Two headers are equal when all four fields match.
bool operator==(Header l, Header r) {
  if (l.NumRecords() != r.NumRecords()) return false;
  if (l.Checksum() != r.Checksum()) return false;
  if (l.CompressType() != r.CompressType()) return false;
  return l.CompressSize() == r.CompressSize();
}
|
||||
|
||||
// size_t CompressData(const std::string& os, Compressor ct, char* buffer) {
|
||||
// size_t compress_size = 0;
|
||||
|
||||
// // std::unique_ptr<char[]> buffer(new char[kDefaultMaxChunkSize]);
|
||||
// // std::string compressed;
|
||||
// compress_size =os.size();
|
||||
// memcpy(buffer, os.c_str(), compress_size);
|
||||
// return compress_size;
|
||||
// }
|
||||
|
||||
} // namespace recordio
|
||||
} // namespace paddle
|
@ -0,0 +1,66 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <sstream>
|
||||
|
||||
namespace paddle {
|
||||
namespace recordio {
|
||||
|
||||
// Default maximum chunk size: 32 * 1024 * 1024 (32 Mi). Chunk::Dump and
// Chunk::Parse allocate their char scratch buffers with this many elements.
|
||||
constexpr size_t kDefaultMaxChunkSize = 32 * 1024 * 1024;
|
||||
// Magic number for memory checking.
|
||||
constexpr uint32_t kMagicNumber = 0x01020304;
|
||||
|
||||
enum class Compressor {
|
||||
// NoCompression means writing raw chunk data into files.
|
||||
// With other choices, chunks are compressed before being written.
|
||||
kNoCompress = 0,
|
||||
// Snappy had been the default compression algorithm widely
|
||||
// used in Google. It compromises between speed and
|
||||
// compression ratio.
|
||||
kSnappy = 1,
|
||||
// Gzip is a well-known compression algorithm. It is
|
||||
// recommended only if you are looking for compression ratio.
|
||||
kGzip = 2,
|
||||
};
|
||||
|
||||
// Header is the metadata of a Chunk: record count, checksum, compressor and
// compressed payload size.
|
||||
class Header {
|
||||
public:
|
||||
// Default header (defined in header.cc): all numeric fields zero,
// compressor kNoCompress.
Header();
|
||||
// Builds a header from record count `num`, checksum `sum`, compressor `ct`
// and compressed size `cs`.
Header(uint32_t num, uint32_t sum, Compressor ct, uint32_t cs);
|
||||
|
||||
// Writes the four fields to `os` as raw 32-bit values.
void Write(std::ostream& os);
|
||||
// Reads the four fields from `iss`, in the order Write emits them.
void Parse(std::istream& iss);
|
||||
|
||||
// Plain accessors for the four fields.
uint32_t NumRecords() const { return num_records_; }
|
||||
uint32_t Checksum() const { return checksum_; }
|
||||
Compressor CompressType() const { return compressor_; }
|
||||
uint32_t CompressSize() const { return compress_size_; }
|
||||
|
||||
private:
|
||||
uint32_t num_records_;
|
||||
uint32_t checksum_;
|
||||
Compressor compressor_;
|
||||
uint32_t compress_size_;
|
||||
};
|
||||
|
||||
// Makes Header loggable: formats its fields into `os` as text.
|
||||
std::ostream& operator<<(std::ostream& os, Header h);
|
||||
// Field-by-field equality of two headers.
bool operator==(Header l, Header r);
|
||||
|
||||
} // namespace recordio
|
||||
} // namespace paddle
|
@ -0,0 +1,45 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "paddle/fluid/recordio/header.h"
|
||||
|
||||
#include <sstream>
|
||||
|
||||
#include "gtest/gtest.h"
|
||||
|
||||
using namespace recordio;
|
||||
|
||||
// Round-trips a Header through Write() and Parse() and checks that nothing
// is lost on the way.
TEST(Recordio, ChunkHead) {
  Header hdr(0, 1, Compressor::kGzip, 3);
  std::ostringstream oss;
  hdr.Write(oss);

  std::istringstream iss(oss.str());
  Header hdr2;
  hdr2.Parse(iss);
  EXPECT_TRUE(hdr == hdr2);

  std::ostringstream oss2;
  hdr2.Write(oss2);
  // Compare as std::string, not as C strings: the serialized header is
  // binary and starts with zero bytes (num_records == 0), so a C-string
  // comparison would stop at the first byte and compare two empty strings.
  EXPECT_EQ(oss2.str(), oss.str());
}
|
||||
|
||||
// Checks the text form produced by operator<< for a Header.
// Header::Write emits raw 32-bit fields while operator<< formats the fields
// as decimal text, so the two outputs can never be equal; the expectation is
// therefore pinned to the text form itself.
// NOTE(review): operator<< currently writes the fields back to back with no
// separator — update the expected string if a separator is introduced.
TEST(Recordio, Stream) {
  Header hdr(0, 1, static_cast<Compressor>(2), 3);

  std::ostringstream oss;
  oss << hdr;
  EXPECT_EQ(oss.str(), "0123");
}
|
@ -0,0 +1,69 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
// Index maps a global record number onto (chunk, position inside chunk).
class Index {
 public:
  // Number of records covered by the index (0 for a fresh Index).
  int NumRecords() const { return num_records_; }

  // Locate returns the index of chunk that contains the given record,
  // and the record index within the chunk. It returns (-1, -1) if the
  // record is out of range (including negative `record_idx`).
  // `out` must not be null.
  void Locate(int record_idx, std::pair<int, int>* out) const {
    // A negative index converts to a huge unsigned value and therefore never
    // falls below any running total, which yields the (-1, -1) result.
    const size_t wanted = static_cast<size_t>(record_idx);
    size_t sum = 0;
    for (size_t i = 0; i < chunk_lens_.size(); ++i) {
      const size_t chunk_begin = sum;  // first global record of chunk i
      sum += chunk_lens_[i];
      if (wanted < sum) {
        out->first = static_cast<int>(i);
        out->second = static_cast<int>(wanted - chunk_begin);
        return;
      }
    }
    out->first = -1;
    out->second = -1;
  }

 private:
  // Not used by the code visible here; presumably the byte offset of each
  // chunk — TODO confirm.
  std::vector<int64_t> chunk_offsets_;
  // Locate() treats chunk_lens_[i] as the number of records in chunk i.
  std::vector<uint32_t> chunk_lens_;
  // Initialized so a default-constructed Index reports zero records instead
  // of an indeterminate value.
  int num_records_ = 0;
  // Not used by the code visible here.
  std::vector<int> chunk_records_;
};
|
||||
|
||||
// RangeScanner
|
||||
// creates a scanner that sequentially reads records in the
|
||||
// range [start, start+len). If start < 0, it scans from the
|
||||
// beginning. If len < 0, it scans till the end of file.
|
||||
class RangeScanner {
|
||||
public:
|
||||
// NOTE(review): the comment above speaks of (start, len) while the last
// parameter is named `end` — confirm which one is meant.
// NOTE(review): std::istream is not copyable, so taking it by value cannot
// accept an existing stream object; the definition is not visible here.
RangeScanner(std::istream is, Index idx, int start, int end);
|
||||
// Definitions of Scan() and Record() are not visible in this file.
bool Scan();
|
||||
const std::string Record();
|
||||
|
||||
private:
|
||||
std::istream stream_;
|
||||
Index index_;
|
||||
int start_, end_, cur_;
|
||||
int chunk_index_;
|
||||
std::unique_ptr<Chunk> chunk_;
|
||||
};
|
@ -0,0 +1,44 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <fstream>
|
||||
#include <memory>
|
||||
#include <sstream>
|
||||
#include <string>
|
||||
#include <utility>
|
||||
#include <vector>
|
||||
|
||||
// Scanner
|
||||
|
||||
// Scanner declares a record reader built from `paths`. Only the declaration
// is visible in this file; none of the member functions are defined here.
class Scanner {
|
||||
public:
|
||||
// NOTE(review): single-argument constructor is not `explicit`, so a
// const char* converts to Scanner implicitly — confirm that is wanted.
Scanner(const char* paths);
|
||||
const std::string Record();
|
||||
bool Scan();
|
||||
void Close();
|
||||
|
||||
private:
|
||||
bool NextFile();
|
||||
// Returns the stored error code.
int Err() { return err_; }
|
||||
|
||||
private:
|
||||
std::vector<std::string> paths_;
|
||||
// NOTE(review): raw FILE* and RangeScanner* members; ownership and cleanup
// cannot be determined from this declaration.
FILE* cur_file_;
|
||||
RangeScanner* cur_scanner_;
|
||||
int path_idx_;
|
||||
bool end_;
|
||||
int err_;
|
||||
};
|
@ -0,0 +1,45 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#include "paddle/fluid/recordio/writer.h"
|
||||
|
||||
namespace paddle {
|
||||
namespace recordio {
|
||||
|
||||
// Builds a Writer on top of `os` with max chunk size 0 and compressor id 0.
// Delegates to the three-argument constructor so that chunk_ is allocated
// here too; initializing the members directly left chunk_ null, unlike the
// other constructor.
Writer::Writer(std::ostream& os) : Writer(os, 0, 0) {}
|
||||
|
||||
// Builds a Writer that shares the stream buffer of `os`, starts with an
// empty Chunk and remembers the chunk size limit and compressor id.
Writer::Writer(std::ostream& os, int maxChunkSize, int compressor)
    : stream_(os.rdbuf()),
      chunk_(new Chunk),
      max_chunk_size_(maxChunkSize),
      compressor_(compressor) {
  // Start from a clean rdstate; Close() uses the state to mark the writer
  // as closed.
  stream_.clear();
}
|
||||
|
||||
// Writes one record.
// Not implemented yet: nothing is written, so 0 is returned. The explicit
// return is required — flowing off the end of a function with a non-void
// return type is undefined behavior.
size_t Writer::Write(const std::string& buf) {
  (void)buf;  // unused until the record is actually buffered
  return 0;
}
|
||||
|
||||
// Writes the `length` bytes starting at `buf` as one record by wrapping them
// in a std::string and forwarding to the std::string overload.
// Returns whatever that overload returns; the result was previously dropped,
// leaving this non-void function without a return value.
size_t Writer::Write(const char* buf, size_t length) {
  return Write(std::string(buf, length));
}
|
||||
|
||||
// Flushes the underlying stream, then sets eofbit on it to mark the writer
// as closed.
void Writer::Close() {
  stream_ << std::flush;
  stream_.setstate(std::ios_base::eofbit);
}
|
||||
|
||||
} // namespace recordio
|
||||
} // namespace paddle
|
@ -0,0 +1,56 @@
|
||||
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
#include <memory>
|
||||
#include <string>
|
||||
|
||||
#include "paddle/fluid/platform/macros.h" // for DISABLE COPY ASSIGN
|
||||
#include "paddle/fluid/recordio/header.h"
|
||||
|
||||
namespace paddle {
|
||||
namespace recordio {
|
||||
|
||||
// Writer creates a RecordIO file.
|
||||
class Writer {
|
||||
public:
|
||||
Writer(std::ostream& os);
|
||||
Writer(std::ostream& os, int maxChunkSize, int c);
|
||||
|
||||
// Writes a record. It returns an error if Close has been called.
|
||||
size_t Write(const char* buf, size_t length);
|
||||
size_t Write(const std::string& buf);
|
||||
size_t Write(std::string&& buf);
|
||||
|
||||
// Close flushes the current chunk and makes the writer invalid.
|
||||
void Close();
|
||||
|
||||
private:
|
||||
// Set rdstate to mark a closed writer
|
||||
std::ostream stream_;
|
||||
std::unique_ptr<Chunk> chunk_;
|
||||
// total records size, excluding metadata, before compression.
|
||||
int max_chunk_size_;
|
||||
int compressor_;
|
||||
DISABLE_COPY_AND_ASSIGN(Writer);
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
Writer& operator<<(const T& val) {
|
||||
stream_ << val;
|
||||
return *this;
|
||||
}
|
||||
|
||||
} // namespace recordio
|
||||
} // namespace paddle
|
Loading…
Reference in new issue