It seems like there were multiple issues:
1. The two streams were read in separately:
    val stream1: KStream[String, String] = builder.stream[String, 
String](Set("topic1"))
    val stream2: KStream[String, String] = builder.stream[String, 
String](Set("topic2"))
 instead of together:
    val rawStreams: KStream[String, String] = builder.stream[String, 
String](Set("topic1", "topic2"))
This second option got much more output but still not complete.
2.  There are twenty partitions per topic. It seems as if it is not reading 
equally fast from all topic partitions. When printing the input per thread the 
timestamps do not accumulate nicely across partitions. If time is tracked on a 
per-record basis. Do you then take the max timestamp across all partitions and 
topics as the current event time? If so, if you read faster from one partition 
than from the other when processing old data, what can you do to make sure this 
does not get discarded besides putting a very high grace period?

My join window is one second, grace period 50ms, retention time is default.
I use the timestamp inside the observations. But I have tried with the default 
TimestampExtractor (log append time) as well, which still did not give all the 
wanted output. 

I am also wondering about what to do with the commit interval? In normal cases 
this should be on 1000 ms but in the case of this initial startup burst it 
should output faster?

On 2019/06/17 16:47:11, "Matthias J. Sax" <matth...@confluent.io> wrote: 
> > I verified keys and timestamps and they match.
> 
> Did you verify the timestamps client side, ie, in your Streams application?
> 
> > When is the watermark for the grace period advanced? 
> 
> There is nothing like a watermark. Time is tracked on a per-record basis.
> 
> > the event time is the Kafka log append time.
> 
> If it's log append time, that the broker sets the timestamp. Do you use
> the embedded record timestamp for the join (default)? Or do you have an
> embedded timestamps in the value and use an custom `TimestampExtractor`?
> 
> How large is your join-window, what is your grace period and what's your
> store retention time?
> 
> 
> -Matthias
> 
> 
> On 6/17/19 5:24 AM, giselle.vandon...@klarrio.com wrote:
> > I verified keys and timestamps and they match.  If I start the publisher 
> > and processor at the same time, the join has entirely correct output with 
> > 6000 messages coming in and 3000 coming out. 
> > Putting the grace period to a higher value has no effect. 
> > When is the watermark for the grace period advanced? Per commit interval? 
> > I read from 5 Kafka brokers and the event time is the Kafka log append 
> > time. Could the ordering across brokers have something to do with it?
> > 
> > On 2019/06/14 18:33:22, "Matthias J. Sax" <matth...@confluent.io> wrote: 
> >> How do you know that the result should be 900,000 messages? Did you
> >> verify that the keys match and that the timestamps are correct?
> >>
> >> Did you try to remove grace-period or set a higher value? Maybe there is
> >> an issue with ouf-of-order data?
> >>
> >> -Matthias
> >>
> >> On 6/14/19 5:05 AM, giselle.vandon...@klarrio.com wrote:
> >>> I have two streams of data flowing into a Kafka cluster. I want to 
> >>> process this data with Kafka streams. The stream producers are started at 
> >>> some time t.
> >>>
> >>> I start up the Kafka Streams job 5 minutes later and start reading from 
> >>> earliest from both topics (about 900 000 messages already on each topic). 
> >>> The job parses the data and joins the two streams. On the intermediate 
> >>> topics, I see that all the earlier 2x900000 events are flowing through 
> >>> until the join. However, only 250 000 are outputted from the join, 
> >>> instead of 900 000. 
> >>>
> >>> After processing the burst, the code works perfect on the new incoming 
> >>> events.
> >>>
> >>> Grace ms of the join is put on 50 millis but putting it on 5 minutes or 
> >>> 10 minutes makes no difference. My other settings:
> >>>
> >>>     auto.offset.reset = earliest
> >>>     commit.interval.ms = 1000
> >>>     batch.size = 204800
> >>>     max.poll.records = 500000
> >>>
> >>> The join is on the following window:
> >>>
> >>> JoinWindows.of(Duration.ofMillis(1000))
> >>>       .grace(Duration.ofMillis(50))
> >>>
> >>> As far as I understand these joins should be done on event time. What 
> >>> that can cause these results to be discarded?
> >>>
> >>
> >>
> 
> 

Reply via email to