
I am connecting to a Kafka Source with the Watermark Strategy below.

val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
SerializableTimestampAssigner[StarscreamEventCounter_V1] {
    override def extractTimestamp(element: StarscreamEventCounter_V1,
recordTimestamp: Long): Long =

The watermarks are being assigned correctly.
However, when a reduce function is used, the window never closes
because `ctx.getCurrentWatermark()` returns the default value of
`-9223372036854775808` (`Long.MinValue`) in perpetuity.
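
For reference, here is a minimal plain-Scala sketch of the arithmetic a bounded-out-of-orderness generator applies, as far as I understand it: the watermark trails the largest timestamp seen so far by the configured delay plus one millisecond, and it stays at `Long.MinValue` until an event reaches the generator. `BoundedWatermarkSketch` is a hypothetical stand-in written for illustration, not a Flink class.

```scala
import java.time.Duration

// Hypothetical stand-in for a bounded-out-of-orderness watermark generator.
// It only models the arithmetic; it is not Flink's implementation.
final class BoundedWatermarkSketch(maxOutOfOrderness: Duration) {
  private val delayMs: Long = maxOutOfOrderness.toMillis

  // Chosen so that the watermark is exactly Long.MinValue before any event
  // arrives, without underflowing in the subtraction below.
  private var maxTimestamp: Long = Long.MinValue + delayMs + 1

  // Track the largest event timestamp seen so far.
  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  // The watermark trails the largest timestamp by the delay plus 1 ms.
  def currentWatermark: Long = maxTimestamp - delayMs - 1
}

object BoundedWatermarkDemo {
  def main(args: Array[String]): Unit = {
    val gen = new BoundedWatermarkSketch(Duration.ofHours(2))
    println(gen.currentWatermark) // Long.MinValue until an event arrives
    gen.onEvent(10800000L)        // an event stamped at 3 hours (in ms)
    println(gen.currentWatermark) // 10800000 - 7200000 - 1 = 3599999
  }
}
```

Under this model, a watermark that stays at `Long.MinValue` means the operator reading it has not received any watermark derived from an event timestamp.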

This is the stream code:

  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _)
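
To make the firing condition concrete, below is a small plain-Scala sketch of how I understand a tumbling event-time window to close: the window fires once the watermark reaches the window's last millisecond. The object and method names are hypothetical, and the sketch assumes a window offset of 0, non-negative timestamps, and a watermark that trails the largest event timestamp by the out-of-orderness delay plus 1 ms.

```scala
// Hypothetical helper functions; not part of the Flink API.
object TumblingWindowSketch {
  // Start of the tumbling window that contains `timestamp`
  // (assumes offset 0 and a non-negative timestamp).
  def windowStart(timestamp: Long, sizeMs: Long): Long =
    timestamp - (timestamp % sizeMs)

  // Last millisecond that still belongs to the window.
  def windowMaxTimestamp(timestamp: Long, sizeMs: Long): Long =
    windowStart(timestamp, sizeMs) + sizeMs - 1

  // The window can fire once the watermark has reached its last millisecond.
  def canFire(watermark: Long, windowMaxTs: Long): Boolean =
    watermark >= windowMaxTs

  // Smallest event timestamp whose watermark (ts - delayMs - 1) closes the window.
  def minTimestampToFire(windowMaxTs: Long, delayMs: Long): Long =
    windowMaxTs + delayMs + 1
}

object TumblingWindowDemo {
  def main(args: Array[String]): Unit = {
    import TumblingWindowSketch._
    val maxTs = windowMaxTimestamp(1500L, 1000L) // window [1000, 2000)
    println(maxTs)                               // 1999
    println(canFire(Long.MinValue, maxTs))       // false: watermark never advanced
    println(minTimestampToFire(999L, 7200000L))  // 7201000
  }
}
```

With a 2-hour delay, the 1-second window ending at 1000 ms would only close once an event stamped at 7201000 ms or later has been seen, and a watermark stuck at `Long.MinValue` never satisfies the condition at all.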

The reduce uses this overloaded operator:

@JsonIgnoreProperties(ignoreUnknown = true)
case class StarscreamEventCounter_V1(
  envelopeTimestamp: Long,
  numberOfEvents: Int = 1
) {
  def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = {
that.numberOfEvents + this.numberOfEvents)

*Attempts to Solve:*
1. Validated that the watermark is set on the source
    a. Set a custom trigger to emit a watermark on each event just in case
2. Tested with aggregate / process functions
    a. Both other functions work properly: the window closes and emits
its output
3. Changed the watermark generator to a custom generator
    a. Also changed the time horizons and let the job run for 1 day: the
window never closes because the watermark stays stuck at the minimum
default. The sink never receives the data, yet the UI says records are
being output!

The output id of the reduce function seems to be causing an upstream issue
where the watermark is no longer assigned to the window. I haven't been
able to pin down what exactly is causing the issue, though. My thought is
that it might be a bug, given that it works for aggregate/process.
It could be a bug in IndexedCombinedWatermarkStatus: the partial
watermarks should not be the minimum default value when I set the
watermark per event. This is what I will be looking into until I hear
back. I validated that the watermark is set correctly in
CombinedWatermarkStatus.

- Flink 1.14.3
- Scala
- DataStream API

Any assistance would be great! Happy to provide more context or
clarification if something isn't clear!


