package kafka import ( "context" "log" "time" "github.com/segmentio/kafka-go" ) type KafkaWriter struct { ctx context.Context writer *kafka.Writer first string topic string } func (s *KafkaWriter) fetchTopics() (map[string]bool, error) { conn, err := kafka.Dial("tcp", s.first) if err != nil { return nil, err } defer conn.Close() // partitions, erp := conn.ReadPartitions() if erp != nil { return nil, erp } // topics := make(map[string]bool) for _, p := range partitions { topics[p.Topic] = true } return topics, nil } func (s *KafkaWriter) createTopic() error { conn, err := kafka.Dial("tcp", s.first) if err != nil { return err } defer conn.Close() // // // controller, era := conn.Controller() // if era != nil { // return era // } // // // controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) // if eru != nil { // return eru // } // defer controllerConn.Close() // topicConfigs := []kafka.TopicConfig{ { Topic: s.topic, NumPartitions: 1, ReplicationFactor: 1, }, } return conn.CreateTopics(topicConfigs...) } func (s *KafkaWriter) checkTopic() error { topics, err := s.fetchTopics() if err != nil { return err } // Если топика нет, то создадим if _, ok := topics[s.topic]; !ok { era := s.createTopic() if era != nil { return era } log.Printf("create topic %q\n", s.topic) return era } return nil } func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWriter { s := &KafkaWriter{ ctx: ctx, writer: &kafka.Writer{ Topic: topic, Balancer: &kafka.LeastBytes{}, WriteBackoffMax: time.Millisecond * 100, BatchTimeout: time.Millisecond * 100, Addr: kafka.TCP(address...), }, first: address[0], topic: topic, } // Проверим и при необходимости создадим топик era := s.checkTopic() if era != nil { log.Fatal(era) } return s } func (s *KafkaWriter) Close() error { return s.writer.Close() } func (s *KafkaWriter) WriteMessage(key []byte, value []byte) error { return s.writer.WriteMessages(s.ctx, kafka.Message{ Key: key, Value: value, }) }