package kafka

import (
	"sort"
)

// GroupMember describes a single participant in a consumer group.
type GroupMember struct {
	// ID is the unique ID for this member as taken from the JoinGroup response.
	ID string

	// Topics is a list of topics that this member is consuming.
	Topics []string

	// UserData contains any information that the GroupBalancer sent to the
	// consumer group coordinator.
	UserData []byte
}

// GroupMemberAssignments holds MemberID => topic => partitions.
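//
// For illustration, an assignment of one topic with five partitions across
// two members might look like this (member IDs and topic name are
// hypothetical):
//
//	GroupMemberAssignments{
//		"member-1": {"my-topic": {0, 1, 2}},
//		"member-2": {"my-topic": {3, 4}},
//	}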
type GroupMemberAssignments map[string]map[string][]int

// GroupBalancer encapsulates the client side rebalancing logic.
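//
// Three implementations ship with this package: RangeGroupBalancer,
// RoundRobinGroupBalancer, and RackAffinityGroupBalancer.
//
// A sketch of how an elected group leader might apply a balancer (the
// variables here are illustrative, not part of this package's API):
//
//	var members []GroupMember  // from the JoinGroup response
//	var partitions []Partition // from a metadata request
//	assignments := RangeGroupBalancer{}.AssignGroups(members, partitions)
//	// assignments[memberID][topic] now lists that member's partitions,
//	// ready to be distributed through the SyncGroup request.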
type GroupBalancer interface {
	// ProtocolName of the GroupBalancer.
	ProtocolName() string

	// UserData provides the GroupBalancer an opportunity to embed custom
	// UserData into the metadata.
	//
	// Will be used by JoinGroup to begin the consumer group handshake.
	//
	// See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-JoinGroupRequest
	UserData() ([]byte, error)

	// AssignGroups returns which members will be consuming which topic
	// partitions.
	AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments
}

// RangeGroupBalancer groups consumers by partition, assigning each consumer a
// contiguous range of partitions. Remainder partitions go to the last
// consumers in the sorted member list.
//
// Example: 5 partitions, 2 consumers
//	C0: [0, 1]
//	C1: [2, 3, 4]
//
// Example: 6 partitions, 3 consumers
//	C0: [0, 1]
//	C1: [2, 3]
//	C2: [4, 5]
type RangeGroupBalancer struct{}

func (r RangeGroupBalancer) ProtocolName() string {
	return "range"
}

func (r RangeGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RangeGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)

	for topic, members := range membersByTopic {
		partitions := findPartitions(topic, topicPartitions)
		partitionCount := len(partitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

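			// Each member owns the half-open index range
			// [memberIndex*partitionCount/memberCount, (memberIndex+1)*partitionCount/memberCount),
			// so with integer division any remainder partitions accumulate
			// with the later members.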
			minIndex := memberIndex * partitionCount / memberCount
			maxIndex := (memberIndex + 1) * partitionCount / memberCount

			for partitionIndex, partition := range partitions {
				if partitionIndex >= minIndex && partitionIndex < maxIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}

// RoundRobinGroupBalancer divides partitions evenly among consumers, dealing
// them out one at a time in order.
//
// Example: 5 partitions, 2 consumers
//	C0: [0, 2, 4]
//	C1: [1, 3]
//
// Example: 6 partitions, 3 consumers
//	C0: [0, 3]
//	C1: [1, 4]
//	C2: [2, 5]
type RoundRobinGroupBalancer struct{}

func (r RoundRobinGroupBalancer) ProtocolName() string {
	return "roundrobin"
}

func (r RoundRobinGroupBalancer) UserData() ([]byte, error) {
	return nil, nil
}

func (r RoundRobinGroupBalancer) AssignGroups(members []GroupMember, topicPartitions []Partition) GroupMemberAssignments {
	groupAssignments := GroupMemberAssignments{}
	membersByTopic := findMembersByTopic(members)

	for topic, members := range membersByTopic {
		partitionIDs := findPartitions(topic, topicPartitions)
		memberCount := len(members)

		for memberIndex, member := range members {
			assignmentsByTopic, ok := groupAssignments[member.ID]
			if !ok {
				assignmentsByTopic = map[string][]int{}
				groupAssignments[member.ID] = assignmentsByTopic
			}

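			// Each member takes every memberCount-th partition, starting at
			// the offset given by its position in the sorted member list.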
			for partitionIndex, partition := range partitionIDs {
				if (partitionIndex % memberCount) == memberIndex {
					assignmentsByTopic[topic] = append(assignmentsByTopic[topic], partition)
				}
			}
		}
	}

	return groupAssignments
}

// RackAffinityGroupBalancer makes a best effort to pair up consumers with
// partitions whose leader is in the same rack. This strategy can have
// performance benefits by minimizing round trip latency between the consumer
// and the broker. In environments where network traffic across racks incurs
// charges (such as cross AZ data transfer in AWS), this strategy is also a
// cost optimization measure because it keeps network traffic within the local
// rack where possible.
//
// The primary objective is to spread partitions evenly across consumers with a
// secondary focus on maximizing the number of partitions where the leader and
// the consumer are in the same rack. For best affinity, it's recommended to
// have a balanced spread of consumers and partition leaders across racks.
//
// This balancer requires Kafka version 0.10.0.0 or later. Earlier versions do
// not return the brokers' racks in the metadata response.
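//
// For illustration (the rack names are hypothetical): with partitions 0 and 1
// led by brokers in rack "a", partition 2 led by a broker in rack "b", and one
// consumer in each rack, the balancer aims for:
//
//	consumer in rack "a": [0, 1]
//	consumer in rack "b": [2]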
type RackAffinityGroupBalancer struct {
	// Rack is the name of the rack where this consumer is running. It will be
	// communicated to the consumer group leader via the UserData so that
	// assignments can be made with affinity to the partition leader.
	Rack string
}

func (r RackAffinityGroupBalancer) ProtocolName() string {
	return "rack-affinity"
}

func (r RackAffinityGroupBalancer) AssignGroups(members []GroupMember, partitions []Partition) GroupMemberAssignments {
	membersByTopic := make(map[string][]GroupMember)
	for _, m := range members {
		for _, t := range m.Topics {
			membersByTopic[t] = append(membersByTopic[t], m)
		}
	}

	partitionsByTopic := make(map[string][]Partition)
	for _, p := range partitions {
		partitionsByTopic[p.Topic] = append(partitionsByTopic[p.Topic], p)
	}

	assignments := GroupMemberAssignments{}
	for topic := range membersByTopic {
		topicAssignments := r.assignTopic(membersByTopic[topic], partitionsByTopic[topic])
		for member, parts := range topicAssignments {
			memberAssignments, ok := assignments[member]
			if !ok {
				memberAssignments = make(map[string][]int)
				assignments[member] = memberAssignments
			}
			memberAssignments[topic] = parts
		}
	}

	return assignments
}

func (r RackAffinityGroupBalancer) UserData() ([]byte, error) {
	return []byte(r.Rack), nil
}

func (r *RackAffinityGroupBalancer) assignTopic(members []GroupMember, partitions []Partition) map[string][]int {
	zonedPartitions := make(map[string][]int)
	for _, part := range partitions {
		zone := part.Leader.Rack
		zonedPartitions[zone] = append(zonedPartitions[zone], part.ID)
	}

	zonedConsumers := make(map[string][]string)
	for _, member := range members {
		zone := string(member.UserData)
		zonedConsumers[zone] = append(zonedConsumers[zone], member.ID)
	}

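	// Every member should receive at least targetPerMember partitions, and
	// `remainder` members will end up with one extra so that every partition
	// is assigned.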
	targetPerMember := len(partitions) / len(members)
	remainder := len(partitions) % len(members)
	assignments := make(map[string][]int)

	// assign as many as possible in zone. this will assign up to partsPerMember
	// to each consumer. it will also prefer to allocate remainder partitions
	// in zone if possible.
	for zone, parts := range zonedPartitions {
		consumers := zonedConsumers[zone]
		if len(consumers) == 0 {
			continue
		}

		// don't over-allocate. cap partition assignments at the calculated
		// target.
		partsPerMember := len(parts) / len(consumers)
		if partsPerMember > targetPerMember {
			partsPerMember = targetPerMember
		}

		for _, consumer := range consumers {
			assignments[consumer] = append(assignments[consumer], parts[:partsPerMember]...)
			parts = parts[partsPerMember:]
		}

		// if we had enough partitions for each consumer in this zone to hit its
		// target, attempt to use any leftover partitions to satisfy the total
		// remainder by adding at most 1 partition per consumer.
		leftover := len(parts)
		if partsPerMember == targetPerMember {
			if leftover > remainder {
				leftover = remainder
			}
			if leftover > len(consumers) {
				leftover = len(consumers)
			}
			remainder -= leftover
		}

		// this loop covers the case where we're assigning extra partitions or
		// if there weren't enough to satisfy the targetPerMember and the zoned
		// partitions didn't divide evenly.
		for i := 0; i < leftover; i++ {
			assignments[consumers[i]] = append(assignments[consumers[i]], parts[i])
		}
		parts = parts[leftover:]

		if len(parts) == 0 {
			delete(zonedPartitions, zone)
		} else {
			zonedPartitions[zone] = parts
		}
	}

	// assign out remainders regardless of zone.
	var remaining []int
	for _, partitions := range zonedPartitions {
		remaining = append(remaining, partitions...)
	}

	for _, member := range members {
		assigned := assignments[member.ID]

		delta := targetPerMember - len(assigned)
		// if it were possible to assign the remainder in zone, it's been taken
		// care of already. now we will portion out any remainder to a member
		// that can take it.
		if delta >= 0 && remainder > 0 {
			delta++
			remainder--
		}
		if delta > 0 {
			assignments[member.ID] = append(assigned, remaining[:delta]...)
			remaining = remaining[delta:]
		}
	}

	return assignments
}

// findPartitions extracts the partition ids associated with the topic from the
// list of Partitions provided.
func findPartitions(topic string, partitions []Partition) []int {
	var ids []int
	for _, partition := range partitions {
		if partition.Topic == topic {
			ids = append(ids, partition.ID)
		}
	}
	return ids
}

// findMembersByTopic groups the consumer group members by topic.
func findMembersByTopic(members []GroupMember) map[string][]GroupMember {
	membersByTopic := map[string][]GroupMember{}
	for _, member := range members {
		for _, topic := range member.Topics {
			membersByTopic[topic] = append(membersByTopic[topic], member)
		}
	}

	// normalize ordering of members to enable grouping across topics by
	// partitions
	//
	// Want:
	//	C0 [T0/P0, T1/P0]
	//	C1 [T0/P1, T1/P1]
	//
	// Not:
	//	C0 [T0/P0, T1/P1]
	//	C1 [T0/P1, T1/P0]
	//
	// Even though the latter is still round robin, the partitions are crossed.
	for _, members := range membersByTopic {
		sort.Slice(members, func(i, j int) bool {
			return members[i].ID < members[j].ID
		})
	}

	return membersByTopic
}

// findGroupBalancer returns the GroupBalancer with the specified protocolName
// from the slice provided.
func findGroupBalancer(protocolName string, balancers []GroupBalancer) (GroupBalancer, bool) {
	for _, balancer := range balancers {
		if balancer.ProtocolName() == protocolName {
			return balancer, true
		}
	}
	return nil, false
}