[ https://issues.apache.org/jira/browse/KAFKA-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548284#comment-15548284 ]

ASF GitHub Bot commented on KAFKA-4253:
---------------------------------------

GitHub user dguy opened a pull request:

    https://github.com/apache/kafka/pull/1970

    KAFKA-4253: Fix Kafka Stream thread shutting down process ordering

    Changed the ordering in `StreamThread.shutdown`
    1. commitAll (we need to commit so that any cached data is flushed through the topology)
    2. close all tasks
    3. producer.flush() - so any records produced during close are flushed and we have offsets for them
    4. close all state managers
    5. close producers/consumers
    6. remove the tasks
    
    Also in `onPartitionsRevoked`
    1. commitAll
    2. close all tasks
    3. producer.flush
    4. close all state managers
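
The two orderings above share their first four steps. A minimal sketch of that structure follows. It is not the real `StreamThread`: the class `ShutdownOrderSketch` and its methods `step`, `suspend`, and `log` are hypothetical names, and each step only records its name so the resulting order can be inspected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for StreamThread; it only records the order of steps.
class ShutdownOrderSketch {
    private final List<String> log = new ArrayList<>();

    private void step(String name) {
        log.add(name);
    }

    // The four steps listed for onPartitionsRevoked in the PR description.
    void suspend() {
        step("commitAll");             // flush cached data through the topology
        step("closeAllTasks");         // processors may still produce records here
        step("producer.flush");        // make sure those records have offsets
        step("closeAllStateManagers");
    }

    // Shutdown runs the same four steps, then closes clients and drops tasks.
    void shutdown() {
        suspend();
        step("closeProducersAndConsumers");
        step("removeTasks");
    }

    List<String> log() {
        return log;
    }

    public static void main(String[] args) {
        ShutdownOrderSketch thread = new ShutdownOrderSketch();
        thread.shutdown();
        System.out.println(thread.log());
    }
}
```

Factoring the common prefix into one method is only one way to keep the two paths in step; whether the actual patch does so is not stated in the description.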


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dguy/kafka kafka-4253

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1970
    
----
commit d52e8de5fefac51c9eb59a9c965bc17dcd3aa3bc
Author: Damian Guy <damian....@gmail.com>
Date:   2016-10-05T10:03:21Z

    change shutdown order

----


> Fix Kafka Stream thread shutting down process ordering
> ------------------------------------------------------
>
>                 Key: KAFKA-4253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Guozhang Wang
>            Assignee: Damian Guy
>
> Currently we close the stream thread in the following way:
> 0. Commit all tasks.
> 1. Close producer.
> 2. Close consumer.
> 3. Close restore consumer.
> 4. For each task, close its topology processors one by one in topology 
> order by calling {{processor.close()}}.
> 5. For each task, close its state manager, flushing and closing all of its 
> associated state stores.
> We close the producer / consumer clients before shutting down the tasks 
> because we need to make sure all sent records have been acked, so that we 
> have the right log-end-offset when closing the store and checkpointing the 
> offset of the changelog. However, this ordering has a problem: if users 
> write more records in their {{processor.close()}} calls, this causes a 
> runtime exception (RTE) because the producer has already been closed, and 
> no changelog records can be written.
> A more appropriate ordering would be:
> 1. For each task, close its topology processors in topology order by 
> calling {{processor.close()}}.
> 2. For each task, commit its state by calling {{task.commit()}}. At this 
> point all sent records should be acked, since {{producer.flush()}} is called.
> 3. For each task, close its {{ProcessorStateManager}}.
> 4. Close all embedded clients, i.e. producer / consumer / restore consumer.
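
The failure described in the issue, and the effect of the proposed ordering, can be illustrated with a small in-memory sketch. Everything here is hypothetical: `FakeProducer` and `FakeTask` are stand-ins written for this example, not Kafka classes, and the fake producer is simply assumed to reject sends after it has been closed.

```java
import java.util.ArrayList;
import java.util.List;

class CloseOrderingDemo {
    // Hypothetical in-memory producer: rejects sends once closed.
    static class FakeProducer {
        final List<String> sent = new ArrayList<>();
        final List<String> acked = new ArrayList<>();
        boolean closed = false;

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("producer already closed");
            }
            sent.add(record);
        }

        // Acknowledge every record sent since the previous flush.
        void flush() {
            acked.addAll(sent.subList(acked.size(), sent.size()));
        }

        void close() {
            flush();
            closed = true;
        }
    }

    // Hypothetical task whose processor writes one last record while closing.
    static class FakeTask {
        private final FakeProducer producer;

        FakeTask(FakeProducer producer) {
            this.producer = producer;
        }

        void closeProcessors() {
            producer.send("final-record");
        }
    }

    // Ordering described as current in the issue: clients are closed first.
    static boolean oldOrderingSucceeds() {
        FakeProducer producer = new FakeProducer();
        FakeTask task = new FakeTask(producer);
        producer.close();
        try {
            task.closeProcessors();
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    // Proposed ordering: close processors, flush, then close the clients.
    static List<String> newOrderingAcked() {
        FakeProducer producer = new FakeProducer();
        FakeTask task = new FakeTask(producer);
        task.closeProcessors();
        producer.flush(); // stands in for the flush performed during commit
        // A state manager closed at this point would see all records acked.
        producer.close();
        return producer.acked;
    }

    public static void main(String[] args) {
        System.out.println("old ordering succeeds: " + oldOrderingSucceeds());
        System.out.println("new ordering acked: " + newOrderingAcked());
    }
}
```

In this sketch the late record is lost under the old ordering and acknowledged under the proposed one, which is the property the state manager relies on when it checkpoints the changelog offset.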



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
