I was trying to simulate the PaneInfo in Python to check for parity with the Java SDK. I was able to get PaneInfo after introducing a CombinePerKey. I am not sure why the GBK operation is returning the correct information.
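For anyone comparing the printed values against the Java SDK: the `3` in the quoted output is the integer form of the pane timing. Here is a minimal sketch for decoding it, assuming the constants mirror `PaneInfoTiming` in the Python SDK (EARLY=0, ON_TIME=1, LATE=2, UNKNOWN=3). `PaneTiming` and `describe_pane` are hypothetical names for illustration, not Beam APIs.

```python
from enum import IntEnum


class PaneTiming(IntEnum):
    # Assumption: these values mirror the Python SDK's PaneInfoTiming constants.
    EARLY = 0
    ON_TIME = 1
    LATE = 2
    UNKNOWN = 3


def describe_pane(timing, is_first, is_last):
    """Hypothetical helper: render the raw pane fields as a readable string."""
    name = PaneTiming(timing).name
    return f"timing={name} is_first={is_first} is_last={is_last}"


# The values printed for every element in the quoted output: 3 True True
print(describe_pane(3, True, True))
```

As far as I can tell, UNKNOWN timing with both is_first and is_last set to True is the default pane an element carries before any grouping has fired a trigger, which would be consistent with PaneInfo only showing up once a CombinePerKey was added.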
On Wed, 27 May 2020 at 00:54, Robert Bradshaw <rober...@google.com> wrote:

> To clarify, PaneInfo is supported on the FnAPI local runner, but not on
> the bundle based one. Unfortunately, Streaming is not supported on the
> FnAPI one (yet), but work there is ongoing.
>
> On Tue, May 26, 2020 at 11:46 AM Pablo Estrada <pabl...@google.com> wrote:
>
>> Hi Jayadeep,
>> Unfortunately, it seems that PaneInfo is not well supported yet on the
>> local runners: https://issues.apache.org/jira/browse/BEAM-3759
>>
>> Can you share more about your use case, and what you'd like to do with
>> the PaneInfo?
>>
>> On Sat, May 23, 2020 at 10:03 AM Jay <jayadeep.jayara...@gmail.com>
>> wrote:
>>
>>> Hi All -
>>>
>>> Below is a sample code written in Python which reads data from Pub/Sub
>>> and tries to determine the PaneInfo for different elements
>>>
>>> There are 3 rows of data as shown below
>>>
>>>> {"country":"USA","user_id": 1,"ts": 0},
>>>> {"country":"USA","user_id": 2,"ts": 0},
>>>> {"country":"USA","user_id": 3,"ts": 8}
>>>
>>> Below is the sample code which modifies the event timestamp for the
>>> messages
>>>
>>>> class AddTimestampDoFn(beam.DoFn):
>>>>     def process(self, element):
>>>>         unix_timestamp = (datetime.datetime.now()).timestamp() + element["ts"]
>>>>         yield beam.window.TimestampedValue(element, unix_timestamp)
>>>
>>> Below is a Helper class to retrieve the timestamp. This has no
>>> importance apart from checking to see if the timestamp has been set
>>> correctly
>>>
>>>> class AddTimestamp(beam.DoFn):
>>>>     def process(self, element, timestamp=beam.DoFn.TimestampParam):
>>>>         yield (timestamp.to_utc_datetime(), element)
>>>
>>> Code below reads the records from Pub/Sub and runs the ParDo's mentioned
>>> above
>>>
>>>> data = (p | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
>>>>         | "JsonConvert" >> beam.Map(json.loads))
>>>
>>>> sliding_windows = (
>>>>     data | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>>>>          | 'AddTimestamp' >> beam.ParDo(AddTimestamp())
>>>> )
>>>
>>> Below is my trigger definition and the implementation
>>>
>>>> class ProcessRecord(beam.DoFn):
>>>>     def process(self, element, window=beam.DoFn.WindowParam,
>>>>                 pane_info=beam.DoFn.PaneInfoParam):
>>>>         # access pane info e.g pane_info.is_first, pane_info.is_last,
>>>>         # pane_info.timing
>>>>         yield (element, datetime.datetime.now(),
>>>>                window.start.to_utc_datetime(),
>>>>                window.end.to_utc_datetime(), pane_info.timing,
>>>>                pane_info.is_first, pane_info.is_last)
>>>
>>>> window_fn = beam.window.FixedWindows(10)
>>>> trigger_fn = beam.transforms.trigger.AfterWatermark(early=AfterCount(1))
>>>> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>>>> new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>>>>     window_fn,
>>>>     trigger= trigger_fn,
>>>>     accumulation_mode=acc_dis_fn
>>>> ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
>>>
>>> When I look at the output I see the below
>>>
>>> 0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country':
>>> 'USA', 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23
>>> 16:44:00 2020-05-23 16:44:10 3 True True
>>> 1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country':
>>> 'USA', 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23
>>> 16:44:00 2020-05-23 16:44:10 3 True True
>>> 2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country':
>>> 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23
>>> 16:44:10 2020-05-23 16:44:20 3 True True
>>>
>>> As can be observed above there are two Windows that have been defined
>>> which is inline with the data and the FixedWindow strategy
>>> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
>>> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
>>>
>>> Couple of questions which I am not able to understand
>>> 1. Why is the PaneInfo.Timing value returned as "3" (UNKNOWN) instead of
>>> (EARLY, ON_TIME) ?
>>> 2. Shouldn't for Window1 and Window 2 there should be two firings one
>>> EARLY and one ON_TIME ?
>>> 3. The last two boolean values are is_first and is_last again both have
>>> been set to TRUE which doesn't look right.
>>>
>>> Can someone suggest on what can be the issue ?
>>>
>>> Thanks,
>>> Jayadeep
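The two window boundaries in the quoted output can be checked by hand. Below is a minimal sketch, assuming that FixedWindows(10) with no offset aligns windows to multiples of 10 seconds since the Unix epoch. `fixed_window` is a hypothetical helper for illustration, not a Beam function.

```python
from datetime import datetime, timedelta, timezone


def fixed_window(ts, size_seconds):
    """Hypothetical helper: return the [start, end) window containing ts."""
    # Assumption: windows start at multiples of size_seconds since the epoch.
    epoch_seconds = int(ts.timestamp())
    start = (epoch_seconds // size_seconds) * size_seconds
    start_dt = datetime.fromtimestamp(start, tz=timezone.utc)
    return start_dt, start_dt + timedelta(seconds=size_seconds)


# Event timestamps taken from the quoted output (treated as UTC).
first = datetime(2020, 5, 23, 16, 44, 9, 598681, tzinfo=timezone.utc)
third = datetime(2020, 5, 23, 16, 44, 17, 995603, tzinfo=timezone.utc)

for ts in (first, third):
    start, end = fixed_window(ts, 10)
    print(ts.time(), "->", start.time(), end.time())
```

Under that assumption the first element falls in 16:44:00 to 16:44:10 and the third in 16:44:10 to 16:44:20, which agrees with Window 1 and Window 2 as reported.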