|
|
|
@ -32,6 +32,7 @@ type AuthDBServer struct { |
|
|
|
|
repo *repo.CustomerRepository |
|
|
|
|
api.UnimplementedAuthDBServer |
|
|
|
|
api.UnimplementedHealthServer |
|
|
|
|
counter int64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewServer(ctx context.Context, config *config.Config) *AuthDBServer { |
|
|
|
@ -54,15 +55,30 @@ func NewServer(ctx context.Context, config *config.Config) *AuthDBServer { |
|
|
|
|
kafkaWriter: kafka.NewWriter(ctx, logger, apiKafka.TopicRegistrations, net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))), |
|
|
|
|
config: config, |
|
|
|
|
logger: logger, |
|
|
|
|
|
|
|
|
|
// С каждым стартом сервиса значение счетчика повторяться не будет
|
|
|
|
|
counter: time.Now().UnixNano(), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (r *AuthDBServer) getMD5Hash(text string) string { |
|
|
|
|
func (s *AuthDBServer) getMD5Hash(text string) string { |
|
|
|
|
hash := md5.Sum([]byte(text)) |
|
|
|
|
|
|
|
|
|
return hex.EncodeToString(hash[:]) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *AuthDBServer) genMessage(customer repo.Customer) apiKafka.MessageRegistration { |
|
|
|
|
// Увеличим счетчик сообщений на случай повторной отправки пакета продьюсером
|
|
|
|
|
// На стороне получателя уникальность этого значение будет контролироваться
|
|
|
|
|
s.counter = s.counter + 1 |
|
|
|
|
|
|
|
|
|
return apiKafka.MessageRegistration{ |
|
|
|
|
ID: s.counter, |
|
|
|
|
Code: customer.Code, |
|
|
|
|
Email: customer.Email, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *AuthDBServer) GracefulStop() error { |
|
|
|
|
return s.kafkaWriter.Close() |
|
|
|
|
} |
|
|
|
@ -122,13 +138,14 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TODO
|
|
|
|
|
value, era := json.Marshal(customer.MessageRegistration) |
|
|
|
|
msg := s.genMessage(customer) |
|
|
|
|
value, era := json.Marshal(msg) |
|
|
|
|
if era != nil { |
|
|
|
|
return nil, era |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
s.logger.Printf("publication code %s to %s ...", customer.MessageRegistration.Code, customer.MessageRegistration.Email) |
|
|
|
|
s.logger.Printf("publication code %s to %s (message #%d) ...", msg.Code, msg.Email, msg.ID) |
|
|
|
|
|
|
|
|
|
// Отправим уведомление о необходимости подтверждения
|
|
|
|
|
err := s.kafkaWriter.WriteMessage([]byte(customer.Login), value) |
|
|
|
@ -139,11 +156,11 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
//
|
|
|
|
|
s.logger.Printf("publication code %s to %s completed", customer.MessageRegistration.Code, customer.MessageRegistration.Email) |
|
|
|
|
s.logger.Printf("publication code %s to %s (message #%d) completed", msg.Code, msg.Email, msg.ID) |
|
|
|
|
|
|
|
|
|
return &api.RegistrationResponse{ |
|
|
|
|
Code: customer.MessageRegistration.Code, |
|
|
|
|
Email: customer.MessageRegistration.Email, |
|
|
|
|
Code: msg.Code, |
|
|
|
|
Email: msg.Email, |
|
|
|
|
}, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|