diff --git a/internal/authPostman.go b/internal/authPostman.go index 5d2aa60..076504e 100644 --- a/internal/authPostman.go +++ b/internal/authPostman.go @@ -47,21 +47,26 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { // for { - m, err := s.kafkaReader.ReadMessage() + // m, err := s.kafkaReader.ReadMessage() + m, err := s.kafkaReader.FetchMessage() if err != nil { - return err + s.logger.Error(err) + + continue } // Декодируем сообщение msg := api.MessageRegistration{} erk := json.Unmarshal(m.Value, &msg) if erk != nil { - return erk + s.logger.Error(erk) + + continue } // Проверим пришедшее сообщение, не является ли оно повтором if _, ok := s.msgs[msg.ID]; ok { - s.logger.Printf("the message #%d is a duplicate", msg.ID) + s.logger.Warnf("the message #%d is a duplicate", msg.ID) continue } else { @@ -82,9 +87,17 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { message.AppendRecipient(msg.Email) ers := postman.Send(message) if ers != nil { - s.logger.Print(ers) + s.logger.Error(ers) + + continue } else { s.logger.Printf("send code %s to %s (message #%d) completed", msg.Code, msg.Email, msg.ID) } + + // Подтвердим сообщение после успешной обработки + erf := s.kafkaReader.CommitMessage(m) + if erf != nil { + s.logger.Error(erf) + } } } diff --git a/internal/transport/kafka/reader.go b/internal/transport/kafka/reader.go index 5fc168a..4a86881 100644 --- a/internal/transport/kafka/reader.go +++ b/internal/transport/kafka/reader.go @@ -43,3 +43,11 @@ func (s *KafkaReader) Close() error { func (s *KafkaReader) ReadMessage() (kafka.Message, error) { return s.reader.ReadMessage(s.ctx) } + +func (s *KafkaReader) FetchMessage() (kafka.Message, error) { + return s.reader.FetchMessage(s.ctx) +} + +func (s *KafkaReader) CommitMessage(message kafka.Message) error { + return s.reader.CommitMessages(s.ctx, message) +}