You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

614 lines
14 KiB

package kafka
import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
"time"
)
// writeBuffer encodes Kafka wire-protocol primitives onto w. The individual
// write helpers discard the (n, err) results of the underlying writes; the
// request writers only report the error returned by Flush.
// NOTE(review): this presumably relies on w being an error-retaining buffered
// writer (e.g. *bufio.Writer) so earlier write failures resurface at Flush —
// confirm with callers.
type writeBuffer struct {
w io.Writer // destination of every encoded byte
b [16]byte // scratch space reused by the fixed-width and varint encoders
}
// writeInt8 writes i as a single byte.
func (wb *writeBuffer) writeInt8(i int8) {
	scratch := wb.b[:1]
	scratch[0] = byte(i)
	wb.Write(scratch)
}

// writeInt16 writes i as 2 big-endian bytes.
func (wb *writeBuffer) writeInt16(i int16) {
	scratch := wb.b[:2]
	binary.BigEndian.PutUint16(scratch, uint16(i))
	wb.Write(scratch)
}

// writeInt32 writes i as 4 big-endian bytes.
func (wb *writeBuffer) writeInt32(i int32) {
	scratch := wb.b[:4]
	binary.BigEndian.PutUint32(scratch, uint32(i))
	wb.Write(scratch)
}

// writeInt64 writes i as 8 big-endian bytes.
func (wb *writeBuffer) writeInt64(i int64) {
	scratch := wb.b[:8]
	binary.BigEndian.PutUint64(scratch, uint64(i))
	wb.Write(scratch)
}
// writeVarInt writes i as a zig-zag encoded, base-128 variable-length
// integer (1 to 10 bytes). The encoding is the one produced by
// binary.PutVarint; the 16-byte scratch buffer always has room for it.
func (wb *writeBuffer) writeVarInt(i int64) {
	used := binary.PutVarint(wb.b[:], i)
	wb.Write(wb.b[:used])
}
// writeString writes s prefixed by its length as an int16.
// NOTE(review): int16(len(s)) wraps for strings longer than 32767 bytes,
// which would produce a corrupt length prefix.
func (wb *writeBuffer) writeString(s string) {
	length := int16(len(s))
	wb.writeInt16(length)
	wb.WriteString(s)
}

// writeVarString writes s prefixed by its length as a zig-zag varint.
func (wb *writeBuffer) writeVarString(s string) {
	length := int64(len(s))
	wb.writeVarInt(length)
	wb.WriteString(s)
}

// writeNullableString writes *s like writeString, or only an int16 length
// of -1 when s is nil.
func (wb *writeBuffer) writeNullableString(s *string) {
	if s != nil {
		wb.writeString(*s)
		return
	}
	wb.writeInt16(-1)
}
// writeBytes writes b prefixed by its length as an int32. A nil slice is
// encoded with length -1; the (empty) payload is still passed to Write.
func (wb *writeBuffer) writeBytes(b []byte) {
	length := int32(len(b))
	if b == nil {
		length = -1
	}
	wb.writeInt32(length)
	wb.Write(b)
}

// writeVarBytes writes b prefixed by its length as a zig-zag varint, or
// only a varint -1 when b is nil.
func (wb *writeBuffer) writeVarBytes(b []byte) {
	if b == nil {
		// -1 marks a nil (absent) key or value.
		wb.writeVarInt(-1)
		return
	}
	wb.writeVarInt(int64(len(b)))
	wb.Write(b)
}
// writeBool writes b as a single byte: 1 for true, 0 for false.
func (wb *writeBuffer) writeBool(b bool) {
	if b {
		wb.writeInt8(1)
	} else {
		wb.writeInt8(0)
	}
}
// writeArrayLen writes an array element count as an int32.
func (wb *writeBuffer) writeArrayLen(n int) {
	count := int32(n)
	wb.writeInt32(count)
}

// writeArray writes n as an int32 element count, then calls f once per
// index (0 through n-1, in order) so f can encode element i.
func (wb *writeBuffer) writeArray(n int, f func(int)) {
	wb.writeArrayLen(n)
	for idx := 0; idx < n; idx++ {
		f(idx)
	}
}

// writeVarArray is like writeArray but encodes the count as a zig-zag varint.
func (wb *writeBuffer) writeVarArray(n int, f func(int)) {
	wb.writeVarInt(int64(n))
	for idx := 0; idx < n; idx++ {
		f(idx)
	}
}

