Thanks Sachin. One thing before the review: 0.10.2 is closed now, so this needs to target trunk.
Thanks
Eno

> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Please review the PR and let me know if this makes sense.
>
> https://github.com/apache/kafka/pull/2640
>
> Thanks
> Sachin
>
>
> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.there...@gmail.com> wrote:
>
>> Thanks Sachin for your contribution. Could you create a pull request out
>> of the commit (so we can add comments, and also so you are acknowledged
>> properly for your contribution)?
>>
>> Thanks
>> Eno
>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>> Hi,
>>> So far in our experiment we have encountered 2 critical bugs.
>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to compute a
>>> cycle, it gets evicted from the group, a rebalance takes place, and it
>>> gets a new assignment.
>>> However, when this thread tries to commit offsets for the revoked
>>> partitions in onPartitionsRevoked, it will again throw a
>>> CommitFailedException.
>>>
>>> This gets handled by ConsumerCoordinator, so there is no point in
>>> assigning this exception to rebalanceException in StreamThread and
>>> stopping the thread. It has already been assigned new partitions and it
>>> can continue.
>>>
>>> So as a fix, in case of a CommitFailedException I am not killing the
>>> StreamThread.
>>>
>>> 2. Next, we see a deadlock when processing a task takes longer than
>>> MAX_POLL_INTERVAL_MS_CONFIG. This thread's partitions are then assigned
>>> to some other thread, including the RocksDB lock. When it tries to
>>> process the next task, it cannot get the RocksDB lock and simply keeps
>>> waiting for that lock forever.
>>>
>>> In retryWithBackoff in AbstractTaskCreator we have backoffTimeMs = 50L.
>>> If it does not get the lock, we simply increase the time by 10x and keep
>>> trying inside the while (true) loop.
>>>
>>> We need an upper bound for this backoffTimeMs. If the time is greater
>>> than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the lock, this
>>> means the thread's partitions have moved somewhere else and it may never
>>> get the lock again.
>>>
>>> So I have added an upper bound check in that while loop.
>>>
>>> The commits are here:
>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
>>>
>>> Please review and, if you feel they make sense, merge them to the main
>>> branch.
>>>
>>> Thanks
>>> Sachin
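A minimal sketch of the bounded retry described in point 2, written as standalone Java rather than the actual Kafka Streams code. The class and method names (`BoundedBackoff`, `retryWithBoundedBackoff`, `sleepQuietly`) and the `tryAcquire`/`sleeper` parameters are hypothetical. The 50 ms starting value and the 10x growth follow the description above; capping the backoff itself at `maxPollIntervalMs` is one reading of "an upper bound for this backoffTimeMs". The sleep is injected so the behaviour can be checked without actually waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;

public class BoundedBackoff {

    /**
     * Hypothetical sketch (not the Kafka Streams implementation): retries
     * tryAcquire with a backoff that starts at 50 ms and grows 10x after each
     * failure, but gives up once the backoff exceeds maxPollIntervalMs instead
     * of looping forever.
     *
     * @return true if the lock was acquired, false if the bound was hit
     */
    static boolean retryWithBoundedBackoff(BooleanSupplier tryAcquire,
                                           long maxPollIntervalMs,
                                           LongConsumer sleeper) {
        long backoffTimeMs = 50L;
        while (true) {
            if (tryAcquire.getAsBoolean()) {
                return true;
            }
            if (backoffTimeMs > maxPollIntervalMs) {
                // Still no lock after backing off past the max poll interval:
                // assume the task (and its lock) now belongs to another thread,
                // so stop waiting and let the caller move on.
                return false;
            }
            sleeper.accept(backoffTimeMs);
            backoffTimeMs *= 10;
        }
    }

    /** A real sleeper that restores the interrupt flag if interrupted. */
    static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<Long> sleeps = new ArrayList<>();
        // Simulate a lock that is never released by its new owner;
        // 300_000 ms is only an example value for the max poll interval.
        boolean acquired = retryWithBoundedBackoff(() -> false, 300_000L, ms -> sleeps.add(ms));
        System.out.println("acquired=" + acquired + ", sleeps=" + sleeps);
    }
}
```

In real use the sleeper would be `BoundedBackoff::sleepQuietly`. With a 300,000 ms bound, the sketch sleeps 50, 500, 5,000 and 50,000 ms and gives up after the fifth failed attempt, rather than retrying indefinitely while another thread holds the lock.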