I verified the keys and timestamps and they match. If I start the publisher and the processor at the same time, the join output is entirely correct: 6,000 messages coming in and 3,000 coming out. Setting the grace period to a higher value has no effect. When is the watermark for the grace period advanced? Per commit interval? I read from 5 Kafka brokers, and the event time is the Kafka log append time. Could the ordering across brokers have something to do with it?
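One way I could check whether cross-partition ordering is the problem: plug in a diagnostic TimestampExtractor that logs any record whose log-append timestamp is older than the maximum seen so far. This is only a sketch (the class name is mine, and since Streams may create one extractor instance per task or thread, maxSeen is a per-instance approximation, not a global watermark):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OutOfOrderLoggingExtractor implements TimestampExtractor {

    // Highest timestamp seen by this extractor instance so far.
    // Per-instance only, so this approximates out-of-order detection
    // within one task/thread, not across the whole application.
    private long maxSeen = Long.MIN_VALUE;

    @Override
    public long extract(final ConsumerRecord<Object, Object> record,
                        final long previousTimestamp) {
        final long ts = record.timestamp(); // log-append time, given the topic config
        if (ts < maxSeen) {
            System.err.printf("out-of-order record on %s-%d: ts=%d is %d ms behind max seen%n",
                    record.topic(), record.partition(), ts, maxSeen - ts);
        } else {
            maxSeen = ts;
        }
        return ts;
    }
}

It can be wired in via the existing config mechanism:

props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, OutOfOrderLoggingExtractor.class);

If this logs records far behind the max during the initial backlog but not afterwards, that would point at cross-partition reordering pushing stream time ahead past the grace period.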
On 2019/06/14 18:33:22, "Matthias J. Sax" <matth...@confluent.io> wrote:
> How do you know that the result should be 900,000 messages? Did you
> verify that the keys match and that the timestamps are correct?
>
> Did you try to remove the grace period or set a higher value? Maybe
> there is an issue with out-of-order data?
>
> -Matthias
>
> On 6/14/19 5:05 AM, giselle.vandon...@klarrio.com wrote:
> > I have two streams of data flowing into a Kafka cluster. I want to
> > process this data with Kafka Streams. The stream producers are
> > started at some time t.
> >
> > I start up the Kafka Streams job 5 minutes later and start reading
> > from earliest from both topics (about 900,000 messages already on
> > each topic). The job parses the data and joins the two streams. On
> > the intermediate topics, I see that all the earlier 2x900,000 events
> > are flowing through until the join. However, only 250,000 are output
> > from the join, instead of 900,000.
> >
> > After processing the burst, the code works perfectly on the new
> > incoming events.
> >
> > The grace period of the join is set to 50 millis, but setting it to
> > 5 or 10 minutes makes no difference. My other settings:
> >
> > auto.offset.reset = earliest
> > commit.interval.ms = 1000
> > batch.size = 204800
> > max.poll.records = 500000
> >
> > The join is on the following window:
> >
> > JoinWindows.of(Duration.ofMillis(1000))
> >     .grace(Duration.ofMillis(50))
> >
> > As far as I understand, these joins should be done on event time.
> > What can cause these results to be discarded?
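For reference, a minimal self-contained version of the setup described above. Topic names, serdes, the application id, and the value joiner are placeholders, not the actual job; the window and grace settings match the original post:

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class JoinBacklogExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-backlog-test"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        final StreamsBuilder builder = new StreamsBuilder();
        // Placeholder topic names and String serdes.
        final KStream<String, String> left =
                builder.stream("input-a", Consumed.with(Serdes.String(), Serdes.String()));
        final KStream<String, String> right =
                builder.stream("input-b", Consumed.with(Serdes.String(), Serdes.String()));

        // 1-second join window with 50 ms grace, as in the original post.
        // Records whose timestamp falls behind stream time by more than
        // window + grace are dropped from the join.
        left.join(right,
                  (l, r) -> l + "|" + r, // placeholder joiner
                  JoinWindows.of(Duration.ofMillis(1000)).grace(Duration.ofMillis(50)),
                  Joined.with(Serdes.String(), Serdes.String(), Serdes.String()))
            .to("joined", Produced.with(Serdes.String(), Serdes.String())); // placeholder

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}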