You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
108 lines
3.1 KiB
108 lines
3.1 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"time"
|
|
|
|
"github.com/segmentio/kafka-go/protocol/createacls"
|
|
)
|
|
|
|
// CreateACLsRequest represents a request sent to a kafka broker to add
|
|
// new ACLs.
|
|
type CreateACLsRequest struct {
|
|
// Address of the kafka broker to send the request to.
|
|
Addr net.Addr
|
|
|
|
// List of ACL to create.
|
|
ACLs []ACLEntry
|
|
}
|
|
|
|
// CreateACLsResponse represents a response from a kafka broker to an ACL
|
|
// creation request.
|
|
type CreateACLsResponse struct {
|
|
// The amount of time that the broker throttled the request.
|
|
Throttle time.Duration
|
|
|
|
// List of errors that occurred while attempting to create
|
|
// the ACLs.
|
|
//
|
|
// The errors contain the kafka error code. Programs may use the standard
|
|
// errors.Is function to test the error against kafka error codes.
|
|
Errors []error
|
|
}
|
|
|
|
type ACLPermissionType int8
|
|
|
|
const (
|
|
ACLPermissionTypeUnknown ACLPermissionType = 0
|
|
ACLPermissionTypeAny ACLPermissionType = 1
|
|
ACLPermissionTypeDeny ACLPermissionType = 2
|
|
ACLPermissionTypeAllow ACLPermissionType = 3
|
|
)
|
|
|
|
type ACLOperationType int8
|
|
|
|
const (
|
|
ACLOperationTypeUnknown ACLOperationType = 0
|
|
ACLOperationTypeAny ACLOperationType = 1
|
|
ACLOperationTypeAll ACLOperationType = 2
|
|
ACLOperationTypeRead ACLOperationType = 3
|
|
ACLOperationTypeWrite ACLOperationType = 4
|
|
ACLOperationTypeCreate ACLOperationType = 5
|
|
ACLOperationTypeDelete ACLOperationType = 6
|
|
ACLOperationTypeAlter ACLOperationType = 7
|
|
ACLOperationTypeDescribe ACLOperationType = 8
|
|
ACLOperationTypeClusterAction ACLOperationType = 9
|
|
ACLOperationTypeDescribeConfigs ACLOperationType = 10
|
|
ACLOperationTypeAlterConfigs ACLOperationType = 11
|
|
ACLOperationTypeIdempotentWrite ACLOperationType = 12
|
|
)
|
|
|
|
type ACLEntry struct {
|
|
ResourceType ResourceType
|
|
ResourceName string
|
|
ResourcePatternType PatternType
|
|
Principal string
|
|
Host string
|
|
Operation ACLOperationType
|
|
PermissionType ACLPermissionType
|
|
}
|
|
|
|
// CreateACLs sends ACLs creation request to a kafka broker and returns the
|
|
// response.
|
|
func (c *Client) CreateACLs(ctx context.Context, req *CreateACLsRequest) (*CreateACLsResponse, error) {
|
|
acls := make([]createacls.RequestACLs, 0, len(req.ACLs))
|
|
|
|
for _, acl := range req.ACLs {
|
|
acls = append(acls, createacls.RequestACLs{
|
|
ResourceType: int8(acl.ResourceType),
|
|
ResourceName: acl.ResourceName,
|
|
ResourcePatternType: int8(acl.ResourcePatternType),
|
|
Principal: acl.Principal,
|
|
Host: acl.Host,
|
|
Operation: int8(acl.Operation),
|
|
PermissionType: int8(acl.PermissionType),
|
|
})
|
|
}
|
|
|
|
m, err := c.roundTrip(ctx, req.Addr, &createacls.Request{
|
|
Creations: acls,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("kafka.(*Client).CreateACLs: %w", err)
|
|
}
|
|
|
|
res := m.(*createacls.Response)
|
|
ret := &CreateACLsResponse{
|
|
Throttle: makeDuration(res.ThrottleTimeMs),
|
|
Errors: make([]error, 0, len(res.Results)),
|
|
}
|
|
|
|
for _, t := range res.Results {
|
|
ret.Errors = append(ret.Errors, makeError(t.ErrorCode, t.ErrorMessage))
|
|
}
|
|
|
|
return ret, nil
|
|
}
|
|
|