You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
39 lines
1.0 KiB
39 lines
1.0 KiB
package kafka
|
|
|
|
// A commit represents the instruction of publishing an update of the last
|
|
// offset read by a program for a topic and partition.
|
|
type commit struct {
|
|
topic string
|
|
partition int
|
|
offset int64
|
|
}
|
|
|
|
// makeCommit builds a commit value from a message, the resulting commit takes
|
|
// its topic, partition, and offset from the message.
|
|
func makeCommit(msg Message) commit {
|
|
return commit{
|
|
topic: msg.Topic,
|
|
partition: msg.Partition,
|
|
offset: msg.Offset + 1,
|
|
}
|
|
}
|
|
|
|
// makeCommits generates a slice of commits from a list of messages, it extracts
|
|
// the topic, partition, and offset of each message and builds the corresponding
|
|
// commit slice.
|
|
func makeCommits(msgs ...Message) []commit {
|
|
commits := make([]commit, len(msgs))
|
|
|
|
for i, m := range msgs {
|
|
commits[i] = makeCommit(m)
|
|
}
|
|
|
|
return commits
|
|
}
|
|
|
|
// commitRequest is the data type exchanged between the CommitMessages method
|
|
// and internals of the reader's implementation.
|
|
type commitRequest struct {
|
|
commits []commit
|
|
errch chan<- error
|
|
}
|
|
|