Hi Yunfeng,

Regarding the dropping of records under an out-of-order watermark: let's say records later than T - B are dropped by the first operator after watermarking, which is the one reading from the source. These records would then never be forwarded to the step where we do event-time windowing, and hence would never arrive at that step.
Hence records with timestamp T - A - B would never reach my windowing operator to be collected by the side outputs. Is this understanding correct? If so, shouldn't A be less than B, to at least collect those records for inclusion in a particular window? Basically, having an allowed lateness A greater than the out-of-order bound B would not make sense, as records later than T - B would already have been dropped at the source.

Please let me know if I am understanding this correctly, or am I missing something?

Thanks
Sachin

On Mon, Apr 15, 2024 at 6:56 AM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> Hi Sachin,
>
> Firstly, sorry for my misunderstanding about watermarking in the last
> email. When you configure an out-of-orderness watermark with a
> tolerance of B, the next watermark emitted after a record with
> timestamp T would be T - B, not T as described in my last email.
>
> Then let's go back to your question. When the Flink job has received n
> records with timestamps T1..Tn, it sets the timestamp of the next
> watermark to max(T1..Tn) - B - 1ms, and that watermark is emitted once
> the next autoWatermarkInterval is reached. So a record with timestamp T
> does not immediately affect records with timestamps less than T - B;
> instead it influences the next watermark, and that watermark in turn
> affects those records.
>
> As for late records, Flink operators drop them by default, but you can
> also gather them for other downstream logic. Please refer to
> https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/datastream/operators/windows/#getting-late-data-as-a-side-output
>
> Based on the analysis above, if you configure allowed lateness A,
> records with timestamps less than T - A - B will be dropped or
> gathered as side outputs.
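[Editorial illustration of the arithmetic in the reply above, in plain Java. This is not Flink code; the class and method names are made up, and the window check is a simplification of Flink's actual one (Flink compares the window's max timestamp, i.e. end - 1ms, plus the allowed lateness against the current watermark), but the idea is the same:]

```java
// Plain-Java sketch (illustrative names, no Flink dependency) of the
// watermark and lateness arithmetic discussed above.
public class WatermarkMath {

    // Bounded out-of-orderness: the watermark trails the largest
    // timestamp seen so far by the bound B, minus 1ms.
    static long watermark(long[] seenTimestamps, long boundB) {
        long max = Long.MIN_VALUE;
        for (long t : seenTimestamps) max = Math.max(max, t);
        return max - boundB - 1;
    }

    // A record is dropped (or routed to the side output) only when the
    // end of its window plus the allowed lateness A is at or behind the
    // current watermark -- which is where the T - A - B threshold
    // mentioned above comes from.
    static boolean isDroppedOrSideOutput(long windowEndTs, long allowedLatenessA,
                                         long currentWatermark) {
        return windowEndTs + allowedLatenessA <= currentWatermark;
    }
}
```

With B = 60s and A = 10s, a record whose window ended more than A before the current watermark is exactly the "less than T - A - B" case described above.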
>
> Best,
> Yunfeng
>
> On Fri, Apr 12, 2024 at 6:34 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hi Yunfeng,
> > I have a question about the tolerance for out-of-order bounded watermarking.
> >
> > What I understand is that when consuming from a source with the out-of-order
> > bound set to B, say it gets a record with timestamp T. After that it will
> > drop all subsequent records that arrive with a timestamp less than T - B.
> >
> > Please let me know if I understood this correctly.
> >
> > If this is correct, then how does allowed lateness work when performing
> > event-time windowing? Say the allowed lateness is set to A; does this mean
> > that the value of A should be less than that of B, because records with
> > timestamps less than T - B would already have been dropped at the source?
> >
> > If this is not the case, then how does lateness work with out-of-order
> > boundedness?
> >
> > Thanks
> > Sachin
> >
> >
> > On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> >>
> >> Hi Sachin,
> >>
> >> 1. When your Flink job performs an operation like map or flatMap, the
> >> output records are automatically assigned the same timestamp as the
> >> input record. You don't need to manually assign the timestamp in each
> >> step, so the windowing result in your example should be as you expected.
> >>
> >> 2. The frequency of watermarks can be configured via
> >> pipeline.auto-watermark-interval in flink-conf.yaml, or
> >> ExecutionConfig#setAutoWatermarkInterval in the Java API. In your
> >> example, the event time associated with the watermark is still T, just
> >> that the job will tolerate any records whose timestamps are in the
> >> range [T - B, T].
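[Editorial illustration of the point debated above: a record with a timestamp behind maxSeen - B is not dropped at the source. It still reaches the windowing step, and is only discarded, or routed to a side output, once its window's end plus the allowed lateness A falls behind the watermark. A plain-Java simulation with made-up names, not Flink code; note that in Flink the watermark is emitted periodically, not recomputed per record as done here for simplicity:]

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (illustrative, no Flink dependency): every record
// reaches the windowing step; only the watermark-vs-window comparison
// decides whether it is accepted or treated as too late.
public class LatenessSim {
    final long boundB, latenessA, windowSize;
    long maxSeen = Long.MIN_VALUE;
    final List<Long> windowed = new ArrayList<>();    // accepted into a window
    final List<Long> sideOutput = new ArrayList<>();  // too late even with A

    LatenessSim(long boundB, long latenessA, long windowSize) {
        this.boundB = boundB;
        this.latenessA = latenessA;
        this.windowSize = windowSize;
    }

    void onRecord(long ts) {
        maxSeen = Math.max(maxSeen, ts);
        long watermark = maxSeen - boundB - 1;                // bounded out-of-orderness
        long windowEnd = (ts / windowSize + 1) * windowSize;  // tumbling window end
        if (windowEnd + latenessA <= watermark) {
            sideOutput.add(ts);  // window already expired, even with lateness A
        } else {
            windowed.add(ts);    // behind the watermark or not, it is still accepted
        }
    }
}
```

For example, with B = 60s, A = 10s and 60s windows, after a record at t = 200s, a record at t = 130s is behind maxSeen - B (140s) yet is still accepted, because its window (ending at 180s) has not expired; a record at t = 50s lands in the side output.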
> >>
> >> Best,
> >> Yunfeng
> >>
> >> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >> >
> >> > Hello folks,
> >> > I have a few questions.
> >> >
> >> > Say I have a source like this:
> >> >
> >> > final DataStream<Data> data =
> >> >     env.fromSource(
> >> >         source,
> >> >         WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >> >             .withTimestampAssigner((event, timestamp) -> event.timestamp));
> >> >
> >> > My pipeline after this is as follows:
> >> >
> >> > data.flatMap(new MyFlattendData())
> >> >     .keyBy(new MyKeySelector())
> >> >     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >> >     .reduce(new MyReducer());
> >> >
> >> > The first question I have is: would the timestamp I assign at the source
> >> > get carried through all the steps below, down to my window?
> >> > For example, say I have timestamped data from the source as:
> >> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ... ]
> >> >
> >> > would this get flattened to, say:
> >> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ... ]
> >> >
> >> > then keyed to, say:
> >> > => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ... ]
> >> >
> >> > and windowed as:
> >> > 1st => [ flatdata1, flatdata2 ]
> >> > 2nd => [ flatdata4, ... ]
> >> >
> >> > Would the windows created before the reduce function is applied be like I
> >> > have illustrated, or, to have it this way, do I need to output a record at
> >> > each step with the timestamp assigned for that record?
> >> >
> >> > Basically, is the timestamp assigned when reading from the source pushed
> >> > (retained) down to all the steps below when doing an event-time window
> >> > operation?
> >> >
> >> > My next question is about my watermark strategy: how do I set the period
> >> > of the watermarking?
> >> > Basically, an out-of-order bound B means that once an event with timestamp
> >> > T has been encountered, no events older than T - B will follow any more
> >> > once the watermarking is done.
> >> >
> >> > However, how frequently is watermarking done? And when watermarking
> >> > happens, if the last encountered event had timestamp T, does this mean
> >> > the watermark timestamp would be T - B?
> >> >
> >> > How can we control the watermarking period?
> >> >
> >> > Thanks
> >> > Sachin
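[Editorial illustration for the last question above: with a periodic watermark strategy, events only update the maximum timestamp seen, and the watermark (maxSeen - B - 1ms) is emitted once per auto-watermark interval (200ms by default in Flink, configurable via pipeline.auto-watermark-interval or ExecutionConfig#setAutoWatermarkInterval). A plain-Java sketch with illustrative names, not Flink code:]

```java
// Plain-Java sketch (illustrative, no Flink dependency) of periodic
// watermark generation as discussed in this thread.
public class PeriodicWatermarker {
    private final long boundB;
    private long maxSeen = Long.MIN_VALUE;

    PeriodicWatermarker(long boundB) { this.boundB = boundB; }

    // Called for every event: only tracks the max timestamp, emits nothing.
    void onEvent(long ts) { maxSeen = Math.max(maxSeen, ts); }

    // Called by the runtime once per auto-watermark interval: the emitted
    // watermark trails the max timestamp seen by B, minus 1ms.
    long onPeriodicEmit() { return maxSeen - boundB - 1; }
}
```

So with B = 60s, after events at 10s, 61s and 40s (in that order), the next periodic emission carries watermark 61s - 60s - 1ms, regardless of how many events arrived in between.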