From f65f378dd935d45d8da9b16798591e961ef6942e Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 15 Mar 2023 11:38:34 +0300 Subject: [PATCH] * --- internal/authPostman.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/internal/authPostman.go b/internal/authPostman.go index 0c8b331..5d2aa60 100644 --- a/internal/authPostman.go +++ b/internal/authPostman.go @@ -21,6 +21,7 @@ type AuthPostmanServer struct { kafkaReader *kafka.KafkaReader logger *logger.Logger config *config.Config + msgs map[int64]bool } func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { @@ -32,6 +33,7 @@ func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { kafkaReader: kafka.NewReader(ctx, config, api.TopicRegistrations, address...), logger: logger, config: config, + msgs: make(map[int64]bool), } } @@ -57,8 +59,17 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { return erk } + // Проверим пришедшее сообщение, не является ли оно повтором + if _, ok := s.msgs[msg.ID]; ok { + s.logger.Printf("the message #%d is a duplicate", msg.ID) + + continue + } else { + s.msgs[msg.ID] = true + } + // - s.logger.Printf("send code %s to %s ...", msg.Code, msg.Email) + s.logger.Printf("send code %s to %s (message #%d) ...", msg.Code, msg.Email, msg.ID) // text := fmt.Sprintf("Confirmation code %v", msg.Code) @@ -73,7 +84,7 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { if ers != nil { s.logger.Print(ers) } else { - s.logger.Printf("send code %s to %s completed", msg.Code, msg.Email) + s.logger.Printf("send code %s to %s (message #%d) completed", msg.Code, msg.Email, msg.ID) } } }