Windowing affects how data is partitioned into groups in GroupByKey. For
example, Global windowing means that all the data will be classified into
the same window: the global window. Think of the global window as just a
window that's so large that all timestamps get classified into that window,
but it's still a window.
Which data within a group gets emitted or dropped, and when, is still
controlled by allowed lateness and trigger.

I suspect that you might be assuming that data dropping happens because
some data falls outside "the current window", and so if you chose the
global window, all data would fall into it. This is not the case: there is
no concept of "current window" - window is, for many purposes, just a
secondary key within a GroupByKey. Elements that arrive at any time may be
classified into any window or into many windows; all windows are being
"processed" at the same time the same way as all keys are processed at the
same time. This leaves the GroupByKey with a stream of occasionally
arriving elements for each key and window - e.g. "element 42 arrived for
key "foo" window [10am, 10:05am]". It needs to decide what to do with this
element: 1) drop it, 2) buffer it, or 3) emit the buffered elements that
have arrived so far for key "foo" window [10am, 10:05am]; and also whether
to clear the buffer or not.
- Whether to drop or to buffer: this is controlled by the watermark and the
allowed lateness: watermark is a promise "I think there'll be no future
data with a timestamp < $watermark", and data that violates this promise is
late data; allowed lateness says "data that's late by more than this will
be dropped". Your allowed lateness of 0 says "all data that's late by more
than 0 seconds will be dropped".
- Whether to buffer or to emit: this is controlled by the trigger.
- Whether to clear buffer or not: this is controlled by accumulation mode.

This is a very frequent point of confusion and I'd agree that what Beam
calls "window" and "trigger" is not what many people intuitively think when
they hear these terms, even after reading documentation. I hope we can
explain these terms better eventually.

On Fri, Dec 8, 2017 at 9:38 AM Nishu <nishuta...@gmail.com> wrote:

> Hi Eugene,
>
> Does AllowedLateness affect in case of GlobalWindows?  My assumption is
> that in case of Global windows,  it will capture entire data due to
> accumulation and watermark won't affect.
> Please correct me.
>
> Thanks & regards,
> Nishu
>
>
> On Fri, Dec 8, 2017 at 5:43 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Accumulation mode doesn't affect late data dropping: I see your pipeline
>> specifies an allowed lateness of zero which means to drop all data that is
>> beyond the watermark: is this intentional?
>>
>> On Fri, Dec 8, 2017, 1:23 AM Nishu <nishuta...@gmail.com> wrote:
>>
>>> Below is the code, I use for Windows and triggers.
>>>
>>> *Trigger trigger = Repeatedly.forever(*
>>> *
>>> AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(delayDuration)));*
>>>
>>> *// Define windows for Customer Topic*
>>> *PCollection<KV<String, String>> customerSubset =
>>> customerInput.apply("CustomerWindowParDo",*
>>> * Window.<KV<String, String>> into(new
>>> GlobalWindows()).triggering(trigger).accumulatingFiredPanes()*
>>> * .withAllowedLateness(Duration.standardMinutes(0)));*
>>>
>>>
>>>
>>> Thanks & regards,
>>> Nishu
>>>
>>> On Fri, Dec 8, 2017 at 10:19 AM, Nishu <nishuta...@gmail.com> wrote:
>>>
>>>> Hi Eugene,
>>>>
>>>> In my usecase, I use GlobalWindow (
>>>> https://beam.apache.org/documentation/programming-guide/#provided-windowing-functions
>>>> ) and specify the trigger. In GLobal Window, entire data is accumulated
>>>> every time the trigger fires. so that we can avoid the late data issue.
>>>>
>>>> I found a JIRA issue(https://issues.apache.org/jira/browse/BEAM-3225 )
>>>> for the same issue in Beam.
>>>>
>>>> Today I am going to try to write similar implementation in Flink.
>>>>
>>>> Thanks,
>>>> Nishu
>>>>
>>>>
>>>> On Fri, Dec 8, 2017 at 12:08 AM, Eugene Kirpichov <kirpic...@google.com
>>>> > wrote:
>>>>
>>>>> Most likely this is late data
>>>>> https://beam.apache.org/documentation/programming-guide/#watermarks-and-late-data
>>>>> . Try configuring a trigger with a late data behavior that is more
>>>>> appropriate for your particular use case.
>>>>>
>>>>> On Thu, Dec 7, 2017 at 3:03 PM Nishu <nishuta...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am running a Streaming pipeline  with Flink runner.
>>>>>> *Operator sequence* is -> Reading the JSON data, Parse JSON String
>>>>>> to the Object and  Group the object based on common key.  I noticed that
>>>>>> GroupByKey operator throws away some data in between and hence I don't 
>>>>>> get
>>>>>> all the keys as output.
>>>>>>
>>>>>> In the below screenshot, 1001 records are read from kafka Topic ,
>>>>>> each record has unique ID .  After grouping it returns only 857 unique
>>>>>> IDs.  Ideally it should return 1001 records from GroupByKey operator.
>>>>>>
>>>>>>
>>>>>> [image: Inline image 3]
>>>>>>
>>>>>> Any idea, what can be the issue? Thanks in advance!
>>>>>>
>>>>>> --
>>>>>> Thanks & Regards,
>>>>>> Nishu Tayal
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Thanks & Regards,
>>>> Nishu Tayal
>>>>
>>>
>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Nishu Tayal
>>>
>>
>
>
> --
> Thanks & Regards,
> Nishu Tayal
>

Reply via email to