You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

323 lines
8.4 KiB

package kafka
import (
"bufio"
"context"
"encoding"
"errors"
"fmt"
"net"
"strconv"
"time"
"github.com/segmentio/kafka-go/protocol"
produceAPI "github.com/segmentio/kafka-go/protocol/produce"
)
// RequiredAcks is the level of acknowledgements a produce request asks the
// kafka broker for. Its numeric value is what Client.Produce sends in the
// Acks field of the request.
type RequiredAcks int

// The acknowledgement levels accepted by UnmarshalText. Their text forms are
// "none", "one" and "all" respectively.
const (
	RequireNone RequiredAcks = 0
	RequireOne  RequiredAcks = 1
	RequireAll  RequiredAcks = -1
)

// String returns "none", "one" or "all" for the three known levels, and
// "unknown" for any other value.
func (acks RequiredAcks) String() string {
	switch acks {
	case RequireNone:
		return "none"
	case RequireOne:
		return "one"
	case RequireAll:
		return "all"
	default:
		return "unknown"
	}
}

// MarshalText implements encoding.TextMarshaler using the String form.
//
// It never returns an error; a value outside the three known levels is
// encoded as "unknown", which UnmarshalText will refuse to decode.
func (acks RequiredAcks) MarshalText() ([]byte, error) {
	return []byte(acks.String()), nil
}

// UnmarshalText implements encoding.TextUnmarshaler.
//
// It accepts the names "none", "one" and "all", as well as the base-10
// integers 0, 1 and -1. Any other input yields an error and leaves *acks
// unchanged.
func (acks *RequiredAcks) UnmarshalText(b []byte) error {
	switch string(b) {
	case "none":
		*acks = RequireNone
	case "one":
		*acks = RequireOne
	case "all":
		*acks = RequireAll
	default:
		// Validate the parsed int64 before narrowing it to RequiredAcks (an
		// int). Converting first would, where int is 32 bits wide, let an
		// out-of-range number such as 4294967296 wrap around to a valid level.
		x, err := strconv.ParseInt(string(b), 10, 64)
		if err != nil || (x != int64(RequireNone) && x != int64(RequireOne) && x != int64(RequireAll)) {
			return fmt.Errorf("required acks must be one of none, one, or all, not %q", b)
		}
		*acks = RequiredAcks(x)
	}
	return nil
}

