package kafka

import (
	"context"
	"errors"
	"fmt"
	"io"
	"math"
	"sort"
	"strconv"
	"sync"
	"sync/atomic"
	"time"
)

const (
	LastOffset  int64 = -1 // The most recent offset available for a partition.
	FirstOffset int64 = -2 // The least recent offset available for a partition.
)

const (
	// defaultCommitRetries holds the number of commit attempts to make
	// before giving up.
	defaultCommitRetries = 3
)

const (
	// defaultFetchMinBytes of 1 byte means that fetch requests are answered as
	// soon as a single byte of data is available or the fetch request times out
	// waiting for data to arrive.
	defaultFetchMinBytes = 1
)

var (
	errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set")
	errNotAvailableWithGroup  = errors.New("unavailable when GroupID is set")
)

const (
	// defaultReadBackoffMin/Max set the boundaries for how long the reader waits
	// before polling for new messages.
	defaultReadBackoffMin = 100 * time.Millisecond
	defaultReadBackoffMax = 1 * time.Second
)

// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
//
// Note that it is important to call `Close()` on a `Reader` when a process exits.
// The kafka server needs a graceful disconnect to stop it from continuing to
// attempt to send messages to the connected clients. A program that only calls
// `Close()` on normal return will not do so if the process is terminated with
// SIGINT (ctrl-c at the shell) or SIGTERM (as docker stop or a kubernetes
// restart does). This can result in a delay when a new reader on the same topic
// connects (e.g. new process started or new container running). Use a
// `signal.Notify` handler to close the reader on process shutdown.
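//
// For example, a small shutdown hook along the lines of the sketch below can
// close the reader when the process receives a signal (broker address, topic
// and group id are placeholders; error handling and imports are elided):
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		GroupID: "consumer-group-id",
//		Topic:   "topic-A",
//	})
//
//	sigs := make(chan os.Signal, 1)
//	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
//	go func() {
//		<-sigs
//		r.Close()
//	}()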
type Reader struct {
	// immutable fields of the reader
	config ReaderConfig

	// communication channels between the parent reader and its subreaders
	msgs chan readerMessage

	// mutable fields of the reader (synchronized on the mutex)
	mutex   sync.Mutex
	join    sync.WaitGroup
	cancel  context.CancelFunc
	stop    context.CancelFunc
	done    chan struct{}
	commits chan commitRequest
	version int64 // version holds the generation of the spawned readers
	offset  int64
	lag     int64
	closed  bool

	// Without a group subscription (when Reader.config.GroupID == ""),
	// when errors occur, the Reader gets a synthetic readerMessage with
	// a non-nil err set. With group subscriptions however, when an error
	// occurs in Reader.run, there's no reader running (sic, cf. reader vs.
	// Reader) and there's no way to let the high-level methods like
	// FetchMessage know that an error indeed occurred. If an error in run
	// occurs, it will be non-block-sent to this unbuffered channel, where
	// the high-level methods can select{} on it and notify the caller.
	runError chan error

	// once guards the one-time start of the read-lag reporter; stctx is
	// canceled when the reader is closed.
	once  uint32
	stctx context.Context
	// reader stats are all made of atomic values, no need for synchronization.
	// Use a pointer to ensure 64-bit alignment of the values.
	stats *readerStats
}

// useConsumerGroup indicates whether the Reader is part of a consumer group.
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }

func (r *Reader) getTopics() []string {
	if len(r.config.GroupTopics) > 0 {
		return r.config.GroupTopics[:]
	}

	return []string{r.config.Topic}
}

// useSyncCommits indicates whether the Reader is configured to perform sync or
// async commits.
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }

func (r *Reader) unsubscribe() {
	r.cancel()
	r.join.Wait()
	// it would be interesting to drain the r.msgs channel at this point since
	// it will contain buffered messages for partitions that may not be
	// re-assigned to this reader in the next consumer group generation.
	// however, draining the channel could race with the client calling
	// ReadMessage, which could result in messages delivered and/or committed
	// with gaps in the offset. for now, we will err on the side of caution and
	// potentially have those messages be reprocessed in the next generation by
	// another consumer to avoid such a race.
}

func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
	offsets := make(map[topicPartition]int64)
	for topic, assignments := range allAssignments {
		for _, assignment := range assignments {
			key := topicPartition{
				topic:     topic,
				partition: int32(assignment.ID),
			}
			offsets[key] = assignment.Offset
		}
	}

	r.mutex.Lock()
	r.start(offsets)
	r.mutex.Unlock()

	r.withLogger(func(l Logger) {
		l.Printf("subscribed to topics and partitions: %+v", offsets)
	})
}

// commitOffsetsWithRetry attempts to commit the specified offsets and retries
// up to the specified number of times.
func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
	const (
		backoffDelayMin = 100 * time.Millisecond
		backoffDelayMax = 5 * time.Second
	)

	for attempt := 0; attempt < retries; attempt++ {
		if attempt != 0 {
			if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
				return
			}
		}

		if err = gen.CommitOffsets(offsetStash); err == nil {
			return
		}
	}

	return // err will not be nil
}

// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64

// merge updates the offsetStash with the offsets from the provided messages.
func (o offsetStash) merge(commits []commit) {
	for _, c := range commits {
		offsetsByPartition, ok := o[c.topic]
		if !ok {
			offsetsByPartition = map[int]int64{}
			o[c.topic] = offsetsByPartition
		}

		if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
			offsetsByPartition[c.partition] = c.offset
		}
	}
}

// reset clears the contents of the offsetStash.
func (o offsetStash) reset() {
	for key := range o {
		delete(o, key)
	}
}

// commitLoopImmediate handles each commit synchronously.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
	offsets := offsetStash{}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel and prepare a single, final commit.
			// the commit will combine any outstanding requests and the result
			// will be sent back to all the callers of CommitMessages so that
			// they can return.
			var errchs []chan<- error
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
					errchs = append(errchs, req.errch)
				default:
					hasCommits = false
				}
			}
			err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
			for _, errch := range errchs {
				// NOTE: this will be a buffered channel and will not block.
				errch <- err
			}
			return

		case req := <-r.commits:
			offsets.merge(req.commits)
			req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
			offsets.reset()
		}
	}
}

// commitLoopInterval handles each commit asynchronously with a period defined
// by ReaderConfig.CommitInterval.
func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
	ticker := time.NewTicker(r.config.CommitInterval)
	defer ticker.Stop()

	// the offset stash should not survive rebalances b/c the consumer may
	// receive new assignments.
	offsets := offsetStash{}

	commit := func() {
		if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
			r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) })
		} else {
			offsets.reset()
		}
	}

	for {
		select {
		case <-ctx.Done():
			// drain the commit channel in order to prepare the final commit.
			for hasCommits := true; hasCommits; {
				select {
				case req := <-r.commits:
					offsets.merge(req.commits)
				default:
					hasCommits = false
				}
			}
			commit()
			return

		case <-ticker.C:
			commit()

		case req := <-r.commits:
			offsets.merge(req.commits)
		}
	}
}

// commitLoop processes commits off the commit chan.
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
	r.withLogger(func(l Logger) {
		l.Printf("started commit for group %s\n", r.config.GroupID)
	})
	defer r.withLogger(func(l Logger) {
		l.Printf("stopped commit for group %s\n", r.config.GroupID)
	})

	if r.useSyncCommits() {
		r.commitLoopImmediate(ctx, gen)
	} else {
		r.commitLoopInterval(ctx, gen)
	}
}

// run provides the main consumer group management loop. Each iteration performs the
// handshake to join the Reader to the consumer group.
//
// This function is responsible for closing the consumer group upon exit.
func (r *Reader) run(cg *ConsumerGroup) {
	defer close(r.done)
	defer cg.Close()

	r.withLogger(func(l Logger) {
		l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
	})

	for {
		// Limit the number of attempts at waiting for the next
		// consumer generation.
		var err error
		var gen *Generation
		for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
			gen, err = cg.Next(r.stctx)
			if err == nil {
				break
			}
			if errors.Is(err, r.stctx.Err()) {
				return
			}
			r.stats.errors.observe(1)
			r.withErrorLogger(func(l Logger) {
				l.Printf(err.Error())
			})
			// Continue with next attempt...
		}
		if err != nil {
			// All attempts have failed.
			select {
			case r.runError <- err:
				// If somebody's receiving on the runError, let
				// them know the error occurred.
			default:
				// Otherwise, don't block to allow healing.
			}
			continue
		}

		r.stats.rebalances.observe(1)

		r.subscribe(gen.Assignments)

		gen.Start(func(ctx context.Context) {
			r.commitLoop(ctx, gen)
		})
		gen.Start(func(ctx context.Context) {
			// wait for the generation to end and then unsubscribe.
			select {
			case <-ctx.Done():
				// continue to next generation
			case <-r.stctx.Done():
				// this will be the last loop because the reader is closed.
			}
			r.unsubscribe()
		})
	}
}

// ReaderConfig is a configuration object used to create new instances of
// Reader.
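//
// For example, a Reader consuming a single partition directly (without a
// consumer group) could be built from a configuration like the sketch below
// (broker address and topic name are placeholders):
//
//	config := kafka.ReaderConfig{
//		Brokers:   []string{"localhost:9092"},
//		Topic:     "topic-A",
//		Partition: 0,
//		MaxBytes:  10e6, // 10MB
//	}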
type ReaderConfig struct {
	// The list of broker addresses used to connect to the kafka cluster.
	Brokers []string

	// GroupID holds the optional consumer group id. If GroupID is specified, then
	// Partition should NOT be specified (not even as 0).
	GroupID string

	// GroupTopics allows specifying multiple topics, but can only be used in
	// combination with GroupID, as it is a consumer-group feature. As such, if
	// GroupID is set, then either Topic or GroupTopics must be defined.
	GroupTopics []string

	// The topic to read messages from.
	Topic string

	// Partition to read messages from. Either Partition or GroupID may
	// be assigned, but not both.
	Partition int

	// A dialer used to open connections to the kafka server. This field is
	// optional, if nil, the default dialer is used instead.
	Dialer *Dialer

	// The capacity of the internal message queue, defaults to 100 if none is
	// set.
	QueueCapacity int

	// MinBytes indicates to the broker the minimum batch size that the consumer
	// will accept. Setting a high minimum when consuming from a low-volume topic
	// may result in delayed delivery when the broker does not have enough data to
	// satisfy the defined minimum.
	//
	// Default: 1
	MinBytes int

	// MaxBytes indicates to the broker the maximum batch size that the consumer
	// will accept. The broker will truncate a message to satisfy this maximum, so
	// choose a value that is high enough for your largest message size.
	//
	// Default: 1MB
	MaxBytes int

	// Maximum amount of time to wait for new data to come when fetching batches
	// of messages from kafka.
	//
	// Default: 10s
	MaxWait time.Duration

	// ReadBatchTimeout is the amount of time to wait to read a message from a
	// batch of messages fetched from kafka.
	//
	// Default: 10s
	ReadBatchTimeout time.Duration

	// ReadLagInterval sets the frequency at which the reader lag is updated.
	// Setting this field to a negative value disables lag reporting.
	ReadLagInterval time.Duration

	// GroupBalancers is the priority-ordered list of client-side consumer group
	// balancing strategies that will be offered to the coordinator. The first
	// strategy that all group members support will be chosen by the leader.
	//
	// Default: [Range, RoundRobin]
	//
	// Only used when GroupID is set
	GroupBalancers []GroupBalancer

	// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
	// group heartbeat update.
	//
	// Default: 3s
	//
	// Only used when GroupID is set
	HeartbeatInterval time.Duration

	// CommitInterval indicates the interval at which offsets are committed to
	// the broker. If 0, commits will be handled synchronously.
	//
	// Default: 0
	//
	// Only used when GroupID is set
	CommitInterval time.Duration

	// PartitionWatchInterval indicates how often a reader checks for partition changes.
	// If a reader sees a partition change (such as a partition add) it will rebalance the group
	// picking up new partitions.
	//
	// Default: 5s
	//
	// Only used when GroupID is set and WatchPartitionChanges is set.
	PartitionWatchInterval time.Duration

	// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
	// polling the brokers and rebalancing if any partition changes happen to the topic.
	WatchPartitionChanges bool

	// SessionTimeout optionally sets the length of time that may pass without a heartbeat
	// before the coordinator considers the consumer dead and initiates a rebalance.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	SessionTimeout time.Duration

	// RebalanceTimeout optionally sets the length of time the coordinator will wait
	// for members to join as part of a rebalance. For kafka servers under higher
	// load, it may be useful to set this value higher.
	//
	// Default: 30s
	//
	// Only used when GroupID is set
	RebalanceTimeout time.Duration

	// JoinGroupBackoff optionally sets the length of time to wait between re-joining
	// the consumer group after an error.
	//
	// Default: 5s
	JoinGroupBackoff time.Duration

	// RetentionTime optionally sets the length of time the consumer group will be saved
	// by the broker.
	//
	// Default: 24h
	//
	// Only used when GroupID is set
	RetentionTime time.Duration

	// StartOffset determines from whence the consumer group should begin
	// consuming when it finds a partition without a committed offset. If
	// non-zero, it must be set to one of FirstOffset or LastOffset.
	//
	// Default: FirstOffset
	//
	// Only used when GroupID is set
	StartOffset int64

	// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 100ms
	ReadBackoffMin time.Duration

	// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
	// polling for new messages.
	//
	// Default: 1s
	ReadBackoffMax time.Duration

	// If not nil, specifies a logger used to report internal changes within the
	// reader.
	Logger Logger

	// ErrorLogger is the logger used to report errors. If nil, the reader falls
	// back to using Logger instead.
	ErrorLogger Logger

	// IsolationLevel controls the visibility of transactional records.
	// ReadUncommitted makes all records visible. With ReadCommitted only
	// non-transactional and committed records are visible.
	IsolationLevel IsolationLevel

	// Limit of how many attempts will be made before delivering the error.
	//
	// The default is to try 3 times.
	MaxAttempts int

	// OffsetOutOfRangeError indicates that the reader should return an error in
	// the event of an OffsetOutOfRange error, rather than retrying indefinitely.
	// This flag is being added to retain backwards-compatibility, so it will be
	// removed in a future version of kafka-go.
	OffsetOutOfRangeError bool
}

// Validate method validates ReaderConfig properties.
func (config *ReaderConfig) Validate() error {
	if len(config.Brokers) == 0 {
		return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
	}

	if config.Partition < 0 || config.Partition >= math.MaxInt32 {
		return fmt.Errorf("partition number out of bounds: %d", config.Partition)
	}

	if config.MinBytes < 0 {
		return fmt.Errorf("invalid negative minimum batch size (min = %d)", config.MinBytes)
	}

	if config.MaxBytes < 0 {
		return fmt.Errorf("invalid negative maximum batch size (max = %d)", config.MaxBytes)
	}

	if config.GroupID != "" {
		if config.Partition != 0 {
			return errors.New("either Partition or GroupID may be specified, but not both")
		}

		if len(config.Topic) == 0 && len(config.GroupTopics) == 0 {
			return errors.New("either Topic or GroupTopics must be specified with GroupID")
		}
	} else if len(config.Topic) == 0 {
		return errors.New("cannot create a new kafka reader with an empty topic")
	}

	if config.MinBytes > config.MaxBytes {
		return fmt.Errorf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes)
	}

	if config.ReadBackoffMax < 0 {
		return fmt.Errorf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax)
	}

	if config.ReadBackoffMin < 0 {
		return fmt.Errorf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin)
	}

	return nil
}

// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
type ReaderStats struct {
	Dials      int64 `metric:"kafka.reader.dial.count" type:"counter"`
	Fetches    int64 `metric:"kafka.reader.fetch.count" type:"counter"`
	Messages   int64 `metric:"kafka.reader.message.count" type:"counter"`
	Bytes      int64 `metric:"kafka.reader.message.bytes" type:"counter"`
	Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
	Timeouts   int64 `metric:"kafka.reader.timeout.count" type:"counter"`
	Errors     int64 `metric:"kafka.reader.error.count" type:"counter"`

	DialTime   DurationStats `metric:"kafka.reader.dial.seconds"`
	ReadTime   DurationStats `metric:"kafka.reader.read.seconds"`
	WaitTime   DurationStats `metric:"kafka.reader.wait.seconds"`
	FetchSize  SummaryStats  `metric:"kafka.reader.fetch.size"`
	FetchBytes SummaryStats  `metric:"kafka.reader.fetch.bytes"`

	Offset        int64         `metric:"kafka.reader.offset" type:"gauge"`
	Lag           int64         `metric:"kafka.reader.lag" type:"gauge"`
	MinBytes      int64         `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
	MaxBytes      int64         `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
	MaxWait       time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"`
	QueueLength   int64         `metric:"kafka.reader.queue.length" type:"gauge"`
	QueueCapacity int64         `metric:"kafka.reader.queue.capacity" type:"gauge"`

	ClientID  string `tag:"client_id"`
	Topic     string `tag:"topic"`
	Partition string `tag:"partition"`

	// The original `Fetches` field had a typo where the metric name was called
	// "kafak..." instead of "kafka...", in order to offer time to fix monitors
	// that may be relying on this mistake we are temporarily introducing this
	// field.
	DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}

// readerStats is a struct that contains statistics on a reader.
type readerStats struct {
	dials      counter
	fetches    counter
	messages   counter
	bytes      counter
	rebalances counter
	timeouts   counter
	errors     counter
	dialTime   summary
	readTime   summary
	waitTime   summary
	fetchSize  summary
	fetchBytes summary
	offset     gauge
	lag        gauge
	partition  string
}

// NewReader creates and returns a new Reader configured with config.
// The offset is initialized to FirstOffset.
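//
// A minimal consumer-group configuration might look like the sketch below
// (broker address, group id and topic are placeholders):
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers:  []string{"localhost:9092"},
//		GroupID:  "consumer-group-id",
//		Topic:    "topic-A",
//		MinBytes: 10e3, // 10KB
//		MaxBytes: 10e6, // 10MB
//	})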
func NewReader(config ReaderConfig) *Reader {
	if err := config.Validate(); err != nil {
		panic(err)
	}

	if config.GroupID != "" {
		if len(config.GroupBalancers) == 0 {
			config.GroupBalancers = []GroupBalancer{
				RangeGroupBalancer{},
				RoundRobinGroupBalancer{},
			}
		}
	}

	if config.Dialer == nil {
		config.Dialer = DefaultDialer
	}

	if config.MaxBytes == 0 {
		config.MaxBytes = 1e6 // 1 MB
	}

	if config.MinBytes == 0 {
		config.MinBytes = defaultFetchMinBytes
	}

	if config.MaxWait == 0 {
		config.MaxWait = 10 * time.Second
	}

	if config.ReadBatchTimeout == 0 {
		config.ReadBatchTimeout = 10 * time.Second
	}

	if config.ReadLagInterval == 0 {
		config.ReadLagInterval = 1 * time.Minute
	}

	if config.ReadBackoffMin == 0 {
		config.ReadBackoffMin = defaultReadBackoffMin
	}

	if config.ReadBackoffMax == 0 {
		config.ReadBackoffMax = defaultReadBackoffMax
	}

	if config.ReadBackoffMax < config.ReadBackoffMin {
		panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
	}

	if config.QueueCapacity == 0 {
		config.QueueCapacity = 100
	}

	if config.MaxAttempts == 0 {
		config.MaxAttempts = 3
	}

	// when configured as a consumer group, stats should report a partition of -1
	readerStatsPartition := config.Partition
	if config.GroupID != "" {
		readerStatsPartition = -1
	}

	// when configured as a consumer group, start version as 1 to ensure that only
	// the rebalance function will start readers
	version := int64(0)
	if config.GroupID != "" {
		version = 1
	}

	stctx, stop := context.WithCancel(context.Background())
	r := &Reader{
		config:  config,
		msgs:    make(chan readerMessage, config.QueueCapacity),
		cancel:  func() {},
		commits: make(chan commitRequest, config.QueueCapacity),
		stop:    stop,
		offset:  FirstOffset,
		stctx:   stctx,
		stats: &readerStats{
			dialTime:   makeSummary(),
			readTime:   makeSummary(),
			waitTime:   makeSummary(),
			fetchSize:  makeSummary(),
			fetchBytes: makeSummary(),
			// Generate the string representation of the partition number only
			// once when the reader is created.
			partition: strconv.Itoa(readerStatsPartition),
		},
		version: version,
	}
	if r.useConsumerGroup() {
		r.done = make(chan struct{})
		r.runError = make(chan error)
		cg, err := NewConsumerGroup(ConsumerGroupConfig{
			ID:                     r.config.GroupID,
			Brokers:                r.config.Brokers,
			Dialer:                 r.config.Dialer,
			Topics:                 r.getTopics(),
			GroupBalancers:         r.config.GroupBalancers,
			HeartbeatInterval:      r.config.HeartbeatInterval,
			PartitionWatchInterval: r.config.PartitionWatchInterval,
			WatchPartitionChanges:  r.config.WatchPartitionChanges,
			SessionTimeout:         r.config.SessionTimeout,
			RebalanceTimeout:       r.config.RebalanceTimeout,
			JoinGroupBackoff:       r.config.JoinGroupBackoff,
			RetentionTime:          r.config.RetentionTime,
			StartOffset:            r.config.StartOffset,
			Logger:                 r.config.Logger,
			ErrorLogger:            r.config.ErrorLogger,
		})
		if err != nil {
			panic(err)
		}
		go r.run(cg)
	}

	return r
}

// Config returns the reader's configuration.
func (r *Reader) Config() ReaderConfig {
	return r.config
}

// Close closes the stream, preventing the program from reading any more
// messages from it.
func (r *Reader) Close() error {
	atomic.StoreUint32(&r.once, 1)

	r.mutex.Lock()
	closed := r.closed
	r.closed = true
	r.mutex.Unlock()

	r.cancel()
	r.stop()
	r.join.Wait()

	if r.done != nil {
		<-r.done
	}

	if !closed {
		close(r.msgs)
	}

	return nil
}

// ReadMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// If consumer groups are used, ReadMessage will automatically commit the
// offset when called. Note that this could result in an offset being committed
// before the message is fully processed.
//
// If more fine-grained control of when offsets are committed is required, it
// is recommended to use FetchMessage with CommitMessages instead.
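//
// A typical consume loop is sketched below (error handling is reduced to
// breaking out of the loop; fmt and context imports are elided):
//
//	for {
//		m, err := r.ReadMessage(context.Background())
//		if err != nil {
//			break
//		}
//		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
//	}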
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
	m, err := r.FetchMessage(ctx)
	if err != nil {
		return Message{}, err
	}

	if r.useConsumerGroup() {
		if err := r.CommitMessages(ctx, m); err != nil {
			return Message{}, err
		}
	}

	return m, nil
}

// FetchMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
	r.activateReadLag()

	for {
		r.mutex.Lock()

		if !r.closed && r.version == 0 {
			r.start(r.getTopicPartitionOffset())
		}

		version := r.version
		r.mutex.Unlock()

		select {
		case <-ctx.Done():
			return Message{}, ctx.Err()

		case err := <-r.runError:
			return Message{}, err

		case m, ok := <-r.msgs:
			if !ok {
				return Message{}, io.EOF
			}

			if m.version >= version {
				r.mutex.Lock()

				switch {
				case m.error != nil:
				case version == r.version:
					r.offset = m.message.Offset + 1
					r.lag = m.watermark - r.offset
				}

				r.mutex.Unlock()

				if errors.Is(m.error, io.EOF) {
					// io.EOF is used as a marker to indicate that the stream
					// has been closed; if it was received from the inner
					// reader we don't want to confuse the program, so we
					// replace the error with io.ErrUnexpectedEOF.
					m.error = io.ErrUnexpectedEOF
				}

				return m.message, m.error
			}
		}
	}
}

// CommitMessages commits the list of messages passed as argument. The program
// may pass a context to asynchronously cancel the commit operation when it was
// configured to be blocking.
//
// Because kafka consumer groups track a single offset per partition, the
// highest message offset passed to CommitMessages will cause all previous
// messages to be committed. Applications need to account for these Kafka
// limitations when committing messages, and maintain message ordering if they
// need strong delivery guarantees. This property makes it valid to pass only
// the last message seen to CommitMessages in order to move the offset of the
// topic/partition it belonged to forward, effectively committing all previous
// messages in the partition.
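//
// The sketch below pairs FetchMessage with an explicit commit once a message
// has been handled (process is a placeholder for application logic, error
// handling is simplified):
//
//	for {
//		m, err := r.FetchMessage(ctx)
//		if err != nil {
//			break
//		}
//		process(m) // placeholder for application-specific handling
//		if err := r.CommitMessages(ctx, m); err != nil {
//			log.Fatal("failed to commit messages:", err)
//		}
//	}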
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
	if !r.useConsumerGroup() {
		return errOnlyAvailableWithGroup
	}

	var errch <-chan error
	creq := commitRequest{
		commits: makeCommits(msgs...),
	}

	if r.useSyncCommits() {
		ch := make(chan error, 1)
		errch, creq.errch = ch, ch
	}

	select {
	case r.commits <- creq:
	case <-ctx.Done():
		return ctx.Err()
	case <-r.stctx.Done():
		// This context is used to ensure we don't allow commits after the
		// reader was closed.
		return io.ErrClosedPipe
	}

	if !r.useSyncCommits() {
		return nil
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-errch:
		return err
	}
}

// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
//
// This method is intended to be used in cases where a program may be unable to
// call ReadMessage to update the value returned by Lag, but still needs to get
// an up to date estimation of how far behind the reader is. For example when
// the consumer is not ready to process the next message.
//
// The function returns a lag of zero when the reader's current offset is
// negative.
func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
	if r.useConsumerGroup() {
		return 0, errNotAvailableWithGroup
	}

	type offsets struct {
		first int64
		last  int64
	}

	offch := make(chan offsets, 1)
	errch := make(chan error, 1)

	go func() {
		var off offsets
		var err error

		for _, broker := range r.config.Brokers {
			var conn *Conn

			if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
				continue
			}

			deadline, _ := ctx.Deadline()
			conn.SetDeadline(deadline)

			off.first, off.last, err = conn.ReadOffsets()
			conn.Close()

			if err == nil {
				break
			}
		}

		if err != nil {
			errch <- err
		} else {
			offch <- off
		}
	}()

	select {
	case off := <-offch:
		switch cur := r.Offset(); {
		case cur == FirstOffset:
			lag = off.last - off.first

		case cur == LastOffset:
			lag = 0

		default:
			lag = off.last - cur
		}
	case err = <-errch:
	case <-ctx.Done():
		err = ctx.Err()
	}

	return
}

// Offset returns the current absolute offset of the reader, or -1
// if r is backed by a consumer group.
func (r *Reader) Offset() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	offset := r.offset
	r.mutex.Unlock()
	r.withLogger(func(log Logger) {
		log.Printf("looking up offset of kafka reader for partition %d of %s: %s", r.config.Partition, r.config.Topic, toHumanOffset(offset))
	})
	return offset
}

// Lag returns the lag of the last message returned by ReadMessage, or -1
// if r is backed by a consumer group.
func (r *Reader) Lag() int64 {
	if r.useConsumerGroup() {
		return -1
	}

	r.mutex.Lock()
	lag := r.lag
	r.mutex.Unlock()
	return lag
}

// SetOffset changes the offset from which the next batch of messages will be
// read. The method fails with io.ErrClosedPipe if the reader has already been closed.
//
// From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first
// or last available offset in the partition. Please note while -1 and -2 were accepted
// to indicate the first or last offset in previous versions, the meanings of the numbers
// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
// specification.
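//
// For example (sketch, error handling elided):
//
//	r.SetOffset(kafka.FirstOffset) // rewind to the oldest available message
//	r.SetOffset(42)                // or seek to an absolute offset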
func (r *Reader) SetOffset(offset int64) error {
	if r.useConsumerGroup() {
		return errNotAvailableWithGroup
	}

	var err error
	r.mutex.Lock()

	if r.closed {
		err = io.ErrClosedPipe
	} else if offset != r.offset {
		r.withLogger(func(log Logger) {
			log.Printf("setting the offset of the kafka reader for partition %d of %s from %s to %s",
				r.config.Partition, r.config.Topic, toHumanOffset(r.offset), toHumanOffset(offset))
		})
		r.offset = offset

		if r.version != 0 {
			r.start(r.getTopicPartitionOffset())
		}

		r.activateReadLag()
	}

	r.mutex.Unlock()
	return err
}

// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The method fails if it is unable to connect to a partition leader, if it is
// unable to read the offset for the given timestamp, or if the reader has been
// closed.
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
	r.mutex.Lock()
	if r.closed {
		r.mutex.Unlock()
		return io.ErrClosedPipe
	}
	r.mutex.Unlock()

	if len(r.config.Brokers) < 1 {
		return errors.New("no brokers in config")
	}
	var conn *Conn
	var err error
	for _, broker := range r.config.Brokers {
		conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
		if err != nil {
			continue
		}
		deadline, _ := ctx.Deadline()
		conn.SetDeadline(deadline)
		offset, err := conn.ReadOffset(t)
		conn.Close()
		if err != nil {
			return err
		}

		return r.SetOffset(offset)
	}
	return fmt.Errorf("error dialing all brokers, one of the errors: %w", err)
}

// Stats returns a snapshot of the reader stats since the last time the method
// was called, or since the reader was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka reader and report the metrics to a stats collection
// system.
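//
// For example (sketch; the reporting period and the report function are
// placeholders for whatever the host application uses):
//
//	go func() {
//		for range time.Tick(30 * time.Second) {
//			report(r.Stats()) // e.g. push the snapshot to a metrics system
//		}
//	}()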
func (r *Reader) Stats() ReaderStats {
	stats := ReaderStats{
		Dials:         r.stats.dials.snapshot(),
		Fetches:       r.stats.fetches.snapshot(),
		Messages:      r.stats.messages.snapshot(),
		Bytes:         r.stats.bytes.snapshot(),
		Rebalances:    r.stats.rebalances.snapshot(),
		Timeouts:      r.stats.timeouts.snapshot(),
		Errors:        r.stats.errors.snapshot(),
		DialTime:      r.stats.dialTime.snapshotDuration(),
		ReadTime:      r.stats.readTime.snapshotDuration(),
		WaitTime:      r.stats.waitTime.snapshotDuration(),
		FetchSize:     r.stats.fetchSize.snapshot(),
		FetchBytes:    r.stats.fetchBytes.snapshot(),
		Offset:        r.stats.offset.snapshot(),
		Lag:           r.stats.lag.snapshot(),
		MinBytes:      int64(r.config.MinBytes),
		MaxBytes:      int64(r.config.MaxBytes),
		MaxWait:       r.config.MaxWait,
		QueueLength:   int64(len(r.msgs)),
		QueueCapacity: int64(cap(r.msgs)),
		ClientID:      r.config.Dialer.ClientID,
		Topic:         r.config.Topic,
		Partition:     r.stats.partition,
	}
	// TODO: remove when we get rid of the deprecated field.
	stats.DeprecatedFetchesWithTypo = stats.Fetches
	return stats
}

func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 {
	key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)}
	return map[topicPartition]int64{key: r.offset}
}

func (r *Reader) withLogger(do func(Logger)) {
	if r.config.Logger != nil {
		do(r.config.Logger)
	}
}

func (r *Reader) withErrorLogger(do func(Logger)) {
	if r.config.ErrorLogger != nil {
		do(r.config.ErrorLogger)
	} else {
		r.withLogger(do)
	}
}

func (r *Reader) activateReadLag() {
	if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
		// read lag will only be calculated when not using consumer groups
		// todo discuss how capturing read lag should interact with rebalancing
		if !r.useConsumerGroup() {
			go r.readLag(r.stctx)
		}
	}
}

func (r *Reader) readLag(ctx context.Context) {
	ticker := time.NewTicker(r.config.ReadLagInterval)
	defer ticker.Stop()

	for {
		timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
		lag, err := r.ReadLag(timeout)
		cancel()

		if err != nil {
			r.stats.errors.observe(1)
			r.withErrorLogger(func(log Logger) {
				log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
			})
		} else {
			r.stats.lag.observe(lag)
		}

		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
	if r.closed {
		// don't start child reader if parent Reader is closed
		return
	}

	ctx, cancel := context.WithCancel(context.Background())

	r.cancel() // always cancel the previous reader
	r.cancel = cancel
	r.version++

	r.join.Add(len(offsetsByPartition))
	for key, offset := range offsetsByPartition {
		go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) {
			defer join.Done()

			(&reader{
				dialer:           r.config.Dialer,
				logger:           r.config.Logger,
				errorLogger:      r.config.ErrorLogger,
				brokers:          r.config.Brokers,
				topic:            key.topic,
				partition:        int(key.partition),
				minBytes:         r.config.MinBytes,
				maxBytes:         r.config.MaxBytes,
				maxWait:          r.config.MaxWait,
				readBatchTimeout: r.config.ReadBatchTimeout,
				backoffDelayMin:  r.config.ReadBackoffMin,
				backoffDelayMax:  r.config.ReadBackoffMax,
				version:          r.version,
				msgs:             r.msgs,
				stats:            r.stats,
				isolationLevel:   r.config.IsolationLevel,
				maxAttempts:      r.config.MaxAttempts,

				// backwards-compatibility flags
				offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
			}).run(ctx, offset)
		}(ctx, key, offset, &r.join)
	}
}

// A reader reads messages from kafka and produces them on its channels. It's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high-level reader API.
type reader struct {
	dialer           *Dialer
	logger           Logger
	errorLogger      Logger
	brokers          []string
	topic            string
	partition        int
	minBytes         int
	maxBytes         int
	maxWait          time.Duration
	readBatchTimeout time.Duration
	backoffDelayMin  time.Duration
	backoffDelayMax  time.Duration
	version          int64
	msgs             chan<- readerMessage
	stats            *readerStats
	isolationLevel   IsolationLevel
	maxAttempts      int

	offsetOutOfRangeError bool
}

type readerMessage struct {
	version   int64
	message   Message
	watermark int64
	error     error
}

func (r *reader) run(ctx context.Context, offset int64) {
	// This is the reader's main loop, it only ends if the context is canceled
	// and will keep attempting to read messages otherwise.
	//
	// Retrying indefinitely has the nice side effect of preventing Read calls
	// on the parent reader from blocking if the connection to the kafka server
	// fails; the reader keeps reporting errors on the error channel which will
	// then be surfaced to the program.
	// If the reader wasn't retrying then the program would block indefinitely
	// on a Read call after reading the first error.
	for attempt := 0; true; attempt++ {
		if attempt != 0 {
			if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
				return
			}
		}

		r.withLogger(func(log Logger) {
			log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, toHumanOffset(offset))
		})

		conn, start, err := r.initialize(ctx, offset)
		if err != nil {
			if errors.Is(err, OffsetOutOfRange) {
				if r.offsetOutOfRangeError {
					r.sendError(ctx, err)
					return
				}

				// This would happen if the requested offset is past the last
				// offset on the partition leader. In that case we're just going
				// to retry later hoping that enough data has been produced.
				r.withErrorLogger(func(log Logger) {
					log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
				})

				continue
			}

			// Perform a configured number of attempts before
			// reporting first errors, this helps mitigate
			// situations where the kafka server is temporarily
			// unavailable.
			if attempt >= r.maxAttempts {
				r.sendError(ctx, err)
			} else {
				r.stats.errors.observe(1)
				r.withErrorLogger(func(log Logger) {
					log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
				})
			}
			continue
		}

		// Resetting the attempt counter ensures that if a failure occurs after
		// a successful initialization we don't keep increasing the backoff
		// timeout.
		attempt = 0

		// Now that we're sure to have an absolute offset number, should anything
		// happen to the connection we know we'll want to restart from this offset.
		offset = start

		errcount := 0
	readLoop:
		for {
			if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
				conn.Close()
				return
			}

			offset, err = r.read(ctx, offset, conn)
			switch {
			case err == nil:
				errcount = 0
				continue

			case errors.Is(err, io.EOF):
				// done with this batch of messages...carry on. note that this
				// block relies on the batch repackaging real io.EOF errors as
				// io.ErrUnexpectedEOF. otherwise, we would end up swallowing real
				// errors here.
				errcount = 0
				continue

			case errors.Is(err, io.ErrNoProgress):
				// This error is returned by the Conn when it believes the connection
				// has been corrupted, so we need to explicitly close it. Since we are
				// explicitly handling it and a retry will pick up, we can suppress the
				// error metrics and logs for this case.
				conn.Close()
				break readLoop

			case errors.Is(err, UnknownTopicOrPartition):
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or partition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// topic/partition broker combo.
				r.stats.rebalances.observe(1)
				break readLoop

			case errors.Is(err, NotLeaderForPartition):
				r.withErrorLogger(func(log Logger) {
					log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset))
				})

				conn.Close()

				// The next call to .initialize will re-establish a connection to the proper
				// partition leader.
				r.stats.rebalances.observe(1)
				break readLoop

			case errors.Is(err, RequestTimedOut):
				// Timeout on the kafka side, this can be safely retried.
				errcount = 0
				r.withLogger(func(log Logger) {
					log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
				})
				r.stats.timeouts.observe(1)
				continue

			case errors.Is(err, OffsetOutOfRange):
				first, last, err := r.readOffsets(conn)
				if err != nil {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
					})
					conn.Close()
					break readLoop
				}

				switch {
				case offset < first:
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, toHumanOffset(offset), first, first-offset)
					})
					offset, errcount = first, 0
					continue // retry immediately so we don't keep falling behind due to the backoff

				case offset < last:
					errcount = 0
					continue // more messages have already become available, retry immediately

				default:
					// We may be reading past the last offset, will retry later.
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader is reading past the last offset for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
					})
				}

			case errors.Is(err, context.Canceled):
				// Another reader has taken over, we can safely quit.
				conn.Close()
				return

			case errors.Is(err, errUnknownCodec):
				// The compression codec is either unsupported or has not been
				// imported. This is a fatal error b/c the reader cannot
				// proceed.
				r.sendError(ctx, err)
				break readLoop

			default:
				var kafkaError Error
				if errors.As(err, &kafkaError) {
					r.sendError(ctx, err)
				} else {
					r.withErrorLogger(func(log Logger) {
						log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, toHumanOffset(offset), err)
					})
					r.stats.errors.observe(1)
					conn.Close()
					break readLoop
				}
			}

			errcount++
		}
	}
}

func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
	for i := 0; i != len(r.brokers) && conn == nil; i++ {
		broker := r.brokers[i]
		var first, last int64

		t0 := time.Now()
		conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
		t1 := time.Now()
		r.stats.dials.observe(1)
		r.stats.dialTime.observeDuration(t1.Sub(t0))

		if err != nil {
			continue
		}

		if first, last, err = r.readOffsets(conn); err != nil {
			conn.Close()
			conn = nil
			break
		}

		switch {
		case offset == FirstOffset:
			offset = first

		case offset == LastOffset:
			offset = last

		case offset < first:
			offset = first
		}

		r.withLogger(func(log Logger) {
			log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, toHumanOffset(offset))
		})

		if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
			conn.Close()
			conn = nil
			break
		}

		conn.SetDeadline(time.Time{})
	}

	return
}

func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
	r.stats.fetches.observe(1)
	r.stats.offset.observe(offset)

	t0 := time.Now()
	conn.SetReadDeadline(t0.Add(r.maxWait))

	batch := conn.ReadBatchWith(ReadBatchConfig{
		MinBytes:       r.minBytes,
		MaxBytes:       r.maxBytes,
		IsolationLevel: r.isolationLevel,
	})
	highWaterMark := batch.HighWaterMark()

	t1 := time.Now()
	r.stats.waitTime.observeDuration(t1.Sub(t0))

	var msg Message
	var err error
	var size int64
	var bytes int64

	for {
		conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))

		if msg, err = batch.ReadMessage(); err != nil {
			batch.Close()
			break
		}

		n := int64(len(msg.Key) + len(msg.Value))
		r.stats.messages.observe(1)
		r.stats.bytes.observe(n)

		if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
			batch.Close()
			break
		}

		offset = msg.Offset + 1
		r.stats.offset.observe(offset)
		r.stats.lag.observe(highWaterMark - offset)

		size++
		bytes += n
	}

	conn.SetReadDeadline(time.Time{})

	t2 := time.Now()
	r.stats.readTime.observeDuration(t2.Sub(t1))
	r.stats.fetchSize.observe(size)
	r.stats.fetchBytes.observe(bytes)
	return offset, err
}

func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
	conn.SetDeadline(time.Now().Add(10 * time.Second))
	return conn.ReadOffsets()
}

func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
	select {
	case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (r *reader) sendError(ctx context.Context, err error) error {
	select {
	case r.msgs <- readerMessage{version: r.version, error: err}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (r *reader) withLogger(do func(Logger)) {
	if r.logger != nil {
		do(r.logger)
	}
}

func (r *reader) withErrorLogger(do func(Logger)) {
	if r.errorLogger != nil {
		do(r.errorLogger)
	} else {
		r.withLogger(do)
	}
}

// extractTopics returns the unique list of topics represented by the set of
// provided members.
func extractTopics(members []GroupMember) []string {
	visited := map[string]struct{}{}
	var topics []string

	for _, member := range members {
		for _, topic := range member.Topics {
			if _, seen := visited[topic]; seen {
				continue
			}

			topics = append(topics, topic)
			visited[topic] = struct{}{}
		}
	}

	sort.Strings(topics)

	return topics
}

type humanOffset int64

func toHumanOffset(v int64) humanOffset {
	return humanOffset(v)
}

func (offset humanOffset) Format(w fmt.State, _ rune) {
	v := int64(offset)
	switch v {
	case FirstOffset:
		fmt.Fprint(w, "first offset")
	case LastOffset:
		fmt.Fprint(w, "last offset")
	default:
		fmt.Fprint(w, strconv.FormatInt(v, 10))
	}
}