You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

133 lines
3.6 KiB

package kafka
import (
"context"
"net"
"github.com/segmentio/kafka-go/protocol/incrementalalterconfigs"
)
type ConfigOperation int8
const (
ConfigOperationSet ConfigOperation = 0
ConfigOperationDelete ConfigOperation = 1
ConfigOperationAppend ConfigOperation = 2
ConfigOperationSubtract ConfigOperation = 3
)
// IncrementalAlterConfigsRequest is a request to the IncrementalAlterConfigs API.
type IncrementalAlterConfigsRequest struct {
// Addr is the address of the kafka broker to send the request to.
Addr net.Addr
// Resources contains the list of resources to update configs for.
Resources []IncrementalAlterConfigsRequestResource
// ValidateOnly indicates whether Kafka should validate the changes without actually
// applying them.
ValidateOnly bool
}
// IncrementalAlterConfigsRequestResource contains the details of a single resource type whose
// configs should be altered.
type IncrementalAlterConfigsRequestResource struct {
// ResourceType is the type of resource to update.
ResourceType ResourceType
// ResourceName is the name of the resource to update (i.e., topic name or broker ID).
ResourceName string
// Configs contains the list of config key/values to update.
Configs []IncrementalAlterConfigsRequestConfig
}
// IncrementalAlterConfigsRequestConfig describes a single config key/value pair that should
// be altered.
type IncrementalAlterConfigsRequestConfig struct {
// Name is the name of the config.
Name string
// Value is the value to set for this config.
Value string
// ConfigOperation indicates how this config should be updated (e.g., add, delete, etc.).
ConfigOperation ConfigOperation
}
// IncrementalAlterConfigsResponse is a response from the IncrementalAlterConfigs API.
type IncrementalAlterConfigsResponse struct {
// Resources contains details of each resource config that was updated.
Resources []IncrementalAlterConfigsResponseResource
}
// IncrementalAlterConfigsResponseResource contains the response details for a single resource
// whose configs were updated.
type IncrementalAlterConfigsResponseResource struct {
// Error is set to a non-nil value if an error occurred while updating this specific
// config.
Error error
// ResourceType is the type of resource that was updated.
ResourceType ResourceType
// ResourceName is the name of the resource that was updated.
ResourceName string
}
func (c *Client) IncrementalAlterConfigs(
ctx context.Context,
req *IncrementalAlterConfigsRequest,
) (*IncrementalAlterConfigsResponse, error) {
apiReq := &incrementalalterconfigs.Request{
ValidateOnly: req.ValidateOnly,
}
for _, res := range req.Resources {
apiRes := incrementalalterconfigs.RequestResource{
ResourceType: int8(res.ResourceType),
ResourceName: res.ResourceName,
}
for _, config := range res.Configs {
apiRes.Configs = append(
apiRes.Configs,
incrementalalterconfigs.RequestConfig{
Name: config.Name,
Value: config.Value,
ConfigOperation: int8(config.ConfigOperation),
},
)
}
apiReq.Resources = append(
apiReq.Resources,
apiRes,
)
}
protoResp, err := c.roundTrip(
ctx,
req.Addr,
apiReq,
)
if err != nil {
return nil, err
}
resp := &IncrementalAlterConfigsResponse{}
apiResp := protoResp.(*incrementalalterconfigs.Response)
for _, res := range apiResp.Responses {
resp.Resources = append(
resp.Resources,
IncrementalAlterConfigsResponseResource{
Error: makeError(res.ErrorCode, res.ErrorMessage),
ResourceType: ResourceType(res.ResourceType),
ResourceName: res.ResourceName,
},
)
}
return resp, nil
}