Hi Yunfeng,

I have a question about the tolerance for out-of-order-bound watermarking.

What I understand is that when consuming from a source with the out-of-order bound set to B, let's say it gets a record with timestamp T. After that, it will drop all subsequent records that arrive with a timestamp less than T - B. Please let me know if I understood this correctly.

If this is correct, then how does allowed lateness work when performing event-time windowing? Say allowed lateness is set to A; does this mean that the value of A should be less than that of B, because records with a timestamp less than T - B would already have been dropped at the source? If this is not the case, then how does lateness work with out-of-order boundedness? (I have put a small sketch of what I am trying at the bottom of this mail.)

Thanks
Sachin

On Fri, Apr 12, 2024 at 12:30 PM Yunfeng Zhou <flink.zhouyunf...@gmail.com> wrote:
> Hi Sachin,
>
> 1. When your Flink job performs an operation like map or flatmap, the
> output records would be automatically assigned the same timestamp
> as the input record. You don't need to manually assign the timestamp
> in each step. So the windowing result in your example should be as you
> have expected.
>
> 2. The frequency of watermarks can be configured by
> pipeline.auto-watermark-interval in flink-conf.yaml, or
> ExecutionConfig#setAutoWatermarkInterval in the Java API. In your example,
> the event time related to the Watermark is still T, just that the job
> will tolerate any records whose timestamp is in the range [T-B, T].
>
> Best,
> Yunfeng
>
> On Thu, Apr 11, 2024 at 9:15 PM Sachin Mittal <sjmit...@gmail.com> wrote:
> >
> > Hello folks,
> > I have a few questions:
> >
> > Say I have a source like this:
> >
> > final DataStream<Data> data =
> >     env.fromSource(
> >         source,
> >         WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))
> >             .withTimestampAssigner((event, timestamp) -> event.timestamp));
> >
> > My pipeline after this is as follows:
> >
> > data.flatMap(new MyFlattendData())
> >     .keyBy(new MyKeySelector())
> >     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
> >     .reduce(new MyReducer());
> >
> > First question I have is: would the timestamp I assign at the source
> > get carried through all the steps below, down to my window?
> > For example, say I have timestamped data from the source as:
> > => [ (10, data1), (12, data2), (59, data3), (61, data4), ... ]
> >
> > would this get flattened to, say:
> > => [ (10, flatdata1), (12, flatdata2), (61, flatdata4), ... ]
> >
> > then keyed to, say:
> > => [ (10, [key1, flatdata1]), (12, [key1, flatdata2]), (61, [key1, flatdata4]), ... ]
> >
> > windows:
> > 1st => [ flatdata1, flatdata2 ]
> > 2nd => [ flatdata4, ... ]
> >
> > Would the windows created before the reduce function is applied be like
> > I have illustrated, or, to have it this way, do I need to output a record at
> > each step with the timestamp assigned for that record?
> >
> > Basically, is the timestamp assigned when reading from the source pushed
> > (retained) down to all the steps below when doing an event-time window
> > operation?
> >
> > Next question is about my watermark strategy: how do I set the period of
> > the watermarking?
> > Basically, an out-of-order bound B means that once an event with
> > timestamp T was encountered, no events older than T - B will follow any
> > more once the watermarking is done.
> >
> > However, how frequently is watermarking done, and when watermarking, if
> > the last encountered event had timestamp T, does this mean the watermark
> > timestamp would be T - B?
> >
> > How can we control the watermarking period?
> >
> > Thanks
> > Sachin
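To make the question concrete, below is a rough sketch of what I am thinking of trying, with an allowed lateness A = 30s on top of the out-of-order bound B = 60s from my original pipeline. I am assuming here that MyFlattendData keeps the Data type; the "my-source" name, the 200 ms watermark interval, and the 30s lateness are just placeholder values:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// How often periodic watermarks are emitted (same setting as pipeline.auto-watermark-interval).
env.getConfig().setAutoWatermarkInterval(200L);

// Tag for records that arrive after the window has already been closed (later than window end + A).
final OutputTag<Data> lateTag = new OutputTag<Data>("late-data") {};

// source is the same Source<Data, ?, ?> as in my original mail.
final DataStream<Data> data =
    env.fromSource(
        source,
        WatermarkStrategy.<Data>forBoundedOutOfOrderness(Duration.ofSeconds(60))   // B = 60s
            .withTimestampAssigner((event, timestamp) -> event.timestamp),
        "my-source");

final SingleOutputStreamOperator<Data> result =
    data.flatMap(new MyFlattendData())                       // assumed to keep the Data type
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .allowedLateness(Time.seconds(30))                   // A = 30s
        .sideOutputLateData(lateTag)                         // capture records too late even for A
        .reduce(new MyReducer());

// Records that were too late for the window end up here instead of silently disappearing.
final DataStream<Data> tooLate = result.getSideOutput(lateTag);

In particular, I would like to understand whether a record with a timestamp less than T - B can ever reach the lateTag side output, or whether it is already gone before the window sees it.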