You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

116 lines
2.2 KiB

package kafka
import (
"time"
)
// Message describes a single kafka message as seen by users of this package.
type Message struct {
	// Topic names the topic the message was consumed from when it comes out
	// of a Reader.
	//
	// When the message is handed to a Writer, Topic may be used to select the
	// destination topic if the writer itself has none configured.
	Topic string

	// Partition is read-only and MUST NOT be set when writing messages.
	Partition int

	// NOTE(review): Offset and HighWaterMark are undocumented in this file;
	// presumably they are the message's offset within its partition and the
	// partition's high water mark at read time. Confirm against Reader.
	Offset        int64
	HighWaterMark int64

	// Key and Value are the byte payloads carried over into the wire-level
	// message built by Message.message.
	Key   []byte
	Value []byte

	// Headers is not part of the wire-level message built by Message.message.
	// NOTE(review): presumably handled by a newer encoding elsewhere; verify.
	Headers []Header

	// Time is set automatically when the message is written if it was not
	// set at creation.
	Time time.Time
}
// message builds the wire-level message for msg. The result always uses
// magic byte 1, shares msg's Key and Value slices (no copy is made), and
// carries msg.Time converted by timestamp.
//
// When cw is non-nil it is used to compute the CRC of the fully populated
// message; with a nil cw the CRC field is left at zero.
func (msg Message) message(cw *crc32Writer) message {
	var wire message
	wire.MagicByte = 1
	wire.Key = msg.Key
	wire.Value = msg.Value
	wire.Timestamp = timestamp(msg.Time)

	if cw == nil {
		return wire
	}

	// The checksum must be taken after every other field is in place.
	wire.CRC = wire.crc32(cw)
	return wire
}
// timestampSize is the number of bytes added to a message's size for its
// int64 Timestamp field.
const timestampSize = 8
// size returns the size of msg once converted to its wire-level form. The
// timestamp is always counted, matching the magic byte 1 that
// Message.message assigns. Topic, Partition, Offset, HighWaterMark and
// Headers do not contribute.
func (msg *Message) size() int32 {
	// CRC (int32) + magic byte (int8) + attributes (int8).
	const header = 4 + 1 + 1

	keySize := sizeofBytes(msg.Key)
	valueSize := sizeofBytes(msg.Value)
	return header + keySize + valueSize + timestampSize
}
// message is the wire-level form of a kafka message. Its fields are listed
// in the order in which writeTo emits them.
type message struct {
	// CRC is the checksum written ahead of every other field; it is produced
	// by the crc32 method.
	CRC int32

	// MagicByte selects the layout: when it is zero the Timestamp is left
	// out of the checksum, the size and the encoded output. Attributes is
	// written as stored; Message.message leaves it at zero.
	MagicByte, Attributes int8

	// Timestamp is only encoded when MagicByte is non-zero.
	Timestamp int64

	// Key and Value are the message payloads, written in that order.
	Key, Value []byte
}
// crc32 computes the checksum of m using cw. The writer's running checksum
// is reset first, then the magic byte, attributes, timestamp (only when the
// magic byte is non-zero), key and value are fed to it in that order. The
// CRC field itself is not part of the checksummed data.
//
// cw must not be nil.
func (m message) crc32(cw *crc32Writer) int32 {
	hasTimestamp := m.MagicByte != 0

	// Start from a clean state so earlier use of cw does not leak in.
	cw.crc32 = 0

	cw.writeInt8(m.MagicByte)
	cw.writeInt8(m.Attributes)
	if hasTimestamp {
		cw.writeInt64(m.Timestamp)
	}
	cw.writeBytes(m.Key)
	cw.writeBytes(m.Value)

	sum := cw.crc32
	return int32(sum)
}
// size returns the encoded size of m: the fixed header, the key and the
// value, plus timestampSize when the magic byte is non-zero.
func (m message) size() int32 {
	// CRC (int32) + magic byte (int8) + attributes (int8).
	const fixed = 4 + 1 + 1

	total := fixed + sizeofBytes(m.Key) + sizeofBytes(m.Value)
	if m.MagicByte == 0 {
		// Magic byte zero: the timestamp is not encoded.
		return total
	}
	return total + timestampSize
}
// writeTo appends the encoded form of m to wb: CRC, magic byte, attributes,
// the timestamp (only when the magic byte is non-zero), then the key and
// the value.
func (m message) writeTo(wb *writeBuffer) {
	wb.writeInt32(m.CRC)
	wb.writeInt8(m.MagicByte)
	wb.writeInt8(m.Attributes)

	if hasTimestamp := m.MagicByte != 0; hasTimestamp {
		wb.writeInt64(m.Timestamp)
	}

	// Key first, value second; the order is part of the encoding.
	for _, payload := range [...][]byte{m.Key, m.Value} {
		wb.writeBytes(payload)
	}
}
// messageSetItem is one entry of a messageSet: a message preceded by an
// offset and a size, in the order writeTo emits them.
type messageSetItem struct {
	// Offset is written first, as an int64.
	Offset int64

	// MessageSize is written as stored; writeTo does not recompute it from
	// Message.
	MessageSize int32

	// Message is the wire-level message that follows the two prefix fields.
	Message message
}
// size returns the encoded size of the item: the fixed prefix followed by
// the size reported by the embedded message. The stored MessageSize field
// is not consulted.
func (m messageSetItem) size() int32 {
	// Offset (int64) + MessageSize (int32).
	const prefix = 8 + 4

	return prefix + m.Message.size()
}
// writeTo appends the item to wb: the offset, the stored message size, and
// then the message itself.
func (m messageSetItem) writeTo(wb *writeBuffer) {
	offset, length := m.Offset, m.MessageSize

	wb.writeInt64(offset)
	wb.writeInt32(length) // emitted as stored, not recomputed
	m.Message.writeTo(wb)
}
// messageSet is an ordered list of messageSetItem values; its size and
// writeTo methods process the items in slice order.
type messageSet []messageSetItem
// size returns the sum of the sizes of every item in the set; a nil or
// empty set has size zero.
func (s messageSet) size() (size int32) {
	for i := range s {
		size += s[i].size()
	}
	return size
}
// writeTo appends every item of the set to wb, in slice order. A nil or
// empty set writes nothing.
func (s messageSet) writeTo(wb *writeBuffer) {
	for i := range s {
		s[i].writeTo(wb)
	}
}