You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Paddle/go/recordio/chunk.go

182 lines
4.2 KiB

package recordio
import (
"bytes"
"compress/gzip"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"github.com/golang/snappy"
)
// A Chunk contains the Header and optionally compressed records. To
// create a chunk, just use ch := &Chunk{}.
type Chunk struct {
records [][]byte
numBytes int // sum of record lengths.
}
func (ch *Chunk) add(record []byte) {
ch.records = append(ch.records, record)
ch.numBytes += len(record)
}
// dump the chunk into w, and clears the chunk and makes it ready for
// the next add invocation.
func (ch *Chunk) dump(w io.Writer, compressorIndex int) error {
// NOTE: don't check ch.numBytes instead, because empty
// records are allowed.
if len(ch.records) == 0 {
return nil
}
// Write raw records and their lengths into data buffer.
var data bytes.Buffer
for _, r := range ch.records {
var rs [4]byte
binary.LittleEndian.PutUint32(rs[:], uint32(len(r)))
if _, e := data.Write(rs[:]); e != nil {
return fmt.Errorf("Failed to write record length: %v", e)
}
if _, e := data.Write(r); e != nil {
return fmt.Errorf("Failed to write record: %v", e)
}
}
compressed, e := compressData(&data, compressorIndex)
if e != nil {
return e
}
// Write chunk header and compressed data.
hdr := &Header{
checkSum: crc32.ChecksumIEEE(compressed.Bytes()),
compressor: uint32(compressorIndex),
compressedSize: uint32(compressed.Len()),
numRecords: uint32(len(ch.records)),
}
if _, e := hdr.write(w); e != nil {
return fmt.Errorf("Failed to write chunk header: %v", e)
}
if _, e := w.Write(compressed.Bytes()); e != nil {
return fmt.Errorf("Failed to write chunk data: %v", e)
}
// Clear the current chunk.
ch.records = nil
ch.numBytes = 0
return nil
}
type noopCompressor struct {
*bytes.Buffer
}
func (c *noopCompressor) Close() error {
return nil
}
func compressData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
compressed := new(bytes.Buffer)
var compressor io.WriteCloser
switch compressorIndex {
case NoCompression:
compressor = &noopCompressor{compressed}
case Snappy:
compressor = snappy.NewBufferedWriter(compressed)
case Gzip:
compressor = gzip.NewWriter(compressed)
default:
return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
}
if _, e := io.Copy(compressor, src); e != nil {
return nil, fmt.Errorf("Failed to compress chunk data: %v", e)
}
compressor.Close()
return compressed, nil
}
// parse the specified chunk from r.
func parseChunk(r io.ReadSeeker, chunkOffset int64) (*Chunk, error) {
var e error
var hdr *Header
if _, e = r.Seek(chunkOffset, io.SeekStart); e != nil {
return nil, fmt.Errorf("Failed to seek chunk: %v", e)
}
hdr, e = parseHeader(r)
if e != nil {
return nil, fmt.Errorf("Failed to parse chunk header: %v", e)
}
var buf bytes.Buffer
if _, e = io.CopyN(&buf, r, int64(hdr.compressedSize)); e != nil {
return nil, fmt.Errorf("Failed to read chunk data: %v", e)
}
if hdr.checkSum != crc32.ChecksumIEEE(buf.Bytes()) {
return nil, fmt.Errorf("Checksum checking failed.")
}
deflated, e := deflateData(&buf, int(hdr.compressor))
if e != nil {
return nil, e
}
ch := &Chunk{}
for i := 0; i < int(hdr.numRecords); i++ {
var rs [4]byte
if _, e = deflated.Read(rs[:]); e != nil {
return nil, fmt.Errorf("Failed to read record length: %v", e)
}
r := make([]byte, binary.LittleEndian.Uint32(rs[:]))
if _, e = deflated.Read(r); e != nil {
return nil, fmt.Errorf("Failed to read a record: %v", e)
}
ch.records = append(ch.records, r)
ch.numBytes += len(r)
}
return ch, nil
}
func deflateData(src io.Reader, compressorIndex int) (*bytes.Buffer, error) {
var e error
var deflator io.Reader
switch compressorIndex {
case NoCompression:
deflator = src
case Snappy:
deflator = snappy.NewReader(src)
case Gzip:
deflator, e = gzip.NewReader(src)
if e != nil {
return nil, fmt.Errorf("Failed to create gzip reader: %v", e)
}
default:
return nil, fmt.Errorf("Unknown compression algorithm: %d", compressorIndex)
}
deflated := new(bytes.Buffer)
if _, e = io.Copy(deflated, deflator); e != nil {
return nil, fmt.Errorf("Failed to deflate chunk data: %v", e)
}
return deflated, nil
}