Thanks for explaining in more detail. Now I understand it. I was on completely the wrong track before that!
Great find, and thanks for fixing it :) Since these are two unrelated issues, we should have two JIRAs and two PRs.

-Matthias

On 3/6/17 8:48 PM, Sachin Mittal wrote:
>> As for the second issue you brought up, I agree it is indeed a bug; but just to clarify, it is the CREATION of the first task, including restoring stores, that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not processing it, right?
>
> Yes, this is correct. I may have misused the terminology, so let's not confuse it with processing in the Kafka Streams sense.
>
>> From my understanding, the deadlock is "caused" by your fix of problem one. If the thread died, the lock would be released and no deadlock would occur.
>
> Well, actually the deadlock issue is different from the CommitFailedException and is not caused by it. However, if we fix this, it may potentially cause a CommitFailedException later on; but in trunk we have already fixed that CommitFailedException, so the only issue left is the deadlock.
> I actually did a single commit for both issues, so it may have been confusing, as if the fix of one causes the second, but they are essentially unrelated.
>
> The deadlock issue is simply this: if CREATION of the first task, including restoring stores, takes longer than MAX_POLL_INTERVAL_MS_CONFIG, then the second task in that pipeline may go into a deadlock state if some other thread has already got the handle of that partition. So, in my view, we may need some upper bound check for backoffTimeMs.
>
> Thanks
> Sachin
>
> On Tue, Mar 7, 2017 at 3:24 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>
>> Thanks for your input. I now understand the first issue (and the fix). I am still not sure about the second issue.
>>
>> From my understanding, the deadlock is "caused" by your fix of problem one. If the thread died, the lock would be released and no deadlock would occur.
>>
>> However, because the thread does not die, it also does not release its locks.
>> Thus, from my understanding, in case of a CommitFailedException, the thread must release its locks, too. This would resolve the deadlock -- we don't need any timeout for this.
>>
>> Or do I still misunderstand the issue?
>>
>> -Matthias
>>
>> On 3/6/17 11:41 AM, Guozhang Wang wrote:
>>> Hello Sachin,
>>>
>>> Thanks for your findings!! Just to add to what Damian said regarding 1): in KIP-129, where we are introducing exactly-once processing semantics to Streams, we have also described different categories of error handling for exactly-once. Commit exceptions due to rebalance will be handled as "producer fenced", which will not be thrown to the users, as we already do in trunk.
>>>
>>> As for the second issue you brought up, I agree it is indeed a bug; but just to clarify, it is the CREATION of the first task, including restoring stores, that can take longer than MAX_POLL_INTERVAL_MS_CONFIG, not processing it, right? (We only start processing once all tasks have been created and initialized.)
>>>
>>> Guozhang
>>>
>>> On Mon, Mar 6, 2017 at 7:16 AM, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>>> Please find the JIRA: https://issues.apache.org/jira/browse/KAFKA-4848
>>>>
>>>> On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>
>>>>> Hi Sachin,
>>>>>
>>>>> If it is a bug, then please file a JIRA for it, too.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>
>>>>>> OK, that's great. So you have already fixed that issue.
>>>>>>
>>>>>> I have modified my PR to remove that change (which was made with 0.10.2.0 in mind).
>>>>>>
>>>>>> However, the other issue is still valid.
>>>>>>
>>>>>> Please review that change.
>>>>>> https://github.com/apache/kafka/pull/2642
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
>>>>>>
>>>>>> On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <damian....@gmail.com> wrote:
>>>>>>
>>>>>>> On trunk the CommitFailedException isn't thrown anymore. The commitOffsets method doesn't throw an exception; it returns one if it was thrown. We used to throw this exception during suspendTasksAndState, but we don't anymore.
>>>>>>>
>>>>>>> On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> If a CommitFailedException is thrown in onPartitionsRevoked, it gets assigned to rebalanceException. This causes the stream thread to shut down. I am not sure how we can resume the thread.
>>>>>>>>
>>>>>>>> Note that the thread is not in an invalid state, because it has already been assigned new partitions, and this exception happens when trying to revoke old partitions that have been moved to some other thread. So we need to swallow this exception on the StreamThread side too, just like we swallow it in ConsumerCoordinator.java.
>>>>>>>>
>>>>>>>> Also, I fixed this against the 0.10.2.0 code base, and the difference between that and the trunk code is these lines:
>>>>>>>>
>>>>>>>> 0.10.2.0:
>>>>>>>>     if (firstException.get() == null) {
>>>>>>>>         firstException.set(commitOffsets());
>>>>>>>>     }
>>>>>>>>
>>>>>>>> vs trunk:
>>>>>>>>     if (firstException.get() == null) {
>>>>>>>>         // TODO: currently commit failures will not be thrown to users
>>>>>>>>         // while suspending tasks; this need to be re-visit after KIP-98
>>>>>>>>         commitOffsets();
>>>>>>>>     }
>>>>>>>>
>>>>>>>> Again, I am not sure, since this part is still a TODO, but looking at the code I see that commitOffsets can still throw the CommitFailedException, which needs to be handled in onPartitionsRevoked.
>>>>>>>>
>>>>>>>> Hope this makes sense.
>>>>>>>>
>>>>>>>> On the second issue: the deadlock is not caused by the CommitFailedException, but after fixing the deadlock we need to make sure the thread does not die due to an unhandled CommitFailedException in onPartitionsRevoked.
>>>>>>>> The deadlock issue is like this. If a thread has two partitions and processing partition one takes more than MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted from the group and both partitions are migrated to some other thread. Now, when it tries to process partition two, it tries to get the lock on the RocksDB store. It won't get the lock, since that partition has moved to some other thread. So it keeps increasing backoffTimeMs and keeps trying to get the lock forever, thus reaching a deadlock.
>>>>>>>> To fix this, we need an upper bound on how long it keeps trying to get that lock. That upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG, because if it has not got the lock by then, we can infer that this thread was evicted from the group and needs to rejoin to get new partitions.
>>>>>>>>
>>>>>>>> On the JIRA issue: I can create one and attach the part of the logs where it keeps trying to get the lock with increasing backoffTimeMs.
>>>>>>>>
>>>>>>>> Let me know if this makes sense. Right now this is the best way we could come up with to handle stream thread failures.
>>>>>>>>
>>>>>>>> Also, on a side note, I feel we need more resilient streams.
>>>>>>>> If we have, say, configured our streams application with 4 threads and, for whatever reason, a thread dies, then the application should itself (or via some exposed hooks) allow starting a new thread (because in Java, I guess, the same thread cannot be restarted), so that the number of threads always stays at what one has configured.
>>>>>>>> I think exposed hooks would be the better option for this.
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Sachin
>>>>>>>>
>>>>>>>> On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>>>>
>>>>>>>>> Sachin,
>>>>>>>>>
>>>>>>>>> thanks a lot for contributing!
>>>>>>>>>
>>>>>>>>> Right now, I am not sure I understand the change. On CommitFailedException, why can we just resume the thread? To me, it seems that the thread will be in an invalid state, and thus it's not safe to just swallow the exception and keep going. Can you shed some light?
>>>>>>>>>
>>>>>>>>> And from my understanding, the deadlock is "caused" by the change from above, right? So if it is safe to swallow the exception, we should do some "clean up" to avoid the deadlock in the first place, instead of applying an additional timeout.
>>>>>>>>>
>>>>>>>>> Also, if this is a bug, we should have a JIRA.
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>> On 3/5/17 4:11 AM, Sachin Mittal wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> Please find the new PR:
>>>>>>>>>> https://github.com/apache/kafka/pull/2642/
>>>>>>>>>>
>>>>>>>>>> I see that in trunk there has been a change that differs from 0.10.2.0:
>>>>>>>>>>
>>>>>>>>>> 0.10.2.0:
>>>>>>>>>>     if (firstException.get() == null) {
>>>>>>>>>>         firstException.set(commitOffsets());
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> vs trunk:
>>>>>>>>>>     if (firstException.get() == null) {
>>>>>>>>>>         // TODO: currently commit failures will not be thrown to users
>>>>>>>>>>         // while suspending tasks; this need to be re-visit after KIP-98
>>>>>>>>>>         commitOffsets();
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> I am not sure whether, in view of this, my part of the fix is still valid. It looks like it is still valid.
>>>>>>>>>>
>>>>>>>>>> Also, on a side note: what is the policy for closing a branch that has just been released?
>>>>>>>>>>
>>>>>>>>>> Since you have released 0.10.2.0, we are using it, and that is why we made our changes in that branch, so that they modify just the needed code and we don't mess up the rest of the released code.
>>>>>>>>>>
>>>>>>>>>> If new releases are made from the 0.10.2 branch, then you should not close it, as there can be patch fixes on it.
>>>>>>>>>>
>>>>>>>>>> Or are releases always made from trunk? In that case, how can we pick up the code from which the release binaries were built, so that when we build the binary we have exactly the same code as the released one, plus any changes that we (or someone else) make on it?
>>>>>>>>>>
>>>>>>>>>> Also, if a branch is closed, then perhaps we should delete it or mark it as closed.
>>>>>>>>>>
>>>>>>>>>> Please let us know how releases get created (from which code base), so that we can be more exact about where we apply our changes.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Sachin
>>>>>>>>>>
>>>>>>>>>> On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Sachin. One thing before the review: 0.10.2 is closed now, so this needs to target trunk.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
>>>>>>>>>>>
>>>>>>>>>>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Please review the PR and let me know if this makes sense.
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/kafka/pull/2640
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Sachin
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Sachin for your contribution. Could you create a pull request out of the commit (so we can add comments, and also so you are acknowledged properly for your contribution)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> So far in our experiment we have encountered 2 critical bugs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to compute a cycle, it gets evicted from the group, a rebalance takes place, and it gets a new assignment.
>>>>>>>>>>>>>> However, when this thread tries to commit offsets for the revoked partitions in onPartitionsRevoked, it will again throw the CommitFailedException.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This gets handled by ConsumerCoordinator, so there is no point in assigning this exception to rebalanceException in StreamThread and stopping the thread. It has already been assigned new partitions, and it can continue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So, as a fix, in case of a CommitFailedException I am not killing the StreamThread.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Next, we see a deadlock state when processing a task takes longer than MAX_POLL_INTERVAL_MS_CONFIG. Then this thread's partitions are assigned to some other thread, including the RocksDB lock. When it tries to process the next task, it cannot get the RocksDB lock and simply keeps waiting for that lock forever.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In retryWithBackoff in AbstractTaskCreator we have backoffTimeMs = 50L. If it does not get the lock, we simply increase the time by 10x and keep trying inside the while (true) loop.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We need an upper bound for this backoffTimeMs. If the time is greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock, this thread's partitions have been moved somewhere else and it may not get the lock again.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So I have added an upper bound check in that while loop.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The commits are here:
>>>>>>>>>>>>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please review, and if you feel they make sense, please merge them into the main branch.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Sachin
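Sachin's fix for the second issue puts an upper bound on the lock-acquisition retry loop. A minimal sketch of that idea in plain Java follows. It is not the actual AbstractTaskCreator.retryWithBackoff code; the class, the method name, the `tryAcquire` supplier and the injected sleeper are all hypothetical. It assumes the caller passes the consumer's `max.poll.interval.ms` value as the total wait budget (as proposed in the thread) and grows the delay by a configurable factor (the thread describes 10x).

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

// Hypothetical helper illustrating a bounded exponential backoff; not Kafka code.
final class BoundedBackoff {

    // Real sleeper for production use; tests can inject a fake that only records delays.
    static final LongConsumer THREAD_SLEEP = ms -> {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while backing off", e);
        }
    };

    /**
     * Calls tryAcquire until it succeeds or the accumulated sleep time reaches
     * maxTotalWaitMs (for example, the consumer's max.poll.interval.ms).
     * Returns true if the lock was acquired, false if we gave up; giving up
     * suggests the partitions have moved to another thread and we should rejoin.
     */
    static boolean retryWithBoundedBackoff(BooleanSupplier tryAcquire,
                                           long initialBackoffMs,
                                           long factor,
                                           long maxTotalWaitMs,
                                           LongConsumer sleeper) {
        if (initialBackoffMs <= 0 || factor < 1) {
            throw new IllegalArgumentException("backoff must be positive and non-shrinking");
        }
        long backoffMs = initialBackoffMs;
        long waitedMs = 0;
        while (true) {
            if (tryAcquire.getAsBoolean()) {
                return true;
            }
            if (waitedMs >= maxTotalWaitMs) {
                return false; // budget exhausted: stop instead of retrying forever
            }
            // Never sleep past the remaining budget.
            long sleepMs = Math.min(backoffMs, maxTotalWaitMs - waitedMs);
            sleeper.accept(sleepMs);
            waitedMs += sleepMs;
            // Grow the delay, but cap it at the budget.
            backoffMs = Math.min(backoffMs * factor, maxTotalWaitMs);
        }
    }
}
```

With a 50 ms start, a factor of 10 and a budget of 300000 ms, the sleeps are 50, 500, 5000 and 50000 ms, then one final sleep for the remaining 244450 ms, after which the method returns `false` instead of looping forever. Production code would pass `BoundedBackoff.THREAD_SLEEP` as the sleeper.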
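Matthias suggests a different approach for the CommitFailedException case: the thread should still release its locks when the commit in onPartitionsRevoked fails. Below is a minimal, self-contained sketch of that idea. It assumes a simplified in-memory lock table. `RevocationSketch`, `CommitFailed` and the lock methods are hypothetical stand-ins, not Kafka's StreamThread, state-directory locking, or the real CommitFailedException. In line with Damian's description of trunk, the commit failure is returned rather than thrown.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-task state locks; not Kafka's implementation.
final class RevocationSketch {

    // Stand-in for a commit failure caused by a rebalance (not the real CommitFailedException).
    static final class CommitFailed extends RuntimeException {
        CommitFailed(String message) {
            super(message);
        }
    }

    // taskId -> name of the thread that currently holds the task's state lock
    private final Map<String, String> locks = new HashMap<>();

    synchronized boolean tryLock(String taskId, String owner) {
        String holder = locks.putIfAbsent(taskId, owner); // null means the lock was free
        return holder == null || holder.equals(owner);
    }

    synchronized void unlock(String taskId, String owner) {
        locks.remove(taskId, owner); // removes the entry only if `owner` holds it
    }

    /**
     * Commits on revocation but always releases the revoked tasks' locks, so a
     * failed commit cannot leave another thread waiting on them.
     * Returns the commit failure (or null) instead of killing the thread.
     */
    RuntimeException onRevoked(List<String> revokedTaskIds, String owner, Runnable commit) {
        RuntimeException failure = null;
        try {
            commit.run();
        } catch (CommitFailed e) {
            failure = e;
        } finally {
            for (String taskId : revokedTaskIds) {
                unlock(taskId, owner);
            }
        }
        return failure;
    }
}
```

The `finally` block carries the design: whichever way the commit goes, the revoked tasks' locks are freed. As Sachin points out in his reply, though, the lock wait during task creation can also happen without any commit failure, and that case is what the bounded backoff targets.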