Hi Yunfeng,
I have a question around the tolerance for out of order bound watermarking,

What I understand that when consuming from source with out of order bound
set as B, lets say it gets a record with timestamp T.
After that it will drop all the subsequent records which arrive with the
timestamp less than T - B.

Please let me know if I understood this correctly.

If this is correct, then how does allowed lateness when performing event
time windowing works ?  Say allowed lateness is set as A,
does this mean that value of A should be less than that of B because
records with timestamp less than T - B would have already been dropped at
the source.

If this is not the case than how does lateness work with our of order
boundedness ?


On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com>

> Hi Sachin,
> 1. When your Flink job performs an operation like map or flatmap, the
> output records would be automatically assigned with the same timestamp
> as the input record. You don't need to manually assign the timestamp
> in each step. So the windowing result in your example should be as you
> have expected.
> 2. The frequency of watermarks can be configured by
> pipeline.auto-watermark-interval in flink-conf.yaml, or
> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
> the event time related to the Watermark is still T, just that the job
> will tolerate any records whose timestamp is in range [T-B, T].
> Best,
> Yunfeng
> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hello folks,
> > I have few questions:
> >
> > Say I have a source like this:
> >
> > final DataStream<Data> data =
> >     env.fromSource(
> >         source,
> >
>  WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >             .withTimestampAssigner((event, timestamp) ->
> event.timestamp));
> >
> >
> > My pipeline after this is as followed:
> >
> >     data.flatMap(new MyFlattendData())
> >         .keyBy(new MyKeySelector())
> >         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >         .reduce(new MyReducer());
> >
> >
> > First question I have is that the timestamp I assign from the source,
> would it get carried to all steps below to my window ?
> > Example say I have timestamped data from source as:
> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
> >
> >  would this get flattened to say:
> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
> >
> > then keyed to say:
> > => [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1,
> flatdata4]),    ...    ]
> >
> > windows:
> > 1st => [ flatdata1, flatdata2 ]
> > 2nd => [ flatdata4, ... ]
> >
> > Would the windows created before the reduce function be applied be like
> I have illustrated or to have it this way, do I need to output a record at
> each step with the timestamp assigned for that record ?
> >
> > Basically is the timestamp assigned when reading from the source pushed
> (retained) down to all the steps below when doing event time window
> operation ?
> >
> >
> > Next question is in my watermark strategy: how do I set the period of
> the watermarking.
> > Basically from An out-of-order bound B means that once an event with
> timestamp T was encountered, no events older than T - B will follow any
> more when the watermarking is done.
> >
> > However, how frequently is watermarking done and when say watermarking,
> the last encountered event was with timestamp T , does this mean watermark
> timestamp would be T - B ?
> >
> > How can we control the watermarking period ?
> >
> > Thanks
> > Sachin
> >
> >
> >
> >
> >
> >
> >
> >

Reply via email to