package kafka import ( "context" "fmt" "net" "time" "github.com/segmentio/kafka-go/protocol/txnoffsetcommit" ) // TxnOffsetCommitRequest represents a request sent to a kafka broker to commit // offsets for a partition within a transaction. type TxnOffsetCommitRequest struct { // Address of the kafka broker to send the request to. Addr net.Addr // The transactional id key. TransactionalID string // ID of the consumer group to publish the offsets for. GroupID string // The Producer ID (PID) for the current producer session; // received from an InitProducerID request. ProducerID int // The epoch associated with the current producer session for the given PID ProducerEpoch int // GenerationID is the current generation for the group. GenerationID int // ID of the group member submitting the offsets. MemberID string // GroupInstanceID is a unique identifier for the consumer. GroupInstanceID string // Set of topic partitions to publish the offsets for. // // Not that offset commits need to be submitted to the broker acting as the // group coordinator. This will be automatically resolved by the transport. Topics map[string][]TxnOffsetCommit } // TxnOffsetCommit represent the commit of an offset to a partition within a transaction. // // The extra metadata is opaque to the kafka protocol, it is intended to hold // information like an identifier for the process that committed the offset, // or the time at which the commit was made. type TxnOffsetCommit struct { Partition int Offset int64 Metadata string } // TxnOffsetFetchResponse represents a response from a kafka broker to an offset // commit request within a transaction. type TxnOffsetCommitResponse struct { // The amount of time that the broker throttled the request. Throttle time.Duration // Set of topic partitions that the kafka broker has accepted offset commits // for. Topics map[string][]TxnOffsetCommitPartition } // TxnOffsetFetchPartition represents the state of a single partition in responses // to committing offsets within a transaction. type TxnOffsetCommitPartition struct { // ID of the partition. Partition int // An error that may have occurred while attempting to publish consumer // group offsets for this partition. // // The error contains both the kafka error code, and an error message // returned by the kafka broker. Programs may use the standard errors.Is // function to test the error against kafka error codes. Error error } // TxnOffsetCommit sends an txn offset commit request to a kafka broker and returns the // response. func (c *Client) TxnOffsetCommit( ctx context.Context, req *TxnOffsetCommitRequest, ) (*TxnOffsetCommitResponse, error) { protoReq := &txnoffsetcommit.Request{ TransactionalID: req.TransactionalID, GroupID: req.GroupID, ProducerID: int64(req.ProducerID), ProducerEpoch: int16(req.ProducerEpoch), GenerationID: int32(req.GenerationID), MemberID: req.MemberID, GroupInstanceID: req.GroupInstanceID, Topics: make([]txnoffsetcommit.RequestTopic, 0, len(req.Topics)), } for topic, partitions := range req.Topics { parts := make([]txnoffsetcommit.RequestPartition, len(partitions)) for i, partition := range partitions { parts[i] = txnoffsetcommit.RequestPartition{ Partition: int32(partition.Partition), CommittedOffset: int64(partition.Offset), CommittedMetadata: partition.Metadata, } } t := txnoffsetcommit.RequestTopic{ Name: topic, Partitions: parts, } protoReq.Topics = append(protoReq.Topics, t) } m, err := c.roundTrip(ctx, req.Addr, protoReq) if err != nil { return nil, fmt.Errorf("kafka.(*Client).TxnOffsetCommit: %w", err) } r := m.(*txnoffsetcommit.Response) res := &TxnOffsetCommitResponse{ Throttle: makeDuration(r.ThrottleTimeMs), Topics: make(map[string][]TxnOffsetCommitPartition, len(r.Topics)), } for _, topic := range r.Topics { partitions := make([]TxnOffsetCommitPartition, 0, len(topic.Partitions)) for _, partition := range topic.Partitions { partitions = append(partitions, TxnOffsetCommitPartition{ Partition: int(partition.Partition), Error: makeError(partition.ErrorCode, ""), }) } res.Topics[topic.Name] = partitions } return res, nil }