Do not commit offsets for past generations if partition not owned #1330

Open — wants to merge 3 commits into base: main
3 changes: 3 additions & 0 deletions batch.go
@@ -232,6 +232,9 @@ func (batch *Batch) ReadMessage() (Message, error) {
msg.HighWaterMark = batch.highWaterMark
msg.Time = makeTime(timestamp)
msg.Headers = headers
// Propagate the generation of the connection the batch was read on, so
// commits can later be checked against the current group generation.
if batch.conn != nil {
msg.GenerationId = batch.conn.generationId
}

return msg, err
}
14 changes: 8 additions & 6 deletions commit.go
@@ -3,18 +3,20 @@ package kafka
// A commit represents an instruction to publish an update of the last
// offset read by a program for a topic and partition.
type commit struct {
topic string
partition int
offset int64
topic string
partition int
offset int64
generationId int32
}

// makeCommit builds a commit value from a message; the resulting commit takes
// its topic, partition, offset, and generation ID from the message.
func makeCommit(msg Message) commit {
return commit{
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
topic: msg.Topic,
partition: msg.Partition,
offset: msg.Offset + 1,
generationId: msg.GenerationId,
}
}

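As a quick sketch (hypothetical values, not part of the diff), a commit built from a consumer-group message now carries the generation alongside the usual fields; note that the committed offset is the message offset plus one:

// Hypothetical message, as a consumer-group reader would produce it.
msg := Message{Topic: "events", Partition: 3, Offset: 41, GenerationId: 7}

c := makeCommit(msg)
// c == commit{topic: "events", partition: 3, offset: 42, generationId: 7}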
4 changes: 4 additions & 0 deletions conn.go
@@ -65,6 +65,8 @@ type Conn struct {
apiVersions atomic.Value // apiVersionMap

transactionalID *string

generationId int32
}

type apiVersionMap map[apiKey]ApiVersion
@@ -182,6 +184,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
offset: FirstOffset,
requiredAcks: -1,
transactionalID: emptyToNullable(config.TransactionalID),
generationId: -1,
}

c.wb.w = &c.wbuf
@@ -388,6 +391,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error
return joinGroupResponseV1{}, Error(response.ErrorCode)
}

c.generationId = response.GenerationID
return response, nil
}

4 changes: 4 additions & 0 deletions message.go
@@ -20,6 +20,10 @@ type Message struct {
Value []byte
Headers []Header

// If the message was read as part of a consumer group, this holds the
// group's generation ID. The value is -1 when consumer groups are not used.
GenerationId int32

// This field is used to hold arbitrary data you wish to include, so it
// will be available when handling it in the Writer's `Completion` method;
// this way the application can run any post operation on each message.
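A usage sketch of how the new field surfaces to applications, assuming this branch; the broker address, topic, and group name are made up. With a GroupID the field carries the group generation; without one it stays at -1:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"}, // hypothetical broker
		GroupID: "example-group",            // consumer-group mode
		Topic:   "events",
	})
	defer r.Close()

	msg, err := r.FetchMessage(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	// Populated from the connection's generation on this branch; -1 when
	// no consumer group is used.
	fmt.Println(msg.GenerationId)

	// CommitMessages goes through the offset stash, where stale-generation
	// commits are now filtered out after a rebalance.
	if err := r.CommitMessages(context.Background(), msg); err != nil {
		log.Fatal(err)
	}
}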
65 changes: 51 additions & 14 deletions reader.go
@@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() {
// another consumer to avoid such a race.
}

func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) {
offsets := make(map[topicPartition]int64)
for topic, assignments := range allAssignments {
for _, assignment := range assignments {
@@ -134,7 +134,7 @@ func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
}

r.mutex.Lock()
r.start(offsets)
r.start(generationId, offsets)
r.mutex.Unlock()

r.withLogger(func(l Logger) {
Expand All @@ -150,35 +150,71 @@ func (r *Reader) commitOffsetsWithRetry(gen *Generation, offsetStash offsetStash
backoffDelayMax = 5 * time.Second
)

messagesToSend := make(map[string]map[int]int64)
for topic, partitionsInfo := range offsetStash {
	msgsForTopic := make(map[int]int64)
	for partition, commitInfo := range partitionsInfo {
		// A generation ID that differs from the current one means a
		// rebalance happened since the message was read.
		if commitInfo.generationId == -1 || commitInfo.generationId == gen.ID {
			msgsForTopic[partition] = commitInfo.offset
			continue
		}
		// Keep the commit only if it belongs to a newer generation that
		// still owns the partition; otherwise discard it.
		owned := false
		if assignments, ok := gen.Assignments[topic]; ok {
			for _, assignment := range assignments {
				if assignment.ID == partition && commitInfo.generationId > gen.ID {
					owned = true
				}
			}
		}
		if owned {
			msgsForTopic[partition] = commitInfo.offset
		} else {
			r.withErrorLogger(func(l Logger) {
				l.Printf("discarding commit for %s-%d at offset %d: current generation is %d, commit generation is %d",
					topic, partition, commitInfo.offset, gen.ID, commitInfo.generationId)
			})
		}
	}
	if len(msgsForTopic) > 0 {
		messagesToSend[topic] = msgsForTopic
	}
}
for attempt := 0; attempt < retries; attempt++ {
if attempt != 0 {
if !sleep(r.stctx, backoff(attempt, backoffDelayMin, backoffDelayMax)) {
return
}
}

if err = gen.CommitOffsets(offsetStash); err == nil {
if err = gen.CommitOffsets(messagesToSend); err == nil {
return
}
}

return // err will not be nil
}
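The filter above can be restated as a small predicate (illustrative only, not part of the diff): an entry is committed when it carries no generation, matches the current generation, or belongs to a newer generation whose partition is still assigned; anything else is a stale pre-rebalance commit and is dropped.

// shouldCommit is a hypothetical restatement of the filtering rule in
// commitOffsetsWithRetry.
func shouldCommit(e offsetEntry, currentGen int32, stillAssigned bool) bool {
	switch {
	case e.generationId == -1:
		// Not read through a consumer group: always commit.
		return true
	case e.generationId == currentGen:
		// Same generation: the partition is still owned.
		return true
	case e.generationId > currentGen && stillAssigned:
		// Newer generation that still owns the partition.
		return true
	default:
		// Older generation: a rebalance happened, discard.
		return false
	}
}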

// offsetStash holds offsets by topic => partition => offset.
type offsetStash map[string]map[int]int64
// offsetEntry pairs a committed offset with the consumer group generation
// it was read in (-1 when consumer groups are not in use).
type offsetEntry struct {
	offset       int64
	generationId int32
}

// offsetStash holds offsets by topic => partition => offsetEntry.
type offsetStash map[string]map[int]offsetEntry

// merge updates the offsetStash with the offsets from the provided messages.
func (o offsetStash) merge(commits []commit) {
for _, c := range commits {
offsetsByPartition, ok := o[c.topic]
if !ok {
offsetsByPartition = map[int]int64{}
offsetsByPartition = map[int]offsetEntry{}
o[c.topic] = offsetsByPartition
}

if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset {
offsetsByPartition[c.partition] = c.offset
if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset {
offsetsByPartition[c.partition] = offsetEntry{
offset: c.offset,
generationId: c.generationId,
}
}
}
}
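A sketch of the merge semantics with hypothetical values: for a given partition the highest offset wins, and its generation travels with it.

s := offsetStash{}
s.merge([]commit{
	{topic: "events", partition: 0, offset: 42, generationId: 3},
	{topic: "events", partition: 0, offset: 40, generationId: 2},
})
// s["events"][0] == offsetEntry{offset: 42, generationId: 3}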
@@ -329,7 +365,7 @@ func (r *Reader) run(cg *ConsumerGroup) {

r.stats.rebalances.observe(1)

r.subscribe(gen.Assignments)
r.subscribe(gen.ID, gen.Assignments)

gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
@@ -819,7 +855,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
r.mutex.Lock()

if !r.closed && r.version == 0 {
r.start(r.getTopicPartitionOffset())
r.start(-1, r.getTopicPartitionOffset())
}

version := r.version
@@ -1040,7 +1076,7 @@ func (r *Reader) SetOffset(offset int64) error {
r.offset = offset

if r.version != 0 {
r.start(r.getTopicPartitionOffset())
r.start(-1, r.getTopicPartitionOffset())
}

r.activateReadLag()
@@ -1178,7 +1214,7 @@ func (r *Reader) readLag(ctx context.Context) {
}
}

func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) {
if r.closed {
// don't start child reader if parent Reader is closed
return
@@ -1216,7 +1252,7 @@ func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {

// backwards-compatibility flags
offsetOutOfRangeError: r.config.OffsetOutOfRangeError,
}).run(ctx, offset)
}).run(ctx, generationId, offset)
}(ctx, key, offset, &r.join)
}
}
@@ -1253,7 +1289,7 @@ type readerMessage struct {
error error
}

func (r *reader) run(ctx context.Context, offset int64) {
func (r *reader) run(ctx context.Context, generationId int32, offset int64) {
// This is the reader's main loop; it only ends if the context is canceled
// and will keep attempting to read messages otherwise.
//
@@ -1306,6 +1342,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
}
continue
}
// Stamp the connection with the generation this reader was started for.
conn.generationId = generationId

// Resetting the attempt counter ensures that if a failure occurs after
// a successful initialization we don't keep increasing the backoff
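Taken together, the changes thread the generation ID from the group join down to each fetch connection and back up through commits. A rough map of the flow, using the names as they appear in the diff:

// ConsumerGroup join      -> gen.ID (stored on Conn.generationId as well)
// Reader.run              -> r.subscribe(gen.ID, gen.Assignments)
// Reader.subscribe        -> r.start(generationId, offsets)
// Reader.start            -> (*reader).run(ctx, generationId, offset)
// reader.run              -> conn.generationId = generationId
// Batch.ReadMessage       -> msg.GenerationId = batch.conn.generationId
// makeCommit(msg)         -> commit.generationId
// offsetStash.merge       -> offsetEntry{offset, generationId}
// commitOffsetsWithRetry  -> discards entries from stale generations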