You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

124 lines
2.7 KiB

package compress
import (
"encoding"
"fmt"
"io"
"strconv"
"strings"
"github.com/segmentio/kafka-go/compress/gzip"
"github.com/segmentio/kafka-go/compress/lz4"
"github.com/segmentio/kafka-go/compress/snappy"
"github.com/segmentio/kafka-go/compress/zstd"
)
// Compression represents the the compression applied to a record set.
type Compression int8
const (
None Compression = 0
Gzip Compression = 1
Snappy Compression = 2
Lz4 Compression = 3
Zstd Compression = 4
)
func (c Compression) Codec() Codec {
if i := int(c); i >= 0 && i < len(Codecs) {
return Codecs[i]
}
return nil
}
func (c Compression) String() string {
if codec := c.Codec(); codec != nil {
return codec.Name()
}
return "uncompressed"
}
func (c Compression) MarshalText() ([]byte, error) {
return []byte(c.String()), nil
}
func (c *Compression) UnmarshalText(b []byte) error {
switch string(b) {
case "none", "uncompressed":
*c = None
return nil
}
for _, codec := range Codecs[None+1:] {
if codec.Name() == string(b) {
*c = Compression(codec.Code())
return nil
}
}
i, err := strconv.ParseInt(string(b), 10, 64)
if err == nil && i >= 0 && i < int64(len(Codecs)) {
*c = Compression(i)
return nil
}
s := &strings.Builder{}
s.WriteString("none, uncompressed")
for i, codec := range Codecs[None+1:] {
if i < (len(Codecs) - 1) {
s.WriteString(", ")
} else {
s.WriteString(", or ")
}
s.WriteString(codec.Name())
}
return fmt.Errorf("compression format must be one of %s, not %q", s, b)
}
var (
_ encoding.TextMarshaler = Compression(0)
_ encoding.TextUnmarshaler = (*Compression)(nil)
)
// Codec represents a compression codec to encode and decode the messages.
// See : https://cwiki.apache.org/confluence/display/KAFKA/Compression
//
// A Codec must be safe for concurrent access by multiple go routines.
type Codec interface {
// Code returns the compression codec code
Code() int8
// Human-readable name for the codec.
Name() string
// Constructs a new reader which decompresses data from r.
NewReader(r io.Reader) io.ReadCloser
// Constructs a new writer which writes compressed data to w.
NewWriter(w io.Writer) io.WriteCloser
}
var (
// The global gzip codec installed on the Codecs table.
GzipCodec gzip.Codec
// The global snappy codec installed on the Codecs table.
SnappyCodec snappy.Codec
// The global lz4 codec installed on the Codecs table.
Lz4Codec lz4.Codec
// The global zstd codec installed on the Codecs table.
ZstdCodec zstd.Codec
// The global table of compression codecs supported by the kafka protocol.
Codecs = [...]Codec{
None: nil,
Gzip: &GzipCodec,
Snappy: &SnappyCodec,
Lz4: &Lz4Codec,
Zstd: &ZstdCodec,
}
)