Hi Yunfeng,
Regarding the dropping of records under the out-of-order watermark: let's say
records with timestamps earlier than T - B are dropped by the first operator
after watermarking, i.e. the one reading from the source.
Then these records will never be forwarded to the step where we do
event-time windowing.

Hence records with timestamps less than T - A - B will never reach my
windowing operator to be collected by the side output.

Is this understanding correct?

If this is the case, then shouldn't A be less than B, so that those records
can at least be collected and included in a particular window?

Basically, having an allowed lateness A greater than the out-of-order bound B
won't make sense, as records with timestamps earlier than T - B would already
have been dropped at the source itself.

Please let me know whether I am understanding this correctly or missing
something.

Thanks
Sachin


On Mon, Apr 15, 2024 at 6:56 AM Yunfeng Zhou <flink.zhouyunf...@gmail.com>
wrote:

> Hi Sachin,
>
> First, sorry for my misunderstanding about watermarking in the last
> email. When you configure an out-of-orderness watermark with a
> tolerance of B, the next watermark emitted after a record with
> timestamp T would be T-B, not T as described in my last email.
>
> Then let's go back to your question. When the Flink job receives n
> records with timestamps Tn, it will set the timestamp of the next
> watermark to max(Tn) - B - 1ms, and that watermark will be emitted
> when the next autoWatermarkInterval is reached. So a record with
> timestamp T does not affect records with timestamps less than T - B
> immediately; instead it influences the next watermark, and that
> watermark in turn affects those records.
>
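For reference, the watermark rule described above can be modeled in a few
lines of plain Java. This is only a sketch under assumptions: the class name
BoundedOutOfOrdernessSketch and its method names are made up for
illustration, and it has no Flink dependency, so it should be read as a
model of the rule rather than as Flink's own generator.

```java
// Plain-Java model of a bounded-out-of-orderness watermark generator.
// Hypothetical class for illustration; it does not use any Flink API.
class BoundedOutOfOrdernessSketch {
    private final long boundMillis; // the out-of-order bound B
    private long maxTimestamp;      // largest event timestamp seen so far

    BoundedOutOfOrdernessSketch(long boundMillis) {
        this.boundMillis = boundMillis;
        // Start high enough that the first watermark cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // Called for every record: only the running maximum is updated.
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // Called once per watermark interval: this is when a watermark is produced.
    long onPeriodicEmit() {
        return maxTimestamp - boundMillis - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(60_000L);
        gen.onEvent(100_000L);
        gen.onEvent(90_000L);  // out of order, does not lower the maximum
        gen.onEvent(130_000L);
        System.out.println(gen.onPeriodicEmit()); // prints 69999
    }
}
```

In this model, receiving a record never drops anything by itself; a record
only moves the running maximum, and the effect becomes visible when the next
watermark is produced.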
> As for what happens to those late records, Flink operators drop
> them by default, but you can also gather them for other downstream
> logic. Please refer to
>
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>
> Based on the analysis above, if you configure allowed lateness to A,
> records with timestamps less than T - A - B will be dropped or
> gathered as side outputs.
>
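To make the interaction between B and A concrete, here is a minimal
plain-Java sketch of how a record could be classified against the current
watermark for a tumbling event-time window. The class LatenessSketch, the
Outcome names and the classify method are hypothetical and exist only for
this illustration; the sketch assumes a window fires once the watermark
reaches the window's last timestamp and that its state is kept for A
milliseconds beyond that.

```java
// Plain-Java model of the lateness check for a tumbling event-time window.
// Hypothetical class for illustration; it does not use any Flink API.
class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ALLOWED, DROPPED_OR_SIDE_OUTPUT }

    static Outcome classify(long timestamp, long windowSizeMs,
                            long allowedLatenessMs, long watermark) {
        // Tumbling window [start, start + size) that contains the record.
        long windowStart = timestamp - Math.floorMod(timestamp, windowSizeMs);
        long windowMaxTimestamp = windowStart + windowSizeMs - 1;

        if (watermark < windowMaxTimestamp) {
            return Outcome.ON_TIME;            // window has not fired yet
        }
        if (watermark < windowMaxTimestamp + allowedLatenessMs) {
            return Outcome.LATE_BUT_ALLOWED;   // window fired, state still kept
        }
        return Outcome.DROPPED_OR_SIDE_OUTPUT; // window state already cleaned up
    }

    public static void main(String[] args) {
        long b = 60_000L, a = 30_000L, size = 60_000L;
        long watermark = 200_000L - b - 1;     // max timestamp T = 200000
        System.out.println(classify(130_000L, size, a, watermark)); // ON_TIME
        System.out.println(classify(100_000L, size, a, watermark)); // LATE_BUT_ALLOWED
        System.out.println(classify(50_000L, size, a, watermark));  // DROPPED_OR_SIDE_OUTPUT
    }
}
```

In this model the decision is made per window at the windowing step, by
comparing the window's end plus A against a watermark that already trails
the maximum timestamp by B, which is why the two values add up.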
> Best,
> Yunfeng
>
> On Fri, Apr 12, 2024 at 6:34 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hi Yunfeng,
> > I have a question about the tolerance of out-of-order-bound watermarking.
> >
> > What I understand is that when consuming from a source with the out-of-order bound set to B, let's say it gets a record with timestamp T.
> > After that it will drop all subsequent records that arrive with a timestamp less than T - B.
> >
> > Please let me know if I understood this correctly.
> >
> > If this is correct, then how does allowed lateness work when performing event-time windowing? Say the allowed lateness is set to A:
> > does this mean that A should be less than B, because records with timestamps less than T - B would already have been dropped at the source?
> >
> > If this is not the case, then how does lateness work with the out-of-order bound?
> >
> > Thanks
> > Sachin
> >
> >
> > On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> >>
> >> Hi Sachin,
> >>
> >> 1. When your Flink job performs an operation like map or flatMap, the
> >> output records are automatically assigned the same timestamp as the
> >> input record. You don't need to manually assign the timestamp in each
> >> step. So the windowing result in your example should be as you
> >> expected.
> >>
> >> 2. The frequency of watermarks can be configured by
> >> pipeline.auto-watermark-interval in flink-conf.yaml, or
> >> ExecutionConfig#setAutoWatermarkInterval in Java API. In your example,
> >> the event time related to the Watermark is still T, just that the job
> >> will tolerate any records whose timestamp is in range [T-B, T].
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >> >
> >> > Hello folks,
> >> > I have a few questions:
> >> >
> >> > Say I have a source like this:
> >> >
> >> > final DataStream<Data> data =
> >> >     env.fromSource(
> >> >         source,
> >> >         WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >> >             .withTimestampAssigner((event, timestamp) -> event.timestamp));
> >> >
> >> >
> >> > My pipeline after this is as follows:
> >> >
> >> >     data.flatMap(new MyFlattendData())
> >> >         .keyBy(new MyKeySelector())
> >> >         .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >> >         .reduce(new MyReducer());
> >> >
> >> >
> >> > My first question: is the timestamp I assign at the source carried through all the steps below, down to my window?
> >> > For example, say I have timestamped data from the source as:
> >> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ...  ]
> >> >
> >> >  would this get flattened to say:
> >> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ...]
> >> >
> >> > then keyed to say:
> >> > => [ (10, [key1, flatdata1]),   (12, [key1, flatdata2]),   (61, [key1, flatdata4]),    ...    ]
> >> >
> >> > windows:
> >> > 1st => [ flatdata1, flatdata2 ]
> >> > 2nd => [ flatdata4, ... ]
> >> >
> >> > Would the windows created before the reduce function is applied look like what I have illustrated, or, to get this behavior, do I need to output a record at each step with the timestamp assigned for that record?
> >> >
> >> > Basically, is the timestamp assigned when reading from the source retained through all the steps below when doing an event-time window operation?
> >> >
> >> >
> >> > My next question is about my watermark strategy: how do I set the watermarking period?
> >> > As I understand it, an out-of-order bound B means that once an event with timestamp T has been encountered, no events older than T - B will follow any more when the watermarking is done.
> >> >
> >> > However, how frequently is watermarking done? And if, at the time of watermarking, the last encountered event had timestamp T, does this mean the watermark timestamp would be T - B?
> >> >
> >> > How can we control the watermarking period?
> >> >
> >> > Thanks
> >> > Sachin
