Hi Jayadeep,
Unfortunately, it seems that PaneInfo is not well supported yet on the
local runners: https://issues.apache.org/jira/browse/BEAM-3759

Can you share more about your use case, and what you'd like to do with the
PaneInfo?

On Sat, May 23, 2020 at 10:03 AM Jay <jayadeep.jayara...@gmail.com> wrote:

> Hi All -
>
> Below is some sample Python code that reads data from Pub/Sub and tries
> to determine the PaneInfo for different elements.
>
> There are 3 rows of data, as shown below:
>
>> {"country":"USA","user_id": 1,"ts": 0},
>> {"country":"USA","user_id": 2,"ts": 0},
>> {"country":"USA","user_id": 3,"ts": 8}
>
>
> Below is the sample code which modifies the event timestamp for the
> messages
>
>> class AddTimestampDoFn(beam.DoFn):
>>   def process(self, element):
>>     unix_timestamp = (datetime.datetime.now()).timestamp() + element["ts"]
>>     yield beam.window.TimestampedValue(element, unix_timestamp)
>
>
> Below is a helper class that retrieves the timestamp. Its only purpose is
> to check that the timestamp has been set correctly.
>
>> class AddTimestamp(beam.DoFn):
>>   def process(self, element, timestamp=beam.DoFn.TimestampParam):
>>     yield (timestamp.to_utc_datetime(), element)
>
>
> The code below reads the records from Pub/Sub and runs the ParDos
> mentioned above:
>
>> data = (
>>     p | "read"        >> beam.io.ReadFromPubSub(subscription=subscription)
>>       | "JsonConvert" >> beam.Map(json.loads)
>> )
>
>> sliding_windows = (
>>     data | 'ConvertIntoUserEvents' >> beam.ParDo(AddTimestampDoFn())
>>          | 'AddTimestamp'          >> beam.ParDo(AddTimestamp())
>> )
>
>
> Below is my trigger definition and the implementation
>
>> class ProcessRecord(beam.DoFn):
>>   def process(self, element, window=beam.DoFn.WindowParam,
>>               pane_info=beam.DoFn.PaneInfoParam):
>>     # Access pane info, e.g. pane_info.is_first, pane_info.is_last,
>>     # pane_info.timing
>>     yield (element, datetime.datetime.now(),
>>            window.start.to_utc_datetime(), window.end.to_utc_datetime(),
>>            pane_info.timing, pane_info.is_first, pane_info.is_last)
>
>
>
>> window_fn  = beam.window.FixedWindows(10)
>> trigger_fn = beam.transforms.trigger.AfterWatermark(
>>     early=beam.transforms.trigger.AfterCount(1))
>> acc_dis_fn = beam.transforms.trigger.AccumulationMode.ACCUMULATING
>> new_final = sliding_windows | "acc_30" >> beam.WindowInto(
>>             window_fn,
>>             trigger=trigger_fn,
>>             accumulation_mode=acc_dis_fn
>>         ) | "acc_30_par" >> beam.ParDo(ProcessRecord())
>
>
>
> When I look at the output, I see the following:
>
> 0 (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681), {'country': 'USA',
>   'user_id': 1, 'ts': 0}) 2020-05-23 16:44:45.895890 2020-05-23 16:44:00
>   2020-05-23 16:44:10 3 True True
> 1 (datetime.datetime(2020, 5, 23, 16, 44, 9, 995521), {'country': 'USA',
>   'user_id': 2, 'ts': 0}) 2020-05-23 16:44:46.297163 2020-05-23 16:44:00
>   2020-05-23 16:44:10 3 True True
> 2 (datetime.datetime(2020, 5, 23, 16, 44, 17, 995603), {'country': 'USA',
>   'user_id': 3, 'ts': 8}) 2020-05-23 16:44:46.297259 2020-05-23 16:44:10
>   2020-05-23 16:44:20 3 True True
>
> As can be observed above, two windows have been defined, which is in line
> with the data and the FixedWindows strategy:
> Window 1 - 2020-05-23 16:44:00, 2020-05-23 16:44:10
> Window 2 - 2020-05-23 16:44:10, 2020-05-23 16:44:20
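
(Inline note: the two windows listed above are what fixed 10-second windowing would be expected to produce for these event timestamps. Below is a minimal sketch of that arithmetic in plain Python, without Beam. `fixed_window_bounds` is a hypothetical helper name, and the sketch assumes windows aligned to the Unix epoch with zero offset and naive UTC datetimes.)

```python
import datetime

# Assumption: windows are aligned to the Unix epoch with zero offset.
EPOCH = datetime.datetime(1970, 1, 1)


def fixed_window_bounds(event_time, size_seconds=10):
    """Return (start, end) of the fixed window containing event_time.

    The window is half-open: start <= event_time < end.
    """
    size = datetime.timedelta(seconds=size_seconds)
    # How far event_time sits past the most recent window boundary.
    into_window = (event_time - EPOCH) % size
    start = event_time - into_window
    return start, start + size


# Event timestamps taken from the quoted output (users 1 and 3).
for ts in (datetime.datetime(2020, 5, 23, 16, 44, 9, 598681),
           datetime.datetime(2020, 5, 23, 16, 44, 17, 995603)):
    print(ts, "->", fixed_window_bounds(ts))
```

With the timestamps from the output, users 1 and 2 land in the 16:44:00 to 16:44:10 window and user 3 lands in the 16:44:10 to 16:44:20 window, so the window assignment itself looks consistent.
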
>
> A couple of things I am not able to understand:
> 1. Why is the PaneInfo.Timing value returned as "3" (UNKNOWN) instead of
> EARLY or ON_TIME?
> 2. Shouldn't there be two firings for each of Window 1 and Window 2, one
> EARLY and one ON_TIME?
> 3. The last two boolean values are is_first and is_last. Both have been
> set to True, which doesn't look right.
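
(Inline note: the bare integer in the output is hard to read. Below is a small sketch for printing a name instead. Only the mapping 3 = UNKNOWN is confirmed by the message above. The other three values are an assumption about the constants in Beam's Python `PaneInfoTiming`, and `pane_timing_name` is a hypothetical helper, so please check them against the installed SDK.)

```python
# Assumption: these integers mirror Beam's Python PaneInfoTiming constants.
# Only 3 -> UNKNOWN is confirmed by the thread; verify the rest locally.
PANE_TIMING_NAMES = {
    0: "EARLY",
    1: "ON_TIME",
    2: "LATE",
    3: "UNKNOWN",
}


def pane_timing_name(timing):
    """Return a readable name for a pane timing integer."""
    return PANE_TIMING_NAMES.get(timing, "UNRECOGNIZED(%r)" % (timing,))


# The value seen in every row of the quoted output.
print(pane_timing_name(3))
```

In `ProcessRecord`, this could be used by yielding `pane_timing_name(pane_info.timing)` in place of the raw `pane_info.timing` value.
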
>
>
> Can someone suggest what the issue might be?
>
> Thanks,
> Jayadeep
>
