You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
168 lines
3.4 KiB
168 lines
3.4 KiB
// Package zstd implements Zstandard compression.
|
|
package zstd
|
|
|
|
import (
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/klauspost/compress/zstd"
|
|
)
|
|
|
|
// Codec is the implementation of a compress.Codec which supports creating
|
|
// readers and writers for kafka messages compressed with zstd.
|
|
type Codec struct {
|
|
// The compression level configured on writers created by the codec.
|
|
//
|
|
// Default to 3.
|
|
Level int
|
|
|
|
encoderPool sync.Pool // *encoder
|
|
}
|
|
|
|
// Code implements the compress.Codec interface.
|
|
func (c *Codec) Code() int8 { return 4 }
|
|
|
|
// Name implements the compress.Codec interface.
|
|
func (c *Codec) Name() string { return "zstd" }
|
|
|
|
// NewReader implements the compress.Codec interface.
|
|
func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
|
|
p := new(reader)
|
|
if p.dec, _ = decoderPool.Get().(*zstd.Decoder); p.dec != nil {
|
|
p.dec.Reset(r)
|
|
} else {
|
|
z, err := zstd.NewReader(r,
|
|
zstd.WithDecoderConcurrency(1),
|
|
)
|
|
if err != nil {
|
|
p.err = err
|
|
} else {
|
|
p.dec = z
|
|
}
|
|
}
|
|
return p
|
|
}
|
|
|
|
func (c *Codec) level() int {
|
|
if c.Level != 0 {
|
|
return c.Level
|
|
}
|
|
return 3
|
|
}
|
|
|
|
func (c *Codec) zstdLevel() zstd.EncoderLevel {
|
|
return zstd.EncoderLevelFromZstd(c.level())
|
|
}
|
|
|
|
var decoderPool sync.Pool // *zstd.Decoder
|
|
|
|
type reader struct {
|
|
dec *zstd.Decoder
|
|
err error
|
|
}
|
|
|
|
// Close implements the io.Closer interface.
|
|
func (r *reader) Close() error {
|
|
if r.dec != nil {
|
|
r.dec.Reset(devNull{}) // don't retain the underlying reader
|
|
decoderPool.Put(r.dec)
|
|
r.dec = nil
|
|
r.err = io.ErrClosedPipe
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Read implements the io.Reader interface.
|
|
func (r *reader) Read(p []byte) (int, error) {
|
|
if r.err != nil {
|
|
return 0, r.err
|
|
}
|
|
if r.dec == nil {
|
|
return 0, io.EOF
|
|
}
|
|
return r.dec.Read(p)
|
|
}
|
|
|
|
// WriteTo implements the io.WriterTo interface.
|
|
func (r *reader) WriteTo(w io.Writer) (int64, error) {
|
|
if r.err != nil {
|
|
return 0, r.err
|
|
}
|
|
if r.dec == nil {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
return r.dec.WriteTo(w)
|
|
}
|
|
|
|
// NewWriter implements the compress.Codec interface.
|
|
func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
|
|
p := new(writer)
|
|
if enc, _ := c.encoderPool.Get().(*zstd.Encoder); enc == nil {
|
|
z, err := zstd.NewWriter(w,
|
|
zstd.WithEncoderLevel(c.zstdLevel()),
|
|
zstd.WithEncoderConcurrency(1),
|
|
zstd.WithZeroFrames(true),
|
|
)
|
|
if err != nil {
|
|
p.err = err
|
|
} else {
|
|
p.enc = z
|
|
}
|
|
} else {
|
|
p.enc = enc
|
|
p.enc.Reset(w)
|
|
}
|
|
p.c = c
|
|
return p
|
|
}
|
|
|
|
type writer struct {
|
|
c *Codec
|
|
enc *zstd.Encoder
|
|
err error
|
|
}
|
|
|
|
// Close implements the io.Closer interface.
|
|
func (w *writer) Close() error {
|
|
if w.enc != nil {
|
|
// Close needs to be called to write the end of stream marker and flush
|
|
// the buffers. The zstd package documents that the encoder is re-usable
|
|
// after being closed.
|
|
err := w.enc.Close()
|
|
if err != nil {
|
|
w.err = err
|
|
}
|
|
w.enc.Reset(devNull{}) // don't retain the underlying writer
|
|
w.c.encoderPool.Put(w.enc)
|
|
w.enc = nil
|
|
return err
|
|
}
|
|
return w.err
|
|
}
|
|
|
|
// WriteTo implements the io.WriterTo interface.
|
|
func (w *writer) Write(p []byte) (int, error) {
|
|
if w.err != nil {
|
|
return 0, w.err
|
|
}
|
|
if w.enc == nil {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
return w.enc.Write(p)
|
|
}
|
|
|
|
// ReadFrom implements the io.ReaderFrom interface.
|
|
func (w *writer) ReadFrom(r io.Reader) (int64, error) {
|
|
if w.err != nil {
|
|
return 0, w.err
|
|
}
|
|
if w.enc == nil {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
return w.enc.ReadFrom(r)
|
|
}
|
|
|
|
type devNull struct{}
|
|
|
|
func (devNull) Read([]byte) (int, error) { return 0, io.EOF }
|
|
func (devNull) Write([]byte) (int, error) { return 0, nil }
|
|
|