Hi group, I wrote a simple Kafka Streams application with the following topology:
    builder.addStateStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("STORE"),
                Serdes.String(), Serdes.String())
            .withLoggingEnabled(storeConfig));

    builder.stream("TOPIC_1", Consumed.with(...))
        .merge(builder.stream("TOPIC_2", Consumed.with(...)))
        .merge(builder.stream("TOPIC_3", Consumed.with(...)))
        .map(...)                // stateless
        .transform(..., "STORE") // stateful
        .to("TOPIC_4");

All input topics have 6 partitions, and for testing purposes we are producing data only to partition 5. We are using Kafka Streams 2.8.1, and the broker is version 2.1.1 (the Scala 2.12 build, kafka_2.12-2.1.1).

The application works as expected once it has caught up on the lag, e.g. after the reset tool has been run with the --to-latest parameter. However, when the application processes messages starting from the earliest offset, the input is consumed in batches such as:

- ~1000 messages from TOPIC_1
- ~1000 messages from TOPIC_2
- ~1000 messages from TOPIC_3

All of the messages have timestamps provided in headers, so I would expect the application to interleave the messages from these three topics so that they are processed in ascending timestamp order. That is not what I am observing: the messages are processed in per-topic batches.

How do I configure my application so that it processes messages in timestamp order while it is catching up on the lag?
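To pin down what "interleave by timestamp" means here, below is a small self-contained model in plain Java (no Kafka dependency). It is a simplified sketch under stated assumptions, not Kafka Streams' actual implementation: each input's buffered records are assumed to already be in timestamp order, the timestamp is whatever the application treats as event time (in Kafka Streams that is the value returned by the configured TimestampExtractor, which by default reads the record's own timestamp rather than a header), and `Rec` and `mergeByTimestamp` are hypothetical names.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Simplified model (NOT Kafka Streams internals) of "interleave buffered
 * inputs by timestamp": among the inputs that currently have buffered
 * records, always emit the head record with the smallest timestamp.
 */
public class TimestampMerge {

    /** Hypothetical record type: source topic plus the timestamp used for ordering. */
    public record Rec(String topic, long timestamp) {}

    /** Emits records from the per-topic buffers in ascending timestamp order. */
    public static List<Rec> mergeByTimestamp(List<List<Rec>> buffers) {
        // Priority queue of non-empty inputs, ordered by the timestamp of their head record.
        PriorityQueue<Deque<Rec>> byHead = new PriorityQueue<>(
            Comparator.comparingLong((Deque<Rec> q) -> q.peekFirst().timestamp()));
        for (List<Rec> buffer : buffers) {
            // Assumption: each buffer is already sorted by timestamp.
            Deque<Rec> q = new ArrayDeque<>(buffer);
            if (!q.isEmpty()) {
                byHead.add(q);
            }
        }
        List<Rec> out = new ArrayList<>();
        while (!byHead.isEmpty()) {
            Deque<Rec> q = byHead.poll();   // input whose head is oldest
            out.add(q.pollFirst());         // emit that head record
            if (!q.isEmpty()) {
                byHead.add(q);              // re-insert, keyed by its new head
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<Rec>> buffers = List.of(
            List.of(new Rec("TOPIC_1", 1), new Rec("TOPIC_1", 4)),
            List.of(new Rec("TOPIC_2", 2), new Rec("TOPIC_2", 5)),
            List.of(new Rec("TOPIC_3", 3), new Rec("TOPIC_3", 6)));
        for (Rec r : mergeByTimestamp(buffers)) {
            System.out.println(r.topic() + "@" + r.timestamp());
        }
    }
}
```

Run as-is, `main` prints TOPIC_1@1, TOPIC_2@2, TOPIC_3@3, TOPIC_1@4, TOPIC_2@5, TOPIC_3@6, one per line. The model also shows the limit of this strategy: it can only interleave records that are already buffered, so if only TOPIC_1's buffer is non-empty it emits TOPIC_1's records in order with nothing to interleave them with.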