package kafka
import (
"context"
"errors"
"fmt"
"io"
"math"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
)
const (
LastOffset int64 = -1 // The most recent offset available for a partition.
FirstOffset int64 = -2 // The least recent offset available for a partition.
)
const (
// defaultCommitRetries holds the number of commit attempts to make
// before giving up.
defaultCommitRetries = 3
)
const (
// defaultFetchMinBytes of 1 byte means that fetch requests are answered as
// soon as a single byte of data is available or the fetch request times out
// waiting for data to arrive.
defaultFetchMinBytes = 1
)
var (
errOnlyAvailableWithGroup = errors.New("unavailable when GroupID is not set")
errNotAvailableWithGroup = errors.New("unavailable when GroupID is set")
)
const (
// defaultReadBackoffMin/Max set the boundaries for how long the reader waits
// before polling for new messages.
defaultReadBackoffMin = 100 * time.Millisecond
defaultReadBackoffMax = 1 * time.Second
)
// Reader provides a high-level API for consuming messages from kafka.
//
// A Reader automatically manages reconnections to a kafka server, and
// blocking methods have context support for asynchronous cancellations.
//
// Note that it is important to call `Close()` on a `Reader` when a process exits.
// The kafka server needs a graceful disconnect to stop it from continuing to
// attempt to send messages to the connected clients. A plain consumption loop will
// not call `Close()` if the process is terminated with SIGINT (ctrl-c at the shell) or
// SIGTERM (as docker stop or a kubernetes restart does). This can result in a
// delay when a new reader on the same topic connects (e.g. new process started
// or new container running). Use a `signal.Notify` handler to close the reader on
// process shutdown.
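//
// A minimal shutdown sketch (the broker address, topic, group id, and signal
// set are illustrative assumptions, not defaults of this package):
//
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		GroupID: "consumer-group-id",
//		Topic:   "topic-A",
//	})
//
//	sigs := make(chan os.Signal, 1)
//	signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
//	go func() {
//		<-sigs
//		if err := r.Close(); err != nil {
//			log.Fatal("failed to close reader:", err)
//		}
//	}()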
type Reader struct {
// immutable fields of the reader
config ReaderConfig
// communication channels between the parent reader and its subreaders
msgs chan readerMessage
// mutable fields of the reader (synchronized on the mutex)
mutex sync.Mutex
join sync.WaitGroup
cancel context.CancelFunc
stop context.CancelFunc
done chan struct{}
commits chan commitRequest
version int64 // version holds the generation of the spawned readers
offset int64
lag int64
closed bool
// Without a group subscription (when Reader.config.GroupID == ""),
// when errors occur, the Reader gets a synthetic readerMessage with
// a non-nil err set. With group subscriptions however, when an error
// occurs in Reader.run, there's no reader running (sic, cf. reader vs.
// Reader) and there's no way to let the high-level methods like
// FetchMessage know that an error indeed occurred. If an error in run
// occurs, it will be non-block-sent to this unbuffered channel, where
// the high-level methods can select{} on it and notify the caller.
runError chan error
// once is flipped atomically to ensure the read-lag goroutine is started at
// most once; stctx is the stop context, canceled when the reader is closed.
once uint32
stctx context.Context
// reader stats are all made of atomic values, no need for synchronization.
// Use a pointer to ensure 64-bit alignment of the values.
stats *readerStats
}
// useConsumerGroup indicates whether the Reader is part of a consumer group.
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }
func (r *Reader) getTopics() []string {
if len(r.config.GroupTopics) > 0 {
return r.config.GroupTopics[:]
}
return []string{r.config.Topic}
}
// useSyncCommits indicates whether the Reader is configured to perform sync or
// async commits.
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }
func (r *Reader) unsubscribe() {
r.cancel()
r.join.Wait()
// it would be interesting to drain the r.msgs channel at this point since
// it will contain buffered messages for partitions that may not be
// re-assigned to this reader in the next consumer group generation.
// however, draining the channel could race with the client calling
// ReadMessage, which could result in messages delivered and/or committed
// with gaps in the offset. for now, we will err on the side of caution and
// potentially have those messages be reprocessed in the next generation by
// another consumer to avoid such a race.
}
func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
offsets := make(map[topicPartition]int64)
for topic, assignments := range allAssignments {
for _, assignment := range assignments {
key := topicPartition{
topic: topic,
partition: int32(assignment.ID),
}
offsets[key] = assignment.Offset
}
}
r.mutex.Lock()
r.start(offsets)
r.mutex.Unlock()
r.withLogger(func(l Logger) {
l.Printf("subscribed to topics and partitions: %+v", offsets)
})
}
// commitOffsetsWithRetry attempts to commit the specified offsets and retries
// up to the specified number of times.
func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash, retries int) (err error) {
const (
backoffDelayMin = 100 * time.Millisecond
backoffDelayMax = 5 * time.Second
)
for attempt := 0; attempt < retries; attempt++ {
if attempt != 0 {
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
return
}
}
if err = gen.CommitOffsets(offsetStash); err == nil {
return
}
}
return // err will not be nil
}
// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64
// merge updates the offsetStash with the offsets from the provided messages.
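//
// For illustration (hypothetical values), merging two commits for the same
// topic/partition keeps only the highest offset:
//
//	stash := offsetStash{}
//	stash.merge([]commit{
//		{topic: "topic-A", partition: 0, offset: 3},
//		{topic: "topic-A", partition: 0, offset: 5},
//	})
//	// stash is now offsetStash{"topic-A": {0: 5}}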
func (o offsetStash) merge(commits []commit) {
for _, c := range commits {
offsetsByPartition, ok := o[c.topic]
if !ok {
offsetsByPartition = map[int]int64{}
o[c.topic] = offsetsByPartition
}
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
offsetsByPartition[c.partition] = c.offset
}
}
}
// reset clears the contents of the offsetStash.
func (o offsetStash) reset() {
for key := range o {
delete(o, key)
}
}
// commitLoopImmediate handles each commit synchronously.
func (r *Reader) commitLoopImmediate(ctx context.Context, gen *Generation) {
offsets := offsetStash{}
for {
select {
case <-ctx.Done():
// drain the commit channel and prepare a single, final commit.
// the commit will combine any outstanding requests and the result
// will be sent back to all the callers of CommitMessages so that
// they can return.
var errchs []chan<- error
for hasCommits := true; hasCommits; {
select {
case req := <-r.commits:
offsets.merge(req.commits)
errchs = append(errchs, req.errch)
default:
hasCommits = false
}
}
err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
for _, errch := range errchs {
// NOTE : this will be a buffered channel and will not block.
errch <- err
}
return
case req := <-r.commits:
offsets.merge(req.commits)
req.errch <- r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries)
offsets.reset()
}
}
}
// commitLoopInterval handles each commit asynchronously with a period defined
// by ReaderConfig.CommitInterval.
func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {
ticker := time.NewTicker(r.config.CommitInterval)
defer ticker.Stop()
// the offset stash should not survive rebalances b/c the consumer may
// receive new assignments.
offsets := offsetStash{}
commit := func() {
if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) })
} else {
offsets.reset()
}
}
for {
select {
case <-ctx.Done():
// drain the commit channel in order to prepare the final commit.
for hasCommits := true; hasCommits; {
select {
case req := <-r.commits:
offsets.merge(req.commits)
default:
hasCommits = false
}
}
commit()
return
case <-ticker.C:
commit()
case req := <-r.commits:
offsets.merge(req.commits)
}
}
}
// commitLoop processes commits off the commit chan.
func (r *Reader) commitLoop(ctx context.Context, gen *Generation) {
r.withLogger(func(l Logger) {
l.Printf("started commit for group %s\n", r.config.GroupID)
})
defer r.withLogger(func(l Logger) {
l.Printf("stopped commit for group %s\n", r.config.GroupID)
})
if r.useSyncCommits() {
r.commitLoopImmediate(ctx, gen)
} else {
r.commitLoopInterval(ctx, gen)
}
}
// run provides the main consumer group management loop. Each iteration performs the
// handshake to join the Reader to the consumer group.
//
// This function is responsible for closing the consumer group upon exit.
func (r *Reader) run(cg *ConsumerGroup) {
defer close(r.done)
defer cg.Close()
r.withLogger(func(l Logger) {
l.Printf("entering loop for consumer group, %v\n", r.config.GroupID)
})
for {
// Limit the number of attempts at waiting for the next
// consumer generation.
var err error
var gen *Generation
for attempt := 1; attempt <= r.config.MaxAttempts; attempt++ {
gen, err = cg.Next(r.stctx)
if err == nil {
break
}
if errors.Is(err, r.stctx.Err()) {
return
}
r.stats.errors.observe(1)
r.withErrorLogger(func(l Logger) {
l.Printf(err.Error())
})
// Continue with next attempt...
}
if err != nil {
// All attempts have failed.
select {
case r.runError <- err:
// If somebody's receiving on the runError, let
// them know the error occurred.
default:
// Otherwise, don't block to allow healing.
}
continue
}
r.stats.rebalances.observe(1)
r.subscribe(gen.Assignments)
gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
})
gen.Start(func(ctx context.Context) {
// wait for the generation to end and then unsubscribe.
select {
case <-ctx.Done():
// continue to next generation
case <-r.stctx.Done():
// this will be the last loop because the reader is closed.
}
r.unsubscribe()
})
}
}
// ReaderConfig is a configuration object used to create new instances of
// Reader.
type ReaderConfig struct {
// The list of broker addresses used to connect to the kafka cluster.
Brokers []string
// GroupID holds the optional consumer group id. If GroupID is specified, then
// Partition should NOT be specified; leave it at its zero value.
GroupID string
// GroupTopics allows specifying multiple topics, but can only be used in
// combination with GroupID, as it is a consumer-group feature. As such, if
// GroupID is set, then either Topic or GroupTopics must be defined.
GroupTopics []string
// The topic to read messages from.
Topic string
// Partition to read messages from. Either Partition or GroupID may
// be assigned, but not both
Partition int
// A dialer used to open connections to the kafka server. This field is
// optional; if nil, the default dialer is used instead.
Dialer *Dialer
// The capacity of the internal message queue, defaults to 100 if none is
// set.
QueueCapacity int
// MinBytes indicates to the broker the minimum batch size that the consumer
// will accept. Setting a high minimum when consuming from a low-volume topic
// may result in delayed delivery when the broker does not have enough data to
// satisfy the defined minimum.
//
// Default: 1
MinBytes int
// MaxBytes indicates to the broker the maximum batch size that the consumer
// will accept. The broker will truncate a message to satisfy this maximum, so
// choose a value that is high enough for your largest message size.
//
// Default: 1MB
MaxBytes int
// Maximum amount of time to wait for new data to come when fetching batches
// of messages from kafka.
//
// Default: 10s
MaxWait time.Duration
// ReadBatchTimeout is the amount of time to wait to fetch a message from a kafka batch.
//
// Default: 10s
ReadBatchTimeout time.Duration
// ReadLagInterval sets the frequency at which the reader lag is updated.
// Setting this field to a negative value disables lag reporting.
ReadLagInterval time.Duration
// GroupBalancers is the priority-ordered list of client-side consumer group
// balancing strategies that will be offered to the coordinator. The first
// strategy that all group members support will be chosen by the leader.
//
// Default: [Range, RoundRobin]
//
// Only used when GroupID is set
GroupBalancers []GroupBalancer
// HeartbeatInterval sets the optional frequency at which the reader sends the consumer
// group heartbeat update.
//
// Default: 3s
//
// Only used when GroupID is set
HeartbeatInterval time.Duration
// CommitInterval indicates the interval at which offsets are committed to
// the broker. If 0, commits will be handled synchronously.
//
// Default: 0
//
// Only used when GroupID is set
CommitInterval time.Duration
// PartitionWatchInterval indicates how often a reader checks for partition changes.
// If a reader sees a partition change (such as a partition add) it will rebalance the group
// picking up new partitions.
//
// Default: 5s
//
// Only used when GroupID is set and WatchPartitionChanges is set.
PartitionWatchInterval time.Duration
// WatchPartitionChanges is used to inform kafka-go that a consumer group should be
// polling the brokers and rebalancing if any partition changes happen to the topic.
WatchPartitionChanges bool
// SessionTimeout optionally sets the length of time that may pass without a heartbeat
// before the coordinator considers the consumer dead and initiates a rebalance.
//
// Default: 30s
//
// Only used when GroupID is set
SessionTimeout time.Duration
// RebalanceTimeout optionally sets the length of time the coordinator will wait
// for members to join as part of a rebalance. For kafka servers under higher
// load, it may be useful to set this value higher.
//
// Default: 30s
//
// Only used when GroupID is set
RebalanceTimeout time.Duration
// JoinGroupBackoff optionally sets the length of time to wait between re-joining
// the consumer group after an error.
//
// Default: 5s
JoinGroupBackoff time.Duration
// RetentionTime optionally sets the length of time the consumer group will be saved
// by the broker
//
// Default: 24h
//
// Only used when GroupID is set
RetentionTime time.Duration
// StartOffset determines the offset from which the consumer group should begin
// consuming when it finds a partition without a committed offset. If
// non-zero, it must be set to one of FirstOffset or LastOffset.
//
// Default: FirstOffset
//
// Only used when GroupID is set
StartOffset int64
// ReadBackoffMin optionally sets the smallest amount of time the reader will wait before
// polling for new messages.
//
// Default: 100ms
ReadBackoffMin time.Duration
// ReadBackoffMax optionally sets the maximum amount of time the reader will wait before
// polling for new messages.
//
// Default: 1s
ReadBackoffMax time.Duration
// If not nil, specifies a logger used to report internal changes within the
// reader.
Logger Logger
// ErrorLogger is the logger used to report errors. If nil, the reader falls
// back to using Logger instead.
ErrorLogger Logger
// IsolationLevel controls the visibility of transactional records.
// ReadUncommitted makes all records visible. With ReadCommitted only
// non-transactional and committed records are visible.
IsolationLevel IsolationLevel
// Limit of how many attempts will be made before delivering the error.
//
// The default is to try 3 times.
MaxAttempts int
// OffsetOutOfRangeError indicates that the reader should return an error in
// the event of an OffsetOutOfRange error, rather than retrying indefinitely.
// This flag is being added to retain backwards-compatibility, and it will be
// removed in a future version of kafka-go.
OffsetOutOfRangeError bool
}
// Validate method validates ReaderConfig properties.
func (config *ReaderConfig) Validate() error {
if len(config.Brokers) == 0 {
return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
}
if config.Partition < 0 || config.Partition >= math.MaxInt32 {
return fmt.Errorf("partition number out of bounds: %d", config.Partition)
}
if config.MinBytes < 0 {
return fmt.Errorf("invalid negative minimum batch size (min = %d)", config.MinBytes)
}
if config.MaxBytes < 0 {
return fmt.Errorf("invalid negative maximum batch size (max = %d)", config.MaxBytes)
}
if config.GroupID != "" {
if config.Partition != 0 {
return errors.New("either Partition or GroupID may be specified, but not both")
}
if len(config.Topic) == 0 && len(config.GroupTopics) == 0 {
return errors.New("either Topic or GroupTopics must be specified with GroupID")
}
} else if len(config.Topic) == 0 {
return errors.New("cannot create a new kafka reader with an empty topic")
}
if config.MinBytes > config.MaxBytes {
return fmt.Errorf("minimum batch size greater than the maximum (min = %d, max = %d)", config.MinBytes, config.MaxBytes)
}
if config.ReadBackoffMax < 0 {
return fmt.Errorf("ReadBackoffMax out of bounds: %d", config.ReadBackoffMax)
}
if config.ReadBackoffMin < 0 {
return fmt.Errorf("ReadBackoffMin out of bounds: %d", config.ReadBackoffMin)
}
return nil
}
// ReaderStats is a data structure returned by a call to Reader.Stats that exposes
// details about the behavior of the reader.
type ReaderStats struct {
Dials int64 `metric:"kafka.reader.dial.count" type:"counter"`
Fetches int64 `metric:"kafka.reader.fetch.count" type:"counter"`
Messages int64 `metric:"kafka.reader.message.count" type:"counter"`
Bytes int64 `metric:"kafka.reader.message.bytes" type:"counter"`
Rebalances int64 `metric:"kafka.reader.rebalance.count" type:"counter"`
Timeouts int64 `metric:"kafka.reader.timeout.count" type:"counter"`
Errors int64 `metric:"kafka.reader.error.count" type:"counter"`
DialTime DurationStats `metric:"kafka.reader.dial.seconds"`
ReadTime DurationStats `metric:"kafka.reader.read.seconds"`
WaitTime DurationStats `metric:"kafka.reader.wait.seconds"`
FetchSize SummaryStats `metric:"kafka.reader.fetch.size"`
FetchBytes SummaryStats `metric:"kafka.reader.fetch.bytes"`
Offset int64 `metric:"kafka.reader.offset" type:"gauge"`
Lag int64 `metric:"kafka.reader.lag" type:"gauge"`
MinBytes int64 `metric:"kafka.reader.fetch_bytes.min" type:"gauge"`
MaxBytes int64 `metric:"kafka.reader.fetch_bytes.max" type:"gauge"`
MaxWait time.Duration `metric:"kafka.reader.fetch_wait.max" type:"gauge"`
QueueLength int64 `metric:"kafka.reader.queue.length" type:"gauge"`
QueueCapacity int64 `metric:"kafka.reader.queue.capacity" type:"gauge"`
ClientID string `tag:"client_id"`
Topic string `tag:"topic"`
Partition string `tag:"partition"`
// The original `Fetches` field had a typo where the metric name was called
// "kafak..." instead of "kafka...". In order to offer time to fix monitors
// that may be relying on this mistake, we are temporarily introducing this
// field.
DeprecatedFetchesWithTypo int64 `metric:"kafak.reader.fetch.count" type:"counter"`
}
// readerStats is a struct that contains statistics on a reader.
type readerStats struct {
dials counter
fetches counter
messages counter
bytes counter
rebalances counter
timeouts counter
errors counter
dialTime summary
readTime summary
waitTime summary
fetchSize summary
fetchBytes summary
offset gauge
lag gauge
partition string
}
// NewReader creates and returns a new Reader configured with config.
// The offset is initialized to FirstOffset.
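//
// A minimal construction sketch (broker address and topic are illustrative
// assumptions):
//
//	// reader bound to a single partition
//	r := kafka.NewReader(kafka.ReaderConfig{
//		Brokers:   []string{"localhost:9092"},
//		Topic:     "topic-A",
//		Partition: 0,
//		MinBytes:  10e3, // 10KB
//		MaxBytes:  10e6, // 10MB
//	})
//
//	// consumer-group reader (Partition must be left unset)
//	g := kafka.NewReader(kafka.ReaderConfig{
//		Brokers: []string{"localhost:9092"},
//		GroupID: "consumer-group-id",
//		Topic:   "topic-A",
//	})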
func NewReader(config ReaderConfig) *Reader {
if err := config.Validate(); err != nil {
panic(err)
}
if config.GroupID != "" {
if len(config.GroupBalancers) == 0 {
config.GroupBalancers = []GroupBalancer{
RangeGroupBalancer{},
RoundRobinGroupBalancer{},
}
}
}
if config.Dialer == nil {
config.Dialer = DefaultDialer
}
if config.MaxBytes == 0 {
config.MaxBytes = 1e6 // 1 MB
}
if config.MinBytes == 0 {
config.MinBytes = defaultFetchMinBytes
}
if config.MaxWait == 0 {
config.MaxWait = 10 * time.Second
}
if config.ReadBatchTimeout == 0 {
config.ReadBatchTimeout = 10 * time.Second
}
if config.ReadLagInterval == 0 {
config.ReadLagInterval = 1 * time.Minute
}
if config.ReadBackoffMin == 0 {
config.ReadBackoffMin = defaultReadBackoffMin
}
if config.ReadBackoffMax == 0 {
config.ReadBackoffMax = defaultReadBackoffMax
}
if config.ReadBackoffMax < config.ReadBackoffMin {
panic(fmt.Errorf("ReadBackoffMax %d smaller than ReadBackoffMin %d", config.ReadBackoffMax, config.ReadBackoffMin))
}
if config.QueueCapacity == 0 {
config.QueueCapacity = 100
}
if config.MaxAttempts == 0 {
config.MaxAttempts = 3
}
// when configured as a consumer group, stats should report a partition of -1
readerStatsPartition := config.Partition
if config.GroupID != "" {
readerStatsPartition = -1
}
// when configured as a consumer group, start version at 1 to ensure that only
// the rebalance function will start readers
version := int64(0)
if config.GroupID != "" {
version = 1
}
stctx, stop := context.WithCancel(context.Background())
r := &Reader{
config: config,
msgs: make(chan readerMessage, config.QueueCapacity),
cancel: func() {},
commits: make(chan commitRequest, config.QueueCapacity),
stop: stop,
offset: FirstOffset,
stctx: stctx,
stats: &readerStats{
dialTime: makeSummary(),
readTime: makeSummary(),
waitTime: makeSummary(),
fetchSize: makeSummary(),
fetchBytes: makeSummary(),
// Generate the string representation of the partition number only
// once when the reader is created.
partition: strconv.Itoa(readerStatsPartition),
},
version: version,
}
if r.useConsumerGroup() {
r.done = make(chan struct{})
r.runError = make(chan error)
cg, err := NewConsumerGroup(ConsumerGroupConfig{
ID: r.config.GroupID,
Brokers: r.config.Brokers,
Dialer: r.config.Dialer,
Topics: r.getTopics(),
GroupBalancers: r.config.GroupBalancers,
HeartbeatInterval: r.config.HeartbeatInterval,
PartitionWatchInterval: r.config.PartitionWatchInterval,
WatchPartitionChanges: r.config.WatchPartitionChanges,
SessionTimeout: r.config.SessionTimeout,
RebalanceTimeout: r.config.RebalanceTimeout,
JoinGroupBackoff: r.config.JoinGroupBackoff,
RetentionTime: r.config.RetentionTime,
StartOffset: r.config.StartOffset,
Logger: r.config.Logger,
ErrorLogger: r.config.ErrorLogger,
})
if err != nil {
panic(err)
}
go r.run(cg)
}
return r
}
// Config returns the reader's configuration.
func (r *Reader) Config() ReaderConfig {
return r.config
}
// Close closes the stream, preventing the program from reading any more
// messages from it.
func (r *Reader) Close() error {
atomic.StoreUint32(&r.once, 1)
r.mutex.Lock()
closed := r.closed
r.closed = true
r.mutex.Unlock()
r.cancel()
r.stop()
r.join.Wait()
if r.done != nil {
<-r.done
}
if !closed {
close(r.msgs)
}
return nil
}
// ReadMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// If consumer groups are used, ReadMessage will automatically commit the
// offset when called. Note that this could result in an offset being committed
// before the message is fully processed.
//
// If more fine-grained control of when offsets are committed is required, it
// is recommended to use FetchMessage with CommitMessages instead.
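//
// A minimal consumption loop (sketch; the reader r is assumed to be configured
// with a GroupID so offsets are committed automatically):
//
//	for {
//		m, err := r.ReadMessage(context.Background())
//		if err != nil {
//			break
//		}
//		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
//	}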
func (r *Reader) ReadMessage(ctx context.Context) (Message, error) {
m, err := r.FetchMessage(ctx)
if err != nil {
return Message{}, err
}
if r.useConsumerGroup() {
if err := r.CommitMessages(ctx, m); err != nil {
return Message{}, err
}
}
return m, nil
}
// FetchMessage reads and returns the next message from r. The method call
// blocks until a message becomes available, or an error occurs. The program
// may also specify a context to asynchronously cancel the blocking operation.
//
// The method returns io.EOF to indicate that the reader has been closed.
//
// FetchMessage does not commit offsets automatically when using consumer groups.
// Use CommitMessages to commit the offset.
func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
r.activateReadLag()
for {
r.mutex.Lock()
if !r.closed && r.version == 0 {
r.start(r.getTopicPartitionOffset())
}
version := r.version
r.mutex.Unlock()
select {
case <-ctx.Done():
return Message{}, ctx.Err()
case err := <-r.runError:
return Message{}, err
case m, ok := <-r.msgs:
if !ok {
return Message{}, io.EOF
}
if m.version >= version {
r.mutex.Lock()
switch {
case m.error != nil:
case version == r.version:
r.offset = m.message.Offset + 1
r.lag = m.watermark - r.offset
}
r.mutex.Unlock()
if errors.Is(m.error, io.EOF) {
// io.EOF is used as a marker to indicate that the stream
// has been closed; if it was received from the inner
// reader, we replace it with io.ErrUnexpectedEOF so as not
// to confuse the program.
m.error = io.ErrUnexpectedEOF
}
return m.message, m.error
}
}
}
}
// CommitMessages commits the list of messages passed as argument. The program
// may pass a context to asynchronously cancel the commit operation when it was
// configured to be blocking.
//
// Because kafka consumer groups track a single offset per partition, the
// highest message offset passed to CommitMessages will cause all previous
// messages to be committed. Applications need to account for these Kafka
// limitations when committing messages, and maintain message ordering if they
// need strong delivery guarantees. This property makes it valid to pass only
// the last message seen to CommitMessages in order to move the offset of the
// topic/partition it belonged to forward, effectively committing all previous
// messages in the partition.
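//
// A sketch of explicit commits with FetchMessage (process is a placeholder for
// application logic):
//
//	ctx := context.Background()
//	for {
//		m, err := r.FetchMessage(ctx)
//		if err != nil {
//			break
//		}
//		process(m)
//		if err := r.CommitMessages(ctx, m); err != nil {
//			log.Fatal("failed to commit message:", err)
//		}
//	}
//
// Committing only the final message of a fully processed batch therefore
// advances the offset for the whole batch.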
func (r *Reader) CommitMessages(ctx context.Context, msgs ...Message) error {
if !r.useConsumerGroup() {
return errOnlyAvailableWithGroup
}
var errch <-chan error
creq := commitRequest{
commits: makeCommits(msgs...),
}
if r.useSyncCommits() {
ch := make(chan error, 1)
errch, creq.errch = ch, ch
}
select {
case r.commits <- creq:
case <-ctx.Done():
return ctx.Err()
case <-r.stctx.Done():
// This context is used to ensure we don't allow commits after the
// reader was closed.
return io.ErrClosedPipe
}
if !r.useSyncCommits() {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errch:
return err
}
}
// ReadLag returns the current lag of the reader by fetching the last offset of
// the topic and partition and computing the difference between that value and
// the offset of the last message returned by ReadMessage.
//
// This method is intended to be used in cases where a program may be unable to
// call ReadMessage to update the value returned by Lag, but still needs to get
// an up-to-date estimate of how far behind the reader is, for example when
// the consumer is not ready to process the next message.
//
// The function returns a lag of zero when the reader's current offset is
// negative.
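//
// A usage sketch (the timeout is arbitrary; only valid without a GroupID):
//
//	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
//	defer cancel()
//	lag, err := r.ReadLag(ctx)
//	if err == nil {
//		fmt.Printf("reader is %d messages behind\n", lag)
//	}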
func (r *Reader) ReadLag(ctx context.Context) (lag int64, err error) {
if r.useConsumerGroup() {
return 0, errNotAvailableWithGroup
}
type offsets struct {
first int64
last int64
}
offch := make(chan offsets, 1)
errch := make(chan error, 1)
go func() {
var off offsets
var err error
for _, broker := range r.config.Brokers {
var conn *Conn
if conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition); err != nil {
continue
}
deadline, _ := ctx.Deadline()
conn.SetDeadline(deadline)
off.first, off.last, err = conn.ReadOffsets()
conn.Close()
if err == nil {
break
}
}
if err != nil {
errch <- err
} else {
offch <- off
}
}()
select {
case off := <-offch:
switch cur := r.Offset(); {
case cur == FirstOffset:
lag = off.last - off.first
case cur == LastOffset:
lag = 0
default:
lag = off.last - cur
}
case err = <-errch:
case <-ctx.Done():
err = ctx.Err()
}
return
}
// Offset returns the current absolute offset of the reader, or -1
// if r is backed by a consumer group.
func (r *Reader) Offset() int64 {
if r.useConsumerGroup() {
return -1
}
r.mutex.Lock()
offset := r.offset
r.mutex.Unlock()
r.withLogger(func(log Logger) {
log.Printf("looking up offset of kafka reader for partition %d of %s: %s", r.config.Partition, r.config.Topic, toHumanOffset(offset))
})
return offset
}
// Lag returns the lag of the last message returned by ReadMessage, or -1
// if r is backed by a consumer group.
func (r *Reader) Lag() int64 {
if r.useConsumerGroup() {
return -1
}
r.mutex.Lock()
lag := r.lag
r.mutex.Unlock()
return lag
}
// SetOffset changes the offset from which the next batch of messages will be
// read. The method fails with io.ErrClosedPipe if the reader has already been closed.
//
// From version 0.2.0, FirstOffset and LastOffset can be used to indicate the first
// or last available offset in the partition. Please note while -1 and -2 were accepted
// to indicate the first or last offset in previous versions, the meanings of the numbers
// were swapped in 0.2.0 to match the meanings in other libraries and the Kafka protocol
// specification.
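//
// A usage sketch (only valid for a reader created without a GroupID):
//
//	// start reading at the most recent offset
//	if err := r.SetOffset(kafka.LastOffset); err != nil {
//		log.Fatal(err)
//	}
//	// or seek to an absolute offset
//	if err := r.SetOffset(42); err != nil {
//		log.Fatal(err)
//	}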
func (r *Reader) SetOffset(offset int64) error {
if r.useConsumerGroup() {
return errNotAvailableWithGroup
}
var err error
r.mutex.Lock()
if r.closed {
err = io.ErrClosedPipe
} else if offset != r.offset {
r.withLogger(func(log Logger) {
log.Printf("setting the offset of the kafka reader for partition %d of %s from %s to %s",
r.config.Partition, r.config.Topic, toHumanOffset(r.offset), toHumanOffset(offset))
})
r.offset = offset
if r.version != 0 {
r.start(r.getTopicPartitionOffset())
}
r.activateReadLag()
}
r.mutex.Unlock()
return err
}
// SetOffsetAt changes the offset from which the next batch of messages will be
// read given the timestamp t.
//
// The method fails if it is unable to connect to the partition leader, unable to
// read the offset for the given timestamp, or if the reader has been closed.
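//
// A usage sketch (the one-hour window is an arbitrary example):
//
//	if err := r.SetOffsetAt(context.Background(), time.Now().Add(-time.Hour)); err != nil {
//		log.Fatal(err)
//	}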
func (r *Reader) SetOffsetAt(ctx context.Context, t time.Time) error {
r.mutex.Lock()
if r.closed {
r.mutex.Unlock()
return io.ErrClosedPipe
}
r.mutex.Unlock()
if len(r.config.Brokers) < 1 {
return errors.New("no brokers in config")
}
var conn *Conn
var err error
for _, broker := range r.config.Brokers {
conn, err = r.config.Dialer.DialLeader(ctx, "tcp", broker, r.config.Topic, r.config.Partition)
if err != nil {
continue
}
deadline, _ := ctx.Deadline()
conn.SetDeadline(deadline)
offset, err := conn.ReadOffset(t)
conn.Close()
if err != nil {
return err
}
return r.SetOffset(offset)
}
return fmt.Errorf("error dialing all brokers, one of the errors: %w", err)
}
// Stats returns a snapshot of the reader stats since the last time the method
// was called, or since the reader was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka reader and report the metrics to a stats collection
// system.
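//
// A periodic collection sketch (reportStats and the interval are placeholders
// for whatever metrics pipeline the program uses):
//
//	go func() {
//		ticker := time.NewTicker(time.Minute)
//		defer ticker.Stop()
//		for range ticker.C {
//			reportStats(r.Stats())
//		}
//	}()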
func (r *Reader) Stats() ReaderStats {
stats := ReaderStats{
Dials: r.stats.dials.snapshot(),
Fetches: r.stats.fetches.snapshot(),
Messages: r.stats.messages.snapshot(),
Bytes: r.stats.bytes.snapshot(),
Rebalances: r.stats.rebalances.snapshot(),
Timeouts: r.stats.timeouts.snapshot(),
Errors: r.stats.errors.snapshot(),
DialTime: r.stats.dialTime.snapshotDuration(),
ReadTime: r.stats.readTime.snapshotDuration(),
WaitTime: r.stats.waitTime.snapshotDuration(),
FetchSize: r.stats.fetchSize.snapshot(),
FetchBytes: r.stats.fetchBytes.snapshot(),
Offset: r.stats.offset.snapshot(),
Lag: r.stats.lag.snapshot(),
MinBytes: int64(r.config.MinBytes),
MaxBytes: int64(r.config.MaxBytes),
MaxWait: r.config.MaxWait,
QueueLength: int64(len(r.msgs)),
QueueCapacity: int64(cap(r.msgs)),
ClientID: r.config.Dialer.ClientID,
Topic: r.config.Topic,
Partition: r.stats.partition,
}
// TODO: remove when we get rid of the deprecated field.
stats.DeprecatedFetchesWithTypo = stats.Fetches
return stats
}
func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 {
key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)}
return map[topicPartition]int64{key: r.offset}
}
func (r *Reader) withLogger(do func(Logger)) {
if r.config.Logger != nil {
do(r.config.Logger)
}
}
func (r *Reader) withErrorLogger(do func(Logger)) {
if r.config.ErrorLogger != nil {
do(r.config.ErrorLogger)
} else {
r.withLogger(do)
}
}
func (r *Reader) activateReadLag() {
if r.config.ReadLagInterval > 0 && atomic.CompareAndSwapUint32(&r.once, 0, 1) {
// read lag will only be calculated when not using consumer groups
// todo discuss how capturing read lag should interact with rebalancing
if !r.useConsumerGroup() {
go r.readLag(r.stctx)
}
}
}
func (r *Reader) readLag(ctx context.Context) {
ticker := time.NewTicker(r.config.ReadLagInterval)
defer ticker.Stop()
for {
timeout, cancel := context.WithTimeout(ctx, r.config.ReadLagInterval/2)
lag, err := r.ReadLag(timeout)
cancel()
if err != nil {
r.stats.errors.observe(1)
r.withErrorLogger(func(log Logger) {
log.Printf("kafka reader failed to read lag of partition %d of %s: %s", r.config.Partition, r.config.Topic, err)
})
} else {
r.stats.lag.observe(lag)
}
select {
case <-ticker.C:
case <-ctx.Done():
return
}
}
}
func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
if r.closed {
// don't start child reader if parent Reader is closed
return
}
ctx, cancel := context.WithCancel(context.Background())
r.cancel() // always cancel the previous reader
r.cancel = cancel
r.version++
r.join.Add(len(offsetsByPartition))
for key, offset := range offsetsByPartition {
go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) {
defer join.Done()
(&reader{
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
readBatchTimeout: r.config.ReadBatchTimeout,
backoffDelayMin: r.config.ReadBackoffMin,
backoffDelayMax: r.config.ReadBackoffMax,
version: r.version,
msgs: r.msgs,
stats: r.stats,
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
}).run(ctx, offset)
}(ctx, key, offset, &r.join)
}
}
// A reader reads messages from kafka and produces them on its channels. It's
// used as a way to asynchronously fetch messages while the main program reads
// them using the high-level reader API.
type reader struct {
dialer *Dialer
logger Logger
errorLogger Logger
brokers []string
topic string
partition int
minBytes int
maxBytes int
maxWait time.Duration
readBatchTimeout time.Duration
backoffDelayMin time.Duration
backoffDelayMax time.Duration
version int64
msgs chan<- readerMessage
stats *readerStats
isolationLevel IsolationLevel
maxAttempts int
offsetOutOfRangeError bool
}
type readerMessage struct {
version int64
message Message
watermark int64
error error
}
func (r *reader) run(ctx context.Context, offset int64) {
// This is the reader's main loop. It only ends if the context is canceled
// and will keep attempting to read messages otherwise.
//
// Retrying indefinitely has the nice side effect of preventing Read calls
// on the parent reader from blocking if the connection to the kafka server
// fails: the reader keeps reporting errors on the error channel, which will
// then be surfaced to the program.
// If the reader wasn't retrying then the program would block indefinitely
// on a Read call after reading the first error.
for attempt := 0; true; attempt++ {
if attempt != 0 {
if !sleep(ctx, backoff(attempt, r.backoffDelayMin, r.backoffDelayMax)) {
return
}
}
r.withLogger(func(log Logger) {
log.Printf("initializing kafka reader for partition %d of %s starting at offset %d", r.partition, r.topic, toHumanOffset(offset))
})
conn, start, err := r.initialize(ctx, offset)
if err != nil {
if errors.Is(err, OffsetOutOfRange) {
if r.offsetOutOfRangeError {
r.sendError(ctx, err)
return
}
// This would happen if the requested offset is past the last
// offset on the partition leader. In that case we're just going
// to retry later hoping that enough data has been produced.
r.withErrorLogger(func(log Logger) {
log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
})
continue
}
// Perform a configured number of attempts before
// reporting the first errors; this helps mitigate
// situations where the kafka server is temporarily
// unavailable.
if attempt >= r.maxAttempts {
r.sendError(ctx, err)
} else {
r.stats.errors.observe(1)
r.withErrorLogger(func(log Logger) {
log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, err)
})
}
continue
}
// Resetting the attempt counter ensures that if a failure occurs after
// a successful initialization we don't keep increasing the backoff
// timeout.
attempt = 0
// Now we're sure to have an absolute offset number; should anything happen
// to the connection, we know we'll want to restart from this offset.
offset = start
errcount := 0
readLoop:
for {
if !sleep(ctx, backoff(errcount, r.backoffDelayMin, r.backoffDelayMax)) {
conn.Close()
return
}
offset, err = r.read(ctx, offset, conn)
switch {
case err == nil:
errcount = 0
continue
case errors.Is(err, io.EOF):
// done with this batch of messages...carry on. note that this
// block relies on the batch repackaging real io.EOF errors as
// io.ErrUnexpectedEOF. otherwise, we would end up swallowing real
// errors here.
errcount = 0
continue
case errors.Is(err, io.ErrNoProgress):
// This error is returned by the Conn when it believes the connection
// has been corrupted, so we need to explicitly close it. Since we are
// explicitly handling it and a retry will pick up, we can suppress the
// error metrics and logs for this case.
conn.Close()
break readLoop
case errors.Is(err, UnknownTopicOrPartition):
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)
})
conn.Close()
// The next call to .initialize will re-establish a connection to the proper
// topic/partition broker combo.
r.stats.rebalances.observe(1)
break readLoop
case errors.Is(err, NotLeaderForPartition):
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset))
})
conn.Close()
// The next call to .initialize will re-establish a connection to the proper
// partition leader.
r.stats.rebalances.observe(1)
break readLoop
case errors.Is(err, RequestTimedOut):
// Timeout on the kafka side, this can be safely retried.
errcount = 0
r.withLogger(func(log Logger) {
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
})
r.stats.timeouts.observe(1)
continue
case errors.Is(err, OffsetOutOfRange):
first, last, err := r.readOffsets(conn)
if err != nil {
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader got an error while attempting to determine whether it was reading before the first offset or after the last offset of partition %d of %s: %s", r.partition, r.topic, err)
})
conn.Close()
break readLoop
}
switch {
case offset < first:
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader is reading before the first offset for partition %d of %s, skipping from offset %d to %d (%d messages)", r.partition, r.topic, toHumanOffset(offset), first, first-offset)
})
offset, errcount = first, 0
continue // retry immediately so we don't keep falling behind due to the backoff
case offset < last:
errcount = 0
continue // more messages have already become available, retry immediately
default:
// We may be reading past the last offset, will retry later.
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader is reading passed the last offset for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
})
}
case errors.Is(err, context.Canceled):
// Another reader has taken over, we can safely quit.
conn.Close()
return
case errors.Is(err, errUnknownCodec):
// The compression codec is either unsupported or has not been
// imported. This is a fatal error b/c the reader cannot
// proceed.
r.sendError(ctx, err)
break readLoop
default:
var kafkaError Error
if errors.As(err, &kafkaError) {
r.sendError(ctx, err)
} else {
r.withErrorLogger(func(log Logger) {
log.Printf("the kafka reader got an unknown error reading partition %d of %s at offset %d: %s", r.partition, r.topic, toHumanOffset(offset), err)
})
r.stats.errors.observe(1)
conn.Close()
break readLoop
}
}
errcount++
}
}
}
func (r *reader) initialize(ctx context.Context, offset int64) (conn *Conn, start int64, err error) {
for i := 0; i != len(r.brokers) && conn == nil; i++ {
broker := r.brokers[i]
var first, last int64
t0 := time.Now()
conn, err = r.dialer.DialLeader(ctx, "tcp", broker, r.topic, r.partition)
t1 := time.Now()
r.stats.dials.observe(1)
r.stats.dialTime.observeDuration(t1.Sub(t0))
if err != nil {
continue
}
if first, last, err = r.readOffsets(conn); err != nil {
conn.Close()
conn = nil
break
}
switch {
case offset == FirstOffset:
offset = first
case offset == LastOffset:
offset = last
case offset < first:
offset = first
}
r.withLogger(func(log Logger) {
log.Printf("the kafka reader for partition %d of %s is seeking to offset %d", r.partition, r.topic, toHumanOffset(offset))
})
if start, err = conn.Seek(offset, SeekAbsolute); err != nil {
conn.Close()
conn = nil
break
}
conn.SetDeadline(time.Time{})
}
return
}
func (r *reader) read(ctx context.Context, offset int64, conn *Conn) (int64, error) {
r.stats.fetches.observe(1)
r.stats.offset.observe(offset)
t0 := time.Now()
conn.SetReadDeadline(t0.Add(r.maxWait))
batch := conn.ReadBatchWith(ReadBatchConfig{
MinBytes: r.minBytes,
MaxBytes: r.maxBytes,
IsolationLevel: r.isolationLevel,
})
highWaterMark := batch.HighWaterMark()
t1 := time.Now()
r.stats.waitTime.observeDuration(t1.Sub(t0))
var msg Message
var err error
var size int64
var bytes int64
for {
conn.SetReadDeadline(time.Now().Add(r.readBatchTimeout))
if msg, err = batch.ReadMessage(); err != nil {
batch.Close()
break
}
n := int64(len(msg.Key) + len(msg.Value))
r.stats.messages.observe(1)
r.stats.bytes.observe(n)
if err = r.sendMessage(ctx, msg, highWaterMark); err != nil {
batch.Close()
break
}
offset = msg.Offset + 1
r.stats.offset.observe(offset)
r.stats.lag.observe(highWaterMark - offset)
size++
bytes += n
}
conn.SetReadDeadline(time.Time{})
t2 := time.Now()
r.stats.readTime.observeDuration(t2.Sub(t1))
r.stats.fetchSize.observe(size)
r.stats.fetchBytes.observe(bytes)
return offset, err
}
func (r *reader) readOffsets(conn *Conn) (first, last int64, err error) {
conn.SetDeadline(time.Now().Add(10 * time.Second))
return conn.ReadOffsets()
}
func (r *reader) sendMessage(ctx context.Context, msg Message, watermark int64) error {
select {
case r.msgs <- readerMessage{version: r.version, message: msg, watermark: watermark}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (r *reader) sendError(ctx context.Context, err error) error {
select {
case r.msgs <- readerMessage{version: r.version, error: err}:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (r *reader) withLogger(do func(Logger)) {
if r.logger != nil {
do(r.logger)
}
}
func (r *reader) withErrorLogger(do func(Logger)) {
if r.errorLogger != nil {
do(r.errorLogger)
} else {
r.withLogger(do)
}
}
// extractTopics returns the unique list of topics represented by the set of
// provided members.
func extractTopics(members []GroupMember) []string {
visited := map[string]struct{}{}
var topics []string
for _, member := range members {
for _, topic := range member.Topics {
if _, seen := visited[topic]; seen {
continue
}
topics = append(topics, topic)
visited[topic] = struct{}{}
}
}
sort.Strings(topics)
return topics
}
type humanOffset int64
func toHumanOffset(v int64) humanOffset {
return humanOffset(v)
}
func (offset humanOffset) Format(w fmt.State, _ rune) {
v := int64(offset)
switch v {
case FirstOffset:
fmt.Fprint(w, "first offset")
case LastOffset:
fmt.Fprint(w, "last offset")
default:
fmt.Fprint(w, strconv.FormatInt(v, 10))
}
}