You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

162 lines
4.1 KiB

package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/segmentio/kafka-go/protocol/describeconfigs"
)
// DescribeConfigsRequest represents a request sent to a kafka broker to describe configs.
type DescribeConfigsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr
// List of resources whose configurations should be described.
Resources []DescribeConfigRequestResource
// IncludeSynonyms is forwarded unchanged to the broker in the protocol
// request; synonyms, when returned, appear in the ConfigSynonyms field of
// each response config entry.
//
// Ignored if API version is less than v1.
IncludeSynonyms bool
// IncludeDocumentation is forwarded unchanged to the broker in the protocol
// request; documentation, when returned, appears in the ConfigDocumentation
// field of each response config entry.
//
// Ignored if API version is less than v3.
IncludeDocumentation bool
}
// DescribeConfigRequestResource identifies one resource in a
// DescribeConfigsRequest, together with the names of the configurations to
// describe for it.
type DescribeConfigRequestResource struct {
// ResourceType is the type of the resource; it is converted to its int8
// protocol value when the request is sent.
ResourceType ResourceType
// ResourceName is the name of the resource.
ResourceName string
// ConfigNames is a list of configurations to describe.
//
// NOTE(review): the Kafka protocol treats a null list as "return all
// configurations" — confirm that a nil slice is encoded that way.
ConfigNames []string
}
// DescribeConfigsResponse represents a response from a kafka broker to a describe config request.
type DescribeConfigsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration
// Resources holds one entry per resource contained in the broker's
// response, in the order the broker returned them.
Resources []DescribeConfigResponseResource
}
// DescribeConfigResponseResource holds the description result for a single
// resource of a DescribeConfigsResponse.
type DescribeConfigResponseResource struct {
// ResourceType is the raw int8 resource type code copied from the protocol
// response (note: not the ResourceType type used on the request side).
ResourceType int8
// ResourceName is the name of the resource as reported by the broker.
ResourceName string
// Error is built from the error code and error message the broker returned
// for this resource.
//
// NOTE(review): presumably nil when the broker reported no error — confirm
// against makeError.
Error error
// ConfigEntries lists the configurations returned for this resource.
ConfigEntries []DescribeConfigResponseConfigEntry
}
// DescribeConfigResponseConfigEntry describes a single configuration of a
// resource, as returned by the broker. All fields are copied verbatim from
// the protocol response.
type DescribeConfigResponseConfigEntry struct {
// ConfigName is the name of the configuration.
ConfigName string
// ConfigValue is the value of the configuration.
ConfigValue string
// ReadOnly is the broker-reported read-only flag for this configuration.
ReadOnly bool
// Ignored if API version is greater than v0
IsDefault bool
// ConfigSource is the raw int8 source code copied from the protocol
// response.
//
// Ignored if API version is less than v1
ConfigSource int8
// IsSensitive is the broker-reported flag marking this configuration as
// sensitive.
IsSensitive bool
// ConfigSynonyms lists the synonyms returned for this configuration.
//
// Ignored if API version is less than v1
ConfigSynonyms []DescribeConfigResponseConfigSynonym
// ConfigType is the raw int8 type code copied from the protocol response.
//
// Ignored if API version is less than v3
ConfigType int8
// ConfigDocumentation is the documentation text returned for this
// configuration.
//
// Ignored if API version is less than v3
ConfigDocumentation string
}
// DescribeConfigResponseConfigSynonym describes one synonym of a configuration
// entry. All fields are copied verbatim from the protocol response and, like
// the ConfigSynonyms list that contains them, are only meaningful for API
// version v1 and above.
type DescribeConfigResponseConfigSynonym struct {
// ConfigName is the name of the synonym.
//
// Ignored if API version is less than v1
ConfigName string
// ConfigValue is the value of the synonym.
//
// Ignored if API version is less than v1
ConfigValue string
// ConfigSource is the raw int8 source code of the synonym.
//
// Ignored if API version is less than v1
ConfigSource int8
}
// DescribeConfigs sends a describe configs request to a kafka broker and
// returns the response.
//
// The request is sent to req.Addr. A failure of the round trip is returned
// wrapped with the method name. Errors reported by the broker for individual
// resources are not returned as the method error; they are exposed through the
// Error field of the matching entry in the response's Resources.
func (c *Client) DescribeConfigs(ctx context.Context, req *DescribeConfigsRequest) (*DescribeConfigsResponse, error) {
	// Translate the public request resources into their protocol form.
	protoResources := make([]describeconfigs.RequestResource, 0, len(req.Resources))
	for _, r := range req.Resources {
		protoResources = append(protoResources, describeconfigs.RequestResource{
			ResourceType: int8(r.ResourceType),
			ResourceName: r.ResourceName,
			ConfigNames:  r.ConfigNames,
		})
	}

	protoReq := &describeconfigs.Request{
		Resources:            protoResources,
		IncludeSynonyms:      req.IncludeSynonyms,
		IncludeDocumentation: req.IncludeDocumentation,
	}

	m, err := c.roundTrip(ctx, req.Addr, protoReq)
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).DescribeConfigs: %w", err)
	}
	res := m.(*describeconfigs.Response)

	throttle := makeDuration(res.ThrottleTimeMs)

	// Translate the protocol response back into the public types, walking
	// resources -> config entries -> synonyms.
	described := make([]DescribeConfigResponseResource, 0, len(res.Resources))
	for _, r := range res.Resources {
		entries := make([]DescribeConfigResponseConfigEntry, 0, len(r.ConfigEntries))
		for _, e := range r.ConfigEntries {
			synonyms := make([]DescribeConfigResponseConfigSynonym, 0, len(e.ConfigSynonyms))
			for _, s := range e.ConfigSynonyms {
				synonyms = append(synonyms, DescribeConfigResponseConfigSynonym{
					ConfigName:   s.ConfigName,
					ConfigValue:  s.ConfigValue,
					ConfigSource: s.ConfigSource,
				})
			}

			entries = append(entries, DescribeConfigResponseConfigEntry{
				ConfigName:          e.ConfigName,
				ConfigValue:         e.ConfigValue,
				ReadOnly:            e.ReadOnly,
				ConfigSource:        e.ConfigSource,
				IsDefault:           e.IsDefault,
				IsSensitive:         e.IsSensitive,
				ConfigSynonyms:      synonyms,
				ConfigType:          e.ConfigType,
				ConfigDocumentation: e.ConfigDocumentation,
			})
		}

		described = append(described, DescribeConfigResponseResource{
			ResourceType:  r.ResourceType,
			ResourceName:  r.ResourceName,
			Error:         makeError(r.ErrorCode, r.ErrorMessage),
			ConfigEntries: entries,
		})
	}

	return &DescribeConfigsResponse{
		Throttle:  throttle,
		Resources: described,
	}, nil
}