You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
133 lines
3.6 KiB
133 lines
3.6 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"net"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/incrementalalterconfigs"
|
|
)
|
|
|
|
// ConfigOperation identifies how a single config entry should be modified by an
// IncrementalAlterConfigs request. Its numeric value is sent to the broker as an
// int8, so the constants below are pinned to explicit values.
type ConfigOperation int8

// Operation codes accepted in IncrementalAlterConfigsRequestConfig.ConfigOperation.
// The descriptions follow the names used by Kafka's AlterConfigOp.OpType; consult
// the Kafka documentation for the exact broker-side semantics.
const (
	// ConfigOperationSet requests that the config be set to the supplied value.
	ConfigOperationSet ConfigOperation = 0

	// ConfigOperationDelete requests that the config entry be deleted.
	ConfigOperationDelete ConfigOperation = 1

	// ConfigOperationAppend requests that the supplied value be appended to the
	// config's current value.
	ConfigOperationAppend ConfigOperation = 2

	// ConfigOperationSubtract requests that the supplied value be removed from the
	// config's current value.
	ConfigOperationSubtract ConfigOperation = 3
)
|
|
|
|
// IncrementalAlterConfigsRequest is a request to the IncrementalAlterConfigs API.
|
|
type IncrementalAlterConfigsRequest struct {
|
|
// Addr is the address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// Resources contains the list of resources to update configs for.
|
|
Resources []IncrementalAlterConfigsRequestResource
|
|
|
|
// ValidateOnly indicates whether Kafka should validate the changes without actually
|
|
// applying them.
|
|
ValidateOnly bool
|
|
}
|
|
|
|
// IncrementalAlterConfigsRequestResource contains the details of a single resource type whose
|
|
// configs should be altered.
|
|
type IncrementalAlterConfigsRequestResource struct {
|
|
// ResourceType is the type of resource to update.
|
|
ResourceType ResourceType
|
|
|
|
// ResourceName is the name of the resource to update (i.e., topic name or broker ID).
|
|
ResourceName string
|
|
|
|
// Configs contains the list of config key/values to update.
|
|
Configs []IncrementalAlterConfigsRequestConfig
|
|
}
|
|
|
|
// IncrementalAlterConfigsRequestConfig describes a single config key/value pair that should
|
|
// be altered.
|
|
type IncrementalAlterConfigsRequestConfig struct {
|
|
// Name is the name of the config.
|
|
Name string
|
|
|
|
// Value is the value to set for this config.
|
|
Value string
|
|
|
|
// ConfigOperation indicates how this config should be updated (e.g., add, delete, etc.).
|
|
ConfigOperation ConfigOperation
|
|
}
|
|
|
|
// IncrementalAlterConfigsResponse is a response from the IncrementalAlterConfigs API.
|
|
type IncrementalAlterConfigsResponse struct {
|
|
// Resources contains details of each resource config that was updated.
|
|
Resources []IncrementalAlterConfigsResponseResource
|
|
}
|
|
|
|
// IncrementalAlterConfigsResponseResource contains the response details for a single resource
|
|
// whose configs were updated.
|
|
type IncrementalAlterConfigsResponseResource struct {
|
|
// Error is set to a non-nil value if an error occurred while updating this specific
|
|
// config.
|
|
Error error
|
|
|
|
// ResourceType is the type of resource that was updated.
|
|
ResourceType ResourceType
|
|
|
|
// ResourceName is the name of the resource that was updated.
|
|
ResourceName string
|
|
}
|
|
|
|
func (c *Client) IncrementalAlterConfigs(
|
|
ctx context.Context,
|
|
req *IncrementalAlterConfigsRequest,
|
|
) (*IncrementalAlterConfigsResponse, error) {
|
|
apiReq := &incrementalalterconfigs.Request{
|
|
ValidateOnly: req.ValidateOnly,
|
|
}
|
|
|
|
for _, res := range req.Resources {
|
|
apiRes := incrementalalterconfigs.RequestResource{
|
|
ResourceType: int8(res.ResourceType),
|
|
ResourceName: res.ResourceName,
|
|
}
|
|
|
|
for _, config := range res.Configs {
|
|
apiRes.Configs = append(
|
|
apiRes.Configs,
|
|
incrementalalterconfigs.RequestConfig{
|
|
Name: config.Name,
|
|
Value: config.Value,
|
|
ConfigOperation: int8(config.ConfigOperation),
|
|
},
|
|
)
|
|
}
|
|
|
|
apiReq.Resources = append(
|
|
apiReq.Resources,
|
|
apiRes,
|
|
)
|
|
}
|
|
|
|
protoResp, err := c.roundTrip(
|
|
ctx,
|
|
req.Addr,
|
|
apiReq,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp := &IncrementalAlterConfigsResponse{}
|
|
|
|
apiResp := protoResp.(*incrementalalterconfigs.Response)
|
|
for _, res := range apiResp.Responses {
|
|
resp.Resources = append(
|
|
resp.Resources,
|
|
IncrementalAlterConfigsResponseResource{
|
|
Error: makeError(res.ErrorCode, res.ErrorMessage),
|
|
ResourceType: ResourceType(res.ResourceType),
|
|
ResourceName: res.ResourceName,
|
|
},
|
|
)
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|