Sachin,

"Late" data is data that arrives after the grace period has passed; it is
not processed but dropped. What you mean is "out-of-order data", which you
can process by using the grace period. Increasing the window size would be
a semantic change, while increasing the grace period allows you to get the
same result for ordered and unordered input.
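To make the distinction concrete, here is a minimal plain-Java sketch of the rule the example below follows. It assumes a simplified model, not Kafka Streams internals: "stream time" is the largest timestamp seen so far. A record is out-of-order if its timestamp is below stream time. It is late, and dropped, only if it is more than window size + grace period behind stream time. `LatenessCheck` and `classify` are hypothetical names.

```java
// Hypothetical helper illustrating the simplified lateness model
// (stream time = max timestamp observed so far); not Kafka Streams code.
public class LatenessCheck {

    enum Kind { IN_ORDER, OUT_OF_ORDER_ACCEPTED, LATE_DROPPED }

    static Kind classify(long ts, long streamTime, long windowSize, long grace) {
        if (ts >= streamTime) {
            return Kind.IN_ORDER;
        }
        // Out-of-order records are still accepted as long as they are no more
        // than window size + grace period behind stream time.
        if (streamTime - ts <= windowSize + grace) {
            return Kind.OUT_OF_ORDER_ACCEPTED;
        }
        return Kind.LATE_DROPPED;
    }

    public static void main(String[] args) {
        // Record <k1,w3,13> arrives after stream time has advanced to 30.
        System.out.println(classify(13, 30, 5, 0));   // LATE_DROPPED
        System.out.println(classify(13, 30, 5, 50));  // OUT_OF_ORDER_ACCEPTED
    }
}
```

The same out-of-order record flips from dropped to processed purely by raising the grace period. The window size, and therefore which records are allowed to join, stays at 5.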

Let's look at an example with a join window of 5 seconds, writing records
as <key,value,timestamp>:

Stream1: <k1,v1,10> <k2,v2,20> <k1,v3,26>
Stream2: <k1,w1,12> <k2,w2,30> <k1,w3,13>

With a grace period of zero, the computation and result would be as follows:

s1 -> k1 (insert into store)
s2 -> k1 (insert into store + join)
-> result <k1,v1+w1,12>
s1 -> k2 (insert into store + remove k1 from store because window size
is only 5 and grace period is zero)
s1 -> k1 (insert into store + remove k2 from store because window size
is only 5 and grace period is zero)
s2 -> k2 (insert into store -> no result because k2 from s1 was already
removed; with timestamps 20 and 30 it is outside the 5-second window anyway)
s2 -> k1 (out-of-order record that is also late: drop it on the floor)

Note that the last record from s2 should actually be in the result: had it
not been out-of-order, it would have joined with the first record from s1.

If we increase the grace period (note that the default grace period is
24h), for example to 50, we get the following:

s1 -> k1 (insert into store)
s2 -> k1 (insert into store + join)
-> result <k1,v1+w1,12>
s1 -> k2 (insert into store)
s1 -> k1 (insert into store -- does not join because window is only 5)
s2 -> k2 (insert into store -- does not join because window is only 5)
s2 -> k1 (out-of-order record, outside of the window but processed
normally because it's within the grace period: insert into store + join)
-> result <k1,v1+w3,13>

This result is semantically "the same" as the result above; it differs
only because we now allow out-of-order data to be processed. The missing
join result from the last record of s2 and the first record of s1 is now
in the result, as desired.

On the other hand, if we increase the window size to 50, we get a
semantically different result:

s1 -> k1 (insert into store)
s2 -> k1 (insert into store + join)
-> result <k1,v1+w1,12>
s1 -> k2 (insert into store)
s1 -> k1 (insert into store + join)
-> result <k1,v3+w1,26>
s2 -> k2 (insert into store + join)
-> result <k2,v2+w2,30>
s2 -> k1 (out-of-order record, within the window: insert into store + join)
-> 2 results <k1,v1+w3,13>, <k1,v3+w3,26>

Because we changed the window size, we get 5 result records instead of 1
(first example) or 2 (second example).
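To double-check the three traces, here is a small plain-Java simulation. It uses a `record`, so it needs Java 16+, and it has no Kafka dependency. It follows the simplified model the traces use, not Kafka Streams internals:

- Stream time is the largest timestamp seen across both inputs.
- A stored record is evicted once it is more than window size + grace behind stream time.
- An incoming record that far behind is dropped as late.
- Two records join if their timestamps differ by at most the window size.
- The result carries the larger of the two timestamps.

`JoinWindowSimulation`, `simulate` and `Rec` are hypothetical names. The window-50 run uses a grace of 0. The 24h default would give the same output here, because no record is ever more than 20 behind stream time. For reference, in Kafka Streams 2.x the two knobs are `JoinWindows.of(Duration)` and `JoinWindows#grace(Duration)`. Newer releases offer `JoinWindows.ofTimeDifferenceAndGrace(Duration, Duration)`.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Simplified model of the stream-stream join traces (assumption, not Kafka internals).
public class JoinWindowSimulation {

    // stream is 1 (s1) or 2 (s2)
    record Rec(int stream, String key, String value, long ts) {}

    static List<String> simulate(List<Rec> input, long windowSize, long grace) {
        List<Rec> store1 = new ArrayList<>();
        List<Rec> store2 = new ArrayList<>();
        List<String> results = new ArrayList<>();
        long retention = windowSize + grace;
        long streamTime = Long.MIN_VALUE;

        for (Rec r : input) {
            streamTime = Math.max(streamTime, r.ts());
            final long now = streamTime;
            // Evict stored records that fell behind window size + grace.
            store1.removeIf(s -> now - s.ts() > retention);
            store2.removeIf(s -> now - s.ts() > retention);
            if (now - r.ts() > retention) {
                continue; // late record: dropped on the floor
            }
            List<Rec> own = r.stream() == 1 ? store1 : store2;
            List<Rec> other = r.stream() == 1 ? store2 : store1;
            own.add(r);
            // Probe the other side in timestamp order.
            List<Rec> candidates = new ArrayList<>(other);
            candidates.sort(Comparator.comparingLong(Rec::ts));
            for (Rec o : candidates) {
                if (o.key().equals(r.key()) && Math.abs(o.ts() - r.ts()) <= windowSize) {
                    Rec left = r.stream() == 1 ? r : o;   // s1 value first
                    Rec right = r.stream() == 1 ? o : r;  // then s2 value
                    results.add("<" + r.key() + "," + left.value() + "+" + right.value()
                            + "," + Math.max(left.ts(), right.ts()) + ">");
                }
            }
        }
        return results;
    }

    // Arrival order used in the traces: s1, s2, s1, s1, s2, s2.
    static List<Rec> arrivalOrder() {
        return List.of(
                new Rec(1, "k1", "v1", 10), new Rec(2, "k1", "w1", 12),
                new Rec(1, "k2", "v2", 20), new Rec(1, "k1", "v3", 26),
                new Rec(2, "k2", "w2", 30), new Rec(2, "k1", "w3", 13));
    }

    public static void main(String[] args) {
        System.out.println(simulate(arrivalOrder(), 5, 0));   // 1 result
        System.out.println(simulate(arrivalOrder(), 5, 50));  // 2 results
        System.out.println(simulate(arrivalOrder(), 50, 0));  // 5 results
        // Same records in timestamp order, grace 0: same 2 results as grace 50.
        List<Rec> sorted = new ArrayList<>(arrivalOrder());
        sorted.sort(Comparator.comparingLong(Rec::ts));
        System.out.println(simulate(sorted, 5, 0));
    }
}
```

The last run feeds the same six records in timestamp order with a grace of 0. It yields exactly the two results that the out-of-order input produces with a grace of 50. This is the point made at the start: grace makes the result independent of arrival order, whereas a larger window changes which records join.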

Does this make sense?


-Matthias


On 2/21/20 7:35 PM, Sachin Mittal wrote:
> Hi,
> Reading the Kafka docs, I see that the grace period is defined as:
> the time to admit late-arriving events after the end of the window
>
> However, I have not understood when to use it.
>
> If I see that some records are arriving after the end of the window and
> hence are not included in the join, should I not simply increase the window
> size to accommodate them?
>
> When do I really need to use grace instead of altering the window size?
> 
> Thanks
> Sachin
> 
