package kafka

import (
	"context"
	"errors"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol"
)

const (
	defaultCreateTopicsTimeout     = 2 * time.Second
	defaultDeleteTopicsTimeout     = 2 * time.Second
	defaultCreatePartitionsTimeout = 2 * time.Second
	defaultProduceTimeout          = 500 * time.Millisecond
	defaultMaxWait                 = 500 * time.Millisecond
)

// Client is a high-level API to interact with kafka brokers.
//
// All methods of the Client type accept a context as first argument, which may
// be used to asynchronously cancel the requests.
//
// Clients are safe to use concurrently from multiple goroutines, as long as
// their configuration is not changed after first use.
type Client struct {
	// Address of the kafka cluster (or specific broker) that the client will be
	// sending requests to.
	//
	// This field is optional, the address may be provided in each request
	// instead. The request address takes precedence if both were specified.
	Addr net.Addr

	// Time limit for requests sent by this client.
	//
	// If zero, no timeout is applied.
	Timeout time.Duration

	// A transport used to communicate with the kafka brokers.
	//
	// If nil, DefaultTransport is used.
	Transport RoundTripper
}
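
// The function below is an illustrative sketch, not part of the original
// source: it shows one way a Client might be constructed and used to look up
// topic metadata. The broker address, timeout value, and topic name are
// placeholder assumptions.
func exampleClientUsage() {
	client := &Client{
		// Any reachable broker of the cluster can serve as the entry point.
		Addr:    &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 9092},
		Timeout: 10 * time.Second,
	}

	// Every Client method takes a context that can cancel the request early.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	metadata, err := client.Metadata(ctx, &MetadataRequest{
		Topics: []string{"my-topic"},
	})
	if err != nil {
		fmt.Println("metadata request failed:", err)
		return
	}

	for _, topic := range metadata.Topics {
		fmt.Println(topic.Name, "has", len(topic.Partitions), "partitions")
	}
}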

// TopicAndGroup pairs a topic name with a consumer group id; since both are
// plain strings, the type exists for clarity when passing them to the Client
// as a function argument.
//
// N.B. TopicAndGroup is currently experimental! Therefore, it is subject to
// change, including breaking changes between MINOR and PATCH releases.
//
// DEPRECATED: this type will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
type TopicAndGroup struct {
	Topic   string
	GroupId string
}

// ConsumerOffsets returns a map[int]int64 of partition to committed offset for
// a consumer group id and topic.
//
// DEPRECATED: this method will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
	metadata, err := c.Metadata(ctx, &MetadataRequest{
		Topics: []string{tg.Topic},
	})

	if err != nil {
		return nil, fmt.Errorf("failed to get topic metadata: %w", err)
	}

	topic := metadata.Topics[0]
	partitions := make([]int, len(topic.Partitions))

	for i := range topic.Partitions {
		partitions[i] = topic.Partitions[i].ID
	}

	offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
		GroupID: tg.GroupId,
		Topics: map[string][]int{
			tg.Topic: partitions,
		},
	})

	if err != nil {
		return nil, fmt.Errorf("failed to get offsets: %w", err)
	}

	topicOffsets := offsets.Topics[topic.Name]
	partitionOffsets := make(map[int]int64, len(topicOffsets))

	for _, off := range topicOffsets {
		partitionOffsets[off.Partition] = off.CommittedOffset
	}

	return partitionOffsets, nil
}
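
// The function below is an illustrative sketch, not part of the original
// source: it contrasts the deprecated ConsumerOffsets call with the
// OffsetFetch call that the deprecation notice recommends migrating to. The
// group id, topic name, and partition numbers are placeholder assumptions.
func exampleOffsetFetchMigration(ctx context.Context, client *Client) {
	// Deprecated path: one call, but limited to a single topic and scheduled
	// for removal in version 1.0.
	oldOffsets, err := client.ConsumerOffsets(ctx, TopicAndGroup{
		Topic:   "my-topic",
		GroupId: "my-group",
	})
	if err != nil {
		fmt.Println("consumer offsets failed:", err)
		return
	}
	fmt.Println("offsets via ConsumerOffsets:", oldOffsets)

	// Recommended path: OffsetFetch takes an explicit partition list and can
	// query several topics in a single request.
	newOffsets, err := client.OffsetFetch(ctx, &OffsetFetchRequest{
		GroupID: "my-group",
		Topics: map[string][]int{
			"my-topic": {0, 1, 2},
		},
	})
	if err != nil {
		fmt.Println("offset fetch failed:", err)
		return
	}

	for topic, partitions := range newOffsets.Topics {
		for _, p := range partitions {
			fmt.Println(topic, p.Partition, p.CommittedOffset)
		}
	}
}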

func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
	if c.Timeout > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(ctx, c.Timeout)
		defer cancel()
	}

	if addr == nil {
		if addr = c.Addr; addr == nil {
			return nil, errors.New("no address was given for the kafka cluster in the request or on the client")
		}
	}

	return c.transport().RoundTrip(ctx, addr, msg)
}

func (c *Client) transport() RoundTripper {
	if c.Transport != nil {
		return c.Transport
	}
	return DefaultTransport
}

func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration {
	timeout := c.Timeout

	if deadline, ok := ctx.Deadline(); ok {
		if remain := time.Until(deadline); remain < timeout {
			timeout = remain
		}
	}

	if timeout > 0 {
		// Halve the timeout because it is communicated to kafka in multiple
		// requests (e.g. Fetch, Produce, etc.); this leaves a buffer to
		// account for network latency when waiting for the response from
		// kafka.
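		//
		// Illustrative example (numbers are not from the original source):
		// with Timeout = 10s and no shorter deadline on ctx, kafka is asked
		// to wait at most 5s, leaving roughly 5s for the response to travel
		// back before the client's own timeout expires.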
		return timeout / 2
	}

	return defaultTimeout
}

func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 {
	return milliseconds(c.timeout(ctx, defaultTimeout))
}