From 0bb0a4864e8404f59d07ae56926ed80992f07068 Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 16:10:08 +0300 Subject: [PATCH] * --- internal/transport/grpc/grpc.go | 23 ++++++++++++++++++----- internal/transport/kafka/kafka_writer.go | 6 +++--- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 1640af3..0b39243 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -2,6 +2,7 @@ package grpc import ( "context" + "encoding/json" "errors" "log" "net" @@ -17,13 +18,17 @@ const ( topicRegistrations string = "registrations" // Топик для регистраций ) +type msg struct { + Code string + Email string +} + type user struct { ID int32 - Code string Login string Password string - Email string Confirmed bool + msg } type AuthDBServer struct { @@ -91,16 +96,24 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe // user := &user{ ID: s.id, - Code: code, Login: req.GetLogin(), Password: code, // TODO - Email: req.GetEmail(), Confirmed: false, + msg: msg{ + Code: code, + Email: req.GetEmail(), + }, } s.users[req.Login] = user // TODO - err := s.kafkaWriter.WriteMessage(user.Login, user.Email) + value, eru := json.Marshal(user.msg) + if eru != nil { + return nil, eru + } + + // + err := s.kafkaWriter.WriteMessage([]byte(user.Login), value) if err != nil { log.Print(err) diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go index c0690e9..83a307a 100644 --- a/internal/transport/kafka/kafka_writer.go +++ b/internal/transport/kafka/kafka_writer.go @@ -116,9 +116,9 @@ func (s *KafkaWriter) checkTopic() error { return nil } -func (s *KafkaWriter) WriteMessage(key string, value string) error { +func (s *KafkaWriter) WriteMessage(key []byte, value []byte) error { return s.writer.WriteMessages(s.ctx, kafka.Message{ - Key: []byte(key), - Value: []byte(value), + Key: key, + Value: value, }) }