From f6475a808b67b5a2ad9e316348294c6056c4fb3f Mon Sep 17 00:00:00 2001 From: slaventius Date: Tue, 14 Mar 2023 15:32:47 +0300 Subject: [PATCH] * --- internal/transport/kafka/writer.go | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/internal/transport/kafka/writer.go b/internal/transport/kafka/writer.go index 27900cb..42a45e4 100644 --- a/internal/transport/kafka/writer.go +++ b/internal/transport/kafka/writer.go @@ -2,6 +2,8 @@ package kafka import ( "context" + "net" + "strconv" "time" logger "git.slaventius.ru/test3k/umate/pkg/logger" @@ -45,19 +47,18 @@ func (s *KafkaWriter) createTopic() error { } defer conn.Close() - // // - // controller, era := conn.Controller() - // if era != nil { - // return era - // } + // Определим ведущего реплики раздела + controller, era := conn.Controller() + if era != nil { + return era + } // - controllerConn := conn - // controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) - // if eru != nil { - // return eru - // } - // defer controllerConn.Close() + controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if eru != nil { + return eru + } + defer controllerConn.Close() // topicConfigs := []kafka.TopicConfig{