package kafka import ( "context" "fmt" "time" "git.slaventius.ru/test3k/authPostman/internal/config" "github.com/segmentio/kafka-go" ) type KafkaReader struct { ctx context.Context config *config.Config reader *kafka.Reader first string topic string } func NewReader(ctx context.Context, config *config.Config, topic string, address ...string) *KafkaReader { return &KafkaReader{ ctx: ctx, config: config, reader: kafka.NewReader(kafka.ReaderConfig{ Topic: topic, Brokers: address, GroupID: fmt.Sprintf("consumer-group-%s", topic), // Partition: 0, // TODO MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB ReadBackoffMax: time.Millisecond * 100, }), first: address[0], topic: topic, } } func (s *KafkaReader) Close() error { return s.reader.Close() } func (s *KafkaReader) ReadMessage() (kafka.Message, error) { return s.reader.ReadMessage(s.ctx) } func (s *KafkaReader) FetchMessage() (kafka.Message, error) { return s.reader.FetchMessage(s.ctx) } func (s *KafkaReader) CommitMessage(message kafka.Message) error { return s.reader.CommitMessages(s.ctx, message) }