slaventius 2 years ago
parent cad80c407e
commit 9038502295
  1. 17
      internal/authPostman.go

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
"time"
"git.slaventius.ru/test3k/authPostman/internal/config" "git.slaventius.ru/test3k/authPostman/internal/config"
kafka "git.slaventius.ru/test3k/authPostman/internal/transport/kafka" kafka "git.slaventius.ru/test3k/authPostman/internal/transport/kafka"
@ -21,7 +22,8 @@ type AuthPostmanServer struct {
kafkaReader *kafka.KafkaReader kafkaReader *kafka.KafkaReader
logger *logger.Logger logger *logger.Logger
config *config.Config config *config.Config
msgs map[int64]bool msgs map[int64]time.Time
age time.Duration
} }
func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer {
@ -33,7 +35,8 @@ func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer {
kafkaReader: kafka.NewReader(ctx, config, api.TopicRegistrations, address...), kafkaReader: kafka.NewReader(ctx, config, api.TopicRegistrations, address...),
logger: logger, logger: logger,
config: config, config: config,
msgs: make(map[int64]bool), msgs: make(map[int64]time.Time),
age: time.Minute * 5,
} }
} }
@ -70,7 +73,7 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error {
continue continue
} else { } else {
s.msgs[msg.ID] = true s.msgs[msg.ID] = time.Now()
} }
// //
@ -99,5 +102,13 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error {
if erf != nil { if erf != nil {
s.logger.Error(erf) s.logger.Error(erf)
} }
// !!!
no := time.Now()
for k, v := range s.msgs {
if s.age < no.Sub(v) {
delete(s.msgs, k)
}
}
} }
} }

Loading…
Cancel
Save