Hi Sachin,

1. When your Flink job performs an operation like map or flatMap, each
output record is automatically assigned the same timestamp as the input
record that produced it. You don't need to assign the timestamp manually
at each step, so the windowing result in your example should be as you
expect.
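
To make point 1 concrete, here is a small plain-Java sketch (no Flink
dependency) that replays the numbers from your mail. It is only an
illustration of the semantics, not Flink code: TumblingWindowSketch,
Stamped and flatten are hypothetical names, the timestamps are assumed to
be in seconds, and everything is assumed to share one key.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java sketch of event-time tumbling window assignment (not Flink code). */
class TumblingWindowSketch {

    /** A value paired with the event timestamp assigned at the source. */
    static final class Stamped {
        final long timestamp;
        final String value;

        Stamped(long timestamp, String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Start of the tumbling window holding a non-negative timestamp (zero offset). */
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    /** Hypothetical flatMap: rewrites the value, drops data3, keeps the input timestamp. */
    static List<Stamped> flatten(List<Stamped> input) {
        List<Stamped> out = new ArrayList<>();
        for (Stamped in : input) {
            if (in.value.equals("data3")) {
                continue; // a flatMap may emit zero records for an input
            }
            out.add(new Stamped(in.timestamp, "flat" + in.value));
        }
        return out;
    }

    /** Groups values by the start of the window their timestamp falls into. */
    static Map<Long, List<String>> assignWindows(List<Stamped> records, long windowSize) {
        Map<Long, List<String>> windows = new TreeMap<>();
        for (Stamped r : records) {
            windows.computeIfAbsent(windowStart(r.timestamp, windowSize), k -> new ArrayList<>())
                   .add(r.value);
        }
        return windows;
    }

    public static void main(String[] args) {
        List<Stamped> source = List.of(
                new Stamped(10, "data1"), new Stamped(12, "data2"),
                new Stamped(59, "data3"), new Stamped(61, "data4"));
        // Prints {0=[flatdata1, flatdata2], 60=[flatdata4]}
        System.out.println(assignWindows(flatten(source), 60));
    }
}
```

The flattened records land in the windows starting at 0 and 60 purely
because of the timestamps they inherited from their inputs.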

2. The frequency of watermark emission can be configured with
pipeline.auto-watermark-interval in flink-conf.yaml, or with
ExecutionConfig#setAutoWatermarkInterval in the Java API (the default is
200 ms). In your example, the highest event time seen is still T; the
watermark emitted for it is T - B (minus 1 ms, to be precise), so the job
still accepts any record whose timestamp is in the range [T-B, T].
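
The relation between T, B and the watermark in point 2 can be sketched in
plain Java as below. This is a simplified model written from my
understanding of how the bounded-out-of-orderness generator behaves, not a
copy of Flink's class: BoundedOutOfOrdernessSketch and isBehindWatermark
are hypothetical names, and the periodic call stands in for whatever the
auto-watermark interval triggers.

```java
/** Plain-Java sketch of a bounded-out-of-orderness watermark generator (not Flink code). */
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis; // the bound B
    private long maxTimestamp;               // the highest event time T seen so far

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so the first watermark is Long.MIN_VALUE without underflow.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    /** Called for every record: only tracks the highest timestamp seen. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** Called once per auto-watermark interval: the watermark is T - B - 1. */
    long onPeriodicEmit() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** True if a record with this timestamp is at or behind the current watermark. */
    boolean isBehindWatermark(long eventTimestamp) {
        return eventTimestamp <= onPeriodicEmit();
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch generator = new BoundedOutOfOrdernessSketch(60_000L);
        generator.onEvent(100_000L); // T = 100 s
        generator.onEvent(70_000L);  // out of order, does not move T backwards
        // Prints 39999, that is T - B - 1
        System.out.println(generator.onPeriodicEmit());
    }
}
```

Note that the interval only controls how often onPeriodicEmit is called;
the value of the watermark depends solely on the events seen so far.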

Best,
Yunfeng

On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal <sjmit...@gmail.com> wrote:
>
> Hello folks,
> I have a few questions:
>
> Say I have a source like this:
>
> final DataStream<Data> data =
>     env.fromSource(
>         source,
>         WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
>             .withTimestampAssigner((event, timestamp) -> event.timestamp));
>
>
> My pipeline after this is as follows:
>
>     data.flatMap(new MyFlattendData())
>         .keyBy(new MyKeySelector())
>         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>         .reduce(new MyReducer());
>
>
> My first question: is the timestamp I assign at the source carried through
> all the steps below, down to my window?
> Example say I have timestamped data from source as:
> => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
>
>  would this get flattened to say:
> => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
>
> then keyed to say:
> => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ... ]
>
> windows:
> 1st => [ flatdata1, flatdata2 ]
> 2nd => [ flatdata4, ... ]
>
> Would the windows created before the reduce function is applied look like
> I have illustrated, or, to get this behaviour, do I need to output a record
> at each step with the timestamp assigned for that record?
>
> Basically, is the timestamp assigned when reading from the source retained
> through all the steps below when doing an event time window operation?
>
>
> My next question is about my watermark strategy: how do I set the period
> of the watermarking?
> Basically, an out-of-order bound B means that once an event with timestamp
> T was encountered, no events older than T - B will follow any more when the
> watermarking is done.
>
> However, how frequently is watermarking done, and if the last event
> encountered at that point had timestamp T, does this mean the watermark
> timestamp would be T - B?
>
> How can we control the watermarking period?
>
> Thanks
> Sachin