From 2a7a8ff2039153d6af4bf134aedd81604c6f39da Mon Sep 17 00:00:00 2001 From: slaventius Date: Thu, 2 Feb 2023 09:35:34 +0300 Subject: [PATCH] * --- cmd/main.go | 2 +- internal/config/config.go | 16 ++++++++-------- internal/transport/grpc/grpc.go | 9 +++++++-- internal/transport/kafka/kafka_writer.go | 9 ++++++++- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 2ed9d93..3298837 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -18,7 +18,7 @@ import ( func main() { config := config.NewConfig() ctx, _ := context.WithCancel(context.Background()) - srv := server.NewServer() + srv := server.NewServer(config) s := grpc.NewServer() // diff --git a/internal/config/config.go b/internal/config/config.go index b16aee5..dc5902a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -6,19 +6,19 @@ import ( "github.com/kelseyhightower/envconfig" ) -// type DbConfig struct { -// Host string `envconfig:"DB_HOST"` -// Port int `envconfig:"DB_PORT"` -// } - -type AppConfig struct { +type appConfig struct { Port int `envconfig:"APP_PORT"` } +type kafkaConfig struct { + Host string `envconfig:"KAFKA_HOST"` + Port int `envconfig:"KAFKA_PORT"` +} + // ... type Config struct { - // Db DbConfig - App AppConfig + App appConfig + Kafka kafkaConfig } func NewConfig() *Config { diff --git a/internal/transport/grpc/grpc.go b/internal/transport/grpc/grpc.go index d880a45..421da1c 100644 --- a/internal/transport/grpc/grpc.go +++ b/internal/transport/grpc/grpc.go @@ -7,11 +7,16 @@ import ( "net" "strconv" "sync" + "test3k/authDB/internal/config" kafka "test3k/authDB/internal/transport/kafka" api "test3k/authDB/pkg/api" "time" ) +const ( + topicRegistrations string = "registrations" // Топик для регистраций +) + type user struct { ID int32 Code string @@ -29,11 +34,11 @@ type AuthDBServer struct { id int32 } -func NewServer() *AuthDBServer { +func NewServer(config *config.Config) *AuthDBServer { return &AuthDBServer{ mu: sync.Mutex{}, users: make(map[string]*user), - kafkaWriter: kafka.NewWriter("registrations", net.JoinHostPort("localhost", "9094")), + kafkaWriter: kafka.NewWriter(topicRegistrations, net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))), id: 0, } } diff --git a/internal/transport/kafka/kafka_writer.go b/internal/transport/kafka/kafka_writer.go index d4543c7..434b79b 100644 --- a/internal/transport/kafka/kafka_writer.go +++ b/internal/transport/kafka/kafka_writer.go @@ -101,7 +101,14 @@ func (s *KafkaWriter) checkTopic() error { // Если топика нет, то создадим if _, ok := topics[s.topic]; !ok { - return s.createTopic() + era := s.createTopic() + if era != nil { + return era + } + + log.Printf("create topic %q\n", s.topic) + + return era } return nil