Ok, that's great. So you have already fixed that issue. I have modified my PR to remove that change (which was done with 0.10.2.0 in mind).
However, the other issue is still valid. Please review that change.
https://github.com/apache/kafka/pull/2642

Thanks
Sachin

On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <damian....@gmail.com> wrote:

> On trunk the CommitFailedException isn't thrown anymore. The commitOffsets
> method doesn't throw an exception. It returns one if it was thrown. We
> used to throw this exception during suspendTasksAndState, but we don't
> anymore.
>
> On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sjmit...@gmail.com> wrote:
>
> > Hi
> > On CommitFailedException at onPartitionsRevoked: if it is thrown, it
> > gets assigned to rebalanceException. This causes the stream thread to
> > shut down. I am not sure how we can resume the thread.
> >
> > Note that the thread is not in an invalid state, because it has already
> > been assigned new partitions and this exception happens when trying to
> > revoke old partitions which have been moved to some other thread. So we
> > need to swallow this exception on the StreamThread side too, just like
> > we swallow it in ConsumerCoordinator.java.
> >
> > Also, I fixed this against code base 0.10.2.0, and the difference in
> > that vs trunk code is these lines:
> >
> > 0.10.2.0:
> >     if (firstException.get() == null) {
> >         firstException.set(commitOffsets());
> >     }
> > vs trunk:
> >     if (firstException.get() == null) {
> >         // TODO: currently commit failures will not be thrown to users
> >         // while suspending tasks; this need to be re-visit after KIP-98
> >         commitOffsets();
> >     }
> >
> > I am again not sure, since this part is still a TODO, but looking at the
> > code I see that commitOffsets can still throw the CommitFailedException,
> > which needs to be handled at onPartitionsRevoked.
> >
> > Hope this makes sense.
> >
> > On the second issue: the deadlock is not caused by
> > CommitFailedException, but after fixing the deadlock we need to make
> > sure the thread does not die due to an unhandled CommitFailedException
> > at onPartitionsRevoked.
> >
> > The deadlock issue is like this. If a thread has two partitions, and
> > processing partition one takes more than MAX_POLL_INTERVAL_MS_CONFIG
> > time, then this thread is evicted from the group and both partitions
> > are migrated to some other thread. Now when it tries to process
> > partition two, it tries to get the RocksDB lock. It won't get the lock,
> > since that partition has been moved to some other thread, so it keeps
> > increasing backoffTimeMs and keeps trying to get the lock forever,
> > effectively reaching a deadlock.
> >
> > To fix this we need some upper bound on how long it keeps trying to get
> > that lock. And that upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
> > because if by that time it has not got the lock, we know that this
> > thread was evicted from the group and needs to rejoin to get new
> > partitions.
> >
> > On the JIRA issue: I can create one and attach the part of the logs
> > where it keeps trying to get the lock with increasing backoffTimeMs.
> >
> > Let me know if this makes sense. Right now this is the best way we
> > could come up with to handle stream thread failures.
> >
> > Also, on a side note, I feel we need more resilient streams. If we
> > have, say, configured our streams application with 4 threads, and for
> > whatever reason a thread dies, then the application should itself (or
> > via some exposed hooks) allow starting a new thread (because in Java
> > the same thread cannot be restarted), so that the number of threads
> > always stays at what one has configured. I think exposed hooks will be
> > the better option to do this, roughly along the lines of the sketch
> > below.
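> >
> > Something like this, as a sketch only (setUncaughtExceptionHandler is
> > the existing client API I have in mind; the topology wiring is elided,
> > and "restarting" here means replacing the whole KafkaStreams instance,
> > since a dead Java thread cannot be brought back):
> >
> >     import java.util.Properties;
> >     import org.apache.kafka.streams.KafkaStreams;
> >     import org.apache.kafka.streams.kstream.KStreamBuilder;
> >
> >     public class RestartableStreams {
> >         private final Properties config;
> >         private volatile KafkaStreams streams;
> >
> >         public RestartableStreams(Properties config) {
> >             this.config = config;
> >         }
> >
> >         public synchronized void start() {
> >             KStreamBuilder builder = new KStreamBuilder();
> >             // ... build the topology here ...
> >             streams = new KafkaStreams(builder, config);
> >             // if any stream thread dies, replace the whole instance on
> >             // a separate thread, restoring the configured thread count
> >             streams.setUncaughtExceptionHandler((thread, error) ->
> >                 new Thread(() -> {
> >                     streams.close();
> >                     start();
> >                 }).start());
> >             streams.start();
> >         }
> >     }
> >
> > A hook inside streams could do better than this, e.g. replace just the
> > dead thread instead of bouncing the whole instance.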
> >
> > Thanks
> > Sachin
> >
> > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> > > Sachin,
> > >
> > > thanks a lot for contributing!
> > >
> > > Right now, I am not sure if I understand the change. On
> > > CommitFailedException, why can we just resume the thread? To me, it
> > > seems that the thread will be in an invalid state and thus it's not
> > > safe to just swallow the exception and keep going. Can you shed some
> > > light?
> > >
> > > And from my understanding, the deadlock is "caused" by the change
> > > from above, right? So if it is safe to swallow the exception, we
> > > should do some "clean up" to avoid the deadlock in the first place,
> > > instead of applying an additional timeout.
> > >
> > > Also, if this is a bug, we should have a JIRA.
> > >
> > > -Matthias
> > >
> > > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > > Hi,
> > > > Please find the new PR:
> > > > https://github.com/apache/kafka/pull/2642/
> > > >
> > > > I see that in trunk there has been a change which is different from
> > > > 0.10.2.0:
> > > >
> > > > 0.10.2.0:
> > > >     if (firstException.get() == null) {
> > > >         firstException.set(commitOffsets());
> > > >     }
> > > > vs trunk:
> > > >     if (firstException.get() == null) {
> > > >         // TODO: currently commit failures will not be thrown to users
> > > >         // while suspending tasks; this need to be re-visit after KIP-98
> > > >         commitOffsets();
> > > >     }
> > > >
> > > > I am not sure, in view of this, whether my part of the fix is still
> > > > valid. It looks like it is.
> > > >
> > > > Also, on a side note, what is the policy for closing a branch that
> > > > has just been released?
> > > >
> > > > Since you have released 0.10.2.0, we are using that, and that is
> > > > why we have made changes on that branch, so that our changes only
> > > > modify the needed code and we don't mess up the other released
> > > > code.
> > > >
> > > > Is the new release cut from the 0.10.2 branch? If yes, then you
> > > > should not close it, as there can be patch fixes on it.
> > > >
> > > > Or is the release always made from trunk? In that case, how can we
> > > > pick up the code from which the release binaries were created, so
> > > > that when we build the binary we have exactly the same code as the
> > > > released one, plus any changes (we or someone else) make on it?
> > > >
> > > > Also, if a branch is closed, then perhaps we should delete it or
> > > > mark it closed or something.
> > > >
> > > > Please let us know how releases get created (from what codebase),
> > > > so we are more exact in applying our changes.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <eno.there...@gmail.com>
> > > > wrote:
> > > >
> > > >> Thanks Sachin, one thing before the review: 0.10.2 is closed now,
> > > >> this needs to target trunk.
> > > >>
> > > >> Thanks
> > > >> Eno
> > > >>
> > > >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >>>
> > > >>> Please review the PR and let me know if this makes sense.
> > > >>>
> > > >>> https://github.com/apache/kafka/pull/2640
> > > >>>
> > > >>> Thanks
> > > >>> Sachin
> > > >>>
> > > >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.there...@gmail.com>
> > > >>> wrote:
> > > >>>
> > > >>>> Thanks Sachin for your contribution. Could you create a pull
> > > >>>> request out of the commit (so we can add comments, and also so
> > > >>>> you are acknowledged properly for your contribution)?
> > > >>>>
> > > >>>> Thanks
> > > >>>> Eno
> > > >>>>
> > > >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Hi,
> > > >>>>> So far in our experiment we have encountered 2 critical bugs.
> > > >>>>>
> > > >>>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to
> > > >>>>> compute a cycle, it gets evicted from the group, a rebalance
> > > >>>>> takes place, and it gets a new assignment. However, when this
> > > >>>>> thread tries to commit offsets for the revoked partitions in
> > > >>>>> onPartitionsRevoked, it will again throw the
> > > >>>>> CommitFailedException.
> > > >>>>>
> > > >>>>> This gets handled by ConsumerCoordinator, so there is no point
> > > >>>>> in assigning this exception to rebalanceException in
> > > >>>>> StreamThread and stopping it. It has already been assigned new
> > > >>>>> partitions and it can continue.
> > > >>>>>
> > > >>>>> So, as the fix, on CommitFailedException I am not killing the
> > > >>>>> StreamThread.
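> > > >>>>>
> > > >>>>> The idea is roughly the sketch below. This is illustrative only:
> > > >>>>> the real listener is an anonymous class inside StreamThread, and
> > > >>>>> suspendTasksAndState() / rebalanceException stand in for its
> > > >>>>> members here.
> > > >>>>>
> > > >>>>>     import java.util.Collection;
> > > >>>>>     import org.apache.kafka.clients.consumer.CommitFailedException;
> > > >>>>>     import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> > > >>>>>     import org.apache.kafka.common.TopicPartition;
> > > >>>>>
> > > >>>>>     class MyStreamThread {
> > > >>>>>         volatile Throwable rebalanceException;
> > > >>>>>
> > > >>>>>         void suspendTasksAndState() { /* commits offsets; may throw */ }
> > > >>>>>
> > > >>>>>         final ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
> > > >>>>>             @Override
> > > >>>>>             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
> > > >>>>>                 try {
> > > >>>>>                     suspendTasksAndState();
> > > >>>>>                 } catch (CommitFailedException e) {
> > > >>>>>                     // the partitions were already migrated to another
> > > >>>>>                     // thread, so this commit can never succeed: swallow
> > > >>>>>                     // it and keep the thread alive instead of recording
> > > >>>>>                     // a fatal rebalanceException
> > > >>>>>                 } catch (Throwable t) {
> > > >>>>>                     rebalanceException = t; // anything else stays fatal
> > > >>>>>                 }
> > > >>>>>             }
> > > >>>>>
> > > >>>>>             @Override
> > > >>>>>             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
> > > >>>>>                 // create/resume tasks for the new assignment as before
> > > >>>>>             }
> > > >>>>>         };
> > > >>>>>     }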
> > > >>>>>
> > > >>>>> 2. Next, we see a deadlock state when processing a task takes
> > > >>>>> longer than MAX_POLL_INTERVAL_MS_CONFIG. Then this thread's
> > > >>>>> partitions are assigned to some other thread, including the
> > > >>>>> RocksDB lock. When it tries to process the next task it cannot
> > > >>>>> get the RocksDB lock and simply keeps waiting for that lock
> > > >>>>> forever.
> > > >>>>>
> > > >>>>> In retryWithBackoff for AbstractTaskCreator we have
> > > >>>>> backoffTimeMs = 50L. If it does not get the lock, we simply
> > > >>>>> increase the time by 10x and keep trying inside the while(true)
> > > >>>>> loop.
> > > >>>>>
> > > >>>>> We need to have an upper bound for this backoffTimeMs. If the
> > > >>>>> time is greater than MAX_POLL_INTERVAL_MS_CONFIG and it still
> > > >>>>> hasn't got the lock, it means this thread's partitions have been
> > > >>>>> moved somewhere else and it may never get the lock again.
> > > >>>>>
> > > >>>>> So I have added an upper bound check in that while loop (a rough
> > > >>>>> sketch is in the P.S. below).
> > > >>>>>
> > > >>>>> The commits are here:
> > > >>>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
> > > >>>>>
> > > >>>>> Please review, and if you feel they make sense, please merge
> > > >>>>> them to the main branch.
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>> Sachin
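> > > >>>>>
> > > >>>>> P.S. The bounded loop I have in mind is roughly the sketch
> > > >>>>> below. Names follow the 0.10.2 code from memory: the real
> > > >>>>> retryWithBackoff iterates over the tasks still to be created
> > > >>>>> rather than taking a Runnable, and maxPollIntervalMs would be
> > > >>>>> read from MAX_POLL_INTERVAL_MS_CONFIG.
> > > >>>>>
> > > >>>>>     import org.apache.kafka.streams.errors.LockException;
> > > >>>>>
> > > >>>>>     class BoundedRetry {
> > > >>>>>         static void retryWithBackoff(Runnable tryCreateTasks, long maxPollIntervalMs) {
> > > >>>>>             long backoffTimeMs = 50L;
> > > >>>>>             long waitedMs = 0L;
> > > >>>>>             while (true) {
> > > >>>>>                 try {
> > > >>>>>                     tryCreateTasks.run(); // tries to grab the RocksDB locks
> > > >>>>>                     return;               // got the locks, done
> > > >>>>>                 } catch (LockException e) {
> > > >>>>>                     if (waitedMs > maxPollIntervalMs) {
> > > >>>>>                         // waited longer than max.poll.interval.ms, so
> > > >>>>>                         // this thread has been evicted and the locks now
> > > >>>>>                         // belong to another thread: give up instead of
> > > >>>>>                         // looping forever
> > > >>>>>                         throw e;
> > > >>>>>                     }
> > > >>>>>                     try {
> > > >>>>>                         Thread.sleep(backoffTimeMs);
> > > >>>>>                     } catch (InterruptedException ie) {
> > > >>>>>                         Thread.currentThread().interrupt();
> > > >>>>>                         return;
> > > >>>>>                     }
> > > >>>>>                     waitedMs += backoffTimeMs;
> > > >>>>>                     backoffTimeMs *= 10; // the growing backoff seen in the logs
> > > >>>>>                 }
> > > >>>>>             }
> > > >>>>>         }
> > > >>>>>     }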