This could be a significant performance issue for some, so I think this fix needs to go into 2.4.
Just my 2 cents.
-Bill

On Mon, Nov 11, 2019 at 5:57 PM John Roesler <j...@confluent.io> wrote:
> Ok, created: https://github.com/apache/kafka/pull/7681
>
> I'm on the fence about whether we should file this as a 2.4.0 blocker.
>
> It _sounds_ like this would have a pretty big impact on performance.
> I'm not convinced about any correctness problems, though, since the
> changelogs are only configured with retention when the stores also
> have the same retention.
>
> On the other hand, it doesn't look like a new regression introduced in
> 2.4, but it's a little hard to say where exactly the logical chain got
> broken, since there are quite a few code paths involved.
>
> WDYT?
> -John
>
> On Mon, Nov 11, 2019 at 4:46 PM John Roesler <j...@confluent.io> wrote:
> >
> > Hi all,
> >
> > I've just been looking over the code and Guozhang's reply... I think
> > the reply is reasonable, but it seems like the code may not be
> > precisely implementing this logic.
> >
> > As an entry point, in `StreamThread#runOnce`:
> > If the state is `PARTITIONS_ASSIGNED`, we call
> > `taskManager.updateNewAndRestoringTasks()`.
> > If `active.allTasksRunning()`, we invoke `assignStandbyPartitions()`.
> > In `assignStandbyPartitions()`, we get the offsets from
> > `standbyTask.checkpointedOffsets()` (as mentioned, this only reflects
> > the offsets as of the last time `StandbyTask#initializeStateStores()`
> > was called, during `AssignedTasks#initializeNewTasks()`).
> > Then we simply `restoreConsumer.seek(partition, offset)` to whatever
> > offset was there.
> >
> > We don't seem to ever call `restoreConsumer.resume()`, which I think
> > is what Guozhang was suggesting.
> >
> > So, in summary, it does look to me like the bug Navinder reported is
> > present. Just looking at the code flow, I'd guess that
> > `checkpointedOffsets()` was supposed to be an unmodifiable view onto
> > the stateMgr's checkpoint map. The code flow makes it hard to say at
> > what point this whole process broke down. I'll prepare a fix, and we
> > can take it step by step to consider which released branches to
> > cherry-pick to.
> >
> > Thanks,
> > -John
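To make John's walkthrough concrete, here is a rough sketch of the control flow he describes. These are hypothetical, simplified stand-ins, not the real Kafka Streams classes; only the shape of the logic follows the walkthrough:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Hypothetical stand-in for StandbyTask: the offsets are snapshotted once,
// when initializeStateStores() runs, and never refreshed afterwards.
class StandbyTaskSketch {
    private final Map<TopicPartition, Long> checkpointedOffsets;

    StandbyTaskSketch(Map<TopicPartition, Long> offsetsAtInitialization) {
        this.checkpointedOffsets = new HashMap<>(offsetsAtInitialization);
    }

    Map<TopicPartition, Long> checkpointedOffsets() {
        return checkpointedOffsets; // stale after the first commit
    }
}

// Hypothetical stand-in for the path StreamThread#runOnce ->
// TaskManager#updateNewAndRestoringTasks -> assignStandbyPartitions.
class TaskManagerSketch {
    void assignStandbyPartitions(Iterable<StandbyTaskSketch> standbyTasks,
                                 Consumer<byte[], byte[]> restoreConsumer) {
        for (StandbyTaskSketch task : standbyTasks) {
            for (Map.Entry<TopicPartition, Long> entry :
                     task.checkpointedOffsets().entrySet()) {
                // The reported bug: this always seeks to the offset captured
                // at task initialization, so a resumed standby re-reads old
                // records, or hits OffsetOutOfRangeException once retention
                // has deleted those changelog segments.
                restoreConsumer.seek(entry.getKey(), entry.getValue());
            }
            // And, as John notes, restoreConsumer.resume(...) is never
            // called here.
        }
    }
}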
> > On Sun, Nov 10, 2019 at 8:11 PM Navinder Brar
> > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > Thanks Guozhang.
> > > The JIRA is filed: [KAFKA-9169] Standby Tasks point ask for incorrect
> > > offsets on resuming post suspension - ASF JIRA
> > >
> > > On Monday, 11 November, 2019, 03:10:37 am IST, Guozhang Wang
> > > <wangg...@gmail.com> wrote:
> > >
> > > Could you file a JIRA report for this so that we can keep track of it
> > > and fix it?
> > >
> > > Guozhang
> > >
> > > On Sun, Nov 10, 2019 at 1:39 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > If a standby task is suspended, it writes the checkpoint file again
> > > after flushing its state stores, and when it resumes it does not
> > > re-initialize the position on the consumer; hence it is still the
> > > task manager's responsibility to set the right starting offset from
> > > the latest checkpoint file. If we did not do that, that would indeed
> > > be a bug.
> > >
> > > Guozhang
> > >
> > > On Sat, Nov 9, 2019 at 11:33 AM Navinder Brar
> > > <navinder_b...@yahoo.com> wrote:
> > >
> > > Hi Guozhang,
> > > Thanks for the reply.
> > > So, if I understand it correctly: in versions where KIP-429 was not
> > > implemented, when we suspended standby tasks during a rebalance and
> > > resumed them post-rebalance, they would read from stale changelog
> > > offsets, since they would be reading from
> > > standbyTask.checkpointedOffsets(), which was only updated during the
> > > first initialization.
> > > Regards,
> > > Navinder
> > >
> > > On Sunday, 10 November, 2019, 12:50:39 am IST, Guozhang Wang
> > > <wangg...@gmail.com> wrote:
> > >
> > > Hello Navinder,
> > >
> > > Sorry for the late reply, and thanks for bringing this up. I think
> > > this is indeed a bug that needs to be fixed.
> > >
> > > The rationale behind it was the following: for restoring active tasks
> > > and processing standby tasks, we use the same consumer client within
> > > the thread (the restoreConsumer). And before ALL of the active tasks
> > > have completed restoration, the consumer would not be assigned any of
> > > the standby tasks at all. So the timeline across a rebalance should
> > > look like this, assuming KIP-429 is already in place:
> > >
> > > T0: rebalance triggered; some tasks get revoked, but some others may
> > > still be active;
> > > T0-T1: a subset of active tasks (via the main consumer) and all
> > > standby tasks (via the restore consumer) are still processing;
> > > T1: rebalance finished; some new tasks get assigned and now need to
> > > be restored. The restore consumer is re-assigned to fetch for those
> > > restoring tasks only;
> > > T1-T2: the main consumer has paused all partitions, hence no active
> > > tasks are processing; the restore consumer is fetching only for
> > > restoring tasks, hence no standby tasks are processing;
> > > T2: restoration completed; the restore consumer is reassigned to the
> > > standby tasks.
> > >
> > > Note that during T1-T2 the standby tasks are all still "running", but
> > > they just do not make progress, since the consumer has switched to
> > > fetching other partitions; so at T2, when the consumer switches back,
> > > it should just resume from where it left off.
> > >
> > > Guozhang
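Guozhang's point at T2 relies on the KafkaConsumer pause/resume semantics: pausing a partition stops fetching but keeps the consumer's position, so resuming continues from wherever fetching stopped, and no seek is needed. A minimal, self-contained sketch of those semantics (the bootstrap server, topic, and group id are placeholders for illustration):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PauseResumeDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "pause-resume-demo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        // Placeholder changelog partition for the demo.
        TopicPartition changelog = new TopicPartition("demo-changelog", 0);

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.assign(List.of(changelog));
            consumer.poll(Duration.ofMillis(500));      // establish/advance the position

            long before = consumer.position(changelog);
            consumer.pause(List.of(changelog));
            consumer.poll(Duration.ofMillis(500));      // fetches nothing while paused
            consumer.resume(List.of(changelog));
            long after = consumer.position(changelog);  // unchanged: same as 'before'

            System.out.printf("before=%d, after=%d%n", before, after);
            // By contrast, seek(changelog, staleCheckpointedOffset) rewinds
            // the position and re-reads, which is the behavior reported as
            // the bug.
        }
    }
}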
> > > On Mon, Nov 4, 2019 at 4:47 AM Navinder Brar
> > > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > > Hi,
> > > > Please let me know if this is not the correct forum to ask this,
> > > > but I have a doubt I was hoping someone could clear up for me.
> > > > In TaskManager::updateNewAndRestoringTasks(), the function
> > > > assignStandbyPartitions() gets called for all the running standby
> > > > tasks, where it populates the map checkpointedOffsets from
> > > > standbyTask.checkpointedOffsets(), which is only updated at the
> > > > time of initialization of a StandbyTask (i.e. in its constructor).
> > > > I have checked, and this goes back to the 1.1 version, when the
> > > > rebalance protocol was old and standby tasks were suspended during
> > > > rebalance and then resumed on assignment.
> > > > I want to know why, post-resumption, we were/are reading
> > > > standbyTask.checkpointedOffsets() to know the offset from which
> > > > the standby task should start running, and not
> > > > stateMgr.checkpointed(), which gets updated on every commit to the
> > > > checkpoint file. In the former case it is always reading from the
> > > > same offset, including records it has already read earlier, and in
> > > > cases where the changelog topic has a retention time, it gives an
> > > > OffsetOutOfRange exception.
> > > > Regards,
> > > > Navinder
> > >
> > > --
> > > -- Guozhang
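John's guess above, that checkpointedOffsets() was supposed to be an unmodifiable view onto the stateMgr's checkpoint map, comes down to a copy-versus-view distinction. A hypothetical sketch with simplified stand-in types (not the real ProcessorStateManager) showing the two behaviors:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

// Hypothetical stand-in for the state manager's checkpoint map.
class StateManagerSketch {
    private final Map<TopicPartition, Long> checkpointed = new HashMap<>();

    // Called on every commit: the checkpoint file (and this map) advance.
    void checkpoint(TopicPartition tp, long offset) {
        checkpointed.put(tp, offset);
    }

    Map<TopicPartition, Long> checkpointed() {
        return checkpointed;
    }
}

class CheckpointViewDemo {
    public static void main(String[] args) {
        StateManagerSketch stateMgr = new StateManagerSketch();
        TopicPartition tp = new TopicPartition("demo-changelog", 0); // placeholder

        stateMgr.checkpoint(tp, 100L);

        // Buggy shape: a copy taken once, in the StandbyTask constructor.
        Map<TopicPartition, Long> snapshot = new HashMap<>(stateMgr.checkpointed());

        // Intended shape, per John's guess: an unmodifiable *view* that
        // keeps tracking the state manager's map.
        Map<TopicPartition, Long> view =
            Collections.unmodifiableMap(stateMgr.checkpointed());

        stateMgr.checkpoint(tp, 5000L); // later commits advance the checkpoint

        System.out.println(snapshot.get(tp)); // 100  -> seek rewinds, or lands
                                              //         outside retained segments
        System.out.println(view.get(tp));     // 5000 -> seek hits the latest
                                              //         checkpointed offset
    }
}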