From cad80c407ef0eecb375f44789fe0edacfa890fdb Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 15 Mar 2023 12:53:03 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D0=BB=20?= =?UTF-8?q?=D0=BF=D0=BE=D0=B4=D1=82=D0=B2=D0=B5=D1=80=D0=B6=D0=B4=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D1=81=D0=BE=D0=BE=D0=B1=D1=89=D0=B5=D0=BD?= =?UTF-8?q?=D0=B8=D1=8F=20=D0=BF=D0=BE=D1=81=D0=BB=D0=B5=20=D1=83=D1=81?= =?UTF-8?q?=D0=BF=D0=B5=D1=88=D0=BD=D0=BE=D0=B9=20=D0=BE=D0=B1=D1=80=D0=B0?= =?UTF-8?q?=D0=B1=D0=BE=D1=82=D0=BA=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/authPostman.go | 23 ++++++++++++++++++----- internal/transport/kafka/reader.go | 8 ++++++++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/internal/authPostman.go b/internal/authPostman.go index 5d2aa60..076504e 100644 --- a/internal/authPostman.go +++ b/internal/authPostman.go @@ -47,21 +47,26 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { // for { - m, err := s.kafkaReader.ReadMessage() + // m, err := s.kafkaReader.ReadMessage() + m, err := s.kafkaReader.FetchMessage() if err != nil { - return err + s.logger.Error(err) + + continue } // Декодируем сообщение msg := api.MessageRegistration{} erk := json.Unmarshal(m.Value, &msg) if erk != nil { - return erk + s.logger.Error(erk) + + continue } // Проверим пришедшее сообщение, не является ли оно повтором if _, ok := s.msgs[msg.ID]; ok { - s.logger.Printf("the message #%d is a duplicate", msg.ID) + s.logger.Warnf("the message #%d is a duplicate", msg.ID) continue } else { @@ -82,9 +87,17 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { message.AppendRecipient(msg.Email) ers := postman.Send(message) if ers != nil { - s.logger.Print(ers) + s.logger.Error(ers) + + continue } else { s.logger.Printf("send code %s to %s (message #%d) completed", msg.Code, msg.Email, msg.ID) } + + // Подтвердим сообщение после успешной обработки + erf := s.kafkaReader.CommitMessage(m) + if erf != nil { + s.logger.Error(erf) + } } } diff --git a/internal/transport/kafka/reader.go b/internal/transport/kafka/reader.go index 5fc168a..4a86881 100644 --- a/internal/transport/kafka/reader.go +++ b/internal/transport/kafka/reader.go @@ -43,3 +43,11 @@ func (s *KafkaReader) Close() error { func (s *KafkaReader) ReadMessage() (kafka.Message, error) { return s.reader.ReadMessage(s.ctx) } + +func (s *KafkaReader) FetchMessage() (kafka.Message, error) { + return s.reader.FetchMessage(s.ctx) +} + +func (s *KafkaReader) CommitMessage(message kafka.Message) error { + return s.reader.CommitMessages(s.ctx, message) +}