From 14ee880ea4b8d916500b2a6e35bc1ad8b04c5fca Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 12:20:16 +0300 Subject: [PATCH 01/21] * --- .vscode/launch.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 07fdea4..a5379fb 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,11 +11,11 @@ "mode": "debug", "program": "${workspaceRoot}/cmd/main.go", "env": { - "DB_HOST": "127.0.0.1", - "DB_PORT":"9055", + // "DB_HOST": "127.0.0.1", + // "DB_PORT":"9055", "APP_PORT":"9056", }, - "args": ["1", "2", "25"] + "args": [] } ] } From 4fee6063a03f58be1916cec195d4e9587c4c1367 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 12:24:21 +0300 Subject: [PATCH 02/21] * --- cmd/main.go | 2 +- internal/transport/grpc/grpc.go | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 5052b4c..25a2a08 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -27,7 +27,7 @@ func main() { } // - fmt.Printf("authDBService up in %d port\n", config.App.Port) + fmt.Printf("authDBServer up (%s)\n", connStr) eru := s.Serve(lis) if eru != nil { log.Fatal(eru) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 4d6d0f4..e278262 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -19,22 +19,22 @@ type user struct { Confirmed bool } -type gRPCServer struct { +type authDBServer struct { mu sync.Mutex users map[string]*user api.UnimplementedAuthDBServer id int32 } -func NewServer() *gRPCServer { - return &gRPCServer{ +func NewServer() *authDBServer { + return &authDBServer{ mu: sync.Mutex{}, users: make(map[string]*user), id: 0, } } -func (s *gRPCServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { +func (s *authDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { s.mu.Lock() defer s.mu.Unlock() @@ -59,7 +59,7 @@ func (s *gRPCServer) Login(ctx context.Context, req *api.LoginRequest) (*api.Log }, nil } -func (s *gRPCServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) { +func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) { s.mu.Lock() defer s.mu.Unlock() @@ -115,7 +115,7 @@ func (s *gRPCServer) Registration(ctx context.Context, req *api.RegistrationRequ }, nil } -func (s *gRPCServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) { +func (s *authDBServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) { s.mu.Lock() defer s.mu.Unlock() From 8695307a455738d235e79b546bb85c3a7c594f7d Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 13:54:41 +0300 Subject: [PATCH 03/21] * --- .vscode/launch.json | 2 +- internal/transport/grpc/grpc.go | 21 ++++++++++++++++----- 2 files changed, 17 insertions(+), 6 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index a5379fb..c6f7343 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,7 +13,7 @@ "env": { // "DB_HOST": "127.0.0.1", // "DB_PORT":"9055", - "APP_PORT":"9056", + "APP_PORT":"9995", }, "args": [] } diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index e278262..908e3e3 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -41,17 +41,23 @@ func (s *authDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.L // user, ok := s.users[req.GetLogin()] if !ok { - return nil, errors.New("login unknown") + return &api.LoginResponse{ + ID: 0, + }, errors.New("login unknown") } // if !user.Confirmed { - return nil, errors.New("login unconfirmed") + return &api.LoginResponse{ + ID: 0, + }, errors.New("login unconfirmed") } // if user.Password != req.Password { - return nil, errors.New("password incorrect") + return &api.LoginResponse{ + ID: 0, + }, errors.New("password incorrect") } return &api.LoginResponse{ @@ -64,9 +70,14 @@ func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRe defer s.mu.Unlock() // - _, ok := s.users[req.GetLogin()] + val, ok := s.users[req.GetLogin()] if ok { - return nil, errors.New("login already registered") + user := &api.RegistrationResponse{ + Code: val.Code, + Email: val.Email, + } + + return user, errors.New("login already registered") } // From f40d9deaa11c2b9a941c9474f61957c5f9786fb8 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 14:03:36 +0300 Subject: [PATCH 04/21] * --- internal/transport/grpc/grpc.go | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 908e3e3..796ed34 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -70,29 +70,21 @@ func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRe defer s.mu.Unlock() // - val, ok := s.users[req.GetLogin()] - if ok { - user := &api.RegistrationResponse{ - Code: val.Code, - Email: val.Email, - } - - return user, errors.New("login already registered") + if _, ok := s.users[req.GetLogin()]; ok { + return nil, errors.New("login already registered") } // s.id = s.id + 1 - - // - id := time.Now().Nanosecond() - code := strconv.Itoa(id / 2) + unique := time.Now().Nanosecond() + code := strconv.Itoa(unique / 2) // s.users[req.Login] = &user{ ID: s.id, Code: code, Login: req.Login, - Password: strconv.Itoa(id), + Password: strconv.Itoa(unique), Email: req.Email, Confirmed: false, } From b37723b219ab34ec0457d178388ca8975e19243b Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 14:08:47 +0300 Subject: [PATCH 05/21] * --- internal/transport/grpc/grpc.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 796ed34..9d49b2d 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -70,8 +70,11 @@ func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRe defer s.mu.Unlock() // - if _, ok := s.users[req.GetLogin()]; ok { - return nil, errors.New("login already registered") + if val, ok := s.users[req.GetLogin()]; ok { + return &api.RegistrationResponse{ + Code: val.Code, + Email: val.Email, + }, errors.New("login already registered") } // From f256615246e5b1fdaa7198fec43a8940c1540893 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 14:54:43 +0300 Subject: [PATCH 06/21] * --- internal/transport/grpc/grpc.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 9d49b2d..88846e6 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -21,7 +21,7 @@ type user struct { type authDBServer struct { mu sync.Mutex - users map[string]*user + users map[string]user api.UnimplementedAuthDBServer id int32 } @@ -29,7 +29,7 @@ type authDBServer struct { func NewServer() *authDBServer { return &authDBServer{ mu: sync.Mutex{}, - users: make(map[string]*user), + users: make(map[string]user), id: 0, } } @@ -83,7 +83,7 @@ func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRe code := strconv.Itoa(unique / 2) // - s.users[req.Login] = &user{ + s.users[req.Login] = user{ ID: s.id, Code: code, Login: req.Login, From 517ef22e82f0ded4e8fc39058af889e124141933 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 15:34:18 +0300 Subject: [PATCH 07/21] * --- cmd/main.go | 41 ++++++++++++++++++++++++++++++--- internal/transport/grpc/grpc.go | 21 +++++++++-------- 2 files changed, 50 insertions(+), 12 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 25a2a08..ff55d16 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -1,9 +1,13 @@ package main import ( + "context" "fmt" "log" "net" + "os" + "os/signal" + "syscall" "test3k/authDB/internal/config" server "test3k/authDB/internal/transport/grpc" api "test3k/authDB/pkg/api" @@ -12,11 +16,40 @@ import ( ) func main() { - s := grpc.NewServer() - srv := server.NewServer() config := config.NewConfig() + ctx, _ := context.WithCancel(context.Background()) + srv := server.NewServer() + s := grpc.NewServer() // + signalChannel := make(chan os.Signal, 1) + signal.Notify(signalChannel, syscall.SIGINT) + signal.Notify(signalChannel, syscall.SIGTERM) + defer stop(signalChannel) + + // Запуск сервера + go start(config, s, srv) + + // + for { + select { + case <-signalChannel: + return + case <-ctx.Done(): + return + } + } +} + +// Остановка сервера +func stop(signalChannel chan os.Signal) { + log.Println("authDBServer stopping ...") + + signal.Stop(signalChannel) +} + +// Запуск сервера +func start(config *config.Config, s *grpc.Server, srv *server.AuthDBServer) { api.RegisterAuthDBServer(s, srv) // @@ -27,7 +60,9 @@ func main() { } // - fmt.Printf("authDBServer up (%s)\n", connStr) + log.Printf("authDBServer starting (%s)\n", connStr) + + // eru := s.Serve(lis) if eru != nil { log.Fatal(eru) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 88846e6..94f0693 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -3,6 +3,7 @@ package grpc import ( "context" "errors" + "log" "strconv" "sync" api "test3k/authDB/pkg/api" @@ -19,22 +20,22 @@ type user struct { Confirmed bool } -type authDBServer struct { +type AuthDBServer struct { mu sync.Mutex users map[string]user api.UnimplementedAuthDBServer id int32 } -func NewServer() *authDBServer { - return &authDBServer{ +func NewServer() *AuthDBServer { + return &AuthDBServer{ mu: sync.Mutex{}, users: make(map[string]user), id: 0, } } -func (s *authDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { +func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { s.mu.Lock() defer s.mu.Unlock() @@ -65,12 +66,14 @@ func (s *authDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.L }, nil } -func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) { +func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) { s.mu.Lock() defer s.mu.Unlock() // if val, ok := s.users[req.GetLogin()]; ok { + log.Printf("login %s already registered", val.Login) + return &api.RegistrationResponse{ Code: val.Code, Email: val.Email, @@ -86,9 +89,9 @@ func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRe s.users[req.Login] = user{ ID: s.id, Code: code, - Login: req.Login, + Login: req.GetLogin(), Password: strconv.Itoa(unique), - Email: req.Email, + Email: req.GetEmail(), Confirmed: false, } @@ -117,11 +120,11 @@ func (s *authDBServer) Registration(ctx context.Context, req *api.RegistrationRe return &api.RegistrationResponse{ Code: code, - Email: req.Email, + Email: req.GetEmail(), }, nil } -func (s *authDBServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) { +func (s *AuthDBServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) { s.mu.Lock() defer s.mu.Unlock() From 409813057ed5744fef0fd766ffde925b0bd60cbb Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 15:36:09 +0300 Subject: [PATCH 08/21] * --- cmd/main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index ff55d16..6f2a74b 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -25,7 +25,7 @@ func main() { signalChannel := make(chan os.Signal, 1) signal.Notify(signalChannel, syscall.SIGINT) signal.Notify(signalChannel, syscall.SIGTERM) - defer stop(signalChannel) + defer stop(signalChannel, s) // Запуск сервера go start(config, s, srv) @@ -42,10 +42,11 @@ func main() { } // Остановка сервера -func stop(signalChannel chan os.Signal) { - log.Println("authDBServer stopping ...") +func stop(signalChannel chan os.Signal, s *grpc.Server) { + defer s.GracefulStop() + defer signal.Stop(signalChannel) - signal.Stop(signalChannel) + log.Println("authDBServer stopping ...") } // Запуск сервера From bcf71ee1041d68329a0c99fa0845ab5d5bc8d482 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 15:49:08 +0300 Subject: [PATCH 09/21] * --- internal/transport/grpc/grpc.go | 26 +++++++------------------- 1 file changed, 7 insertions(+), 19 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 94f0693..3628d4d 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -3,7 +3,6 @@ package grpc import ( "context" "errors" - "log" "strconv" "sync" api "test3k/authDB/pkg/api" @@ -42,23 +41,17 @@ func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.L // user, ok := s.users[req.GetLogin()] if !ok { - return &api.LoginResponse{ - ID: 0, - }, errors.New("login unknown") + return nil, errors.New("login unknown") } // if !user.Confirmed { - return &api.LoginResponse{ - ID: 0, - }, errors.New("login unconfirmed") + return nil, errors.New("login unconfirmed") } // if user.Password != req.Password { - return &api.LoginResponse{ - ID: 0, - }, errors.New("password incorrect") + return nil, errors.New("password incorrect") } return &api.LoginResponse{ @@ -71,26 +64,21 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe defer s.mu.Unlock() // - if val, ok := s.users[req.GetLogin()]; ok { - log.Printf("login %s already registered", val.Login) - - return &api.RegistrationResponse{ - Code: val.Code, - Email: val.Email, - }, errors.New("login already registered") + if _, ok := s.users[req.GetLogin()]; ok { + return nil, errors.New("login already registered") } // s.id = s.id + 1 unique := time.Now().Nanosecond() - code := strconv.Itoa(unique / 2) + code := strconv.Itoa(unique) // s.users[req.Login] = user{ ID: s.id, Code: code, Login: req.GetLogin(), - Password: strconv.Itoa(unique), + Password: code, // TODO Email: req.GetEmail(), Confirmed: false, } From b8e469c8dad680c08dd59b80eebcbbe601d8f6ab Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 16:04:20 +0300 Subject: [PATCH 10/21] * --- internal/transport/grpc/grpc.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 3628d4d..0e92b90 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -21,7 +21,7 @@ type user struct { type AuthDBServer struct { mu sync.Mutex - users map[string]user + users map[string]*user api.UnimplementedAuthDBServer id int32 } @@ -29,7 +29,7 @@ type AuthDBServer struct { func NewServer() *AuthDBServer { return &AuthDBServer{ mu: sync.Mutex{}, - users: make(map[string]user), + users: make(map[string]*user), id: 0, } } @@ -74,7 +74,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe code := strconv.Itoa(unique) // - s.users[req.Login] = user{ + s.users[req.Login] = &user{ ID: s.id, Code: code, Login: req.GetLogin(), From a8959b1b0b2a1d0e177d619b73fae78e3d073715 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 18:58:49 +0300 Subject: [PATCH 11/21] * --- cmd/main.go | 5 +- internal/transport/grpc/grpc.go | 58 +++++------ internal/transport/kafka/kafka_reader.go | 27 ++++++ internal/transport/kafka/kafka_writer.go | 117 +++++++++++++++++++++++ 4 files changed, 173 insertions(+), 34 deletions(-) create mode 100644 internal/transport/kafka/kafka_reader.go create mode 100644 internal/transport/kafka/kafka_writer.go diff --git a/cmd/main.go b/cmd/main.go index 6f2a74b..07ab14f 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -25,7 +25,7 @@ func main() { signalChannel := make(chan os.Signal, 1) signal.Notify(signalChannel, syscall.SIGINT) signal.Notify(signalChannel, syscall.SIGTERM) - defer stop(signalChannel, s) + defer stop(signalChannel, s, srv) // Запуск сервера go start(config, s, srv) @@ -42,8 +42,9 @@ func main() { } // Остановка сервера -func stop(signalChannel chan os.Signal, s *grpc.Server) { +func stop(signalChannel chan os.Signal, s *grpc.Server, srv *server.AuthDBServer) { defer s.GracefulStop() + defer srv.GracefulStop() defer signal.Stop(signalChannel) log.Println("authDBServer stopping ...") diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 0e92b90..d5c352e 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -3,11 +3,13 @@ package grpc import ( "context" "errors" + "log" + "net" "strconv" "sync" + kafka "test3k/authDB/internal/transport/kafka" api "test3k/authDB/pkg/api" "time" - // kafka "github.com/segmentio/kafka-go" ) type user struct { @@ -20,20 +22,26 @@ type user struct { } type AuthDBServer struct { - mu sync.Mutex - users map[string]*user + mu sync.Mutex + users map[string]*user + kafkaWriter *kafka.KafkaWriter api.UnimplementedAuthDBServer id int32 } func NewServer() *AuthDBServer { return &AuthDBServer{ - mu: sync.Mutex{}, - users: make(map[string]*user), - id: 0, + mu: sync.Mutex{}, + users: make(map[string]*user), + kafkaWriter: kafka.NewWriter("registrations", net.JoinHostPort("localhost", "9094")), + id: 0, } } +func (s *AuthDBServer) GracefulStop() error { + return s.kafkaWriter.Close() +} + func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { s.mu.Lock() defer s.mu.Unlock() @@ -63,10 +71,10 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe s.mu.Lock() defer s.mu.Unlock() - // - if _, ok := s.users[req.GetLogin()]; ok { - return nil, errors.New("login already registered") - } + // // + // if _, ok := s.users[req.GetLogin()]; ok { + // return nil, errors.New("login already registered") + // } // s.id = s.id + 1 @@ -74,7 +82,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe code := strconv.Itoa(unique) // - s.users[req.Login] = &user{ + user := &user{ ID: s.id, Code: code, Login: req.GetLogin(), @@ -82,29 +90,15 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe Email: req.GetEmail(), Confirmed: false, } + s.users[req.Login] = user - // // - // consumer := kafka.NewReader(kafka.ReaderConfig{ - // Brokers: []string{"localhost:9092"}, - // Topic: "topic-A", - // Partition: 0, - // MinBytes: 10e3, // 10KB - // MaxBytes: 10e6, // 10MB - // }) - // defer consumer.Close() + // TODO + err := s.kafkaWriter.WriteMessage(user.Login, user.Email) + if err != nil { + log.Println("kafka write ", err) - // // - // // consumer.SetOffset(42) - - // // - // for { - // m, err := consumer.ReadMessage(ctx) - // if err != nil { - // log.Fatal(err) - // } - - // fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value)) - // } + return nil, err + } return &api.RegistrationResponse{ Code: code, diff --git a/internal/transport/kafka/kafka_reader.go b/internal/transport/kafka/kafka_reader.go new file mode 100644 index 0000000..2922541 --- /dev/null +++ b/internal/transport/kafka/kafka_reader.go @@ -0,0 +1,27 @@ +package kafka + +import ( + "github.com/segmentio/kafka-go" +) + +type KafkaReader struct { + reader *kafka.Reader +} + +func NewReader(topic string, address ...string) *KafkaReader { + return &KafkaReader{ + // reader: &kafka.Reader{ + // Topic: topic, + // Balancer: &kafka.LeastBytes{}, + // Addr: kafka.TCP(address...), + // }, + } +} + +func (s *KafkaReader) Close() error { + return s.reader.Close() +} + +func (s *KafkaReader) ReadMessage(key string, value string) error { + return nil +} diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go new file mode 100644 index 0000000..d4543c7 --- /dev/null +++ b/internal/transport/kafka/kafka_writer.go @@ -0,0 +1,117 @@ +package kafka + +import ( + "context" + "log" + "net" + "strconv" + + "github.com/segmentio/kafka-go" +) + +type KafkaWriter struct { + writer *kafka.Writer + first string + topic string +} + +func NewWriter(topic string, address ...string) *KafkaWriter { + s := &KafkaWriter{ + writer: &kafka.Writer{ + Topic: topic, + Balancer: &kafka.LeastBytes{}, + Addr: kafka.TCP(address...), + }, + first: address[0], + topic: topic, + } + + // Приверим и при необходимости создадим топик + era := s.checkTopic() + if era != nil { + log.Fatal(era) + } + + return s +} + +func (s *KafkaWriter) Close() error { + return s.writer.Close() +} + +func (s *KafkaWriter) fetchTopics() (map[string]bool, error) { + conn, err := kafka.Dial("tcp", s.first) + if err != nil { + return nil, err + } + defer conn.Close() + + // + partitions, erp := conn.ReadPartitions() + if erp != nil { + return nil, erp + } + + // + topics := make(map[string]bool) + for _, p := range partitions { + topics[p.Topic] = true + } + + return topics, nil +} + +func (s *KafkaWriter) createTopic() error { + conn, err := kafka.Dial("tcp", s.first) + if err != nil { + return err + } + defer conn.Close() + + // + controller, era := conn.Controller() + if era != nil { + return era + } + + // + controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port))) + if eru != nil { + return eru + } + defer controllerConn.Close() + + // + topicConfigs := []kafka.TopicConfig{ + { + Topic: s.topic, + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + + return controllerConn.CreateTopics(topicConfigs...) +} + +func (s *KafkaWriter) checkTopic() error { + topics, err := s.fetchTopics() + if err != nil { + return err + } + + // Если топика нет, то создадим + if _, ok := topics[s.topic]; !ok { + return s.createTopic() + } + + return nil +} + +func (s *KafkaWriter) WriteMessage(key string, value string) error { + err := s.writer.WriteMessages(context.Background(), kafka.Message{ + Key: []byte(key), + Value: []byte(value), + }) + + return err +} From e8a90f016d6723821c7d37e61b49f1fb5ca4ef79 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 19:13:27 +0300 Subject: [PATCH 12/21] * --- cmd/main.go | 6 +++--- deploy/docker-compose.yml | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) create mode 100644 deploy/docker-compose.yml diff --git a/cmd/main.go b/cmd/main.go index 07ab14f..2ed9d93 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -2,11 +2,11 @@ package main import ( "context" - "fmt" "log" "net" "os" "os/signal" + "strconv" "syscall" "test3k/authDB/internal/config" server "test3k/authDB/internal/transport/grpc" @@ -55,7 +55,7 @@ func start(config *config.Config, s *grpc.Server, srv *server.AuthDBServer) { api.RegisterAuthDBServer(s, srv) // - connStr := fmt.Sprintf(":%d", config.App.Port) + connStr := net.JoinHostPort("", strconv.Itoa(config.App.Port)) lis, era := net.Listen("tcp", connStr) if era != nil { log.Fatal(era) @@ -67,6 +67,6 @@ func start(config *config.Config, s *grpc.Server, srv *server.AuthDBServer) { // eru := s.Serve(lis) if eru != nil { - log.Fatal(eru) + log.Fatal("Failed starting server") } } diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml new file mode 100644 index 0000000..bb187d7 --- /dev/null +++ b/deploy/docker-compose.yml @@ -0,0 +1,19 @@ +version: '2' +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + kafka: + build: . + ports: + - "9094:9094" + environment: + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094 + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT + KAFKA_CREATE_TOPICS: "abcd:2:1" + volumes: + - /var/run/docker.sock:/var/run/docker.sock From 2f3300685c95f5df4340ad5c40585e8ee21de556 Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 20:35:28 +0300 Subject: [PATCH 13/21] * --- deploy/{ => kafka}/docker-compose.yml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename deploy/{ => kafka}/docker-compose.yml (100%) diff --git a/deploy/docker-compose.yml b/deploy/kafka/docker-compose.yml similarity index 100% rename from deploy/docker-compose.yml rename to deploy/kafka/docker-compose.yml From b80d509593fa578ce7ad621346a824143986966b Mon Sep 17 00:00:00 2001 From: slaventius Date: Wed, 1 Feb 2023 20:41:44 +0300 Subject: [PATCH 14/21] * --- internal/transport/grpc/grpc.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index d5c352e..d880a45 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -71,10 +71,10 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe s.mu.Lock() defer s.mu.Unlock() - // // - // if _, ok := s.users[req.GetLogin()]; ok { - // return nil, errors.New("login already registered") - // } + // + if _, ok := s.users[req.GetLogin()]; ok { + return nil, errors.New("login already registered") + } // s.id = s.id + 1 From 2a7a8ff2039153d6af4bf134aedd81604c6f39da Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 09:35:34 +0300 Subject: [PATCH 15/21] * --- cmd/main.go | 2 +- internal/config/config.go | 16 ++++++++-------- internal/transport/grpc/grpc.go | 9 +++++++-- internal/transport/kafka/kafka_writer.go | 9 ++++++++- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 2ed9d93..3298837 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,7 +18,7 @@ import ( func main() { config := config.NewConfig() ctx, _ := context.WithCancel(context.Background()) - srv := server.NewServer() + srv := server.NewServer(config) s := grpc.NewServer() // diff --git a/internal/config/config.go b/internal/config/config.go index b16aee5..dc5902a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,19 +6,19 @@ import ( "github.com/kelseyhightower/envconfig" ) -// type DbConfig struct { -// Host string `envconfig:"DB_HOST"` -// Port int `envconfig:"DB_PORT"` -// } - -type AppConfig struct { +type appConfig struct { Port int `envconfig:"APP_PORT"` } +type kafkaConfig struct { + Host string `envconfig:"KAFKA_HOST"` + Port int `envconfig:"KAFKA_PORT"` +} + // ... type Config struct { - // Db DbConfig - App AppConfig + App appConfig + Kafka kafkaConfig } func NewConfig() *Config { diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index d880a45..421da1c 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -7,11 +7,16 @@ import ( "net" "strconv" "sync" + "test3k/authDB/internal/config" kafka "test3k/authDB/internal/transport/kafka" api "test3k/authDB/pkg/api" "time" ) +const ( + topicRegistrations string = "registrations" // Топик для регистраций +) + type user struct { ID int32 Code string @@ -29,11 +34,11 @@ type AuthDBServer struct { id int32 } -func NewServer() *AuthDBServer { +func NewServer(config *config.Config) *AuthDBServer { return &AuthDBServer{ mu: sync.Mutex{}, users: make(map[string]*user), - kafkaWriter: kafka.NewWriter("registrations", net.JoinHostPort("localhost", "9094")), + kafkaWriter: kafka.NewWriter(topicRegistrations, net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))), id: 0, } } diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go index d4543c7..434b79b 100644 --- a/internal/transport/kafka/kafka_writer.go +++ b/internal/transport/kafka/kafka_writer.go @@ -101,7 +101,14 @@ func (s *KafkaWriter) checkTopic() error { // Если топика нет, то создадим if _, ok := topics[s.topic]; !ok { - return s.createTopic() + era := s.createTopic() + if era != nil { + return era + } + + log.Printf("create topic %q\n", s.topic) + + return era } return nil From 82e0c26c2721614bfa7aa77fb3be9ee1f941eaaa Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 10:01:03 +0300 Subject: [PATCH 16/21] * --- internal/transport/grpc/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index 421da1c..a2b4bcb 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -100,7 +100,7 @@ func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRe // TODO err := s.kafkaWriter.WriteMessage(user.Login, user.Email) if err != nil { - log.Println("kafka write ", err) + log.Print(err) return nil, err } From 281cc799631ae309803daacf78b7f8da70ec7d8d Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 11:06:35 +0300 Subject: [PATCH 17/21] * --- .vscode/launch.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index c6f7343..05f4473 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -11,9 +11,8 @@ "mode": "debug", "program": "${workspaceRoot}/cmd/main.go", "env": { - // "DB_HOST": "127.0.0.1", - // "DB_PORT":"9055", "APP_PORT":"9995", + "KAFKA_PORT":"9092", }, "args": [] } From 85e829b8b4c8e66d698761923f7520bbf8ac993b Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 11:07:10 +0300 Subject: [PATCH 18/21] * --- internal/transport/kafka/kafka_writer.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go index 434b79b..2ad9cd0 100644 --- a/internal/transport/kafka/kafka_writer.go +++ b/internal/transport/kafka/kafka_writer.go @@ -115,10 +115,8 @@ func (s *KafkaWriter) checkTopic() error { } func (s *KafkaWriter) WriteMessage(key string, value string) error { - err := s.writer.WriteMessages(context.Background(), kafka.Message{ + return s.writer.WriteMessages(context.Background(), kafka.Message{ Key: []byte(key), Value: []byte(value), }) - - return err } From 14a0508252455caa44c601dd55cd86c40934ff89 Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 11:34:41 +0300 Subject: [PATCH 19/21] * --- internal/transport/kafka/kafka_reader.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/internal/transport/kafka/kafka_reader.go b/internal/transport/kafka/kafka_reader.go index 2922541..c290b82 100644 --- a/internal/transport/kafka/kafka_reader.go +++ b/internal/transport/kafka/kafka_reader.go @@ -6,15 +6,22 @@ import ( type KafkaReader struct { reader *kafka.Reader + first string + topic string } func NewReader(topic string, address ...string) *KafkaReader { return &KafkaReader{ - // reader: &kafka.Reader{ - // Topic: topic, - // Balancer: &kafka.LeastBytes{}, - // Addr: kafka.TCP(address...), - // }, + reader: kafka.NewReader(kafka.ReaderConfig{ + Topic: topic, + Brokers: address, + GroupID: "consumer-group-id", // TODO + Partition: 0, // TODO + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }), + first: address[0], + topic: topic, } } From da9c2699680da3c704929815af102a693894c4e7 Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 11:43:28 +0300 Subject: [PATCH 20/21] * --- deploy/kafka/docker-compose-local-kafka.yml | 26 +++++++++++++++++++++ deploy/kafka/docker-compose.yml | 19 --------------- 2 files changed, 26 insertions(+), 19 deletions(-) create mode 100644 deploy/kafka/docker-compose-local-kafka.yml delete mode 100644 deploy/kafka/docker-compose.yml diff --git a/deploy/kafka/docker-compose-local-kafka.yml b/deploy/kafka/docker-compose-local-kafka.yml new file mode 100644 index 0000000..5494bd1 --- /dev/null +++ b/deploy/kafka/docker-compose-local-kafka.yml @@ -0,0 +1,26 @@ +version: '2' +services: + zookeeper: + image: confluentinc/cp-zookeeper:7.3.0 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + + broker: + image: confluentinc/cp-kafka:7.3.0 + container_name: broker + ports: + # To learn about configuring Kafka for access across networks see + # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 diff --git a/deploy/kafka/docker-compose.yml b/deploy/kafka/docker-compose.yml deleted file mode 100644 index bb187d7..0000000 --- a/deploy/kafka/docker-compose.yml +++ /dev/null @@ -1,19 +0,0 @@ -version: '2' -services: - zookeeper: - image: wurstmeister/zookeeper - ports: - - "2181:2181" - kafka: - build: . - ports: - - "9094:9094" - environment: - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://kafka:9094 - KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094 - KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT - KAFKA_CREATE_TOPICS: "abcd:2:1" - volumes: - - /var/run/docker.sock:/var/run/docker.sock From 5ef6837b26e974d57b9c36a5dfeae8842013bde1 Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 11:43:51 +0300 Subject: [PATCH 21/21] * --- deploy/kafka/docker-compose-local-kafka.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/deploy/kafka/docker-compose-local-kafka.yml b/deploy/kafka/docker-compose-local-kafka.yml index 5494bd1..3226e09 100644 --- a/deploy/kafka/docker-compose-local-kafka.yml +++ b/deploy/kafka/docker-compose-local-kafka.yml @@ -11,8 +11,8 @@ services: image: confluentinc/cp-kafka:7.3.0 container_name: broker ports: - # To learn about configuring Kafka for access across networks see - # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ + # To learn about configuring Kafka for access across networks see + # https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/ - "9092:9092" depends_on: - zookeeper