Ok that's great.
So you have already fixed that issue.

I have modified my PR to remove that change (which was done keeping
0.10.2.0 in mind).

However, the other issue is still valid.

Please review that change. https://github.com/apache/kafka/pull/2642


Thanks
Sachin


On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <damian....@gmail.com> wrote:

> On trunk the CommitFailedException isn't thrown anymore. The commitOffsets
> method doesn't throw; it returns the exception if one occurred. We used to
> throw this exception during suspendTasksAndState, but we don't anymore.
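>
> A minimal sketch of that return-instead-of-throw shape (the class and
> names here are illustrative, not the actual trunk code):
>
>     import java.util.Map;
>     import org.apache.kafka.clients.consumer.Consumer;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.common.TopicPartition;
>
>     class CommitHelper {
>         private final Consumer<byte[], byte[]> consumer;
>         private final Map<TopicPartition, OffsetAndMetadata> offsets;
>
>         CommitHelper(final Consumer<byte[], byte[]> consumer,
>                      final Map<TopicPartition, OffsetAndMetadata> offsets) {
>             this.consumer = consumer;
>             this.offsets = offsets;
>         }
>
>         // Return the exception instead of throwing it, so the caller can
>         // record it (e.g. firstException.set(...)) and decide what to do.
>         RuntimeException commitOffsets() {
>             try {
>                 consumer.commitSync(offsets);
>                 return null;
>             } catch (final RuntimeException e) {
>                 return e;
>             }
>         }
>     }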
>
> On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi
> > On CommitFailedException: if it is thrown at onPartitionsRevoked, it gets
> > assigned to rebalanceException.
> > This causes the stream thread to shut down, and I am not sure how we can
> > resume the thread.
> >
> > Note the thread is not in an invalid state, because it has already been
> > assigned new partitions and this exception happens when trying to revoke
> > old partitions which have been moved to some other thread. So we need to
> > swallow this exception on the StreamThread side too, just like we swallow
> > it in ConsumerCoordinator.java.
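> >
> > As a hypothetical sketch of what I mean (the listener class and helper
> > names here are illustrative, not the actual StreamThread code):
> >
> >     import java.util.Collection;
> >     import org.apache.kafka.clients.consumer.CommitFailedException;
> >     import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> >     import org.apache.kafka.common.TopicPartition;
> >
> >     class RebalanceHandler implements ConsumerRebalanceListener {
> >         volatile Throwable rebalanceException; // checked by the stream thread
> >
> >         @Override
> >         public void onPartitionsRevoked(final Collection<TopicPartition> revoked) {
> >             try {
> >                 suspendTasksAndState(); // commits offsets for revoked tasks
> >             } catch (final CommitFailedException e) {
> >                 // The group has already rebalanced and this thread holds
> >                 // new partitions; swallow the failed commit for the
> >                 // revoked ones, mirroring what ConsumerCoordinator does.
> >             } catch (final Throwable t) {
> >                 rebalanceException = t; // anything else still fails the thread
> >             }
> >         }
> >
> >         @Override
> >         public void onPartitionsAssigned(final Collection<TopicPartition> assigned) {
> >             // new tasks are created here in the real code; omitted in this sketch
> >         }
> >
> >         private void suspendTasksAndState() {
> >             // placeholder for the real suspend-and-commit logic
> >         }
> >     }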
> >
> > Also, I fixed this against code base 0.10.2.0, and the difference between
> > that and the trunk code is these lines.
> > 0.10.2.0:
> >         if (firstException.get() == null) {
> >             firstException.set(commitOffsets());
> >         }
> > vs trunk:
> >         if (firstException.get() == null) {
> >             // TODO: currently commit failures will not be thrown to users
> >             // while suspending tasks; this need to be re-visit after KIP-98
> >             commitOffsets();
> >         }
> > I am again not sure, since this part is still a TODO, but looking at the
> > code I see that commitOffsets can still throw the CommitFailedException,
> > which needs to be handled at onPartitionsRevoked.
> >
> > Hope this makes sense.
> >
> > On the second issue: the deadlock is not caused by CommitFailedException,
> > but after fixing the deadlock we need to make sure the thread does not die
> > due to an unhandled CommitFailedException at onPartitionsRevoked.
> > The deadlock issue is like this.
> > If a thread has two partitions, and while processing partition one it
> > takes more than MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted
> > from the group and both partitions are migrated to some other thread.
> > Now, when it tries to process partition two, it tries to get the lock on
> > the RocksDB state. It won't get the lock, since that partition has moved
> > to some other thread, so it keeps increasing backoffTimeMs and keeps
> > trying to get the lock forever, thus reaching a deadlock.
> > To fix this we need some upper bound on how long it keeps trying to get
> > that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> > because if it has not got the lock by that time, we can conclude that this
> > thread was evicted from the group and needs to rejoin to get new
> > partitions. A sketch of the bounded retry is below.
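> >
> > A minimal sketch of the bounded retry I have in mind (the lock check is a
> > placeholder, and the names are illustrative, not the exact
> > AbstractTaskCreator code):
> >
> >     class BoundedLockRetry {
> >         // placeholder: the real code locks the task's state directory
> >         private boolean tryAcquireStateLock() {
> >             return false;
> >         }
> >
> >         void retryWithBackoff(final long maxPollIntervalMs)
> >                 throws InterruptedException {
> >             long backoffTimeMs = 50L;
> >             while (true) {
> >                 if (tryAcquireStateLock()) {
> >                     return; // got the lock, proceed with the task
> >                 }
> >                 if (backoffTimeMs > maxPollIntervalMs) {
> >                     // Past max.poll.interval.ms the thread must have been
> >                     // evicted and the partition moved; give up so the
> >                     // thread can rejoin the group instead of spinning.
> >                     throw new IllegalStateException(
> >                         "could not get state lock within max.poll.interval.ms");
> >                 }
> >                 Thread.sleep(backoffTimeMs);
> >                 backoffTimeMs *= 10; // same 10x growth as the current loop
> >             }
> >         }
> >     }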
> >
> > On the JIRA issue: I can create one and attach the part of the logs where
> > it keeps trying to get the lock with increasing backoffTimeMs.
> >
> > Let me know if this makes sense. Right now this is the best way we could
> > come up with to handle stream thread failures.
> >
> > Also, on a side note, I feel we need more resilient streams. If we have,
> > say, configured our streams application with 4 threads and for whatever
> > reason a thread dies, then the application should itself (or via some
> > exposed hooks) start a replacement thread (because in Java, I believe, the
> > same thread cannot be restarted), so that the number of threads always
> > stays at what one has configured.
> > I think exposed hooks would be the better option; see the sketch below.
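> >
> > For example, KafkaStreams already exposes an uncaught exception handler,
> > though today it can only observe the death; a supervisor would have to
> > rebuild the instance. A minimal sketch (topic names and config values here
> > are made up for illustration):
> >
> >     import java.util.Properties;
> >     import org.apache.kafka.streams.KafkaStreams;
> >     import org.apache.kafka.streams.StreamsConfig;
> >     import org.apache.kafka.streams.kstream.KStreamBuilder;
> >
> >     public class ResilientApp {
> >         public static void main(final String[] args) {
> >             final Properties props = new Properties();
> >             props.put(StreamsConfig.APPLICATION_ID_CONFIG, "resilient-app");
> >             props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> >             props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
> >
> >             final KStreamBuilder builder = new KStreamBuilder();
> >             builder.stream("input-topic").to("output-topic");
> >
> >             final KafkaStreams streams = new KafkaStreams(builder, props);
> >             streams.setUncaughtExceptionHandler((thread, throwable) ->
> >                 // A dead Java thread cannot be restarted; a restart hook
> >                 // would have to start a replacement from outside this
> >                 // callback (e.g. close and recreate the instance).
> >                 System.err.println("Stream thread " + thread.getName()
> >                     + " died: " + throwable));
> >             streams.start();
> >         }
> >     }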
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Sachin,
> > >
> > > thanks a lot for contributing!
> > >
> > > Right now, I am not sure if I understand the change. On
> > > CommitFailedException, why can we just resume the thread? To me, it
> > > seems that the thread will be in an invalid state and thus it's not safe
> > > to just swallow the exception and keep going. Can you shed some light?
> > >
> > > And from my understanding, the deadlock is "caused" by the change from
> > > above, right? So if it is safe to swallow the exception, we should do
> > > some "clean up" to avoid the deadlock in the first place, instead of
> > > applying an additional timeout.
> > >
> > > Also, if this is a bug, we should have a JIRA.
> > >
> > > -Matthias
> > >
> > >
> > > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > > Hi,
> > > > Please find the new PR
> > > > https://github.com/apache/kafka/pull/2642/
> > > >
> > > > I see that in trunk there has been a change which is different from
> > > > 0.10.2.0:
> > > >
> > > > 0.10.2.0:
> > > >         if (firstException.get() == null) {
> > > >             firstException.set(commitOffsets());
> > > >         }
> > > > vs trunk:
> > > >         if (firstException.get() == null) {
> > > >             // TODO: currently commit failures will not be thrown to users
> > > >             // while suspending tasks; this need to be re-visit after KIP-98
> > > >             commitOffsets();
> > > >         }
> > > > In view of this, I am not sure if my part of the fix is still valid.
> > > > Looks like it is still valid.
> > > >
> > > > Also, on a side note, what is the policy for closing a branch that has
> > > > just been released?
> > > >
> > > > Since you have released 0.10.2.0, we are using that, and that is why we
> > > > have made changes on that branch, so that our changes modify just the
> > > > needed code and we don't mess up the other released code.
> > > >
> > > > Is the new release made off the 0.10.2 branch? If yes, then you should
> > > > not close it, as there can be patch fixes on it.
> > > >
> > > > Or is the release always made off trunk? In that case, how can we pick
> > > > up the code from which the release binaries were created, so that when
> > > > we build the binary we have exactly the same code as the released one,
> > > > plus any changes (we or someone else) make on it?
> > > >
> > > > Also, if a branch is closed, then perhaps we should delete it or mark
> > > > it closed or something.
> > > >
> > > > Please let us know how releases get created (off what codebase), so we
> > > > are more exact in applying our changes.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <eno.there...@gmail.com>
> > > > wrote:
> > > >
> > > >> Thanks Sachin, one thing before the review: 0.10.2 is closed now, so
> > > >> this needs to target trunk.
> > > >>
> > > >> Thanks
> > > >> Eno
> > > >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >>>
> > > >>> Please review the PR and let me know if this makes sense.
> > > >>>
> > > >>> https://github.com/apache/kafka/pull/2640
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>>
> > > >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.there...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Thanks Sachin for your contribution. Could you create a pull
> > > >>>> request out of the commit (so we can add comments, and also so you
> > > >>>> are acknowledged properly for your contribution)?
> > > >>>>
> > > >>>> Thanks
> > > >>>> Eno
> > > >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Hi,
> > > >>>>> So far in our experiment we have encountered 2 critical bugs.
> > > >>>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to
> > > >>>>> compute a cycle, it gets evicted from the group, a rebalance takes
> > > >>>>> place, and it gets a new assignment.
> > > >>>>> However, when this thread tries to commit offsets for the revoked
> > > >>>>> partitions in onPartitionsRevoked, it will again throw the
> > > >>>>> CommitFailedException.
> > > >>>>>
> > > >>>>> This gets handled by ConsumerCoordinator, so there is no point in
> > > >>>>> assigning this exception to rebalanceException in StreamThread and
> > > >>>>> stopping the thread. It has already been assigned new partitions
> > > >>>>> and it can continue.
> > > >>>>>
> > > >>>>> So, as a fix, in case of CommitFailedException I am not killing
> > > >>>>> the StreamThread.
> > > >>>>>
> > > >>>>> 2. Next, we see a deadlock state when processing a task takes
> > > >>>>> longer than MAX_POLL_INTERVAL_MS_CONFIG. Then this thread's
> > > >>>>> partitions are assigned to some other thread, including the
> > > >>>>> RocksDB lock. When it tries to process the next task it cannot get
> > > >>>>> the RocksDB lock and simply keeps waiting for that lock forever.
> > > >>>>>
> > > >>>>> In retryWithBackoff in AbstractTaskCreator we have backoffTimeMs =
> > > >>>>> 50L. If it does not get the lock, we simply increase the time by
> > > >>>>> 10x and keep trying inside the while (true) loop.
> > > >>>>>
> > > >>>>> We need to have an upper bound for this backoffTimeMs. If the time
> > > >>>>> is greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't
> > > >>>>> got the lock, it means this thread's partitions were moved
> > > >>>>> somewhere else and it may not get the lock again.
> > > >>>>>
> > > >>>>> So I have added an upper bound check in that while loop.
> > > >>>>>
> > > >>>>> The commits are here:
> > > >>>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
> > > >>>>>
> > > >>>>> Please review, and if you feel they make sense, please merge them
> > > >>>>> to the main branch.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Sachin
> > > >>>>
> > > >>>>
> > > >>
> > > >>
> > > >
> > >
> > >
> >
>
