Hi Jayadeep,

Unfortunately, it seems that PaneInfo is not well supported yet on the local runners: https://issues.apache.org/jira/browse/BEAM-3759

Can you share more about your use case, and what you'd like to do with the PaneInfo?

On Sat, May 23, 2020 at 10:03 AM Jay <jayadeep.jayara...@gmail.com> wrote:

> Hi All -
>
> Below is some sample code written in Python which reads data from Pub/Sub
> and tries to determine the PaneInfo for different elements.
>
> There are 3 rows of data, as shown below:
>
>     {"country":"USA","user_id": 1,"ts": 0},
>     {"country":"USA","user_id": 2,"ts": 0},
>     {"country":"USA","user_id": 3,"ts": 8}
>
> Below is the sample code, which modifies the event timestamp of the
> messages:
>
>     class AddTimestampDoFn(beam.DoFn):
>         def process(self, element):
>             unix_timestamp = (datetime.datetime.now()).timestamp() + element["ts"]
>             yield beam.window.TimestampedValue(element, unix_timestamp)
>
> Below is a helper class to retrieve the timestamp. It serves only to check
> whether the timestamp has been set correctly:
>
>     class AddTimestamp(beam.DoFn):
>         def process(self, element, timestamp=beam.DoFn.TimestampParam):
>             yield (timestamp.to_utc_datetime(), element)
>
> The code below reads the records from Pub/Sub and runs the ParDos mentioned
> above:
>
>     data = (p
>             | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
>             | "JsonConvert" >> beam.Map(json.loads))
>
>     sliding_windows = (
>         data
>         | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>         | 'AddTimestamp' >> beam.ParDo(AddTimestamp())
>     )
>
> Below is my trigger definition and the implementation:
>
>     class ProcessRecord(beam.DoFn):
>         def process(self, element,
>                     window=beam.DoFn.WindowParam,
>                     pane_info=beam.DoFn.PaneInfoParam):
>             # access pane info, e.g. pane_info.is_first, pane_info.is_last,
>             # pane_info.timing
>             yield (element, datetime.datetime.now(),
>                    window.start.to_utc_datetime(), window.end.to_utc_datetime(),
>                    pane_info.timing, pane_info.is_first, pane_info.is_last)
>
>     window_fn = beam.window.FixedWindows(10)
>     trigger_fn = beam.transforms.trigger.AfterWatermark(
>         early=beam.transforms.trigger.AfterCount(1))
>     acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>     new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>         window_fn,
>         trigger=trigger_fn,
>         accumulation_mode=acc_dis_fn
>     ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
>
> When I look at the output, I see the following:
>
>     0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA', 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
>     1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA', 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
>     2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country': 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23 16:44:10 2020-05-23 16:44:20 3 True True
>
> As can be observed above, two windows have been defined, which is in line
> with the data and the FixedWindows strategy:
>
> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
>
> A couple of things I am not able to understand:
> 1. Why is the PaneInfo.Timing value returned as "3" (UNKNOWN) instead of
>    EARLY or ON_TIME?
> 2. Shouldn't there be two firings for each of Window 1 and Window 2, one
>    EARLY and one ON_TIME?
> 3. The last two boolean values are is_first and is_last, and both have
>    been set to True, which doesn't look right.
>
> Can someone suggest what the issue might be?
>
> Thanks,
> Jayadeep
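
For reference on questions 2 and 3, here is a minimal pure-Python sketch (no Beam dependency) of the panes one would roughly expect from FixedWindows(10) with AfterWatermark(early=AfterCount(1)) in ACCUMULATING mode. It is an idealized model, not Beam's implementation, and it assumes that every element arrives in its own bundle (so AfterCount(1) fires once per element), that no data is late, and that allowed lateness is 0, so the ON_TIME pane is also the last one. The names `assign_fixed_window`, `Pane`, and `model_panes` are hypothetical, and the arrival times are the observed event timestamps truncated to whole seconds after 16:44:00. By contrast, the observed output (timing 3, is_first and is_last both True for every element) appears to match the Python SDK's default placeholder for an unknown pane, which would be consistent with PaneInfo not being populated by the local runner.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple

Window = Tuple[int, int]


def assign_fixed_window(ts: int, size: int = 10) -> Window:
    # Fixed windows of `size` seconds with offset 0: the timestamp is rounded
    # down to a multiple of `size`, giving the half-open window [start, start + size).
    start = ts - ts % size
    return (start, start + size)


@dataclass
class Pane:
    window: Window
    timing: str        # "EARLY" or "ON_TIME"; plain labels, not Beam's PaneInfoTiming enum
    index: int         # position of this pane among the window's firings
    is_first: bool
    is_last: bool
    elements: List[int]


def model_panes(arrivals: List[Tuple[int, int]], size: int = 10) -> List[Pane]:
    """Hypothetical model of AfterWatermark(early=AfterCount(1)), ACCUMULATING.

    Assumes one element per bundle, no late data, allowed lateness 0, and that
    the watermark passes every window's end only after all arrivals.
    """
    contents: Dict[Window, List[int]] = {}
    panes: List[Pane] = []
    for ts, user_id in arrivals:
        window = assign_fixed_window(ts, size)
        accumulated = contents.setdefault(window, [])
        accumulated.append(user_id)
        # Early firing: AfterCount(1) is satisfied by this single element.
        # ACCUMULATING mode means the pane carries everything seen so far.
        index = sum(1 for p in panes if p.window == window)
        panes.append(Pane(window, "EARLY", index, index == 0, False, list(accumulated)))
    # The watermark passes each window's end (in window order): one ON_TIME pane,
    # which is also the last pane because allowed lateness is 0.
    for window in sorted(contents):
        index = sum(1 for p in panes if p.window == window)
        panes.append(Pane(window, "ON_TIME", index, index == 0, True, list(contents[window])))
    return panes


if __name__ == "__main__":
    # Seconds after 16:44:00, taken from the observed timestamps (truncated).
    for p in model_panes([(9, 1), (9, 2), (17, 3)]):
        print(p.window, p.timing, p.index, p.is_first, p.is_last, p.elements)
    # (0, 10) EARLY 0 True False [1]
    # (0, 10) EARLY 1 False False [1, 2]
    # (10, 20) EARLY 0 True False [3]
    # (0, 10) ON_TIME 2 False True [1, 2]
    # (10, 20) ON_TIME 1 False True [3]
```

Under these assumptions each window gets at least one EARLY pane followed by one ON_TIME pane, and only the first pane has is_first set and only the ON_TIME pane has is_last set. On a real runner, several elements in the same bundle may be folded into a single early pane, so the number of EARLY panes can be smaller.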