diff --git a/internal/transport/kafka/kafka_reader.go b/internal/transport/kafka/kafka_reader.go index 2922541..c290b82 100644 --- a/internal/transport/kafka/kafka_reader.go +++ b/internal/transport/kafka/kafka_reader.go @@ -6,15 +6,22 @@ import ( type KafkaReader struct { reader *kafka.Reader + first string + topic string } func NewReader(topic string, address ...string) *KafkaReader { return &KafkaReader{ - // reader: &kafka.Reader{ - // Topic: topic, - // Balancer: &kafka.LeastBytes{}, - // Addr: kafka.TCP(address...), - // }, + reader: kafka.NewReader(kafka.ReaderConfig{ + Topic: topic, + Brokers: address, + GroupID: "consumer-group-id", // TODO + Partition: 0, // TODO + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }), + first: address[0], + topic: topic, } }