You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
2.5 KiB
108 lines
2.5 KiB
package kafka
|
|
|
|
import (
|
|
"bytes"
|
|
"time"
|
|
)
|
|
|
|
const recordBatchHeaderSize int32 = 0 +
|
|
8 + // base offset
|
|
4 + // batch length
|
|
4 + // partition leader epoch
|
|
1 + // magic
|
|
4 + // crc
|
|
2 + // attributes
|
|
4 + // last offset delta
|
|
8 + // first timestamp
|
|
8 + // max timestamp
|
|
8 + // producer id
|
|
2 + // producer epoch
|
|
4 + // base sequence
|
|
4 // msg count
|
|
|
|
func recordBatchSize(msgs ...Message) (size int32) {
|
|
size = recordBatchHeaderSize
|
|
baseTime := msgs[0].Time
|
|
|
|
for i := range msgs {
|
|
msg := &msgs[i]
|
|
msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
|
|
size += int32(msz + varIntLen(int64(msz)))
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
|
|
compressed = acquireBuffer()
|
|
compressor := codec.NewWriter(compressed)
|
|
wb := &writeBuffer{w: compressor}
|
|
|
|
for i, msg := range msgs {
|
|
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
|
|
}
|
|
|
|
if err = compressor.Close(); err != nil {
|
|
releaseBuffer(compressed)
|
|
return
|
|
}
|
|
|
|
attributes = int16(codec.Code())
|
|
size = recordBatchHeaderSize + int32(compressed.Len())
|
|
return
|
|
}
|
|
|
|
type recordBatch struct {
|
|
// required input parameters
|
|
codec CompressionCodec
|
|
attributes int16
|
|
msgs []Message
|
|
|
|
// parameters calculated during init
|
|
compressed *bytes.Buffer
|
|
size int32
|
|
}
|
|
|
|
func newRecordBatch(codec CompressionCodec, msgs ...Message) (r *recordBatch, err error) {
|
|
r = &recordBatch{
|
|
codec: codec,
|
|
msgs: msgs,
|
|
}
|
|
if r.codec == nil {
|
|
r.size = recordBatchSize(r.msgs...)
|
|
} else {
|
|
r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (r *recordBatch) writeTo(wb *writeBuffer) {
|
|
wb.writeInt32(r.size)
|
|
|
|
baseTime := r.msgs[0].Time
|
|
lastTime := r.msgs[len(r.msgs)-1].Time
|
|
if r.compressed != nil {
|
|
wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
|
|
wb.Write(r.compressed.Bytes())
|
|
})
|
|
releaseBuffer(r.compressed)
|
|
} else {
|
|
wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
|
|
for i, msg := range r.msgs {
|
|
wb.writeRecord(0, r.msgs[0].Time, int64(i), msg)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
|
|
return 1 + // attributes
|
|
varIntLen(int64(milliseconds(timestampDelta))) +
|
|
varIntLen(offsetDelta) +
|
|
varBytesLen(msg.Key) +
|
|
varBytesLen(msg.Value) +
|
|
varArrayLen(len(msg.Headers), func(i int) int {
|
|
h := &msg.Headers[i]
|
|
return varStringLen(h.Key) + varBytesLen(h.Value)
|
|
})
|
|
}
|
|
|