diff --git a/internal/authPostman.go b/internal/authPostman.go index 076504e..c617a38 100644 --- a/internal/authPostman.go +++ b/internal/authPostman.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "strconv" + "time" "git.slaventius.ru/test3k/authPostman/internal/config" kafka "git.slaventius.ru/test3k/authPostman/internal/transport/kafka" @@ -21,7 +22,8 @@ type AuthPostmanServer struct { kafkaReader *kafka.KafkaReader logger *logger.Logger config *config.Config - msgs map[int64]bool + msgs map[int64]time.Time + age time.Duration } func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { @@ -33,7 +35,8 @@ func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer { kafkaReader: kafka.NewReader(ctx, config, api.TopicRegistrations, address...), logger: logger, config: config, - msgs: make(map[int64]bool), + msgs: make(map[int64]time.Time), + age: time.Minute * 5, } } @@ -70,7 +73,7 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { continue } else { - s.msgs[msg.ID] = true + s.msgs[msg.ID] = time.Now() } // @@ -99,5 +102,13 @@ func (s *AuthPostmanServer) ReadMessage(offset int64) error { if erf != nil { s.logger.Error(erf) } + + // !!! + no := time.Now() + for k, v := range s.msgs { + if s.age < no.Sub(v) { + delete(s.msgs, k) + } + } } }