Tomasz, you'll need to upgrade the Kafka Streams dependency to 3.0 (or above)
to get the fix that John mentioned before -- this behavior is known/expected
on earlier versions such as the 2.8.1 you are using.
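For reference, the relevant knob on 3.0+ is max.task.idle.ms (KIP-695). A
minimal sketch of how you might set it is below; the application id and
bootstrap servers are placeholders, not taken from your setup:

```java
import java.util.Properties;

public class StreamsIdleConfig {
    // Builds a Streams config with a positive max.task.idle.ms. On Kafka
    // Streams 3.0+ (KIP-695) this makes a task wait up to the given time
    // for data from lagging input partitions, so the next record can be
    // chosen by timestamp across all merged topics instead of draining
    // one partition's fetched batch at a time.
    static Properties buildConfig() {
        Properties props = new Properties();
        props.setProperty("application.id", "interleave-demo");   // placeholder
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        // Wait up to 1 second for lagging partitions before proceeding.
        props.setProperty("max.task.idle.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig().getProperty("max.task.idle.ms"));
    }
}
```

You would pass these properties to the KafkaStreams constructor as usual.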

On Tue, Nov 15, 2022 at 2:21 AM Tomasz Gac <tomasz....@empirica.io.invalid>
wrote:

> Hi John,
>
> I've reviewed the test you sent and it seems to be correct, but it may not
> reproduce our setup.
>
> We are using Java 8, kafka-client 2.8.1 with Kafka Streams version 2.8.1
> against the Kafka broker version 2.1.1. We are running it as an OSGi bundle
> with its dependencies packaged within the bundle.
>
> Thank you,
> Tomasz
>
>
> On Fri, Sep 30, 2022 at 5:57 PM John Roesler <vvcep...@apache.org> wrote:
>
> > Hi again, Tomasz,
> >
> > Your issue is really bugging me, since I'm pretty sure it shouldn't be
> > happening.
> >
> > I went ahead and added an integration test with your exact scenario, as I
> > understand it: https://github.com/apache/kafka/pull/12706
> >
> > The test passes for me.
> >
> > Do you think you can check it out and try adjusting the test setup until
> > you're able to reproduce the behavior you're seeing? If you can do that, I
> > think we will get to the bottom of it.
> >
> > Thanks,
> > -John
> >
> > On Fri, Sep 30, 2022, at 09:51, John Roesler wrote:
> > > Hi Tomasz,
> > >
> > > Thanks for trying that out. It’s not the way I’d expect it to work. I
> > > don’t remember if there were any follow-up bugs that have been solved
> > > in subsequent releases. Just as a long shot, perhaps you can try it on
> > > the latest release (3.3.0)?
> > >
> > > Otherwise, I think the best path forward would be to file a bug report
> > > on the Apache Kafka Jira with enough information to reproduce the issue
> > > (or if you’re able to provide a repro, that would be awesome).
> > >
> > > Thanks, and sorry for the trouble.
> > > -John
> > >
> > > On Tue, Sep 27, 2022, at 03:15, Tomasz Gac wrote:
> > >> I upgraded to Kafka Streams 3.0.0 with a positive max.task.idle.ms and
> > >> it did not help.
> > >> When lag is large, the application still consumes data in batches
> > >> without interleaving.
> > >>
> > >>
> > >>
> > >> On Tue, Sep 27, 2022 at 5:51 AM John Roesler <vvcep...@apache.org>
> > >> wrote:
> > >>
> > >>> Hi Tomasz,
> > >>>
> > >>> Thanks for asking. This sounds like the situation that we fixed in
> > >>> Apache Kafka 3.0, with KIP-695 (
> > >>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization
> > >>> ).
> > >>>
> > >>> Can you try upgrading and let us know if this fixes the problem?
> > >>>
> > >>> Thanks,
> > >>> -John
> > >>>
> > >>> On Mon, Sep 26, 2022, at 01:35, Tomasz Gac wrote:
> > >>> > Hi group,
> > >>> >
> > >>> > I wrote a simple Kafka Streams application with a topology such as
> > >>> > below:
> > >>> >
> > >>> > builder.addStateStore(
> > >>> >>     Stores.keyValueStoreBuilder(
> > >>> >>         Stores.persistentKeyValueStore("STORE"),
> > >>> >>         Serdes.String(), Serdes.String())
> > >>> >>     .withLoggingEnabled(storeConfig));
> > >>> >
> > >>> >
> > >>> >
> > >>> > builder.stream("TOPIC_1", Consumed.with(...))
> > >>> >>     .merge(builder.stream("TOPIC_2", Consumed.with(...)))
> > >>> >>     .merge(builder.stream("TOPIC_3", Consumed.with(...)))
> > >>> >>     .map(...) // stateless
> > >>> >>     .transform(..., "STORE")  // stateful
> > >>> >>     .to("TOPIC_4");
> > >>> >
> > >>> >
> > >>> > All input topics have 6 partitions, and for the purpose of testing,
> > >>> > we are producing data to partition number 5.
> > >>> > We are using Kafka Streams version 2.8.1, broker version 2.12-2.1.1.
> > >>> >
> > >>> > The application works as expected when it has caught up to the lag,
> > >>> > e.g. when the reset tool is used with the --to-latest parameter.
> > >>> > However, when the application is processing the messages starting
> > >>> > from the earliest offset, the inputs are provided in batches such as:
> > >>> >
> > >>> >    - ~1000 messages from TOPIC_1
> > >>> >    - ~1000 messages from TOPIC_2
> > >>> >    - ~1000 messages from TOPIC_3
> > >>> >
> > >>> > All of the messages have timestamps provided in headers, so I would
> > >>> > expect the application to interleave the messages from these three
> > >>> > topics so that their timestamps are in ascending order.
> > >>> > However, this is not what I am observing. The messages are processed
> > >>> > in batches.
> > >>> >
> > >>> > How do I configure my application so that it processes messages in
> > >>> > order when it is catching up to the lag?
> > >>>
> >
>
