[ 
https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna updated KAFKA-15297:
----------------------------------
    Description: 
The flush order of the state store caches in Kafka Streams might not correspond 
to the topological order of the state stores in the topology. The order depends 
on how the processors and state stores are added to the topology. 
In some cases, downstream state stores might be flushed before upstream state 
stores. That means that, during a commit, records in upstream caches might end 
up in downstream caches that have already been flushed during the same commit. 
If a crash happens at that point, those records in the downstream caches are 
lost. Those records are lost for two reasons:

1. Records in caches are only changelogged after they are flushed from the 
cache. However, the downstream caches have already been flushed and they will 
not be flushed again during the same commit.

2. The offsets of the input records that produced the records now stuck in the 
downstream caches are committed during the same commit, so those input records 
will not be re-processed after the crash.
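
The failure mode described above can be illustrated with a small, self-contained 
simulation. This is only a sketch under simplifying assumptions, not Kafka 
Streams code: the class {{FlushOrderSketch}}, the method {{commit}}, and the 
record name {{r1}} are hypothetical, each cache is modelled as a plain list, and 
flushing a cache is assumed to forward its records to the cache of the next 
store downstream.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FlushOrderSketch {

    /**
     * Simulates one commit over a linear chain of cached stores.
     *
     * @param chain      store names from upstream to downstream
     * @param flushOrder order in which the caches are flushed during the commit
     * @param record     a record sitting in the most upstream cache before the commit
     * @return the content of every cache after the commit
     */
    static Map<String, List<String>> commit(final List<String> chain,
                                            final List<String> flushOrder,
                                            final String record) {
        final Map<String, List<String>> caches = new LinkedHashMap<>();
        for (final String store : chain) {
            caches.put(store, new ArrayList<>());
        }
        caches.get(chain.get(0)).add(record);

        for (final String store : flushOrder) {
            // Flushing empties the cache; in this model the flushed records
            // count as changelogged from here on.
            final List<String> flushed = new ArrayList<>(caches.get(store));
            caches.get(store).clear();

            // Flushed records are forwarded into the cache of the next store.
            final int next = chain.indexOf(store) + 1;
            if (next < chain.size()) {
                caches.get(chain.get(next)).addAll(flushed);
            }
        }
        return caches;
    }

    public static void main(final String[] args) {
        final List<String> chain = List.of("stateStoreA", "stateStoreB", "stateStoreC");

        // Topological flush order: every cache is empty after the commit.
        System.out.println(commit(chain, chain, "r1"));
        // prints {stateStoreA=[], stateStoreB=[], stateStoreC=[]}

        // stateStoreC flushed first: r1 is left in the cache of stateStoreC.
        System.out.println(
            commit(chain, List.of("stateStoreC", "stateStoreA", "stateStoreB"), "r1"));
        // prints {stateStoreA=[], stateStoreB=[], stateStoreC=[r1]}
    }
}
```

In the second run, {{r1}} is still in the cache of {{stateStoreC}} when the 
commit ends. If the offset of the input record behind {{r1}} is committed and 
the application crashes, nothing brings {{r1}} back.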

An example of a topology in which the caches are flushed in the wrong order is 
the following:

{code:java}
// Assumption: streamsBuilder is a StreamsBuilder and streamsConfiguration is
// the Properties object of the application; neither is shown here.
final String inputTopic1 = "inputTopic1";
final String inputTopic2 = "inputTopic2";
final String outputTopic1 = "outputTopic1";
final String processorName = "processor1";
final String stateStoreA = "stateStoreA";
final String stateStoreB = "stateStoreB";
final String stateStoreC = "stateStoreC";

// Right branch: a pass-through processor that is added to the topology first.
streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
    .process(
        () -> new Processor<String, String, String, String>() {

            private ProcessorContext<String, String> context;

            @Override
            public void init(ProcessorContext<String, String> context) {
                this.context = context;
            }

            @Override
            public void process(Record<String, String> record) {
                context.forward(record);
            }

            @Override
            public void close() {}
        },
        Named.as(processorName)
    )
    .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));

// Left branch: three chained tables, each materialized in its own state store.
streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
    .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA)
        .withKeySerde(Serdes.String())
        .withValueSerde(Serdes.String()))
    .mapValues(value -> value,
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB)
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()))
    .mapValues(value -> value,
        Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC)
            .withKeySerde(Serdes.String())
            .withValueSerde(Serdes.String()))
    .toStream()
    .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));

final Topology topology = streamsBuilder.build(streamsConfiguration);
// Connect the processor of the right branch to the last store of the left branch.
topology.connectProcessorAndStateStores(processorName, stateStoreC);
{code}

This code results in the attached topology.

In the topology, {{processor1}} is connected to {{stateStoreC}}. If 
{{processor1}} is added to the topology before the other processors, i.e., if 
the right branch of the topology is added before the left branch as in the code 
above, the cache of {{stateStoreC}} is flushed before the caches of 
{{stateStoreA}} and {{stateStoreB}}.
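
For comparison, a flush order that follows the topology can be derived from the 
upstream/downstream relation between the stores alone, independent of the order 
in which they were added. The following is a minimal sketch using Kahn's 
algorithm. It is not how Kafka Streams orders the flushes and not a proposed 
patch: the class {{TopologicalFlushOrder}}, the method {{order}}, and the 
{{downstream}} map are hypothetical, and the edges between the stores are 
assumed to be known.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopologicalFlushOrder {

    /**
     * Returns the stores sorted so that every store comes before its downstream stores.
     *
     * @param stores     all store names, e.g. in the order they were added
     * @param downstream for each store, the stores directly downstream of it
     */
    static List<String> order(final List<String> stores,
                              final Map<String, List<String>> downstream) {
        // Count, for every store, how many stores are directly upstream of it.
        final Map<String, Integer> upstreamCount = new LinkedHashMap<>();
        for (final String store : stores) {
            upstreamCount.put(store, 0);
        }
        for (final List<String> targets : downstream.values()) {
            for (final String target : targets) {
                upstreamCount.merge(target, 1, Integer::sum);
            }
        }

        // Stores without upstream stores can be flushed first.
        final Deque<String> ready = new ArrayDeque<>();
        for (final Map.Entry<String, Integer> entry : upstreamCount.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        final List<String> result = new ArrayList<>();
        while (!ready.isEmpty()) {
            final String store = ready.poll();
            result.add(store);
            // A downstream store becomes ready once all its upstream stores are placed.
            for (final String target : downstream.getOrDefault(store, List.of())) {
                if (upstreamCount.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (result.size() != stores.size()) {
            throw new IllegalStateException("stores and edges are inconsistent or cyclic");
        }
        return result;
    }

    public static void main(final String[] args) {
        // stateStoreC is listed first, mirroring the insertion order in the example.
        final List<String> added = List.of("stateStoreC", "stateStoreA", "stateStoreB");
        final Map<String, List<String>> downstream = Map.of(
            "stateStoreA", List.of("stateStoreB"),
            "stateStoreB", List.of("stateStoreC"));
        System.out.println(order(added, downstream));
        // prints [stateStoreA, stateStoreB, stateStoreC]
    }
}
```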

You can observe the flush order by feeding some records into the input topics 
of the topology, waiting for a commit, and looking for the following log 
message:

https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513
 
I changed the log message from trace to debug to avoid too much noise. 


  was:
The previous description was identical to the one above, except that the 
instructions for observing the flush order did not mention waiting for a 
commit before looking for the log message.


> Cache flush order might not be topological order 
> -------------------------------------------------
>
>                 Key: KAFKA-15297
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15297
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>         Attachments: minimal_example.png
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
