You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

209 lines
5.6 KiB

package authDB
import (
"context"
"crypto/md5"
"encoding/hex"
"encoding/json"
"errors"
"log"
"sync"
"net"
"strconv"
"time"
"git.slaventius.ru/test3k/authDB/internal/config"
repo "git.slaventius.ru/test3k/authDB/internal/customer"
arango "git.slaventius.ru/test3k/authDB/internal/transport/arango"
kafka "git.slaventius.ru/test3k/authDB/internal/transport/kafka"
api "git.slaventius.ru/test3k/umate/pkg/api"
apiKafka "git.slaventius.ru/test3k/umate/pkg/kafka"
logger "git.slaventius.ru/test3k/umate/pkg/logger"
// health "google.golang.org/grpc/health/grpc_health_v1"
)
// AuthDBServer is the service object behind the Login, Registration and
// Confirmation handlers. It is built by NewServer and must be used through a
// pointer because it contains a mutex.
type AuthDBServer struct {
	// mu is held by Login, Registration and Confirmation for their whole
	// duration, serializing their access to repo.Customers.
	mu sync.Mutex
	// ctx is the context handed to NewServer.
	ctx context.Context
	// kafkaWriter is created in NewServer for the registrations topic and
	// closed by GracefulStop.
	kafkaWriter *kafka.KafkaWriter
	// config is the configuration handed to NewServer; Registration reads
	// MinutesReregistration from it.
	config *config.Config
	// logger is the service logger created in NewServer.
	logger *logger.Logger
	// repo holds the customer repository, including the Customers map the
	// handlers read and update.
	repo *repo.CustomerRepository

	// Embedded fallbacks from the api package.
	// NOTE(review): presumably the generated gRPC "unimplemented" stubs — confirm.
	api.UnimplementedAuthDBServer
	api.UnimplementedHealthServer
}
// NewServer wires up an AuthDBServer from config.
//
// It opens the ArangoDB connection/client/database chain (database name
// "test3k"), creates the service logger, builds the customer repository and
// calls FetchAll on it. If FetchAll returns an error the process is
// terminated with log.Fatal. Only after that is the Kafka writer for
// apiKafka.TopicRegistrations created, addressed at config.Kafka.Host and
// config.Kafka.Port.
func NewServer(ctx context.Context, config *config.Config) *AuthDBServer {
	// Storage chain: connection -> client -> database handle.
	connection := arango.NewConnection(ctx, config.Arango.Host, config.Arango.Port)
	dbClient := arango.NewClient(ctx, connection, config.Arango.User, config.Arango.Password)
	database := arango.NewDataBase(ctx, dbClient, "test3k")

	lg := logger.NewLogger("test3k:authDBService", config.Sentry.DSN)

	customers := repo.NewCustomerRepository(ctx, database)
	if err := customers.FetchAll(); err != nil {
		log.Fatal(err)
	}

	// The zero value of the mutex is ready to use, so it needs no initializer.
	srv := &AuthDBServer{
		ctx:    ctx,
		config: config,
		logger: lg,
		repo:   customers,
	}

	brokerAddr := net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))
	srv.kafkaWriter = kafka.NewWriter(ctx, lg, apiKafka.TopicRegistrations, brokerAddr)

	return srv
}
// getMD5Hash returns the lowercase hexadecimal MD5 digest of text.
//
// Login and Registration use it to derive the stored/compared password value.
//
// NOTE(review): MD5 without a salt is not suitable for password storage.
// Switching algorithms would invalidate values already stored, so it is only
// flagged here, not changed.
func (s *AuthDBServer) getMD5Hash(text string) string {
	digest := md5.Sum([]byte(text))

	return hex.EncodeToString(digest[:])
}
// GracefulStop closes the Kafka writer created in NewServer and returns
// whatever error Close reports.
func (s *AuthDBServer) GracefulStop() error {
	writer := s.kafkaWriter

	return writer.Close()
}
// Login checks the credentials in req against the in-memory customer map.
//
// It holds s.mu for the whole call. The checks run in this order, and the
// first one that fails decides the error:
//   - the login is not a key of s.repo.Customers -> "login unknown"
//   - the customer is not confirmed              -> "login unconfirmed"
//   - the stored password differs from the MD5 hex digest of the supplied
//     password                                   -> "password incorrect"
//
// On success a LoginResponse with ID 0 is returned.
func (s *AuthDBServer) Login(ctx context.Context, req *api.LoginRequest) (*api.LoginResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	account, found := s.repo.Customers[req.GetLogin()]

	// Cases are evaluated top to bottom, so account is only inspected once
	// it is known to exist.
	switch {
	case !found:
		return nil, errors.New("login unknown")
	case !account.Confirmed:
		return nil, errors.New("login unconfirmed")
	case account.Password != s.getMD5Hash(req.GetPassword()):
		return nil, errors.New("password incorrect")
	}

	return &api.LoginResponse{ID: 0}, nil
}
// Registration creates a customer for req's login, or refreshes a pending one.
//
// It holds s.mu for the whole call and distinguishes three situations:
//   - unknown login: a new customer is built from the login, the MD5 hex
//     digest of the password and the email, persisted with Add, and stored
//     in the local map;
//   - known login that is confirmed, or whose TimeReregistration is still in
//     the future: rejected with "login already registered";
//   - otherwise: the existing customer is refreshed with Refresh and the
//     local map entry is updated.
//
// Afterwards the customer's MessageRegistration is marshalled to JSON (only
// the error is used at the moment) and its Code and Email are returned in the
// response. Publishing to Kafka is currently commented out.
func (s *AuthDBServer) Registration(ctx context.Context, req *api.RegistrationRequest) (*api.RegistrationResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	login := req.GetLogin()
	customer, known := s.repo.Customers[login]

	switch {
	case !known:
		customer = s.repo.NewCustomer(login, s.getMD5Hash(req.GetPassword()), req.GetEmail())
		if err := customer.Add(s.config.MinutesReregistration); err != nil {
			return nil, err
		}

		// Add to the local copy.
		s.repo.Customers[login] = customer

	case customer.Confirmed || time.Now().Before(customer.TimeReregistration):
		return nil, errors.New("login already registered")

	default:
		// Update the registration time.
		if err := customer.Refresh(s.config.MinutesReregistration); err != nil {
			return nil, err
		}

		s.repo.Customers[customer.Login] = customer
	}

	message := customer.MessageRegistration

	// TODO: the encoded payload is discarded while publishing is disabled;
	// only an encoding failure is reported.
	if _, err := json.Marshal(message); err != nil {
		return nil, err
	}

	s.logger.Printf("publication code %s to %s ...", message.Code, message.Email)

	// Send the notification that confirmation is required.
	// err := s.kafkaWriter.WriteMessage([]byte(customer.Login), value)
	// if err != nil {
	// 	s.logger.Error(err)
	// 	return nil, err
	// }

	s.logger.Printf("publication code %s to %s completed", message.Code, message.Email)

	return &api.RegistrationResponse{
		Code:  message.Code,
		Email: message.Email,
	}, nil
}
// Confirmation confirms the customer whose Code equals req.GetCode().
//
// It holds s.mu for the whole call and scans every entry of
// s.repo.Customers. The outcome is:
//   - an unconfirmed customer with a matching code exists: Confirm is called
//     on it; on success the local map entry is updated and a
//     ConfirmationResponse with ID 0 is returned, on failure Confirm's error
//     is returned;
//   - only confirmed customers match the code: "already confirmed";
//   - nothing matches: "code unknown".
//
// A confirmed match does not end the scan. Map iteration order is
// unspecified, so stopping at the first match would let a confirmed customer
// that happens to share the code hide an unconfirmed one on some calls and
// not on others.
func (s *AuthDBServer) Confirmation(ctx context.Context, req *api.ConfirmationRequest) (*api.ConfirmationResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	code := req.GetCode()
	matchedConfirmed := false

	for _, x := range s.repo.Customers {
		if x.Code != code {
			continue
		}

		if x.Confirmed {
			// Remember the match, but keep looking for an unconfirmed
			// customer with the same code.
			matchedConfirmed = true

			continue
		}

		if err := x.Confirm(); err != nil {
			return nil, err
		}

		// Update the local copy so it does not have to be re-read from the DB.
		// Assigning to an existing key during range is safe.
		s.repo.Customers[x.Login] = x

		return &api.ConfirmationResponse{
			ID: 0,
		}, nil
	}

	if matchedConfirmed {
		return nil, errors.New("already confirmed")
	}

	return nil, errors.New("code unknown")
}
// func (s *AuthDBServer) Check(ctx context.Context, req *health.HealthCheckRequest) (*health.HealthCheckResponse, error) {
// s.logger.Println("🏥 K8s is health checking")
// s.logger.Printf("✅ Server's status is %s", api.HealthCheckResponse_SERVING)
// // if isDatabaseReady == true {
// // s.logger.Printf("✅ Server's status is %s", api.HealthCheckResponse_SERVING)
// // return &api.HealthCheckResponse{
// // Status: api.HealthCheckResponse_SERVING,
// // }, nil
// // } else if isDatabaseReady == false {
// // s.logger.Printf("🚫 Server's status is %s", api.HealthCheckResponse_NOT_SERVING)
// // return &api.HealthCheckResponse{
// // Status: api.HealthCheckResponse_NOT_SERVING,
// // }, nil
// // } else {
// // s.logger.Printf("🚫 Server's status is %s", api.HealthCheckResponse_UNKNOWN)
// // return &api.HealthCheckResponse{
// // Status: api.HealthCheckResponse_UNKNOWN,
// // }, nil
// // }
// return &health.HealthCheckResponse{
// Status: health.HealthCheckResponse_SERVING,
// }, nil
// }
// func (s *AuthDBServer) Watch(req *health.HealthCheckRequest, srv health.Health_WatchServer) error {
// return nil
// }