package kafka

import (
    "bytes"
    "context"
    "errors"
    "io"
    "net"
    "sync"
    "sync/atomic"
    "time"

    metadataAPI "github.com/segmentio/kafka-go/protocol/metadata"
)

// The Writer type provides the implementation of a producer of kafka messages
// that automatically distributes messages across partitions of a single topic
// using a configurable balancing policy.
//
// Writers manage the dispatch of messages across partitions of the topic they
// are configured to write to using a Balancer, and aggregate batches to
// optimize the writes to kafka.
//
// Writers may be configured to be used synchronously or asynchronously. When
// used synchronously, calls to WriteMessages block until the messages have been
// written to kafka. In this mode, the program should inspect the error returned
// by the function and test if it is an instance of kafka.WriteErrors in order to
// identify which messages have succeeded or failed, for example:
//
//     // Construct a synchronous writer (the default mode).
//     w := &kafka.Writer{
//         Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
//         Topic:        "topic-A",
//         RequiredAcks: kafka.RequireAll,
//     }
//
//     ...
//
//     // Passing a context can prevent the operation from blocking indefinitely.
//     switch err := w.WriteMessages(ctx, msgs...).(type) {
//     case nil:
//     case kafka.WriteErrors:
//         for i := range msgs {
//             if err[i] != nil {
//                 // handle the error writing msgs[i]
//                 ...
//             }
//         }
//     default:
//         // handle other errors
//         ...
//     }
//
// In asynchronous mode, the program may configure a completion handler on the
// writer to receive notifications of messages being written to kafka:
//
//     w := &kafka.Writer{
//         Addr:         kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
//         Topic:        "topic-A",
//         RequiredAcks: kafka.RequireAll,
//         Async:        true, // make the writer asynchronous
//         Completion: func(messages []kafka.Message, err error) {
//             ...
//         },
//     }
//
//     ...
//
//     // Because the writer is asynchronous, there is no need for the context to
//     // be cancelled; the call will never block.
//     if err := w.WriteMessages(context.Background(), msgs...); err != nil {
//         // Only validation errors would be reported in this case.
//         ...
//     }
//
// Methods of Writer are safe to use concurrently from multiple goroutines,
// however the writer configuration should not be modified after first use.
type Writer struct {
    // Address of the kafka cluster that this writer is configured to send
    // messages to.
    //
    // This field is required, attempting to write messages to a writer with a
    // nil address will error.
    Addr net.Addr

    // Topic is the name of the topic that the writer will produce messages to.
    //
    // This field and the Topic field of individual Messages are mutually
    // exclusive: if you set Topic here, you must not set Topic for any produced
    // Message; if you do not set it, every Message must have its Topic specified.
    Topic string

    // The balancer used to distribute messages across partitions.
    //
    // The default is to use a round-robin distribution.
    Balancer Balancer

    // Limit on how many attempts will be made to deliver a message.
    //
    // The default is to try at most 10 times.
    MaxAttempts int

    // WriteBackoffMin optionally sets the smallest amount of time the writer waits before
    // it attempts to write a batch of messages.
    //
    // Default: 100ms
    WriteBackoffMin time.Duration

    // WriteBackoffMax optionally sets the maximum amount of time the writer waits before
    // it attempts to write a batch of messages.
    //
    // Default: 1s
    WriteBackoffMax time.Duration

    // Limit on how many messages will be buffered before being sent to a
    // partition.
    //
    // The default is to use a target batch size of 100 messages.
    BatchSize int

    // Limit the maximum size of a request in bytes before being sent to
    // a partition.
    //
    // The default is to use a kafka default value of 1048576.
    BatchBytes int64

    // Time limit on how often incomplete message batches will be flushed to
    // kafka.
    //
    // The default is to flush at least every second.
    BatchTimeout time.Duration

    // Timeout for read operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    ReadTimeout time.Duration

    // Timeout for write operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    WriteTimeout time.Duration

    // Number of acknowledges from partition replicas required before receiving
    // a response to a produce request, the following values are supported:
    //
    //     RequireNone (0)  fire-and-forget, do not wait for acknowledgements from the cluster
    //     RequireOne  (1)  wait for the leader to acknowledge the writes
    //     RequireAll  (-1) wait for the full ISR to acknowledge the writes
    //
    // Defaults to RequireNone.
    RequiredAcks RequiredAcks

    // Setting this flag to true causes the WriteMessages method to never block.
    // It also means that errors are ignored since the caller will not receive
    // the returned value. Use this only if you don't care about guarantees of
    // whether the messages were written to kafka.
    //
    // Defaults to false.
    Async bool

    // An optional function called when the writer succeeds or fails the
    // delivery of messages to a kafka partition. When writing the messages
    // fails, the `err` parameter will be non-nil.
    //
    // The messages that the Completion function is called with have their
    // topic, partition, offset, and time set based on the Produce responses
    // received from kafka. All messages passed to a call to the function have
    // been written to the same partition. The keys and values of messages
    // reference the original byte slices carried by the messages in the calls
    // to WriteMessages.
    //
    // The function is called from goroutines started by the writer. Calls to
    // Close will block on the Completion function calls. When the Writer is
    // not writing asynchronously, the WriteMessages call will also block on
    // the Completion function, which is a useful guarantee if the byte slices
    // for the message keys and values are intended to be reused after the
    // WriteMessages call returns.
    //
    // If a completion function panics, the program terminates because the
    // panic is not recovered by the writer and bubbles up to the top of the
    // goroutine's call stack.
    Completion func(messages []Message, err error)

    // Compression sets the compression codec to be used to compress messages.
    Compression Compression

    // If not nil, specifies a logger used to report internal changes within the
    // writer.
    Logger Logger

    // ErrorLogger is the logger used to report errors. If nil, the writer falls
    // back to using Logger instead.
    ErrorLogger Logger

    // A transport used to send messages to kafka clusters.
    //
    // If nil, DefaultTransport is used.
    Transport RoundTripper

    // AllowAutoTopicCreation notifies the writer to create the topic if it is missing.
    AllowAutoTopicCreation bool

    // Manages the current set of partition-topic writers.
    group   sync.WaitGroup
    mutex   sync.Mutex
    closed  bool
    writers map[topicPartition]*partitionWriter

    // writer stats are all made of atomic values, no need for synchronization.
    // Use a pointer to ensure 64-bit alignment of the values. The once value is
    // used to lazily create the value when first used, allowing programs to use
    // the zero value of Writer.
    once sync.Once
    *writerStats

    // If no balancer is configured, the writer uses this one. RoundRobin values
    // are safe to use concurrently from multiple goroutines; there is no need
    // for extra synchronization to access this field.
    roundRobin RoundRobin

    // non-nil when a transport was created by NewWriter, remove in 1.0.
    transport *Transport
}
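
// The sketch below is illustrative only (not part of the original source): it
// shows how the batching, backoff, and compression fields documented above
// might be combined on a Writer. The kafka.Snappy compression constant is
// assumed to be exported by this package.
//
//     w := &kafka.Writer{
//         Addr:            kafka.TCP("localhost:9092"),
//         Topic:           "topic-A",
//         BatchSize:       500,                    // flush after 500 buffered messages,
//         BatchBytes:      1048576,                // after 1 MiB of messages,
//         BatchTimeout:    50 * time.Millisecond,  // or after 50ms, whichever comes first
//         WriteBackoffMin: 200 * time.Millisecond, // retry delays between 200ms ...
//         WriteBackoffMax: 2 * time.Second,        // ... and 2s
//         Compression:     kafka.Snappy,
//     }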

// WriterConfig is a configuration type used to create new instances of Writer.
//
// DEPRECATED: writer values should be configured directly by assigning their
// exported fields. This type is kept for backward compatibility, and will be
// removed in version 1.0.
type WriterConfig struct {
    // The list of brokers used to discover the partitions available on the
    // kafka cluster.
    //
    // This field is required, attempting to create a writer with an empty list
    // of brokers will panic.
    Brokers []string

    // The topic that the writer will produce messages to.
    //
    // If provided, this is used to set the topic for all produced messages.
    // If not provided, each Message must specify a topic for itself. The two
    // options are mutually exclusive; setting both is an error and the Writer
    // will reject the messages.
    Topic string

    // The dialer used by the writer to establish connections to the kafka
    // cluster.
    //
    // If nil, the default dialer is used instead.
    Dialer *Dialer

    // The balancer used to distribute messages across partitions.
    //
    // The default is to use a round-robin distribution.
    Balancer Balancer

    // Limit on how many attempts will be made to deliver a message.
    //
    // The default is to try at most 10 times.
    MaxAttempts int

    // DEPRECATED: in versions prior to 0.4, the writer used channels internally
    // to dispatch messages to partitions. This has been replaced by an in-memory
    // aggregation of batches which uses shared state instead of message passing,
    // making this option unnecessary.
    QueueCapacity int

    // Limit on how many messages will be buffered before being sent to a
    // partition.
    //
    // The default is to use a target batch size of 100 messages.
    BatchSize int

    // Limit the maximum size of a request in bytes before being sent to
    // a partition.
    //
    // The default is to use a kafka default value of 1048576.
    BatchBytes int

    // Time limit on how often incomplete message batches will be flushed to
    // kafka.
    //
    // The default is to flush at least every second.
    BatchTimeout time.Duration

    // Timeout for read operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    ReadTimeout time.Duration

    // Timeout for write operations performed by the Writer.
    //
    // Defaults to 10 seconds.
    WriteTimeout time.Duration

    // DEPRECATED: in versions prior to 0.4, the writer used to maintain a cache
    // of the topic layout. With the change to use a transport to manage connections,
    // the responsibility of syncing the cluster layout has been delegated to the
    // transport.
    RebalanceInterval time.Duration

    // DEPRECATED: in versions prior to 0.4, the writer used to manage connections
    // to the kafka cluster directly. With the change to use a transport to manage
    // connections, the writer has no connections to manage directly anymore.
    IdleConnTimeout time.Duration

    // Number of acknowledges from partition replicas required before receiving
    // a response to a produce request. The default is -1, which means to wait for
    // all replicas, and a value above 0 is required to indicate how many replicas
    // should acknowledge a message to be considered successful.
    //
    // This version of kafka-go (v0.3) does not support 0 required acks, due to
    // some internal complexity implementing this with the Kafka protocol. If you
    // need that functionality specifically, you'll need to upgrade to v0.4.
    RequiredAcks int

    // Setting this flag to true causes the WriteMessages method to never block.
    // It also means that errors are ignored since the caller will not receive
    // the returned value. Use this only if you don't care about guarantees of
    // whether the messages were written to kafka.
    Async bool

    // CompressionCodec sets the codec to be used to compress Kafka messages.
    CompressionCodec

    // If not nil, specifies a logger used to report internal changes within the
    // writer.
    Logger Logger

    // ErrorLogger is the logger used to report errors. If nil, the writer falls
    // back to using Logger instead.
    ErrorLogger Logger
}

type topicPartition struct {
    topic     string
    partition int32
}

// Validate method validates WriterConfig properties.
func (config *WriterConfig) Validate() error {
    if len(config.Brokers) == 0 {
        return errors.New("cannot create a kafka writer with an empty list of brokers")
    }
    return nil
}

// WriterStats is a data structure returned by a call to Writer.Stats that
// exposes details about the behavior of the writer.
type WriterStats struct {
    Writes   int64 `metric:"kafka.writer.write.count" type:"counter"`
    Messages int64 `metric:"kafka.writer.message.count" type:"counter"`
    Bytes    int64 `metric:"kafka.writer.message.bytes" type:"counter"`
    Errors   int64 `metric:"kafka.writer.error.count" type:"counter"`

    BatchTime  DurationStats `metric:"kafka.writer.batch.seconds"`
    WriteTime  DurationStats `metric:"kafka.writer.write.seconds"`
    WaitTime   DurationStats `metric:"kafka.writer.wait.seconds"`
    Retries    int64         `metric:"kafka.writer.retries.count" type:"counter"`
    BatchSize  SummaryStats  `metric:"kafka.writer.batch.size"`
    BatchBytes SummaryStats  `metric:"kafka.writer.batch.bytes"`

    MaxAttempts     int64         `metric:"kafka.writer.attempts.max" type:"gauge"`
    WriteBackoffMin time.Duration `metric:"kafka.writer.backoff.min" type:"gauge"`
    WriteBackoffMax time.Duration `metric:"kafka.writer.backoff.max" type:"gauge"`
    MaxBatchSize    int64         `metric:"kafka.writer.batch.max" type:"gauge"`
    BatchTimeout    time.Duration `metric:"kafka.writer.batch.timeout" type:"gauge"`
    ReadTimeout     time.Duration `metric:"kafka.writer.read.timeout" type:"gauge"`
    WriteTimeout    time.Duration `metric:"kafka.writer.write.timeout" type:"gauge"`
    RequiredAcks    int64         `metric:"kafka.writer.acks.required" type:"gauge"`
    Async           bool          `metric:"kafka.writer.async" type:"gauge"`

    Topic string `tag:"topic"`

    // DEPRECATED: these fields will only be reported for backward compatibility
    // if the Writer was constructed with NewWriter.
    Dials    int64         `metric:"kafka.writer.dial.count" type:"counter"`
    DialTime DurationStats `metric:"kafka.writer.dial.seconds"`

    // DEPRECATED: these fields were meaningful prior to kafka-go 0.4, changes
    // to the internal implementation and the introduction of the transport type
    // made them unnecessary.
    //
    // The values will be zero but are left for backward compatibility to avoid
    // breaking programs that used these fields.
    Rebalances        int64
    RebalanceInterval time.Duration
    QueueLength       int64
    QueueCapacity     int64
    ClientID          string
}

// writerStats is a struct that contains statistics on a writer.
//
// Since atomic is used to mutate the statistics, the values must be 64-bit aligned.
// This is easily accomplished by always allocating this struct directly (i.e. using
// a pointer to the struct).
// See https://golang.org/pkg/sync/atomic/#pkg-note-BUG
type writerStats struct {
    dials          counter
    writes         counter
    messages       counter
    bytes          counter
    errors         counter
    dialTime       summary
    batchTime      summary
    writeTime      summary
    waitTime       summary
    retries        counter
    batchSize      summary
    batchSizeBytes summary
}

// NewWriter creates and returns a new Writer configured with config.
//
// DEPRECATED: Writer values can be instantiated and configured directly,
// this function is retained for backward compatibility and will be removed
// in version 1.0.
func NewWriter(config WriterConfig) *Writer {
    if err := config.Validate(); err != nil {
        panic(err)
    }

    if config.Dialer == nil {
        config.Dialer = DefaultDialer
    }

    if config.Balancer == nil {
        config.Balancer = &RoundRobin{}
    }

    // Converts the pre-0.4 Dialer API into a Transport.
    kafkaDialer := DefaultDialer
    if config.Dialer != nil {
        kafkaDialer = config.Dialer
    }

    dialer := (&net.Dialer{
        Timeout:       kafkaDialer.Timeout,
        Deadline:      kafkaDialer.Deadline,
        LocalAddr:     kafkaDialer.LocalAddr,
        DualStack:     kafkaDialer.DualStack,
        FallbackDelay: kafkaDialer.FallbackDelay,
        KeepAlive:     kafkaDialer.KeepAlive,
    })

    var resolver Resolver
    if r, ok := kafkaDialer.Resolver.(*net.Resolver); ok {
        dialer.Resolver = r
    } else {
        resolver = kafkaDialer.Resolver
    }

    stats := new(writerStats)
    // For backward compatibility with the pre-0.4 APIs, support custom
    // resolvers by wrapping the dial function.
    dial := func(ctx context.Context, network, addr string) (net.Conn, error) {
        start := time.Now()
        defer func() {
            stats.dials.observe(1)
            stats.dialTime.observe(int64(time.Since(start)))
        }()
        address, err := lookupHost(ctx, addr, resolver)
        if err != nil {
            return nil, err
        }
        return dialer.DialContext(ctx, network, address)
    }

    idleTimeout := config.IdleConnTimeout
    if idleTimeout == 0 {
        // Historical default value of WriterConfig.IdleConnTimeout; 9 minutes
        // seems like it is way too long when there is no ping mechanism in the
        // kafka protocol.
        idleTimeout = 9 * time.Minute
    }

    metadataTTL := config.RebalanceInterval
    if metadataTTL == 0 {
        // Historical default value of WriterConfig.RebalanceInterval.
        metadataTTL = 15 * time.Second
    }

    transport := &Transport{
        Dial:        dial,
        SASL:        kafkaDialer.SASLMechanism,
        TLS:         kafkaDialer.TLS,
        ClientID:    kafkaDialer.ClientID,
        IdleTimeout: idleTimeout,
        MetadataTTL: metadataTTL,
    }

    w := &Writer{
        Addr:         TCP(config.Brokers...),
        Topic:        config.Topic,
        MaxAttempts:  config.MaxAttempts,
        BatchSize:    config.BatchSize,
        Balancer:     config.Balancer,
        BatchBytes:   int64(config.BatchBytes),
        BatchTimeout: config.BatchTimeout,
        ReadTimeout:  config.ReadTimeout,
        WriteTimeout: config.WriteTimeout,
        RequiredAcks: RequiredAcks(config.RequiredAcks),
        Async:        config.Async,
        Logger:       config.Logger,
        ErrorLogger:  config.ErrorLogger,
        Transport:    transport,
        transport:    transport,
        writerStats:  stats,
    }

    if config.RequiredAcks == 0 {
        // Historically the writers created by NewWriter have used "all" as the
        // default value when 0 was specified.
        w.RequiredAcks = RequireAll
    }

    if config.CompressionCodec != nil {
        w.Compression = Compression(config.CompressionCodec.Code())
    }

    return w
}
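
// Illustrative sketch (not part of the original source): the deprecated call
//
//     w := kafka.NewWriter(kafka.WriterConfig{
//         Brokers: []string{"localhost:9092"},
//         Topic:   "topic-A",
//     })
//
// is roughly equivalent to configuring the Writer directly, which is the
// recommended replacement:
//
//     w := &kafka.Writer{
//         Addr:         kafka.TCP("localhost:9092"),
//         Topic:        "topic-A",
//         Balancer:     &kafka.RoundRobin{},
//         RequiredAcks: kafka.RequireAll, // NewWriter historically defaulted to "all"
//     }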

// enter is called by WriteMessages to indicate that a new inflight operation
// has started, which helps synchronize with Close and ensure that the method
// does not return until all inflight operations have completed.
func (w *Writer) enter() bool {
    w.mutex.Lock()
    defer w.mutex.Unlock()
    if w.closed {
        return false
    }
    w.group.Add(1)
    return true
}

// leave is called by WriteMessages to indicate that the inflight operation has
// completed.
func (w *Writer) leave() { w.group.Done() }

// spawn starts a new asynchronous operation on the writer. This method is used
// instead of starting goroutines inline to help manage the state of the
// writer's wait group. The wait group is used to block Close calls until all
// inflight operations have completed, therefore automatically including those
// started with calls to spawn.
func (w *Writer) spawn(f func()) {
    w.group.Add(1)
    go func() {
        defer w.group.Done()
        f()
    }()
}

// Close flushes pending writes, and waits for all writes to complete before
// returning. Calling Close also prevents new writes from being submitted to
// the writer; further calls to WriteMessages and the like will fail with
// io.ErrClosedPipe.
func (w *Writer) Close() error {
    w.mutex.Lock()
    // Marking the writer as closed here causes future calls to WriteMessages to
    // fail with io.ErrClosedPipe. Mutation of this field is synchronized on the
    // writer's mutex to ensure that no more increments of the wait group are
    // performed afterwards (which could otherwise race with the Wait below).
    w.closed = true

    // close all writers to trigger any pending batches
    for _, writer := range w.writers {
        writer.close()
    }

    for partition := range w.writers {
        delete(w.writers, partition)
    }

    w.mutex.Unlock()
    w.group.Wait()

    if w.transport != nil {
        w.transport.CloseIdleConnections()
    }

    return nil
}
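
// Illustrative sketch (not part of the original source): Close is typically
// deferred right after the writer is constructed, so that buffered batches are
// flushed before the program exits.
//
//     w := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: "topic-A"}
//     defer w.Close()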

// WriteMessages writes a batch of messages to the kafka topic configured on this
// writer.
//
// Unless the writer was configured to write messages asynchronously, the method
// blocks until all messages have been written, or until the maximum number of
// attempts was reached.
//
// When sending synchronously and the writer's batch size is configured to be
// greater than 1, this method blocks until either a full batch can be assembled
// or the batch timeout is reached. The batch size and timeouts are evaluated
// per partition, so the choice of Balancer can also influence the flushing
// behavior. For example, the Hash balancer will require on average N * batch
// size messages to trigger a flush where N is the number of partitions. The
// best way to achieve good batching behavior is to share one Writer amongst
// multiple goroutines.
//
// When the method returns an error, it may be of type kafka.WriteErrors to allow
// the caller to determine the status of each message.
//
// The context passed as first argument may also be used to asynchronously
// cancel the operation. Note that in this case there are no guarantees made on
// whether messages were written to kafka. The program should assume that the
// whole batch failed and re-write the messages later (which could then cause
// duplicates).
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
    if w.Addr == nil {
        return errors.New("kafka.(*Writer).WriteMessages: cannot create a kafka writer with a nil address")
    }

    if !w.enter() {
        return io.ErrClosedPipe
    }
    defer w.leave()

    if len(msgs) == 0 {
        return nil
    }

    balancer := w.balancer()
    batchBytes := w.batchBytes()

    for i := range msgs {
        n := int64(msgs[i].size())
        if n > batchBytes {
            // This error is left for backward compatibility with historical
            // behavior, but it can yield O(N^2) behaviors. The expectation
            // is that the program will check if WriteMessages returned a
            // MessageTooLargeError, discard the message that was exceeding
            // the maximum size, and try again.
            return messageTooLarge(msgs, i)
        }
    }

    // We use int32 here to halve the memory footprint (compared to using int
    // on 64-bit architectures). We map lists of the message indexes instead
    // of the message values for the same reason: int32 is 4 bytes, vs a full
    // Message value which is 100+ bytes, contains pointers, and contributes
    // to increasing GC work.
    assignments := make(map[topicPartition][]int32)

    for i, msg := range msgs {
        topic, err := w.chooseTopic(msg)
        if err != nil {
            return err
        }

        numPartitions, err := w.partitions(ctx, topic)
        if err != nil {
            return err
        }

        partition := balancer.Balance(msg, loadCachedPartitions(numPartitions)...)

        key := topicPartition{
            topic:     topic,
            partition: int32(partition),
        }

        assignments[key] = append(assignments[key], int32(i))
    }

    batches := w.batchMessages(msgs, assignments)
    if w.Async {
        return nil
    }

    done := ctx.Done()
    hasErrors := false
    for batch := range batches {
        select {
        case <-done:
            return ctx.Err()
        case <-batch.done:
            if batch.err != nil {
                hasErrors = true
            }
        }
    }

    if !hasErrors {
        return nil
    }

    werr := make(WriteErrors, len(msgs))

    for batch, indexes := range batches {
        for _, i := range indexes {
            werr[i] = batch.err
        }
    }
    return werr
}
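
// Illustrative sketch (not part of the original source) of the retry pattern
// described in the oversized-message comment above. It assumes the package's
// MessageTooLargeError type carries the rejected Message and the Remaining
// messages:
//
//     if err := w.WriteMessages(ctx, msgs...); err != nil {
//         var tooLarge kafka.MessageTooLargeError
//         if errors.As(err, &tooLarge) {
//             // drop tooLarge.Message and retry with the rest
//             err = w.WriteMessages(ctx, tooLarge.Remaining...)
//         }
//         ...
//     }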

func (w *Writer) batchMessages(messages []Message, assignments map[topicPartition][]int32) map[*writeBatch][]int32 {
    var batches map[*writeBatch][]int32
    if !w.Async {
        batches = make(map[*writeBatch][]int32, len(assignments))
    }

    w.mutex.Lock()
    defer w.mutex.Unlock()

    if w.writers == nil {
        w.writers = map[topicPartition]*partitionWriter{}
    }

    for key, indexes := range assignments {
        writer := w.writers[key]
        if writer == nil {
            writer = newPartitionWriter(w, key)
            w.writers[key] = writer
        }
        wbatches := writer.writeMessages(messages, indexes)

        for batch, idxs := range wbatches {
            batches[batch] = idxs
        }
    }

    return batches
}

func (w *Writer) produce(key topicPartition, batch *writeBatch) (*ProduceResponse, error) {
    timeout := w.writeTimeout()

    ctx, cancel := context.WithTimeout(context.Background(), timeout)
    defer cancel()

    return w.client(timeout).Produce(ctx, &ProduceRequest{
        Partition:    int(key.partition),
        Topic:        key.topic,
        RequiredAcks: w.RequiredAcks,
        Compression:  w.Compression,
        Records: &writerRecords{
            msgs: batch.msgs,
        },
    })
}

func (w *Writer) partitions(ctx context.Context, topic string) (int, error) {
    client := w.client(w.readTimeout())
    // Here we use the transport directly as an optimization to avoid the
    // construction of temporary request and response objects made by the
    // (*Client).Metadata API.
    //
    // It is expected that the transport will optimize this request by
    // caching recent results (the kafka.Transport type does).
    r, err := client.transport().RoundTrip(ctx, client.Addr, &metadataAPI.Request{
        TopicNames:             []string{topic},
        AllowAutoTopicCreation: w.AllowAutoTopicCreation,
    })
    if err != nil {
        return 0, err
    }
    for _, t := range r.(*metadataAPI.Response).Topics {
        if t.Name == topic {
            // This should always hit, unless kafka has a bug.
            if t.ErrorCode != 0 {
                return 0, Error(t.ErrorCode)
            }
            return len(t.Partitions), nil
        }
    }
    return 0, UnknownTopicOrPartition
}
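
// For reference, an illustrative sketch (not part of the original source) of
// the higher-level call this method bypasses, assuming the package's public
// Client.Metadata API:
//
//     client := &kafka.Client{Addr: w.Addr, Transport: w.Transport}
//     meta, err := client.Metadata(ctx, &kafka.MetadataRequest{
//         Topics: []string{"topic-A"},
//     })
//     if err == nil {
//         for _, t := range meta.Topics {
//             // inspect t.Name, t.Error, len(t.Partitions) ...
//         }
//     }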

func (w *Writer) client(timeout time.Duration) *Client {
    return &Client{
        Addr:      w.Addr,
        Transport: w.Transport,
        Timeout:   timeout,
    }
}

func (w *Writer) balancer() Balancer {
    if w.Balancer != nil {
        return w.Balancer
    }
    return &w.roundRobin
}

func (w *Writer) maxAttempts() int {
    if w.MaxAttempts > 0 {
        return w.MaxAttempts
    }
    // TODO: this is a very high default, if something has failed 9 times it
    // seems unlikely it will succeed on the 10th attempt. However, it does
    // carry the risk of greatly increasing the volume of requests sent to the
    // kafka cluster. We should consider reducing this default (3?).
    return 10
}

func (w *Writer) writeBackoffMin() time.Duration {
    if w.WriteBackoffMin > 0 {
        return w.WriteBackoffMin
    }
    return 100 * time.Millisecond
}

func (w *Writer) writeBackoffMax() time.Duration {
    if w.WriteBackoffMax > 0 {
        return w.WriteBackoffMax
    }
    return 1 * time.Second
}

func (w *Writer) batchSize() int {
    if w.BatchSize > 0 {
        return w.BatchSize
    }
    return 100
}

func (w *Writer) batchBytes() int64 {
    if w.BatchBytes > 0 {
        return w.BatchBytes
    }
    return 1048576
}

func (w *Writer) batchTimeout() time.Duration {
    if w.BatchTimeout > 0 {
        return w.BatchTimeout
    }
    return 1 * time.Second
}

func (w *Writer) readTimeout() time.Duration {
    if w.ReadTimeout > 0 {
        return w.ReadTimeout
    }
    return 10 * time.Second
}

func (w *Writer) writeTimeout() time.Duration {
    if w.WriteTimeout > 0 {
        return w.WriteTimeout
    }
    return 10 * time.Second
}

func (w *Writer) withLogger(do func(Logger)) {
    if w.Logger != nil {
        do(w.Logger)
    }
}

func (w *Writer) withErrorLogger(do func(Logger)) {
    if w.ErrorLogger != nil {
        do(w.ErrorLogger)
    } else {
        w.withLogger(do)
    }
}

func (w *Writer) stats() *writerStats {
    w.once.Do(func() {
        // This field is not nil when the writer was constructed with NewWriter
        // to share the value with the dial function and count dials.
        if w.writerStats == nil {
            w.writerStats = new(writerStats)
        }
    })
    return w.writerStats
}

// Stats returns a snapshot of the writer stats since the last time the method
// was called, or since the writer was created if it is called for the first
// time.
//
// A typical use of this method is to spawn a goroutine that will periodically
// call Stats on a kafka writer and report the metrics to a stats collection
// system.
func (w *Writer) Stats() WriterStats {
    stats := w.stats()
    return WriterStats{
        Dials:           stats.dials.snapshot(),
        Writes:          stats.writes.snapshot(),
        Messages:        stats.messages.snapshot(),
        Bytes:           stats.bytes.snapshot(),
        Errors:          stats.errors.snapshot(),
        DialTime:        stats.dialTime.snapshotDuration(),
        BatchTime:       stats.batchTime.snapshotDuration(),
        WriteTime:       stats.writeTime.snapshotDuration(),
        WaitTime:        stats.waitTime.snapshotDuration(),
        Retries:         stats.retries.snapshot(),
        BatchSize:       stats.batchSize.snapshot(),
        BatchBytes:      stats.batchSizeBytes.snapshot(),
        MaxAttempts:     int64(w.MaxAttempts),
        WriteBackoffMin: w.WriteBackoffMin,
        WriteBackoffMax: w.WriteBackoffMax,
        MaxBatchSize:    int64(w.BatchSize),
        BatchTimeout:    w.BatchTimeout,
        ReadTimeout:     w.ReadTimeout,
        WriteTimeout:    w.WriteTimeout,
        RequiredAcks:    int64(w.RequiredAcks),
        Async:           w.Async,
        Topic:           w.Topic,
    }
}
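
// Illustrative sketch (not part of the original source) of the typical usage
// described in the Stats doc comment: a goroutine that periodically snapshots
// the writer stats and hands them to a metrics sink (reportStats is a
// hypothetical helper supplied by the program).
//
//     go func() {
//         for range time.Tick(10 * time.Second) {
//             reportStats(w.Stats())
//         }
//     }()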

func (w *Writer) chooseTopic(msg Message) (string, error) {
    // w.Topic and msg.Topic are mutually exclusive, meaning that exactly one
    // of them must be set; otherwise we return an error.
    if w.Topic != "" && msg.Topic != "" {
        return "", errors.New("kafka.(*Writer): Topic must not be specified for both Writer and Message")
    } else if w.Topic == "" && msg.Topic == "" {
        return "", errors.New("kafka.(*Writer): Topic must be specified for Writer or Message")
    }

    // now we choose the topic, depending on which one is not empty
    if msg.Topic != "" {
        return msg.Topic, nil
    }

    return w.Topic, nil
}
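
// Illustrative sketch (not part of the original source) of the two valid
// configurations enforced above:
//
//     // Writer-level topic: messages must not set their own Topic.
//     w := &kafka.Writer{Addr: kafka.TCP("localhost:9092"), Topic: "topic-A"}
//     _ = w.WriteMessages(ctx, kafka.Message{Value: []byte("hello")})
//
//     // Per-message topics: the writer's Topic must be left empty.
//     w = &kafka.Writer{Addr: kafka.TCP("localhost:9092")}
//     _ = w.WriteMessages(ctx, kafka.Message{Topic: "topic-B", Value: []byte("hello")})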

type batchQueue struct {
    queue []*writeBatch

    // Pointers are used here to make `go vet` happy, and avoid copying mutexes.
    // It may be better to revert these to non-pointers and avoid the copies in
    // a different way.
    mutex *sync.Mutex
    cond  *sync.Cond

    closed bool
}

func (b *batchQueue) Put(batch *writeBatch) bool {
    b.cond.L.Lock()
    defer b.cond.L.Unlock()
    defer b.cond.Broadcast()

    if b.closed {
        return false
    }
    b.queue = append(b.queue, batch)
    return true
}

func (b *batchQueue) Get() *writeBatch {
    b.cond.L.Lock()
    defer b.cond.L.Unlock()

    for len(b.queue) == 0 && !b.closed {
        b.cond.Wait()
    }

    if len(b.queue) == 0 {
        return nil
    }

    batch := b.queue[0]
    b.queue[0] = nil
    b.queue = b.queue[1:]

    return batch
}

func (b *batchQueue) Close() {
    b.cond.L.Lock()
    defer b.cond.L.Unlock()
    defer b.cond.Broadcast()

    b.closed = true
}

func newBatchQueue(initialSize int) batchQueue {
    bq := batchQueue{
        queue: make([]*writeBatch, 0, initialSize),
        mutex: &sync.Mutex{},
        cond:  &sync.Cond{},
    }

    bq.cond.L = bq.mutex

    return bq
}
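
// Illustrative sketch (not part of the original source) of how the queue is
// intended to be used by the writer: one goroutine drains it with Get until
// Close makes Get return nil, while producers hand off batches with Put.
//
//     q := newBatchQueue(10)
//     go func() {
//         for {
//             b := q.Get() // blocks until a batch is queued or the queue is closed
//             if b == nil {
//                 return // closed and drained
//             }
//             // write b ...
//         }
//     }()
//     q.Put(newWriteBatch(time.Now(), time.Second))
//     q.Close()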

// partitionWriter is a writer for a topic-partition pair. It maintains messaging order
// across batches of messages.
type partitionWriter struct {
    meta  topicPartition
    queue batchQueue

    mutex     sync.Mutex
    currBatch *writeBatch

    // reference to the writer that owns this partition writer. Used for the
    // produce logic as well as stat tracking.
    w *Writer
}

func newPartitionWriter(w *Writer, key topicPartition) *partitionWriter {
    writer := &partitionWriter{
        meta:  key,
        queue: newBatchQueue(10),
        w:     w,
    }
    w.spawn(writer.writeBatches)
    return writer
}

func (ptw *partitionWriter) writeBatches() {
    for {
        batch := ptw.queue.Get()

        // The only time we can return nil is when the queue is closed
        // and empty. If the queue is closed, that means the Writer is
        // closed, so once we're here it's time to exit.
        if batch == nil {
            return
        }

        ptw.writeBatch(batch)
    }
}

func (ptw *partitionWriter) writeMessages(msgs []Message, indexes []int32) map[*writeBatch][]int32 {
    ptw.mutex.Lock()
    defer ptw.mutex.Unlock()

    batchSize := ptw.w.batchSize()
    batchBytes := ptw.w.batchBytes()

    var batches map[*writeBatch][]int32
    if !ptw.w.Async {
        batches = make(map[*writeBatch][]int32, 1)
    }

    for _, i := range indexes {
    assignMessage:
        batch := ptw.currBatch
        if batch == nil {
            batch = ptw.newWriteBatch()
            ptw.currBatch = batch
        }
        if !batch.add(msgs[i], batchSize, batchBytes) {
            batch.trigger()
            ptw.queue.Put(batch)
            ptw.currBatch = nil
            goto assignMessage
        }

        if batch.full(batchSize, batchBytes) {
            batch.trigger()
            ptw.queue.Put(batch)
            ptw.currBatch = nil
        }

        if !ptw.w.Async {
            batches[batch] = append(batches[batch], i)
        }
    }
    return batches
}
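
// Worked example (illustrative, not part of the original source): with
// batchSize = 2 and 5 message indexes, the loop above fills and enqueues two
// full batches of 2 messages each and leaves the 5th message in ptw.currBatch,
// where it waits for more messages or for the batch timer to expire. In
// synchronous mode the returned map associates the first batch with indexes
// {0, 1}, the second with {2, 3}, and the pending batch with {4}, so
// WriteMessages can wait on each batch's done channel and attribute errors
// per message.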

// ptw.w can be accessed here because this is called with the lock ptw.mutex already held.
func (ptw *partitionWriter) newWriteBatch() *writeBatch {
    batch := newWriteBatch(time.Now(), ptw.w.batchTimeout())
    ptw.w.spawn(func() { ptw.awaitBatch(batch) })
    return batch
}

// awaitBatch waits for a batch to either fill up or time out.
// If the batch is full it only stops the timer; if the timer
// expires it will queue the batch for writing if needed.
func (ptw *partitionWriter) awaitBatch(batch *writeBatch) {
    select {
    case <-batch.timer.C:
        ptw.mutex.Lock()
        // Detach the batch from the writer if we're still attached
        // and queue it for writing.
        // Only the current batch can expire, all previous batches were already written to the queue.
        // If writeMessages locks ptw.mutex after the timer fires but before this goroutine
        // can lock ptw.mutex, it will have filled the batch and enqueued it, which means
        // ptw.currBatch != batch, so we just move on.
        // Otherwise, we detach the batch from the partitionWriter and enqueue it for writing.
        if ptw.currBatch == batch {
            ptw.queue.Put(batch)
            ptw.currBatch = nil
        }
        ptw.mutex.Unlock()
    case <-batch.ready:
        // The batch became full, it was removed from the partitionWriter and its
        // ready channel was closed. We need to stop the timer to avoid
        // having it leak until it expires.
        batch.timer.Stop()
    }
}

func (ptw *partitionWriter) writeBatch(batch *writeBatch) {
    stats := ptw.w.stats()
    stats.batchTime.observe(int64(time.Since(batch.time)))
    stats.batchSize.observe(int64(len(batch.msgs)))
    stats.batchSizeBytes.observe(batch.bytes)

    var res *ProduceResponse
    var err error
    key := ptw.meta
    for attempt, maxAttempts := 0, ptw.w.maxAttempts(); attempt < maxAttempts; attempt++ {
        if attempt != 0 {
            stats.retries.observe(1)
            // TODO: should there be a way to asynchronously cancel this
            // operation?
            //
            // * If all goroutines that added messages to this batch have stopped
            //   waiting for it, should we abort?
            //
            // * If the writer has been closed? It reduces the durability
            //   guarantees to abort, but may be better to avoid long wait times
            //   on close.
            //
            delay := backoff(attempt, ptw.w.writeBackoffMin(), ptw.w.writeBackoffMax())
            ptw.w.withLogger(func(log Logger) {
                log.Printf("backing off %s writing %d messages to %s (partition: %d)", delay, len(batch.msgs), key.topic, key.partition)
            })
            time.Sleep(delay)
        }

        ptw.w.withLogger(func(log Logger) {
            log.Printf("writing %d messages to %s (partition: %d)", len(batch.msgs), key.topic, key.partition)
        })

        start := time.Now()
        res, err = ptw.w.produce(key, batch)

        stats.writes.observe(1)
        stats.messages.observe(int64(len(batch.msgs)))
        stats.bytes.observe(batch.bytes)
        // stats.writeTime used to report the duration of WriteMessages, but the
        // implementation was broken and reporting values in the nanoseconds
        // range. In kafka-go 0.4, we recycled this value to instead report the
        // duration of produce requests, and changed the stats.waitTime value to
        // report the time that kafka has throttled the requests for.
        stats.writeTime.observe(int64(time.Since(start)))

        if res != nil {
            err = res.Error
            stats.waitTime.observe(int64(res.Throttle))
        }

        if err == nil {
            break
        }

        stats.errors.observe(1)

        ptw.w.withErrorLogger(func(log Logger) {
            log.Printf("error writing messages to %s (partition %d): %s", key.topic, key.partition, err)
        })

        if !isTemporary(err) && !isTransientNetworkError(err) {
            break
        }
    }

    if res != nil {
        for i := range batch.msgs {
            m := &batch.msgs[i]
            m.Topic = key.topic
            m.Partition = int(key.partition)
            m.Offset = res.BaseOffset + int64(i)

            if m.Time.IsZero() {
                m.Time = res.LogAppendTime
            }
        }
    }

    if ptw.w.Completion != nil {
        ptw.w.Completion(batch.msgs, err)
    }

    batch.complete(err)
}

func (ptw *partitionWriter) close() {
    ptw.mutex.Lock()
    defer ptw.mutex.Unlock()

    if ptw.currBatch != nil {
        batch := ptw.currBatch
        ptw.queue.Put(batch)
        ptw.currBatch = nil
        batch.trigger()
    }

    ptw.queue.Close()
}

type writeBatch struct {
    time  time.Time
    msgs  []Message
    size  int
    bytes int64
    ready chan struct{}
    done  chan struct{}
    timer *time.Timer
    err   error // result of the batch completion
}

func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
    return &writeBatch{
        time:  now,
        ready: make(chan struct{}),
        done:  make(chan struct{}),
        timer: time.NewTimer(timeout),
    }
}

func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
    bytes := int64(msg.size())

    if b.size > 0 && (b.bytes+bytes) > maxBytes {
        return false
    }

    if cap(b.msgs) == 0 {
        b.msgs = make([]Message, 0, maxSize)
    }

    b.msgs = append(b.msgs, msg)
    b.size++
    b.bytes += bytes
    return true
}

func (b *writeBatch) full(maxSize int, maxBytes int64) bool {
    return b.size >= maxSize || b.bytes >= maxBytes
}

func (b *writeBatch) trigger() {
    close(b.ready)
}

func (b *writeBatch) complete(err error) {
    b.err = err
    close(b.done)
}
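
// Illustrative sketch (not part of the original source) of the writeBatch
// lifecycle as implemented above:
//
//     b := newWriteBatch(time.Now(), time.Second)
//     b.add(msg, 100, 1048576) // returns false once adding msg would exceed maxBytes
//     b.trigger()              // closes b.ready, waking awaitBatch so it can stop the timer
//     b.complete(nil)          // closes b.done, unblocking WriteMessages waiters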

type writerRecords struct {
    msgs   []Message
    index  int
    record Record
    key    bytesReadCloser
    value  bytesReadCloser
}

func (r *writerRecords) ReadRecord() (*Record, error) {
    if r.index >= 0 && r.index < len(r.msgs) {
        m := &r.msgs[r.index]
        r.index++
        r.record = Record{
            Time:    m.Time,
            Headers: m.Headers,
        }
        if m.Key != nil {
            r.key.Reset(m.Key)
            r.record.Key = &r.key
        }
        if m.Value != nil {
            r.value.Reset(m.Value)
            r.record.Value = &r.value
        }
        return &r.record, nil
    }
    return nil, io.EOF
}

type bytesReadCloser struct{ bytes.Reader }

func (*bytesReadCloser) Close() error { return nil }

// A cache of []int values passed to balancers of writers, used to amortize the
// heap allocation of the partition index lists.
//
// With hindsight, the use of `...int` to pass the partition list to Balancers
// was not the best design choice: kafka partition numbers are monotonically
// increasing, we could have simply passed the number of partitions instead.
// If we ever revisit this API, we can hopefully remove this cache.
var partitionsCache atomic.Value

func loadCachedPartitions(numPartitions int) []int {
    partitions, ok := partitionsCache.Load().([]int)
    if ok && len(partitions) >= numPartitions {
        return partitions[:numPartitions]
    }

    const alignment = 128
    n := ((numPartitions / alignment) + 1) * alignment

    partitions = make([]int, n)
    for i := range partitions {
        partitions[i] = i
    }

    partitionsCache.Store(partitions)
    return partitions[:numPartitions]
}
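
// Worked example (illustrative, not part of the original source): for
// numPartitions = 12 the cache misses on first use, n = ((12/128)+1)*128 = 128,
// so a 128-entry slice [0, 1, ..., 127] is allocated and stored, and the first
// 12 entries are returned. A later call with up to 128 partitions reuses the
// cached slice without allocating.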