diff --git a/internal/transport/kafka/writer.go b/internal/transport/kafka/writer.go index 27900cb..42a45e4 100644 --- a/internal/transport/kafka/writer.go +++ b/internal/transport/kafka/writer.go @@ -2,6 +2,8 @@ package kafka import ( "context" + "net" + "strconv" "time" logger "git.slaventius.ru/test3k/umate/pkg/logger" @@ -45,19 +47,18 @@ func (s *KafkaWriter) createTopic() error { } defer conn.Close() - // // - // controller, era := conn.Controller() - // if era != nil { - // return era - // } + // Определим ведущего реплики раздела + controller, era := conn.Controller() + if era != nil { + return era + } // - controllerConn := conn - // controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) - // if eru != nil { - // return eru - // } - // defer controllerConn.Close() + controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if eru != nil { + return eru + } + defer controllerConn.Close() // topicConfigs := []kafka.TopicConfig{