You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
544 lines
14 KiB
544 lines
14 KiB
package kafka
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
)
|
|
|
|
// readBytesFunc is the type of the callbacks used to consume message key and
// value bytes. It is called with the buffered reader, the number of bytes
// remaining in the response, and the size of the chunk to consume; it returns
// the updated remain count along with any error.
type readBytesFunc func(*bufio.Reader, int, int) (int, error)
|
|
|
|
// messageSetReader processes the messages encoded into a fetch response.
// The response may contain a mix of Record Batches (newer format) and Messages
// (older format).
type messageSetReader struct {
	*readerStack // used for decompressing compressed messages and record batches
	empty        bool // if true, short circuits messageSetReader methods
	debug        bool // enable debug log messages

	// How many bytes are expected to remain in the response.
	//
	// This is used to detect truncation of the response.
	lengthRemain int

	// Scratch buffer reused to hold decompressed message sets and record
	// batches; it is Reset before each decompression.
	decompressed bytes.Buffer
}
|
|
|
|
// readerStack is one level of a stack of readers: the bottom-most element
// reads from the fetch response itself, and each element pushed on top of it
// reads from an in-memory buffer of decompressed message data.
type readerStack struct {
	reader *bufio.Reader
	remain int            // bytes left to consume from this reader
	base   int64          // base offset added to relative offsets (compressed v1 sets)
	parent *readerStack   // the reader this one was pushed on top of; nil at the bottom
	count  int            // how many messages left in the current message set
	header messagesHeader // the current header for a subset of messages within the set.
}
|
|
|
|
// messagesHeader describes a set of records. there may be many messagesHeader's in a message set.
type messagesHeader struct {
	firstOffset int64
	length      int32
	// crc is set for all magic versions; for v2 it is read after the leader
	// epoch because the two fields share the same wire position (see readHeader).
	crc   int32
	magic int8
	// v1 composes attributes specific to v0 and v1 message headers
	v1 struct {
		attributes int8
		timestamp  int64
	}
	// v2 composes attributes specific to v2 message headers
	v2 struct {
		leaderEpoch     int32
		attributes      int16
		lastOffsetDelta int32
		firstTimestamp  int64
		lastTimestamp   int64
		producerID      int64
		producerEpoch   int16
		baseSequence    int32
		count           int32
	}
}
|
|
|
|
func (h messagesHeader) compression() (codec CompressionCodec, err error) {
|
|
const compressionCodecMask = 0x07
|
|
var code int8
|
|
switch h.magic {
|
|
case 0, 1:
|
|
code = h.v1.attributes & compressionCodecMask
|
|
case 2:
|
|
code = int8(h.v2.attributes & compressionCodecMask)
|
|
default:
|
|
err = h.badMagic()
|
|
return
|
|
}
|
|
if code != 0 {
|
|
codec, err = resolveCodec(code)
|
|
}
|
|
return
|
|
}
|
|
|
|
// badMagic builds the error reported when a header carries a magic byte other
// than 0, 1 or 2.
func (h messagesHeader) badMagic() error {
	return fmt.Errorf("unsupported magic byte %d in header", h.magic)
}
|
|
|
|
func newMessageSetReader(reader *bufio.Reader, remain int) (*messageSetReader, error) {
|
|
res := &messageSetReader{
|
|
readerStack: &readerStack{
|
|
reader: reader,
|
|
remain: remain,
|
|
},
|
|
}
|
|
err := res.readHeader()
|
|
return res, err
|
|
}
|
|
|
|
func (r *messageSetReader) remaining() (remain int) {
|
|
if r.empty {
|
|
return 0
|
|
}
|
|
for s := r.readerStack; s != nil; s = s.parent {
|
|
remain += s.remain
|
|
}
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) discard() (err error) {
|
|
switch {
|
|
case r.empty:
|
|
case r.readerStack == nil:
|
|
default:
|
|
// rewind up to the top-most reader b/c it's the only one that's doing
|
|
// actual i/o. the rest are byte buffers that have been pushed on the stack
|
|
// while reading compressed message sets.
|
|
for r.parent != nil {
|
|
r.readerStack = r.parent
|
|
}
|
|
err = r.discardN(r.remain)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readMessage(min int64, key readBytesFunc, val readBytesFunc) (
|
|
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
|
|
|
|
if r.empty {
|
|
err = RequestTimedOut
|
|
return
|
|
}
|
|
if err = r.readHeader(); err != nil {
|
|
return
|
|
}
|
|
switch r.header.magic {
|
|
case 0, 1:
|
|
offset, timestamp, headers, err = r.readMessageV1(min, key, val)
|
|
// Set an invalid value so that it can be ignored
|
|
lastOffset = -1
|
|
case 2:
|
|
offset, lastOffset, timestamp, headers, err = r.readMessageV2(min, key, val)
|
|
default:
|
|
err = r.header.badMagic()
|
|
}
|
|
return
|
|
}
|
|
|
|
// readMessageV1 decodes the next v0/v1 message, transparently descending into
// compressed message sets: the contained set is decompressed into r.decompressed
// and a byte-buffer reader is pushed onto the stack, after which reading
// resumes from the top of the stack. Messages whose (base-adjusted) offset is
// below min are skipped without invoking the key/val callbacks. Returns
// errShortRead if the stack is exhausted before a message is produced.
func (r *messageSetReader) readMessageV1(min int64, key readBytesFunc, val readBytesFunc) (
	offset int64, timestamp int64, headers []Header, err error) {

	for r.readerStack != nil {
		if r.remain == 0 {
			// This reader is exhausted; pop back to its parent and retry.
			r.readerStack = r.parent
			continue
		}
		if err = r.readHeader(); err != nil {
			return
		}
		offset = r.header.firstOffset
		timestamp = r.header.v1.timestamp
		var codec CompressionCodec
		if codec, err = r.header.compression(); err != nil {
			return
		}
		r.log("Reading with codec=%T", codec)
		if codec != nil {
			// discard next four bytes...will be -1 to indicate null key
			if err = r.discardN(4); err != nil {
				return
			}

			// read and decompress the contained message set.
			r.decompressed.Reset()
			if err = r.readBytesWith(func(br *bufio.Reader, sz int, n int) (remain int, err error) {
				// x4 as a guess that the average compression ratio is near 75%
				r.decompressed.Grow(4 * n)
				limitReader := io.LimitedReader{R: br, N: int64(n)}
				codecReader := codec.NewReader(&limitReader)
				_, err = r.decompressed.ReadFrom(codecReader)
				// limitReader.N holds what was NOT consumed, so n-N is the
				// number of compressed bytes actually read.
				remain = sz - (n - int(limitReader.N))
				codecReader.Close()
				return
			}); err != nil {
				return
			}

			// the compressed message's offset will be equal to the offset of
			// the last message in the set. within the compressed set, the
			// offsets will be relative, so we have to scan through them to
			// get the base offset. for example, if there are four compressed
			// messages at offsets 10-13, then the container message will have
			// offset 13 and the contained messages will be 0,1,2,3. the base
			// offset for the container, then is 13-3=10.
			if offset, err = extractOffset(offset, r.decompressed.Bytes()); err != nil {
				return
			}

			// mark the outer message as being read
			r.markRead()

			// then push the decompressed bytes onto the stack.
			r.readerStack = &readerStack{
				// Allocate a buffer of size 0, which gets capped at 16 bytes
				// by the bufio package. We are already reading buffered data
				// here, no need to reserve another 4KB buffer.
				reader: bufio.NewReaderSize(&r.decompressed, 0),
				remain: r.decompressed.Len(),
				base:   offset,
				parent: r.readerStack,
			}
			continue
		}

		// adjust the offset in case we're reading compressed messages. the
		// base will be zero otherwise.
		offset += r.base

		// When the messages are compressed kafka may return messages at an
		// earlier offset than the one that was requested, it's the client's
		// responsibility to ignore those.
		//
		// At this point, the message header has been read, so discarding
		// the rest of the message means we have to discard the key, and then
		// the value. Each of those are preceded by a 4-byte length. Discarding
		// them is then reading that length variable and then discarding that
		// amount.
		if offset < min {
			// discard the key
			if err = r.discardBytes(); err != nil {
				return
			}
			// discard the value
			if err = r.discardBytes(); err != nil {
				return
			}
			// since we have fully consumed the message, mark as read
			r.markRead()
			continue
		}
		if err = r.readBytesWith(key); err != nil {
			return
		}
		if err = r.readBytesWith(val); err != nil {
			return
		}
		r.markRead()
		return
	}
	err = errShortRead
	return
}
|
|
|
|
func (r *messageSetReader) readMessageV2(_ int64, key readBytesFunc, val readBytesFunc) (
|
|
offset int64, lastOffset int64, timestamp int64, headers []Header, err error) {
|
|
if err = r.readHeader(); err != nil {
|
|
return
|
|
}
|
|
if r.count == int(r.header.v2.count) { // first time reading this set, so check for compression headers.
|
|
var codec CompressionCodec
|
|
if codec, err = r.header.compression(); err != nil {
|
|
return
|
|
}
|
|
if codec != nil {
|
|
batchRemain := int(r.header.length - 49) // TODO: document this magic number
|
|
if batchRemain > r.remain {
|
|
err = errShortRead
|
|
return
|
|
}
|
|
if batchRemain < 0 {
|
|
err = fmt.Errorf("batch remain < 0 (%d)", batchRemain)
|
|
return
|
|
}
|
|
r.decompressed.Reset()
|
|
// x4 as a guess that the average compression ratio is near 75%
|
|
r.decompressed.Grow(4 * batchRemain)
|
|
limitReader := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
|
|
codecReader := codec.NewReader(&limitReader)
|
|
_, err = r.decompressed.ReadFrom(codecReader)
|
|
codecReader.Close()
|
|
if err != nil {
|
|
return
|
|
}
|
|
r.remain -= batchRemain - int(limitReader.N)
|
|
r.readerStack = &readerStack{
|
|
reader: bufio.NewReaderSize(&r.decompressed, 0), // the new stack reads from the decompressed buffer
|
|
remain: r.decompressed.Len(),
|
|
base: -1, // base is unused here
|
|
parent: r.readerStack,
|
|
header: r.header,
|
|
count: r.count,
|
|
}
|
|
// all of the messages in this set are in the decompressed set just pushed onto the reader
|
|
// stack. here we set the parent count to 0 so that when the child set is exhausted, the
|
|
// reader will then try to read the header of the next message set
|
|
r.readerStack.parent.count = 0
|
|
}
|
|
}
|
|
remainBefore := r.remain
|
|
var length int64
|
|
if err = r.readVarInt(&length); err != nil {
|
|
return
|
|
}
|
|
lengthOfLength := remainBefore - r.remain
|
|
var attrs int8
|
|
if err = r.readInt8(&attrs); err != nil {
|
|
return
|
|
}
|
|
var timestampDelta int64
|
|
if err = r.readVarInt(×tampDelta); err != nil {
|
|
return
|
|
}
|
|
timestamp = r.header.v2.firstTimestamp + timestampDelta
|
|
var offsetDelta int64
|
|
if err = r.readVarInt(&offsetDelta); err != nil {
|
|
return
|
|
}
|
|
offset = r.header.firstOffset + offsetDelta
|
|
if err = r.runFunc(key); err != nil {
|
|
return
|
|
}
|
|
if err = r.runFunc(val); err != nil {
|
|
return
|
|
}
|
|
var headerCount int64
|
|
if err = r.readVarInt(&headerCount); err != nil {
|
|
return
|
|
}
|
|
if headerCount > 0 {
|
|
headers = make([]Header, headerCount)
|
|
for i := range headers {
|
|
if err = r.readMessageHeader(&headers[i]); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
lastOffset = r.header.firstOffset + int64(r.header.v2.lastOffsetDelta)
|
|
r.lengthRemain -= int(length) + lengthOfLength
|
|
r.markRead()
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) discardBytes() (err error) {
|
|
r.remain, err = discardBytes(r.reader, r.remain)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) discardN(sz int) (err error) {
|
|
r.remain, err = discardN(r.reader, r.remain, sz)
|
|
return
|
|
}
|
|
|
|
// markRead records that one message of the current set has been consumed and
// pops any exhausted readers off the stack. It panics when no messages are
// left in the current set, since decrementing would make the count negative.
func (r *messageSetReader) markRead() {
	if r.count == 0 {
		panic("markRead: negative count")
	}
	r.count--
	r.unwindStack()
	r.log("Mark read remain=%d", r.remain)
}
|
|
|
|
func (r *messageSetReader) unwindStack() {
|
|
for r.count == 0 {
|
|
if r.remain == 0 {
|
|
if r.parent != nil {
|
|
r.log("Popped reader stack")
|
|
r.readerStack = r.parent
|
|
continue
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
func (r *messageSetReader) readMessageHeader(header *Header) (err error) {
|
|
var keyLen int64
|
|
if err = r.readVarInt(&keyLen); err != nil {
|
|
return
|
|
}
|
|
if header.Key, err = r.readNewString(int(keyLen)); err != nil {
|
|
return
|
|
}
|
|
var valLen int64
|
|
if err = r.readVarInt(&valLen); err != nil {
|
|
return
|
|
}
|
|
if header.Value, err = r.readNewBytes(int(valLen)); err != nil {
|
|
return
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (r *messageSetReader) runFunc(rbFunc readBytesFunc) (err error) {
|
|
var length int64
|
|
if err = r.readVarInt(&length); err != nil {
|
|
return
|
|
}
|
|
if r.remain, err = rbFunc(r.reader, r.remain, int(length)); err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// readHeader decodes the header of the next message set or record batch. It
// is a no-op while messages of the current set remain to be read (r.count > 0).
// On success it populates r.header, r.count and r.lengthRemain according to
// the magic byte found on the wire.
func (r *messageSetReader) readHeader() (err error) {
	if r.count > 0 {
		// currently reading a set of messages, no need to read a header until they are exhausted.
		return
	}
	r.header = messagesHeader{}
	if err = r.readInt64(&r.header.firstOffset); err != nil {
		return
	}
	if err = r.readInt32(&r.header.length); err != nil {
		return
	}
	// The next four bytes are the CRC for v0/v1 messages but the partition
	// leader epoch for v2 record batches; which one they are can only be
	// decided after reading the magic byte that follows.
	var crcOrLeaderEpoch int32
	if err = r.readInt32(&crcOrLeaderEpoch); err != nil {
		return
	}
	if err = r.readInt8(&r.header.magic); err != nil {
		return
	}
	switch r.header.magic {
	case 0:
		r.header.crc = crcOrLeaderEpoch
		if err = r.readInt8(&r.header.v1.attributes); err != nil {
			return
		}
		// v0/v1 headers describe a single message.
		r.count = 1
		// Set arbitrary non-zero length so that we always assume the
		// message is truncated since bytes remain.
		r.lengthRemain = 1
		r.log("Read v0 header with offset=%d len=%d magic=%d attributes=%d", r.header.firstOffset, r.header.length, r.header.magic, r.header.v1.attributes)
	case 1:
		r.header.crc = crcOrLeaderEpoch
		if err = r.readInt8(&r.header.v1.attributes); err != nil {
			return
		}
		if err = r.readInt64(&r.header.v1.timestamp); err != nil {
			return
		}
		r.count = 1
		// Set arbitrary non-zero length so that we always assume the
		// message is truncated since bytes remain.
		r.lengthRemain = 1
		r.log("Read v1 header with remain=%d offset=%d magic=%d and attributes=%d", r.remain, r.header.firstOffset, r.header.magic, r.header.v1.attributes)
	case 2:
		r.header.v2.leaderEpoch = crcOrLeaderEpoch
		if err = r.readInt32(&r.header.crc); err != nil {
			return
		}
		if err = r.readInt16(&r.header.v2.attributes); err != nil {
			return
		}
		if err = r.readInt32(&r.header.v2.lastOffsetDelta); err != nil {
			return
		}
		if err = r.readInt64(&r.header.v2.firstTimestamp); err != nil {
			return
		}
		if err = r.readInt64(&r.header.v2.lastTimestamp); err != nil {
			return
		}
		if err = r.readInt64(&r.header.v2.producerID); err != nil {
			return
		}
		if err = r.readInt16(&r.header.v2.producerEpoch); err != nil {
			return
		}
		if err = r.readInt32(&r.header.v2.baseSequence); err != nil {
			return
		}
		if err = r.readInt32(&r.header.v2.count); err != nil {
			return
		}
		r.count = int(r.header.v2.count)
		// Subtracts the header bytes from the length. 49 is the size of the
		// v2 batch header fields read above, counted past the length field:
		// 4 (leader epoch) + 1 (magic) + 4 (crc) + 2 (attributes) +
		// 4 (lastOffsetDelta) + 8 + 8 (timestamps) + 8 (producerID) +
		// 2 (producerEpoch) + 4 (baseSequence) + 4 (count) = 49.
		r.lengthRemain = int(r.header.length) - 49
		r.log("Read v2 header with count=%d offset=%d len=%d magic=%d attributes=%d", r.count, r.header.firstOffset, r.header.length, r.header.magic, r.header.v2.attributes)
	default:
		err = r.header.badMagic()
		return
	}
	return
}
|
|
|
|
func (r *messageSetReader) readNewBytes(len int) (res []byte, err error) {
|
|
res, r.remain, err = readNewBytes(r.reader, r.remain, len)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readNewString(len int) (res string, err error) {
|
|
res, r.remain, err = readNewString(r.reader, r.remain, len)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readInt8(val *int8) (err error) {
|
|
r.remain, err = readInt8(r.reader, r.remain, val)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readInt16(val *int16) (err error) {
|
|
r.remain, err = readInt16(r.reader, r.remain, val)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readInt32(val *int32) (err error) {
|
|
r.remain, err = readInt32(r.reader, r.remain, val)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readInt64(val *int64) (err error) {
|
|
r.remain, err = readInt64(r.reader, r.remain, val)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readVarInt(val *int64) (err error) {
|
|
r.remain, err = readVarInt(r.reader, r.remain, val)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) readBytesWith(fn readBytesFunc) (err error) {
|
|
r.remain, err = readBytesWith(r.reader, r.remain, fn)
|
|
return
|
|
}
|
|
|
|
func (r *messageSetReader) log(msg string, args ...interface{}) {
|
|
if r.debug {
|
|
log.Printf("[DEBUG] "+msg, args...)
|
|
}
|
|
}
|
|
|
|
func extractOffset(base int64, msgSet []byte) (offset int64, err error) {
|
|
r, remain := bufio.NewReader(bytes.NewReader(msgSet)), len(msgSet)
|
|
for remain > 0 {
|
|
if remain, err = readInt64(r, remain, &offset); err != nil {
|
|
return
|
|
}
|
|
var sz int32
|
|
if remain, err = readInt32(r, remain, &sz); err != nil {
|
|
return
|
|
}
|
|
if remain, err = discardN(r, remain, int(sz)); err != nil {
|
|
return
|
|
}
|
|
}
|
|
offset = base - offset
|
|
return
|
|
}
|
|
|