Hi group,

I wrote a simple Kafka Streams application with a topology like the one below:

builder.addStateStore(
    Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("STORE"),
            Serdes.String(), Serdes.String())
        .withLoggingEnabled(storeConfig));

builder.stream("TOPIC_1", Consumed.with(...))
    .merge(builder.stream("TOPIC_2", Consumed.with(...)))
    .merge(builder.stream("TOPIC_3", Consumed.with(...)))
    .map(...)                 // stateless
    .transform(..., "STORE")  // stateful
    .to("TOPIC_4");


All input topics have 6 partitions, and for the purpose of testing we are
producing data only to partition 5.
We are using Kafka Streams 2.8.1 and broker version 2.1.1 (Scala 2.12).

The application works as expected once it has caught up with the lag, e.g.
when the reset tool is used with the --to-latest parameter.
However, when the application processes messages starting from the earliest
offset, the input records arrive in batches such as:

   - ~1000 messages from TOPIC_1
   - ~1000 messages from TOPIC_2
   - ~1000 messages from TOPIC_3

All of the messages carry timestamps in their headers, so I would expect
the application to interleave the records from these three topics so that
their timestamps are processed in ascending order.
However, that is not what I am observing: the messages are processed in
batches, one topic at a time.
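For context, this is roughly how I extract the header timestamps; the header name "event-time" and the big-endian long encoding here are illustrative placeholders, not our exact code:

```java
import java.nio.ByteBuffer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Sketch of a TimestampExtractor that reads the event time from a record
// header; the header name "event-time" and the long encoding are assumptions.
public class HeaderTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Header header = record.headers().lastHeader("event-time");
        if (header != null && header.value() != null
                && header.value().length == Long.BYTES) {
            return ByteBuffer.wrap(header.value()).getLong();
        }
        // Fall back to the record's own timestamp when the header is missing.
        return record.timestamp();
    }
}
```

The extractor is registered via Consumed.with(...).withTimestampExtractor(...) on each of the three input streams.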

How do I configure my application so that it processes messages in order
when it is catching up to the lag?
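From the config reference, the closest knob I have found so far is max.task.idle.ms, which as I understand it lets a task wait for data on all of its input partitions before picking the record with the lowest timestamp. A minimal sketch of how I would set it (the application id, bootstrap servers, and the 5s value are placeholders). Is this the intended approach?

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class StreamsProps {

    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");            // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // Allow a task to stay idle for up to 5s waiting for data on all of
        // its input partitions, so it can choose records in timestamp order.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 5000L);
        return props;
    }
}
```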
