Hi all,

Below is some sample Python code that reads data from Pub/Sub and tries to determine the PaneInfo for different elements.
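For reference, `pane_info.timing` is reported as a plain integer. Below is a minimal stdlib sketch for decoding the raw PaneInfo fields a DoFn prints. `PaneTiming` is a hypothetical stand-in, assumed to mirror the values of Beam's `PaneInfoTiming` in `apache_beam.utils.windowed_value` (EARLY=0, ON_TIME=1, LATE=2, UNKNOWN=3). `describe_pane` is likewise a hypothetical helper, not part of Beam.

```python
from enum import IntEnum


# Hypothetical stand-in assumed to mirror Beam's PaneInfoTiming integer values.
class PaneTiming(IntEnum):
    EARLY = 0
    ON_TIME = 1
    LATE = 2
    UNKNOWN = 3


def describe_pane(timing, is_first, is_last):
    """Turn the raw (timing, is_first, is_last) fields into a readable label."""
    label = PaneTiming(timing).name
    position = []
    if is_first:
        position.append("first")
    if is_last:
        position.append("last")
    # A pane that is neither first nor last sits in the middle of the firings.
    return f"{label} ({', '.join(position) or 'middle'} pane)"


# Example: a pane with timing=3 that is flagged as both first and last.
print(describe_pane(3, True, True))  # UNKNOWN (first, last pane)
```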
There are 3 rows of data, as shown below:

> {"country":"USA","user_id": 1,"ts": 0},
> {"country":"USA","user_id": 2,"ts": 0},
> {"country":"USA","user_id": 3,"ts": 8}

Below is the sample code that modifies the event timestamp of the messages:

> import datetime
> import json
>
> import apache_beam as beam
> from apache_beam.transforms.trigger import AfterCount
>
> class AddTimestampDoFn(beam.DoFn):
>     def process(self, element):
>         # Event time = current wall-clock time shifted by the record's "ts" offset (seconds)
>         unix_timestamp = datetime.datetime.now().timestamp() + element["ts"]
>         yield beam.window.TimestampedValue(element, unix_timestamp)

Below is a helper class to retrieve the timestamp. It has no purpose other than checking that the timestamp has been set correctly:

> class AddTimestamp(beam.DoFn):
>     def process(self, element, timestamp=beam.DoFn.TimestampParam):
>         yield (timestamp.to_utc_datetime(), element)

The code below reads the records from Pub/Sub and runs the ParDos mentioned above:

> data = (p
>         | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
>         | "JsonConvert" >> beam.Map(json.loads))
>
> sliding_windows = (
>     data
>     | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>     | 'AddTimestamp' >> beam.ParDo(AddTimestamp())
> )

Below is my trigger definition and its implementation:

> class ProcessRecord(beam.DoFn):
>     def process(self, element,
>                 window=beam.DoFn.WindowParam,
>                 pane_info=beam.DoFn.PaneInfoParam):
>         # access pane info, e.g. pane_info.is_first, pane_info.is_last, pane_info.timing
>         yield (element, datetime.datetime.now(),
>                window.start.to_utc_datetime(), window.end.to_utc_datetime(),
>                pane_info.timing, pane_info.is_first, pane_info.is_last)
>
> window_fn = beam.window.FixedWindows(10)
> trigger_fn = beam.transforms.trigger.AfterWatermark(early=AfterCount(1))
> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>
> new_final = (
>     sliding_windows
>     | "acc_30" >> beam.WindowInto(
>         window_fn,
>         trigger=trigger_fn,
>         accumulation_mode=acc_dis_fn)
>     | "acc_30_par" >> beam.ParDo(ProcessRecord())
> )

When I look at the output, I see the following. After the leading row number, each row holds the tuple yielded by ProcessRecord: (event timestamp, element), processing time, window start, window end, pane_info.timing, pane_info.is_first, pane_info.is_last.

0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA', 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA', 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country': 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23 16:44:10 2020-05-23 16:44:20 3 True True

As can be observed above, two windows have been created, which is in line with the data and the FixedWindows strategy:

Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20

There are a couple of things I am not able to understand:

1. Why is the PaneInfo.timing value returned as 3 (UNKNOWN) instead of EARLY or ON_TIME?
2. Shouldn't there be two firings for each of Window 1 and Window 2, one EARLY and one ON_TIME?
3. The last two boolean values, is_first and is_last, are both set to True, which doesn't look right.

Can someone suggest what the issue might be?

Thanks,
Jayadeep
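As a sanity check on the window boundaries in the output, here is a minimal stdlib sketch of how a 10-second fixed window is derived from an event timestamp. It assumes zero offset and epoch alignment, i.e. start = t - (t - offset) mod size, which is the rule FixedWindows applies. The helper name `fixed_window` is hypothetical, and integer microseconds are used to avoid floating-point drift; the event times are the ones printed in the output.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def fixed_window(event_time, size_s=10):
    """Return (start, end) of the epoch-aligned fixed window containing event_time."""
    # Work in whole microseconds so the modulo is exact.
    micros = (event_time - EPOCH) // timedelta(microseconds=1)
    size_us = size_s * 1_000_000
    start_us = micros - micros % size_us
    start = EPOCH + timedelta(microseconds=start_us)
    return start, start + timedelta(seconds=size_s)


# Event timestamps taken from the output rows (treated as UTC).
events = {
    1: datetime(2020, 5, 23, 16, 44, 9, 598681, tzinfo=timezone.utc),
    2: datetime(2020, 5, 23, 16, 44, 9, 995521, tzinfo=timezone.utc),
    3: datetime(2020, 5, 23, 16, 44, 17, 995603, tzinfo=timezone.utc),
}
for user_id, ts in events.items():
    start, end = fixed_window(ts)
    print(user_id, start.time(), end.time())
# 1 16:44:00 16:44:10
# 2 16:44:00 16:44:10
# 3 16:44:10 16:44:20
```

Users 1 and 2 fall into the first window and user 3 (shifted by 8 seconds) into the second, matching the two windows listed in the post.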