You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
107 lines
2.8 KiB
107 lines
2.8 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/alterconfigs"
|
|
)
|
|
|
|
// AlterConfigsRequest represents a request sent to a kafka broker to alter configs.
|
|
type AlterConfigsRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// List of resources to update.
|
|
Resources []AlterConfigRequestResource
|
|
|
|
// When set to true, topics are not created but the configuration is
|
|
// validated as if they were.
|
|
ValidateOnly bool
|
|
}
|
|
|
|
type AlterConfigRequestResource struct {
|
|
// Resource Type
|
|
ResourceType ResourceType
|
|
|
|
// Resource Name
|
|
ResourceName string
|
|
|
|
// Configs is a list of configuration updates.
|
|
Configs []AlterConfigRequestConfig
|
|
}
|
|
|
|
type AlterConfigRequestConfig struct {
|
|
// Configuration key name
|
|
Name string
|
|
|
|
// The value to set for the configuration key.
|
|
Value string
|
|
}
|
|
|
|
// AlterConfigsResponse represents a response from a kafka broker to an alter config request.
|
|
type AlterConfigsResponse struct {
|
|
// Duration for which the request was throttled due to a quota violation.
|
|
Throttle time.Duration
|
|
|
|
// Mapping of topic names to errors that occurred while attempting to create
|
|
// the topics.
|
|
//
|
|
// The errors contain the kafka error code. Programs may use the standard
|
|
// errors.Is function to test the error against kafka error codes.
|
|
Errors map[AlterConfigsResponseResource]error
|
|
}
|
|
|
|
// AlterConfigsResponseResource helps map errors to specific resources in an
|
|
// alter config response.
|
|
type AlterConfigsResponseResource struct {
|
|
Type int8
|
|
Name string
|
|
}
|
|
|
|
// AlterConfigs sends a config altering request to a kafka broker and returns the
|
|
// response.
|
|
func (c *Client) AlterConfigs(ctx context.Context, req *AlterConfigsRequest) (*AlterConfigsResponse, error) {
|
|
resources := make([]alterconfigs.RequestResources, len(req.Resources))
|
|
|
|
for i, t := range req.Resources {
|
|
configs := make([]alterconfigs.RequestConfig, len(t.Configs))
|
|
for j, v := range t.Configs {
|
|
configs[j] = alterconfigs.RequestConfig{
|
|
Name: v.Name,
|
|
Value: v.Value,
|
|
}
|
|
}
|
|
resources[i] = alterconfigs.RequestResources{
|
|
ResourceType: int8(t.ResourceType),
|
|
ResourceName: t.ResourceName,
|
|
Configs: configs,
|
|
}
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &alterconfigs.Request{
|
|
Resources: resources,
|
|
ValidateOnly: req.ValidateOnly,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).AlterConfigs: %w", err)
|
|
}
|
|
|
|
res := m.(*alterconfigs.Response)
|
|
ret := &AlterConfigsResponse{
|
|
Throttle: makeDuration(res.ThrottleTimeMs),
|
|
Errors: make(map[AlterConfigsResponseResource]error, len(res.Responses)),
|
|
}
|
|
|
|
for _, t := range res.Responses {
|
|
ret.Errors[AlterConfigsResponseResource{
|
|
Type: t.ResourceType,
|
|
Name: t.ResourceName,
|
|
}] = makeError(t.ErrorCode, t.ErrorMessage)
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|