You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
377 lines
10 KiB
377 lines
10 KiB
package kafka
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol"
|
|
"github.com/segmentio/kafka-go/protocol/consumer"
|
|
"github.com/segmentio/kafka-go/protocol/joingroup"
|
|
)
|
|
|
|
// JoinGroupRequest is the request structure for the JoinGroup function.
|
|
type JoinGroupRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// GroupID of the group to join.
|
|
GroupID string
|
|
|
|
// The duration after which the coordinator considers the consumer dead
|
|
// if it has not received a heartbeat.
|
|
SessionTimeout time.Duration
|
|
|
|
// The duration the coordination will wait for each member to rejoin when rebalancing the group.
|
|
RebalanceTimeout time.Duration
|
|
|
|
// The ID assigned by the group coordinator.
|
|
MemberID string
|
|
|
|
// The unique identifier for the consumer instance.
|
|
GroupInstanceID string
|
|
|
|
// The name for the class of protocols implemented by the group being joined.
|
|
ProtocolType string
|
|
|
|
// The list of protocols the member supports.
|
|
Protocols []GroupProtocol
|
|
}
|
|
|
|
// GroupProtocol represents a consumer group protocol.
|
|
type GroupProtocol struct {
|
|
// The protocol name.
|
|
Name string
|
|
|
|
// The protocol metadata.
|
|
Metadata GroupProtocolSubscription
|
|
}
|
|
|
|
// GroupProtocolSubscription describes the subscription carried as metadata of
// a group protocol.
type GroupProtocolSubscription struct {
	// Topics is the list of topics to subscribe to.
	Topics []string

	// UserData is the data associated with the subscription for the given
	// protocol.
	UserData []byte

	// OwnedPartitions holds the partitions owned by this consumer, keyed by
	// topic name.
	OwnedPartitions map[string][]int
}
|
|
|
|
// JoinGroupResponse is the response structure for the JoinGroup function.
|
|
type JoinGroupResponse struct {
|
|
// An error that may have occurred when attempting to join the group.
|
|
//
|
|
// The errors contain the kafka error code. Programs may use the standard
|
|
// errors.Is function to test the error against kafka error codes.
|
|
Error error
|
|
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// The generation ID of the group.
|
|
GenerationID int
|
|
|
|
// The group protocol selected by the coordinatior.
|
|
ProtocolName string
|
|
|
|
// The group protocol name.
|
|
ProtocolType string
|
|
|
|
// The leader of the group.
|
|
LeaderID string
|
|
|
|
// The group member ID.
|
|
MemberID string
|
|
|
|
// The members of the group.
|
|
Members []JoinGroupResponseMember
|
|
}
|
|
|
|
// JoinGroupResponseMember represents a group memmber in a reponse to a JoinGroup request.
|
|
type JoinGroupResponseMember struct {
|
|
// The group memmber ID.
|
|
ID string
|
|
|
|
// The unique identifier of the consumer instance.
|
|
GroupInstanceID string
|
|
|
|
// The group member metadata.
|
|
Metadata GroupProtocolSubscription
|
|
}
|
|
|
|
// JoinGroup sends a join group request to the coordinator and returns the response.
|
|
func (c *Client) JoinGroup(ctx context.Context, req *JoinGroupRequest) (*JoinGroupResponse, error) {
|
|
joinGroup := joingroup.Request{
|
|
GroupID: req.GroupID,
|
|
SessionTimeoutMS: int32(req.SessionTimeout.Milliseconds()),
|
|
RebalanceTimeoutMS: int32(req.RebalanceTimeout.Milliseconds()),
|
|
MemberID: req.MemberID,
|
|
GroupInstanceID: req.GroupInstanceID,
|
|
ProtocolType: req.ProtocolType,
|
|
Protocols: make([]joingroup.RequestProtocol, 0, len(req.Protocols)),
|
|
}
|
|
|
|
for _, proto := range req.Protocols {
|
|
protoMeta := consumer.Subscription{
|
|
Version: consumer.MaxVersionSupported,
|
|
Topics: proto.Metadata.Topics,
|
|
UserData: proto.Metadata.UserData,
|
|
OwnedPartitions: make([]consumer.TopicPartition, 0, len(proto.Metadata.OwnedPartitions)),
|
|
}
|
|
for topic, partitions := range proto.Metadata.OwnedPartitions {
|
|
tp := consumer.TopicPartition{
|
|
Topic: topic,
|
|
Partitions: make([]int32, 0, len(partitions)),
|
|
}
|
|
for _, partition := range partitions {
|
|
tp.Partitions = append(tp.Partitions, int32(partition))
|
|
}
|
|
protoMeta.OwnedPartitions = append(protoMeta.OwnedPartitions, tp)
|
|
}
|
|
|
|
metaBytes, err := protocol.Marshal(consumer.MaxVersionSupported, protoMeta)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
|
|
}
|
|
|
|
joinGroup.Protocols = append(joinGroup.Protocols, joingroup.RequestProtocol{
|
|
Name: proto.Name,
|
|
Metadata: metaBytes,
|
|
})
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &joinGroup)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
|
|
}
|
|
|
|
r := m.(*joingroup.Response)
|
|
|
|
res := &JoinGroupResponse{
|
|
Error: makeError(r.ErrorCode, ""),
|
|
Throttle: makeDuration(r.ThrottleTimeMS),
|
|
GenerationID: int(r.GenerationID),
|
|
ProtocolName: r.ProtocolName,
|
|
ProtocolType: r.ProtocolType,
|
|
LeaderID: r.LeaderID,
|
|
MemberID: r.MemberID,
|
|
Members: make([]JoinGroupResponseMember, 0, len(r.Members)),
|
|
}
|
|
|
|
for _, member := range r.Members {
|
|
var meta consumer.Subscription
|
|
err = protocol.Unmarshal(member.Metadata, consumer.MaxVersionSupported, &meta)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).JoinGroup: %w", err)
|
|
}
|
|
subscription := GroupProtocolSubscription{
|
|
Topics: meta.Topics,
|
|
UserData: meta.UserData,
|
|
OwnedPartitions: make(map[string][]int, len(meta.OwnedPartitions)),
|
|
}
|
|
for _, owned := range meta.OwnedPartitions {
|
|
subscription.OwnedPartitions[owned.Topic] = make([]int, 0, len(owned.Partitions))
|
|
for _, partition := range owned.Partitions {
|
|
subscription.OwnedPartitions[owned.Topic] = append(subscription.OwnedPartitions[owned.Topic], int(partition))
|
|
}
|
|
}
|
|
res.Members = append(res.Members, JoinGroupResponseMember{
|
|
ID: member.MemberID,
|
|
GroupInstanceID: member.GroupInstanceID,
|
|
Metadata: subscription,
|
|
})
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
// groupMetadata bundles a version number, a list of topics and opaque user
// data. Its methods size, write and read the fields in that order.
type groupMetadata struct {
	// Version is the version number stored with the metadata.
	Version int16

	// Topics is the list of topic names.
	Topics []string

	// UserData is an opaque byte payload.
	UserData []byte
}
|
|
|
|
func (t groupMetadata) size() int32 {
|
|
return sizeofInt16(t.Version) +
|
|
sizeofStringArray(t.Topics) +
|
|
sizeofBytes(t.UserData)
|
|
}
|
|
|
|
func (t groupMetadata) writeTo(wb *writeBuffer) {
|
|
wb.writeInt16(t.Version)
|
|
wb.writeStringArray(t.Topics)
|
|
wb.writeBytes(t.UserData)
|
|
}
|
|
|
|
func (t groupMetadata) bytes() []byte {
|
|
buf := bytes.NewBuffer(nil)
|
|
t.writeTo(&writeBuffer{w: buf})
|
|
return buf.Bytes()
|
|
}
|
|
|
|
func (t *groupMetadata) readFrom(r *bufio.Reader, size int) (remain int, err error) {
|
|
if remain, err = readInt16(r, size, &t.Version); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readStringArray(r, remain, &t.Topics); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readBytes(r, remain, &t.UserData); err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// joinGroupRequestGroupProtocolV1 is one entry of the GroupProtocols list of
// joinGroupRequestV1: a protocol name with its metadata as raw bytes.
type joinGroupRequestGroupProtocolV1 struct {
	// ProtocolName holds the name of the protocol.
	ProtocolName string

	// ProtocolMetadata holds the protocol metadata as raw bytes.
	ProtocolMetadata []byte
}
|
|
|
|
func (t joinGroupRequestGroupProtocolV1) size() int32 {
|
|
return sizeofString(t.ProtocolName) +
|
|
sizeofBytes(t.ProtocolMetadata)
|
|
}
|
|
|
|
func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
|
|
wb.writeString(t.ProtocolName)
|
|
wb.writeBytes(t.ProtocolMetadata)
|
|
}
|
|
|
|
type joinGroupRequestV1 struct {
|
|
// GroupID holds the unique group identifier
|
|
GroupID string
|
|
|
|
// SessionTimeout holds the coordinator considers the consumer dead if it
|
|
// receives no heartbeat after this timeout in ms.
|
|
SessionTimeout int32
|
|
|
|
// RebalanceTimeout holds the maximum time that the coordinator will wait
|
|
// for each member to rejoin when rebalancing the group in ms
|
|
RebalanceTimeout int32
|
|
|
|
// MemberID assigned by the group coordinator or the zero string if joining
|
|
// for the first time.
|
|
MemberID string
|
|
|
|
// ProtocolType holds the unique name for class of protocols implemented by group
|
|
ProtocolType string
|
|
|
|
// GroupProtocols holds the list of protocols that the member supports
|
|
GroupProtocols []joinGroupRequestGroupProtocolV1
|
|
}
|
|
|
|
func (t joinGroupRequestV1) size() int32 {
|
|
return sizeofString(t.GroupID) +
|
|
sizeofInt32(t.SessionTimeout) +
|
|
sizeofInt32(t.RebalanceTimeout) +
|
|
sizeofString(t.MemberID) +
|
|
sizeofString(t.ProtocolType) +
|
|
sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
|
|
}
|
|
|
|
func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
|
|
wb.writeString(t.GroupID)
|
|
wb.writeInt32(t.SessionTimeout)
|
|
wb.writeInt32(t.RebalanceTimeout)
|
|
wb.writeString(t.MemberID)
|
|
wb.writeString(t.ProtocolType)
|
|
wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
|
|
}
|
|
|
|
// joinGroupResponseMemberV1 is one entry of the Members list of
// joinGroupResponseV1.
type joinGroupResponseMemberV1 struct {
	// MemberID holds the ID assigned by the group coordinator.
	MemberID string

	// MemberMetadata holds the member's metadata as raw bytes.
	MemberMetadata []byte
}
|
|
|
|
func (t joinGroupResponseMemberV1) size() int32 {
|
|
return sizeofString(t.MemberID) +
|
|
sizeofBytes(t.MemberMetadata)
|
|
}
|
|
|
|
func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
|
|
wb.writeString(t.MemberID)
|
|
wb.writeBytes(t.MemberMetadata)
|
|
}
|
|
|
|
func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
|
|
if remain, err = readString(r, size, &t.MemberID); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
type joinGroupResponseV1 struct {
|
|
// ErrorCode holds response error code
|
|
ErrorCode int16
|
|
|
|
// GenerationID holds the generation of the group.
|
|
GenerationID int32
|
|
|
|
// GroupProtocol holds the group protocol selected by the coordinator
|
|
GroupProtocol string
|
|
|
|
// LeaderID holds the leader of the group
|
|
LeaderID string
|
|
|
|
// MemberID assigned by the group coordinator
|
|
MemberID string
|
|
Members []joinGroupResponseMemberV1
|
|
}
|
|
|
|
func (t joinGroupResponseV1) size() int32 {
|
|
return sizeofInt16(t.ErrorCode) +
|
|
sizeofInt32(t.GenerationID) +
|
|
sizeofString(t.GroupProtocol) +
|
|
sizeofString(t.LeaderID) +
|
|
sizeofString(t.MemberID) +
|
|
sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
|
|
}
|
|
|
|
func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
|
|
wb.writeInt16(t.ErrorCode)
|
|
wb.writeInt32(t.GenerationID)
|
|
wb.writeString(t.GroupProtocol)
|
|
wb.writeString(t.LeaderID)
|
|
wb.writeString(t.MemberID)
|
|
wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
|
|
}
|
|
|
|
func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
|
|
if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readString(r, remain, &t.GroupProtocol); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readString(r, remain, &t.LeaderID); err != nil {
|
|
return
|
|
}
|
|
if remain, err = readString(r, remain, &t.MemberID); err != nil {
|
|
return
|
|
}
|
|
|
|
fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
|
|
var item joinGroupResponseMemberV1
|
|
if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
|
|
return
|
|
}
|
|
t.Members = append(t.Members, item)
|
|
return
|
|
}
|
|
if remain, err = readArrayWith(r, remain, fn); err != nil {
|
|
return
|
|
}
|
|
|
|
return
|
|
}
|
|
|