You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
506 lines
14 KiB
506 lines
14 KiB
package protocol
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
)
|
|
|
|
// Message is an interface implemented by all request and response types of the
|
|
// kafka protocol.
|
|
//
|
|
// This interface is used mostly as a safe-guard to provide a compile-time check
|
|
// for values passed to functions dealing kafka message types.
|
|
type Message interface {
|
|
ApiKey() ApiKey
|
|
}
|
|
|
|
type ApiKey int16
|
|
|
|
func (k ApiKey) String() string {
|
|
if i := int(k); i >= 0 && i < len(apiNames) {
|
|
return apiNames[i]
|
|
}
|
|
return strconv.Itoa(int(k))
|
|
}
|
|
|
|
func (k ApiKey) MinVersion() int16 { return k.apiType().minVersion() }
|
|
|
|
func (k ApiKey) MaxVersion() int16 { return k.apiType().maxVersion() }
|
|
|
|
func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 {
|
|
min := k.MinVersion()
|
|
max := k.MaxVersion()
|
|
switch {
|
|
case min > maxVersion:
|
|
return min
|
|
case max < maxVersion:
|
|
return max
|
|
default:
|
|
return maxVersion
|
|
}
|
|
}
|
|
|
|
func (k ApiKey) apiType() apiType {
|
|
if i := int(k); i >= 0 && i < len(apiTypes) {
|
|
return apiTypes[i]
|
|
}
|
|
return apiType{}
|
|
}
|
|
|
|
const (
|
|
Produce ApiKey = 0
|
|
Fetch ApiKey = 1
|
|
ListOffsets ApiKey = 2
|
|
Metadata ApiKey = 3
|
|
LeaderAndIsr ApiKey = 4
|
|
StopReplica ApiKey = 5
|
|
UpdateMetadata ApiKey = 6
|
|
ControlledShutdown ApiKey = 7
|
|
OffsetCommit ApiKey = 8
|
|
OffsetFetch ApiKey = 9
|
|
FindCoordinator ApiKey = 10
|
|
JoinGroup ApiKey = 11
|
|
Heartbeat ApiKey = 12
|
|
LeaveGroup ApiKey = 13
|
|
SyncGroup ApiKey = 14
|
|
DescribeGroups ApiKey = 15
|
|
ListGroups ApiKey = 16
|
|
SaslHandshake ApiKey = 17
|
|
ApiVersions ApiKey = 18
|
|
CreateTopics ApiKey = 19
|
|
DeleteTopics ApiKey = 20
|
|
DeleteRecords ApiKey = 21
|
|
InitProducerId ApiKey = 22
|
|
OffsetForLeaderEpoch ApiKey = 23
|
|
AddPartitionsToTxn ApiKey = 24
|
|
AddOffsetsToTxn ApiKey = 25
|
|
EndTxn ApiKey = 26
|
|
WriteTxnMarkers ApiKey = 27
|
|
TxnOffsetCommit ApiKey = 28
|
|
DescribeAcls ApiKey = 29
|
|
CreateAcls ApiKey = 30
|
|
DeleteAcls ApiKey = 31
|
|
DescribeConfigs ApiKey = 32
|
|
AlterConfigs ApiKey = 33
|
|
AlterReplicaLogDirs ApiKey = 34
|
|
DescribeLogDirs ApiKey = 35
|
|
SaslAuthenticate ApiKey = 36
|
|
CreatePartitions ApiKey = 37
|
|
CreateDelegationToken ApiKey = 38
|
|
RenewDelegationToken ApiKey = 39
|
|
ExpireDelegationToken ApiKey = 40
|
|
DescribeDelegationToken ApiKey = 41
|
|
DeleteGroups ApiKey = 42
|
|
ElectLeaders ApiKey = 43
|
|
IncrementalAlterConfigs ApiKey = 44
|
|
AlterPartitionReassignments ApiKey = 45
|
|
ListPartitionReassignments ApiKey = 46
|
|
OffsetDelete ApiKey = 47
|
|
DescribeClientQuotas ApiKey = 48
|
|
AlterClientQuotas ApiKey = 49
|
|
|
|
numApis = 50
|
|
)
|
|
|
|
var apiNames = [numApis]string{
|
|
Produce: "Produce",
|
|
Fetch: "Fetch",
|
|
ListOffsets: "ListOffsets",
|
|
Metadata: "Metadata",
|
|
LeaderAndIsr: "LeaderAndIsr",
|
|
StopReplica: "StopReplica",
|
|
UpdateMetadata: "UpdateMetadata",
|
|
ControlledShutdown: "ControlledShutdown",
|
|
OffsetCommit: "OffsetCommit",
|
|
OffsetFetch: "OffsetFetch",
|
|
FindCoordinator: "FindCoordinator",
|
|
JoinGroup: "JoinGroup",
|
|
Heartbeat: "Heartbeat",
|
|
LeaveGroup: "LeaveGroup",
|
|
SyncGroup: "SyncGroup",
|
|
DescribeGroups: "DescribeGroups",
|
|
ListGroups: "ListGroups",
|
|
SaslHandshake: "SaslHandshake",
|
|
ApiVersions: "ApiVersions",
|
|
CreateTopics: "CreateTopics",
|
|
DeleteTopics: "DeleteTopics",
|
|
DeleteRecords: "DeleteRecords",
|
|
InitProducerId: "InitProducerId",
|
|
OffsetForLeaderEpoch: "OffsetForLeaderEpoch",
|
|
AddPartitionsToTxn: "AddPartitionsToTxn",
|
|
AddOffsetsToTxn: "AddOffsetsToTxn",
|
|
EndTxn: "EndTxn",
|
|
WriteTxnMarkers: "WriteTxnMarkers",
|
|
TxnOffsetCommit: "TxnOffsetCommit",
|
|
DescribeAcls: "DescribeAcls",
|
|
CreateAcls: "CreateAcls",
|
|
DeleteAcls: "DeleteAcls",
|
|
DescribeConfigs: "DescribeConfigs",
|
|
AlterConfigs: "AlterConfigs",
|
|
AlterReplicaLogDirs: "AlterReplicaLogDirs",
|
|
DescribeLogDirs: "DescribeLogDirs",
|
|
SaslAuthenticate: "SaslAuthenticate",
|
|
CreatePartitions: "CreatePartitions",
|
|
CreateDelegationToken: "CreateDelegationToken",
|
|
RenewDelegationToken: "RenewDelegationToken",
|
|
ExpireDelegationToken: "ExpireDelegationToken",
|
|
DescribeDelegationToken: "DescribeDelegationToken",
|
|
DeleteGroups: "DeleteGroups",
|
|
ElectLeaders: "ElectLeaders",
|
|
IncrementalAlterConfigs: "IncrementalAlterConfigs",
|
|
AlterPartitionReassignments: "AlterPartitionReassignments",
|
|
ListPartitionReassignments: "ListPartitionReassignments",
|
|
OffsetDelete: "OffsetDelete",
|
|
DescribeClientQuotas: "DescribeClientQuotas",
|
|
AlterClientQuotas: "AlterClientQuotas",
|
|
}
|
|
|
|
type messageType struct {
|
|
version int16
|
|
flexible bool
|
|
gotype reflect.Type
|
|
decode decodeFunc
|
|
encode encodeFunc
|
|
}
|
|
|
|
func (t *messageType) new() Message {
|
|
return reflect.New(t.gotype).Interface().(Message)
|
|
}
|
|
|
|
type apiType struct {
|
|
requests []messageType
|
|
responses []messageType
|
|
}
|
|
|
|
func (t apiType) minVersion() int16 {
|
|
if len(t.requests) == 0 {
|
|
return 0
|
|
}
|
|
return t.requests[0].version
|
|
}
|
|
|
|
func (t apiType) maxVersion() int16 {
|
|
if len(t.requests) == 0 {
|
|
return 0
|
|
}
|
|
return t.requests[len(t.requests)-1].version
|
|
}
|
|
|
|
var apiTypes [numApis]apiType
|
|
|
|
// Register is automatically called by sub-packages are imported to install a
|
|
// new pair of request/response message types.
|
|
func Register(req, res Message) {
|
|
k1 := req.ApiKey()
|
|
k2 := res.ApiKey()
|
|
|
|
if k1 != k2 {
|
|
panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
|
|
}
|
|
|
|
apiTypes[k1] = apiType{
|
|
requests: typesOf(req),
|
|
responses: typesOf(res),
|
|
}
|
|
}
|
|
|
|
func typesOf(v interface{}) []messageType {
|
|
return makeTypes(reflect.TypeOf(v).Elem())
|
|
}
|
|
|
|
func makeTypes(t reflect.Type) []messageType {
|
|
minVersion := int16(-1)
|
|
maxVersion := int16(-1)
|
|
|
|
// All future versions will be flexible (according to spec), so don't need to
|
|
// worry about maxes here.
|
|
minFlexibleVersion := int16(-1)
|
|
|
|
forEachStructField(t, func(_ reflect.Type, _ index, tag string) {
|
|
forEachStructTag(tag, func(tag structTag) bool {
|
|
if minVersion < 0 || tag.MinVersion < minVersion {
|
|
minVersion = tag.MinVersion
|
|
}
|
|
if maxVersion < 0 || tag.MaxVersion > maxVersion {
|
|
maxVersion = tag.MaxVersion
|
|
}
|
|
if tag.TagID > -2 && (minFlexibleVersion < 0 || tag.MinVersion < minFlexibleVersion) {
|
|
minFlexibleVersion = tag.MinVersion
|
|
}
|
|
return true
|
|
})
|
|
})
|
|
|
|
types := make([]messageType, 0, (maxVersion-minVersion)+1)
|
|
|
|
for v := minVersion; v <= maxVersion; v++ {
|
|
flexible := minFlexibleVersion >= 0 && v >= minFlexibleVersion
|
|
|
|
types = append(types, messageType{
|
|
version: v,
|
|
gotype: t,
|
|
flexible: flexible,
|
|
decode: decodeFuncOf(t, v, flexible, structTag{}),
|
|
encode: encodeFuncOf(t, v, flexible, structTag{}),
|
|
})
|
|
}
|
|
|
|
return types
|
|
}
|
|
|
|
type structTag struct {
|
|
MinVersion int16
|
|
MaxVersion int16
|
|
Compact bool
|
|
Nullable bool
|
|
TagID int
|
|
}
|
|
|
|
func forEachStructTag(tag string, do func(structTag) bool) {
|
|
if tag == "-" {
|
|
return // special case to ignore the field
|
|
}
|
|
|
|
forEach(tag, '|', func(s string) bool {
|
|
tag := structTag{
|
|
MinVersion: -1,
|
|
MaxVersion: -1,
|
|
|
|
// Legitimate tag IDs can start at 0. We use -1 as a placeholder to indicate
|
|
// that the message type is flexible, so that leaves -2 as the default for
|
|
// indicating that there is no tag ID and the message is not flexible.
|
|
TagID: -2,
|
|
}
|
|
|
|
var err error
|
|
forEach(s, ',', func(s string) bool {
|
|
switch {
|
|
case strings.HasPrefix(s, "min="):
|
|
tag.MinVersion, err = parseVersion(s[4:])
|
|
case strings.HasPrefix(s, "max="):
|
|
tag.MaxVersion, err = parseVersion(s[4:])
|
|
case s == "tag":
|
|
tag.TagID = -1
|
|
case strings.HasPrefix(s, "tag="):
|
|
tag.TagID, err = strconv.Atoi(s[4:])
|
|
case s == "compact":
|
|
tag.Compact = true
|
|
case s == "nullable":
|
|
tag.Nullable = true
|
|
default:
|
|
err = fmt.Errorf("unrecognized option: %q", s)
|
|
}
|
|
return err == nil
|
|
})
|
|
|
|
if err != nil {
|
|
panic(fmt.Errorf("malformed struct tag: %w", err))
|
|
}
|
|
|
|
if tag.MinVersion < 0 && tag.MaxVersion >= 0 {
|
|
panic(fmt.Errorf("missing minimum version in struct tag: %q", s))
|
|
}
|
|
|
|
if tag.MaxVersion < 0 && tag.MinVersion >= 0 {
|
|
panic(fmt.Errorf("missing maximum version in struct tag: %q", s))
|
|
}
|
|
|
|
if tag.MinVersion > tag.MaxVersion {
|
|
panic(fmt.Errorf("invalid version range in struct tag: %q", s))
|
|
}
|
|
|
|
return do(tag)
|
|
})
|
|
}
|
|
|
|
func forEach(s string, sep byte, do func(string) bool) bool {
|
|
for len(s) != 0 {
|
|
p := ""
|
|
i := strings.IndexByte(s, sep)
|
|
if i < 0 {
|
|
p, s = s, ""
|
|
} else {
|
|
p, s = s[:i], s[i+1:]
|
|
}
|
|
if !do(p) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func forEachStructField(t reflect.Type, do func(reflect.Type, index, string)) {
|
|
for i, n := 0, t.NumField(); i < n; i++ {
|
|
f := t.Field(i)
|
|
|
|
if f.PkgPath != "" && f.Name != "_" {
|
|
continue
|
|
}
|
|
|
|
kafkaTag, ok := f.Tag.Lookup("kafka")
|
|
if !ok {
|
|
kafkaTag = "|"
|
|
}
|
|
|
|
do(f.Type, indexOf(f), kafkaTag)
|
|
}
|
|
}
|
|
|
|
func parseVersion(s string) (int16, error) {
|
|
if !strings.HasPrefix(s, "v") {
|
|
return 0, fmt.Errorf("invalid version number: %q", s)
|
|
}
|
|
i, err := strconv.ParseInt(s[1:], 10, 16)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("invalid version number: %q: %w", s, err)
|
|
}
|
|
if i < 0 {
|
|
return 0, fmt.Errorf("invalid negative version number: %q", s)
|
|
}
|
|
return int16(i), nil
|
|
}
|
|
|
|
func dontExpectEOF(err error) error {
|
|
if err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
return io.ErrUnexpectedEOF
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type Broker struct {
|
|
Rack string
|
|
Host string
|
|
Port int32
|
|
ID int32
|
|
}
|
|
|
|
func (b Broker) String() string {
|
|
return net.JoinHostPort(b.Host, itoa(b.Port))
|
|
}
|
|
|
|
func (b Broker) Format(w fmt.State, v rune) {
|
|
switch v {
|
|
case 'd':
|
|
io.WriteString(w, itoa(b.ID))
|
|
case 's':
|
|
io.WriteString(w, b.String())
|
|
case 'v':
|
|
io.WriteString(w, itoa(b.ID))
|
|
io.WriteString(w, " ")
|
|
io.WriteString(w, b.String())
|
|
if b.Rack != "" {
|
|
io.WriteString(w, " ")
|
|
io.WriteString(w, b.Rack)
|
|
}
|
|
}
|
|
}
|
|
|
|
func itoa(i int32) string {
|
|
return strconv.Itoa(int(i))
|
|
}
|
|
|
|
type Topic struct {
|
|
Name string
|
|
Error int16
|
|
Partitions map[int32]Partition
|
|
}
|
|
|
|
type Partition struct {
|
|
ID int32
|
|
Error int16
|
|
Leader int32
|
|
Replicas []int32
|
|
ISR []int32
|
|
Offline []int32
|
|
}
|
|
|
|
// RawExchanger is an extention to the Message interface to allow messages
|
|
// to control the request response cycle for the message. This is currently
|
|
// only used to facilitate v0 SASL Authenticate requests being written in
|
|
// a non-standard fashion when the SASL Handshake was done at v0 but not
|
|
// when done at v1.
|
|
type RawExchanger interface {
|
|
// Required should return true when a RawExchange is needed.
|
|
// The passed in versions are the negotiated versions for the connection
|
|
// performing the request.
|
|
Required(versions map[ApiKey]int16) bool
|
|
// RawExchange is given the raw connection to the broker and the Message
|
|
// is responsible for writing itself to the connection as well as reading
|
|
// the response.
|
|
RawExchange(rw io.ReadWriter) (Message, error)
|
|
}
|
|
|
|
// BrokerMessage is an extension of the Message interface implemented by some
|
|
// request types to customize the broker assignment logic.
|
|
type BrokerMessage interface {
|
|
// Given a representation of the kafka cluster state as argument, returns
|
|
// the broker that the message should be routed to.
|
|
Broker(Cluster) (Broker, error)
|
|
}
|
|
|
|
// GroupMessage is an extension of the Message interface implemented by some
|
|
// request types to inform the program that they should be routed to a group
|
|
// coordinator.
|
|
type GroupMessage interface {
|
|
// Returns the group configured on the message.
|
|
Group() string
|
|
}
|
|
|
|
// TransactionalMessage is an extension of the Message interface implemented by some
|
|
// request types to inform the program that they should be routed to a transaction
|
|
// coordinator.
|
|
type TransactionalMessage interface {
|
|
// Returns the transactional id configured on the message.
|
|
Transaction() string
|
|
}
|
|
|
|
// PreparedMessage is an extension of the Message interface implemented by some
|
|
// request types which may need to run some pre-processing on their state before
|
|
// being sent.
|
|
type PreparedMessage interface {
|
|
// Prepares the message before being sent to a kafka broker using the API
|
|
// version passed as argument.
|
|
Prepare(apiVersion int16)
|
|
}
|
|
|
|
// Splitter is an interface implemented by messages that can be split into
|
|
// multiple requests and have their results merged back by a Merger.
|
|
type Splitter interface {
|
|
// For a given cluster layout, returns the list of messages constructed
|
|
// from the receiver for each requests that should be sent to the cluster.
|
|
// The second return value is a Merger which can be used to merge back the
|
|
// results of each request into a single message (or an error).
|
|
Split(Cluster) ([]Message, Merger, error)
|
|
}
|
|
|
|
// Merger is an interface implemented by messages which can merge multiple
|
|
// results into one response.
|
|
type Merger interface {
|
|
// Given a list of message and associated results, merge them back into a
|
|
// response (or an error). The results must be either Message or error
|
|
// values, other types should trigger a panic.
|
|
Merge(messages []Message, results []interface{}) (Message, error)
|
|
}
|
|
|
|
// Result converts r to a Message or an error, or panics if r could not be
|
|
// converted to these types.
|
|
func Result(r interface{}) (Message, error) {
|
|
switch v := r.(type) {
|
|
case Message:
|
|
return v, nil
|
|
case error:
|
|
return nil, v
|
|
default:
|
|
panic(fmt.Errorf("BUG: result must be a message or an error but not %T", v))
|
|
}
|
|
}
|
|
|