Also, note that I observed similar behaviour with DataflowRunner.

On Thu, 28 May 2020 at 21:24, Jay <jayadeep.jayara...@gmail.com> wrote:

> I was trying to simulate the PaneInfo in Python to check for parity with
> the Java SDK. I was able to get PaneInfo after introducing a CombinePerKey.
> I am not sure why the GBK operation is returning the correct information.
>
>
> On Wed, 27 May 2020 at 00:54, Robert Bradshaw <rober...@google.com> wrote:
>
>> To clarify, PaneInfo is supported on the FnAPI local runner, but not on
>> the bundle-based one. Unfortunately, streaming is not supported on the
>> FnAPI one (yet), but work there is ongoing.
>>
>> On Tue, May 26, 2020 at 11:46 AM Pablo Estrada <pabl...@google.com>
>> wrote:
>>
>>> Hi Jayadeep,
>>> Unfortunately, it seems that PaneInfo is not well supported yet on the
>>> local runners: https://issues.apache.org/jira/browse/BEAM-3759
>>>
>>> Can you share more about your use case, and what you'd like to do with
>>> the PaneInfo?
>>>
>>> On Sat, May 23, 2020 at 10:03 AM Jay <jayadeep.jayara...@gmail.com>
>>> wrote:
>>>
>>>> Hi All -
>>>>
>>>> Below is sample code written in Python that reads data from Pub/Sub
>>>> and tries to determine the PaneInfo for different elements.
>>>>
>>>> There are 3 rows of data, as shown below:
>>>>
>>>>> {"country":"USA","user_id": 1,"ts": 0},
>>>>> {"country":"USA","user_id": 2,"ts": 0},
>>>>> {"country":"USA","user_id": 3,"ts": 8}
>>>>
>>>>
>>>> Below is the code that sets the event timestamp of each message:
>>>>
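>>>>> import datetime
>>>>> import json
>>>>> import apache_beam as beam
>>>>>
>>>>> class AddTimestampDoFn(beam.DoFn):
>>>>>   def process(self, element):
>>>>>     # Event time = current wall-clock time plus the record's "ts" offset (seconds)
>>>>>     unix_timestamp = datetime.datetime.now().timestamp() + element["ts"]
>>>>>     yield beam.window.TimestampedValue(element, unix_timestamp)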
>>>>
>>>>
>>>> Below is a helper DoFn that retrieves the timestamp. It serves only to
>>>> check that the timestamp has been set correctly:
>>>>
>>>>> class AddTimestamp(beam.DoFn):
>>>>>   def process(self, element, timestamp=beam.DoFn.TimestampParam):
>>>>>     yield (timestamp.to_utc_datetime(), element)
>>>>
>>>>
>>>> The code below reads the records from Pub/Sub and runs the ParDos
>>>> mentioned above:
>>>>
>>>>> data = (p
>>>>>         | "read"        >> beam.io.ReadFromPubSub(subscription=subscription)
>>>>>         | "JsonConvert" >> beam.Map(json.loads))
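ReadFromPubSub (with the default with_attributes=False) emits each message payload as bytes, and json.loads accepts bytes directly on Python 3.6+, so the "JsonConvert" step needs no explicit decode. A minimal stand-alone check, treating the three sample rows above as hypothetical Pub/Sub payloads:

```python
import json

# Hypothetical Pub/Sub payloads: the three sample rows encoded as UTF-8 bytes
payloads = [
    b'{"country":"USA","user_id": 1,"ts": 0}',
    b'{"country":"USA","user_id": 2,"ts": 0}',
    b'{"country":"USA","user_id": 3,"ts": 8}',
]

# The same call the "JsonConvert" step applies through beam.Map(json.loads)
rows = [json.loads(p) for p in payloads]

for row in rows:
    print(row["user_id"], row["ts"])
```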
>>>>
>>>>> sliding_windows = (
>>>>>       data | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>>>>>            | 'AddTimestamp'          >> beam.ParDo(AddTimestamp())
>>>>> )
>>>>
>>>>
>>>> Below is the DoFn that reads the pane information, followed by my
>>>> window and trigger definitions:
>>>>
>>>>>
>>>>>
>>>>> class ProcessRecord(beam.DoFn):
>>>>>   def process(self, element,
>>>>>               window=beam.DoFn.WindowParam,
>>>>>               pane_info=beam.DoFn.PaneInfoParam):
>>>>>     # Access pane info, e.g. pane_info.is_first, pane_info.is_last,
>>>>>     # pane_info.timing
>>>>>     yield (element,
>>>>>            datetime.datetime.now(),
>>>>>            window.start.to_utc_datetime(),
>>>>>            window.end.to_utc_datetime(),
>>>>>            pane_info.timing,
>>>>>            pane_info.is_first,
>>>>>            pane_info.is_last)
>>>>
>>>>
>>>>
>>>>> window_fn  = beam.window.FixedWindows(10)
>>>>> trigger_fn = beam.transforms.trigger.AfterWatermark(
>>>>>     early=beam.transforms.trigger.AfterCount(1))
>>>>> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>>>>> new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>>>>>             window_fn,
>>>>>             trigger=trigger_fn,
>>>>>             accumulation_mode=acc_dis_fn
>>>>>         ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
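In Beam's model a trigger only takes effect where elements are grouped (a GroupByKey or a Combine), so the panes this trigger should produce can be sketched with a plain-Python model. This is a deliberate simplification, not Beam's implementation: it assumes every arriving element causes one early firing (a real runner may batch elements and fire fewer early panes), no allowed lateness, and a final watermark supplied by hand. The event times are the seconds past 16:44:00 from the output rows further down; since 16:44:00 is a multiple of 10 s from the epoch, window alignment is preserved. simulate_panes and window_start are hypothetical helper names.

```python
import math
from collections import defaultdict

WINDOW_SIZE = 10  # seconds, matching beam.window.FixedWindows(10)


def window_start(ts, size=WINDOW_SIZE):
    """Start of the fixed window that contains event time ts."""
    return math.floor(ts / size) * size


def simulate_panes(events, final_watermark):
    """Simplified model of AfterWatermark(early=AfterCount(1)), ACCUMULATING mode.

    events: (event_time_seconds, value) pairs in arrival order.
    Returns (window_start, timing, is_first, is_last, values) tuples.
    """
    buffers = defaultdict(list)  # accumulating: each pane repeats earlier values
    fired = defaultdict(int)     # number of panes emitted so far per window
    panes = []
    for ts, value in events:
        w = window_start(ts)
        buffers[w].append(value)
        # Early firing: AfterCount(1) is satisfied by the newly arrived element
        panes.append((w, "EARLY", fired[w] == 0, False, list(buffers[w])))
        fired[w] += 1
    for w in sorted(buffers):
        # On-time firing once the watermark reaches the window end; with no
        # late firings and no allowed lateness, it is the last pane in this model
        if w + WINDOW_SIZE <= final_watermark:
            panes.append((w, "ON_TIME", fired[w] == 0, True, list(buffers[w])))
            fired[w] += 1
    return panes


events = [(9.598681, 1), (9.995521, 2), (17.995603, 3)]  # user_id as the value
panes = simulate_panes(events, final_watermark=20)
for pane in panes:
    print(pane)
```

In this model each window gets at least one EARLY pane and exactly one ON_TIME pane, and only the first pane of a window has is_first=True.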
>>>>
>>>>
>>>>
>>>> When I look at the output, I see the following:
>>>>
>>>> 0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA', 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
>>>> 1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA', 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
>>>> 2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country': 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23 16:44:10 2020-05-23 16:44:20 3 True True
>>>>
>>>> As can be observed above, two windows have been created, which is in
>>>> line with the data and the FixedWindows strategy:
>>>> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
>>>> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
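The two windows follow from the fixed-window assignment rule start = ts - (ts - offset) % size, here with size 10 s and the default offset of 0. A small sketch applying that rule to the three event times printed above (fixed_window is a hypothetical helper; naive datetimes are treated as UTC, matching to_utc_datetime):

```python
from datetime import datetime, timedelta

SIZE = timedelta(seconds=10)  # FixedWindows(10), offset 0
EPOCH = datetime(1970, 1, 1)  # naive datetimes interpreted as UTC


def fixed_window(ts: datetime, size: timedelta = SIZE):
    """Return the [start, end) fixed window containing ts."""
    start = ts - (ts - EPOCH) % size  # timedelta % timedelta is exact (microseconds)
    return start, start + size


observed = [
    datetime(2020, 5, 23, 16, 44, 9, 598681),
    datetime(2020, 5, 23, 16, 44, 9, 995521),
    datetime(2020, 5, 23, 16, 44, 17, 995603),
]
windows = [fixed_window(ts) for ts in observed]
for ts, (start, end) in zip(observed, windows):
    print(ts.time(), "->", start.time(), end.time())
```

Note that the two "ts": 0 rows get slightly different event times because datetime.datetime.now() is evaluated per element, but both still fall in the first window.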
>>>>
>>>> A couple of questions I am not able to answer:
>>>> 1. Why is the PaneInfo timing value returned as "3" (UNKNOWN) instead
>>>> of EARLY or ON_TIME?
>>>> 2. Shouldn't there be two firings for each of Window 1 and Window 2, one
>>>> EARLY and one ON_TIME?
>>>> 3. The last two boolean values, is_first and is_last, are both set to
>>>> True, which doesn't look right.
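For reference, Beam Python's PaneInfoTiming (in apache_beam.utils.windowed_value) numbers the timings EARLY=0, ON_TIME=1, LATE=2, UNKNOWN=3, and an element that no trigger has fired on carries a default pane with is_first=True, is_last=True and timing UNKNOWN. The plain-Python mirror below uses local stand-ins (PaneInfoTiming, Pane, DEFAULT_PANE and decode_tail are defined here, not imported from Beam) to decode the last three fields of an output row above. They match that default pane exactly, which is consistent with ProcessRecord running directly after WindowInto with no GroupByKey or Combine in between.

```python
from enum import IntEnum
from typing import NamedTuple


class PaneInfoTiming(IntEnum):
    # Local mirror of the numbering used by Beam Python's PaneInfoTiming
    EARLY = 0
    ON_TIME = 1
    LATE = 2
    UNKNOWN = 3


class Pane(NamedTuple):
    timing: PaneInfoTiming
    is_first: bool
    is_last: bool


# Default pane for elements that have not been through a triggered grouping
DEFAULT_PANE = Pane(PaneInfoTiming.UNKNOWN, is_first=True, is_last=True)


def decode_tail(line: str) -> Pane:
    """Decode the trailing 'timing is_first is_last' fields of an output row."""
    timing, is_first, is_last = line.split()[-3:]
    return Pane(PaneInfoTiming(int(timing)), is_first == "True", is_last == "True")


row_tail = "2020-05-23 16:44:10 3 True True"  # end of the first output row
pane = decode_tail(row_tail)
print(pane.timing.name, pane == DEFAULT_PANE)
```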
>>>>
>>>>
>>>> Can someone suggest what the issue might be?
>>>>
>>>> Thanks,
>>>> Jayadeep
>>>>
>>>
