I was trying to simulate PaneInfo in Python to check for parity with
the Java SDK. I was able to get PaneInfo after introducing a CombinePerKey.
I am not sure why the GBK operation is returning the correct information.


On Wed, 27 May 2020 at 00:54, Robert Bradshaw <rober...@google.com> wrote:

> To clarify, PaneInfo is supported on the FnAPI local runner, but not on
> the bundle based one. Unfortunately, Streaming is not supported on the
> FnAPI one (yet), but work there is ongoing.
>
> On Tue, May 26, 2020 at 11:46 AM Pablo Estrada <pabl...@google.com> wrote:
>
>> Hi Jayadeep,
>> Unfortunately, it seems that PaneInfo is not well supported yet on the
>> local runners: https://issues.apache.org/jira/browse/BEAM-3759
>>
>> Can you share more about your use case, and what you'd like to do with
>> the PaneInfo?
>>
>> On Sat, May 23, 2020 at 10:03 AM Jay <jayadeep.jayara...@gmail.com>
>> wrote:
>>
>>> Hi All -
>>>
>>> Below is some sample code written in Python which reads data from Pub/Sub
>>> and tries to determine the PaneInfo for different elements.
>>>
>>> There are 3 rows of data as shown below
>>>
>>> {"country":"USA","user_id": 1,"ts": 0},
>>>> {"country":"USA","user_id": 2,"ts": 0},
>>>> {"country":"USA","user_id": 3,"ts": 8}
>>>
>>>
>>> Below is the sample code which modifies the event timestamp for the
>>> messages
>>>
>>>> class AddTimestampDoFn(beam.DoFn):
>>>>   def process(self, element):
>>>>     unix_timestamp = datetime.datetime.now().timestamp() + element["ts"]
>>>>     yield beam.window.TimestampedValue(element, unix_timestamp)
>>>
>>>
>>> Below is a Helper class to retrieve the timestamp. This has no
>>> importance apart from checking to see if the timestamp has been set
>>> correctly
>>>
>>>> class AddTimestamp(beam.DoFn):
>>>>   def process(self, element, timestamp=beam.DoFn.TimestampParam):
>>>>     yield (timestamp.to_utc_datetime(), element)
>>>
>>>
>>> The code below reads the records from Pub/Sub and runs the ParDos
>>> mentioned above
>>>
>>>> data = (
>>>>     p | "read"        >> beam.io.ReadFromPubSub(subscription=subscription)
>>>>       | "JsonConvert" >> beam.Map(json.loads)
>>>> )
>>>
>>> sliding_windows = (
>>>>       data | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>>>>            | 'AddTimestamp'          >> beam.ParDo(AddTimestamp())
>>>> )
>>>
>>>
>>> Below is my trigger definition and the implementation
>>>
>>>>
>>>>
>>>> class ProcessRecord(beam.DoFn):
>>>>   def process(self, element, window=beam.DoFn.WindowParam,
>>>>               pane_info=beam.DoFn.PaneInfoParam):
>>>>     # Access pane info, e.g. pane_info.is_first, pane_info.is_last,
>>>>     # pane_info.timing
>>>>     yield (element, datetime.datetime.now(),
>>>>            window.start.to_utc_datetime(), window.end.to_utc_datetime(),
>>>>            pane_info.timing, pane_info.is_first, pane_info.is_last)
>>>
>>>
>>>
>>>> window_fn  = beam.window.FixedWindows(10)
>>>> trigger_fn = beam.transforms.trigger.AfterWatermark(early=AfterCount(1))
>>>> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>>>> new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>>>>             window_fn,
>>>>             trigger= trigger_fn,
>>>>             accumulation_mode=acc_dis_fn
>>>>         ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
>>>
>>>
>>>
>>> When I look at the output, I see the following:
>>>
>>> 0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country':
>>> 'USA', 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23
>>> 16:44:00 2020-05-23 16:44:10 3 True True
>>> 1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country':
>>> 'USA', 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23
>>> 16:44:00 2020-05-23 16:44:10 3 True True
>>> 2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country':
>>> 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23
>>> 16:44:10 2020-05-23 16:44:20 3 True True
>>> As can be observed above, two windows have been defined, which is in line
>>> with the data and the FixedWindows strategy:
>>> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
>>> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
>>>
>>> A couple of things I am not able to understand:
>>> 1. Why is the PaneInfo.Timing value returned as "3" (UNKNOWN) instead of
>>> EARLY or ON_TIME?
>>> 2. Shouldn't there be two firings for Window 1 and Window 2, one
>>> EARLY and one ON_TIME?
>>> 3. The last two boolean values are is_first and is_last. Both have
>>> been set to True, which doesn't look right.
>>>
>>>
>>> Can someone suggest what the issue might be?
>>>
>>> Thanks,
>>> Jayadeep
>>>
>>
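
As a side note on the quoted output: the two window bounds and the timing
code "3" can be checked without running a pipeline. The sketch below is only
an illustration, not Beam code. It assumes FixedWindows(10) aligns windows to
multiples of 10 seconds since the Unix epoch (offset 0). The names
fixed_window and PaneTiming are hypothetical; PaneTiming mirrors the integer
timing codes as they are believed to be defined in the Python SDK, of which
only 3 = UNKNOWN is confirmed by the thread itself.

```python
from datetime import datetime, timedelta, timezone
from enum import IntEnum

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


class PaneTiming(IntEnum):
    """Hypothetical mirror of the SDK's timing codes (assumption)."""
    EARLY = 0
    ON_TIME = 1
    LATE = 2
    UNKNOWN = 3


def fixed_window(event_time, size_seconds=10):
    """Return (start, end) of the fixed window containing event_time.

    Uses integer microseconds to avoid floating-point rounding.
    Assumes a window offset of 0.
    """
    micros = (event_time - EPOCH) // timedelta(microseconds=1)
    size_micros = size_seconds * 1_000_000
    start_micros = micros - micros % size_micros
    start = EPOCH + timedelta(microseconds=start_micros)
    return start, start + timedelta(seconds=size_seconds)


# Event timestamps taken from the quoted output (user_id 1 and user_id 3).
first = datetime(2020, 5, 23, 16, 44, 9, 598681, tzinfo=timezone.utc)
third = datetime(2020, 5, 23, 16, 44, 17, 995603, tzinfo=timezone.utc)

print(fixed_window(first))   # window 16:44:00 to 16:44:10
print(fixed_window(third))   # window 16:44:10 to 16:44:20
print(PaneTiming(3).name)    # UNKNOWN
```

This only confirms that the window assignment in the output is as expected;
it says nothing about why the runner reports UNKNOWN, which is the runner
limitation discussed in the replies above.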