// writeStringArray writes a as an int32-counted array of int16-length strings.
func (wb *writeBuffer) writeStringArray(a []string) {
	writeElem := func(i int) { wb.writeString(a[i]) }
	wb.writeArray(len(a), writeElem)
}

// writeInt32Array writes a as an int32-counted array of big-endian int32s.
func (wb *writeBuffer) writeInt32Array(a []int32) {
	writeElem := func(i int) { wb.writeInt32(a[i]) }
	wb.writeArray(len(a), writeElem)
}
// write encodes a using the helper that matches its dynamic type. Supported
// types are int8, int16, int32, int64, string, []byte, bool and any value
// implementing writable; anything else panics.
func (wb *writeBuffer) write(a interface{}) {
	switch val := a.(type) {
	case int8:
		wb.writeInt8(val)
	case int16:
		wb.writeInt16(val)
	case int32:
		wb.writeInt32(val)
	case int64:
		wb.writeInt64(val)
	case string:
		wb.writeString(val)
	case []byte:
		wb.writeBytes(val)
	case bool:
		wb.writeBool(val)
	case writable:
		val.writeTo(wb)
	default:
		panic(fmt.Sprintf("unsupported type: %T", a))
	}
}
// Write forwards b unchanged to the underlying writer.
func (wb *writeBuffer) Write(b []byte) (int, error) {
	n, err := wb.w.Write(b)
	return n, err
}

// WriteString forwards s to the underlying writer via io.WriteString, which
// uses the writer's own WriteString method when it has one.
func (wb *writeBuffer) WriteString(s string) (int, error) {
	n, err := io.WriteString(wb.w, s)
	return n, err
}

