You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

146 lines
4.0 KiB

package kafka
import (
"context"
"errors"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol"
)
// Two-second defaults for the topic and partition management requests.
//
// NOTE(review): presumably each value is passed as the defaultTimeout argument
// of (*Client).timeout by the request method it is named after; the call sites
// are outside this file, so confirm before relying on that.
const (
	defaultCreateTopicsTimeout     = 2 * time.Second
	defaultDeleteTopicsTimeout     = 2 * time.Second
	defaultCreatePartitionsTimeout = 2 * time.Second
)

// Half-second defaults for producing and for the maximum wait time.
//
// NOTE(review): usage is not visible in this file; the names suggest the
// Produce and Fetch code paths, verify against the callers.
const (
	defaultProduceTimeout = 500 * time.Millisecond
	defaultMaxWait        = 500 * time.Millisecond
)
// Client is a high-level API to interact with kafka brokers.
//
// All methods of the Client type accept a context as first argument, which may
// be used to asynchronously cancel the requests.
//
// Clients are safe to use concurrently from multiple goroutines, as long as
// their configuration is not changed after first use.
type Client struct {
// Address of the kafka cluster (or specific broker) that the client will be
// sending requests to.
//
// This field is optional, the address may be provided in each request
// instead. The request address takes precedence if both were specified.
// A request fails with an error when neither address is set.
Addr net.Addr
// Time limit for requests sent by this client.
//
// If zero or negative, the client adds no time limit of its own; a deadline
// already carried by the request context still applies.
Timeout time.Duration
// A transport used to communicate with the kafka brokers.
//
// If nil, DefaultTransport is used.
Transport RoundTripper
}
// TopicAndGroup pairs a topic name with a consumer group id. Both values are
// plain strings, so a dedicated type is defined for clarity when passing them
// to the Client as a function argument.
//
// N.B TopicAndGroup is currently experimental! Therefore, it is subject to
// change, including breaking changes between MINOR and PATCH releases.
//
// Deprecated: this type will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
type TopicAndGroup struct {
// Name of the topic whose partitions are inspected.
Topic string
// Id of the consumer group whose committed offsets are read.
GroupId string
}
// ConsumerOffsets returns a map[int]int64 of partition to committed offset for
// a consumer group id and topic.
//
// The partitions of tg.Topic are discovered with a Metadata request, then the
// offsets committed by tg.GroupId for those partitions are read with an
// OffsetFetch request. An error is returned if either request fails, or if the
// metadata response carries no entry for the topic.
//
// Deprecated: this method will be removed in version 1.0, programs should
// migrate to use kafka.(*Client).OffsetFetch instead.
func (c *Client) ConsumerOffsets(ctx context.Context, tg TopicAndGroup) (map[int]int64, error) {
	metadata, err := c.Metadata(ctx, &MetadataRequest{
		Topics: []string{tg.Topic},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get topic metadata :%w", err)
	}

	// Guard the index below: a response without any topic entry would
	// otherwise panic instead of surfacing an error to the caller.
	if len(metadata.Topics) == 0 {
		return nil, fmt.Errorf("failed to get topic metadata: no metadata returned for topic %q", tg.Topic)
	}

	// Only one topic was requested, so the first entry is the one of interest.
	topic := metadata.Topics[0]
	partitions := make([]int, len(topic.Partitions))
	for i := range topic.Partitions {
		partitions[i] = topic.Partitions[i].ID
	}

	offsets, err := c.OffsetFetch(ctx, &OffsetFetchRequest{
		GroupID: tg.GroupId,
		Topics: map[string][]int{
			tg.Topic: partitions,
		},
	})
	if err != nil {
		return nil, fmt.Errorf("failed to get offsets: %w", err)
	}

	// The result is looked up under the topic name reported by the metadata
	// response; a missing entry yields an empty (non-nil) map.
	topicOffsets := offsets.Topics[topic.Name]
	partitionOffsets := make(map[int]int64, len(topicOffsets))
	for _, off := range topicOffsets {
		partitionOffsets[off.Partition] = off.CommittedOffset
	}
	return partitionOffsets, nil
}
// roundTrip sends msg to the kafka cluster through the client's transport and
// returns the response.
//
// When c.Timeout is positive the request context is bounded by it. The request
// address addr is used when non-nil, otherwise c.Addr is used; an error is
// returned when both are nil.
func (c *Client) roundTrip(ctx context.Context, addr net.Addr, msg protocol.Message) (protocol.Message, error) {
	if c.Timeout > 0 {
		bounded, cancel := context.WithTimeout(ctx, c.Timeout)
		defer cancel()
		ctx = bounded
	}

	// The per-request address wins over the client-level one.
	target := addr
	if target == nil {
		target = c.Addr
	}
	if target == nil {
		return nil, errors.New("no address was given for the kafka cluster in the request or on the client")
	}

	rt := c.transport()
	return rt.RoundTrip(ctx, target, msg)
}
// transport returns the RoundTripper configured on the client, falling back
// to DefaultTransport when c.Transport is nil.
func (c *Client) transport() RoundTripper {
	if c.Transport == nil {
		return DefaultTransport
	}
	return c.Transport
}
// timeout returns the time limit to communicate to kafka for a request issued
// with ctx.
//
// The budget is the smaller of c.Timeout and the time remaining until the
// context deadline, where a zero or negative c.Timeout means the client sets
// no limit of its own. When the resulting budget is positive, half of it is
// returned; otherwise defaultTimeout is returned unchanged.
func (c *Client) timeout(ctx context.Context, defaultTimeout time.Duration) time.Duration {
	timeout := c.Timeout

	if deadline, ok := ctx.Deadline(); ok {
		// A non-positive client timeout means "no limit", so it must not win
		// the comparison against the context deadline; otherwise the deadline
		// would be ignored whenever Client.Timeout is left unset.
		if remain := time.Until(deadline); timeout <= 0 || remain < timeout {
			timeout = remain
		}
	}

	if timeout > 0 {
		// Half the timeout because it is communicated to kafka in multiple
		// requests (e.g. Fetch, Produce, etc...), this adds buffer to account
		// for network latency when waiting for the response from kafka.
		return timeout / 2
	}

	// No usable budget: nothing was configured, or the deadline already passed.
	return defaultTimeout
}
// timeoutMs computes the same value as timeout and converts it to an int32
// through the milliseconds helper (defined outside this view).
func (c *Client) timeoutMs(ctx context.Context, defaultTimeout time.Duration) int32 {
	d := c.timeout(ctx, defaultTimeout)
	return milliseconds(d)
}