slaventius@odnookno.info 2 years ago
commit 338a76a698
  1. 7
      .vscode/launch.json
  2. 49
      cmd/main.go
  3. 26
      deploy/kafka/docker-compose-local-kafka.yml
  4. 16
      internal/config/config.go
  5. 86
      internal/transport/grpc/grpc.go
  6. 34
      internal/transport/kafka/kafka_reader.go
  7. 122
      internal/transport/kafka/kafka_writer.go

@ -11,11 +11,10 @@
"mode": "debug", "mode": "debug",
"program": "${workspaceRoot}/cmd/main.go", "program": "${workspaceRoot}/cmd/main.go",
"env": { "env": {
"DB_HOST": "127.0.0.1", "APP_PORT":"9995",
"DB_PORT":"9055", "KAFKA_PORT":"9092",
"APP_PORT":"9056",
}, },
"args": ["1", "2", "25"] "args": []
} }
] ]
} }

@ -1,9 +1,13 @@
package main package main
import ( import (
"fmt" "context"
"log" "log"
"net" "net"
"os"
"os/signal"
"strconv"
"syscall"
"test3k/authDB/internal/config" "test3k/authDB/internal/config"
server "test3k/authDB/internal/transport/grpc" server "test3k/authDB/internal/transport/grpc"
api "test3k/authDB/pkg/api" api "test3k/authDB/pkg/api"
@ -12,24 +16,57 @@ import (
) )
func main() { func main() {
s := grpc.NewServer()
srv := server.NewServer()
config := config.NewConfig() config := config.NewConfig()
ctx, _ := context.WithCancel(context.Background())
srv := server.NewServer(config)
s := grpc.NewServer()
// //
signalChannel := make(chan os.Signal, 1)
signal.Notify(signalChannel, syscall.SIGINT)
signal.Notify(signalChannel, syscall.SIGTERM)
defer stop(signalChannel, s, srv)
// Запуск сервера
go start(config, s, srv)
//
for {
select {
case <-signalChannel:
return
case <-ctx.Done():
return
}
}
}
// Остановка сервера
func stop(signalChannel chan os.Signal, s *grpc.Server, srv *server.AuthDBServer) {
defer s.GracefulStop()
defer srv.GracefulStop()
defer signal.Stop(signalChannel)
log.Println("authDBServer stopping ...")
}
// Запуск сервера
func start(config *config.Config, s *grpc.Server, srv *server.AuthDBServer) {
api.RegisterAuthDBServer(s, srv) api.RegisterAuthDBServer(s, srv)
// //
connStr := fmt.Sprintf(":%d", config.App.Port) connStr := net.JoinHostPort("", strconv.Itoa(config.App.Port))
lis, era := net.Listen("tcp", connStr) lis, era := net.Listen("tcp", connStr)
if era != nil { if era != nil {
log.Fatal(era) log.Fatal(era)
} }
// //
fmt.Printf("authDBService up in %d port\n", config.App.Port) log.Printf("authDBServer starting (%s)\n", connStr)
//
eru := s.Serve(lis) eru := s.Serve(lis)
if eru != nil { if eru != nil {
log.Fatal(eru) log.Fatal("Failed starting server")
} }
} }

@ -0,0 +1,26 @@
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.3.0
container_name: broker
ports:
# To learn about configuring Kafka for access across networks see
# https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

@ -6,19 +6,19 @@ import (
"github.com/kelseyhightower/envconfig" "github.com/kelseyhightower/envconfig"
) )
// type DbConfig struct { type appConfig struct {
// Host string `envconfig:"DB_HOST"`
// Port int `envconfig:"DB_PORT"`
// }
type AppConfig struct {
Port int `envconfig:"APP_PORT"` Port int `envconfig:"APP_PORT"`
} }
type kafkaConfig struct {
Host string `envconfig:"KAFKA_HOST"`
Port int `envconfig:"KAFKA_PORT"`
}
// ... // ...
type Config struct { type Config struct {
// Db DbConfig App appConfig
App AppConfig Kafka kafkaConfig
} }
func NewConfig() *Config { func NewConfig() *Config {

@ -3,11 +3,18 @@ package grpc
import ( import (
"context" "context"
"errors" "errors"
"log"
"net"
"strconv" "strconv"
"sync" "sync"
"test3k/authDB/internal/config"
kafka "test3k/authDB/internal/transport/kafka"
api "test3k/authDB/pkg/api" api "test3k/authDB/pkg/api"
"time" "time"
// kafka "github.com/segmentio/kafka-go" )
const (
topicRegistrations string = "registrations" // Топик для регистраций
) )
type user struct { type user struct {
@ -19,22 +26,28 @@ type user struct {
Confirmed bool Confirmed bool
} }
type gRPCServer struct { type AuthDBServer struct {
mu sync.Mutex mu sync.Mutex
users map[string]*user users map[string]*user
kafkaWriter *kafka.KafkaWriter
api.UnimplementedAuthDBServer api.UnimplementedAuthDBServer
id int32 id int32
} }
func NewServer() *gRPCServer { func NewServer(config *config.Config) *AuthDBServer {
return &gRPCServer{ return &AuthDBServer{
mu: sync.Mutex{}, mu: sync.Mutex{},
users: make(map[string]*user), users: make(map[string]*user),
id: 0, kafkaWriter: kafka.NewWriter(topicRegistrations, net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))),
id: 0,
} }
} }
func (s *gRPCServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) { func (s *AuthDBServer) GracefulStop() error {
return s.kafkaWriter.Close()
}
func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -59,63 +72,46 @@ func (s *gRPCServer) Login(ctx context.Context, req *api.LoginRequest) (*api.Log
}, nil }, nil
} }
func (s *gRPCServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) { func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
// //
_, ok := s.users[req.GetLogin()] if _, ok := s.users[req.GetLogin()]; ok {
if ok {
return nil, errors.New("login already registered") return nil, errors.New("login already registered")
} }
// //
s.id = s.id + 1 s.id = s.id + 1
unique := time.Now().Nanosecond()
code := strconv.Itoa(unique)
// //
id := time.Now().Nanosecond() user := &user{
code := strconv.Itoa(id / 2)
//
s.users[req.Login] = &user{
ID: s.id, ID: s.id,
Code: code, Code: code,
Login: req.Login, Login: req.GetLogin(),
Password: strconv.Itoa(id), Password: code, // TODO
Email: req.Email, Email: req.GetEmail(),
Confirmed: false, Confirmed: false,
} }
s.users[req.Login] = user
// // // TODO
// consumer := kafka.NewReader(kafka.ReaderConfig{ err := s.kafkaWriter.WriteMessage(user.Login, user.Email)
// Brokers: []string{"localhost:9092"}, if err != nil {
// Topic: "topic-A", log.Print(err)
// Partition: 0,
// MinBytes: 10e3, // 10KB return nil, err
// MaxBytes: 10e6, // 10MB }
// })
// defer consumer.Close()
// //
// // consumer.SetOffset(42)
// //
// for {
// m, err := consumer.ReadMessage(ctx)
// if err != nil {
// log.Fatal(err)
// }
// fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
// }
return &api.RegistrationResponse{ return &api.RegistrationResponse{
Code: code, Code: code,
Email: req.Email, Email: req.GetEmail(),
}, nil }, nil
} }
func (s *gRPCServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) { func (s *AuthDBServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()

@ -0,0 +1,34 @@
package kafka
import (
"github.com/segmentio/kafka-go"
)
type KafkaReader struct {
reader *kafka.Reader
first string
topic string
}
func NewReader(topic string, address ...string) *KafkaReader {
return &KafkaReader{
reader: kafka.NewReader(kafka.ReaderConfig{
Topic: topic,
Brokers: address,
GroupID: "consumer-group-id", // TODO
Partition: 0, // TODO
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
}),
first: address[0],
topic: topic,
}
}
func (s *KafkaReader) Close() error {
return s.reader.Close()
}
func (s *KafkaReader) ReadMessage(key string, value string) error {
return nil
}

@ -0,0 +1,122 @@
package kafka
import (
"context"
"log"
"net"
"strconv"
"github.com/segmentio/kafka-go"
)
type KafkaWriter struct {
writer *kafka.Writer
first string
topic string
}
func NewWriter(topic string, address ...string) *KafkaWriter {
s := &KafkaWriter{
writer: &kafka.Writer{
Topic: topic,
Balancer: &kafka.LeastBytes{},
Addr: kafka.TCP(address...),
},
first: address[0],
topic: topic,
}
// Приверим и при необходимости создадим топик
era := s.checkTopic()
if era != nil {
log.Fatal(era)
}
return s
}
func (s *KafkaWriter) Close() error {
return s.writer.Close()
}
func (s *KafkaWriter) fetchTopics() (map[string]bool, error) {
conn, err := kafka.Dial("tcp", s.first)
if err != nil {
return nil, err
}
defer conn.Close()
//
partitions, erp := conn.ReadPartitions()
if erp != nil {
return nil, erp
}
//
topics := make(map[string]bool)
for _, p := range partitions {
topics[p.Topic] = true
}
return topics, nil
}
func (s *KafkaWriter) createTopic() error {
conn, err := kafka.Dial("tcp", s.first)
if err != nil {
return err
}
defer conn.Close()
//
controller, era := conn.Controller()
if era != nil {
return era
}
//
controllerConn, eru := kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if eru != nil {
return eru
}
defer controllerConn.Close()
//
topicConfigs := []kafka.TopicConfig{
{
Topic: s.topic,
NumPartitions: 1,
ReplicationFactor: 1,
},
}
return controllerConn.CreateTopics(topicConfigs...)
}
func (s *KafkaWriter) checkTopic() error {
topics, err := s.fetchTopics()
if err != nil {
return err
}
// Если топика нет, то создадим
if _, ok := topics[s.topic]; !ok {
era := s.createTopic()
if era != nil {
return era
}
log.Printf("create topic %q\n", s.topic)
return era
}
return nil
}
func (s *KafkaWriter) WriteMessage(key string, value string) error {
return s.writer.WriteMessages(context.Background(), kafka.Message{
Key: []byte(key),
Value: []byte(value),
})
}
Loading…
Cancel
Save