> I verified keys and timestamps and they match.

Did you verify the timestamps client side, i.e., in your Streams application?

> When is the watermark for the grace period advanced? 

There is nothing like a watermark. Time is tracked on a per-record basis.
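
To illustrate what "per-record" means here, below is a rough, simplified
sketch in plain Java. It is not the Kafka Streams implementation. The class
`StreamTimeSketch`, the method `offer`, and the single `retentionMs` bound are
hypothetical names made up for this example. The assumption is that "stream
time" is the highest timestamp seen so far, and that a record is dropped once
it is older than stream time by more than the bound.

```java
// Simplified model only: the names and the single retention bound are assumptions.
class StreamTimeSketch {
    // Highest record timestamp observed so far; it never moves backwards.
    private long streamTime = Long.MIN_VALUE;
    // Hypothetical bound, e.g. join window plus grace period, in milliseconds.
    private final long retentionMs;

    StreamTimeSketch(long retentionMs) {
        this.retentionMs = retentionMs;
    }

    /** Advances stream time with this record; returns false if the record is too old. */
    boolean offer(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
        return recordTimestampMs >= streamTime - retentionMs;
    }

    long streamTime() {
        return streamTime;
    }

    public static void main(String[] args) {
        // 1000 ms window + 50 ms grace, matching the numbers used in this thread.
        StreamTimeSketch tracker = new StreamTimeSketch(1_050L);
        long[] timestamps = {1_000L, 2_000L, 10_000L, 9_500L, 3_000L};
        for (long ts : timestamps) {
            System.out.println(ts + " kept=" + tracker.offer(ts));
        }
    }
}
```

In this model every record can advance time immediately, independent of the
commit interval. The record with timestamp 3000 is dropped because an earlier
record already moved stream time to 10000.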

> the event time is the Kafka log append time.

If it's log append time, then the broker sets the timestamp. Do you use
the embedded record timestamp for the join (default)? Or do you have an
embedded timestamp in the value and use a custom `TimestampExtractor`?

How large is your join-window, what is your grace period and what's your
store retention time?


-Matthias


On 6/17/19 5:24 AM, giselle.vandon...@klarrio.com wrote:
> I verified keys and timestamps and they match.  If I start the publisher and 
> processor at the same time, the join has entirely correct output with 6000 
> messages coming in and 3000 coming out. 
> Putting the grace period to a higher value has no effect. 
> When is the watermark for the grace period advanced? Per commit interval? 
> I read from 5 Kafka brokers and the event time is the Kafka log append time. 
> Could the ordering across brokers have something to do with it?
> 
> On 2019/06/14 18:33:22, "Matthias J. Sax" <matth...@confluent.io> wrote: 
>> How do you know that the result should be 900,000 messages? Did you
>> verify that the keys match and that the timestamps are correct?
>>
>> Did you try to remove grace-period or set a higher value? Maybe there is
>> an issue with out-of-order data?
>>
>> -Matthias
>>
>> On 6/14/19 5:05 AM, giselle.vandon...@klarrio.com wrote:
>>> I have two streams of data flowing into a Kafka cluster. I want to process 
>>> this data with Kafka streams. The stream producers are started at some time 
>>> t.
>>>
>>> I start up the Kafka Streams job 5 minutes later and start reading from 
>>> earliest from both topics (about 900 000 messages already on each topic). 
>>> The job parses the data and joins the two streams. On the intermediate 
>>> topics, I see that all the earlier 2x900000 events are flowing through 
>>> until the join. However, only 250 000 are outputted from the join, instead 
>>> of 900 000. 
>>>
>>> After processing the burst, the code works perfectly on the new incoming 
>>> events.
>>>
>>> The join's grace period is set to 50 ms, but setting it to 5 minutes or 10 
>>> minutes makes no difference. My other settings:
>>>
>>>     auto.offset.reset = earliest
>>>     commit.interval.ms = 1000
>>>     batch.size = 204800
>>>     max.poll.records = 500000
>>>
>>> The join is on the following window:
>>>
>>>     JoinWindows.of(Duration.ofMillis(1000))
>>>         .grace(Duration.ofMillis(50))
>>>
>>> As far as I understand, these joins should be done on event time. What 
>>> can cause these results to be discarded?
>>>
>>
>>
