Tomasz, you'll need to upgrade the Kafka Streams dependency to 3.0 (or above) to get the fix that John mentioned earlier -- this behavior is known/expected on earlier versions such as the 2.8.1 you are using.
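In case it helps, here is a rough sketch of what that looks like once the org.apache.kafka:kafka-streams dependency is bumped to 3.0.0 or later (e.g. the 3.3.0 John pointed at). The class name, application id, broker address, and the 1000 ms value below are placeholders, not taken from your setup. The relevant Streams config is max.task.idle.ms (StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); on 3.0+ the default of 0 already gives you the KIP-695 timestamp synchronization, and a positive value makes a task wait even longer for lagging inputs.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsUpgradeSketch {                                      // hypothetical class name
    public static Properties streamsProps() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");             // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");     // placeholder
        // KIP-695 (Streams 3.0+): with the default of 0, a task only processes
        // once every input partition either has buffered records or is fully
        // caught up to the broker, so merged streams get interleaved by timestamp.
        // A positive value additionally waits that long for new data after
        // catching up; the 1000 ms here is just an example.
        props.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 1000L);
        return props;
    }
}

As noted above, on 2.8.1 the old idle-time heuristic is not expected to interleave reliably (that is what KIP-695 reworked), so the upgrade is the important part rather than tuning the config value.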
On Tue, Nov 15, 2022 at 2:21 AM Tomasz Gac <tomasz....@empirica.io.invalid> wrote:
> Hi John,
>
> I've reviewed the test you sent and it seems to be correct, but it may not
> reproduce our setup.
>
> We are using Java 8, kafka-client 2.8.1 with kafka streams version 2.8.1
> against the kafka broker version 2.1.1. We are running it as an OSGI bundle
> with dependencies packaged within the bundle.
>
> Thank you,
> Tomasz
>
> On Fri, Sep 30, 2022 at 17:57 John Roesler <vvcep...@apache.org> wrote:
> >
> > Hi again, Tomasz,
> >
> > Your issue is really bugging me, since I'm pretty sure it shouldn't be
> > happening.
> >
> > I went ahead and added an integration test with your exact scenario, as I
> > understand it: https://github.com/apache/kafka/pull/12706
> >
> > The test passes for me.
> >
> > Do you think you can check it out and try adjusting the test setup until
> > you're able to reproduce the behavior you're seeing? If you can do that, I
> > think we will get to the bottom of it.
> >
> > Thanks,
> > -John
> >
> > On Fri, Sep 30, 2022, at 09:51, John Roesler wrote:
> > > Hi Tomasz,
> > >
> > > Thanks for trying that out. It's not the way I'd expect it to work. I
> > > don't remember if there were any follow-up bugs that have been solved
> > > in subsequent releases. Just as a long shot, perhaps you can try it on
> > > the latest release (3.3.0)?
> > >
> > > Otherwise, I think the best path forward would be to file a bug report
> > > on the Apache Kafka Jira with enough information to reproduce the issue
> > > (or if you're able to provide a repro, that would be awesome).
> > >
> > > Thanks, and sorry for the trouble.
> > > -John
> > >
> > > On Tue, Sep 27, 2022, at 03:15, Tomasz Gac wrote:
> > >> I upgraded to kafka streams 3.0.0 with a positive max.task.idle.ms and
> > >> it did not help.
> > >> When lag is large, the application still consumes data batches without
> > >> interleaving.
> > >>
> > >> On Tue, Sep 27, 2022 at 05:51 John Roesler <vvcep...@apache.org> wrote:
> > >>>
> > >>> Hi Tomasz,
> > >>>
> > >>> Thanks for asking. This sounds like the situation that we fixed in Apache
> > >>> Kafka 3.0, with KIP-695 (
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
> > >>> ).
> > >>>
> > >>> Can you try upgrading and let us know if this fixes the problem?
> > >>>
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
> > >>> > Hi group,
> > >>> >
> > >>> > I wrote a simple kafka streams application with a topology such as below:
> > >>> >
> > >>> >> builder.addStateStore(
> > >>> >>     Stores.keyValueStoreBuilder(
> > >>> >>         Stores.persistentKeyValueStore("STORE"),
> > >>> >>         Serdes.String(), Serdes.String())
> > >>> >>     .withLoggingEnabled(storeConfig));
> > >>> >
> > >>> >> builder.stream("TOPIC_1", Consumed.with(...))
> > >>> >>     .merge(builder.stream("TOPIC_2", Consumed.with(...)))
> > >>> >>     .merge(builder.stream("TOPIC_3", Consumed.with(...)))
> > >>> >>     .map(...)                 // stateless
> > >>> >>     .transform(..., "STORE")  // stateful
> > >>> >>     .to("TOPIC_4");
> > >>> >
> > >>> > All input topics have 6 partitions, and for the purpose of testing, we
> > >>> > are producing data to partition number 5.
> > >>> > We are using kafka streams version 2.8.1, broker version 2.12-2.1.1.
> > >>> >
> > >>> > The application works as expected when it has caught up to the lag, e.g.
> > >>> > when the reset tool is used with the --to-latest parameter.
> > >>> > However, when the application is processing the messages starting from
> > >>> > the earliest offset, the inputs are provided in batches such as:
> > >>> >
> > >>> > - ~1000 messages from TOPIC_1
> > >>> > - ~1000 messages from TOPIC_2
> > >>> > - ~1000 messages from TOPIC_3
> > >>> >
> > >>> > All of the messages have timestamps provided in headers, so I would
> > >>> > expect the application to interleave the messages from these three
> > >>> > topics so that their timestamps are in ascending order.
> > >>> > However, this is not what I am observing. The messages are processed
> > >>> > in batches.
> > >>> >
> > >>> > How do I configure my application so that it processes messages in
> > >>> > order when it is catching up to the lag?