From cb6a45544b1d3f41eaa64af52917d764c8e9e423 Mon Sep 17 00:00:00 2001
From: slaventius
Date: Thu, 2 Feb 2023 13:21:27 +0300
Subject: [PATCH] Pass context.Context through the gRPC server and Kafka
 reader/writer constructors

---
 cmd/main.go                              | 2 +-
 internal/transport/grpc/grpc.go          | 8 +++++---
 internal/transport/kafka/kafka_reader.go | 6 +++++-
 internal/transport/kafka/kafka_writer.go | 6 ++++--
 4 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/cmd/main.go b/cmd/main.go
index 7e45352..9df6f32 100644
--- a/cmd/main.go
+++ b/cmd/main.go
@@ -18,7 +18,7 @@ import (
 func main() {
 	config := config.NewConfig()
 	ctx, ctxCancel := context.WithCancel(context.Background())
-	srv := server.NewServer(config)
+	srv := server.NewServer(ctx, config)
 	s := grpc.NewServer()
 
 	//
diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go
index 8cbf9b2..1640af3 100644
--- a/internal/transport/grpc/grpc.go
+++ b/internal/transport/grpc/grpc.go
@@ -31,14 +31,16 @@ type AuthDBServer struct {
 	users       map[string]*user
 	kafkaWriter *kafka.KafkaWriter
 	api.UnimplementedAuthDBServer
-	id int32
+	ctx context.Context
+	id  int32
 }
 
-func NewServer(config *config.Config) *AuthDBServer {
+func NewServer(ctx context.Context, config *config.Config) *AuthDBServer {
 	return &AuthDBServer{
 		mu:          sync.Mutex{},
 		users:       make(map[string]*user),
-		kafkaWriter: kafka.NewWriter(topicRegistrations, net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))),
+		kafkaWriter: kafka.NewWriter(ctx, topicRegistrations, net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))),
+		ctx:         ctx,
 		id:          0,
 	}
 }
diff --git a/internal/transport/kafka/kafka_reader.go b/internal/transport/kafka/kafka_reader.go
index c290b82..584afa1 100644
--- a/internal/transport/kafka/kafka_reader.go
+++ b/internal/transport/kafka/kafka_reader.go
@@ -1,17 +1,21 @@
 package kafka
 
 import (
+	"context"
+
 	"github.com/segmentio/kafka-go"
 )
 
 type KafkaReader struct {
+	ctx    context.Context
 	reader *kafka.Reader
 	first  string
 	topic  string
 }
 
-func NewReader(topic string, address ...string) *KafkaReader {
+func NewReader(ctx context.Context, topic string, address ...string) *KafkaReader {
 	return &KafkaReader{
+		ctx: ctx,
 		reader: kafka.NewReader(kafka.ReaderConfig{
 			Topic:   topic,
 			Brokers: address,
diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go
index 2ad9cd0..c0690e9 100644
--- a/internal/transport/kafka/kafka_writer.go
+++ b/internal/transport/kafka/kafka_writer.go
@@ -10,13 +10,15 @@ import (
 )
 
 type KafkaWriter struct {
+	ctx    context.Context
 	writer *kafka.Writer
 	first  string
 	topic  string
 }
 
-func NewWriter(topic string, address ...string) *KafkaWriter {
+func NewWriter(ctx context.Context, topic string, address ...string) *KafkaWriter {
 	s := &KafkaWriter{
+		ctx: ctx,
 		writer: &kafka.Writer{
 			Topic:    topic,
 			Balancer: &kafka.LeastBytes{},
@@ -115,7 +117,7 @@ func (s *KafkaWriter) checkTopic() error {
 }
 
 func (s *KafkaWriter) WriteMessage(key string, value string) error {
-	return s.writer.WriteMessages(context.Background(), kafka.Message{
+	return s.writer.WriteMessages(s.ctx, kafka.Message{
 		Key:   []byte(key),
 		Value: []byte(value),
 	})
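
Note on the change (not part of the patch): the constructors now accept a context.Context so that Kafka writes go through the caller's context instead of context.Background(). Below is a minimal caller-side sketch of the updated wiring; the module path example.com/authdb, the internal/config import path, the server import alias, and the signal-based cancellation are assumptions for illustration only, not something this patch adds.

// Sketch only: using the constructor signatures introduced in this patch.
// Assumes a hypothetical module path "example.com/authdb".
package main

import (
	"context"
	"os/signal"
	"syscall"

	"example.com/authdb/internal/config"
	server "example.com/authdb/internal/transport/grpc"
)

func main() {
	cfg := config.NewConfig()

	// Cancel ctx on SIGINT/SIGTERM so Kafka writes issued with s.ctx are
	// abandoned during shutdown instead of blocking indefinitely.
	// (Illustrative: the patch itself uses context.WithCancel in main.)
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// ctx is threaded through NewServer into kafka.NewWriter, so a later
	// WriteMessage call ends up as s.writer.WriteMessages(s.ctx, ...).
	srv := server.NewServer(ctx, cfg)
	_ = srv // register with a grpc.Server and serve, as in cmd/main.go
}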