You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
124 lines
2.1 KiB
124 lines
2.1 KiB
package kafka
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"net"
|
|
"strconv"
|
|
|
|
"github.com/segmentio/kafka-go"
|
|
)
|
|
|
|
type KafkaWriter struct {
|
|
ctx context.Context
|
|
writer *kafka.Writer
|
|
first string
|
|
topic string
|
|
}
|
|
|
|
func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWriter {
|
|
s := &KafkaWriter{
|
|
ctx: ctx,
|
|
writer: &kafka.Writer{
|
|
Topic: topic,
|
|
Balancer: &kafka.LeastBytes{},
|
|
Addr: kafka.TCP(address...),
|
|
},
|
|
first: address[0],
|
|
topic: topic,
|
|
}
|
|
|
|
// Приверим и при необходимости создадим топик
|
|
era := s.checkTopic()
|
|
if era != nil {
|
|
log.Fatal(era)
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *KafkaWriter) Close() error {
|
|
return s.writer.Close()
|
|
}
|
|
|
|
func (s *KafkaWriter) fetchTopics() (map[string]bool, error) {
|
|
conn, err := kafka.Dial("tcp", s.first)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer conn.Close()
|
|
|
|
//
|
|
partitions, erp := conn.ReadPartitions()
|
|
if erp != nil {
|
|
return nil, erp
|
|
}
|
|
|
|
//
|
|
topics := make(map[string]bool)
|
|
for _, p := range partitions {
|
|
topics[p.Topic] = true
|
|
}
|
|
|
|
return topics, nil
|
|
}
|
|
|
|
func (s *KafkaWriter) createTopic() error {
|
|
conn, err := kafka.Dial("tcp", s.first)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Close()
|
|
|
|
//
|
|
controller, era := conn.Controller()
|
|
if era != nil {
|
|
return era
|
|
}
|
|
|
|
//
|
|
controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
|
|
if eru != nil {
|
|
return eru
|
|
}
|
|
defer controllerConn.Close()
|
|
|
|
//
|
|
topicConfigs := []kafka.TopicConfig{
|
|
{
|
|
Topic: s.topic,
|
|
NumPartitions: 1,
|
|
ReplicationFactor: 1,
|
|
},
|
|
}
|
|
|
|
return controllerConn.CreateTopics(topicConfigs...)
|
|
}
|
|
|
|
func (s *KafkaWriter) checkTopic() error {
|
|
topics, err := s.fetchTopics()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Если топика нет, то создадим
|
|
if _, ok := topics[s.topic]; !ok {
|
|
era := s.createTopic()
|
|
if era != nil {
|
|
return era
|
|
}
|
|
|
|
log.Printf("create topic %q\n", s.topic)
|
|
|
|
return era
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *KafkaWriter) WriteMessage(key []byte, value []byte) error {
|
|
return s.writer.WriteMessages(s.ctx, kafka.Message{
|
|
Key: key,
|
|
Value: value,
|
|
})
|
|
}
|
|
|