Also, note that I observed similar behaviour with DataflowRunner.

On Thu, 28 May 2020 at 21:24, Jay <jayadeep.jayara...@gmail.com> wrote:
> I was trying to simulate the PaneInfo in Python to check for parity with
> the Java SDK. I was able to get PaneInfo after introducing a CombinePerKey.
> I am not sure why the GBK operation is returning the correct information.
>
> On Wed, 27 May 2020 at 00:54, Robert Bradshaw <rober...@google.com> wrote:
>
>> To clarify, PaneInfo is supported on the FnAPI local runner, but not on
>> the bundle-based one. Unfortunately, streaming is not supported on the
>> FnAPI one (yet), but work there is ongoing.
>>
>> On Tue, May 26, 2020 at 11:46 AM Pablo Estrada <pabl...@google.com>
>> wrote:
>>
>>> Hi Jayadeep,
>>> Unfortunately, it seems that PaneInfo is not well supported yet on the
>>> local runners: https://issues.apache.org/jira/browse/BEAM-3759
>>>
>>> Can you share more about your use case, and what you'd like to do with
>>> the PaneInfo?
>>>
>>> On Sat, May 23, 2020 at 10:03 AM Jay <jayadeep.jayara...@gmail.com>
>>> wrote:
>>>
>>>> Hi All -
>>>>
>>>> Below is sample code written in Python that reads data from Pub/Sub
>>>> and tries to determine the PaneInfo for different elements.
>>>>
>>>> There are 3 rows of data, as shown below:
>>>>
>>>>> {"country":"USA","user_id": 1,"ts": 0},
>>>>> {"country":"USA","user_id": 2,"ts": 0},
>>>>> {"country":"USA","user_id": 3,"ts": 8}
>>>>
>>>> Below is the sample code that modifies the event timestamp of the
>>>> messages:
>>>>
>>>>> class AddTimestampDoFn(beam.DoFn):
>>>>>     def process(self, element):
>>>>>         unix_timestamp = datetime.datetime.now().timestamp() + element["ts"]
>>>>>         yield beam.window.TimestampedValue(element, unix_timestamp)
>>>>
>>>> Below is a helper class to retrieve the timestamp. This has no
>>>> importance apart from checking whether the timestamp has been set
>>>> correctly:
>>>>
>>>>> class AddTimestamp(beam.DoFn):
>>>>>     def process(self, element, timestamp=beam.DoFn.TimestampParam):
>>>>>         yield (timestamp.to_utc_datetime(), element)
>>>>
>>>> The code below reads the records from Pub/Sub and runs the ParDos
>>>> mentioned above:
>>>>
>>>>> data = (p
>>>>>         | "read" >> beam.io.ReadFromPubSub(subscription=subscription)
>>>>>         | "JsonConvert" >> beam.Map(json.loads))
>>>>>
>>>>> sliding_windows = (
>>>>>     data
>>>>>     | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>>>>>     | 'AddTimestamp' >> beam.ParDo(AddTimestamp())
>>>>> )
>>>>
>>>> Below is my trigger definition and the implementation:
>>>>
>>>>> class ProcessRecord(beam.DoFn):
>>>>>     def process(self, element,
>>>>>                 window=beam.DoFn.WindowParam,
>>>>>                 pane_info=beam.DoFn.PaneInfoParam):
>>>>>         # Access pane info, e.g. pane_info.is_first, pane_info.is_last,
>>>>>         # pane_info.timing
>>>>>         yield (element,
>>>>>                datetime.datetime.now(),
>>>>>                window.start.to_utc_datetime(),
>>>>>                window.end.to_utc_datetime(),
>>>>>                pane_info.timing,
>>>>>                pane_info.is_first,
>>>>>                pane_info.is_last)
>>>>>
>>>>> window_fn = beam.window.FixedWindows(10)
>>>>> trigger_fn = beam.transforms.trigger.AfterWatermark(
>>>>>     early=beam.transforms.trigger.AfterCount(1))
>>>>> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>>>>> new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>>>>>     window_fn,
>>>>>     trigger=trigger_fn,
>>>>>     accumulation_mode=acc_dis_fn
>>>>> ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
>>>>
>>>> When I look at the output I see the following:
>>>>
>>>> 0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA', 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
>>>> 1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA', 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
>>>> 2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country': 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23 16:44:10 2020-05-23 16:44:20 3 True True
>>>>
>>>> As can be observed above, two windows have been defined, which is in
>>>> line with the data and the FixedWindows strategy:
>>>> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
>>>> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
>>>>
>>>> There are a couple of things I am not able to understand:
>>>> 1. Why is the PaneInfo.Timing value returned as "3" (UNKNOWN) instead
>>>> of EARLY or ON_TIME?
>>>> 2. Shouldn't there be two firings for each of Window 1 and Window 2,
>>>> one EARLY and one ON_TIME?
>>>> 3. The last two boolean values, is_first and is_last, have both been
>>>> set to True, which doesn't look right.
>>>>
>>>> Can someone suggest what the issue might be?
>>>>
>>>> Thanks,
>>>> Jayadeep
>>>>
>>>
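The two windows reported in the original question follow from fixed-window assignment: with FixedWindows(10) and no offset, an element lands in the window whose start is its event time rounded down to a multiple of 10 seconds since the Unix epoch (Beam's rule is start = timestamp - (timestamp - offset) % size). The stdlib-only sketch below reproduces that arithmetic for the three event times printed in the output. `fixed_window` is a hypothetical helper written for illustration, not a Beam API, and it assumes UTC timestamps and a zero offset.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def fixed_window(ts, size=timedelta(seconds=10)):
    """Return the (start, end) of the fixed window containing ts.

    Hypothetical helper: mirrors start = ts - (ts - offset) % size with a
    zero offset, using timedelta arithmetic so microseconds stay exact.
    """
    start = ts - (ts - EPOCH) % size
    return start, start + size


# Event times from the first column of the quoted output, assumed to be UTC.
EVENT_TIMES = [
    datetime(2020, 5, 23, 16, 44, 9, 598681, tzinfo=timezone.utc),   # user_id 1
    datetime(2020, 5, 23, 16, 44, 9, 995521, tzinfo=timezone.utc),   # user_id 2
    datetime(2020, 5, 23, 16, 44, 17, 995603, tzinfo=timezone.utc),  # user_id 3
]

for ts in EVENT_TIMES:
    start, end = fixed_window(ts)
    print(start.strftime("%H:%M:%S"), "-", end.strftime("%H:%M:%S"))
# prints "16:44:00 - 16:44:10" twice, then "16:44:10 - 16:44:20"
```

So user_id 1 and 2 fall into [16:44:00, 16:44:10) and user_id 3 into [16:44:10, 16:44:20), matching the window columns of the quoted rows.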
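The trailing "3 True True" columns can be read against the pane-timing codes of the Python SDK, which (as we understand `PaneInfoTiming` in `apache_beam.utils.windowed_value`) are EARLY = 0, ON_TIME = 1, LATE = 2 and UNKNOWN = 3. Triggers decide when a GroupByKey emits panes, so a ParDo placed directly after WindowInto, with no grouping in between, has no trigger firing behind it; such elements appear to carry a default pane with timing UNKNOWN and both is_first and is_last set to True, which matches all three quoted rows. The stdlib sketch below is a hypothetical decoder for those columns; `TIMING_NAMES` and `describe_pane` are illustrative names, not Beam APIs.

```python
# Timing codes as we understand them from Beam Python's PaneInfoTiming
# (EARLY = 0, ON_TIME = 1, LATE = 2, UNKNOWN = 3); duplicated here so the
# sketch runs without Beam installed.
TIMING_NAMES = {0: "EARLY", 1: "ON_TIME", 2: "LATE", 3: "UNKNOWN"}


def describe_pane(timing, is_first, is_last):
    """Hypothetical helper: label the timing/is_first/is_last columns of a row."""
    name = TIMING_NAMES.get(timing, "INVALID")
    if name == "UNKNOWN" and is_first and is_last:
        # Looks like the default pane: no trigger firing produced this element.
        return "UNKNOWN (default pane, no trigger firing)"
    flags = [label for label, flag in (("first", is_first), ("last", is_last)) if flag]
    return f"{name} ({', '.join(flags) or 'intermediate'} pane)"


# Each of the three quoted rows ends with "3 True True".
for row_tail in [(3, True, True)] * 3:
    print(describe_pane(*row_tail))
```

To actually observe EARLY and ON_TIME panes, the pane would need to be read downstream of an aggregation, for example by keying the elements and applying beam.GroupByKey() (or a CombinePerKey, as Jay found) between the WindowInto step and beam.ParDo(ProcessRecord()). Per the replies in the thread, limited PaneInfo support on the local runners may still affect what shows up in streaming mode.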