package kafka

import (
	"context"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/offsetdelete"
)

// OffsetDelete names one partition of one topic whose consumer group offset
// is the subject of a delete.
//
// NOTE(review): the request and response types below do not reference this
// type in the visible part of the file; confirm where it is used.
type OffsetDelete struct {
	// Name of the topic.
	Topic string

	// ID of the partition within Topic.
	Partition int
}

// OffsetDeleteRequest represents a request sent to a kafka broker to delete
// the offsets that a consumer group has for a set of topic partitions.
type OffsetDeleteRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// ID of the consumer group to delete the offsets for.
	GroupID string

	// Partitions to delete offsets for, keyed by topic name. Each value
	// lists the partition IDs of that topic.
	Topics map[string][]int
}

// OffsetDeleteResponse represents a response from a kafka broker to a delete
// offset request.
type OffsetDeleteResponse struct {
	// An error that may have occurred while attempting to delete the
	// offsets, derived from the top-level error code of the response.
	Error error

	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Per-partition results reported by the kafka broker, keyed by topic
	// name. Each entry carries the partition ID and the error, if any,
	// reported for that partition.
	Topics map[string][]OffsetDeletePartition
}

// OffsetDeletePartition represents the status of a single partition in the
// response to an offset delete request.
type OffsetDeletePartition struct {
	// ID of the partition.
	Partition int

	// An error that may have occurred while attempting to delete an offset for
	// this partition.
	Error error
}

// OffsetDelete sends a delete offset request to a kafka broker and returns the
// response.
//
// The returned error is non-nil only when the round trip to the broker fails.
// Error codes carried in the broker's reply are reported through the Error
// fields of the response and of its partitions instead.
func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) { topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics)) for topicName, partitionIndexes := range req.Topics { partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes)) for i, c := range partitionIndexes { partitions[i] = offsetdelete.RequestPartition{ PartitionIndex: int32(c), } } topics = append(topics, offsetdelete.RequestTopic{ Name: topicName, Partitions: partitions, }) } m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{ GroupID: req.GroupID, Topics: topics, }) if err != nil { return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err) } r := m.(*offsetdelete.Response) res := &OffsetDeleteResponse{ Error: makeError(r.ErrorCode, ""), Throttle: makeDuration(r.ThrottleTimeMs), Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)), } for _, topic := range r.Topics { partitions := make([]OffsetDeletePartition, len(topic.Partitions)) for i, p := range topic.Partitions { partitions[i] = OffsetDeletePartition{ Partition: int(p.PartitionIndex), Error: makeError(p.ErrorCode, ""), } } res.Topics[topic.Name] = partitions } return res, nil }