// Compile-time checks that RequiredAcks satisfies the text encoding
// interfaces.
var (
	_ encoding.TextMarshaler   = RequiredAcks(0)
	_ encoding.TextUnmarshaler = (*RequiredAcks)(nil)
)
// ProduceRequest represents a request sent to a kafka broker to produce records
// to a topic partition.
type ProduceRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// The topic to produce the records to.
Topic string
// The partition to produce the records to.
//
// Client.Produce converts this value to int32 when building the request.
Partition int
// The level of required acknowledgements to ask the kafka broker for.
RequiredAcks RequiredAcks
// The message format version used when encoding the records.
//
// By default, the client automatically determines which version should be
// used based on the version of the Produce API supported by the server.
//
// NOTE(review): Client.Produce does not reference this field when building
// the request; confirm whether an explicit value is still honored.
MessageVersion int
// An optional transaction id, used when producing to the kafka broker is
// part of a transaction.
TransactionalID string
// The sequence of records to produce to the topic partition.
Records RecordReader
// An optional compression algorithm to apply to the batch of records sent
// to the kafka broker.
//
// Client.Produce keeps only the three low bits of this value (& 0x7) as the
// record set attributes.
Compression Compression
}
// ProduceResponse represents a response from a kafka broker to a produce
// request.
type ProduceResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// An error that may have occurred while attempting to produce the records.
//
// The error contains both the kafka error code, and an error message
// returned by the kafka broker. Programs may use the standard errors.Is
// function to test the error against kafka error codes.
Error error
// Offset of the first record that was written to the topic partition.
//
// This field will be zero if the kafka broker did not support the Produce
// API in version 3 or above.
BaseOffset int64
// Time at which the broker wrote the records to the topic partition.
//
// This field will be zero if the kafka broker did not support the Produce
// API in version 2 or above.
LogAppendTime time.Time
// First offset in the topic partition that the records were written to.
//
// This field will be zero if the kafka broker did not support the Produce
// API in version 5 or above (or if the first offset is zero).
LogStartOffset int64
// If errors occurred writing specific records, they will be reported in
// this map.
//
// Client.Produce keys the map by the batch index of each reported record
// and stores the broker's per-record message as the error value. The map is
// left nil when the broker reported no record errors.
//
// This field will always be empty if the kafka broker did not support the
// Produce API in version 8 or above.
RecordErrors map[int]error
}
// Produce sends a produce request to a kafka broker and returns the response.
//
// If the request contained no records (the round trip fails with an error
// wrapping protocol.ErrNoRecord), an empty ProduceResponse is returned along
// with a nil error.
//
// When the request is configured with RequiredAcks=none, both the response and
// the error will be nil on success.
//
// Any other round-trip failure is wrapped and returned. An error is also
// returned when the broker's response has an unexpected type, or carries no
// topic entry (protocol.ErrNoTopic) or no partition entry
// (protocol.ErrNoPartition). Errors reported by the broker for the partition
// itself are not returned here; they are exposed through the Error and
// RecordErrors fields of the response.
func (c *Client) Produce(ctx context.Context, req *ProduceRequest) (*ProduceResponse, error) {
	// Only the three low bits of the compression value are kept as record
	// set attributes.
	attributes := protocol.Attributes(req.Compression) & 0x7

	m, err := c.roundTrip(ctx, req.Addr, &produceAPI.Request{
		TransactionalID: req.TransactionalID,
		Acks:            int16(req.RequiredAcks),
		Timeout:         c.timeoutMs(ctx, defaultProduceTimeout),
		Topics: []produceAPI.RequestTopic{{
			Topic: req.Topic,
			Partitions: []produceAPI.RequestPartition{{
				Partition: int32(req.Partition),
				RecordSet: protocol.RecordSet{
					Attributes: attributes,
					Records:    req.Records,
				},
			}},
		}},
	})

	switch {
	case err == nil:
	case errors.Is(err, protocol.ErrNoRecord):
		// An empty record set is deliberately not reported as a failure.
		return new(ProduceResponse), nil
	default:
		return nil, fmt.Errorf("kafka.(*Client).Produce: %w", err)
	}

	if req.RequiredAcks == RequireNone {
		// With no acknowledgement requested, the round-trip result is not
		// inspected.
		return nil, nil
	}

	// Guard the type assertion so that an unexpected (or nil) message is
	// reported as an error instead of panicking.
	res, ok := m.(*produceAPI.Response)
	if !ok {
		return nil, fmt.Errorf("kafka.(*Client).Produce: unexpected response type %T", m)
	}
	if len(res.Topics) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoTopic)
	}
	topic := &res.Topics[0]
	if len(topic.Partitions) == 0 {
		return nil, fmt.Errorf("kafka.(*Client).Produce: %w", protocol.ErrNoPartition)
	}
	partition := &topic.Partitions[0]

	// The request targets a single topic partition, so only the first
	// topic/partition entry of the response is read.
	ret := &ProduceResponse{
		Throttle:       makeDuration(res.ThrottleTimeMs),
		Error:          makeError(partition.ErrorCode, partition.ErrorMessage),
		BaseOffset:     partition.BaseOffset,
		LogAppendTime:  makeTime(partition.LogAppendTime),
		LogStartOffset: partition.LogStartOffset,
	}

	if len(partition.RecordErrors) != 0 {
		ret.RecordErrors = make(map[int]error, len(partition.RecordErrors))
		for _, recErr := range partition.RecordErrors {
			ret.RecordErrors[int(recErr.BatchIndex)] = errors.New(recErr.BatchIndexErrorMessage)
		}
	}

	return ret, nil
}
// produceRequestV2 groups the required acks, the timeout and the list of
// topics that make up a produce request.
type produceRequestV2 struct {
	RequiredAcks int16
	Timeout      int32
	Topics       []produceRequestTopicV2
}

// size returns 2 + 4 (matching the int16 and int32 fields emitted by writeTo)
// plus the value sizeofArray reports for the topics.
func (r produceRequestV2) size() int32 {
	const fixed = 2 + 4
	topicSize := func(i int) int32 {
		return r.Topics[i].size()
	}
	return fixed + sizeofArray(len(r.Topics), topicSize)
}

// writeTo writes RequiredAcks, then Timeout, then every topic in order to wb.
func (r produceRequestV2) writeTo(wb *writeBuffer) {
	wb.writeInt16(r.RequiredAcks)
	wb.writeInt32(r.Timeout)

	writeTopic := func(i int) {
		r.Topics[i].writeTo(wb)
	}
	wb.writeArray(len(r.Topics), writeTopic)
}
// produceRequestTopicV2 pairs a topic name with the partitions to produce to
// within that topic.
type produceRequestTopicV2 struct {
	TopicName  string
	Partitions []produceRequestPartitionV2
}

