Hey Pedro,

On Tue, 30 Sept 2025 at 11:57, Pedro Mázala <[email protected]> wrote:

> Hey there Gunnar! Are you using TwoInputNonBroadcastStreamProcessFunction
> <https://github.com/apache/flink/blob/538c24c8bf490eb8870c4d39c8ac5b204d3613ee/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/function/TwoInputNonBroadcastStreamProcessFunction.java>
> ?
>

Yes, I am, via TwoInputNonBroadcastJoinProcessFunction which
extends TwoInputNonBroadcastStreamProcessFunction.

Is the watermark definition on the method declareWatermarks?
>

Yepp, it is.


> If so, I think you may want to add defaultHandlingStrategyForward()
> <https://github.com/apache/flink/blob/538c24c8bf490eb8870c4d39c8ac5b204d3613ee/docs/content/docs/dev/datastream-v2/watermark.md?plain=1#L97-L105>
> to your builder. This will make sure the watermark is forwarded. If you
> want to control it by yourself, I think you have to override
> the onWatermark method.
>

I've done that, but in that case, whenever there's a watermark on either
stream, it will be forwarded right away. Whereas I was expecting that the
watermark will be forwarded downstream only when the minimum watermark of
the two streams increases, as per this configuration of the definition:

    .combineFunctionMin()
    .combineWaitForAllChannels(true)

I suppose I can implement the behavior myself by keeping track of the
minimum watermark in state of my process function and manually emitting the
WM when the minimum increases, but then I feel I'm reimplementing what that
combiner logic is supposed to do.


> This is not knowledge from living it. I was just playing with something
> similar. Hope it helps :D
>

Thanks, appreciate it!

>
> Att,
> Pedro Mázala
>

--Gunnar


> Be awesome
>
>
> On Tue, 30 Sept 2025 at 11:31, Gunnar Morling <
> [email protected]> wrote:
>
>> Hey all,
>>
>> Is there an example which shows the usage of watermarks with joins in the
>> DS V2 API? Specifically, I'd like to propagate the minimum watermark of the
>> two joined streams. Here's my watermark declaration:
>>
>>     public static final LongWatermarkDeclaration WATERMARK_DECLARATION =
>> WatermarkDeclarations
>>         .newBuilder("MY_CUSTOM_WATERMARK_IDENTIFIER")
>>         .typeLong()
>>         .combineFunctionMin()
>>         .combineWaitForAllChannels(true)
>>         .build();
>>
>> I'm observing though that all watermarks from either side are propagated
>> downstream, also if there's a lower watermark on the other side. Does this
>> ring a bell for anyone?
>>
>> Thanks a lot,
>>
>> --Gunnar
>>
>>

Reply via email to