package kafka

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"hash/crc32"
	"io"
	"time"
)

// writeBuffer wraps an io.Writer and provides helpers for encoding the
// primitive types used by the Kafka wire protocol. The fixed scratch array b
// avoids allocating a temporary buffer on every write.
type writeBuffer struct {
	w io.Writer
	b [16]byte
}

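// Illustrative sketch (not part of the original file): a writeBuffer can wrap
// any io.Writer, such as a bytes.Buffer, and the helpers below append
// big-endian encodings directly to it.
//
//	var buf bytes.Buffer
//	wb := &writeBuffer{w: &buf}
//	wb.writeInt16(3)      // appends 0x00 0x03
//	wb.writeString("abc") // appends 0x00 0x03 'a' 'b' 'c'
//	// buf.Bytes() == []byte{0x00, 0x03, 0x00, 0x03, 'a', 'b', 'c'}
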
func (wb *writeBuffer) writeInt8(i int8) {
	wb.b[0] = byte(i)
	wb.Write(wb.b[:1])
}

func (wb *writeBuffer) writeInt16(i int16) {
	binary.BigEndian.PutUint16(wb.b[:2], uint16(i))
	wb.Write(wb.b[:2])
}

func (wb *writeBuffer) writeInt32(i int32) {
	binary.BigEndian.PutUint32(wb.b[:4], uint32(i))
	wb.Write(wb.b[:4])
}

func (wb *writeBuffer) writeInt64(i int64) {
	binary.BigEndian.PutUint64(wb.b[:8], uint64(i))
	wb.Write(wb.b[:8])
}

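// writeVarInt encodes i with the zig-zag varint scheme used by the Kafka
// protocol: (i << 1) ^ (i >> 63) maps small magnitudes of either sign to small
// unsigned values, which are then emitted in 7-bit groups with the high bit as
// a continuation flag. For illustration: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, so
// any value in [-64, 63] fits in a single byte.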
func (wb *writeBuffer) writeVarInt(i int64) {
	u := uint64((i << 1) ^ (i >> 63))
	n := 0

	for u >= 0x80 && n < len(wb.b) {
		wb.b[n] = byte(u) | 0x80
		u >>= 7
		n++
	}

	if n < len(wb.b) {
		wb.b[n] = byte(u)
		n++
	}

	wb.Write(wb.b[:n])
}

func (wb *writeBuffer) writeString(s string) {
	wb.writeInt16(int16(len(s)))
	wb.WriteString(s)
}

func (wb *writeBuffer) writeVarString(s string) {
	wb.writeVarInt(int64(len(s)))
	wb.WriteString(s)
}

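// writeNullableString encodes nil as an int16 length of -1 with no payload
// bytes; writeBytes below uses an int32 length of -1 the same way. For
// example, writeNullableString(nil) emits 0xFF 0xFF, while a non-nil empty
// string emits 0x00 0x00.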
func (wb *writeBuffer) writeNullableString(s *string) {
	if s == nil {
		wb.writeInt16(-1)
	} else {
		wb.writeString(*s)
	}
}

func (wb *writeBuffer) writeBytes(b []byte) {
	n := len(b)
	if b == nil {
		n = -1
	}
	wb.writeInt32(int32(n))
	wb.Write(b)
}

func (wb *writeBuffer) writeVarBytes(b []byte) {
	if b != nil {
		wb.writeVarInt(int64(len(b)))
		wb.Write(b)
	} else {
		// -1 is used to indicate a nil slice
		wb.writeVarInt(-1)
	}
}

func (wb *writeBuffer) writeBool(b bool) {
	v := int8(0)
	if b {
		v = 1
	}
	wb.writeInt8(v)
}

func (wb *writeBuffer) writeArrayLen(n int) {
	wb.writeInt32(int32(n))
}

func (wb *writeBuffer) writeArray(n int, f func(int)) {
	wb.writeArrayLen(n)
	for i := 0; i < n; i++ {
		f(i)
	}
}

func (wb *writeBuffer) writeVarArray(n int, f func(int)) {
	wb.writeVarInt(int64(n))
	for i := 0; i < n; i++ {
		f(i)
	}
}

func (wb *writeBuffer) writeStringArray(a []string) {
	wb.writeArray(len(a), func(i int) { wb.writeString(a[i]) })
}

func (wb *writeBuffer) writeInt32Array(a []int32) {
	wb.writeArray(len(a), func(i int) { wb.writeInt32(a[i]) })
}

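// write dispatches on the dynamic type of a so callers can encode mixed
// values generically; anything implementing the writable interface serializes
// itself, and unsupported types panic. Illustrative use (not from the
// original source):
//
//	wb.write(int32(42))  // equivalent to wb.writeInt32(42)
//	wb.write("topic-a")  // equivalent to wb.writeString("topic-a")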
func (wb *writeBuffer) write(a interface{}) {
	switch v := a.(type) {
	case int8:
		wb.writeInt8(v)
	case int16:
		wb.writeInt16(v)
	case int32:
		wb.writeInt32(v)
	case int64:
		wb.writeInt64(v)
	case string:
		wb.writeString(v)
	case []byte:
		wb.writeBytes(v)
	case bool:
		wb.writeBool(v)
	case writable:
		v.writeTo(wb)
	default:
		panic(fmt.Sprintf("unsupported type: %T", a))
	}
}

func (wb *writeBuffer) Write(b []byte) (int, error) {
	return wb.w.Write(b)
}

func (wb *writeBuffer) WriteString(s string) (int, error) {
	return io.WriteString(wb.w, s)
}

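// Flush forwards to the underlying writer when it exposes a Flush method (for
// example a *bufio.Writer) and is otherwise a no-op.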
func (wb *writeBuffer) Flush() error {
	if x, ok := wb.w.(interface{ Flush() error }); ok {
		return x.Flush()
	}
	return nil
}

type writable interface {
	writeTo(*writeBuffer)
}

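// The request writers below all follow the same pattern: the header's Size is
// computed up front as the header size minus its own 4-byte length prefix,
// plus the byte size of every body field, and the fields are then written in
// exactly that order. If the size arithmetic and the writes drift apart, the
// request framing is corrupted.
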
func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error {
	h := requestHeader{
		ApiKey:        int16(fetch),
		ApiVersion:    int16(v2),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}
	h.Size = (h.size() - 4) +
		4 + // replica ID
		4 + // max wait time
		4 + // min bytes
		4 + // topic array length
		sizeofString(topic) +
		4 + // partition array length
		4 + // partition
		8 + // offset
		4 // max bytes

	h.writeTo(wb)
	wb.writeInt32(-1) // replica ID
	wb.writeInt32(milliseconds(maxWait))
	wb.writeInt32(int32(minBytes))

	// topic array
	wb.writeArrayLen(1)
	wb.writeString(topic)

	// partition array
	wb.writeArrayLen(1)
	wb.writeInt32(partition)
	wb.writeInt64(offset)
	wb.writeInt32(int32(maxBytes))

	return wb.Flush()
}

func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
	h := requestHeader{
		ApiKey:        int16(fetch),
		ApiVersion:    int16(v5),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}
	h.Size = (h.size() - 4) +
		4 + // replica ID
		4 + // max wait time
		4 + // min bytes
		4 + // max bytes
		1 + // isolation level
		4 + // topic array length
		sizeofString(topic) +
		4 + // partition array length
		4 + // partition
		8 + // offset
		8 + // log start offset
		4 // max bytes

	h.writeTo(wb)
	wb.writeInt32(-1) // replica ID
	wb.writeInt32(milliseconds(maxWait))
	wb.writeInt32(int32(minBytes))
	wb.writeInt32(int32(maxBytes))
	wb.writeInt8(isolationLevel) // isolation level: 0 = read uncommitted

	// topic array
	wb.writeArrayLen(1)
	wb.writeString(topic)

	// partition array
	wb.writeArrayLen(1)
	wb.writeInt32(partition)
	wb.writeInt64(offset)
	wb.writeInt64(int64(0)) // log start offset, only used when the request is sent by a follower
	wb.writeInt32(int32(maxBytes))

	return wb.Flush()
}

func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
	h := requestHeader{
		ApiKey:        int16(fetch),
		ApiVersion:    int16(v10),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}
	h.Size = (h.size() - 4) +
		4 + // replica ID
		4 + // max wait time
		4 + // min bytes
		4 + // max bytes
		1 + // isolation level
		4 + // session ID
		4 + // session epoch
		4 + // topic array length
		sizeofString(topic) +
		4 + // partition array length
		4 + // partition
		4 + // current leader epoch
		8 + // fetch offset
		8 + // log start offset
		4 + // partition max bytes
		4 // forgotten topics data

	h.writeTo(wb)
	wb.writeInt32(-1) // replica ID
	wb.writeInt32(milliseconds(maxWait))
	wb.writeInt32(int32(minBytes))
	wb.writeInt32(int32(maxBytes))
	wb.writeInt8(isolationLevel) // isolation level: 0 = read uncommitted
	wb.writeInt32(0)  // FIXME: session ID
	wb.writeInt32(-1) // FIXME: session epoch

	// topic array
	wb.writeArrayLen(1)
	wb.writeString(topic)

	// partition array
	wb.writeArrayLen(1)
	wb.writeInt32(partition)
	wb.writeInt32(-1) // FIXME: current leader epoch
	wb.writeInt64(offset)
	wb.writeInt64(int64(0)) // log start offset, only used when the request is sent by a follower
	wb.writeInt32(int32(maxBytes))

	// forgotten topics array
	wb.writeArrayLen(0) // forgotten topics not supported yet

	return wb.Flush()
}

func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error {
	h := requestHeader{
		ApiKey:        int16(listOffsets),
		ApiVersion:    int16(v1),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}
	h.Size = (h.size() - 4) +
		4 + // replica ID
		4 + // topic array length
		sizeofString(topic) + // topic
		4 + // partition array length
		4 + // partition
		8 // time

	h.writeTo(wb)
	wb.writeInt32(-1) // replica ID

	// topic array
	wb.writeArrayLen(1)
	wb.writeString(topic)

	// partition array
	wb.writeArrayLen(1)
	wb.writeInt32(partition)
	wb.writeInt64(time)

	return wb.Flush()
}

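// writeProduceRequestV2 writes a produce request using message set format v1.
// When a compression codec is configured, the whole message set is compressed
// and wrapped into a single message whose value is the compressed payload and
// whose attributes carry the codec; with no codec the messages are written
// one by one.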
func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
	var size int32
	var attributes int8
	var compressed *bytes.Buffer

	if codec == nil {
		size = messageSetSize(msgs...)
	} else {
		compressed, attributes, size, err = compressMessageSet(codec, msgs...)
		if err != nil {
			return
		}
		msgs = []Message{{Value: compressed.Bytes()}}
	}

	h := requestHeader{
		ApiKey:        int16(produce),
		ApiVersion:    int16(v2),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}
	h.Size = (h.size() - 4) +
		2 + // required acks
		4 + // timeout
		4 + // topic array length
		sizeofString(topic) + // topic
		4 + // partition array length
		4 + // partition
		4 + // message set size
		size

	h.writeTo(wb)
	wb.writeInt16(requiredAcks) // required acks
	wb.writeInt32(milliseconds(timeout))

	// topic array
	wb.writeArrayLen(1)
	wb.writeString(topic)

	// partition array
	wb.writeArrayLen(1)
	wb.writeInt32(partition)

	wb.writeInt32(size)
	cw := &crc32Writer{table: crc32.IEEETable}

	for _, msg := range msgs {
		wb.writeMessage(msg.Offset, attributes, msg.Time, msg.Key, msg.Value, cw)
	}

	releaseBuffer(compressed)
	return wb.Flush()
}

func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
	h := requestHeader{
		ApiKey:        int16(produce),
		ApiVersion:    int16(v3),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	h.Size = (h.size() - 4) +
		sizeofNullableString(transactionalID) +
		2 + // required acks
		4 + // timeout
		4 + // topic array length
		sizeofString(topic) + // topic
		4 + // partition array length
		4 + // partition
		4 + // message set size
		recordBatch.size

	h.writeTo(wb)
	wb.writeNullableString(transactionalID)
	wb.writeInt16(requiredAcks) // required acks
	wb.writeInt32(milliseconds(timeout))

	// topic array
	wb.writeArrayLen(1)
	wb.writeString(topic)

	// partition array
	wb.writeArrayLen(1)
	wb.writeInt32(partition)

	recordBatch.writeTo(wb)

	return wb.Flush()
}

func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
	h := requestHeader{
		ApiKey:        int16(produce),
		ApiVersion:    int16(v7),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}
	h.Size = (h.size() - 4) +
		sizeofNullableString(transactionalID) +
		2 + // required acks
		4 + // timeout
		4 + // topic array length
		sizeofString(topic) + // topic
		4 + // partition array length
		4 + // partition
		4 + // message set size
		recordBatch.size

	h.writeTo(wb)
	wb.writeNullableString(transactionalID)
	wb.writeInt16(requiredAcks) // required acks
	wb.writeInt32(milliseconds(timeout))

	// topic array
	wb.writeArrayLen(1)
	wb.writeString(topic)

	// partition array
	wb.writeArrayLen(1)
	wb.writeInt32(partition)

	recordBatch.writeTo(wb)

	return wb.Flush()
}

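// writeRecordBatch writes a record batch (message format v2). The batch CRC is
// computed in a dry run: the fields that follow the CRC are streamed through a
// CRC-32C (Castagnoli) writer without touching the output, then the batch
// header and the same fields are written for real. The two passes must stay in
// the same order for the checksum to be valid.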
func (wb *writeBuffer) writeRecordBatch(attributes int16, size int32, count int, baseTime, lastTime time.Time, write func(*writeBuffer)) {
	var (
		baseTimestamp   = timestamp(baseTime)
		lastTimestamp   = timestamp(lastTime)
		lastOffsetDelta = int32(count - 1)
		producerID      = int64(-1) // default producer id for now
		producerEpoch   = int16(-1) // default producer epoch for now
		baseSequence    = int32(-1) // default base sequence
		recordCount     = int32(count)
		writerBackup    = wb.w
	)

	// dry run to compute the checksum
	cw := &crc32Writer{table: crc32.MakeTable(crc32.Castagnoli)}
	wb.w = cw
	cw.writeInt16(attributes) // attributes: timestamp type 0 (create time), not transactional, no control messages
	cw.writeInt32(lastOffsetDelta)
	cw.writeInt64(baseTimestamp)
	cw.writeInt64(lastTimestamp)
	cw.writeInt64(producerID)
	cw.writeInt16(producerEpoch)
	cw.writeInt32(baseSequence)
	cw.writeInt32(recordCount)
	write(wb)
	wb.w = writerBackup

	// actual write to the output buffer
	wb.writeInt64(int64(0))         // base offset
	wb.writeInt32(int32(size - 12)) // batch length; 12 = base offset (8) + batch length (4)
	wb.writeInt32(-1)               // partition leader epoch
	wb.writeInt8(2)                 // magic byte
	wb.writeInt32(int32(cw.crc32))

	wb.writeInt16(attributes)
	wb.writeInt32(lastOffsetDelta)
	wb.writeInt64(baseTimestamp)
	wb.writeInt64(lastTimestamp)
	wb.writeInt64(producerID)
	wb.writeInt16(producerEpoch)
	wb.writeInt32(baseSequence)
	wb.writeInt32(recordCount)
	write(wb)
}

func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int8, size int32, err error) {
	compressed = acquireBuffer()
	compressor := codec.NewWriter(compressed)
	wb := &writeBuffer{w: compressor}
	cw := &crc32Writer{table: crc32.IEEETable}

	for offset, msg := range msgs {
		wb.writeMessage(int64(offset), 0, msg.Time, msg.Key, msg.Value, cw)
	}

	if err = compressor.Close(); err != nil {
		releaseBuffer(compressed)
		return
	}

	attributes = codec.Code()
	size = messageSetSize(Message{Value: compressed.Bytes()})
	return
}

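// writeMessage emits one entry of message format v1: offset, message size,
// CRC-32 (IEEE), magic byte, attributes, timestamp, key, and value. The CRC
// covers every field after the CRC itself, which is why those fields are first
// streamed through the crc32Writer before being written out.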
func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
	const magicByte = 1 // compatible with kafka 0.10.0.0+

	timestamp := timestamp(time)
	size := messageSize(key, value)

	// dry run to compute the checksum
	cw.crc32 = 0
	cw.writeInt8(magicByte)
	cw.writeInt8(attributes)
	cw.writeInt64(timestamp)
	cw.writeBytes(key)
	cw.writeBytes(value)

	// actual write to the output buffer
	wb.writeInt64(offset)
	wb.writeInt32(size)
	wb.writeInt32(int32(cw.crc32))
	wb.writeInt8(magicByte)
	wb.writeInt8(attributes)
	wb.writeInt64(timestamp)
	wb.writeBytes(key)
	wb.writeBytes(value)
}

// Messages with magic v2 and above are called records. This method writes
// messages using message format 2.
func (wb *writeBuffer) writeRecord(attributes int8, baseTime time.Time, offset int64, msg Message) {
	timestampDelta := msg.Time.Sub(baseTime)
	offsetDelta := int64(offset)

	wb.writeVarInt(int64(recordSize(&msg, timestampDelta, offsetDelta)))
	wb.writeInt8(attributes)
	wb.writeVarInt(int64(milliseconds(timestampDelta)))
	wb.writeVarInt(offsetDelta)

	wb.writeVarBytes(msg.Key)
	wb.writeVarBytes(msg.Value)
	wb.writeVarArray(len(msg.Headers), func(i int) {
		h := &msg.Headers[i]
		wb.writeVarString(h.Key)
		wb.writeVarBytes(h.Value)
	})
}

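// varIntLen reports how many bytes writeVarInt needs for i; varBytesLen,
// varStringLen, and varArrayLen build on it to size variable-length fields.
// Worked examples (for illustration): varIntLen(0) == 1, varIntLen(63) == 1,
// varIntLen(64) == 2, and varIntLen(-1) == 1, because zig-zag encoding folds
// negative values into small unsigned ones before the 7-bit grouping.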
func varIntLen(i int64) int {
	u := uint64((i << 1) ^ (i >> 63)) // zig-zag encoding
	n := 0

	for u >= 0x80 {
		u >>= 7
		n++
	}

	return n + 1
}

func varBytesLen(b []byte) int {
	return varIntLen(int64(len(b))) + len(b)
}

func varStringLen(s string) int {
	return varIntLen(int64(len(s))) + len(s)
}

func varArrayLen(n int, f func(int) int) int {
	size := varIntLen(int64(n))
	for i := 0; i < n; i++ {
		size += f(i)
	}
	return size
}

func messageSize(key, value []byte) int32 {
	return 4 + // crc
		1 + // magic byte
		1 + // attributes
		8 + // timestamp
		sizeofBytes(key) +
		sizeofBytes(value)
}

func messageSetSize(msgs ...Message) (size int32) {
	for _, msg := range msgs {
		size += 8 + // offset
			4 + // message size
			4 + // crc
			1 + // magic byte
			1 + // attributes
			8 + // timestamp
			sizeofBytes(msg.Key) +
			sizeofBytes(msg.Value)
	}
	return
}

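// Worked example (illustrative, assuming sizeofBytes returns 4 + len(b) to
// match the int32 length prefix written by writeBytes): a message with a nil
// key and a 5-byte value contributes 8+4+4+1+1+8+4+9 = 39 bytes to
// messageSetSize, of which messageSize accounts for the trailing 27.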