You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

168 lines
3.4 KiB

// Package zstd implements Zstandard compression.
package zstd
import (
"io"
"sync"
"github.com/klauspost/compress/zstd"
)
// Codec is the implementation of a compress.Codec which supports creating
// readers and writers for kafka messages compressed with zstd.
type Codec struct {
// The compression level configured on writers created by the codec.
//
// Default to 3.
Level int
encoderPool sync.Pool // *encoder
}
// Code implements the compress.Codec interface.
func (c *Codec) Code() int8 { return 4 }
// Name implements the compress.Codec interface.
func (c *Codec) Name() string { return "zstd" }
// NewReader implements the compress.Codec interface.
func (c *Codec) NewReader(r io.Reader) io.ReadCloser {
p := new(reader)
if p.dec, _ = decoderPool.Get().(*zstd.Decoder); p.dec != nil {
p.dec.Reset(r)
} else {
z, err := zstd.NewReader(r,
zstd.WithDecoderConcurrency(1),
)
if err != nil {
p.err = err
} else {
p.dec = z
}
}
return p
}
func (c *Codec) level() int {
if c.Level != 0 {
return c.Level
}
return 3
}
func (c *Codec) zstdLevel() zstd.EncoderLevel {
return zstd.EncoderLevelFromZstd(c.level())
}
var decoderPool sync.Pool // *zstd.Decoder
type reader struct {
dec *zstd.Decoder
err error
}
// Close implements the io.Closer interface.
func (r *reader) Close() error {
if r.dec != nil {
r.dec.Reset(devNull{}) // don't retain the underlying reader
decoderPool.Put(r.dec)
r.dec = nil
r.err = io.ErrClosedPipe
}
return nil
}
// Read implements the io.Reader interface.
func (r *reader) Read(p []byte) (int, error) {
if r.err != nil {
return 0, r.err
}
if r.dec == nil {
return 0, io.EOF
}
return r.dec.Read(p)
}
// WriteTo implements the io.WriterTo interface.
func (r *reader) WriteTo(w io.Writer) (int64, error) {
if r.err != nil {
return 0, r.err
}
if r.dec == nil {
return 0, io.ErrClosedPipe
}
return r.dec.WriteTo(w)
}
// NewWriter implements the compress.Codec interface.
func (c *Codec) NewWriter(w io.Writer) io.WriteCloser {
p := new(writer)
if enc, _ := c.encoderPool.Get().(*zstd.Encoder); enc == nil {
z, err := zstd.NewWriter(w,
zstd.WithEncoderLevel(c.zstdLevel()),
zstd.WithEncoderConcurrency(1),
zstd.WithZeroFrames(true),
)
if err != nil {
p.err = err
} else {
p.enc = z
}
} else {
p.enc = enc
p.enc.Reset(w)
}
p.c = c
return p
}
type writer struct {
c *Codec
enc *zstd.Encoder
err error
}
// Close implements the io.Closer interface.
func (w *writer) Close() error {
if w.enc != nil {
// Close needs to be called to write the end of stream marker and flush
// the buffers. The zstd package documents that the encoder is re-usable
// after being closed.
err := w.enc.Close()
if err != nil {
w.err = err
}
w.enc.Reset(devNull{}) // don't retain the underlying writer
w.c.encoderPool.Put(w.enc)
w.enc = nil
return err
}
return w.err
}
// WriteTo implements the io.WriterTo interface.
func (w *writer) Write(p []byte) (int, error) {
if w.err != nil {
return 0, w.err
}
if w.enc == nil {
return 0, io.ErrClosedPipe
}
return w.enc.Write(p)
}
// ReadFrom implements the io.ReaderFrom interface.
func (w *writer) ReadFrom(r io.Reader) (int64, error) {
if w.err != nil {
return 0, w.err
}
if w.enc == nil {
return 0, io.ErrClosedPipe
}
return w.enc.ReadFrom(r)
}
type devNull struct{}
func (devNull) Read([]byte) (int, error) { return 0, io.EOF }
func (devNull) Write([]byte) (int, error) { return 0, nil }