This could be a significant performance issue for some, so I think this fix
needs to go into 2.4.

Just my 2 cents.

-Bill

On Mon, Nov 11, 2019 at 5:57 PM John Roesler <j...@confluent.io> wrote:

> Ok, created: https://github.com/apache/kafka/pull/7681
>
> I'm on the fence about whether we should file this as a 2.4.0 blocker.
>
> It _sounds_ like this would have a pretty big impact on performance.
> I'm not convinced about any correctness problems, though, since the
> changelogs are only configured with retention when the stores also
> have the same retention.
>
> On the other hand, it doesn't look like a new regression introduced in
> 2.4, but it's a little hard to say where exactly the logical chain got
> broken, since there are quite a few code paths involved.
>
> WDYT?
> -John
>
> On Mon, Nov 11, 2019 at 4:46 PM John Roesler <j...@confluent.io> wrote:
> >
> > Hi all,
> >
> > I've just been looking over the code and Guozhang's reply... I think
> > that the reply is reasonable, but it seems like the code may not be
> > precisely implementing this logic.
> >
> > As an entry point, in `StreamThread#runOnce`:
> > If the state is `PARTITIONS_ASSIGNED`, we'll call
> > `taskManager.updateNewAndRestoringTasks()`.
> > If `active.allTasksRunning()`, we will invoke `assignStandbyPartitions()`.
> > In `assignStandbyPartitions`, we get the offsets from
> > `standbyTask.checkpointedOffsets()` (as mentioned, this only reflects
> > the offsets as of the last time `StandbyTask#initializeStateStores()`
> > was called, during `AssignedTasks#initializeNewTasks()`).
> > Then, we simply `restoreConsumer.seek(partition, offset)` to whatever
> > offset was there.
> >
> > We don't seem to ever call `restoreConsumer.resume()`, which I think
> > is what Guozhang was suggesting.
> >
> > So, in summary, it does look to me like the bug reported by
> > Navinder is present. Just looking at the code flow, I'd guess that
> > `checkpointedOffsets()` was supposed to be an unmodifiable view onto
> > the stateMgr's checkpoint map. The code flow makes it hard to say at
> > what point this whole process broke down. I'll prepare a fix, and we
> > can just take it step-by-step to consider which released branches to
> > cherry-pick to.
> >
> > Thanks,
> > -John
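
To make the snapshot-versus-view distinction in the message above concrete, here is a minimal Java sketch. It is not the Kafka Streams code: `CheckpointDemo`, `snapshotOffsets()`, `liveOffsets()`, `commit()` and the partition name `changelog-0` are hypothetical names made up for illustration. It only shows that a copy taken at initialization stays frozen, while a `Collections.unmodifiableMap` wrapper over the live map reflects later commits.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for checkpoint bookkeeping; not the Kafka Streams classes.
class CheckpointDemo {
    // Live map, updated on every commit (partition name -> offset).
    private final Map<String, Long> checkpoint = new HashMap<>();
    // Copy taken once at initialization; it never sees later commits.
    private final Map<String, Long> snapshotAtInit;

    CheckpointDemo(Map<String, Long> initial) {
        checkpoint.putAll(initial);
        snapshotAtInit = new HashMap<>(checkpoint);
    }

    void commit(String partition, long offset) {
        checkpoint.put(partition, offset);
    }

    // Offsets frozen at initialization (the behaviour reported in this thread).
    Map<String, Long> snapshotOffsets() {
        return Collections.unmodifiableMap(snapshotAtInit);
    }

    // Read-only view that tracks the live checkpoint map.
    Map<String, Long> liveOffsets() {
        return Collections.unmodifiableMap(checkpoint);
    }

    public static void main(String[] args) {
        Map<String, Long> initial = new HashMap<>();
        initial.put("changelog-0", 100L);
        CheckpointDemo demo = new CheckpointDemo(initial);

        Map<String, Long> snapshot = demo.snapshotOffsets();
        Map<String, Long> live = demo.liveOffsets();
        demo.commit("changelog-0", 250L);

        System.out.println("snapshot: " + snapshot.get("changelog-0"));
        System.out.println("live view: " + live.get("changelog-0"));
    }
}
```

In this toy model the snapshot still reports 100 after the commit, while the live view reports 250. Seeking to the snapshot value would replay everything between the two offsets.
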
> >
> >
> > On Sun, Nov 10, 2019 at 8:11 PM Navinder Brar
> > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > Thanks Guozhang.
> > > The jira is filed: [KAFKA-9169] Standby Tasks point ask for incorrect
> > > offsets on resuming post suspension - ASF JIRA
> > >
> > > On Monday, 11 November, 2019, 03:10:37 am IST, Guozhang Wang
> > > <wangg...@gmail.com> wrote:
> > >
> > > Could you file a JIRA report for this so that we can keep track of it
> > > and fix it?
> > >
> > > Guozhang
> > > On Sun, Nov 10, 2019 at 1:39 PM Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > >
> > > If a standby task is suspended, it will write the checkpoint file
> > > again after flushing its state stores, and when it resumes it does not
> > > reinitialize the position on the consumer; hence it is still the
> > > task-manager's responsibility to set the right starting offset from the
> > > latest checkpoint file. If we did not do that, that should still be a bug.
> > >
> > > Guozhang
> > > On Sat, Nov 9, 2019 at 11:33 AM Navinder Brar <navinder_b...@yahoo.com>
> > > wrote:
> > >
> > > Hi Guozhang,
> > > Thanks for the reply.
> > > So, if I understand it correctly: in versions where KIP-429 was not
> > > implemented, when we suspended the standby tasks during a rebalance
> > > and resumed them post rebalance, they would be reading from the
> > > beginning of the changelog offsets, since they would be reading from
> > > standbyTask.checkpointedOffsets(), which was only updated during the
> > > first initialization.
> > > Regards,
> > > Navinder
> > > On Sunday, 10 November, 2019, 12:50:39 am IST, Guozhang Wang
> > > <wangg...@gmail.com> wrote:
> > >
> > >  Hello Navinder,
> > >
> > > Sorry for the late reply and thanks for bringing this up. I think this
> > > is indeed a bug that needs to be fixed.
> > >
> > > The rationale behind it was the following: for restoring active tasks and
> > > processing standby tasks, we are using the same consumer client within
> > > the thread (the restoreConsumer). And before ALL of the active tasks have
> > > completed restoration, the consumer would not get assigned to any of
> > > the standby tasks at all. So with a rebalance, assuming KIP-429 is
> > > already in place, the timeline should look like this:
> > >
> > > T0: rebalance triggered; some tasks get revoked but some others may
> > > still be active;
> > > T0-T1: a subset of active tasks (via the main consumer) and all standby
> > > tasks (via the restore consumer) are still processing;
> > > T1: rebalance finished; some new tasks get assigned and now need to
> > > be restored. The restore consumer is re-assigned to fetch for those
> > > restoring tasks only.
> > > T1-T2: the main consumer has paused all partitions, hence no active tasks
> > > are processing; also the restore consumer is only fetching for restoring
> > > tasks, and hence no standby tasks are processing;
> > > T2: restoration completed; the restore consumer is reassigned to those
> > > standby tasks.
> > >
> > > Note that at T1, the standby tasks are all still "running" but they just
> > > do not proceed any more since the consumer has switched to fetch other
> > > partitions; so at T2, when the consumer switches back, it should just
> > > resume from where it left off.
> > >
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Nov 4, 2019 at 4:47 AM Navinder Brar
> > > <navinder_b...@yahoo.com.invalid> wrote:
> > >
> > > > Hi,
> > > > Please let me know if this is not the correct forum to ask this, but
> > > > I have a doubt that I was hoping someone could clear up for me.
> > > > In TaskManager::updateNewAndRestoringTasks(), the
> > > > function assignStandbyPartitions() gets called for all the running
> > > > standby tasks, where it populates the Map checkpointedOffsets from
> > > > standbyTask.checkpointedOffsets(), which is only updated at the time
> > > > of initialization of a StandbyTask (i.e. in its constructor). I have
> > > > checked, and this goes way back to version 1.1, when the rebalance
> > > > protocol was old and standby tasks were suspended during rebalance and
> > > > then resumed on assignment.
> > > > I want to know why, post resumption, we were/are reading
> > > > standbyTask.checkpointedOffsets() to know the offset from where the
> > > > standby task should start running, and not stateMgr.checkpointed(),
> > > > which gets updated on every commit to the checkpoint file. In the
> > > > former case it's always reading from the same offset, even those which
> > > > it had already read earlier, and in cases where the changelog topic has
> > > > a retention time, it gives an offsetOutOfRange exception.
> > > > Regards,
> > > > Navinder
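
The two symptoms described in the message above (re-reading records already applied, and an out-of-range failure once retention has trimmed the changelog) can be sketched with a toy model. Everything here is an assumption for illustration: `ToyChangelog`, `append`, `expireBefore` and `readFrom` are hypothetical names, retention is modeled as simply advancing a log start offset, and an `IllegalStateException` stands in for the consumer's out-of-range error. None of it is the Kafka consumer API.

```java
// Toy model of a changelog partition, assuming retention simply advances the
// log start offset. Hypothetical class, not the Kafka consumer API.
class ToyChangelog {
    private long logStartOffset = 0L;
    private long logEndOffset = 0L;

    void append(long records) {
        logEndOffset += records;
    }

    // Retention drops every record before newStart (capped at the log end).
    void expireBefore(long newStart) {
        logStartOffset = Math.max(logStartOffset, Math.min(newStart, logEndOffset));
    }

    // Number of records a reader consumes when it starts at the given offset.
    long readFrom(long offset) {
        if (offset < logStartOffset || offset > logEndOffset) {
            throw new IllegalStateException("offset " + offset + " out of range");
        }
        return logEndOffset - offset;
    }

    public static void main(String[] args) {
        ToyChangelog log = new ToyChangelog();
        long offsetAtInit = 0L;         // checkpoint seen when the task was created
        log.append(1000);
        long latestCheckpoint = 1000L;  // standby caught up and checkpointed here
        log.append(50);                 // written while the standby was suspended

        System.out.println("from init offset: " + log.readFrom(offsetAtInit));
        System.out.println("from latest checkpoint: " + log.readFrom(latestCheckpoint));

        log.expireBefore(500L);         // retention trims the head of the log
        try {
            log.readFrom(offsetAtInit);
        } catch (IllegalStateException e) {
            System.out.println("resume failed: " + e.getMessage());
        }
    }
}
```

In this model, resuming from the initialization-time offset replays 1050 records where 50 would do, and fails outright once the log start has moved past that offset. Resuming from the latest checkpoint is unaffected by the trim.
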
> > >
> > >
> > >
> > > --
> > > -- Guozhang
>
