diff --git a/internal/transport/kafka/balancer.go b/internal/transport/kafka/balancer.go new file mode 100644 index 0000000..7fa2aef --- /dev/null +++ b/internal/transport/kafka/balancer.go @@ -0,0 +1,13 @@ +package kafka + +import "github.com/segmentio/kafka-go" + +// Собственный балансировщик для определения номера партиции +// в которую попадет очередное сообщение +type MyBalancer struct { + Cool bool +} + +func (s *MyBalancer) Balance(msg kafka.Message, partitions ...int) (partition int) { + return 0 +} diff --git a/internal/transport/kafka/writer.go b/internal/transport/kafka/writer.go index 6954f0c..a760c22 100644 --- a/internal/transport/kafka/writer.go +++ b/internal/transport/kafka/writer.go @@ -95,7 +95,8 @@ func NewWriter(ctx context.Context, logger *logger.Logger, topic string, address s := &KafkaWriter{ ctx: ctx, writer: &kafka.Writer{ - Topic: topic, + Topic: topic, + // Balancer: &MyBalancer{}, Balancer: &kafka.LeastBytes{}, // Balancer: &kafka.Murmur2Balancer{}, WriteBackoffMax: time.Millisecond * 100,