diff --git a/cmd/main.go b/cmd/main.go index 6f2a74b..07ab14f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -25,7 +25,7 @@ func main() { signalChannel := make(chan os.Signal, 1) signal.Notify(signalChannel, syscall.SIGINT) signal.Notify(signalChannel, syscall.SIGTERM) - defer stop(signalChannel, s) + defer stop(signalChannel, s, srv) // Запуск сервера go start(config, s, srv) @@ -42,8 +42,9 @@ func main() { } // Остановка сервера -func stop(signalChannel chan os.Signal, s *grpc.Server) { +func stop(signalChannel chan os.Signal, s *grpc.Server, srv *server.AuthDBServer) { defer s.GracefulStop() + defer srv.GracefulStop() defer signal.Stop(signalChannel) log.Println("authDBServer stopping ...") diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 0e92b90..d5c352e 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -3,11 +3,13 @@ package grpc import ( "context" "errors" + "log" + "net" "strconv" "sync" + kafka "test3k/authDB/internal/transport/kafka" api "test3k/authDB/pkg/api" "time" - // kafka "github.com/segmentio/kafka-go" ) type user struct { @@ -20,20 +22,26 @@ type user struct { } type AuthDBServer struct { - mu sync.Mutex - users map[string]*user + mu sync.Mutex + users map[string]*user + kafkaWriter *kafka.KafkaWriter api.UnimplementedAuthDBServer id int32 } func NewServer() *AuthDBServer { return &AuthDBServer{ - mu: sync.Mutex{}, - users: make(map[string]*user), - id: 0, + mu: sync.Mutex{}, + users: make(map[string]*user), + kafkaWriter: kafka.NewWriter("registrations", net.JoinHostPort("localhost", "9094")), + id: 0, } } +func (s *AuthDBServer) GracefulStop() error { + return s.kafkaWriter.Close() +} + func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { s.mu.Lock() defer s.mu.Unlock() @@ -63,10 +71,10 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe s.mu.Lock() defer s.mu.Unlock() - // - if _, ok := s.users[req.GetLogin()]; ok { - return nil, errors.New("login already registered") - } + // // + // if _, ok := s.users[req.GetLogin()]; ok { + // return nil, errors.New("login already registered") + // } // s.id = s.id + 1 @@ -74,7 +82,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe code := strconv.Itoa(unique) // - s.users[req.Login] = &user{ + user := &user{ ID: s.id, Code: code, Login: req.GetLogin(), @@ -82,29 +90,15 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe Email: req.GetEmail(), Confirmed: false, } + s.users[req.Login] = user - // // - // consumer := kafka.NewReader(kafka.ReaderConfig{ - // Brokers: []string{"localhost:9092"}, - // Topic: "topic-A", - // Partition: 0, - // MinBytes: 10e3, // 10KB - // MaxBytes: 10e6, // 10MB - // }) - // defer consumer.Close() + // TODO + err := s.kafkaWriter.WriteMessage(user.Login, user.Email) + if err != nil { + log.Println("kafka write ", err) - // // - // // consumer.SetOffset(42) - - // // - // for { - // m, err := consumer.ReadMessage(ctx) - // if err != nil { - // log.Fatal(err) - // } - - // fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) - // } + return nil, err + } return &api.RegistrationResponse{ Code: code, diff --git a/internal/transport/kafka/kafka_reader.go b/internal/transport/kafka/kafka_reader.go new file mode 100644 index 0000000..2922541 --- /dev/null +++ b/internal/transport/kafka/kafka_reader.go @@ -0,0 +1,27 @@ +package kafka + +import ( + "github.com/segmentio/kafka-go" +) + +type KafkaReader struct { + reader *kafka.Reader +} + +func NewReader(topic string, address ...string) *KafkaReader { + return &KafkaReader{ + // reader: &kafka.Reader{ + // Topic: topic, + // Balancer: &kafka.LeastBytes{}, + // Addr: kafka.TCP(address...), + // }, + } +} + +func (s *KafkaReader) Close() error { + return s.reader.Close() +} + +func (s *KafkaReader) ReadMessage(key string, value string) error { + return nil +} diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go new file mode 100644 index 0000000..d4543c7 --- /dev/null +++ b/internal/transport/kafka/kafka_writer.go @@ -0,0 +1,117 @@ +package kafka + +import ( + "context" + "log" + "net" + "strconv" + + "github.com/segmentio/kafka-go" +) + +type KafkaWriter struct { + writer *kafka.Writer + first string + topic string +} + +func NewWriter(topic string, address ...string) *KafkaWriter { + s := &KafkaWriter{ + writer: &kafka.Writer{ + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Addr: kafka.TCP(address...), + }, + first: address[0], + topic: topic, + } + + // Приверим и при необходимости создадим топик + era := s.checkTopic() + if era != nil { + log.Fatal(era) + } + + return s +} + +func (s *KafkaWriter) Close() error { + return s.writer.Close() +} + +func (s *KafkaWriter) fetchTopics() (map[string]bool, error) { + conn, err := kafka.Dial("tcp", s.first) + if err != nil { + return nil, err + } + defer conn.Close() + + // + partitions, erp := conn.ReadPartitions() + if erp != nil { + return nil, erp + } + + // + topics := make(map[string]bool) + for _, p := range partitions { + topics[p.Topic] = true + } + + return topics, nil +} + +func (s *KafkaWriter) createTopic() error { + conn, err := kafka.Dial("tcp", s.first) + if err != nil { + return err + } + defer conn.Close() + + // + controller, era := conn.Controller() + if era != nil { + return era + } + + // + controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if eru != nil { + return eru + } + defer controllerConn.Close() + + // + topicConfigs := []kafka.TopicConfig{ + { + Topic: s.topic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + return controllerConn.CreateTopics(topicConfigs...) +} + +func (s *KafkaWriter) checkTopic() error { + topics, err := s.fetchTopics() + if err != nil { + return err + } + + // Если топика нет, то создадим + if _, ok := topics[s.topic]; !ok { + return s.createTopic() + } + + return nil +} + +func (s *KafkaWriter) WriteMessage(key string, value string) error { + err := s.writer.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(key), + Value: []byte(value), + }) + + return err +}