To clarify, PaneInfo is supported on the FnAPI local runner, but not on the bundle-based one. Unfortunately, streaming is not yet supported on the FnAPI runner, though work there is ongoing.
On Tue, May 26, 2020 at 11:46 AM Pablo Estrada <pabl...@google.com> wrote:

> Hi Jayadeep,
> Unfortunately, it seems that PaneInfo is not well supported yet on the
> local runners: https://issues.apache.org/jira/browse/BEAM-3759
>
> Can you share more about your use case, and what you'd like to do with the
> PaneInfo?
>
> On Sat, May 23, 2020 at 10:03 AM Jay <jayadeep.jayara...@gmail.com> wrote:
>
>> Hi All -
>>
>> Below is a sample code written in Python which reads data from Pub/Sub
>> and tries to determine the PaneInfo for different elements
>>
>> There are 3 rows of data as shown below
>>
>>     {"country":"USA","user_id": 1,"ts": 0},
>>     {"country":"USA","user_id": 2,"ts": 0},
>>     {"country":"USA","user_id": 3,"ts": 8}
>>
>> Below is the sample code which modifies the event timestamp for the
>> messages
>>
>>     class AddTimestampDoFn(beam.DoFn):
>>         def process(self, element):
>>             unix_timestamp = (datetime.datetime.now()).timestamp() + element["ts"]
>>             yield beam.window.TimestampedValue(element, unix_timestamp)
>>
>> Below is a Helper class to retrieve the timestamp. This has no importance
>> apart from checking to see if the timestamp has been set correctly
>>
>>     class AddTimestamp(beam.DoFn):
>>         def process(self, element, timestamp=beam.DoFn.TimestampParam):
>>             yield (timestamp.to_utc_datetime(), element)
>>
>> Code below reads the records from Pub/Sub and runs the ParDo's mentioned
>> above
>>
>>     data = (p | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
>>               | "JsonConvert" >> beam.Map(json.loads))
>>
>>     sliding_windows = (
>>         data | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>>              | 'AddTimestamp' >> beam.ParDo(AddTimestamp())
>>     )
>>
>> Below is my trigger definition and the implementation
>>
>>     class ProcessRecord(beam.DoFn):
>>         def process(self, element,
>>                     window=beam.DoFn.WindowParam,
>>                     pane_info=beam.DoFn.PaneInfoParam):
>>             # access pane info e.g pane_info.is_first, pane_info.is_last,
>>             # pane_info.timing
>>             yield (element,
>>                    datetime.datetime.now(),
>>                    window.start.to_utc_datetime(),
>>                    window.end.to_utc_datetime(),
>>                    pane_info.timing, pane_info.is_first, pane_info.is_last)
>>
>>     window_fn = beam.window.FixedWindows(10)
>>     trigger_fn = beam.transforms.trigger.AfterWatermark(early=AfterCount(1))
>>     acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>>     new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>>         window_fn,
>>         trigger=trigger_fn,
>>         accumulation_mode=acc_dis_fn
>>     ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
>>
>> When I look at the output I see the below
>>
>>     0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA',
>>       'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00
>>       2020-05-23 16:44:10 3 True True
>>     1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA',
>>       'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00
>>       2020-05-23 16:44:10 3 True True
>>     2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country': 'USA',
>>       'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23 16:44:10
>>       2020-05-23 16:44:20 3 True True
>>
>> As can be observed above there are two Windows that have been defined
>> which is inline with the data and the FixedWindow strategy
>> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
>> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
>>
>> Couple of questions which I am not able to understand
>> 1. Why is the PaneInfo.Timing value returned as "3" (UNKNOWN) instead of
>> (EARLY, ON_TIME) ?
>> 2. Shouldn't for Window1 and Window 2 there should be two firings one
>> EARLY and one ON_TIME ?
>> 3. The last two boolean values are is_first and is_last again both have
>> been set to TRUE which doesn't look right.
>>
>> Can someone suggest on what can be the issue ?
>>
>> Thanks,
>> Jayadeep
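
For reference, the two window boundaries reported in the quoted output can be reproduced without Beam. The sketch below is plain Python, not Beam code: `fixed_window` is a hypothetical helper that assumes 10-second windows aligned to the Unix epoch with no offset, which is what `FixedWindows(10)` is understood to do by default. The event timestamps are copied from the quoted output.

```python
import datetime

# Reference point for window alignment (assumption: windows align to the Unix epoch).
EPOCH = datetime.datetime(1970, 1, 1)


def fixed_window(ts, size_seconds):
    """Return (start, end) of the fixed window that contains ts.

    Hypothetical helper for illustration only; it is not part of Apache Beam.
    """
    size = datetime.timedelta(seconds=size_seconds)
    # Distance of ts past the most recent window boundary.
    offset = (ts - EPOCH) % size
    start = ts - offset
    return start, start + size


# Event timestamps taken from the three output rows in the quoted message.
event_times = [
    datetime.datetime(2020, 5, 23, 16, 44, 9, 598681),
    datetime.datetime(2020, 5, 23, 16, 44, 9, 995521),
    datetime.datetime(2020, 5, 23, 16, 44, 17, 995603),
]

for ts in event_times:
    start, end = fixed_window(ts, 10)
    print(ts, "->", start, end)
```

The first two elements land in the 16:44:00 to 16:44:10 window and the third in 16:44:10 to 16:44:20, matching Window 1 and Window 2 above. This only covers window assignment; it says nothing about pane timing, which depends on the runner as discussed at the top of the thread.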