[ https://issues.apache.org/jira/browse/KAFKA-8204?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16815516#comment-16815516 ]
John Roesler commented on KAFKA-8204:
-------------------------------------

How far back should we backport the fix for this?

The bug seems to affect cached state stores in every version of Streams since 0.10.2, and it's quite a subtle effect. You have to have multiple cached state stores in a row in the same subtopology, and Streams has to flush them in the wrong order (which is not guaranteed). In this case, Streams will mark a record as consumed even though it's still buffered in memory in the downstream store. If you stop (or crash) the app at this point, the record won't be added to the downstream store's changelog or emitted, constituting data loss.

With the introduction of Suppress, the scenario is basically the same, but the effect is that we *do* emit the output record, but we *do not* mark the record as emitted in the suppression buffer changelog. When we restore, we forget that we previously emitted the record, leading to a duplicate result (or possibly disordered earlier result being emitted).

> Streams may flush state stores in the incorrect order
> -----------------------------------------------------
>
>                 Key: KAFKA-8204
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8204
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1, 0.10.2.2, 0.11.0.0, 0.11.0.1,
> 0.11.0.2, 0.11.0.3, 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0, 2.0.1, 2.1.0,
> 2.2.0, 2.1.1
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Blocker
>             Fix For: 2.1.2, 2.2.1
>
>
> Cached state stores may forward records during a flush call, so Streams
> should flush the stores in topological order. Otherwise, Streams may flush a
> downstream store before an upstream one, resulting in sink results being
> committed without the corresponding state changelog updates being committed.
> This behavior is partly responsible for the bug reported in KAFKA-7895.
> The fix is simply to flush the stores in topological order; then, when the
> upstream store forwards records to a downstream stateful processor, the
> corresponding state changes will be correctly flushed as well.
> An alternative would be to repeatedly call flush on all state stores until
> they report there is nothing left to flush, but this requires a public API
> change to enable state stores to report whether they need a flush or not.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
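
For readers who want to see the ordering problem concretely, here is a rough, self-contained sketch of the scenario described in the issue. It does not use the Kafka Streams API: CachedStore, unflushedAfterSinglePass and passesUntilClean are hypothetical names made up for illustration, and the "changelog" is just an in-memory list. The only behavior it models is the one the issue describes, namely that a cached store forwards its buffered records downstream when it is flushed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlushOrderSketch {

    /** Hypothetical stand-in for a cached state store; NOT the Kafka Streams API. */
    static final class CachedStore {
        final CachedStore downstream; // null for the last store in the subtopology
        final Map<String, String> dirty = new LinkedHashMap<>(); // buffered in memory only
        final List<String> changelog = new ArrayList<>();        // pretend changelog topic

        CachedStore(CachedStore downstream) {
            this.downstream = downstream;
        }

        void put(String key, String value) {
            dirty.put(key, value);
        }

        /** Writes buffered entries to the changelog and forwards them downstream. */
        void flush() {
            for (Map.Entry<String, String> e : dirty.entrySet()) {
                changelog.add(e.getKey() + "=" + e.getValue());
                if (downstream != null) {
                    downstream.put(e.getKey(), e.getValue());
                }
            }
            dirty.clear();
        }
    }

    /** Two chained stores with one buffered record upstream, in the requested flush order. */
    static List<CachedStore> newStores(boolean topologicalOrder) {
        CachedStore down = new CachedStore(null);
        CachedStore up = new CachedStore(down);
        up.put("k", "v");
        return topologicalOrder ? Arrays.asList(up, down) : Arrays.asList(down, up);
    }

    static int bufferedCount(List<CachedStore> stores) {
        int count = 0;
        for (CachedStore store : stores) {
            count += store.dirty.size();
        }
        return count;
    }

    /** One flush pass, as at commit time; returns records still held only in memory. */
    static int unflushedAfterSinglePass(boolean topologicalOrder) {
        List<CachedStore> stores = newStores(topologicalOrder);
        for (CachedStore store : stores) {
            store.flush();
        }
        return bufferedCount(stores);
    }

    /** The alternative from the issue: keep flushing until no store has anything buffered. */
    static int passesUntilClean(boolean topologicalOrder) {
        List<CachedStore> stores = newStores(topologicalOrder);
        int passes = 0;
        do {
            for (CachedStore store : stores) {
                store.flush();
            }
            passes++;
        } while (bufferedCount(stores) > 0);
        return passes;
    }

    public static void main(String[] args) {
        System.out.println("topological order, still buffered: " + unflushedAfterSinglePass(true));
        System.out.println("reverse order, still buffered: " + unflushedAfterSinglePass(false));
        System.out.println("reverse order, passes until clean: " + passesUntilClean(false));
    }
}
```

In the reverse-order case the record forwarded by the upstream store lands in the downstream store's cache after that store has already been flushed, so it is still only in memory when the commit happens. The repeated-flush variant also ends up clean, but it relies on being able to ask each store whether it still has buffered data, which in this sketch is done by reading the dirty map directly.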