|
|
|
@ -2,15 +2,16 @@ package kafka |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"log" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
|
logger "git.slaventius.ru/test3k/umate/pkg/logger" |
|
|
|
|
"github.com/segmentio/kafka-go" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type KafkaWriter struct { |
|
|
|
|
ctx context.Context |
|
|
|
|
writer *kafka.Writer |
|
|
|
|
logger *logger.Logger |
|
|
|
|
first string |
|
|
|
|
topic string |
|
|
|
|
} |
|
|
|
@ -82,7 +83,7 @@ func (s *KafkaWriter) checkTopic() error { |
|
|
|
|
return era |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
log.Printf("create topic %q\n", s.topic) |
|
|
|
|
s.logger.Printf("create topic %q\n", s.topic) |
|
|
|
|
|
|
|
|
|
return era |
|
|
|
|
} |
|
|
|
@ -90,7 +91,7 @@ func (s *KafkaWriter) checkTopic() error { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWriter { |
|
|
|
|
func NewWriter(ctx context.Context, logger *logger.Logger, topic string, address ...string) *KafkaWriter { |
|
|
|
|
s := &KafkaWriter{ |
|
|
|
|
ctx: ctx, |
|
|
|
|
writer: &kafka.Writer{ |
|
|
|
@ -100,14 +101,15 @@ func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWrite |
|
|
|
|
BatchTimeout: time.Millisecond * 100, |
|
|
|
|
Addr: kafka.TCP(address...), |
|
|
|
|
}, |
|
|
|
|
first: address[0], |
|
|
|
|
topic: topic, |
|
|
|
|
logger: logger, |
|
|
|
|
first: address[0], |
|
|
|
|
topic: topic, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Проверим и при необходимости создадим топик
|
|
|
|
|
era := s.checkTopic() |
|
|
|
|
if era != nil { |
|
|
|
|
log.Fatal(era) |
|
|
|
|
logger.Fatal(era) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return s |
|
|
|
|