[ https://issues.apache.org/jira/browse/KAFKA-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548284#comment-15548284 ]
ASF GitHub Bot commented on KAFKA-4253:
---------------------------------------

GitHub user dguy opened a pull request:

    https://github.com/apache/kafka/pull/1970

    KAFKA-4253: Fix Kafka Stream thread shutting down process ordering

    Changed the ordering in `StreamThread.shutdown`
    1. commitAll (we need to commit so that any cached data is flushed through the topology)
    2. close all tasks
    3. producer.flush() - so any records produced during close are flushed and we have offsets for them
    4. close all state managers
    5. close producers/consumers
    6. remove the tasks

    Also in `onPartitionsRevoked`
    1. commitAll
    2. close all tasks
    3. producer.flush
    4. close all state managers

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dguy/kafka kafka-4253

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/1970.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1970

----
commit d52e8de5fefac51c9eb59a9c965bc17dcd3aa3bc
Author: Damian Guy <damian....@gmail.com>
Date:   2016-10-05T10:03:21Z

    change shutdown order

----


> Fix Kafka Stream thread shutting down process ordering
> ------------------------------------------------------
>
>                 Key: KAFKA-4253
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4253
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.0
>            Reporter: Guozhang Wang
>            Assignee: Damian Guy
>
> Currently we close the stream thread in the following way:
> 0. Commit all tasks.
> 1. Close producer.
> 2. Close consumer.
> 3. Close restore consumer.
> 4. For each task, close its topology processors one-by-one following the
> topology order by calling {{processor.close()}}.
> 5. For each task, close its state manager as well as flushing and closing all
> its associated state stores.
>
> We choose to close the producer / consumer clients before shutting down the
> tasks because we need to make sure all sent records have been acked so that we
> have the right log-end-offset when closing the store and checkpointing the
> offset of the changelog. However, this ordering also has a problem: if users
> choose to write more records in their {{processor.close()}} calls, this causes
> a runtime exception (RTE) because the producer has already been closed, and no
> changelog records can be written.
>
> Thinking about this issue, a more appropriate ordering would be:
> 1. For each task, close its topology processors following the topology
> order by calling {{processor.close()}}.
> 2. For each task, commit its state by calling {{task.commit()}}. At this time
> all sent records should be acked since {{producer.flush()}} is called.
> 3. For each task, close its {{ProcessorStateManager}}.
> 4. Close all embedded clients, i.e. producer / consumer / restore consumer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
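To make the ordering problem concrete, here is a minimal, self-contained sketch that models the old shutdown order and the order proposed in the pull request. Everything in it is hypothetical: `FakeProducer`, `FakeTask`, `oldShutdown` and `newShutdown` are illustrative stand-ins, not Kafka Streams classes, and `FakeProducer.send` is simply assumed to throw `IllegalStateException` after `close()` to stand in for the exception described in the issue. Each task's processor writes one final record from its close step; under the old order that write fails, while under the PR's order it is sent, then flushed before the state managers and clients are closed.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderSketch {

    // Hypothetical producer stand-in: send() fails once close() has run,
    // standing in for the exception described in KAFKA-4253.
    static final class FakeProducer {
        private final List<String> log;
        private boolean closed = false;
        private int pending = 0;

        FakeProducer(List<String> log) { this.log = log; }

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("producer closed, cannot send " + record);
            }
            pending++;
            log.add("send " + record);
        }

        void flush() {
            // In this model, every record sent so far counts as acked after flush().
            log.add("flush " + pending);
            pending = 0;
        }

        void close() {
            closed = true;
            log.add("close producer");
        }
    }

    // Hypothetical task whose processor emits one last record when it is closed.
    static final class FakeTask {
        private final String id;
        private final FakeProducer producer;
        private final List<String> log;

        FakeTask(String id, FakeProducer producer, List<String> log) {
            this.id = id;
            this.producer = producer;
            this.log = log;
        }

        void commit() { log.add("commit " + id); }

        void closeProcessors() {
            producer.send(id + "-final");
            log.add("close processors " + id);
        }

        void closeStateManager() { log.add("close state manager " + id); }
    }

    // Old ordering: commit, close clients, then close processors and state managers.
    public static List<String> oldShutdown() {
        List<String> log = new ArrayList<>();
        FakeProducer producer = new FakeProducer(log);
        List<FakeTask> tasks = new ArrayList<>(List.of(
                new FakeTask("t0", producer, log), new FakeTask("t1", producer, log)));
        tasks.forEach(FakeTask::commit);
        producer.close();
        log.add("close consumers");
        for (FakeTask task : tasks) {
            try {
                task.closeProcessors();
            } catch (IllegalStateException e) {
                log.add("error: " + e.getMessage());
            }
        }
        tasks.forEach(FakeTask::closeStateManager);
        return log;
    }

    // Ordering from the PR: commitAll, close tasks, flush, close state managers,
    // close clients, remove tasks.
    public static List<String> newShutdown() {
        List<String> log = new ArrayList<>();
        FakeProducer producer = new FakeProducer(log);
        List<FakeTask> tasks = new ArrayList<>(List.of(
                new FakeTask("t0", producer, log), new FakeTask("t1", producer, log)));
        tasks.forEach(FakeTask::commit);
        tasks.forEach(FakeTask::closeProcessors);
        producer.flush();
        tasks.forEach(FakeTask::closeStateManager);
        producer.close();
        log.add("close consumers");
        tasks.clear();
        log.add("remove tasks");
        return log;
    }

    public static void main(String[] args) {
        System.out.println("old: " + oldShutdown());
        System.out.println("new: " + newShutdown());
    }
}
```

The design choice the sketch highlights is that the producer must outlive every `processor.close()` call, and `flush()` must come after those calls so that records written during close are acked before the state managers checkpoint the changelog offsets.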