Leah,

thanks for your update. However, it does not completely answer my question.

In our current window implementations, we emit a window result update
record (i.e., an early/partial result) for each input record. When an
out-of-order record arrives, we just update the corresponding old window
and emit another update.

It's unclear from the KIP whether you propose the same emit strategy.
For sliding windows, it might be worth considering a different emit
strategy and supporting only the final result (i.e., emitting after the
grace period has passed)?



Boyang also raises a good point that relates to my point from above
about pre-aggregations and storage layout. Our current time windows are
all pre-aggregated and stored in parallel. We can also look up windows
efficiently, as we can compute the windowed-key given the input record
key and timestamp based on the window definition.
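
To make the lookup concrete, here is a minimal sketch of how window
start timestamps can be derived from a record timestamp alone for
fixed-size windows with an advance. The class and method names are
hypothetical and only illustrate the arithmetic; this is not the
Streams code itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
class HoppingWindowLookup {

    // Returns the start timestamps of all windows [start, start + sizeMs)
    // that contain timestampMs, assuming windows are aligned to multiples
    // of advanceMs and no window starts before timestamp 0.
    static List<Long> windowStartsFor(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        // Earliest aligned start whose window still covers the timestamp.
        long start = Math.max(0, timestampMs - sizeMs + advanceMs) / advanceMs * advanceMs;
        for (; start <= timestampMs; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }
}
```

Because the starts depend only on the timestamp and the window
definition, each (key, window start) pair can be fetched with a point
lookup, without scanning the store.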

However, for sliding windows, window boundaries are data dependent and
thus we cannot compute them upfront. So how can we "find" existing
windows efficiently? Furthermore, out-of-order data would create new
windows in the past and we need to be able to handle this case.

Thus, to handle out-of-order data correctly, we need to store all raw
input events. Additionally, we could also store pre-aggregated results
if we think it's beneficial. -- If we apply an "emit only final results"
strategy, storing pre-aggregated results would not be necessary though.
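
As a rough sketch of the "store raw events" idea: if events are kept
ordered by timestamp, any window can be recomputed with a range scan,
which also covers windows that an out-of-order record creates in the
past. Everything below is an assumption for illustration (the class
name, a sum as the aggregation, a window of [end - size, end] with both
bounds inclusive, and merging values that share a timestamp to keep the
example short).

```java
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a raw-event store of a single key.
class RawEventSlidingStore {
    private final long sizeMs;
    // Timestamp -> summed value of the events at that timestamp.
    private final TreeMap<Long, Long> valueByTimestamp = new TreeMap<>();

    RawEventSlidingStore(long sizeMs) {
        this.sizeMs = sizeMs;
    }

    // Works the same for in-order and out-of-order records.
    void put(long timestampMs, long value) {
        valueByTimestamp.merge(timestampMs, value, Long::sum);
    }

    // Recomputes the aggregate for the window [endMs - sizeMs, endMs].
    long sumForWindowEndingAt(long endMs) {
        long sum = 0;
        for (long v : valueByTimestamp.subMap(endMs - sizeMs, true, endMs, true).values()) {
            sum += v;
        }
        return sum;
    }
}
```

The cost is a scan per window read, which is the trade-off against
keeping pre-aggregated results.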


Btw: for sliding windows it might also be useful to consider allowing
users to supply a `Subtractor` -- this subtractor could be applied on
the current window result (in case we store it) if a record drops out of
the window. Of course, not all aggregation functions are subtractable,
and we can consider this as a follow-up task, too, and not include it in
this KIP for now. Thoughts?
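
A minimal sketch of what a subtractor would buy us, assuming in-order
input and a window of [ts - size, ts]: the stored result is adjusted
incrementally instead of being recomputed from the raw events. The class
name and constructor are hypothetical, and the adder/subtractor are
modeled as plain `BinaryOperator`s rather than any existing Streams
interface.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.BinaryOperator;

// Hypothetical illustration for a single key; not a proposed API.
class SubtractingSlidingAggregate {
    private final long sizeMs;
    private final BinaryOperator<Long> adder;
    private final BinaryOperator<Long> subtractor;
    // Records currently in the window as {timestamp, value}, oldest first.
    private final Deque<long[]> inWindow = new ArrayDeque<>();
    private long result;

    SubtractingSlidingAggregate(long sizeMs, long initial,
                                BinaryOperator<Long> adder,
                                BinaryOperator<Long> subtractor) {
        this.sizeMs = sizeMs;
        this.result = initial;
        this.adder = adder;
        this.subtractor = subtractor;
    }

    // Adds an in-order record and returns the result of the window
    // [timestampMs - sizeMs, timestampMs].
    long add(long timestampMs, long value) {
        result = adder.apply(result, value);
        inWindow.addLast(new long[] {timestampMs, value});
        // Subtract every record that dropped out of the window. The record
        // just added is never dropped, so the deque is never empty here.
        while (inWindow.peekFirst()[0] < timestampMs - sizeMs) {
            result = subtractor.apply(result, inWindow.pollFirst()[1]);
        }
        return result;
    }
}
```

This works for sum or count, but not for something like max, where the
dropped record does not tell us the new result.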



I was also thinking about the type hierarchy. I am not sure if extending
TimeWindow is the best approach? For TimeWindows, we can pre-compute
window boundaries (cf. `windowsFor()`) while for a sliding window the
boundaries are data dependent. Session windows are also data dependent
and thus they don't inherit from TimeWindow. (Maybe check out the KIP
that added session windows? It could provide some good insights.) -- I
believe the same rationale applies to sliding windows?



-Matthias




On 7/10/20 12:47 PM, Boyang Chen wrote:
> Thanks Leah and Sophie for the KIP.
> 
> 1. I'm a bit surprised that we don't have an advance time. Could we
> elaborate how the storage layer is structured?
> 
> 2. IIUC, there will be extra cost in terms of fetching aggregation results,
> since we couldn't pre-aggregate until the user asks for it. Would be good
> to also discuss it.
> 
> 3. We haven't discussed the possibility of supporting sliding windows
> inherently. For a user who actually uses a hopping window, Streams could
> detect such an inefficiency doing a window_size/advance_time ratio to reach
> a conclusion on whether the write amplification is too high compared with
> some configured threshold. The benefit of doing so is that existing Streams
> users don't need to change their code, learn a new API, but only to upgrade
> Streams library to get benefits for their inefficient hopping window
> implementation. There might be some compatibility issues for sure, but
> worth listing them out for trade-off.
> 
> Boyang
> 
> On Fri, Jul 10, 2020 at 12:40 PM Leah Thomas <ltho...@confluent.io> wrote:
> 
>> Hey Matthias,
>>
>> Thanks for pointing that out. I added the following to the Propose Changes
>> section of the KIP:
>>
>> "Records that come out of order will be processed the same way as in-order
>> records, as long as they fall within the grace period. Any new windows
>> created by the late record will still be created, and the existing windows
>> that are changed by the late record will be updated. Any record that falls
>> outside of the grace period (either user defined or default) will be
>> discarded. "
>>
>> All the best,
>> Leah
>>
>> On Thu, Jul 9, 2020 at 9:47 PM Matthias J. Sax <mj...@apache.org> wrote:
>>
>>> Leah,
>>>
>>> thanks a lot for the KIP. Very well written.
>>>
>>> The KIP does not talk about the handling of out-of-order data though.
>>> How do you propose to address this?
>>>
>>>
>>> -Matthias
>>>
>>> On 7/8/20 5:33 PM, Leah Thomas wrote:
>>>> Hi all,
>>>> I'd like to kick-off the discussion for KIP-450, adding sliding window
>>>> aggregation support to Kafka Streams.
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-450%3A+Sliding+Window+Aggregations+in+the+DSL
>>>>
>>>> Let me know what you think,
>>>> Leah
>>>>
>>>
>>>
>>
> 
