package kafka

import (
	"sort"
)

// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
	// ID is the unique ID for this member as taken from the JoinGroup response.
	ID string

	// Topics is a list of topics that this member is consuming.
	Topics []string

	// UserData contains any information that the GroupBalancer sent to the
	// consumer group coordinator.
	UserData []byte
}

// GroupMemberAssignments holds MemberID => topic => partitions.
type GroupMemberAssignments map[string]map[string][]int
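
// For illustration, an assignment of three partitions of a topic named
// "events" (a hypothetical topic name) across two members could look like:
//
//	GroupMemberAssignments{
//		"member-1": {"events": {0, 1}},
//		"member-2": {"events": {2}},
//	}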

// GroupBalancer encapsulates the client side rebalancing logic.
type GroupBalancer interface {
	// ProtocolName of the GroupBalancer
	ProtocolName() string

	// UserData provides the GroupBalancer an opportunity to embed custom
	// UserData into the metadata.
	//
	// Will be used by JoinGroup to begin the consumer group handshake.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest
	UserData() ([]byte, error)

	// AssignGroups returns which members will be consuming
	// which topic partitions.
	AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
}
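
// As a rough sketch of how a balancer is exercised (the member IDs and the
// "example-topic" name below are illustrative, not part of this package): the
// member leading the group gathers the members and the topic partitions and
// asks the balancer for an assignment.
//
//	members := []GroupMember{
//		{ID: "consumer-1", Topics: []string{"example-topic"}},
//		{ID: "consumer-2", Topics: []string{"example-topic"}},
//	}
//	partitions := []Partition{
//		{Topic: "example-topic", ID: 0},
//		{Topic: "example-topic", ID: 1},
//		{Topic: "example-topic", ID: 2},
//	}
//	assignments := RoundRobinGroupBalancer{}.AssignGroups(members, partitions)
//	// assignments["consumer-1"]["example-topic"] is [0, 2]
//	// assignments["consumer-2"]["example-topic"] is [1]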

// RangeGroupBalancer assigns each consumer a contiguous range of partitions.
//
// Example: 5 partitions, 2 consumers
//		C0: [0, 1]
//		C1: [2, 3, 4]
//
// Example: 6 partitions, 3 consumers
//		C0: [0, 1]
//		C1: [2, 3]
//		C2: [4, 5]
//
type RangeGroupBalancer struct{}

func (r RangeGroupBalancer) ProtocolName() string {
	return "range"
}

func (r RangeGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)

	for topic, members := range membersByTopic {
		partitions := findPartitions(topic, topicPartitions)
		partitionCount := len(partitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

			minIndex := memberIndex * partitionCount / memberCount
			maxIndex := (memberIndex + 1) * partitionCount / memberCount

			for partitionIndex, partition := range partitions {
				if partitionIndex >= minIndex && partitionIndex < maxIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}
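
// To make the range arithmetic above concrete: member i covers partition
// indices [i*partitionCount/memberCount, (i+1)*partitionCount/memberCount).
// With 5 partitions and 2 members, member 0 covers [0, 2) and member 1 covers
// [2, 5), so later members absorb the remainder when the counts do not divide
// evenly.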

// RoundRobinGroupBalancer divides partitions evenly among consumers
//
// Example: 5 partitions, 2 consumers
//		C0: [0, 2, 4]
//		C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
//		C0: [0, 3]
//		C1: [1, 4]
//		C2: [2, 5]
//
type RoundRobinGroupBalancer struct{}

func (r RoundRobinGroupBalancer) ProtocolName() string {
	return "roundrobin"
}

func (r RoundRobinGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)
	for topic, members := range membersByTopic {
		partitionIDs := findPartitions(topic, topicPartitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

			for partitionIndex, partition := range partitionIDs {
				if (partitionIndex % memberCount) == memberIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}

// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross AZ data transfer in AWS), this strategy is also a cost
// optimization measure because it keeps network traffic within the local rack
// where possible.
//
// The primary objective is to spread partitions evenly across consumers with a
// secondary focus on maximizing the number of partitions where the leader and
// the consumer are in the same rack. For best affinity, it's recommended to
// have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do
// not return the brokers' racks in the metadata request.
type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running. It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}
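
// As a minimal sketch (the rack name below is illustrative), each consumer in
// the group configures the balancer with its own rack; the rack is advertised
// through UserData during the JoinGroup handshake and later matched against
// the partition leaders' racks:
//
//	balancer := RackAffinityGroupBalancer{Rack: "us-east-1a"}
//	userData, _ := balancer.UserData() // []byte("us-east-1a")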

func (r RackAffinityGroupBalancer) ProtocolName() string {
	return "rack-affinity"
}

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	membersByTopic := make(map[string][]GroupMember)
	for _, m := range members {
		for _, t := range m.Topics {
			membersByTopic[t] = append(membersByTopic[t], m)
		}
	}

	partitionsByTopic := make(map[string][]Partition)
	for _, p := range partitions {
		partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
	}

	assignments := GroupMemberAssignments{}
	for topic := range membersByTopic {
		topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
		for member, parts := range topicAssignments {
			memberAssignments, ok := assignments[member]
			if !ok {
				memberAssignments = make(map[string][]int)
				assignments[member] = memberAssignments
			}
			memberAssignments[topic] = parts
		}
	}
	return assignments
}

func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
	return []byte(r.Rack), nil
}

func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
	zonedPartitions := make(map[string][]int)
	for _, part := range partitions {
		zone := part.Leader.Rack
		zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
	}

	zonedConsumers := make(map[string][]string)
	for _, member := range members {
		zone := string(member.UserData)
		zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
	}

	targetPerMember := len(partitions) / len(members)
	remainder := len(partitions) % len(members)
	assignments := make(map[string][]int)

	// assign as many as possible in zone. this will assign up to partsPerMember
	// to each consumer. it will also prefer to allocate remainder partitions
	// in zone if possible.
	for zone, parts := range zonedPartitions {
		consumers := zonedConsumers[zone]
		if len(consumers) == 0 {
			continue
		}

		// don't over-allocate. cap partition assignments at the calculated
		// target.
		partsPerMember := len(parts) / len(consumers)
		if partsPerMember > targetPerMember {
			partsPerMember = targetPerMember
		}

		for _, consumer := range consumers {
			assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
			parts = parts[partsPerMember:]
		}

		// if we had enough partitions for each consumer in this zone to hit its
		// target, attempt to use any leftover partitions to satisfy the total
		// remainder by adding at most 1 partition per consumer.
		leftover := len(parts)
		if partsPerMember == targetPerMember {
			if leftover > remainder {
				leftover = remainder
			}
			if leftover > len(consumers) {
				leftover = len(consumers)
			}
			remainder -= leftover
		}

		// this loop covers the case where we're assigning extra partitions or
		// if there weren't enough to satisfy the targetPerMember and the zoned
		// partitions didn't divide evenly.
		for i := 0; i < leftover; i++ {
			assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
		}
		parts = parts[leftover:]

		if len(parts) == 0 {
			delete(zonedPartitions, zone)
		} else {
			zonedPartitions[zone] = parts
		}
	}

	// assign out remainders regardless of zone.
	var remaining []int
	for _, partitions := range zonedPartitions {
		remaining = append(remaining, partitions...)
	}

	for _, member := range members {
		assigned := assignments[member.ID]
		delta := targetPerMember - len(assigned)
		// if it were possible to assign the remainder in zone, it's been taken
		// care of already. now we will portion out any remainder to a member
		// that can take it.
		if delta >= 0 && remainder > 0 {
			delta++
			remainder--
		}
		if delta > 0 {
			assignments[member.ID] = append(assigned, remaining[:delta]...)
			remaining = remaining[delta:]
		}
	}

	return assignments
}
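
// For illustration (the rack names are hypothetical): with one member in
// "rack-a", one member in "rack-b", and four partitions whose leaders are
// split 3/1 between those racks, targetPerMember is 2. The in-zone pass gives
// the rack-a member two rack-local partitions and the rack-b member its single
// rack-local partition; the leftover rack-a partition is then handed to the
// rack-b member in the final cross-zone pass, so both members reach the target
// even though one assignment crosses racks.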

// findPartitions extracts the partition ids associated with the topic from the
// list of Partitions provided.
func findPartitions(topic string, partitions []Partition) []int {
	var ids []int
	for _, partition := range partitions {
		if partition.Topic == topic {
			ids = append(ids, partition.ID)
		}
	}
	return ids
}

// findMembersByTopic groups the provided members by topic.
func findMembersByTopic(members []GroupMember) map[string][]GroupMember {
	membersByTopic := map[string][]GroupMember{}
	for _, member := range members {
		for _, topic := range member.Topics {
			membersByTopic[topic] = append(membersByTopic[topic], member)
		}
	}

	// normalize ordering of members to enable grouping across topics by partitions
	//
	// Want:
	// 		C0 [T0/P0, T1/P0]
	// 		C1 [T0/P1, T1/P1]
	//
	// Not:
	// 		C0 [T0/P0, T1/P1]
	// 		C1 [T0/P1, T1/P0]
	//
	// Even though the latter is still round robin, the partitions are crossed
	//
	for _, members := range membersByTopic {
		sort.Slice(members, func(i, j int) bool {
			return members[i].ID < members[j].ID
		})
	}

	return membersByTopic
}

// findGroupBalancer returns the GroupBalancer with the specified protocolName
// from the slice provided.
func findGroupBalancer(protocolName string, balancers []GroupBalancer) (GroupBalancer, bool) {
	for _, balancer := range balancers {
		if balancer.ProtocolName() == protocolName {
			return balancer, true
		}
	}
	return nil, false
}
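
// For example, picking the balancer that matches the "range" protocol from a
// list of supported balancers (a sketch, not tied to any particular caller):
//
//	balancer, ok := findGroupBalancer("range", []GroupBalancer{
//		RangeGroupBalancer{},
//		RoundRobinGroupBalancer{},
//	})
//	if ok {
//		// balancer is RangeGroupBalancer{}
//	}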