You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

287 lines
7.4 KiB

package kafka
import (
"context"
"fmt"
"net"
"time"
metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
)
// MetadataRequest represents a request sent to a kafka broker to retrieve its
// cluster metadata.
//
// It is the input of (*Client).Metadata, which sends the request to Addr and
// forwards Topics as the topic names of the underlying protocol request.
type MetadataRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// The list of topics to retrieve metadata for.
//
// NOTE(review): the legacy request encoders in this file treat a nil list
// as "all topics" and an empty list as "no topics"; confirm the protocol
// package used by (*Client).Metadata keeps that distinction.
Topics []string
}
// MetadataResponse represents a response from a kafka broker to a metadata
// request.
//
// Values of this type are produced by (*Client).Metadata.
type MetadataResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// Name of the kafka cluster that client retrieved metadata from.
ClusterID string
// The broker which is currently the controller for the cluster.
//
// This is the entry of Brokers whose ID matches the controller ID reported
// by the broker; it is left as the zero Broker when no entry matches.
Controller Broker
// The list of brokers registered to the cluster.
Brokers []Broker
// The list of topics available on the cluster.
Topics []Topic
}
// Metadata asks the broker at req.Addr for cluster metadata about req.Topics
// and translates the protocol-level reply into a MetadataResponse.
//
// When the round trip fails, the response is nil and the returned error wraps
// the underlying cause with the "kafka.(*Client).Metadata" prefix.
//
// Broker IDs referenced by a partition (leader, replicas, in-sync replicas)
// are resolved against the brokers listed in the same reply; an ID that is not
// listed resolves to the zero Broker value.
func (c *Client) Metadata(ctx context.Context, req *MetadataRequest) (*MetadataResponse, error) {
	msg, err := c.roundTrip(ctx, req.Addr, &metadataAPI.Request{TopicNames: req.Topics})
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).Metadata: %w", err)
	}
	res := msg.(*metadataAPI.Response)

	out := &MetadataResponse{
		Throttle:  makeDuration(res.ThrottleTimeMs),
		ClusterID: res.ClusterID,
		Brokers:   make([]Broker, 0, len(res.Brokers)),
		Topics:    make([]Topic, 0, len(res.Topics)),
	}

	// Index the brokers by node ID so partitions can refer to them below.
	byID := make(map[int32]Broker, len(res.Brokers))
	for _, b := range res.Brokers {
		entry := Broker{
			Host: b.Host,
			Port: int(b.Port),
			ID:   int(b.NodeID),
			Rack: b.Rack,
		}
		out.Brokers = append(out.Brokers, entry)
		byID[b.NodeID] = entry
		if b.NodeID == res.ControllerID {
			out.Controller = entry
		}
	}

	// lookup maps a list of node IDs to the corresponding brokers; unknown
	// IDs yield the zero Broker.
	lookup := func(ids []int32) []Broker {
		found := make([]Broker, len(ids))
		for k, id := range ids {
			found[k] = byID[id]
		}
		return found
	}

	for _, t := range res.Topics {
		topic := Topic{
			Name:       t.Name,
			Internal:   t.IsInternal,
			Partitions: make([]Partition, 0, len(t.Partitions)),
			Error:      makeError(t.ErrorCode, ""),
		}
		for _, p := range t.Partitions {
			topic.Partitions = append(topic.Partitions, Partition{
				Topic:    t.Name,
				ID:       int(p.PartitionIndex),
				Leader:   byID[p.LeaderID],
				Replicas: lookup(p.ReplicaNodes),
				Isr:      lookup(p.IsrNodes),
				Error:    makeError(p.ErrorCode, ""),
			})
		}
		out.Topics = append(out.Topics, topic)
	}

	return out, nil
}
// topicMetadataRequestV1 is the list of topic names carried by a metadata
// request. Its writeTo method encodes a nil value differently from an empty,
// non-nil one.
type topicMetadataRequestV1 []string
// size returns the encoded size of the request, which is whatever
// sizeofStringArray reports for the list of topic names.
func (r topicMetadataRequestV1) size() int32 {
	names := []string(r)
	return sizeofStringArray(names)
}
// writeTo encodes the request into wb.
//
// For this particular request the broker interprets a zero-length array as a
// request for no topics, whereas a nil array asks for all topics. Nil-ness is
// therefore communicated explicitly by writing -1 as the array length.
func (r topicMetadataRequestV1) writeTo(wb *writeBuffer) {
	if r != nil {
		wb.writeStringArray([]string(r))
		return
	}
	wb.writeArrayLen(-1)
}
// metadataResponseV1 is a metadata response made of a broker list, the
// controller's node ID and a topic list. Its writeTo method emits the fields
// in declaration order.
//
// NOTE(review): presumably mirrors version 1 of the Kafka Metadata response
// schema; confirm against the protocol guide.
type metadataResponseV1 struct {
// Brokers listed in the response.
Brokers []brokerMetadataV1
// Node ID of the controller broker.
ControllerID int32
// Topics described by the response.
Topics []topicMetadataV1
}
// size returns the number of bytes accounted for by the response: four bytes
// for the int32 controller ID plus the sizes of the broker and topic arrays as
// computed by sizeofArray.
func (r metadataResponseV1) size() int32 {
	brokerSize := func(i int) int32 { return r.Brokers[i].size() }
	topicSize := func(i int) int32 { return r.Topics[i].size() }

	total := int32(4) // ControllerID
	total += sizeofArray(len(r.Brokers), brokerSize)
	total += sizeofArray(len(r.Topics), topicSize)
	return total
}
// writeTo encodes the response into wb: the broker array first, then the
// controller ID, then the topic array.
func (r metadataResponseV1) writeTo(wb *writeBuffer) {
	writeBroker := func(i int) { r.Brokers[i].writeTo(wb) }
	writeTopic := func(i int) { r.Topics[i].writeTo(wb) }

	wb.writeArray(len(r.Brokers), writeBroker)
	wb.writeInt32(r.ControllerID)
	wb.writeArray(len(r.Topics), writeTopic)
}
// brokerMetadataV1 describes one broker entry of a metadata response. It is
// the element type of the Brokers field in both metadataResponseV1 and
// metadataResponseV6, and is encoded in declaration order by writeTo.
type brokerMetadataV1 struct {
// Node ID of the broker.
NodeID int32
// Host name of the broker.
Host string
// Port of the broker, encoded as an int32.
Port int32
// Rack of the broker, encoded as a string.
Rack string
}
// size returns the number of bytes accounted for by the broker entry: two
// int32 fields (NodeID and Port) plus the sizes of the Host and Rack strings
// as computed by sizeofString.
func (b brokerMetadataV1) size() int32 {
	const fixed = 4 + 4 // NodeID + Port
	return fixed + sizeofString(b.Host) + sizeofString(b.Rack)
}
// writeTo encodes the broker entry into wb. The statement order below is the
// encoding order (NodeID, Host, Port, Rack), so it must not be rearranged, and
// size must keep accounting for exactly these fields.
func (b brokerMetadataV1) writeTo(wb *writeBuffer) {
wb.writeInt32(b.NodeID)
wb.writeString(b.Host)
wb.writeInt32(b.Port)
wb.writeString(b.Rack)
}
// topicMetadataV1 describes one topic entry of a metadataResponseV1. Its
// writeTo method encodes the fields in declaration order.
type topicMetadataV1 struct {
// Error code reported for the topic, encoded as an int16.
TopicErrorCode int16
// Name of the topic.
TopicName string
// Internal flag of the topic, encoded as a single boolean.
Internal bool
// Partitions of the topic.
Partitions []partitionMetadataV1
}
// size returns the number of bytes accounted for by the topic entry: two bytes
// for the error code, one for the internal flag, plus the sizes of the topic
// name and of the partition array.
func (t topicMetadataV1) size() int32 {
	total := int32(2 + 1) // TopicErrorCode + Internal
	total += sizeofString(t.TopicName)
	total += sizeofArray(len(t.Partitions), func(i int) int32 {
		return t.Partitions[i].size()
	})
	return total
}
// writeTo encodes the topic entry into wb in this order: error code, topic
// name, internal flag, partition array.
func (t topicMetadataV1) writeTo(wb *writeBuffer) {
	writePartition := func(i int) { t.Partitions[i].writeTo(wb) }

	wb.writeInt16(t.TopicErrorCode)
	wb.writeString(t.TopicName)
	wb.writeBool(t.Internal)
	wb.writeArray(len(t.Partitions), writePartition)
}
// partitionMetadataV1 describes one partition entry of a topicMetadataV1. Its
// writeTo method encodes the fields in declaration order.
type partitionMetadataV1 struct {
// Error code reported for the partition, encoded as an int16.
PartitionErrorCode int16
// ID of the partition.
PartitionID int32
// Leader of the partition, encoded as an int32.
// NOTE(review): presumably a broker node ID; confirm.
Leader int32
// Replicas of the partition, encoded as an int32 array.
Replicas []int32
// In-sync replicas of the partition, encoded as an int32 array.
Isr []int32
}
// size returns the number of bytes accounted for by the partition entry: the
// fixed-width fields (int16 error code, int32 partition ID, int32 leader) plus
// the sizes of the Replicas and Isr arrays as computed by sizeofInt32Array.
func (p partitionMetadataV1) size() int32 {
	total := int32(2 + 4 + 4) // PartitionErrorCode + PartitionID + Leader
	total += sizeofInt32Array(p.Replicas)
	total += sizeofInt32Array(p.Isr)
	return total
}
// writeTo encodes the partition entry into wb. The statement order below is
// the encoding order (error code, partition ID, leader, replicas, in-sync
// replicas), so it must not be rearranged, and size must keep accounting for
// exactly these fields.
func (p partitionMetadataV1) writeTo(wb *writeBuffer) {
wb.writeInt16(p.PartitionErrorCode)
wb.writeInt32(p.PartitionID)
wb.writeInt32(p.Leader)
wb.writeInt32Array(p.Replicas)
wb.writeInt32Array(p.Isr)
}
// topicMetadataRequestV6 is a metadata request made of a list of topic names
// followed by a boolean flag. Its writeTo method encodes a nil Topics list
// differently from an empty, non-nil one.
type topicMetadataRequestV6 struct {
// Names of the topics to request metadata for.
Topics []string
// Flag written after the topic list as a single boolean.
// NOTE(review): presumably tells the broker whether it may auto-create
// missing topics; confirm against the protocol guide.
AllowAutoTopicCreation bool
}
// size returns the number of bytes accounted for by the request: the size of
// the topic name array as computed by sizeofStringArray plus one byte for the
// AllowAutoTopicCreation flag.
func (r topicMetadataRequestV6) size() int32 {
	const flagSize = 1 // AllowAutoTopicCreation
	return sizeofStringArray(r.Topics) + flagSize
}
// writeTo encodes the request into wb: the topic list followed by the
// AllowAutoTopicCreation flag.
//
// For this particular request the broker interprets a zero-length array as a
// request for no topics, whereas a nil array asks for all topics. Nil-ness is
// therefore communicated explicitly by writing -1 as the array length.
func (r topicMetadataRequestV6) writeTo(wb *writeBuffer) {
	if topics := r.Topics; topics != nil {
		wb.writeStringArray(topics)
	} else {
		wb.writeArrayLen(-1)
	}
	wb.writeBool(r.AllowAutoTopicCreation)
}
// metadataResponseV6 is a metadata response made of a throttle time, a broker
// list, a cluster ID, the controller's node ID and a topic list. Its writeTo
// method emits the fields in declaration order.
//
// NOTE(review): presumably mirrors version 6 of the Kafka Metadata response
// schema; confirm against the protocol guide.
type metadataResponseV6 struct {
// Throttle time, encoded as an int32; the name indicates milliseconds.
ThrottleTimeMs int32
// Brokers listed in the response.
Brokers []brokerMetadataV1
// ID of the cluster, encoded as a string.
ClusterId string
// Node ID of the controller broker.
ControllerID int32
// Topics described by the response.
Topics []topicMetadataV6
}
// size returns the number of bytes accounted for by the response: two int32
// fields (ThrottleTimeMs and ControllerID) plus the sizes of the broker array,
// the cluster ID and the topic array.
//
// NOTE(review): the cluster ID is sized with sizeofNullableString while
// writeTo emits it with writeString; confirm the two agree for a non-nil
// string.
func (r metadataResponseV6) size() int32 {
	total := int32(4 + 4) // ThrottleTimeMs + ControllerID
	total += sizeofArray(len(r.Brokers), func(i int) int32 {
		return r.Brokers[i].size()
	})
	total += sizeofNullableString(&r.ClusterId)
	total += sizeofArray(len(r.Topics), func(i int) int32 {
		return r.Topics[i].size()
	})
	return total
}
// writeTo encodes the response into wb in this order: throttle time, broker
// array, cluster ID, controller ID, topic array.
func (r metadataResponseV6) writeTo(wb *writeBuffer) {
	writeBroker := func(i int) { r.Brokers[i].writeTo(wb) }
	writeTopic := func(i int) { r.Topics[i].writeTo(wb) }

	wb.writeInt32(r.ThrottleTimeMs)
	wb.writeArray(len(r.Brokers), writeBroker)
	wb.writeString(r.ClusterId)
	wb.writeInt32(r.ControllerID)
	wb.writeArray(len(r.Topics), writeTopic)
}
// topicMetadataV6 describes one topic entry of a metadataResponseV6. Its
// writeTo method encodes the fields in declaration order.
type topicMetadataV6 struct {
// Error code reported for the topic, encoded as an int16.
TopicErrorCode int16
// Name of the topic.
TopicName string
// Internal flag of the topic, encoded as a single boolean.
Internal bool
// Partitions of the topic.
Partitions []partitionMetadataV6
}
// size returns the number of bytes accounted for by the topic entry: two bytes
// for the error code, one for the internal flag, plus the sizes of the topic
// name and of the partition array.
func (t topicMetadataV6) size() int32 {
	total := int32(2 + 1) // TopicErrorCode + Internal
	total += sizeofString(t.TopicName)
	total += sizeofArray(len(t.Partitions), func(i int) int32 {
		return t.Partitions[i].size()
	})
	return total
}
// writeTo encodes the topic entry into wb in this order: error code, topic
// name, internal flag, partition array.
func (t topicMetadataV6) writeTo(wb *writeBuffer) {
	writePartition := func(i int) { t.Partitions[i].writeTo(wb) }

	wb.writeInt16(t.TopicErrorCode)
	wb.writeString(t.TopicName)
	wb.writeBool(t.Internal)
	wb.writeArray(len(t.Partitions), writePartition)
}
// partitionMetadataV6 describes one partition entry of a topicMetadataV6. It
// has the same fields as partitionMetadataV1 plus OfflineReplicas, and its
// writeTo method encodes the fields in declaration order.
type partitionMetadataV6 struct {
// Error code reported for the partition, encoded as an int16.
PartitionErrorCode int16
// ID of the partition.
PartitionID int32
// Leader of the partition, encoded as an int32.
// NOTE(review): presumably a broker node ID; confirm.
Leader int32
// Replicas of the partition, encoded as an int32 array.
Replicas []int32
// In-sync replicas of the partition, encoded as an int32 array.
Isr []int32
// Offline replicas of the partition, encoded as an int32 array.
OfflineReplicas []int32
}
// size returns the number of bytes accounted for by the partition entry: the
// fixed-width fields (int16 error code, int32 partition ID, int32 leader) plus
// the sizes of the Replicas, Isr and OfflineReplicas arrays as computed by
// sizeofInt32Array.
func (p partitionMetadataV6) size() int32 {
	total := int32(2 + 4 + 4) // PartitionErrorCode + PartitionID + Leader
	total += sizeofInt32Array(p.Replicas)
	total += sizeofInt32Array(p.Isr)
	total += sizeofInt32Array(p.OfflineReplicas)
	return total
}
// writeTo encodes the partition entry into wb. The statement order below is
// the encoding order (error code, partition ID, leader, replicas, in-sync
// replicas, offline replicas), so it must not be rearranged, and size must
// keep accounting for exactly these fields.
func (p partitionMetadataV6) writeTo(wb *writeBuffer) {
wb.writeInt16(p.PartitionErrorCode)
wb.writeInt32(p.PartitionID)
wb.writeInt32(p.Leader)
wb.writeInt32Array(p.Replicas)
wb.writeInt32Array(p.Isr)
wb.writeInt32Array(p.OfflineReplicas)
}