diff --git a/.vscode/launch.json b/.vscode/launch.json index 07fdea4..05f4473 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,11 +11,10 @@ "mode": "debug", "program": "${workspaceRoot}/cmd/main.go", "env": { - "DB_HOST": "127.0.0.1", - "DB_PORT":"9055", - "APP_PORT":"9056", + "APP_PORT":"9995", + "KAFKA_PORT":"9092", }, - "args": ["1", "2", "25"] + "args": [] } ] } diff --git a/cmd/main.go b/cmd/main.go index 5052b4c..3298837 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,9 +1,13 @@ package main import ( - "fmt" + "context" "log" "net" + "os" + "os/signal" + "strconv" + "syscall" "test3k/authDB/internal/config" server "test3k/authDB/internal/transport/grpc" api "test3k/authDB/pkg/api" @@ -12,24 +16,57 @@ import ( ) func main() { - s := grpc.NewServer() - srv := server.NewServer() config := config.NewConfig() + ctx, _ := context.WithCancel(context.Background()) + srv := server.NewServer(config) + s := grpc.NewServer() // + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT) + signal.Notify(signalChannel, syscall.SIGTERM) + defer stop(signalChannel, s, srv) + + // Запуск сервера + go start(config, s, srv) + + // + for { + select { + case <-signalChannel: + return + case <-ctx.Done(): + return + } + } +} + +// Остановка сервера +func stop(signalChannel chan os.Signal, s *grpc.Server, srv *server.AuthDBServer) { + defer s.GracefulStop() + defer srv.GracefulStop() + defer signal.Stop(signalChannel) + + log.Println("authDBServer stopping ...") +} + +// Запуск сервера +func start(config *config.Config, s *grpc.Server, srv *server.AuthDBServer) { api.RegisterAuthDBServer(s, srv) // - connStr := fmt.Sprintf(":%d", config.App.Port) + connStr := net.JoinHostPort("", strconv.Itoa(config.App.Port)) lis, era := net.Listen("tcp", connStr) if era != nil { log.Fatal(era) } // - fmt.Printf("authDBService up in %d port\n", config.App.Port) + log.Printf("authDBServer starting (%s)\n", connStr) + + // eru := s.Serve(lis) if eru != nil { - log.Fatal(eru) + log.Fatal("Failed starting server") } } diff --git a/deploy/kafka/docker-compose-local-kafka.yml b/deploy/kafka/docker-compose-local-kafka.yml new file mode 100644 index 0000000..3226e09 --- /dev/null +++ b/deploy/kafka/docker-compose-local-kafka.yml @@ -0,0 +1,26 @@ +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.3.0 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka:7.3.0 + container_name: broker + ports: + # To learn about configuring Kafka for access across networks see + # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 diff --git a/internal/config/config.go b/internal/config/config.go index b16aee5..dc5902a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,19 +6,19 @@ import ( "github.com/kelseyhightower/envconfig" ) -// type DbConfig struct { -// Host string `envconfig:"DB_HOST"` -// Port int `envconfig:"DB_PORT"` -// } - -type AppConfig struct { +type appConfig struct { Port int `envconfig:"APP_PORT"` } +type kafkaConfig struct { + Host string `envconfig:"KAFKA_HOST"` + Port int `envconfig:"KAFKA_PORT"` +} + // ... type Config struct { - // Db DbConfig - App AppConfig + App appConfig + Kafka kafkaConfig } func NewConfig() *Config { diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 4d6d0f4..a2b4bcb 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -3,11 +3,18 @@ package grpc import ( "context" "errors" + "log" + "net" "strconv" "sync" + "test3k/authDB/internal/config" + kafka "test3k/authDB/internal/transport/kafka" api "test3k/authDB/pkg/api" "time" - // kafka "github.com/segmentio/kafka-go" +) + +const ( + topicRegistrations string = "registrations" // Топик для регистраций ) type user struct { @@ -19,22 +26,28 @@ type user struct { Confirmed bool } -type gRPCServer struct { - mu sync.Mutex - users map[string]*user +type AuthDBServer struct { + mu sync.Mutex + users map[string]*user + kafkaWriter *kafka.KafkaWriter api.UnimplementedAuthDBServer id int32 } -func NewServer() *gRPCServer { - return &gRPCServer{ - mu: sync.Mutex{}, - users: make(map[string]*user), - id: 0, +func NewServer(config *config.Config) *AuthDBServer { + return &AuthDBServer{ + mu: sync.Mutex{}, + users: make(map[string]*user), + kafkaWriter: kafka.NewWriter(topicRegistrations, net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))), + id: 0, } } -func (s *gRPCServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { +func (s *AuthDBServer) GracefulStop() error { + return s.kafkaWriter.Close() +} + +func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { s.mu.Lock() defer s.mu.Unlock() @@ -59,63 +72,46 @@ func (s *gRPCServer) Login(ctx context.Context, req *api.LoginRequest) (*api.Log }, nil } -func (s *gRPCServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) { +func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) { s.mu.Lock() defer s.mu.Unlock() // - _, ok := s.users[req.GetLogin()] - if ok { + if _, ok := s.users[req.GetLogin()]; ok { return nil, errors.New("login already registered") } // s.id = s.id + 1 + unique := time.Now().Nanosecond() + code := strconv.Itoa(unique) // - id := time.Now().Nanosecond() - code := strconv.Itoa(id / 2) - - // - s.users[req.Login] = &user{ + user := &user{ ID: s.id, Code: code, - Login: req.Login, - Password: strconv.Itoa(id), - Email: req.Email, + Login: req.GetLogin(), + Password: code, // TODO + Email: req.GetEmail(), Confirmed: false, } + s.users[req.Login] = user - // // - // consumer := kafka.NewReader(kafka.ReaderConfig{ - // Brokers: []string{"localhost:9092"}, - // Topic: "topic-A", - // Partition: 0, - // MinBytes: 10e3, // 10KB - // MaxBytes: 10e6, // 10MB - // }) - // defer consumer.Close() - - // // - // // consumer.SetOffset(42) - - // // - // for { - // m, err := consumer.ReadMessage(ctx) - // if err != nil { - // log.Fatal(err) - // } - - // fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) - // } + // TODO + err := s.kafkaWriter.WriteMessage(user.Login, user.Email) + if err != nil { + log.Print(err) + + return nil, err + } return &api.RegistrationResponse{ Code: code, - Email: req.Email, + Email: req.GetEmail(), }, nil } -func (s *gRPCServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) { +func (s *AuthDBServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/transport/kafka/kafka_reader.go b/internal/transport/kafka/kafka_reader.go new file mode 100644 index 0000000..c290b82 --- /dev/null +++ b/internal/transport/kafka/kafka_reader.go @@ -0,0 +1,34 @@ +package kafka + +import ( + "github.com/segmentio/kafka-go" +) + +type KafkaReader struct { + reader *kafka.Reader + first string + topic string +} + +func NewReader(topic string, address ...string) *KafkaReader { + return &KafkaReader{ + reader: kafka.NewReader(kafka.ReaderConfig{ + Topic: topic, + Brokers: address, + GroupID: "consumer-group-id", // TODO + Partition: 0, // TODO + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }), + first: address[0], + topic: topic, + } +} + +func (s *KafkaReader) Close() error { + return s.reader.Close() +} + +func (s *KafkaReader) ReadMessage(key string, value string) error { + return nil +} diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go new file mode 100644 index 0000000..2ad9cd0 --- /dev/null +++ b/internal/transport/kafka/kafka_writer.go @@ -0,0 +1,122 @@ +package kafka + +import ( + "context" + "log" + "net" + "strconv" + + "github.com/segmentio/kafka-go" +) + +type KafkaWriter struct { + writer *kafka.Writer + first string + topic string +} + +func NewWriter(topic string, address ...string) *KafkaWriter { + s := &KafkaWriter{ + writer: &kafka.Writer{ + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Addr: kafka.TCP(address...), + }, + first: address[0], + topic: topic, + } + + // Приверим и при необходимости создадим топик + era := s.checkTopic() + if era != nil { + log.Fatal(era) + } + + return s +} + +func (s *KafkaWriter) Close() error { + return s.writer.Close() +} + +func (s *KafkaWriter) fetchTopics() (map[string]bool, error) { + conn, err := kafka.Dial("tcp", s.first) + if err != nil { + return nil, err + } + defer conn.Close() + + // + partitions, erp := conn.ReadPartitions() + if erp != nil { + return nil, erp + } + + // + topics := make(map[string]bool) + for _, p := range partitions { + topics[p.Topic] = true + } + + return topics, nil +} + +func (s *KafkaWriter) createTopic() error { + conn, err := kafka.Dial("tcp", s.first) + if err != nil { + return err + } + defer conn.Close() + + // + controller, era := conn.Controller() + if era != nil { + return era + } + + // + controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if eru != nil { + return eru + } + defer controllerConn.Close() + + // + topicConfigs := []kafka.TopicConfig{ + { + Topic: s.topic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + return controllerConn.CreateTopics(topicConfigs...) +} + +func (s *KafkaWriter) checkTopic() error { + topics, err := s.fetchTopics() + if err != nil { + return err + } + + // Если топика нет, то создадим + if _, ok := topics[s.topic]; !ok { + era := s.createTopic() + if era != nil { + return era + } + + log.Printf("create topic %q\n", s.topic) + + return era + } + + return nil +} + +func (s *KafkaWriter) WriteMessage(key string, value string) error { + return s.writer.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(key), + Value: []byte(value), + }) +}