> Hello!
> *Problem:*
> I am connecting to a Kafka Source with the Watermark Strategy below.
> val watermarkStrategy = WatermarkStrategy
>   .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>       element.envelopeTimestamp
>   })
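>
> For reference, a minimal pure-Scala sketch of how a bounded-out-of-orderness
> watermark behaves. `BoundedOutOfOrdernessModel` is a hypothetical stand-in,
> not Flink's class; it assumes the watermark trails the highest timestamp seen
> by the bound plus 1 ms and starts at the `Long` minimum before any event.
>
> ```scala
> import java.time.Duration
>
> // Hypothetical model of a bounded-out-of-orderness watermark (not Flink's class).
> final class BoundedOutOfOrdernessModel(maxOutOfOrderness: Duration) {
>   private val delayMs: Long = maxOutOfOrderness.toMillis
>
>   // Chosen so the watermark equals Long.MinValue before any event, without overflow.
>   private var maxTimestamp: Long = Long.MinValue + delayMs + 1
>
>   // Track the highest event timestamp seen so far.
>   def onEvent(eventTimestamp: Long): Unit =
>     maxTimestamp = math.max(maxTimestamp, eventTimestamp)
>
>   // The watermark trails the highest timestamp by the bound plus 1 ms.
>   def currentWatermark: Long = maxTimestamp - delayMs - 1
> }
>
> object BoundedOutOfOrdernessDemo {
>   def main(args: Array[String]): Unit = {
>     val model = new BoundedOutOfOrdernessModel(Duration.ofHours(2))
>     println(model.currentWatermark) // -9223372036854775808
>     model.onEvent(10000000L)
>     println(model.currentWatermark) // 2799999
>   }
> }
> ```
>
> Under these assumptions, a 2 hour bound means a window can only close once an
> event roughly two hours newer than the window's end has been seen.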
> The Watermarks are correctly getting assigned.
> However, when a reduce function is used, the window never closes
> because `ctx.getCurrentWatermark()` returns the default value of
> `-9223372036854775808` (`Long.MIN_VALUE`) in perpetuity.
> This is the stream code:
> stream
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _)
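>
> A minimal sketch of the firing condition for a 1 second tumbling event-time
> window, assuming a window offset of 0 and non-negative timestamps.
> `TumblingWindowModel` and its functions are hypothetical names used only for
> illustration; they model the arithmetic, not Flink's implementation.
>
> ```scala
> object TumblingWindowModel {
>   // Start of the tumbling window that contains `timestamp` (offset 0 assumed).
>   def windowStart(timestamp: Long, sizeMs: Long): Long =
>     timestamp - (timestamp % sizeMs)
>
>   // Last timestamp that still belongs to the window [start, start + sizeMs).
>   def windowMaxTimestamp(start: Long, sizeMs: Long): Long =
>     start + sizeMs - 1
>
>   // An event-time window can fire once the watermark reaches its max timestamp.
>   def canFire(watermark: Long, start: Long, sizeMs: Long): Boolean =
>     watermark >= windowMaxTimestamp(start, sizeMs)
> }
> ```
>
> With the watermark stuck at `Long.MinValue`, `canFire` is false for every
> window, which matches the symptom of a window that never emits.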
> The reduce uses this overloaded operator:
> @JsonIgnoreProperties(ignoreUnknown = true)
> case class StarscreamEventCounter_V1(
>   envelopeTimestamp: Long,
>   numberOfEvents: Int = 1
> ) {
>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = {
>     StarscreamEventCounter_V1(this.envelopeTimestamp.min(that.envelopeTimestamp),
>       that.numberOfEvents + this.numberOfEvents)
>   }
> }
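>
> To check the operator in isolation from Flink, here is a small pure-Scala
> sketch. `EventCounter` is a hypothetical stand-in for
> `StarscreamEventCounter_V1` (without the Jackson annotation), and
> `reduceWindow` is an illustrative helper that folds one window's elements the
> way `reduce(_ + _)` would.
>
> ```scala
> // Hypothetical stand-in for StarscreamEventCounter_V1, without the Jackson annotation.
> final case class EventCounter(envelopeTimestamp: Long, numberOfEvents: Int = 1) {
>   // Keep the earliest timestamp and add up the event counts.
>   def +(that: EventCounter): EventCounter =
>     EventCounter(
>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>       this.numberOfEvents + that.numberOfEvents
>     )
> }
>
> object ReduceDemo {
>   // Combines the elements of one window pairwise; None for an empty window.
>   def reduceWindow(events: Seq[EventCounter]): Option[EventCounter] =
>     events.reduceOption(_ + _)
> }
> ```
>
> The reduced element carries the minimum timestamp of its inputs, so the
> operator itself never produces a timestamp outside the window it belongs to.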
> *Attempt to Solve:*
> 1. Validate that the Watermark is set on the source
>     a. Set a custom trigger to emit a watermark on each event just in case
> 2. Test with aggregate / process functions
>     a. Both other functions work properly - window closes and emits to a
> PrintSink
> 3. Change Watermark Generator to a custom generator
>     a. Also change the time horizons and let it run for 1 day - the window
> never closes because the watermark is stuck at the default minimum. The sink
> never receives the data, but the UI says records are being output!
> *Hypothesis:*
> The output id of a reduce function is causing an upstream issue where the
> watermark is no longer assigned to the Window. I haven't been able to lock
> down what exactly is causing the issue though. My thought is that it might
> be a bug given it works for Aggregate/Process.
> It could be a bug in IndexedCombinedWatermarkStatus: the partial
> watermarks should not be the default minimum value when I set the watermark
> per event. This is what I will be looking into until I hear back. I validated
> that the watermark is set correctly in CombinedWatermarkStatus.
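>
> A minimal sketch of how partial watermarks are usually combined, assuming the
> combined value is the minimum over all inputs that are not marked idle.
> `CombinedWatermarkModel`, `Partial`, and `combine` are hypothetical names;
> this models the general rule and is not taken from the
> IndexedCombinedWatermarkStatus source.
>
> ```scala
> object CombinedWatermarkModel {
>   // One partial watermark per input; idle inputs are skipped when combining.
>   final case class Partial(watermark: Long, idle: Boolean = false)
>
>   // Combined watermark = minimum over the non-idle partial watermarks.
>   // Returns None when every input is idle, since there is nothing to combine.
>   def combine(partials: Seq[Partial]): Option[Long] = {
>     val active = partials.filterNot(_.idle).map(_.watermark)
>     if (active.isEmpty) None else Some(active.min)
>   }
> }
> ```
>
> Under this assumption, a single non-idle input that never advances past
> `Long.MinValue` holds the combined watermark at `Long.MinValue`, no matter how
> far the other inputs have progressed.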
> *Tools:*
> - Flink 1.14.3
> - Scala
> - DataStream API
> Any assistance would be great! Happy to provide more context or
> clarification if something isn't clear!
> Thanks!
