You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
298 lines
8.1 KiB
298 lines
8.1 KiB
package kafka
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/describegroups"
|
|
)
|
|
|
|
// DescribeGroupsRequest carries the parameters of a DescribeGroups API call.
type DescribeGroupsRequest struct {
	// Addr is the address of the kafka broker that the request is sent to.
	Addr net.Addr

	// GroupIDs lists the IDs of the groups whose details are requested.
	GroupIDs []string
}
|
|
|
|
// DescribeGroupsResponse is a response from the DescribeGroups API.
|
|
type DescribeGroupsResponse struct {
|
|
// Groups is a slice of details for the requested groups.
|
|
Groups []DescribeGroupsResponseGroup
|
|
}
|
|
|
|
// DescribeGroupsResponseGroup contains the response details for a single group.
|
|
type DescribeGroupsResponseGroup struct {
|
|
// Error is set to a non-nil value if there was an error fetching the details
|
|
// for this group.
|
|
Error error
|
|
|
|
// GroupID is the ID of the group.
|
|
GroupID string
|
|
|
|
// GroupState is a description of the group state.
|
|
GroupState string
|
|
|
|
// Members contains details about each member of the group.
|
|
Members []DescribeGroupsResponseMember
|
|
}
|
|
|
|
// MemberInfo represents the membership information for a single group member.
|
|
type DescribeGroupsResponseMember struct {
|
|
// MemberID is the ID of the group member.
|
|
MemberID string
|
|
|
|
// ClientID is the ID of the client that the group member is using.
|
|
ClientID string
|
|
|
|
// ClientHost is the host of the client that the group member is connecting from.
|
|
ClientHost string
|
|
|
|
// MemberMetadata contains metadata about this group member.
|
|
MemberMetadata DescribeGroupsResponseMemberMetadata
|
|
|
|
// MemberAssignments contains the topic partitions that this member is assigned to.
|
|
MemberAssignments DescribeGroupsResponseAssignments
|
|
}
|
|
|
|
// GroupMemberMetadata stores metadata associated with a group member.
|
|
type DescribeGroupsResponseMemberMetadata struct {
|
|
// Version is the version of the metadata.
|
|
Version int
|
|
|
|
// Topics is the list of topics that the member is assigned to.
|
|
Topics []string
|
|
|
|
// UserData is the user data for the member.
|
|
UserData []byte
|
|
|
|
// OwnedPartitions contains the partitions owned by this group member; only set if
|
|
// consumers are using a cooperative rebalancing assignor protocol.
|
|
OwnedPartitions []DescribeGroupsResponseMemberMetadataOwnedPartition
|
|
}
|
|
|
|
// DescribeGroupsResponseMemberMetadataOwnedPartition lists the owned
// partitions of a single topic.
type DescribeGroupsResponseMemberMetadataOwnedPartition struct {
	// Topic is the name of the topic.
	Topic string

	// Partitions holds the IDs of the partitions owned in that topic.
	Partitions []int
}
|
|
|
|
// GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
|
|
type DescribeGroupsResponseAssignments struct {
|
|
// Version is the version of the assignments data.
|
|
Version int
|
|
|
|
// Topics contains the details of the partition assignments for each topic.
|
|
Topics []GroupMemberTopic
|
|
|
|
// UserData is the user data for the member.
|
|
UserData []byte
|
|
}
|
|
|
|
// GroupMemberTopic maps a topic to a list of its partitions. It represents
// the topic partitions that have been assigned to a group member.
type GroupMemberTopic struct {
	// Topic is the name of the topic.
	Topic string

	// Partitions holds the IDs of the partitions in the topic that are
	// assigned to this member.
	Partitions []int
}
|
|
|
|
// DescribeGroups calls the Kafka DescribeGroups API to get information about one or more
|
|
// consumer groups. See https://kafka.apache.org/protocol#The_Messages_DescribeGroups for
|
|
// more information.
|
|
func (c *Client) DescribeGroups(
|
|
ctx context.Context,
|
|
req *DescribeGroupsRequest,
|
|
) (*DescribeGroupsResponse, error) {
|
|
protoResp, err := c.roundTrip(
|
|
ctx,
|
|
req.Addr,
|
|
&describegroups.Request{
|
|
Groups: req.GroupIDs,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
apiResp := protoResp.(*describegroups.Response)
|
|
resp := &DescribeGroupsResponse{}
|
|
|
|
for _, apiGroup := range apiResp.Groups {
|
|
group := DescribeGroupsResponseGroup{
|
|
Error: makeError(apiGroup.ErrorCode, ""),
|
|
GroupID: apiGroup.GroupID,
|
|
GroupState: apiGroup.GroupState,
|
|
}
|
|
|
|
for _, member := range apiGroup.Members {
|
|
decodedMetadata, err := decodeMemberMetadata(member.MemberMetadata)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
decodedAssignments, err := decodeMemberAssignments(member.MemberAssignment)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
group.Members = append(group.Members, DescribeGroupsResponseMember{
|
|
MemberID: member.MemberID,
|
|
ClientID: member.ClientID,
|
|
ClientHost: member.ClientHost,
|
|
MemberAssignments: decodedAssignments,
|
|
MemberMetadata: decodedMetadata,
|
|
})
|
|
}
|
|
resp.Groups = append(resp.Groups, group)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// decodeMemberMetadata converts raw metadata bytes to a
|
|
// DescribeGroupsResponseMemberMetadata struct.
|
|
//
|
|
// See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
|
|
// for protocol details.
|
|
func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetadata, error) {
|
|
mm := DescribeGroupsResponseMemberMetadata{}
|
|
|
|
if len(rawMetadata) == 0 {
|
|
return mm, nil
|
|
}
|
|
|
|
buf := bytes.NewBuffer(rawMetadata)
|
|
bufReader := bufio.NewReader(buf)
|
|
remain := len(rawMetadata)
|
|
|
|
var err error
|
|
var version16 int16
|
|
|
|
if remain, err = readInt16(bufReader, remain, &version16); err != nil {
|
|
return mm, err
|
|
}
|
|
mm.Version = int(version16)
|
|
|
|
if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
|
|
return mm, err
|
|
}
|
|
if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
|
|
return mm, err
|
|
}
|
|
|
|
if mm.Version == 1 && remain > 0 {
|
|
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
|
|
op := DescribeGroupsResponseMemberMetadataOwnedPartition{}
|
|
if fnRemain, fnErr = readString(r, size, &op.Topic); fnErr != nil {
|
|
return
|
|
}
|
|
|
|
ps := []int32{}
|
|
if fnRemain, fnErr = readInt32Array(r, fnRemain, &ps); fnErr != nil {
|
|
return
|
|
}
|
|
|
|
for _, p := range ps {
|
|
op.Partitions = append(op.Partitions, int(p))
|
|
}
|
|
|
|
mm.OwnedPartitions = append(mm.OwnedPartitions, op)
|
|
return
|
|
}
|
|
|
|
if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
|
|
return mm, err
|
|
}
|
|
}
|
|
|
|
if remain != 0 {
|
|
return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
|
|
}
|
|
|
|
return mm, nil
|
|
}
|
|
|
|
// decodeMemberAssignments converts raw assignment bytes to a DescribeGroupsResponseAssignments
|
|
// struct.
|
|
//
|
|
// See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
|
|
// for protocol details.
|
|
func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssignments, error) {
|
|
ma := DescribeGroupsResponseAssignments{}
|
|
|
|
if len(rawAssignments) == 0 {
|
|
return ma, nil
|
|
}
|
|
|
|
buf := bytes.NewBuffer(rawAssignments)
|
|
bufReader := bufio.NewReader(buf)
|
|
remain := len(rawAssignments)
|
|
|
|
var err error
|
|
var version16 int16
|
|
|
|
if remain, err = readInt16(bufReader, remain, &version16); err != nil {
|
|
return ma, err
|
|
}
|
|
ma.Version = int(version16)
|
|
|
|
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
|
|
item := GroupMemberTopic{}
|
|
|
|
if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
|
|
return
|
|
}
|
|
|
|
partitions := []int32{}
|
|
|
|
if fnRemain, fnErr = readInt32Array(r, fnRemain, &partitions); fnErr != nil {
|
|
return
|
|
}
|
|
for _, partition := range partitions {
|
|
item.Partitions = append(item.Partitions, int(partition))
|
|
}
|
|
|
|
ma.Topics = append(ma.Topics, item)
|
|
return
|
|
}
|
|
if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
|
|
return ma, err
|
|
}
|
|
|
|
if remain, err = readBytes(bufReader, remain, &ma.UserData); err != nil {
|
|
return ma, err
|
|
}
|
|
|
|
if remain != 0 {
|
|
return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
|
|
}
|
|
|
|
return ma, nil
|
|
}
|
|
|
|
// readInt32Array reads an array of int32s. It's adapted from the implementation of
|
|
// readStringArray.
|
|
func readInt32Array(r *bufio.Reader, sz int, v *[]int32) (remain int, err error) {
|
|
var content []int32
|
|
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
|
|
var value int32
|
|
if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
|
|
return
|
|
}
|
|
content = append(content, value)
|
|
return
|
|
}
|
|
if remain, err = readArrayWith(r, sz, fn); err != nil {
|
|
return
|
|
}
|
|
|
|
*v = content
|
|
return
|
|
}
|
|
|