Thanks for explaining in more detail. Now I understand it. I was on a
completely wrong track before!

Great find and thanks for fixing it :)

Since these are two unrelated issues, we should have two JIRAs and two PRs.


-Matthias

On 3/6/17 8:48 PM, Sachin Mittal wrote:
>> As for the second issue you brought up, I agree it is indeed a bug; but
>> just to clarify: it is the CREATION of the first task, including
>> restoring stores, that can take longer than MAX_POLL_INTERVAL_MS_CONFIG,
>> not processing it, right?
> 
> Yes, this is correct. I may have misused the terminology, so let's not
> confuse it with processing in Kafka Streams terms.
> 
>> From my understanding, the deadlock is "caused" by your fix of problem
>> one. If the thread would die, the lock would get released and no
>> deadlock would occur.
> 
> Well, actually, the deadlock issue is different from the
> CommitFailedException and is not caused by it. However, if we fix the
> deadlock, it may potentially cause a CommitFailedException later on; but
> since trunk has already fixed that CommitFailedException, the only issue
> left is this deadlock. I made a single commit for both issues, so it may
> have looked as if the fix for one causes the other, but they are
> essentially unrelated.
> 
> The deadlock issue is simply this: if CREATION of the first task,
> including restoring its stores, takes longer than
> MAX_POLL_INTERVAL_MS_CONFIG, then the second task in that pipeline may go
> into a deadlock state if some other thread has already got the handle of
> that partition. So, in my view, we need some upper-bound check for
> backoffTimeMs, as in the sketch below.
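> 
> A minimal sketch of the upper-bound idea (illustrative only; createTask()
> is a stand-in for the actual task-creation path, and LockException for
> whatever the lock acquisition actually throws):
> 
>     // keep retrying the state-store lock, but give up once the backoff
>     // exceeds the poll interval: by then the group has evicted us anyway
>     void retryWithUpperBound(final long maxPollIntervalMs)
>             throws InterruptedException {
>         long backoffTimeMs = 50L;
>         while (true) {
>             try {
>                 createTask(); // assumed helper that takes the lock
>                 return;
>             } catch (final LockException e) {
>                 if (backoffTimeMs > maxPollIntervalMs) {
>                     // partitions have likely migrated to another thread;
>                     // stop retrying so this thread can rejoin the group
>                     throw e;
>                 }
>                 Thread.sleep(backoffTimeMs);
>                 backoffTimeMs *= 10;
>             }
>         }
>     }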
> 
> Thanks
> Sachin
> 
> 
> 
> On Tue, Mar 7, 2017 at 3:24 AM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Thanks for your input. I now understand the first issue (and the fix).
>> I am still not sure about the second issue.
>>
>> From my understanding, the deadlock is "caused" by your fix of problem
>> one. If the thread would die, the lock would get released and no
>> deadlock would occur.
>>
>> However, because the thread does not die, it also does not release its
>> locks. Thus, from my understanding, in case of a CommitFailedException,
>> the thread must release its locks, too. This would resolve the deadlock
>> -- we don't need any timeout for this.
>>
>> Or do I still misunderstand the issue?
>>
>>
>> -Matthias
>>
>>
>> On 3/6/17 11:41 AM, Guozhang Wang wrote:
>>> Hello Sachin,
>>>
>>> Thanks for your findings!! Just to add to what Damian said regarding
>>> 1): in KIP-129, where we are introducing exactly-once processing
>>> semantics to Streams, we have also described different categories of
>>> error handling for exactly-once. Commit exceptions due to a rebalance
>>> will be handled as "producer fenced", which will not be thrown to
>>> users, as we already do in trunk.
>>>
>>> As for the second issue you brought up, I agree it is indeed a bug; but
>>> just to clarify: it is the CREATION of the first task, including
>>> restoring stores, that can take longer than MAX_POLL_INTERVAL_MS_CONFIG,
>>> not processing it, right? (We only start processing after all tasks
>>> have been created and initialized.)
>>>
>>>
>>>
>>> Guozhang
>>>
>>>
>>> On Mon, Mar 6, 2017 at 7:16 AM, Sachin Mittal <sjmit...@gmail.com>
>> wrote:
>>>
>>>> Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848
>>>>
>>>>
>>>> On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy <damian....@gmail.com>
>> wrote:
>>>>
>>>>> Hi Sachin,
>>>>>
>>>>> If it is a bug then please file a JIRA for it, too.
>>>>>
>>>>> Thanks,
>>>>> Damian
>>>>>
>>>>> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>
>>>>>> Ok that's great.
>>>>>> So you have already fixed that issue.
>>>>>>
>>>>>> I have modified my PR to remove that change (which was done keeping
>>>>>> 0.10.2.0 in mind).
>>>>>>
>>>>>> However the other issue is still valid.
>>>>>>
>>>>>> Please review that change. https://github.com/apache/kafka/pull/2642
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
>>>>>>
>>>>>>
>>>>>> On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <damian....@gmail.com>
>>>> wrote:
>>>>>>
>>>>>>> On trunk the CommitFailedException isn't thrown anymore. The
>>>>>>> commitOffsets method doesn't throw an exception; it returns one if
>>>>>>> it was thrown. We used to throw this exception during
>>>>>>> suspendTasksAndState, but we don't anymore.
>>>>>>>
>>>>>>> On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sjmit...@gmail.com>
>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> If a CommitFailedException is thrown in onPartitionsRevoked, it
>>>>>>>> gets assigned to rebalanceException. This causes the stream thread
>>>>>>>> to shut down, and I am not sure how we can resume the thread.
>>>>>>>>
>>>>>>>> Note the thread is not in an invalid state, because it has
>>>>>>>> already been assigned new partitions; this exception happens when
>>>>>>>> trying to revoke old partitions which have been moved to some
>>>>>>>> other thread. So we need to swallow this exception on the
>>>>>>>> StreamThread side too, just like we swallow it in
>>>>>>>> ConsumerCoordinator.java. A sketch of the idea is below.
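>>>>>>>> 
>>>>>>>> Roughly like this (a hedged sketch, not the actual StreamThread
>>>>>>>> code; commitOffsets() and the logger are assumed helpers):
>>>>>>>> 
>>>>>>>>     @Override
>>>>>>>>     public void onPartitionsRevoked(
>>>>>>>>             final Collection<TopicPartition> partitions) {
>>>>>>>>         try {
>>>>>>>>             commitOffsets(); // assumed helper committing offsets
>>>>>>>>         } catch (final CommitFailedException e) {
>>>>>>>>             // the group has already rebalanced and this thread
>>>>>>>>             // holds a new assignment, so swallow instead of
>>>>>>>>             // setting rebalanceException and dying
>>>>>>>>             log.warn("Commit failed during revocation; continuing", e);
>>>>>>>>         }
>>>>>>>>     }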
>>>>>>>>
>>>>>>>> Also, I fixed this against the 0.10.2.0 code base, and the
>>>>>>>> difference between that and the trunk code is these lines:
>>>>>>>> 10.2.0
>>>>>>>>        if (firstException.get() == null) {
>>>>>>>>            firstException.set(commitOffsets());
>>>>>>>>        }
>>>>>>>> vs trunk
>>>>>>>>        if (firstException.get() == null) {
>>>>>>>>            // TODO: currently commit failures will not be thrown to users
>>>>>>>>            // while suspending tasks; this need to be re-visit after KIP-98
>>>>>>>>            commitOffsets();
>>>>>>>>        }
>>>>>>>> I am again not sure, since this part is still a TODO, but looking
>>>>>>>> at the code I see that commitOffsets can still throw the
>>>>>>>> CommitFailedException, which needs to be handled in
>>>>>>>> onPartitionsRevoked.
>>>>>>>>
>>>>>>>> Hope this makes sense.
>>>>>>>>
>>>>>>>> On the second issue: the deadlock is not caused by the
>>>>>>>> CommitFailedException, but after fixing the deadlock we need to
>>>>>>>> make sure the thread does not die due to an unhandled
>>>>>>>> CommitFailedException in onPartitionsRevoked.
>>>>>>>> The deadlock issue is like this: if a thread has two partitions
>>>>>>>> and processing partition one takes more than
>>>>>>>> MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted from the
>>>>>>>> group and both partitions are migrated to some other thread. Now,
>>>>>>>> when it tries to process partition two, it tries to get the lock
>>>>>>>> on RocksDB. It won't get the lock, since that partition has moved
>>>>>>>> to some other thread, so it keeps increasing backoffTimeMs and
>>>>>>>> keeps trying to get the lock forever, thus reaching a deadlock.
>>>>>>>> To fix this we need some upper bound on how long it keeps trying
>>>>>>>> to get that lock. And that upper bound has to be
>>>>>>>> MAX_POLL_INTERVAL_MS_CONFIG, because if it has not got the lock by
>>>>>>>> that time, we can conclude that this thread was evicted from the
>>>>>>>> group and needs to rejoin again to get new partitions.
>>>>>>>>
>>>>>>>> On the JIRA issue: I can create one and attach the part of the
>>>>>>>> logs where it keeps trying to get the lock with increasing
>>>>>>>> backoffTimeMs.
>>>>>>>>
>>>>>>>> Let me know if this makes sense. Right now this is the best way
>>>>>>>> we could come up with to handle stream thread failures.
>>>>>>>>
>>>>>>>> Also, on a side note, I feel we need more resilient streams. If
>>>>>>>> we have, say, configured our streams application with 4 threads
>>>>>>>> and a thread dies for whatever reason, then the application should
>>>>>>>> itself (or via some exposed hooks) allow starting a new thread
>>>>>>>> (because in Java I guess the same thread cannot be restarted), so
>>>>>>>> that the number of threads always stays at what one has
>>>>>>>> configured. I think exposed hooks would be the better option for
>>>>>>>> this; a rough sketch of today's workaround follows.
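>>>>>>>> 
>>>>>>>> For illustration, the best we can do with the current API (a
>>>>>>>> hedged sketch; restartExecutor, rebuild() and the logger are
>>>>>>>> assumed helpers, since a single StreamThread cannot be replaced
>>>>>>>> today):
>>>>>>>> 
>>>>>>>>     final KafkaStreams streams = new KafkaStreams(builder, props);
>>>>>>>>     streams.setUncaughtExceptionHandler((thread, error) -> {
>>>>>>>>         log.error("Stream thread {} died", thread.getName(), error);
>>>>>>>>         // a Java thread cannot be restarted, so schedule a full
>>>>>>>>         // restart of the instance from another thread (close()
>>>>>>>>         // must not be called from this callback directly)
>>>>>>>>         restartExecutor.submit(() -> {
>>>>>>>>             streams.close();
>>>>>>>>             rebuild(); // assumed: build and start a new instance
>>>>>>>>         });
>>>>>>>>     });
>>>>>>>>     streams.start();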
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Sachin
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <
>>>>> matth...@confluent.io
>>>>>>>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Sachin,
>>>>>>>>>
>>>>>>>>> thanks a lot for contributing!
>>>>>>>>>
>>>>>>>>> Right now, I am not sure if I understand the change. On
>>>>>>>>> CommitFailedException, why can we just resume the thread? To me,
>>>>>>>>> it seems that the thread will be in an invalid state and thus
>>>>>>>>> it's not safe to just swallow the exception and keep going. Can
>>>>>>>>> you shed some light?
>>>>>>>>>
>>>>>>>>> And from my understanding, the deadlock is "caused" by the
>>>>>>>>> change from above, right? So if it is safe to swallow the
>>>>>>>>> exception, we should do some "clean up" to avoid the deadlock in
>>>>>>>>> the first place, instead of applying an additional timeout.
>>>>>>>>>
>>>>>>>>> Also, if this is a bug, we should have a JIRA.
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 3/5/17 4:11 AM, Sachin Mittal wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> Please find the new PR
>>>>>>>>>> https://github.com/apache/kafka/pull/2642/
>>>>>>>>>>
>>>>>>>>>> I see that in trunk there has been a change which is different
>>>>>>>>>> from 0.10.2.0:
>>>>>>>>>>
>>>>>>>>>> 10.2.0
>>>>>>>>>>        if (firstException.get() == null) {
>>>>>>>>>>            firstException.set(commitOffsets());
>>>>>>>>>>        }
>>>>>>>>>> vs trunk
>>>>>>>>>>        if (firstException.get() == null) {
>>>>>>>>>>            // TODO: currently commit failures will not be thrown to users
>>>>>>>>>>            // while suspending tasks; this need to be re-visit after KIP-98
>>>>>>>>>>            commitOffsets();
>>>>>>>>>>        }
>>>>>>>>>> In view of this, I am not sure whether my part of the fix is
>>>>>>>>>> still valid. It looks like it is.
>>>>>>>>>>
>>>>>>>>>> Also, on a side note: what is the policy for closing a branch
>>>>>>>>>> that has just been released?
>>>>>>>>>>
>>>>>>>>>> Since you have released 0.10.2.0 we are using that, and that is
>>>>>>>>>> why we made our changes on that branch, so that they just modify
>>>>>>>>>> the needed code and we don't mess up the other released code.
>>>>>>>>>>
>>>>>>>>>> Is the new release cut from the 0.10.2 branch? If yes, then you
>>>>>>>>>> should not close it, as there can be patch fixes on it.
>>>>>>>>>>
>>>>>>>>>> Or is the release always made from trunk? In that case, how can
>>>>>>>>>> we pick up the code from which the release binaries were
>>>>>>>>>> created, so that when we build the binary we have exactly the
>>>>>>>>>> same code as the released one, plus any changes (we or someone
>>>>>>>>>> else) make on it?
>>>>>>>>>>
>>>>>>>>>> Also, if a branch is closed, then perhaps we should delete it
>>>>>>>>>> or mark it as closed.
>>>>>>>>>>
>>>>>>>>>> Please let us know how releases get created (from what code
>>>>>>>>>> base), so we can be more exact about where to apply our changes.
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Sachin
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <
>>>>>> eno.there...@gmail.com
>>>>>>>>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Sachin. One thing before the review: 0.10.2 is closed
>>>>>>>>>>> now, so this needs to target trunk.
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>> Eno
>>>>>>>>>>>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmit...@gmail.com>
>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Please review the PR and let me know if this makes sense.
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/kafka/pull/2640
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Sachin
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <
>>>>>>> eno.there...@gmail.com
>>>>>>>>>
>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Sachin for your contribution. Could you create a pull
>>>>>>>>>>>>> request out of the commit (so we can add comments, and also
>>>>>>>>>>>>> so you are acknowledged properly for your contribution)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Eno
>>>>>>>>>>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>> So far in our experiments we have encountered 2 critical
>>>>>>>>>>>>>> bugs.
>>>>>>>>>>>>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG
>>>>>>>>>>>>>> to complete a cycle, it gets evicted from the group, a
>>>>>>>>>>>>>> rebalance takes place, and it gets a new assignment.
>>>>>>>>>>>>>> However, when this thread tries to commit offsets for the
>>>>>>>>>>>>>> revoked partitions in onPartitionsRevoked, it will again
>>>>>>>>>>>>>> throw the CommitFailedException.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This gets handled by ConsumerCoordinator, so there is no
>>>>>>>>>>>>>> point in assigning this exception to rebalanceException in
>>>>>>>>>>>>>> StreamThread and stopping the thread. It has already been
>>>>>>>>>>>>>> assigned new partitions and can continue.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So, as a fix, in case of a CommitFailedException I am not
>>>>>>>>>>>>>> killing the StreamThread.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Next, we see a deadlock state when processing a task
>>>>>>>>>>>>>> takes longer than MAX_POLL_INTERVAL_MS_CONFIG. Then this
>>>>>>>>>>>>>> thread's partitions are assigned to some other thread,
>>>>>>>>>>>>>> including the RocksDB lock. When it tries to process the
>>>>>>>>>>>>>> next task, it cannot get the RocksDB lock and simply keeps
>>>>>>>>>>>>>> waiting for that lock forever.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In retryWithBackoff for AbstractTaskCreator we have
>>>>>>>>>>>>>> backoffTimeMs = 50L. If it does not get the lock, we simply
>>>>>>>>>>>>>> increase the time by 10x and keep trying inside the
>>>>>>>>>>>>>> while(true) loop.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> We need to have an upper bound for this backoffTimeMs. If
>>>>>>>>>>>>>> the time is greater than MAX_POLL_INTERVAL_MS_CONFIG and it
>>>>>>>>>>>>>> still hasn't got the lock, that means this thread's
>>>>>>>>>>>>>> partitions have moved somewhere else and it may never get
>>>>>>>>>>>>>> the lock again.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So I have added an upper bound check in that while loop.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The commits are here:
>>>>>>>>>>>>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please review, and if you feel the changes make sense,
>>>>>>>>>>>>>> please merge them to the main branch.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Sachin
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>>
>>>
>>
>>
> 
