You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

286 lines
7.2 KiB

package kafka
import (
"bufio"
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/listoffsets"
)
// OffsetRequest represents a request to retrieve a single partition offset.
//
// The FirstOffsetOf, LastOffsetOf, and TimeOffsetOf constructors build values
// of this type.
type OffsetRequest struct {
	// Partition is the number of the partition the offset is requested for.
	Partition int
	// Timestamp selects which offset is looked up: FirstOffsetOf stores
	// FirstOffset, LastOffsetOf stores LastOffset, and TimeOffsetOf stores the
	// value returned by timestamp for the given time.
	Timestamp int64
}
// FirstOffsetOf builds an OffsetRequest that asks for the first offset of the
// partition given as argument.
func FirstOffsetOf(partition int) OffsetRequest {
	var req OffsetRequest
	req.Partition = partition
	req.Timestamp = FirstOffset
	return req
}
// LastOffsetOf builds an OffsetRequest that asks for the last offset of the
// partition given as argument.
func LastOffsetOf(partition int) OffsetRequest {
	var req OffsetRequest
	req.Partition = partition
	req.Timestamp = LastOffset
	return req
}
// TimeOffsetOf builds an OffsetRequest that asks for the offset of the given
// partition at a specific point in time. The time is converted with the
// package's timestamp helper before being stored in the request.
func TimeOffsetOf(partition int, at time.Time) OffsetRequest {
	var req OffsetRequest
	req.Partition = partition
	req.Timestamp = timestamp(at)
	return req
}
// PartitionOffsets carries information about offsets available in a topic
// partition.
type PartitionOffsets struct {
	// Partition is the number of the partition these offsets belong to.
	Partition int
	// FirstOffset is the first offset reported for the partition. ListOffsets
	// initializes it to -1 for partitions whose first offset was not
	// requested.
	FirstOffset int64
	// LastOffset is the last offset reported for the partition. ListOffsets
	// initializes it to -1 for partitions whose last offset was not requested.
	LastOffset int64
	// Offsets maps offsets to the time associated with them. ListOffsets
	// fills it from response entries whose timestamp is neither FirstOffset
	// nor LastOffset.
	Offsets map[int64]time.Time
	// Error is set by ListOffsets when the response carries a non-zero error
	// code for the partition; it is nil otherwise.
	Error error
}
// ListOffsetsRequest represents a request sent to a kafka broker to list the
// offsets of topic partitions.
type ListOffsetsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr
	// A mapping of topic names to the list of partition offset requests that
	// the program wishes to get the offsets for. The same partition may appear
	// more than once (for example to ask for both its first and last offset);
	// ListOffsets merges those into a single PartitionOffsets entry.
	Topics map[string][]OffsetRequest
	// The isolation level for the request.
	//
	// Defaults to ReadUncommitted.
	//
	// This field requires the kafka broker to support the ListOffsets API in
	// version 2 or above (otherwise the value is ignored).
	IsolationLevel IsolationLevel
}
// ListOffsetsResponse represents a response from a kafka broker to an offset
// listing request.
type ListOffsetsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration
	// Mapping of topic names to partition offsets; there will be one entry
	// for each topic in the request.
	Topics map[string][]PartitionOffsets
}
// ListOffsets sends an offset request to a kafka broker and returns the
// response.
//
// All OffsetRequest values that target the same topic and partition are merged
// into a single PartitionOffsets entry in the response:
//
//   - FirstOffset and LastOffset start at -1, meaning "not requested". They
//     are set to 0 when requested and then overwritten with the offset carried
//     by the response.
//   - Offsets collects every other (time based) lookup, keyed by the returned
//     offset.
//   - Error holds the partition-level error reported in the response, if any.
//
// The order of the partitions within each topic of the response is
// unspecified because the result is assembled by iterating over a map.
//
// An error is returned when the round trip to the broker fails or when the
// message received is not a ListOffsets response.
func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) {
	type topicPartition struct {
		topic     string
		partition int
	}

	// newPartitionOffsets returns an entry carrying the "not requested"
	// sentinels and a non-nil Offsets map, so later writes can never hit a
	// nil map.
	newPartitionOffsets := func(partition int) PartitionOffsets {
		return PartitionOffsets{
			Partition:   partition,
			FirstOffset: -1,
			LastOffset:  -1,
			Offsets:     make(map[int64]time.Time),
		}
	}

	partitionOffsets := make(map[topicPartition]PartitionOffsets)
	topics := make([]listoffsets.RequestTopic, 0, len(req.Topics))

	// A single pass over the request both records which offsets were asked
	// for and builds the protocol-level request.
	for topicName, requests := range req.Topics {
		partitions := make([]listoffsets.RequestPartition, len(requests))

		for i, r := range requests {
			key := topicPartition{
				topic:     topicName,
				partition: r.Partition,
			}

			partition, seen := partitionOffsets[key]
			if !seen {
				partition = newPartitionOffsets(r.Partition)
			}

			// Mark the offsets that were explicitly requested by moving
			// them from the -1 sentinel to 0.
			switch r.Timestamp {
			case FirstOffset:
				partition.FirstOffset = 0
			case LastOffset:
				partition.LastOffset = 0
			}

			partitionOffsets[key] = partition

			partitions[i] = listoffsets.RequestPartition{
				Partition:          int32(r.Partition),
				CurrentLeaderEpoch: -1,
				Timestamp:          r.Timestamp,
			}
		}

		topics = append(topics, listoffsets.RequestTopic{
			Topic:      topicName,
			Partitions: partitions,
		})
	}

	m, err := c.roundTrip(ctx, req.Addr, &listoffsets.Request{
		ReplicaID:      -1,
		IsolationLevel: int8(req.IsolationLevel),
		Topics:         topics,
	})
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).ListOffsets: %w", err)
	}

	// Report an unexpected message type as an error instead of panicking.
	res, ok := m.(*listoffsets.Response)
	if !ok {
		return nil, fmt.Errorf("kafka.(*Client).ListOffsets: unexpected response type %T", m)
	}

	ret := &ListOffsetsResponse{
		Throttle: makeDuration(res.ThrottleTimeMs),
		Topics:   make(map[string][]PartitionOffsets, len(res.Topics)),
	}

	for _, t := range res.Topics {
		for _, p := range t.Partitions {
			key := topicPartition{
				topic:     t.Topic,
				partition: int(p.Partition),
			}

			// A partition that was not part of the request has no
			// prepared entry. Create one so the Offsets map is never nil
			// (writing to a nil map panics) and the partition number is
			// reported correctly.
			partition, known := partitionOffsets[key]
			if !known {
				partition = newPartitionOffsets(key.partition)
			}

			switch p.Timestamp {
			case FirstOffset:
				partition.FirstOffset = p.Offset
			case LastOffset:
				partition.LastOffset = p.Offset
			default:
				partition.Offsets[p.Offset] = makeTime(p.Timestamp)
			}

			if p.ErrorCode != 0 {
				partition.Error = Error(p.ErrorCode)
			}

			partitionOffsets[key] = partition
		}
	}

	for key, partition := range partitionOffsets {
		ret.Topics[key.topic] = append(ret.Topics[key.topic], partition)
	}

	return ret, nil
}
// listOffsetRequestV1 holds the fields of an offset listing request as
// serialized by its size and writeTo methods: a replica ID followed by an
// array of topics.
//
// NOTE(review): the V1 suffix presumably refers to version 1 of the Kafka
// ListOffsets API; confirm against the protocol definition.
type listOffsetRequestV1 struct {
	// ReplicaID is written first, as an int32.
	ReplicaID int32
	// Topics lists the per-topic offset requests.
	Topics []listOffsetRequestTopicV1
}
// size returns the encoded size of the request: 4 for the int32 replica ID
// plus the size of the topics array as computed by sizeofArray.
func (r listOffsetRequestV1) size() int32 {
	topicSize := func(i int) int32 {
		return r.Topics[i].size()
	}
	return 4 + sizeofArray(len(r.Topics), topicSize)
}
// writeTo writes the request to wb: the replica ID as an int32, then every
// topic in order as an array.
func (r listOffsetRequestV1) writeTo(wb *writeBuffer) {
	wb.writeInt32(r.ReplicaID)

	writeTopic := func(i int) {
		r.Topics[i].writeTo(wb)
	}
	wb.writeArray(len(r.Topics), writeTopic)
}
// listOffsetRequestTopicV1 is the per-topic element of listOffsetRequestV1:
// a topic name followed by an array of partition requests.
type listOffsetRequestTopicV1 struct {
	// TopicName is written first, as a string.
	TopicName string
	// Partitions lists the partitions of the topic to look up.
	Partitions []listOffsetRequestPartitionV1
}
// size returns the encoded size of the topic entry: the size of the topic
// name string plus the size of the partitions array.
func (t listOffsetRequestTopicV1) size() int32 {
	nameSize := sizeofString(t.TopicName)
	partitionsSize := sizeofArray(len(t.Partitions), func(i int) int32 {
		return t.Partitions[i].size()
	})
	return nameSize + partitionsSize
}
// writeTo writes the topic entry to wb: the topic name as a string, then
// every partition request in order as an array.
func (t listOffsetRequestTopicV1) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)

	writePartition := func(i int) {
		t.Partitions[i].writeTo(wb)
	}
	wb.writeArray(len(t.Partitions), writePartition)
}
// listOffsetRequestPartitionV1 is the per-partition element of
// listOffsetRequestTopicV1: a partition number and a time value.
type listOffsetRequestPartitionV1 struct {
	// Partition is the partition number, written as an int32.
	Partition int32
	// Time is written as an int64.
	// NOTE(review): presumably the timestamp to look up, or one of the
	// FirstOffset/LastOffset sentinels; confirm against callers.
	Time int64
}
// size returns the fixed encoded size of a partition request: an int32
// partition number followed by an int64 time value.
func (p listOffsetRequestPartitionV1) size() int32 {
	const (
		partitionSize = 4 // int32 Partition
		timeSize      = 8 // int64 Time
	)
	return partitionSize + timeSize
}
// writeTo writes the partition request to wb: the partition number as an
// int32 followed by the time value as an int64. The field order matches the
// sizes accounted for by size.
func (p listOffsetRequestPartitionV1) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt64(p.Time)
}
// listOffsetResponseV1 is an offset listing response, represented as the list
// of its per-topic entries.
//
// NOTE(review): the V1 suffix presumably refers to version 1 of the Kafka
// ListOffsets API; confirm against the protocol definition.
type listOffsetResponseV1 []listOffsetResponseTopicV1
// size returns the encoded size of the response, which is the size of the
// array of its topic entries as computed by sizeofArray.
func (r listOffsetResponseV1) size() int32 {
	topicSize := func(i int) int32 {
		return r[i].size()
	}
	return sizeofArray(len(r), topicSize)
}
// writeTo writes the response to wb as an array of its topic entries, in
// order.
func (r listOffsetResponseV1) writeTo(wb *writeBuffer) {
	writeTopic := func(i int) {
		r[i].writeTo(wb)
	}
	wb.writeArray(len(r), writeTopic)
}
// listOffsetResponseTopicV1 is the per-topic element of listOffsetResponseV1:
// a topic name followed by an array of partition offsets.
type listOffsetResponseTopicV1 struct {
	// TopicName is written first, as a string.
	TopicName string
	// PartitionOffsets lists the offsets reported for the topic's partitions.
	PartitionOffsets []partitionOffsetV1
}
// size returns the encoded size of the topic entry: the size of the topic
// name string plus the size of the partition offsets array.
func (t listOffsetResponseTopicV1) size() int32 {
	nameSize := sizeofString(t.TopicName)
	offsetsSize := sizeofArray(len(t.PartitionOffsets), func(i int) int32 {
		return t.PartitionOffsets[i].size()
	})
	return nameSize + offsetsSize
}
// writeTo writes the topic entry to wb: the topic name as a string, then
// every partition offset in order as an array.
func (t listOffsetResponseTopicV1) writeTo(wb *writeBuffer) {
	wb.writeString(t.TopicName)

	writeOffset := func(i int) {
		t.PartitionOffsets[i].writeTo(wb)
	}
	wb.writeArray(len(t.PartitionOffsets), writeOffset)
}
// partitionOffsetV1 is the per-partition element of
// listOffsetResponseTopicV1. Its fields are written by writeTo and read by
// readFrom in declaration order.
type partitionOffsetV1 struct {
	// Partition is the partition number (int32).
	Partition int32
	// ErrorCode is the partition-level error code (int16).
	// NOTE(review): presumably zero means no error; confirm against callers.
	ErrorCode int16
	// Timestamp is the timestamp associated with the offset (int64).
	Timestamp int64
	// Offset is the offset reported for the partition (int64).
	Offset int64
}
// size returns the fixed encoded size of a partition offset: an int32
// partition, an int16 error code, an int64 timestamp and an int64 offset.
func (p partitionOffsetV1) size() int32 {
	const (
		partitionSize = 4 // int32 Partition
		errorCodeSize = 2 // int16 ErrorCode
		timestampSize = 8 // int64 Timestamp
		offsetSize    = 8 // int64 Offset
	)
	return partitionSize + errorCodeSize + timestampSize + offsetSize
}
// writeTo writes the partition offset to wb in declaration order: partition
// (int32), error code (int16), timestamp (int64), offset (int64). readFrom
// reads the fields back in the same order.
func (p partitionOffsetV1) writeTo(wb *writeBuffer) {
	wb.writeInt32(p.Partition)
	wb.writeInt16(p.ErrorCode)
	wb.writeInt64(p.Timestamp)
	wb.writeInt64(p.Offset)
}
// readFrom populates p by reading, in order, the partition (int32), error
// code (int16), timestamp (int64) and offset (int64) from r.
//
// sz is the size budget passed to the first read; each read helper returns
// the remaining budget, which is threaded into the next read. Decoding stops
// at the first error, returning that error together with the remaining value
// reported by the failing read.
func (p *partitionOffsetV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
	remain, err = readInt32(r, sz, &p.Partition)
	if err != nil {
		return remain, err
	}

	remain, err = readInt16(r, remain, &p.ErrorCode)
	if err != nil {
		return remain, err
	}

	remain, err = readInt64(r, remain, &p.Timestamp)
	if err != nil {
		return remain, err
	}

	remain, err = readInt64(r, remain, &p.Offset)
	return remain, err
}