Thanks for looking at it, Bill.

I initially agreed with you, but Manikumar asked me to check if it's
really a regression before calling it a blocker. I tested 2.3 and
found the same (buggy) behavior, so I don't think we can call it a
regression, and therefore, it's also not a blocker.

I'm still working on the test, which is pretty tricky to write well,
since so many components inter-operate to produce the behavior. I
still want to get this one fixed asap.

Thanks,
-John

On Tue, Nov 12, 2019 at 9:50 AM Bill Bejeck <bbej...@gmail.com> wrote:
>
> This could be a significant performance issue for some, so I think this fix
> needs to go into 2.4.
>
> Just my 2 cents.
>
> -Bill
>
> On Mon, Nov 11, 2019 at 5:57 PM John Roesler <j...@confluent.io> wrote:
>
> > Ok, created: https://github.com/apache/kafka/pull/7681
> >
> > I'm on the fence about whether we should file this as a 2.4.0 blocker.
> >
> > It _sounds_ like this would have a pretty big impact on performance.
> > I'm not convinced about any correctness problems, though, since the
> > changelogs are only configured with retention when the stores also
> > have the same retention.
> >
> > On the other hand, it doesn't look like a new regression introduced in
> > 2.4, but it's a little hard to say where exactly the logical chain got
> > broken, since there are quite a few code paths involved.
> >
> > WDYT?
> > -John
> >
> > On Mon, Nov 11, 2019 at 4:46 PM John Roesler <j...@confluent.io> wrote:
> > >
> > > Hi all,
> > >
> > > I've just been looking over the code and Guozhang's reply... I think
> > > that the reply is reasonable, but it seems like the code may not be
> > > precisely implementing this logic.
> > >
> > > As an entry point, in `StreamThread#runOnce`:
> > > If the state is `PARTITIONS_ASSIGNED`, we'll call
> > > `taskManager.updateNewAndRestoringTasks()`. If `active.allTasksRunning()`,
> > > we will invoke `assignStandbyPartitions()`. In `assignStandbyPartitions()`,
> > > we get the offsets from `standbyTask.checkpointedOffsets()` (as mentioned,
> > > this only reflects the offsets as of the last time
> > > `StandbyTask#initializeStateStores()` was called, during
> > > `AssignedTasks#initializeNewTasks()`). Then, we simply
> > > `restoreConsumer.seek(partition, offset)` to whatever offset was there.
> > >
> > > We don't seem to ever call `restoreConsumer.resume()`, which I think
> > > is what Guozhang was suggesting.
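> > >
> > > To make that flow concrete, here is a heavily simplified, stand-alone
> > > sketch of the shape I think we're in. The class and method names below
> > > are made up purely for illustration (this is not the real Streams code);
> > > only the Consumer calls mirror the actual client API:
> > >
> > > import java.util.Collection;
> > > import java.util.HashMap;
> > > import java.util.Map;
> > >
> > > import org.apache.kafka.clients.consumer.Consumer;
> > > import org.apache.kafka.common.TopicPartition;
> > >
> > > class StandbyFlowSketch {
> > >     // Snapshot of checkpointed offsets, captured exactly once when the
> > >     // standby task initializes its state stores.
> > >     private final Map<TopicPartition, Long> initialSnapshot = new HashMap<>();
> > >     // The "live" checkpoint map, updated on every commit/flush.
> > >     private final Map<TopicPartition, Long> latestCheckpoint = new HashMap<>();
> > >
> > >     void initializeStateStores(Map<TopicPartition, Long> checkpointFile) {
> > >         initialSnapshot.putAll(checkpointFile);
> > >         latestCheckpoint.putAll(checkpointFile);
> > >     }
> > >
> > >     void commit(TopicPartition tp, long nextOffset) {
> > >         latestCheckpoint.put(tp, nextOffset); // the snapshot is never refreshed
> > >     }
> > >
> > >     // Models the effect of the flow above: after every rebalance the
> > >     // restore consumer is seeked to the stale snapshot, not to the
> > >     // latest checkpoint, so already-applied records are fetched again.
> > >     void assignStandbyPartitions(Consumer<byte[], byte[]> restoreConsumer,
> > >                                  Collection<TopicPartition> standbyPartitions) {
> > >         restoreConsumer.assign(standbyPartitions);
> > >         for (TopicPartition tp : standbyPartitions) {
> > >             restoreConsumer.seek(tp, initialSnapshot.getOrDefault(tp, 0L));
> > >         }
> > >     }
> > > }
> > >
> > > The point being that `initialSnapshot` is never refreshed, so every
> > > post-rebalance seek goes back to the same old offsets.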
> > >
> > > So, in summary, it does look to me like the bug as reported from
> > > Navinder is present. Just looking at the code flow, I'd guess that
> > > `checkpointedOffsets()` was supposed to be an unmodifiable view onto
> > > the stateMgr's checkpoint map. The code flow makes it hard to say at
> > > what point this whole process broke down. I'll prepare a fix, and we
> > > can just take it step-by-step to consider which released branches to
> > > cherry-pick to.
> > >
> > > Thanks,
> > > -John
> > >
> > >
> > > On Sun, Nov 10, 2019 at 8:11 PM Navinder Brar
> > > <navinder_b...@yahoo.com.invalid> wrote:
> > > >
> > > > Thanks Guozhang.
> > > > The jira is filed: [KAFKA-9169] Standby Tasks point ask for incorrect
> > > > offsets on resuming post suspension - ASF JIRA
> > > >
> > > > On Monday, 11 November, 2019, 03:10:37 am IST, Guozhang Wang
> > > > <wangg...@gmail.com> wrote:
> > > >
> > > > Could you file a JIRA report for this so that we can keep track of it
> > > > and fix?
> > > >
> > > > Guozhang
> > > > On Sun, Nov 10, 2019 at 1:39 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > If a standby task is suspended, it will write the checkpoint file
> > > > again after flushing its state stores, and when it resumes it does not
> > > > re-initialize the position on the consumer; hence it is still the
> > > > task manager's responsibility to set the right starting offset from
> > > > the latest checkpoint file. If we did not do that, that would still be
> > > > a bug.
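> > > >
> > > > As an illustration only (made-up code, not the actual StandbyTask
> > > > internals; only the Consumer calls are the real client API), the
> > > > intended shape would be roughly:
> > > >
> > > > import java.util.HashMap;
> > > > import java.util.Map;
> > > >
> > > > import org.apache.kafka.clients.consumer.Consumer;
> > > > import org.apache.kafka.common.TopicPartition;
> > > >
> > > > class SuspendResumeSketch {
> > > >     // In-memory offsets; written out as the checkpoint file on suspend.
> > > >     private final Map<TopicPartition, Long> latestOffsets = new HashMap<>();
> > > >     private final Map<TopicPartition, Long> checkpointFileOnDisk = new HashMap<>();
> > > >
> > > >     void update(TopicPartition tp, long nextOffset) {
> > > >         latestOffsets.put(tp, nextOffset);
> > > >     }
> > > >
> > > >     // suspend(): flush the stores, then rewrite the checkpoint file.
> > > >     void suspend() {
> > > >         // flushStores(); // omitted in this sketch
> > > >         checkpointFileOnDisk.clear();
> > > >         checkpointFileOnDisk.putAll(latestOffsets);
> > > >     }
> > > >
> > > >     // resume(): the task manager should re-seek the restore consumer
> > > >     // using the latest checkpoint file, not a snapshot taken when the
> > > >     // task was created.
> > > >     void resume(Consumer<byte[], byte[]> restoreConsumer) {
> > > >         restoreConsumer.assign(checkpointFileOnDisk.keySet());
> > > >         checkpointFileOnDisk.forEach(restoreConsumer::seek);
> > > >     }
> > > > }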
> > > >
> > > > Guozhang
> > > > On Sat, Nov 9, 2019 at 11:33 AM Navinder Brar
> > > > <navinder_b...@yahoo.com> wrote:
> > > >
> > > > Hi Guozhang,
> > > > Thanks for the reply.
> > > > So, if I understand it correctly: in versions where KIP-429 was not
> > > > implemented, when we were suspending the standby tasks during a
> > > > rebalance and resuming them post rebalance, they would re-read the
> > > > changelog from the offsets captured at the first initialization, since
> > > > they would be reading from standbyTask.checkpointedOffsets(), which was
> > > > only updated during that first initialization.
> > > > Regards,
> > > > Navinder
> > > > On Sunday, 10 November, 2019, 12:50:39 am IST, Guozhang Wang
> > > > <wangg...@gmail.com> wrote:
> > > >
> > > > Hello Navinder,
> > > >
> > > > Sorry for the late reply and thanks for bringing this up. I think this
> > > > is indeed a bug that needs to be fixed.
> > > >
> > > > The rationale behind it was the following: for restoring active tasks
> > > > and processing standby tasks, we are using the same consumer client
> > > > within the thread (the restoreConsumer). And before ALL of the active
> > > > tasks have completed restoration, the consumer would not get assigned
> > > > to any of the standby tasks at all. So on a timeline it should look
> > > > like this across a rebalance, assuming KIP-429 is already in place:
> > > >
> > > > T0: rebalance triggered; some tasks get revoked but some others may
> > > > still be active;
> > > > T0-T1: a subset of active tasks (via the main consumer) and all standby
> > > > tasks (via the restore consumer) are still processing;
> > > > T1: rebalance finished; some new tasks get assigned and now need to be
> > > > restored. The restore consumer is re-assigned to fetch for those
> > > > restoring tasks only;
> > > > T1-T2: the main consumer has paused all partitions, hence no active
> > > > task processing; also the restore consumer is only fetching for
> > > > restoring tasks, and hence no standby task processing;
> > > > T2: restoration completed; the restore consumer is reassigned to those
> > > > standby tasks.
> > > >
> > > > Note that at T1 the standby tasks are all still "running", but they
> > > > just do not proceed any more since the consumer has switched to fetch
> > > > other partitions; so at T2, when the consumer switches back, it should
> > > > just resume from where it switched off.
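> > > >
> > > > As a rough illustration of that T1/T2 switch (a made-up helper class,
> > > > just to show the intent; only the Consumer calls are the real client
> > > > API):
> > > >
> > > > import java.util.HashMap;
> > > > import java.util.HashSet;
> > > > import java.util.Map;
> > > > import java.util.Set;
> > > >
> > > > import org.apache.kafka.clients.consumer.Consumer;
> > > > import org.apache.kafka.common.TopicPartition;
> > > >
> > > > class RestoreConsumerSwitchSketch {
> > > >     // Positions the standby changelog partitions had reached before the
> > > >     // shared restore consumer was pointed at the restoring partitions.
> > > >     private final Map<TopicPartition, Long> standbyPositions = new HashMap<>();
> > > >
> > > >     // T1: remember where the standbys are, then fetch only the
> > > >     // partitions that need restoration.
> > > >     void switchToRestoring(Consumer<byte[], byte[]> restoreConsumer,
> > > >                            Set<TopicPartition> restoringPartitions) {
> > > >         for (TopicPartition tp : restoreConsumer.assignment()) {
> > > >             standbyPositions.put(tp, restoreConsumer.position(tp));
> > > >         }
> > > >         restoreConsumer.assign(restoringPartitions);
> > > >     }
> > > >
> > > >     // T2: restoration done; switch back and continue from where we
> > > >     // switched off, not from an offset snapshot taken at task creation.
> > > >     void switchBackToStandbys(Consumer<byte[], byte[]> restoreConsumer) {
> > > >         Set<TopicPartition> standbys = new HashSet<>(standbyPositions.keySet());
> > > >         restoreConsumer.assign(standbys);
> > > >         standbyPositions.forEach(restoreConsumer::seek);
> > > >     }
> > > > }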
> > > >
> > > >
> > > > Guozhang
> > > >
> > > >
> > > > On Mon, Nov 4, 2019 at 4:47 AM Navinder Brar
> > > > <navinder_b...@yahoo.com.invalid> wrote:
> > > >
> > > > > Hi,
> > > > > Please let me know if this is not the correct forum to ask this, but
> > > > > I have a doubt that I was hoping someone could clear up for me.
> > > > > In TaskManager::updateNewAndRestoringTasks(), the function
> > > > > assignStandbyPartitions() gets called for all the running standby
> > > > > tasks, where it populates the map checkpointedOffsets from
> > > > > standbyTask.checkpointedOffsets(), which is only updated at the time
> > > > > a StandbyTask is initialized (i.e., in its constructor). I have
> > > > > checked, and this goes all the way back to the 1.1 version, when the
> > > > > rebalance protocol was old and standby tasks were suspended during a
> > > > > rebalance and then resumed on assignment.
> > > > > I want to know why, post resumption, we were/are reading
> > > > > standbyTask.checkpointedOffsets() to know the offset from where the
> > > > > standby task should start running, and not stateMgr.checkpointed(),
> > > > > which gets updated on every commit to the checkpoint file. In the
> > > > > former case it is always reading from the same offsets, even those
> > > > > which it had already read earlier, and in cases where the changelog
> > > > > topic has a retention time, it gives an OffsetOutOfRange exception.
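> > > > >
> > > > > Just to illustrate that retention problem with a toy check (not
> > > > > Streams code; only the Consumer call is the real client API): once
> > > > > the stale snapshot offset has fallen below the broker's current log
> > > > > start offset, seeking there surfaces as the OffsetOutOfRange error
> > > > > described above.
> > > > >
> > > > > import java.util.Collections;
> > > > > import java.util.Map;
> > > > >
> > > > > import org.apache.kafka.clients.consumer.Consumer;
> > > > > import org.apache.kafka.common.TopicPartition;
> > > > >
> > > > > class StaleOffsetCheckSketch {
> > > > >     // Returns true if seeking to 'staleSnapshotOffset' would target a
> > > > >     // position that retention has already deleted on the broker.
> > > > >     static boolean wouldBeOutOfRange(Consumer<byte[], byte[]> consumer,
> > > > >                                      TopicPartition tp,
> > > > >                                      long staleSnapshotOffset) {
> > > > >         Map<TopicPartition, Long> logStart =
> > > > >             consumer.beginningOffsets(Collections.singletonList(tp));
> > > > >         return staleSnapshotOffset < logStart.getOrDefault(tp, 0L);
> > > > >     }
> > > > > }
> > > > >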
> > > > > Regards,
> > > > > Navinder
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> >
