You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
103 lines
2.7 KiB
103 lines
2.7 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/createpartitions"
|
|
)
|
|
|
|
// CreatePartitionsRequest represents a request sent to a kafka broker to create
|
|
// and update topic parititions.
|
|
type CreatePartitionsRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// List of topics to create and their configuration.
|
|
Topics []TopicPartitionsConfig
|
|
|
|
// When set to true, topics are not created but the configuration is
|
|
// validated as if they were.
|
|
ValidateOnly bool
|
|
}
|
|
|
|
// CreatePartitionsResponse represents a response from a kafka broker to a partition
|
|
// creation request.
|
|
type CreatePartitionsResponse struct {
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// Mapping of topic names to errors that occurred while attempting to create
|
|
// the topics.
|
|
//
|
|
// The errors contain the kafka error code. Programs may use the standard
|
|
// errors.Is function to test the error against kafka error codes.
|
|
Errors map[string]error
|
|
}
|
|
|
|
// CreatePartitions sends a partitions creation request to a kafka broker and returns the
|
|
// response.
|
|
func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
|
|
topics := make([]createpartitions.RequestTopic, len(req.Topics))
|
|
|
|
for i, t := range req.Topics {
|
|
topics[i] = createpartitions.RequestTopic{
|
|
Name: t.Name,
|
|
Count: t.Count,
|
|
Assignments: t.assignments(),
|
|
}
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &createpartitions.Request{
|
|
Topics: topics,
|
|
TimeoutMs: c.timeoutMs(ctx, defaultCreatePartitionsTimeout),
|
|
ValidateOnly: req.ValidateOnly,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).CreatePartitions: %w", err)
|
|
}
|
|
|
|
res := m.(*createpartitions.Response)
|
|
ret := &CreatePartitionsResponse{
|
|
Throttle: makeDuration(res.ThrottleTimeMs),
|
|
Errors: make(map[string]error, len(res.Results)),
|
|
}
|
|
|
|
for _, t := range res.Results {
|
|
ret.Errors[t.Name] = makeError(t.ErrorCode, t.ErrorMessage)
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|
|
type TopicPartitionsConfig struct {
|
|
// Topic name
|
|
Name string
|
|
|
|
// Topic partition's count.
|
|
Count int32
|
|
|
|
// TopicPartitionAssignments among kafka brokers for this topic partitions.
|
|
TopicPartitionAssignments []TopicPartitionAssignment
|
|
}
|
|
|
|
func (t *TopicPartitionsConfig) assignments() []createpartitions.RequestAssignment {
|
|
if len(t.TopicPartitionAssignments) == 0 {
|
|
return nil
|
|
}
|
|
assignments := make([]createpartitions.RequestAssignment, len(t.TopicPartitionAssignments))
|
|
for i, a := range t.TopicPartitionAssignments {
|
|
assignments[i] = createpartitions.RequestAssignment{
|
|
BrokerIDs: a.BrokerIDs,
|
|
}
|
|
}
|
|
return assignments
|
|
}
|
|
|
|
type TopicPartitionAssignment struct {
|
|
// Broker IDs
|
|
BrokerIDs []int32
|
|
}
|
|
|