You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
106 lines
2.8 KiB
106 lines
2.8 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/offsetdelete"
|
|
)
|
|
|
|
// OffsetDelete deletes the offset for a consumer group on a particular topic
|
|
// for a particular partition.
|
|
type OffsetDelete struct {
|
|
Topic string
|
|
Partition int
|
|
}
|
|
|
|
// OffsetDeleteRequest represents a request sent to a kafka broker to delete
|
|
// the offsets for a partition on a given topic associated with a consumer group.
|
|
type OffsetDeleteRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// ID of the consumer group to delete the offsets for.
|
|
GroupID string
|
|
|
|
// Set of topic partitions to delete offsets for.
|
|
Topics map[string][]int
|
|
}
|
|
|
|
// OffsetDeleteResponse represents a response from a kafka broker to a delete
|
|
// offset request.
|
|
type OffsetDeleteResponse struct {
|
|
// An error that may have occurred while attempting to delete an offset
|
|
Error error
|
|
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// Set of topic partitions that the kafka broker has additional info (error?)
|
|
// for.
|
|
Topics map[string][]OffsetDeletePartition
|
|
}
|
|
|
|
// OffsetDeletePartition represents the state of a status of a partition in response
|
|
// to deleting offsets.
|
|
type OffsetDeletePartition struct {
|
|
// ID of the partition.
|
|
Partition int
|
|
|
|
// An error that may have occurred while attempting to delete an offset for
|
|
// this partition.
|
|
Error error
|
|
}
|
|
|
|
// OffsetDelete sends a delete offset request to a kafka broker and returns the
|
|
// response.
|
|
func (c *Client) OffsetDelete(ctx context.Context, req *OffsetDeleteRequest) (*OffsetDeleteResponse, error) {
|
|
topics := make([]offsetdelete.RequestTopic, 0, len(req.Topics))
|
|
|
|
for topicName, partitionIndexes := range req.Topics {
|
|
partitions := make([]offsetdelete.RequestPartition, len(partitionIndexes))
|
|
|
|
for i, c := range partitionIndexes {
|
|
partitions[i] = offsetdelete.RequestPartition{
|
|
PartitionIndex: int32(c),
|
|
}
|
|
}
|
|
|
|
topics = append(topics, offsetdelete.RequestTopic{
|
|
Name: topicName,
|
|
Partitions: partitions,
|
|
})
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &offsetdelete.Request{
|
|
GroupID: req.GroupID,
|
|
Topics: topics,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).OffsetDelete: %w", err)
|
|
}
|
|
r := m.(*offsetdelete.Response)
|
|
|
|
res := &OffsetDeleteResponse{
|
|
Error: makeError(r.ErrorCode, ""),
|
|
Throttle: makeDuration(r.ThrottleTimeMs),
|
|
Topics: make(map[string][]OffsetDeletePartition, len(r.Topics)),
|
|
}
|
|
|
|
for _, topic := range r.Topics {
|
|
partitions := make([]OffsetDeletePartition, len(topic.Partitions))
|
|
|
|
for i, p := range topic.Partitions {
|
|
partitions[i] = OffsetDeletePartition{
|
|
Partition: int(p.PartitionIndex),
|
|
Error: makeError(p.ErrorCode, ""),
|
|
}
|
|
}
|
|
|
|
res.Topics[topic.Name] = partitions
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|