You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
286 lines
7.2 KiB
286 lines
7.2 KiB
package kafka
|
|
|
|
import (
|
|
"bufio"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/listoffsets"
|
|
)
|
|
|
|
// OffsetRequest describes a single offset lookup for one partition.
type OffsetRequest struct {
	// Partition is the ID of the partition the lookup applies to.
	Partition int

	// Timestamp selects which offset is looked up. The constructors in this
	// file set it to FirstOffset, LastOffset, or a value derived from a
	// time.Time (see TimeOffsetOf).
	Timestamp int64
}
|
|
|
|
// FirstOffsetOf constructs an OffsetRequest which asks for the first offset of
|
|
// the parition given as argument.
|
|
func FirstOffsetOf(partition int) OffsetRequest {
|
|
return OffsetRequest{Partition: partition, Timestamp: FirstOffset}
|
|
}
|
|
|
|
// LastOffsetOf constructs an OffsetRequest which asks for the last offset of
|
|
// the partition given as argument.
|
|
func LastOffsetOf(partition int) OffsetRequest {
|
|
return OffsetRequest{Partition: partition, Timestamp: LastOffset}
|
|
}
|
|
|
|
// TimeOffsetOf constructs an OffsetRequest which asks for a partition offset
|
|
// at a given time.
|
|
func TimeOffsetOf(partition int, at time.Time) OffsetRequest {
|
|
return OffsetRequest{Partition: partition, Timestamp: timestamp(at)}
|
|
}
|
|
|
|
// PartitionOffsets carries the offsets that were resolved for one topic
// partition.
type PartitionOffsets struct {
	// Partition is the ID of the partition the offsets belong to.
	Partition int

	// FirstOffset is the answer to a first-offset request. ListOffsets
	// initializes it to -1 when no such request was made for the partition.
	FirstOffset int64

	// LastOffset is the answer to a last-offset request. ListOffsets
	// initializes it to -1 when no such request was made for the partition.
	LastOffset int64

	// Offsets holds the answers to time-based requests, mapping each offset
	// returned by the broker to the timestamp reported with it.
	Offsets map[int64]time.Time

	// Error is set when the broker reported a non-zero error code for the
	// partition.
	Error error
}
|
|
|
|
// ListOffsetsRequest represents a request sent to a kafka broker to list of the
|
|
// offsets of topic partitions.
|
|
type ListOffsetsRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// A mapping of topic names to list of partitions that the program wishes to
|
|
// get the offsets for.
|
|
Topics map[string][]OffsetRequest
|
|
|
|
// The isolation level for the request.
|
|
//
|
|
// Defaults to ReadUncommitted.
|
|
//
|
|
// This field requires the kafka broker to support the ListOffsets API in
|
|
// version 2 or above (otherwise the value is ignored).
|
|
IsolationLevel IsolationLevel
|
|
}
|
|
|
|
// ListOffsetsResponse represents a response from a kafka broker to a offset
|
|
// listing request.
|
|
type ListOffsetsResponse struct {
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// Mappings of topics names to partition offsets, there will be one entry
|
|
// for each topic in the request.
|
|
Topics map[string][]PartitionOffsets
|
|
}
|
|
|
|
// ListOffsets sends an offset request to a kafka broker and returns the
|
|
// response.
|
|
func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) {
|
|
type topicPartition struct {
|
|
topic string
|
|
partition int
|
|
}
|
|
|
|
partitionOffsets := make(map[topicPartition]PartitionOffsets)
|
|
|
|
for topicName, requests := range req.Topics {
|
|
for _, r := range requests {
|
|
key := topicPartition{
|
|
topic: topicName,
|
|
partition: r.Partition,
|
|
}
|
|
|
|
partition, ok := partitionOffsets[key]
|
|
if !ok {
|
|
partition = PartitionOffsets{
|
|
Partition: r.Partition,
|
|
FirstOffset: -1,
|
|
LastOffset: -1,
|
|
Offsets: make(map[int64]time.Time),
|
|
}
|
|
}
|
|
|
|
switch r.Timestamp {
|
|
case FirstOffset:
|
|
partition.FirstOffset = 0
|
|
case LastOffset:
|
|
partition.LastOffset = 0
|
|
}
|
|
|
|
partitionOffsets[topicPartition{
|
|
topic: topicName,
|
|
partition: r.Partition,
|
|
}] = partition
|
|
}
|
|
}
|
|
|
|
topics := make([]listoffsets.RequestTopic, 0, len(req.Topics))
|
|
|
|
for topicName, requests := range req.Topics {
|
|
partitions := make([]listoffsets.RequestPartition, len(requests))
|
|
|
|
for i, r := range requests {
|
|
partitions[i] = listoffsets.RequestPartition{
|
|
Partition: int32(r.Partition),
|
|
CurrentLeaderEpoch: -1,
|
|
Timestamp: r.Timestamp,
|
|
}
|
|
}
|
|
|
|
topics = append(topics, listoffsets.RequestTopic{
|
|
Topic: topicName,
|
|
Partitions: partitions,
|
|
})
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &listoffsets.Request{
|
|
ReplicaID: -1,
|
|
IsolationLevel: int8(req.IsolationLevel),
|
|
Topics: topics,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).ListOffsets: %w", err)
|
|
}
|
|
|
|
res := m.(*listoffsets.Response)
|
|
ret := &ListOffsetsResponse{
|
|
Throttle: makeDuration(res.ThrottleTimeMs),
|
|
Topics: make(map[string][]PartitionOffsets, len(res.Topics)),
|
|
}
|
|
|
|
for _, t := range res.Topics {
|
|
for _, p := range t.Partitions {
|
|
key := topicPartition{
|
|
topic: t.Topic,
|
|
partition: int(p.Partition),
|
|
}
|
|
|
|
partition := partitionOffsets[key]
|
|
|
|
switch p.Timestamp {
|
|
case FirstOffset:
|
|
partition.FirstOffset = p.Offset
|
|
case LastOffset:
|
|
partition.LastOffset = p.Offset
|
|
default:
|
|
partition.Offsets[p.Offset] = makeTime(p.Timestamp)
|
|
}
|
|
|
|
if p.ErrorCode != 0 {
|
|
partition.Error = Error(p.ErrorCode)
|
|
}
|
|
|
|
partitionOffsets[key] = partition
|
|
}
|
|
}
|
|
|
|
for key, partition := range partitionOffsets {
|
|
ret.Topics[key.topic] = append(ret.Topics[key.topic], partition)
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
type listOffsetRequestV1 struct {
|
|
ReplicaID int32
|
|
Topics []listOffsetRequestTopicV1
|
|
}
|
|
|
|
func (r listOffsetRequestV1) size() int32 {
|
|
return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
|
|
}
|
|
|
|
func (r listOffsetRequestV1) writeTo(wb *writeBuffer) {
|
|
wb.writeInt32(r.ReplicaID)
|
|
wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
|
|
}
|
|
|
|
type listOffsetRequestTopicV1 struct {
|
|
TopicName string
|
|
Partitions []listOffsetRequestPartitionV1
|
|
}
|
|
|
|
func (t listOffsetRequestTopicV1) size() int32 {
|
|
return sizeofString(t.TopicName) +
|
|
sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
|
|
}
|
|
|
|
func (t listOffsetRequestTopicV1) writeTo(wb *writeBuffer) {
|
|
wb.writeString(t.TopicName)
|
|
wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
|
|
}
|
|
|
|
// listOffsetRequestPartitionV1 is the per-partition entry of a
// listOffsetRequestTopicV1.
type listOffsetRequestPartitionV1 struct {
	// Partition is the ID of the partition.
	Partition int32

	// Time is written after Partition, as a 64-bit integer.
	Time int64
}
|
|
|
|
func (p listOffsetRequestPartitionV1) size() int32 {
|
|
return 4 + 8
|
|
}
|
|
|
|
func (p listOffsetRequestPartitionV1) writeTo(wb *writeBuffer) {
|
|
wb.writeInt32(p.Partition)
|
|
wb.writeInt64(p.Time)
|
|
}
|
|
|
|
// listOffsetResponseV1 is the body of a version 1 list-offsets response: a
// list of per-topic entries.
type listOffsetResponseV1 []listOffsetResponseTopicV1
|
|
|
|
func (r listOffsetResponseV1) size() int32 {
|
|
return sizeofArray(len(r), func(i int) int32 { return r[i].size() })
|
|
}
|
|
|
|
func (r listOffsetResponseV1) writeTo(wb *writeBuffer) {
|
|
wb.writeArray(len(r), func(i int) { r[i].writeTo(wb) })
|
|
}
|
|
|
|
type listOffsetResponseTopicV1 struct {
|
|
TopicName string
|
|
PartitionOffsets []partitionOffsetV1
|
|
}
|
|
|
|
func (t listOffsetResponseTopicV1) size() int32 {
|
|
return sizeofString(t.TopicName) +
|
|
sizeofArray(len(t.PartitionOffsets), func(i int) int32 { return t.PartitionOffsets[i].size() })
|
|
}
|
|
|
|
func (t listOffsetResponseTopicV1) writeTo(wb *writeBuffer) {
|
|
wb.writeString(t.TopicName)
|
|
wb.writeArray(len(t.PartitionOffsets), func(i int) { t.PartitionOffsets[i].writeTo(wb) })
|
|
}
|
|
|
|
// partitionOffsetV1 is the per-partition entry of a listOffsetResponseTopicV1.
// Fields are declared in the order in which they are written and read.
type partitionOffsetV1 struct {
	// Partition is the ID of the partition.
	Partition int32

	// ErrorCode is the 16-bit error code carried with the entry.
	ErrorCode int16

	// Timestamp is the 64-bit timestamp carried with the entry.
	Timestamp int64

	// Offset is the 64-bit offset carried with the entry.
	Offset int64
}
|
|
|
|
func (p partitionOffsetV1) size() int32 {
|
|
return 4 + 2 + 8 + 8
|
|
}
|
|
|
|
func (p partitionOffsetV1) writeTo(wb *writeBuffer) {
|
|
wb.writeInt32(p.Partition)
|
|
wb.writeInt16(p.ErrorCode)
|
|
wb.writeInt64(p.Timestamp)
|
|
wb.writeInt64(p.Offset)
|
|
}
|
|
|
|
func (p *partitionOffsetV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
|
|
if remain, err = readInt32(r, sz, &p.Partition); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readInt64(r, remain, &p.Offset); err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|