Hello! *Problem:* I am connecting to a Kafka Source with the Watermark Strategy below.
val watermarkStrategy = WatermarkStrategy .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS)) .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] { override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long = element.envelopeTimestamp }) The Watermarks are correctly getting assigned. However, when a reduce function is used the window never terminates because the `ctx.getCurrentWatermark()` returns the default value of `-9223372036854775808` in perpetuity. This is the stream code: stream .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _) The reduce uses this overloaded operator: @JsonIgnoreProperties(ignoreUnknown = true) case class StarscreamEventCounter_V1( envelopeTimestamp: Long, numberOfEvents: Int = 1 ) { def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = { StarscreamEventCounter_V1(this.envelopeTimestamp.min(that.envelopeTimestamp), that.numberOfEvents + this.numberOfEvents) } *Attempt to Solve:* 1. Validate that the Watermark is set on the source a. Set a custom trigger to emit a watermark on each event just in case 2. Test with aggregate / process functions a. Both other functions work properly - window closes and emits to a PrintSink 3. Change Watermark Generator to a custom generator a. Also change time horizons and let run for 1 day - window never closes due to the watermark being stuck at the min default. The sink never receives the data but the UI says there are records being output! *Hypothesis:* The output id of a reduce function is causing an upstream issue where the watermark is no longer assigned to the Window. I haven't been able to lock down what exactly is causing the issue though. My thought is that it might be a bug given it works for Aggregate/Process. It could be a bug in the IndexedCombinedWatermarkStatus, the partial watermarks should not be the min default value when I set the watermark per event - this is what I will be looking into until I hear back. I validated that the watermark is set correctly in CombinedWatermarkStatus. *Tools:* - Flink 1.14.3 - Scala - DataStream API Any assistance would be great! Happy to provide more context or clarification if something isn't clear! Thanks! Ryan van Huuksloot Data Developer | Data Platform Engineering | Streaming Capabilities [image: Shopify] <https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>