You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

108 lines
2.5 KiB

package kafka
import (
"bytes"
"time"
)
const recordBatchHeaderSize int32 = 0 +
8 + // base offset
4 + // batch length
4 + // partition leader epoch
1 + // magic
4 + // crc
2 + // attributes
4 + // last offset delta
8 + // first timestamp
8 + // max timestamp
8 + // producer id
2 + // producer epoch
4 + // base sequence
4 // msg count
func recordBatchSize(msgs ...Message) (size int32) {
size = recordBatchHeaderSize
baseTime := msgs[0].Time
for i := range msgs {
msg := &msgs[i]
msz := recordSize(msg, msg.Time.Sub(baseTime), int64(i))
size += int32(msz + varIntLen(int64(msz)))
}
return
}
func compressRecordBatch(codec CompressionCodec, msgs ...Message) (compressed *bytes.Buffer, attributes int16, size int32, err error) {
compressed = acquireBuffer()
compressor := codec.NewWriter(compressed)
wb := &writeBuffer{w: compressor}
for i, msg := range msgs {
wb.writeRecord(0, msgs[0].Time, int64(i), msg)
}
if err = compressor.Close(); err != nil {
releaseBuffer(compressed)
return
}
attributes = int16(codec.Code())
size = recordBatchHeaderSize + int32(compressed.Len())
return
}
type recordBatch struct {
// required input parameters
codec CompressionCodec
attributes int16
msgs []Message
// parameters calculated during init
compressed *bytes.Buffer
size int32
}
func newRecordBatch(codec CompressionCodec, msgs ...Message) (r *recordBatch, err error) {
r = &recordBatch{
codec: codec,
msgs: msgs,
}
if r.codec == nil {
r.size = recordBatchSize(r.msgs...)
} else {
r.compressed, r.attributes, r.size, err = compressRecordBatch(r.codec, r.msgs...)
}
return
}
func (r *recordBatch) writeTo(wb *writeBuffer) {
wb.writeInt32(r.size)
baseTime := r.msgs[0].Time
lastTime := r.msgs[len(r.msgs)-1].Time
if r.compressed != nil {
wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
wb.Write(r.compressed.Bytes())
})
releaseBuffer(r.compressed)
} else {
wb.writeRecordBatch(r.attributes, r.size, len(r.msgs), baseTime, lastTime, func(wb *writeBuffer) {
for i, msg := range r.msgs {
wb.writeRecord(0, r.msgs[0].Time, int64(i), msg)
}
})
}
}
func recordSize(msg *Message, timestampDelta time.Duration, offsetDelta int64) int {
return 1 + // attributes
varIntLen(int64(milliseconds(timestampDelta))) +
varIntLen(offsetDelta) +
varBytesLen(msg.Key) +
varBytesLen(msg.Value) +
varArrayLen(len(msg.Headers), func(i int) int {
h := &msg.Headers[i]
return varStringLen(h.Key) + varBytesLen(h.Value)
})
}