// size returns the sum of what sizeofString reports for the topic name and
// what sizeofArray reports for the partitions.
func (t produceRequestTopicV2) size() int32 {
	nameSize := sizeofString(t.TopicName)
	partitionsSize := sizeofArray(len(t.Partitions), func(i int) int32 {
		return t.Partitions[i].size()
	})
	return nameSize + partitionsSize
}

// writeTo writes the topic name followed by every partition in order to wb.
func (t produceRequestTopicV2) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)

	writePartition := func(i int) {
		t.Partitions[i].writeTo(wb)
	}
	wb.writeArray(len(t.Partitions), writePartition)
}
// produceRequestPartitionV2 carries a partition number, a message set size
// and the message set itself.
type produceRequestPartitionV2 struct {
	Partition      int32
	MessageSetSize int32
	MessageSet     messageSet
}

// size returns 4 + 4 (matching the two int32 fields emitted by writeTo) plus
// the size reported by the message set.
func (p produceRequestPartitionV2) size() int32 {
	const header = 4 + 4
	return header + p.MessageSet.size()
}

// writeTo writes Partition, then MessageSetSize, then the message set to wb.
func (p produceRequestPartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt32(p.MessageSetSize)

	set := p.MessageSet
	set.writeTo(wb)
}
// produceResponsePartitionV2 holds the per-partition fields of a produce
// response: partition number, error code, offset and timestamp.
type produceResponsePartitionV2 struct {
	Partition int32
	ErrorCode int16
	Offset    int64
	Timestamp int64
}

// size returns the fixed total of the four fields: 4 + 2 + 8 + 8.
func (p produceResponsePartitionV2) size() int32 {
	const (
		partitionSize = 4
		errorCodeSize = 2
		offsetSize    = 8
		timestampSize = 8
	)
	return partitionSize + errorCodeSize + offsetSize + timestampSize
}

// writeTo writes Partition, ErrorCode, Offset and Timestamp, in that order,
// to wb.
func (p produceResponsePartitionV2) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Offset)
	wb.writeInt64(p.Timestamp)
}

// readFrom fills the fields from r in the same order writeTo emits them. Each
// read helper receives the remaining count returned by the previous one,
// starting from sz. Reading stops at the first error, and the values returned
// are those of the last helper that was called.
func (p *produceResponsePartitionV2) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	remain, err = readInt32(r, sz, &p.Partition)
	if err != nil {
		return remain, err
	}

	remain, err = readInt16(r, remain, &p.ErrorCode)
	if err != nil {
		return remain, err
	}

	remain, err = readInt64(r, remain, &p.Offset)
	if err != nil {
		return remain, err
	}

	return readInt64(r, remain, &p.Timestamp)
}
// produceResponsePartitionV7 holds the per-partition fields of a produce
// response: partition number, error code, offset, timestamp and start offset.
type produceResponsePartitionV7 struct {
	Partition   int32
	ErrorCode   int16
	Offset      int64
	Timestamp   int64
	StartOffset int64
}

// size returns the fixed total of the five fields: 4 + 2 + 8 + 8 + 8.
func (p produceResponsePartitionV7) size() int32 {
	const (
		partitionSize   = 4
		errorCodeSize   = 2
		offsetSize      = 8
		timestampSize   = 8
		startOffsetSize = 8
	)
	return partitionSize + errorCodeSize + offsetSize + timestampSize + startOffsetSize
}

// writeTo writes Partition, ErrorCode, Offset, Timestamp and StartOffset, in
// that order, to wb.
func (p produceResponsePartitionV7) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Offset)
	wb.writeInt64(p.Timestamp)
	wb.writeInt64(p.StartOffset)
}

// readFrom fills the fields from r in the same order writeTo emits them. Each
// read helper receives the remaining count returned by the previous one,
// starting from sz. Reading stops at the first error, and the values returned
// are those of the last helper that was called.
func (p *produceResponsePartitionV7) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	remain, err = readInt32(r, sz, &p.Partition)
	if err != nil {
		return remain, err
	}

	remain, err = readInt16(r, remain, &p.ErrorCode)
	if err != nil {
		return remain, err
	}

	remain, err = readInt64(r, remain, &p.Offset)
	if err != nil {
		return remain, err
	}

	remain, err = readInt64(r, remain, &p.Timestamp)
	if err != nil {
		return remain, err
	}

	return readInt64(r, remain, &p.StartOffset)
}