[
https://issues.apache.org/jira/browse/KAFKA-4253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548284#comment-15548284
]
ASF GitHub Bot commented on KAFKA-4253:
---------------------------------------
GitHub user dguy opened a pull request:
https://github.com/apache/kafka/pull/1970
KAFKA-4253: Fix Kafka Stream thread shutting down process ordering
Changed the ordering in `StreamThread.shutdown`:
1. commitAll (we need to commit so that any cached data is flushed through
   the topology)
2. close all tasks
3. producer.flush() - so that any records produced during close are flushed
   and we have offsets for them
4. close all state managers
5. close producers/consumers
6. remove the tasks
Also changed the ordering in `onPartitionsRevoked`:
1. commitAll
2. close all tasks
3. producer.flush
4. close all state managers
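The two orderings above can be sketched as a single routine. This is only an illustration: the class name `ShutdownOrderSketch`, the method `steps`, and the `fullShutdown` flag are hypothetical, and the strings are labels rather than calls into the real `StreamThread`.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderSketch {

    /**
     * Lists the steps in the order given above. The strings are labels only;
     * nothing here calls the real StreamThread internals.
     *
     * @param fullShutdown true models shutdown, false models onPartitionsRevoked
     */
    static List<String> steps(boolean fullShutdown) {
        List<String> order = new ArrayList<>();
        order.add("commitAll");               // flush cached data through the topology
        order.add("close all tasks");         // processors may still produce records here
        order.add("producer.flush");          // records produced during close get offsets
        order.add("close all state managers");
        if (fullShutdown) {
            // Only a full shutdown closes the clients and drops the tasks.
            order.add("close producers/consumers");
            order.add("remove the tasks");
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println("shutdown:            " + steps(true));
        System.out.println("onPartitionsRevoked: " + steps(false));
    }
}
```

The point of the sketch is that both paths share the first four steps, and that the flush sits between closing the tasks and closing the state managers.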
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dguy/kafka kafka-4253
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/kafka/pull/1970.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1970
----
commit d52e8de5fefac51c9eb59a9c965bc17dcd3aa3bc
Author: Damian Guy
Date: 2016-10-05T10:03:21Z
change shutdown order
----
> Fix Kafka Stream thread shutting down process ordering
> ------------------------------------------------------
>
> Key: KAFKA-4253
> URL: https://issues.apache.org/jira/browse/KAFKA-4253
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.1.0
> Reporter: Guozhang Wang
> Assignee: Damian Guy
>
> Currently we close the stream thread in the following way:
> 0. Commit all tasks.
> 1. Close producer.
> 2. Close consumer.
> 3. Close restore consumer.
> 4. For each task, close its topology processors one-by-one following the
> topology order by calling {{processor.close()}}.
> 5. For each task, close its state manager, and flush and close all its
> associated state stores.
> We choose to close the producer / consumer clients before shutting down the
> tasks because we need to make sure all sent records have been acked, so that
> we have the right log-end-offset when closing the store and checkpointing the
> offset of the changelog. However, this ordering has a problem: if users
> write more records in their {{processor.close()}} calls, this causes a
> runtime exception (RTE), since the producer has already been closed, and no
> changelog records can be written.
> Thinking about this issue, a more appropriate ordering would be:
> 1. For each task, close its topology processors following the topology
> order by calling {{processor.close()}}.
> 2. For each task, commit its state by calling {{task.commit()}}. At this
> point all sent records should be acked, since {{producer.flush()}} is called.
> 3. For each task, close its {{ProcessorStateManager}}.
> 4. Close all embedded clients, i.e. producer / consumer / restore consumer.
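The failure mode described in the issue can be illustrated with a small stand-in, without depending on Kafka itself. Everything in the sketch is hypothetical: `FakeProducer` is not the real producer client, `closeProcessor` stands for a user's `processor.close()` that writes one last record, and throwing `IllegalStateException` on a send after close is an assumption chosen to model the RTE mentioned above.

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderingSketch {

    /** Hypothetical producer stand-in that rejects sends once closed. */
    static final class FakeProducer {
        private final List<String> buffered = new ArrayList<>();
        private final List<String> acked = new ArrayList<>();
        private boolean closed = false;

        void send(String record) {
            if (closed) {
                throw new IllegalStateException("producer already closed");
            }
            buffered.add(record);
        }

        /** Moves every buffered record to the acked list. */
        void flush() {
            acked.addAll(buffered);
            buffered.clear();
        }

        void close() {
            flush();
            closed = true;
        }

        List<String> acked() {
            return acked;
        }
    }

    /** Hypothetical processor close that writes one final record. */
    static void closeProcessor(FakeProducer producer) {
        producer.send("final-record");
    }

    /** Current ordering: the producer is closed before the processors. */
    static boolean oldOrderingSucceeds() {
        FakeProducer producer = new FakeProducer();
        producer.close();
        try {
            closeProcessor(producer);
            return true;
        } catch (IllegalStateException e) {
            return false; // the send after close is rejected
        }
    }

    /** Proposed ordering: processors, then flush, then clients. */
    static List<String> newOrderingAcked() {
        FakeProducer producer = new FakeProducer();
        closeProcessor(producer); // 1. processor.close() may still send
        producer.flush();         // 2. commit: sent records are acked
        // 3. the state managers would be closed here
        producer.close();         // 4. clients are closed last
        return producer.acked();
    }

    public static void main(String[] args) {
        System.out.println("old ordering succeeds: " + oldOrderingSucceeds());
        System.out.println("new ordering acked: " + newOrderingAcked());
    }
}
```

With the clients closed first, the final send is rejected; with the proposed ordering, the record written during close is acked before the state managers and clients go away.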
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)