// Flush flushes the underlying writer when it provides a Flush() error
// method; otherwise it does nothing and returns nil.
func (wb *writeBuffer) Flush() error {
	flusher, ok := wb.w.(interface{ Flush() error })
	if !ok {
		return nil
	}
	return flusher.Flush()
}
// writable is implemented by values that serialize themselves into a
// writeBuffer; writeBuffer.write dispatches to writeTo for such values.
type writable interface {
writeTo(*writeBuffer)
}
// writeFetchRequestV2 writes a version 2 Fetch request for a single
// topic/partition and flushes the buffer. The replica ID is sent as -1,
// maxWait is converted with milliseconds, and maxBytes is sent as the
// partition's max bytes.
func (wb *writeBuffer) writeFetchRequestV2(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration) error {
	header := requestHeader{
		ApiKey:        int16(fetch),
		ApiVersion:    int16(v2),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	// Fixed-width body fields: replica ID, max wait, min bytes, topic array
	// length, partition array length, partition, fetch offset, max bytes.
	const fixedBodySize = 4 + 4 + 4 + 4 + 4 + 4 + 8 + 4
	header.Size = header.size() - 4 + fixedBodySize + sizeofString(topic)
	header.writeTo(wb)

	wb.writeInt32(-1) // replica ID
	wb.writeInt32(milliseconds(maxWait))
	wb.writeInt32(int32(minBytes))

	wb.writeArrayLen(1) // a single topic
	wb.writeString(topic)

	wb.writeArrayLen(1) // a single partition of that topic
	wb.writeInt32(partition)
	wb.writeInt64(offset)
	wb.writeInt32(int32(maxBytes))

	return wb.Flush()
}
// writeFetchRequestV5 writes a version 5 Fetch request for a single
// topic/partition and flushes the buffer. maxBytes is used both as the
// request-wide and the per-partition byte limit, isolationLevel is sent as
// given by the caller, and the log start offset is always 0.
func (wb *writeBuffer) writeFetchRequestV5(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
	header := requestHeader{
		ApiKey:        int16(fetch),
		ApiVersion:    int16(v5),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	// Fixed-width body fields: replica ID, max wait, min bytes, max bytes,
	// isolation level, topic array length, partition array length,
	// partition, fetch offset, log start offset, partition max bytes.
	const fixedBodySize = 4 + 4 + 4 + 4 + 1 + 4 + 4 + 4 + 8 + 8 + 4
	header.Size = header.size() - 4 + fixedBodySize + sizeofString(topic)
	header.writeTo(wb)

	wb.writeInt32(-1) // replica ID
	wb.writeInt32(milliseconds(maxWait))
	wb.writeInt32(int32(minBytes))
	wb.writeInt32(int32(maxBytes))
	wb.writeInt8(isolationLevel) // caller-supplied (0 = read uncommitted)

	wb.writeArrayLen(1) // a single topic
	wb.writeString(topic)

	wb.writeArrayLen(1) // a single partition of that topic
	wb.writeInt32(partition)
	wb.writeInt64(offset)
	wb.writeInt64(0) // log start offset: only meaningful when sent by a follower
	wb.writeInt32(int32(maxBytes))

	return wb.Flush()
}
// writeFetchRequestV10 writes a version 10 Fetch request for a single
// topic/partition and flushes the buffer. Session ID, session epoch and
// current leader epoch are fixed values (see comments below), the log start
// offset is 0, and the forgotten-topics array is always empty.
func (wb *writeBuffer) writeFetchRequestV10(correlationID int32, clientID, topic string, partition int32, offset int64, minBytes, maxBytes int, maxWait time.Duration, isolationLevel int8) error {
	header := requestHeader{
		ApiKey:        int16(fetch),
		ApiVersion:    int16(v10),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	// Fixed-width body fields: replica ID, max wait, min bytes, max bytes,
	// isolation level, session ID, session epoch, topic array length,
	// partition array length, partition, current leader epoch, fetch offset,
	// log start offset, partition max bytes, forgotten topics array length.
	const fixedBodySize = 4 + 4 + 4 + 4 + 1 + 4 + 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 4
	header.Size = header.size() - 4 + fixedBodySize + sizeofString(topic)
	header.writeTo(wb)

	wb.writeInt32(-1) // replica ID
	wb.writeInt32(milliseconds(maxWait))
	wb.writeInt32(int32(minBytes))
	wb.writeInt32(int32(maxBytes))
	wb.writeInt8(isolationLevel) // caller-supplied (0 = read uncommitted)
	// NOTE(review): session ID 0 with epoch -1 appears to request a full,
	// sessionless fetch (KIP-227) — confirm before relying on fetch sessions.
	wb.writeInt32(0)  // session ID
	wb.writeInt32(-1) // session epoch

	wb.writeArrayLen(1) // a single topic
	wb.writeString(topic)

	wb.writeArrayLen(1) // a single partition of that topic
	wb.writeInt32(partition)
	wb.writeInt32(-1) // current leader epoch; NOTE(review): presumably "unknown" (KIP-320) — confirm
	wb.writeInt64(offset)
	wb.writeInt64(0) // log start offset: only meaningful when sent by a follower
	wb.writeInt32(int32(maxBytes))

	wb.writeArrayLen(0) // forgotten topics are not supported yet

	return wb.Flush()
}
// writeListOffsetRequestV1 writes a version 1 ListOffsets request asking
// about a single topic/partition and flushes the buffer. The replica ID is
// sent as -1, and time is written as the partition's int64 timestamp field.
// NOTE(review): time is presumably a millisecond timestamp or a special
// sentinel chosen by the caller — confirm against callers.
func (wb *writeBuffer) writeListOffsetRequestV1(correlationID int32, clientID, topic string, partition int32, time int64) error {
	header := requestHeader{
		ApiKey:        int16(listOffsets),
		ApiVersion:    int16(v1),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	// Fixed-width body fields: replica ID, topic array length, partition
	// array length, partition, timestamp.
	const fixedBodySize = 4 + 4 + 4 + 4 + 8
	header.Size = header.size() - 4 + fixedBodySize + sizeofString(topic)
	header.writeTo(wb)

	wb.writeInt32(-1) // replica ID

	wb.writeArrayLen(1) // a single topic
	wb.writeString(topic)

	wb.writeArrayLen(1) // a single partition of that topic
	wb.writeInt32(partition)
	wb.writeInt64(time)

	return wb.Flush()
}
// writeProduceRequestV2 writes a version 2 Produce request carrying msgs for
// a single topic/partition as a v1 message set, then flushes the buffer.
//
// When codec is non-nil the messages are compressed with compressMessageSet
// and sent as the value of a single wrapper message whose attributes carry
// the codec's code. Any error from compression is returned before anything
// is written.
func (wb *writeBuffer) writeProduceRequestV2(codec CompressionCodec, correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, msgs ...Message) (err error) {
	var (
		setSize    int32
		attributes int8
		compressed *bytes.Buffer
	)
	if codec != nil {
		if compressed, attributes, setSize, err = compressMessageSet(codec, msgs...); err != nil {
			return err
		}
		// The whole compressed set travels as the value of one wrapper message.
		msgs = []Message{{Value: compressed.Bytes()}}
	} else {
		setSize = messageSetSize(msgs...)
	}

	header := requestHeader{
		ApiKey:        int16(produce),
		ApiVersion:    int16(v2),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	// Fixed-width body fields: required acks, timeout, topic array length,
	// partition array length, partition, message set size.
	const fixedBodySize = 2 + 4 + 4 + 4 + 4 + 4
	header.Size = header.size() - 4 + fixedBodySize + sizeofString(topic) + setSize
	header.writeTo(wb)

	wb.writeInt16(requiredAcks)
	wb.writeInt32(milliseconds(timeout))

	wb.writeArrayLen(1) // a single topic
	wb.writeString(topic)

	wb.writeArrayLen(1) // a single partition of that topic
	wb.writeInt32(partition)
	wb.writeInt32(setSize)

	checksum := &crc32Writer{table: crc32.IEEETable}
	for _, m := range msgs {
		wb.writeMessage(m.Offset, attributes, m.Time, m.Key, m.Value, checksum)
	}

	releaseBuffer(compressed)
	return wb.Flush()
}
// writeProduceRequestV3 writes a version 3 Produce request carrying one
// record batch for a single topic/partition, then flushes the buffer.
// transactionalID is written as a nullable string (nil -> length -1).
// NOTE(review): the size accounts for a 4-byte record set size that is not
// written here, so recordBatch.writeTo presumably emits it — confirm.
func (wb *writeBuffer) writeProduceRequestV3(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
	header := requestHeader{
		ApiKey:        int16(produce),
		ApiVersion:    int16(v3),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	// Fixed-width body fields: required acks, timeout, topic array length,
	// partition array length, partition, record set size.
	const fixedBodySize = 2 + 4 + 4 + 4 + 4 + 4
	header.Size = header.size() - 4 + fixedBodySize +
		sizeofNullableString(transactionalID) +
		sizeofString(topic) +
		recordBatch.size
	header.writeTo(wb)

	wb.writeNullableString(transactionalID)
	wb.writeInt16(requiredAcks)
	wb.writeInt32(milliseconds(timeout))

	wb.writeArrayLen(1) // a single topic
	wb.writeString(topic)

	wb.writeArrayLen(1) // a single partition of that topic
	wb.writeInt32(partition)
	recordBatch.writeTo(wb)

	return wb.Flush()
}
// writeProduceRequestV7 writes a version 7 Produce request carrying one
// record batch for a single topic/partition, then flushes the buffer. The
// body layout written here is the same as writeProduceRequestV3's; only the
// API version in the header differs.
// NOTE(review): the size accounts for a 4-byte record set size that is not
// written here, so recordBatch.writeTo presumably emits it — confirm.
func (wb *writeBuffer) writeProduceRequestV7(correlationID int32, clientID, topic string, partition int32, timeout time.Duration, requiredAcks int16, transactionalID *string, recordBatch *recordBatch) (err error) {
	header := requestHeader{
		ApiKey:        int16(produce),
		ApiVersion:    int16(v7),
		CorrelationID: correlationID,
		ClientID:      clientID,
	}

	// Fixed-width body fields: required acks, timeout, topic array length,
	// partition array length, partition, record set size.
	const fixedBodySize = 2 + 4 + 4 + 4 + 4 + 4
	header.Size = header.size() - 4 + fixedBodySize +
		sizeofNullableString(transactionalID) +
		sizeofString(topic) +
		recordBatch.size
	header.writeTo(wb)

	wb.writeNullableString(transactionalID)
	wb.writeInt16(requiredAcks)
	wb.writeInt32(milliseconds(timeout))

	wb.writeArrayLen(1) // a single topic
	wb.writeString(topic)

	wb.writeArrayLen(1) // a single partition of that topic
	wb.writeInt32(partition)
	recordBatch.writeTo(wb)

	return wb.Flush()
}
// writeRecordBatch writes one record batch in message format 2 (magic byte 2)
// whose records are produced by write.
//
// size is the full encoded batch size. The batch-length field is written as
// size-12 because it excludes the base offset (8 bytes) and the length field
// itself (4 bytes). count is the number of records write emits, and
// baseTime/lastTime become the batch's base and last timestamps.
//
// write is invoked twice. The first call runs with wb temporarily redirected
// to a CRC-32C (Castagnoli) writer, so the checksum over attributes through
// the records can be computed. The second call runs against the real writer.
// write must therefore emit byte-identical output on both calls.
// NOTE(review): if write panics during the dry run, wb.w is left pointing at
// the checksum writer; a deferred restore would make this more robust.
func (wb *writeBuffer) writeRecordBatch(attributes int16, size int32, count int, baseTime, lastTime time.Time, write func(*writeBuffer)) {
var (
baseTimestamp = timestamp(baseTime)
lastTimestamp = timestamp(lastTime)
lastOffsetDelta = int32(count - 1) // base offset is written as 0, so the last delta is count-1
producerID = int64(-1) // default producer id for now
producerEpoch = int16(-1) // default producer epoch for now
baseSequence = int32(-1) // default base sequence
recordCount = int32(count) // record count
writerBackup = wb.w // restored once the checksum dry run is done
)
// dry run to compute the checksum: it covers every field from attributes
// through the end of the records
cw := &crc32Writer{table: crc32.MakeTable(crc32.Castagnoli)}
wb.w = cw
cw.writeInt16(attributes) // attributes as passed by the caller; NOTE(review): an earlier note assumed create-time timestamps, no transaction, no control records — that depends on the caller
cw.writeInt32(lastOffsetDelta)
cw.writeInt64(baseTimestamp)
cw.writeInt64(lastTimestamp)
cw.writeInt64(producerID)
cw.writeInt16(producerEpoch)
cw.writeInt32(baseSequence)
cw.writeInt32(recordCount)
write(wb) // records flow through wb, which currently writes into cw
wb.w = writerBackup
// actual write to the output buffer
wb.writeInt64(int64(0)) // base offset
wb.writeInt32(int32(size - 12)) // 12 = batch length + base offset sizes
wb.writeInt32(-1) // partition leader epoch
wb.writeInt8(2) // magic byte
wb.writeInt32(int32(cw.crc32)) // checksum computed by the dry run above
wb.writeInt16(attributes)
wb.writeInt32(lastOffsetDelta)
wb.writeInt64(baseTimestamp)
wb.writeInt64(lastTimestamp)
wb.writeInt64(producerID)
wb.writeInt16(producerEpoch)
wb.writeInt32(baseSequence)
wb.writeInt32(recordCount)
write(wb)
}
// compressMessageSet encodes msgs as a message set in the magic-byte-1 format
// written by writeMessage, using inner offsets 0..len(msgs)-1. It compresses
// that set with codec into a buffer obtained from acquireBuffer.
//
// On success it returns:
//   - the buffer;
//   - codec.Code() as the attributes for the wrapper message;
//   - the size of a message set holding one wrapper message whose value is
//     the compressed bytes.
//
// The caller owns the returned buffer and must hand it back with
// releaseBuffer (as writeProduceRequestV2 does).
//
// If closing the compressor fails, the buffer has already been released.
// In that case compressed is nil, so a caller cannot accidentally keep
// using a buffer that went back to releaseBuffer.
func compressMessageSet(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int8, size int32, err error) {
	compressed = acquireBuffer()
	compressor := codec.NewWriter(compressed)
	wb := &writeBuffer{w: compressor}
	cw := &crc32Writer{table: crc32.IEEETable}
	for offset, msg := range msgs {
		wb.writeMessage(int64(offset), 0, msg.Time, msg.Key, msg.Value, cw)
	}
	// Close flushes the compressor's remaining output into compressed.
	if err = compressor.Close(); err != nil {
		releaseBuffer(compressed)
		return nil, 0, 0, err
	}
	return compressed, codec.Code(), messageSetSize(Message{Value: compressed.Bytes()}), nil
}
// writeMessage writes one message in the magic-byte-1 format. The fields are
// offset, size, checksum, magic byte, attributes, timestamp, then the key
// and value as int32-length-prefixed bytes.
//
// The checksum is computed by resetting cw and feeding it every field from
// the magic byte through the value, before anything reaches wb. cw's table
// is chosen by the caller.
func (wb *writeBuffer) writeMessage(offset int64, attributes int8, time time.Time, key, value []byte, cw *crc32Writer) {
	const magicByte = 1 // compatible with kafka 0.10.0.0+
	ts := timestamp(time)
	msgSize := messageSize(key, value)

	// Checksum pass: only cw sees these bytes.
	cw.crc32 = 0
	cw.writeInt8(magicByte)
	cw.writeInt8(attributes)
	cw.writeInt64(ts)
	cw.writeBytes(key)
	cw.writeBytes(value)

	// Output pass.
	wb.writeInt64(offset)
	wb.writeInt32(msgSize)
	wb.writeInt32(int32(cw.crc32))
	wb.writeInt8(magicByte)
	wb.writeInt8(attributes)
	wb.writeInt64(ts)
	wb.writeBytes(key)
	wb.writeBytes(value)
}
// writeRecord writes msg as one record in message format 2 (magic byte 2);
// messages in that format are called records.
//
// The record's timestamp is encoded as a delta from baseTime, converted with
// milliseconds. offset is written unchanged as the record's offset delta.
// The key and value are varint-length-prefixed (nil becomes -1). They are
// followed by a varint-counted array of headers, each a varint-prefixed key
// string and varint-prefixed value bytes. The leading varint is the record
// size reported by recordSize.
func (wb *writeBuffer) writeRecord(attributes int8, baseTime time.Time, offset int64, msg Message) {
	timestampDelta := msg.Time.Sub(baseTime)
	offsetDelta := offset
	wb.writeVarInt(int64(recordSize(&msg, timestampDelta, offsetDelta)))
	wb.writeInt8(attributes)
	wb.writeVarInt(int64(milliseconds(timestampDelta)))
	wb.writeVarInt(offsetDelta)
	wb.writeVarBytes(msg.Key)
	wb.writeVarBytes(msg.Value)
	wb.writeVarArray(len(msg.Headers), func(i int) {
		h := &msg.Headers[i]
		wb.writeVarString(h.Key)
		wb.writeVarBytes(h.Value)
	})
}
// varIntLen returns how many bytes writeVarInt uses to encode i. The value
// is zig-zag transformed and then split into 7-bit groups, so the result
// ranges from 1 to 10.
func varIntLen(i int64) int {
	zz := uint64(i<<1) ^ uint64(i>>63) // zig-zag encoding
	size := 1
	for ; zz >= 0x80; zz >>= 7 {
		size++
	}
	return size
}
// varBytesLen returns the encoded size of b: a zig-zag varint length prefix
// plus the bytes themselves. For nil this is 1, matching the single-byte -1
// marker that writeVarBytes emits.
func varBytesLen(b []byte) int {
	n := len(b)
	return n + varIntLen(int64(n))
}
// varStringLen returns the encoded size of s as written by writeVarString:
// a zig-zag varint length prefix plus the string's bytes.
func varStringLen(s string) int {
	n := len(s)
	return n + varIntLen(int64(n))
}
// varArrayLen returns the encoded size of a varint-counted array of n
// elements, where f(i) reports the encoded size of element i.
func varArrayLen(n int, f func(int) int) int {
	total := varIntLen(int64(n))
	for idx := 0; idx < n; idx++ {
		total += f(idx)
	}
	return total
}
// messageSize returns the size of a magic-byte-1 message as counted by its
// size field. It covers the checksum, magic byte, attributes and timestamp,
// plus the length-prefixed key and value.
func messageSize(key, value []byte) int32 {
	const fixed = 4 + 1 + 1 + 8 // crc, magic byte, attributes, timestamp
	return fixed + sizeofBytes(key) + sizeofBytes(value)
}
// messageSetSize returns the total encoded size of msgs as a message set.
// Each message contributes an 8-byte offset and a 4-byte size field, plus
// the body counted by messageSize.
func messageSetSize(msgs ...Message) (size int32) {
	for i := range msgs {
		size += 8 + 4 + messageSize(msgs[i].Key, msgs[i].Value) // offset + size field + body
	}
	return size
}