package kafka import ( "bufio" "context" "fmt" "net" "time" "github.com/segmentio/kafka-go/protocol/offsetcommit" ) // OffsetCommit represent the commit of an offset to a partition. // // The extra metadata is opaque to the kafka protocol, it is intended to hold // information like an identifier for the process that committed the offset, // or the time at which the commit was made. type OffsetCommit struct { Partition int Offset int64 Metadata string } // OffsetCommitRequest represents a request sent to a kafka broker to commit // offsets for a partition. type OffsetCommitRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // ID of the consumer group to publish the offsets for. GroupID string // ID of the consumer group generation. GenerationID int // ID of the group member submitting the offsets. MemberID string // ID of the group instance. InstanceID string // Set of topic partitions to publish the offsets for. // // Not that offset commits need to be submitted to the broker acting as the // group coordinator. This will be automatically resolved by the transport. Topics map[string][]OffsetCommit } // OffsetFetchResponse represents a response from a kafka broker to an offset // commit request. type OffsetCommitResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Set of topic partitions that the kafka broker has accepted offset commits // for. Topics map[string][]OffsetCommitPartition } // OffsetFetchPartition represents the state of a single partition in responses // to committing offsets. type OffsetCommitPartition struct { // ID of the partition. Partition int // An error that may have occurred while attempting to publish consumer // group offsets for this partition. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error } // OffsetCommit sends an offset commit request to a kafka broker and returns the // response. func (c *Client) OffsetCommit(ctx context.Context, req *OffsetCommitRequest) (*OffsetCommitResponse, error) { now := time.Now().UnixNano() / int64(time.Millisecond) topics := make([]offsetcommit.RequestTopic, 0, len(req.Topics)) for topicName, commits := range req.Topics { partitions := make([]offsetcommit.RequestPartition, len(commits)) for i, c := range commits { partitions[i] = offsetcommit.RequestPartition{ PartitionIndex: int32(c.Partition), CommittedOffset: c.Offset, CommittedMetadata: c.Metadata, // This field existed in v1 of the OffsetCommit API, setting it // to the current timestamp is probably a safe thing to do, but // it is hard to tell. CommitTimestamp: now, } } topics = append(topics, offsetcommit.RequestTopic{ Name: topicName, Partitions: partitions, }) } m, err := c.roundTrip(ctx, req.Addr, &offsetcommit.Request{ GroupID: req.GroupID, GenerationID: int32(req.GenerationID), MemberID: req.MemberID, GroupInstanceID: req.InstanceID, Topics: topics, // Hardcoded retention; this field existed between v2 and v4 of the // OffsetCommit API, we would have to figure out a way to give the // client control over the API version being used to support configuring // it in the request object. RetentionTimeMs: int64((24 * time.Hour) / time.Millisecond), }) if err != nil { return nil, fmt.Errorf("kafka.(*Client).OffsetCommit: %w", err) } r := m.(*offsetcommit.Response) res := &OffsetCommitResponse{ Throttle: makeDuration(r.ThrottleTimeMs), Topics: make(map[string][]OffsetCommitPartition, len(r.Topics)), } for _, topic := range r.Topics { partitions := make([]OffsetCommitPartition, len(topic.Partitions)) for i, p := range topic.Partitions { partitions[i] = OffsetCommitPartition{ Partition: int(p.PartitionIndex), Error: makeError(p.ErrorCode, ""), } } res.Topics[topic.Name] = partitions } return res, nil } type offsetCommitRequestV2Partition struct { // Partition ID Partition int32 // Offset to be committed Offset int64 // Metadata holds any associated metadata the client wants to keep Metadata string } func (t offsetCommitRequestV2Partition) size() int32 { return sizeofInt32(t.Partition) + sizeofInt64(t.Offset) + sizeofString(t.Metadata) } func (t offsetCommitRequestV2Partition) writeTo(wb *writeBuffer) { wb.writeInt32(t.Partition) wb.writeInt64(t.Offset) wb.writeString(t.Metadata) } type offsetCommitRequestV2Topic struct { // Topic name Topic string // Partitions to commit offsets Partitions []offsetCommitRequestV2Partition } func (t offsetCommitRequestV2Topic) size() int32 { return sizeofString(t.Topic) + sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() }) } func (t offsetCommitRequestV2Topic) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) }) } type offsetCommitRequestV2 struct { // GroupID holds the unique group identifier GroupID string // GenerationID holds the generation of the group. GenerationID int32 // MemberID assigned by the group coordinator MemberID string // RetentionTime holds the time period in ms to retain the offset. RetentionTime int64 // Topics to commit offsets Topics []offsetCommitRequestV2Topic } func (t offsetCommitRequestV2) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.GenerationID) + sizeofString(t.MemberID) + sizeofInt64(t.RetentionTime) + sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) } func (t offsetCommitRequestV2) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.GenerationID) wb.writeString(t.MemberID) wb.writeInt64(t.RetentionTime) wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) }) } type offsetCommitResponseV2PartitionResponse struct { Partition int32 // ErrorCode holds response error code ErrorCode int16 } func (t offsetCommitResponseV2PartitionResponse) size() int32 { return sizeofInt32(t.Partition) + sizeofInt16(t.ErrorCode) } func (t offsetCommitResponseV2PartitionResponse) writeTo(wb *writeBuffer) { wb.writeInt32(t.Partition) wb.writeInt16(t.ErrorCode) } func (t *offsetCommitResponseV2PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readInt32(r, size, &t.Partition); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } return } type offsetCommitResponseV2Response struct { Topic string PartitionResponses []offsetCommitResponseV2PartitionResponse } func (t offsetCommitResponseV2Response) size() int32 { return sizeofString(t.Topic) + sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() }) } func (t offsetCommitResponseV2Response) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeArray(len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(wb) }) } func (t *offsetCommitResponseV2Response) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) { item := offsetCommitResponseV2PartitionResponse{} if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil { return } t.PartitionResponses = append(t.PartitionResponses, item) return } if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } type offsetCommitResponseV2 struct { Responses []offsetCommitResponseV2Response } func (t offsetCommitResponseV2) size() int32 { return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() }) } func (t offsetCommitResponseV2) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Responses), func(i int) { t.Responses[i].writeTo(wb) }) } func (t *offsetCommitResponseV2) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) { item := offsetCommitResponseV2Response{} if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil { return } t.Responses = append(t.Responses, item) return } if remain, err = readArrayWith(r, size, fn); err != nil { return } return }