You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
129 lines
2.4 KiB
129 lines
2.4 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
logger "git.slaventius.ru/test3k/umate/pkg/logger"
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
type KafkaWriter struct {
|
|
ctx context.Context
|
|
writer *kafka.Writer
|
|
logger *logger.Logger
|
|
first string
|
|
topic string
|
|
}
|
|
|
|
func (s *KafkaWriter) fetchTopics() (map[string]bool, error) {
|
|
conn, err := kafka.Dial("tcp", s.first)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer conn.Close()
|
|
|
|
//
|
|
partitions, erp := conn.ReadPartitions()
|
|
if erp != nil {
|
|
return nil, erp
|
|
}
|
|
|
|
//
|
|
topics := make(map[string]bool)
|
|
for _, p := range partitions {
|
|
topics[p.Topic] = true
|
|
}
|
|
|
|
return topics, nil
|
|
}
|
|
|
|
func (s *KafkaWriter) createTopic() error {
|
|
conn, err := kafka.Dial("tcp", s.first)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
|
|
// //
|
|
// controller, era := conn.Controller()
|
|
// if era != nil {
|
|
// return era
|
|
// }
|
|
|
|
// //
|
|
// controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
|
|
// if eru != nil {
|
|
// return eru
|
|
// }
|
|
// defer controllerConn.Close()
|
|
|
|
//
|
|
topicConfigs := []kafka.TopicConfig{
|
|
{
|
|
Topic: s.topic,
|
|
NumPartitions: 1,
|
|
ReplicationFactor: 1,
|
|
},
|
|
}
|
|
|
|
return conn.CreateTopics(topicConfigs...)
|
|
}
|
|
|
|
func (s *KafkaWriter) checkTopic() error {
|
|
topics, err := s.fetchTopics()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Если топика нет, то создадим
|
|
if _, ok := topics[s.topic]; !ok {
|
|
era := s.createTopic()
|
|
if era != nil {
|
|
return era
|
|
}
|
|
|
|
s.logger.Printf("create topic %q\n", s.topic)
|
|
|
|
return era
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func NewWriter(ctx context.Context, logger *logger.Logger, topic string, address ...string) *KafkaWriter {
|
|
s := &KafkaWriter{
|
|
ctx: ctx,
|
|
writer: &kafka.Writer{
|
|
Topic: topic,
|
|
// Balancer: &MyBalancer{},
|
|
Balancer: &kafka.LeastBytes{},
|
|
// Balancer: &kafka.Murmur2Balancer{},
|
|
WriteBackoffMax: time.Millisecond * 100,
|
|
BatchTimeout: time.Millisecond * 100,
|
|
Addr: kafka.TCP(address...),
|
|
},
|
|
logger: logger,
|
|
first: address[0],
|
|
topic: topic,
|
|
}
|
|
|
|
// Проверим и при необходимости создадим топик
|
|
era := s.checkTopic()
|
|
if era != nil {
|
|
logger.Fatal(era)
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *KafkaWriter) Close() error {
|
|
return s.writer.Close()
|
|
}
|
|
|
|
func (s *KafkaWriter) WriteMessage(key []byte, value []byte) error {
|
|
return s.writer.WriteMessages(s.ctx, kafka.Message{
|
|
Key: key,
|
|
Value: value,
|
|
})
|
|
}
|
|
|