Hi Sachin,

If it is a bug then please file a JIRA for it, too.

Thanks,
Damian

On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sjmit...@gmail.com> wrote:

> Ok that's great.
> So you have already fixed that issue.
>
> I have modified my PR to remove that change (which was done keeping
> 0.10.2.0 in mind).
>
> However the other issue is still valid.
>
> Please review that change. https://github.com/apache/kafka/pull/2642
>
>
> Thanks
> Sachin
>
>
> On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <damian....@gmail.com> wrote:
>
> > On trunk the CommitFailedException isn't thrown anymore. The
> commitOffsets
> > method doesn't throw an exception. It returns one if it was thrown. We
> used
> > to throw this exception during suspendTasksAndState, but we don't
> anymore.
> >
> > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > > Hi
> > > On CommitFailedException at onPartitionsRevoked if it is thrown it gets
> > > assigned to rebalanceException.
> > > This causes the stream thread to shutdown. I am not sure how we can
> > resume
> > > the thread.
> > >
> > > Note thread is not in invalid state because because it already has been
> > > assigned new partitions and this exception happens when trying to
> revoke
> > > old partitions which have been moved to some other thread, so we need
> to
> > > swallow this exception at the StreanThread side too, just like we
> swallow
> > > it at ConsumerCoordinator.java
> > >
> > > Also I fixed this against code base 0.10.2.0 and the difference in that
> > vs
> > > trunk code is these lines
> > > 10.2.0
> > >        if (firstException.get() == null) {
> > >             firstException.set(commitOffsets());
> > >        }
> > >  vs trunk
> > >         if (firstException.get() == null) {
> > >             // TODO: currently commit failures will not be thrown to
> > users
> > >             // while suspending tasks; this need to be re-visit after
> > > KIP-98
> > >             commitOffsets();
> > >         }
> > > I am again not sure since this part is still a TODO, but looking at
> code
> > I
> > > see that commitOffsets can still throw the CommitFailedException which
> > > needs to be handled at onPartitionsRevoked.
> > >
> > > Hope this makes sense.
> > >
> > > On second issue, the deadlock is not caused by CommitFailedExceptio,
> but
> > > after fixing the deadlock we need to make sure thread does not die due
> to
> > > unhandled CommitFailedException at onPartitionsRevoked.
> > > The deadlock issue is like this.
> > > If a thread has two partitions and while processing partition one it
> > takes
> > > more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is evicted
> > > from the group and both partitions are now migrated to some other
> thread.
> > > Now when it tries to process the partition two it tries to get the lock
> > to
> > > rocks db. It won't get the lock since that partition is now moved to
> some
> > > other thread. So it keeps increasing the backoffTimeMs and keeps trying
> > to
> > > get the lock forever. This reaching a deadlock.
> > > To fix this we need some upper bound of the time limit till it tries to
> > get
> > > that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> > > because if by that time it has not got the lock, we can see that this
> > > thread was evicted from the group and need to rejoin again to get new
> > > partitions.
> > >
> > > On JIRA issue I can create one and attach the part of logs where it
> keeps
> > > trying to get the lock with increasing backoffTimeM.
> > >
> > > Let me know if these makes sense. Right now this is the best way we
> could
> > > come up with to handle stream thread failures.
> > >
> > > Also on a side note I feel we need more resilient streams. If we have
> say
> > > configured our streams application with 4 threads and for whatever
> > reason a
> > > thread dies, then application should itself (or via some exposed
> hooks),
> > > allow to restart a new thread (because in Java I guess same thread
> cannot
> > > be restarted), so that number of threads always stay what one has
> > > configured.
> > > I think exposed hooks will be better option to do this.
> > >
> > > Thanks
> > > Sachin
> > >
> > >
> > >
> > >
> > > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matth...@confluent.io
> >
> > > wrote:
> > >
> > > > Sachin,
> > > >
> > > > thanks a lot for contributing!
> > > >
> > > > Right now, I am not sure if I understand the change. On
> > > > CommitFailedException, why can we just resume the thread? To me, it
> > > > seems that the thread will be in an invalid state and thus it's not
> > save
> > > > to just swallow the exception and keep going. Can you shed some
> light?
> > > >
> > > > And from my understanding, the deadlock is "caused" by the change
> from
> > > > above, right? So if it is save to swallow the exception, we should do
> > > > some "clean up" to avoid the deadlock in the first place, instead of
> > > > applying and additional timeout.
> > > >
> > > > Also, if this is a bug, we should have a JIRA.
> > > >
> > > > -Matthias
> > > >
> > > >
> > > > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > > > Hi,
> > > > > Please find the new PR
> > > > > https://github.com/apache/kafka/pull/2642/
> > > > >
> > > > > I see that in trunk there has been change which is different from
> in
> > > > 10.2.0
> > > > >
> > > > > 10.2.0
> > > > >        if (firstException.get() == null) {
> > > > >             firstException.set(commitOffsets());
> > > > >        }
> > > > >  vs trunk
> > > > >         if (firstException.get() == null) {
> > > > >             // TODO: currently commit failures will not be thrown
> to
> > > > users
> > > > >             // while suspending tasks; this need to be re-visit
> after
> > > > KIP-98
> > > > >             commitOffsets();
> > > > >         }
> > > > > I am not sure in view of this is is my part of the fix still valid.
> > > Looks
> > > > > like it is still valid.
> > > > >
> > > > > Also on side note what is the policy of closing a branch that is
> just
> > > > > released.
> > > > >
> > > > > Since you have release 10.2.0 we are using that and that is why
> have
> > > made
> > > > > changes in that branch so that our changes just modify the needed
> > code
> > > > and
> > > > > we don't mess up the other released code.
> > > > >
> > > > > Is the new release released off the branch 10.2.0, if yes then you
> > > should
> > > > > not close it as there can be patch fixes on them.
> > > > >
> > > > > Or is the release always made off the branch trunk. In that case
> how
> > > can
> > > > we
> > > > > pick up the code on which the release binaries were created so when
> > we
> > > > > build the binary we have exactly same code as released one, plus
> any
> > > > > changes (we or someone else) makes on it.
> > > > >
> > > > > Also if a branch is closed, then perhaps we should delete it or
> mark
> > it
> > > > > closed or something.
> > > > >
> > > > > Please let us know how releases get created (off what codebase), so
> > we
> > > > are
> > > > > more exact in applying our changes to.
> > > > >
> > > > > Thanks
> > > > > Sachin
> > > > >
> > > > >
> > > > >
> > > > > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <
> eno.there...@gmail.com
> > >
> > > > wrote:
> > > > >
> > > > >> Thanks Sachin, one thing before the review, 0.10.2 is closed now,
> > this
> > > > >> needs to target trunk.
> > > > >>
> > > > >> Thanks
> > > > >> Eno
> > > > >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmit...@gmail.com>
> wrote:
> > > > >>>
> > > > >>> Please review the PR and let me know if this makes sense.
> > > > >>>
> > > > >>> https://github.com/apache/kafka/pull/2640
> > > > >>>
> > > > >>> Thanks
> > > > >>> Sachin
> > > > >>>
> > > > >>>
> > > > >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <
> > eno.there...@gmail.com
> > > >
> > > > >> wrote:
> > > > >>>
> > > > >>>> Thanks Sachin for your contribution. Could you create a pull
> > request
> > > > out
> > > > >>>> of the commit (so we can add comments, and also so you are
> > > > acknowledged
> > > > >>>> properly for your contribution)?
> > > > >>>>
> > > > >>>> Thanks
> > > > >>>> Eno
> > > > >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmit...@gmail.com>
> > wrote:
> > > > >>>>>
> > > > >>>>> Hi,
> > > > >>>>> So far in our experiment we have encountered 2 critical bugs.
> > > > >>>>> 1. If a thread takes more that MAX_POLL_INTERVAL_MS_CONFIG to
> > > > compute a
> > > > >>>>> cycle it gets evicted from group and rebalance takes place and
> it
> > > > gets
> > > > >>>> new
> > > > >>>>> assignment.
> > > > >>>>> However when this thread tries to commit offsets for the
> revoked
> > > > >>>> partitions
> > > > >>>>> in
> > > > >>>>> onPartitionsRevoked it will again throw the
> > CommitFailedException.
> > > > >>>>>
> > > > >>>>> This gets handled by ConsumerCoordinatorso there is no point to
> > > > assign
> > > > >>>> this
> > > > >>>>> exception to
> > > > >>>>> rebalanceException in StreamThread and stop it. It has already
> > been
> > > > >>>>> assigned new partitions and it can continue.
> > > > >>>>>
> > > > >>>>> So as fix in case on CommitFailedException I am not killing the
> > > > >>>> StreamThrea.
> > > > >>>>>
> > > > >>>>> 2. Next we see a deadlock state when to process a task it takes
> > > > longer
> > > > >>>>> than MAX_POLL_INTERVAL_MS_CONFIG
> > > > >>>>> time. Then this threads partitions are assigned to some other
> > > thread
> > > > >>>>> including rocksdb lock. When it tries to process the next task
> it
> > > > >> cannot
> > > > >>>>> get rocks db lock and simply keeps waiting for that lock
> forever.
> > > > >>>>>
> > > > >>>>> in retryWithBackoff for AbstractTaskCreator we have a
> > > backoffTimeMs =
> > > > >>>> 50L.
> > > > >>>>> If it does not get lock the we simply increase the time by 10x
> > and
> > > > keep
> > > > >>>>> trying inside the while true loop.
> > > > >>>>>
> > > > >>>>> We need to have a upper bound for this backoffTimeM. If the
> time
> > is
> > > > >>>> greater
> > > > >>>>> than  MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the
> > lock
> > > > >> means
> > > > >>>>> this thread's partitions are moved somewhere else and it may
> not
> > > get
> > > > >> the
> > > > >>>>> lock again.
> > > > >>>>>
> > > > >>>>> So I have added an upper bound check in that while loop.
> > > > >>>>>
> > > > >>>>> The commits are here:
> > > > >>>>> https://github.com/sjmittal/kafka/commit/
> > > > >> 6f04327c890c58cab9b1ae108af4ce
> > > > >>>> 5c4e3b89a1
> > > > >>>>>
> > > > >>>>> please review and if you feel they make sense, please merge it
> to
> > > > main
> > > > >>>>> branch.
> > > > >>>>>
> > > > >>>>> Thanks
> > > > >>>>> Sachin
> > > > >>>>
> > > > >>>>
> > > > >>
> > > > >>
> > > > >
> > > >
> > > >
> > >
> >
>

Reply via email to