You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
287 lines
7.4 KiB
287 lines
7.4 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
|
|
)
|
|
|
|
// MetadataRequest represents a request sent to a kafka broker to retrieve its
|
|
// cluster metadata.
|
|
type MetadataRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// The list of topics to retrieve metadata for.
|
|
Topics []string
|
|
}
|
|
|
|
// MetadatResponse represents a response from a kafka broker to a metadata
|
|
// request.
|
|
type MetadataResponse struct {
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// Name of the kafka cluster that client retrieved metadata from.
|
|
ClusterID string
|
|
|
|
// The broker which is currently the controller for the cluster.
|
|
Controller Broker
|
|
|
|
// The list of brokers registered to the cluster.
|
|
Brokers []Broker
|
|
|
|
// The list of topics available on the cluster.
|
|
Topics []Topic
|
|
}
|
|
|
|
// Metadata sends a metadata request to a kafka broker and returns the response.
|
|
func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) {
|
|
m, err := c.roundTrip(ctx, req.Addr, &metadataAPI.Request{
|
|
TopicNames: req.Topics,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).Metadata: %w", err)
|
|
}
|
|
|
|
res := m.(*metadataAPI.Response)
|
|
ret := &MetadataResponse{
|
|
Throttle: makeDuration(res.ThrottleTimeMs),
|
|
Brokers: make([]Broker, len(res.Brokers)),
|
|
Topics: make([]Topic, len(res.Topics)),
|
|
ClusterID: res.ClusterID,
|
|
}
|
|
|
|
brokers := make(map[int32]Broker, len(res.Brokers))
|
|
|
|
for i, b := range res.Brokers {
|
|
broker := Broker{
|
|
Host: b.Host,
|
|
Port: int(b.Port),
|
|
ID: int(b.NodeID),
|
|
Rack: b.Rack,
|
|
}
|
|
|
|
ret.Brokers[i] = broker
|
|
brokers[b.NodeID] = broker
|
|
|
|
if b.NodeID == res.ControllerID {
|
|
ret.Controller = broker
|
|
}
|
|
}
|
|
|
|
for i, t := range res.Topics {
|
|
ret.Topics[i] = Topic{
|
|
Name: t.Name,
|
|
Internal: t.IsInternal,
|
|
Partitions: make([]Partition, len(t.Partitions)),
|
|
Error: makeError(t.ErrorCode, ""),
|
|
}
|
|
|
|
for j, p := range t.Partitions {
|
|
partition := Partition{
|
|
Topic: t.Name,
|
|
ID: int(p.PartitionIndex),
|
|
Leader: brokers[p.LeaderID],
|
|
Replicas: make([]Broker, len(p.ReplicaNodes)),
|
|
Isr: make([]Broker, len(p.IsrNodes)),
|
|
Error: makeError(p.ErrorCode, ""),
|
|
}
|
|
|
|
for i, id := range p.ReplicaNodes {
|
|
partition.Replicas[i] = brokers[id]
|
|
}
|
|
|
|
for i, id := range p.IsrNodes {
|
|
partition.Isr[i] = brokers[id]
|
|
}
|
|
|
|
ret.Topics[i].Partitions[j] = partition
|
|
}
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
type topicMetadataRequestV1 []string
|
|
|
|
func (r topicMetadataRequestV1) size() int32 {
|
|
return sizeofStringArray([]string(r))
|
|
}
|
|
|
|
func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) {
|
|
// communicate nil-ness to the broker by passing -1 as the array length.
|
|
// for this particular request, the broker interpets a zero length array
|
|
// as a request for no topics whereas a nil array is for all topics.
|
|
if r == nil {
|
|
wb.writeArrayLen(-1)
|
|
} else {
|
|
wb.writeStringArray([]string(r))
|
|
}
|
|
}
|
|
|
|
type metadataResponseV1 struct {
|
|
Brokers []brokerMetadataV1
|
|
ControllerID int32
|
|
Topics []topicMetadataV1
|
|
}
|
|
|
|
func (r metadataResponseV1) size() int32 {
|
|
n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
|
|
n2 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
|
|
return 4 + n1 + n2
|
|
}
|
|
|
|
func (r metadataResponseV1) writeTo(wb *writeBuffer) {
|
|
wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) })
|
|
wb.writeInt32(r.ControllerID)
|
|
wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
|
|
}
|
|
|
|
type brokerMetadataV1 struct {
|
|
NodeID int32
|
|
Host string
|
|
Port int32
|
|
Rack string
|
|
}
|
|
|
|
func (b brokerMetadataV1) size() int32 {
|
|
return 4 + 4 + sizeofString(b.Host) + sizeofString(b.Rack)
|
|
}
|
|
|
|
func (b brokerMetadataV1) writeTo(wb *writeBuffer) {
|
|
wb.writeInt32(b.NodeID)
|
|
wb.writeString(b.Host)
|
|
wb.writeInt32(b.Port)
|
|
wb.writeString(b.Rack)
|
|
}
|
|
|
|
type topicMetadataV1 struct {
|
|
TopicErrorCode int16
|
|
TopicName string
|
|
Internal bool
|
|
Partitions []partitionMetadataV1
|
|
}
|
|
|
|
func (t topicMetadataV1) size() int32 {
|
|
return 2 + 1 +
|
|
sizeofString(t.TopicName) +
|
|
sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
|
|
}
|
|
|
|
func (t topicMetadataV1) writeTo(wb *writeBuffer) {
|
|
wb.writeInt16(t.TopicErrorCode)
|
|
wb.writeString(t.TopicName)
|
|
wb.writeBool(t.Internal)
|
|
wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
|
|
}
|
|
|
|
type partitionMetadataV1 struct {
|
|
PartitionErrorCode int16
|
|
PartitionID int32
|
|
Leader int32
|
|
Replicas []int32
|
|
Isr []int32
|
|
}
|
|
|
|
func (p partitionMetadataV1) size() int32 {
|
|
return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr)
|
|
}
|
|
|
|
func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
|
|
wb.writeInt16(p.PartitionErrorCode)
|
|
wb.writeInt32(p.PartitionID)
|
|
wb.writeInt32(p.Leader)
|
|
wb.writeInt32Array(p.Replicas)
|
|
wb.writeInt32Array(p.Isr)
|
|
}
|
|
|
|
type topicMetadataRequestV6 struct {
|
|
Topics []string
|
|
AllowAutoTopicCreation bool
|
|
}
|
|
|
|
func (r topicMetadataRequestV6) size() int32 {
|
|
return sizeofStringArray([]string(r.Topics)) + 1
|
|
}
|
|
|
|
func (r topicMetadataRequestV6) writeTo(wb *writeBuffer) {
|
|
// communicate nil-ness to the broker by passing -1 as the array length.
|
|
// for this particular request, the broker interpets a zero length array
|
|
// as a request for no topics whereas a nil array is for all topics.
|
|
if r.Topics == nil {
|
|
wb.writeArrayLen(-1)
|
|
} else {
|
|
wb.writeStringArray([]string(r.Topics))
|
|
}
|
|
wb.writeBool(r.AllowAutoTopicCreation)
|
|
}
|
|
|
|
type metadataResponseV6 struct {
|
|
ThrottleTimeMs int32
|
|
Brokers []brokerMetadataV1
|
|
ClusterId string
|
|
ControllerID int32
|
|
Topics []topicMetadataV6
|
|
}
|
|
|
|
func (r metadataResponseV6) size() int32 {
|
|
n1 := sizeofArray(len(r.Brokers), func(i int) int32 { return r.Brokers[i].size() })
|
|
n2 := sizeofNullableString(&r.ClusterId)
|
|
n3 := sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
|
|
return 4 + 4 + n1 + n2 + n3
|
|
}
|
|
|
|
func (r metadataResponseV6) writeTo(wb *writeBuffer) {
|
|
wb.writeInt32(r.ThrottleTimeMs)
|
|
wb.writeArray(len(r.Brokers), func(i int) { r.Brokers[i].writeTo(wb) })
|
|
wb.writeString(r.ClusterId)
|
|
wb.writeInt32(r.ControllerID)
|
|
wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
|
|
}
|
|
|
|
type topicMetadataV6 struct {
|
|
TopicErrorCode int16
|
|
TopicName string
|
|
Internal bool
|
|
Partitions []partitionMetadataV6
|
|
}
|
|
|
|
func (t topicMetadataV6) size() int32 {
|
|
return 2 + 1 +
|
|
sizeofString(t.TopicName) +
|
|
sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
|
|
}
|
|
|
|
func (t topicMetadataV6) writeTo(wb *writeBuffer) {
|
|
wb.writeInt16(t.TopicErrorCode)
|
|
wb.writeString(t.TopicName)
|
|
wb.writeBool(t.Internal)
|
|
wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
|
|
}
|
|
|
|
type partitionMetadataV6 struct {
|
|
PartitionErrorCode int16
|
|
PartitionID int32
|
|
Leader int32
|
|
Replicas []int32
|
|
Isr []int32
|
|
OfflineReplicas []int32
|
|
}
|
|
|
|
func (p partitionMetadataV6) size() int32 {
|
|
return 2 + 4 + 4 + sizeofInt32Array(p.Replicas) + sizeofInt32Array(p.Isr) + sizeofInt32Array(p.OfflineReplicas)
|
|
}
|
|
|
|
func (p partitionMetadataV6) writeTo(wb *writeBuffer) {
|
|
wb.writeInt16(p.PartitionErrorCode)
|
|
wb.writeInt32(p.PartitionID)
|
|
wb.writeInt32(p.Leader)
|
|
wb.writeInt32Array(p.Replicas)
|
|
wb.writeInt32Array(p.Isr)
|
|
wb.writeInt32Array(p.OfflineReplicas)
|
|
}
|
|
|