slaventius 2 years ago
parent c9bc445b67
commit afc0b1fe5d
  1. 1
      .gitignore
  2. 30
      cmd/main.go
  3. 39
      internal/postman.go

1
.gitignore vendored

@ -0,0 +1 @@
authPostmanService

@ -10,14 +10,16 @@ import (
"syscall"
server "test3k/authPostman/internal"
"test3k/authPostman/internal/config"
)
"github.com/segmentio/kafka-go"
const (
topicRegistrations string = "registrations" // Топик для регистраций
)
func main() {
config := config.NewConfig()
ctx, _ := context.WithCancel(context.Background())
srv := server.NewServer(ctx, config)
srv := server.NewServer(ctx, config, topicRegistrations)
//
signalChannel := make(chan os.Signal, 1)
@ -54,26 +56,8 @@ func start(config *config.Config, srv *server.AuthPostmanServer) {
//
log.Printf("authPostmanServer starting (listening to %s)\n", connStr)
//
r := kafka.NewReader(kafka.ReaderConfig{
Topic: "registrations",
Brokers: []string{connStr},
GroupID: "consumer-group-id",
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
})
defer r.Close()
// ...
r.SetOffset(0)
for {
m, err := r.ReadMessage(context.Background())
if err != nil {
break
}
log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
// Запускаем прослушивание
if err := srv.ReadMessage(0); err != nil {
log.Fatal(err)
}
}

@ -2,17 +2,48 @@ package postman
import (
"context"
"log"
"net"
"strconv"
"test3k/authPostman/internal/config"
"github.com/segmentio/kafka-go"
)
type AuthPostmanServer struct {
kafkaReader *kafka.Reader
ctx context.Context
}
func NewServer(ctx context.Context, config *config.Config) *AuthPostmanServer {
return &AuthPostmanServer{}
func NewServer(ctx context.Context, config *config.Config, topic string) *AuthPostmanServer {
return &AuthPostmanServer{
kafkaReader: kafka.NewReader(kafka.ReaderConfig{
Topic: topic,
Brokers: []string{net.JoinHostPort(config.Kafka.Host, strconv.Itoa(config.Kafka.Port))},
GroupID: "consumer-group-id",
Partition: 0,
MinBytes: 10e3, // 10KB
MaxBytes: 10e6, // 10MB
}),
ctx: ctx,
}
}
func (s *AuthPostmanServer) GracefulStop() error {
// return s.db.Close()
return nil
return s.kafkaReader.Close()
}
func (s *AuthPostmanServer) ReadMessage(offset int64) error {
// ...
s.kafkaReader.SetOffset(offset)
//
for {
m, err := s.kafkaReader.ReadMessage(s.ctx)
if err != nil {
return err
}
log.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
}
}

Loading…
Cancel
Save