You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

128 lines
2.4 KiB

package kafka
import (
"context"
"time"
logger "git.slaventius.ru/test3k/umate/pkg/logger"
"github.com/segmentio/kafka-go"
)
// KafkaWriter publishes messages to a single Kafka topic through a kafka-go
// writer. It also keeps the first broker address so it can dial that broker
// directly for metadata reads and topic creation.
type KafkaWriter struct {
	// ctx is passed to the underlying writer on every WriteMessage call.
	ctx context.Context
	// writer is the kafka-go writer that performs the actual publishing.
	writer *kafka.Writer
	// logger reports topic creation.
	logger *logger.Logger
	// first is the first broker address handed to NewWriter; topic is the
	// name of the topic this writer publishes to.
	first, topic string
}
// fetchTopics dials the broker at s.first, reads its partition list and
// returns the set of topic names found there. Every topic that is present maps
// to true; a dial or read failure is returned unchanged with a nil map.
func (s *KafkaWriter) fetchTopics() (map[string]bool, error) {
	broker, dialErr := kafka.Dial("tcp", s.first)
	if dialErr != nil {
		return nil, dialErr
	}
	defer broker.Close()

	parts, readErr := broker.ReadPartitions()
	if readErr != nil {
		return nil, readErr
	}

	// Several partitions may belong to one topic; the map collapses them.
	seen := map[string]bool{}
	for i := range parts {
		seen[parts[i].Topic] = true
	}

	return seen, nil
}
// createTopic dials the broker at s.first and asks it to create s.topic with a
// single partition and a replication factor of one.
//
// NOTE(review): the request is sent to the first configured broker. A variant
// that resolved the cluster controller via conn.Controller() and dialed it
// instead was sketched here earlier but is disabled — confirm whether
// multi-broker deployments need it.
func (s *KafkaWriter) createTopic() error {
	broker, dialErr := kafka.Dial("tcp", s.first)
	if dialErr != nil {
		return dialErr
	}
	defer broker.Close()

	spec := kafka.TopicConfig{
		Topic:             s.topic,
		NumPartitions:     1,
		ReplicationFactor: 1,
	}

	return broker.CreateTopics(spec)
}
// checkTopic makes sure s.topic exists: it lists the known topics and, when
// s.topic is not among them, creates it and logs the creation. Any error from
// listing or creating is returned as is.
func (s *KafkaWriter) checkTopic() error {
	known, listErr := s.fetchTopics()
	if listErr != nil {
		return listErr
	}

	// Nothing to do when the topic is already there.
	if _, exists := known[s.topic]; exists {
		return nil
	}

	// The topic is missing, so create it.
	if createErr := s.createTopic(); createErr != nil {
		return createErr
	}
	s.logger.Printf("create topic %q\n", s.topic)

	return nil
}
// NewWriter builds a KafkaWriter that publishes to topic through the brokers
// listed in address and makes sure the topic exists before returning.
//
// ctx is stored and reused for every WriteMessage call. The first entry of
// address is additionally dialed directly to read the topic list and, if the
// topic is missing, to create it.
//
// At least one address is required: calling NewWriter without any is a
// programming error and panics with a descriptive message. If the topic check
// or creation fails, the error is handed to logger.Fatal.
func NewWriter(ctx context.Context, logger *logger.Logger, topic string, address ...string) *KafkaWriter {
	// address is variadic, so it may legally be empty; fail loudly here rather
	// than with an anonymous "index out of range" on address[0] below.
	if len(address) == 0 {
		panic("kafka: NewWriter requires at least one broker address")
	}

	s := &KafkaWriter{
		ctx: ctx,
		writer: &kafka.Writer{
			Topic:    topic,
			Balancer: &kafka.LeastBytes{},
			// Balancer: &kafka.Murmur2Balancer{},
			WriteBackoffMax: time.Millisecond * 100,
			BatchTimeout:    time.Millisecond * 100,
			Addr:            kafka.TCP(address...),
		},
		logger: logger,
		first:  address[0],
		topic:  topic,
	}

	// Check the topic and create it if necessary.
	if err := s.checkTopic(); err != nil {
		logger.Fatal(err)
	}

	return s
}
// Close closes the underlying kafka-go writer and returns its error, if any.
func (s *KafkaWriter) Close() error {
	closeErr := s.writer.Close()
	return closeErr
}
// WriteMessage sends one message with the given key and value through the
// underlying writer, using the context supplied to NewWriter, and returns the
// writer's error, if any.
func (s *KafkaWriter) WriteMessage(key []byte, value []byte) error {
	msg := kafka.Message{Key: key, Value: value}
	return s.writer.WriteMessages(s.ctx, msg)
}