You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

124 lines
2.1 KiB

package kafka
import (
"context"
"log"
"net"
"strconv"
"github.com/segmentio/kafka-go"
)
type KafkaWriter struct {
ctx context.Context
writer *kafka.Writer
first string
topic string
}
func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWriter {
s := &KafkaWriter{
ctx: ctx,
writer: &kafka.Writer{
Topic: topic,
Balancer: &kafka.LeastBytes{},
Addr: kafka.TCP(address...),
},
first: address[0],
topic: topic,
}
// Приверим и при необходимости создадим топик
era := s.checkTopic()
if era != nil {
log.Fatal(era)
}
return s
}
func (s *KafkaWriter) Close() error {
return s.writer.Close()
}
func (s *KafkaWriter) fetchTopics() (map[string]bool, error) {
conn, err := kafka.Dial("tcp", s.first)
if err != nil {
return nil, err
}
defer conn.Close()
//
partitions, erp := conn.ReadPartitions()
if erp != nil {
return nil, erp
}
//
topics := make(map[string]bool)
for _, p := range partitions {
topics[p.Topic] = true
}
return topics, nil
}
func (s *KafkaWriter) createTopic() error {
conn, err := kafka.Dial("tcp", s.first)
if err != nil {
return err
}
defer conn.Close()
//
controller, era := conn.Controller()
if era != nil {
return era
}
//
controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if eru != nil {
return eru
}
defer controllerConn.Close()
//
topicConfigs := []kafka.TopicConfig{
{
Topic: s.topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
return controllerConn.CreateTopics(topicConfigs...)
}
func (s *KafkaWriter) checkTopic() error {
topics, err := s.fetchTopics()
if err != nil {
return err
}
// Если топика нет, то создадим
if _, ok := topics[s.topic]; !ok {
era := s.createTopic()
if era != nil {
return era
}
log.Printf("create topic %q\n", s.topic)
return era
}
return nil
}
func (s *KafkaWriter) WriteMessage(key string, value string) error {
return s.writer.WriteMessages(s.ctx, kafka.Message{
Key: []byte(key),
Value: []byte(value),
})
}