package authPostman import ( "context" "encoding/json" "fmt" "net" "strconv" "time" "git.slaventius.ru/test3k/authPostman/internal/config" kafka "git.slaventius.ru/test3k/authPostman/internal/transport/kafka" // telegram "git.slaventius.ru/test3k/authPostman/internal/transport/telegram" smtp "git.slaventius.ru/test3k/authPostman/internal/transport/smtp" api "git.slaventius.ru/test3k/umate/pkg/kafka" logger "git.slaventius.ru/test3k/umate/pkg/logger" ) type AuthPostmanServer struct { ctx context.Context kafkaReader *kafka.KafkaReader logger *logger.Logger config *config.Config msgs map[int64]time.Time age time.Duration } func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { logger := logger.NewLogger("test3k:authPostmanService", config.Sentry.DSN) address := []string{net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))} return &AuthPostmanServer{ ctx: ctx, kafkaReader: kafka.NewReader(ctx, config, api.TopicRegistrations, address...), logger: logger, config: config, msgs: make(map[int64]time.Time), age: time.Minute * 5, } } func (s *AuthPostmanServer) GracefulStop() error { return s.kafkaReader.Close() } func (s *AuthPostmanServer) ReadMessage(offset int64) error { // ... // s.kafkaReader.SetOffset(offset) // for { // m, err := s.kafkaReader.ReadMessage() m, err := s.kafkaReader.FetchMessage() if err != nil { s.logger.Error(err) continue } // Декодируем сообщение msg := api.MessageRegistration{} erk := json.Unmarshal(m.Value, &msg) if erk != nil { s.logger.Error(erk) continue } // Проверим пришедшее сообщение, не является ли оно повтором if _, ok := s.msgs[msg.ID]; ok { s.logger.Warnf("the message #%d is a duplicate", msg.ID) continue } else { s.msgs[msg.ID] = time.Now() } // s.logger.Printf("send code %s to %s (message #%d) ...", msg.Code, msg.Email, msg.ID) // text := fmt.Sprintf("Confirmation code %v", msg.Code) // postman := telegram.NewService(s.config.Telegram.ChatToken) // message := postman.NewMessage(s.config.Telegram.ChatID, text) // ers := postman.SendMessage(message) postman := smtp.NewService(s.config.Smtp.Host, s.config.Smtp.Port, s.config.Smtp.Sender, s.config.Smtp.Password) message := smtp.NewMessage("Confirmation code", text) message.AppendRecipient(msg.Email) ers := postman.Send(message) if ers != nil { s.logger.Error(ers) continue } else { s.logger.Printf("send code %s to %s (message #%d) completed", msg.Code, msg.Email, msg.ID) } // Подтвердим сообщение после успешной обработки erf := s.kafkaReader.CommitMessage(m) if erf != nil { s.logger.Error(erf) } // Удалим идентификаторы устаревших сообщений no := time.Now() for k, v := range s.msgs { if s.age < no.Sub(v) { delete(s.msgs, k) } } } }