Hey Pedro, On Tue, 30 Sept 2025 at 11:57, Pedro Mázala <[email protected]> wrote:
> Hey there Gunnar! Are you using TwoInputNonBroadcastStreamProcessFunction > <https://github.com/apache/flink/blob/538c24c8bf490eb8870c4d39c8ac5b204d3613ee/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java> > ? > Yes, I am, via TwoInputNonBroadcastJoinProcessFunction which extends TwoInputNonBroadcastStreamProcessFunction. Is the watermark definition on the method declareWatermarks? > Yepp, it is. > If so, I think you may want to add defaultHandlingStrategyForward() > <https://github.com/apache/flink/blob/538c24c8bf490eb8870c4d39c8ac5b204d3613ee/docs/content/docs/dev/datastream-v2/watermark.md?plain=1#L97-L105> > to your builder. This will make sure the watermark is forwarded. If you > want to control it by yourself, I think you have to override > the onWatermark method. > I've done that, but in that case, whenever there's a watermark on either stream, it will be forwarded right away. Whereas I was expecting that the watermark will be forwarded downstream only when the minimum watermark of the two streams increases, as per this configuration of the definition: .combineFunctionMin() .combineWaitForAllChannels(true) I suppose I can implement the behavior myself by keeping track of the minimum watermark in state of my process function and manually emitting the WM when the minimum increases, but then I feel I'm reimplementing what that combiner logic is supposed to do. > This is not knowledge from living it. I was just playing with something > similar. Hope it helps :D > Thanks, appreciate it! > > Att, > Pedro Mázala > --Gunnar > Be awesome > > > On Tue, 30 Sept 2025 at 11:31, Gunnar Morling < > [email protected]> wrote: > >> Hey all, >> >> Is there an example which shows the usage of watermarks with joins in the >> DS V2 API? Specifically, I'd like to propagate the minimum watermark of the >> two joined streams. Here's my watermark declaration: >> >> public static final LongWatermarkDeclaration WATERMARK_DECLARATION = >> WatermarkDeclarations >> .newBuilder("MY_CUSTOM_WATERMARK_IDENTIFIER") >> .typeLong() >> .combineFunctionMin() >> .combineWaitForAllChannels(true) >> .build(); >> >> I'm observing though that all watermarks from either side are propagated >> downstream, also if there's a lower watermark on the other side. Does this >> ring a bell for anyone? >> >> Thanks a lot, >> >> --Gunnar >> >>
