Hi All -

Below is sample Python code that reads data from Pub/Sub and tries to
determine the PaneInfo for different elements.

There are 3 rows of data as shown below

> {"country":"USA","user_id": 1,"ts": 0},
> {"country":"USA","user_id": 2,"ts": 0},
> {"country":"USA","user_id": 3,"ts": 8}


Below is the code that sets the event timestamp of each message to the
current time plus the element's "ts" offset (in seconds):

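> import datetime
> import apache_beam as beam
>
> class AddTimestampDoFn(beam.DoFn):
>   def process(self, element):
>     # Event time = current wall-clock time + the element's "ts" offset.
>     unix_timestamp = datetime.datetime.now().timestamp() + element["ts"]
>     yield beam.window.TimestampedValue(element, unix_timestamp)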
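
To make the timestamp arithmetic concrete, here is a minimal sketch in plain Python (no Beam involved). The helper name event_timestamp and the fixed base time are hypothetical; the base simply stands in for datetime.datetime.now() so that the result is repeatable.

```python
import datetime

def event_timestamp(element, now):
    """Return `now` (Unix seconds) shifted by the element's "ts" offset."""
    return now + element["ts"]

rows = [
    {"country": "USA", "user_id": 1, "ts": 0},
    {"country": "USA", "user_id": 2, "ts": 0},
    {"country": "USA", "user_id": 3, "ts": 8},
]

# Hypothetical fixed base time, used instead of the real clock.
base = datetime.datetime(
    2020, 5, 23, 16, 44, 9, tzinfo=datetime.timezone.utc
).timestamp()

# Offset of each event time from the base, in seconds.
offsets = [event_timestamp(row, base) - base for row in rows]
print(offsets)
```

The first two rows keep the base time and the third lands 8 seconds later, which is why it can fall into a different 10-second window.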


Below is a helper class that retrieves the timestamp. Its only purpose is
to check that the timestamp has been set correctly.

> class AddTimestamp(beam.DoFn):
>   def process(self, element, timestamp=beam.DoFn.TimestampParam):
>     yield (timestamp.to_utc_datetime(), element)


The code below reads the records from Pub/Sub and runs the ParDos mentioned
above:

> data = (
>     p | "read"        >> beam.io.ReadFromPubSub(subscription=subscription)
>       | "JsonConvert" >> beam.Map(json.loads)
> )

> sliding_windows = (
>     data | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>          | 'AddTimestamp'          >> beam.ParDo(AddTimestamp())
> )


Below are my trigger definition and the DoFn that reads the pane info:

> class ProcessRecord(beam.DoFn):
>   def process(self, element, window=beam.DoFn.WindowParam,
>               pane_info=beam.DoFn.PaneInfoParam):
>     # Access pane info, e.g. pane_info.is_first, pane_info.is_last,
>     # pane_info.timing
>     yield (element, datetime.datetime.now(),
>            window.start.to_utc_datetime(), window.end.to_utc_datetime(),
>            pane_info.timing, pane_info.is_first, pane_info.is_last)



> window_fn  = beam.window.FixedWindows(10)
> trigger_fn = beam.transforms.trigger.AfterWatermark(
>     early=beam.transforms.trigger.AfterCount(1))
> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
> new_final = (
>     sliding_windows
>     | "acc_30" >> beam.WindowInto(
>         window_fn,
>         trigger=trigger_fn,
>         accumulation_mode=acc_dis_fn)
>     | "acc_30_par" >> beam.ParDo(ProcessRecord())
> )



When I look at the output, I see the following:

0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA', 'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA', 'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00 2020-05-23 16:44:10 3 True True
2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country': 'USA', 'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23 16:44:10 2020-05-23 16:44:20 3 True True

As can be observed above, two windows have been defined, which is in line
with the data and the FixedWindows strategy:
Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
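
As a sanity check on these bounds, here is a minimal sketch in plain Python of how a 10-second fixed window with zero offset would be derived from an event timestamp. The helper names fixed_window and as_utc are hypothetical, and the event times are copied from the output above, assumed to be UTC.

```python
import datetime

WINDOW_SIZE = 10  # seconds, matching FixedWindows(10)
UTC = datetime.timezone.utc

def fixed_window(ts, size=WINDOW_SIZE):
    """Return (start, end) in whole Unix seconds of the window containing ts."""
    start = int(ts) - int(ts) % size
    return start, start + size

def as_utc(seconds):
    """Convert Unix seconds to a timezone-aware UTC datetime."""
    return datetime.datetime.fromtimestamp(seconds, tz=UTC)

# Event timestamps taken from the three output rows.
event_times = [
    datetime.datetime(2020, 5, 23, 16, 44, 9, 598681, tzinfo=UTC),
    datetime.datetime(2020, 5, 23, 16, 44, 9, 995521, tzinfo=UTC),
    datetime.datetime(2020, 5, 23, 16, 44, 17, 995603, tzinfo=UTC),
]

windows = []
for t in event_times:
    start, end = fixed_window(t.timestamp())
    windows.append((as_utc(start), as_utc(end)))
    print(t.time(), "->", as_utc(start).time(), as_utc(end).time())
```

The first two events map to 16:44:00 - 16:44:10 and the third to 16:44:10 - 16:44:20, which matches the two windows listed above.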

A couple of things I am not able to understand:
1. Why is the PaneInfo timing value returned as "3" (UNKNOWN) instead of
EARLY or ON_TIME?
2. Shouldn't there be two firings for each of Window 1 and Window 2, one
EARLY and one ON_TIME?
3. The last two boolean values are is_first and is_last; both have been set
to True, which doesn't look right.
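
For reading the last three fields of each output row, a small sketch that turns them into a labelled string. The names TIMING_NAMES and describe_pane are hypothetical, and the integer-to-name mapping is an assumption: only 3 = UNKNOWN is confirmed by the output in this post.

```python
# Assumed mapping of timing integers to names; only 3 = UNKNOWN is
# confirmed by the output in this post.
TIMING_NAMES = {0: "EARLY", 1: "ON_TIME", 2: "LATE", 3: "UNKNOWN"}

def describe_pane(timing, is_first, is_last):
    """Format the last three fields of an output row in readable form."""
    name = TIMING_NAMES.get(timing, "UNRECOGNIZED")
    return f"timing={name} is_first={is_first} is_last={is_last}"

# The trailing fields "3 True True" shared by all three rows.
summary = describe_pane(3, True, True)
print(summary)
```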


Can someone suggest what the issue might be?

Thanks,
Jayadeep
