I verified keys and timestamps and they match.  If I start the publisher and 
processor at the same time, the join has entirely correct output with 6000 
messages coming in and 3000 coming out. 
Putting the grace period to a higher value has no effect. 
When is the watermark for the grace period advanced? Per commit interval? 
I read from 5 Kafka brokers and the event time is the Kafka log append time. 
Could the ordering across brokers have something to do with it?

On 2019/06/14 18:33:22, "Matthias J. Sax" <matth...@confluent.io> wrote: 
> How do you know that the result should be 900,000 messages? Did you
> verify that the keys match and that the timestamps are correct?
> 
> Did you try to remove grace-period or set a higher value? Maybe there is
> an issue with ouf-of-order data?
> 
> -Matthias
> 
> On 6/14/19 5:05 AM, giselle.vandon...@klarrio.com wrote:
> > I have two streams of data flowing into a Kafka cluster. I want to process 
> > this data with Kafka streams. The stream producers are started at some time 
> > t.
> > 
> > I start up the Kafka Streams job 5 minutes later and start reading from 
> > earliest from both topics (about 900 000 messages already on each topic). 
> > The job parses the data and joins the two streams. On the intermediate 
> > topics, I see that all the earlier 2x900000 events are flowing through 
> > until the join. However, only 250 000 are outputted from the join, instead 
> > of 900 000. 
> > 
> > After processing the burst, the code works perfect on the new incoming 
> > events.
> > 
> > Grace ms of the join is put on 50 millis but putting it on 5 minutes or 10 
> > minutes makes no difference. My other settings:
> > 
> >     auto.offset.reset = earliest
> >     commit.interval.ms = 1000
> >     batch.size = 204800
> >     max.poll.records = 500000
> > 
> > The join is on the following window:
> > 
> > JoinWindows.of(Duration.ofMillis(1000))
> >       .grace(Duration.ofMillis(50))
> > 
> > As far as I understand these joins should be done on event time. What that 
> > can cause these results to be discarded?
> > 
> 
> 

Reply via email to