Please find the JIRA: https://issues.apache.org/jira/browse/KAFKA-4848
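Also, on the resilience side note I raise further down this thread (a stream thread that dies is never replaced, so the configured thread count silently shrinks): as far as I can tell, the closest exposed hook today is KafkaStreams#setUncaughtExceptionHandler, which only lets the application observe the failure. Below is a minimal sketch against the 0.10.2-era API; the application id, topics, and topology are made up for illustration.

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ResilienceHookDemo {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "resilience-demo"); // hypothetical app id
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);

            final KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic"); // trivial made-up topology

            final KafkaStreams streams = new KafkaStreams(builder, props);
            // The hook only observes a dying StreamThread; it cannot restart it.
            streams.setUncaughtExceptionHandler((thread, throwable) ->
                    System.err.println("Stream thread " + thread.getName() + " died: " + throwable));
            streams.start();
        }
    }

A handler like this could flag the instance for a close() and rebuild, but there is no per-thread restart today, which is why exposed hooks (or automatic replacement) would help.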
On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy <damian....@gmail.com> wrote:
> Hi Sachin,
>
> If it is a bug then please file a JIRA for it, too.
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sjmit...@gmail.com> wrote:
> > Ok, that's great. So you have already fixed that issue.
> >
> > I have modified my PR to remove that change (which was done with 0.10.2.0 in mind).
> >
> > However, the other issue is still valid. Please review that change: https://github.com/apache/kafka/pull/2642
> >
> > Thanks
> > Sachin
> >
> > On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <damian....@gmail.com> wrote:
> > > On trunk the CommitFailedException isn't thrown anymore. The commitOffsets method doesn't throw an exception; it returns one if it was thrown. We used to throw this exception during suspendTasksAndState, but we don't anymore.
> > >
> > > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > Hi,
> > > > On CommitFailedException at onPartitionsRevoked: if it is thrown, it gets assigned to rebalanceException. This causes the stream thread to shut down, and I am not sure how we can resume the thread.
> > > >
> > > > Note that the thread is not in an invalid state, because it has already been assigned new partitions; this exception happens when trying to revoke old partitions which have been moved to some other thread. So we need to swallow this exception on the StreamThread side too, just like we swallow it in ConsumerCoordinator.java.
> > > >
> > > > Also, I fixed this against code base 0.10.2.0, and the difference between that and the trunk code is these lines.
> > > > 0.10.2.0:
> > > >     if (firstException.get() == null) {
> > > >         firstException.set(commitOffsets());
> > > >     }
> > > > vs trunk:
> > > >     if (firstException.get() == null) {
> > > >         // TODO: currently commit failures will not be thrown to users
> > > >         // while suspending tasks; this need to be re-visit after KIP-98
> > > >         commitOffsets();
> > > >     }
> > > > I am again not sure, since this part is still a TODO, but looking at the code I see that commitOffsets can still throw the CommitFailedException, which needs to be handled at onPartitionsRevoked.
> > > >
> > > > Hope this makes sense.
> > > >
> > > > On the second issue: the deadlock is not caused by CommitFailedException, but after fixing the deadlock we need to make sure the thread does not die due to an unhandled CommitFailedException at onPartitionsRevoked.
> > > >
> > > > The deadlock issue is like this. If a thread has two partitions, and while processing partition one it takes more than MAX_POLL_INTERVAL_MS_CONFIG time, then this thread is evicted from the group and both partitions are migrated to some other thread. Now when it tries to process partition two, it tries to get the lock to RocksDB. It won't get the lock, since that partition has been moved to some other thread, so it keeps increasing the backoffTimeMs and keeps trying to get the lock forever, thus reaching a deadlock. To fix this we need some upper bound on how long it tries to get that lock, and that upper bound has to be > MAX_POLL_INTERVAL_MS_CONFIG, because if by that time it has not got the lock, we know this thread was evicted from the group and needs to rejoin to get new partitions.
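To make the bound just described concrete, here is a minimal sketch of a lock-retry loop that gives up once the total wait exceeds max.poll.interval.ms. All names here (BoundedLockRetry, lockOrRejoin, the tryLock supplier) are invented for illustration; this is not the actual AbstractTaskCreator.retryWithBackoff code.

    import java.util.function.BooleanSupplier;

    // Sketch only, not the real Streams internals: retry with exponential
    // backoff, but stop once the total wait exceeds max.poll.interval.ms.
    public class BoundedLockRetry {

        // tryLock is a hypothetical stand-in for acquiring the task's
        // RocksDB state directory lock.
        static void lockOrRejoin(final BooleanSupplier tryLock,
                                 final long maxPollIntervalMs) throws InterruptedException {
            long backoffTimeMs = 50L;  // same starting value as in the thread
            long totalWaitMs = 0L;
            while (!tryLock.getAsBoolean()) {
                if (totalWaitMs > maxPollIntervalMs) {
                    // Another live thread now owns these partitions; stop
                    // spinning so the caller can rejoin the group instead.
                    throw new IllegalStateException("could not acquire state lock within max.poll.interval.ms");
                }
                Thread.sleep(backoffTimeMs);
                totalWaitMs += backoffTimeMs;
                backoffTimeMs *= 10;  // the 10x growth described later in the thread
            }
        }
    }

Throwing once the bound is passed lets the thread surface the condition and trigger a rejoin, instead of looping forever on a lock it can never get.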
> > > > On the JIRA issue: I can create one and attach the part of the logs where it keeps trying to get the lock with ever-increasing backoffTimeMs.
> > > >
> > > > Let me know if this makes sense. Right now this is the best way we could come up with to handle stream thread failures.
> > > >
> > > > Also, on a side note, I feel we need more resilient streams. If we have, say, configured our streams application with 4 threads, and for whatever reason a thread dies, then the application should itself (or via some exposed hooks) allow starting a new thread (because in Java, I guess, the same thread cannot be restarted), so that the number of threads always stays at what one has configured. I think exposed hooks would be the better option for this.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matth...@confluent.io> wrote:
> > > > > Sachin,
> > > > >
> > > > > thanks a lot for contributing!
> > > > >
> > > > > Right now, I am not sure if I understand the change. On CommitFailedException, why can we just resume the thread? To me, it seems that the thread will be in an invalid state and thus it's not safe to just swallow the exception and keep going. Can you shed some light?
> > > > >
> > > > > And from my understanding, the deadlock is "caused" by the change from above, right? So if it is safe to swallow the exception, we should do some "clean up" to avoid the deadlock in the first place, instead of applying an additional timeout.
> > > > >
> > > > > Also, if this is a bug, we should have a JIRA.
> > > > >
> > > > > -Matthias
> > > > >
> > > > > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > > > > Hi,
> > > > > > Please find the new PR: https://github.com/apache/kafka/pull/2642/
> > > > > >
> > > > > > I see that on trunk there has been a change which differs from 0.10.2.0.
> > > > > > 0.10.2.0:
> > > > > >     if (firstException.get() == null) {
> > > > > >         firstException.set(commitOffsets());
> > > > > >     }
> > > > > > vs trunk:
> > > > > >     if (firstException.get() == null) {
> > > > > >         // TODO: currently commit failures will not be thrown to users
> > > > > >         // while suspending tasks; this need to be re-visit after KIP-98
> > > > > >         commitOffsets();
> > > > > >     }
> > > > > > I am not sure, in view of this, whether my part of the fix is still valid. It looks like it is.
> > > > > >
> > > > > > Also, on a side note, what is the policy on closing a branch that has just been released?
> > > > > >
> > > > > > Since you have released 0.10.2.0 we are using that, and that is why we have made our changes on that branch, so that they only modify the needed code and we don't mess up the other released code.
> > > > > >
> > > > > > Is the new release cut off the 0.10.2.0 branch? If yes, then you should not close it, as there can be patch fixes on it. Or is the release always made off trunk? In that case, how can we pick up the code from which the release binaries were created, so that when we build the binary we have exactly the same code as the released one, plus any changes (we or someone else) make on it?
> > > > > >
> > > > > > Also, if a branch is closed, then perhaps we should delete it or mark it closed or something.
> > > > > >
> > > > > > Please let us know how releases get created (off what codebase), so we are more exact in applying our changes.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
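As a self-contained illustration of the two error-propagation styles quoted above: in the 0.10.2.0 style, the commit call returns the exception rather than throwing it, and the caller records only the first failure. The class and method names here are made up for the example; this is not the actual StreamThread code.

    import java.util.concurrent.atomic.AtomicReference;

    // Sketch only: commit returns its failure instead of throwing, and the
    // caller keeps just the first one to rethrow later.
    public class FirstExceptionDemo {
        private final AtomicReference<RuntimeException> firstException = new AtomicReference<>();

        // Stand-in for commitOffsets(): null on success, the exception on failure.
        private RuntimeException commitOffsets() {
            try {
                // ... real commit logic would go here ...
                return null;
            } catch (final RuntimeException e) {
                return e;
            }
        }

        void suspendTasksAndState() {
            if (firstException.get() == null) {
                firstException.set(commitOffsets());
            }
            // At the end of the rebalance path, rethrow firstException.get()
            // if it is non-null, so the first failure is not lost.
        }
    }

The trunk variant simply calls commitOffsets() and drops the returned value, which is why the TODO notes that commit failures are currently not surfaced to users.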
> > > > > > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> > > > > >> Thanks Sachin, one thing before the review: 0.10.2 is closed now, this needs to target trunk.
> > > > > >>
> > > > > >> Thanks
> > > > > >> Eno
> > > > > >>
> > > > > >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > >>>
> > > > > >>> Please review the PR and let me know if this makes sense.
> > > > > >>>
> > > > > >>> https://github.com/apache/kafka/pull/2640
> > > > > >>>
> > > > > >>> Thanks
> > > > > >>> Sachin
> > > > > >>>
> > > > > >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.there...@gmail.com> wrote:
> > > > > >>>> Thanks Sachin for your contribution. Could you create a pull request out of the commit (so we can add comments, and also so you are acknowledged properly for your contribution)?
> > > > > >>>>
> > > > > >>>> Thanks
> > > > > >>>> Eno
> > > > > >>>>
> > > > > >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmit...@gmail.com> wrote:
> > > > > >>>>>
> > > > > >>>>> Hi,
> > > > > >>>>> So far in our experiments we have encountered 2 critical bugs.
> > > > > >>>>>
> > > > > >>>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to compute a cycle, it gets evicted from the group, a rebalance takes place, and it gets a new assignment. However, when this thread tries to commit offsets for the revoked partitions in onPartitionsRevoked, it will again throw the CommitFailedException.
> > > > > >>>>>
> > > > > >>>>> This gets handled by ConsumerCoordinator, so there is no point in assigning this exception to rebalanceException in StreamThread and stopping the thread. It has already been assigned new partitions and it can continue.
> > > > > >>>>>
> > > > > >>>>> So, as a fix, in the case of CommitFailedException I am not killing the StreamThread (a sketch of this shape of fix follows below the quoted thread).
> > > > > >>>>>
> > > > > >>>>> 2. Next, we see a deadlock state when processing a task takes longer than MAX_POLL_INTERVAL_MS_CONFIG. Then this thread's partitions are assigned to some other thread, including the RocksDB lock. When it tries to process the next task, it cannot get the RocksDB lock and simply keeps waiting for that lock forever.
> > > > > >>>>>
> > > > > >>>>> In retryWithBackoff for AbstractTaskCreator we have a backoffTimeMs = 50L.
> > > > > >>>>> If it does not get the lock, we simply increase the time by 10x and keep trying inside the while-true loop.
> > > > > >>>>>
> > > > > >>>>> We need to have an upper bound for this backoffTimeMs. If the time is greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock, it means this thread's partitions have been moved somewhere else and it may never get the lock again.
> > > > > >>>>>
> > > > > >>>>> So I have added an upper bound check in that while loop.
> > > > > >>>>>
> > > > > >>>>> The commits are here: https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
> > > > > >>>>>
> > > > > >>>>> Please review, and if you feel they make sense, please merge them to the main branch.
> > > > > >>>>>
> > > > > >>>>> Thanks
> > > > > >>>>> Sachin
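To show the shape of the fix for the first bug above: a rebalance-listener callback that treats CommitFailedException as benign during revocation, since the group has already reassigned the partitions. This is a hedged sketch against the public consumer API (commitOffsetsFor is a made-up helper), not the actual StreamThread patch.

    import java.util.Collection;

    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.common.TopicPartition;

    // Sketch only: swallow CommitFailedException while revoking partitions,
    // because the group has already reassigned them; killing the thread here
    // gains nothing.
    public class TolerantRebalanceListener implements ConsumerRebalanceListener {
        @Override
        public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
            try {
                commitOffsetsFor(partitions); // hypothetical commit helper
            } catch (final CommitFailedException e) {
                // Expected when this consumer was evicted from the group after
                // exceeding max.poll.interval.ms; safe to ignore and rejoin.
            }
        }

        @Override
        public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { }

        private void commitOffsetsFor(final Collection<TopicPartition> partitions) {
            // ... commit logic would go here ...
        }
    }

This mirrors what the thread says ConsumerCoordinator already does internally: a commit for partitions that have been taken away cannot succeed, so failing it quietly and letting the rejoin proceed is the only useful outcome.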