package postman import ( "context" "encoding/json" "log" "net" "strconv" "test3k/authPostman/internal/config" // smtp "test3k/authPostman/internal/smtp" "github.com/segmentio/kafka-go" ) type msg struct { Code string Email string } type AuthPostmanServer struct { ctx context.Context config *config.Config kafkaReader *kafka.Reader } func NewServer(ctx context.Context, config *config.Config, topic string) *AuthPostmanServer { return &AuthPostmanServer{ ctx: ctx, config: config, kafkaReader: kafka.NewReader(kafka.ReaderConfig{ Topic: topic, Brokers: []string{net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))}, // GroupID: fmt.Sprintf("consumer-group-%d", config.Kafka.Partition), Partition: config.Kafka.Partition, MinBytes: 10e3, // 10KB MaxBytes: 10e6, // 10MB }), } } func (s *AuthPostmanServer) GracefulStop() error { return s.kafkaReader.Close() } func (s *AuthPostmanServer) ReadMessage(offset int64) error { // ... s.kafkaReader.SetOffset(offset) // for { m, err := s.kafkaReader.ReadMessage(s.ctx) if err != nil { return err } // Декодируем сообщение amsg := msg{} erk := json.Unmarshal(m.Value, &amsg) if erk != nil { return erk } // log.Printf("send code %s to %s ...", amsg.Code, amsg.Email) // log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) // // // message := smtp.NewMessage("Confirmation code", amsg.Code) // message.AppendRecipient(amsg.Email) // // // smtpSender := smtp.NewService(s.config.Smtp.Host, s.config.Smtp.Port, s.config.Smtp.Sender, s.config.Smtp.Password) // ers := smtpSender.Send(message) // if ers != nil { // log.Print(ers) // } log.Printf("send code %s to %s completed", amsg.Code, amsg.Email) } }