Hello!

*Problem:*
I am connecting to a Kafka Source with the Watermark Strategy below.

val watermarkStrategy = WatermarkStrategy
  .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
  .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
    override def extractTimestamp(
        element: StarscreamEventCounter_V1,
        recordTimestamp: Long): Long =
      element.envelopeTimestamp
  })

The watermarks are assigned correctly on the source.
However, when a reduce function is used, the window never closes
because `ctx.getCurrentWatermark()` keeps returning the default value of
`-9223372036854775808` (`Long.MinValue`) indefinitely.
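
To make the symptom concrete, here is a minimal plain-Scala sketch of how I
understand a bounded-out-of-orderness generator to derive its watermark. The
names `BoundedOutOfOrdernessModel` and `WatermarkModelDemo` are made up for
illustration; this is a simplified model, not Flink's actual class.

```scala
// Simplified model (assumption, not Flink code): the watermark trails the
// largest timestamp seen so far by the allowed out-of-orderness, minus 1 ms.
final class BoundedOutOfOrdernessModel(outOfOrdernessMillis: Long) {
  // Initialised so that the watermark equals Long.MinValue before any event.
  private var maxTimestamp: Long = Long.MinValue + outOfOrdernessMillis + 1

  // Record an event timestamp; only a new maximum moves the watermark.
  def onEvent(eventTimestamp: Long): Unit =
    maxTimestamp = math.max(maxTimestamp, eventTimestamp)

  def currentWatermark: Long = maxTimestamp - outOfOrdernessMillis - 1
}

object WatermarkModelDemo {
  def main(args: Array[String]): Unit = {
    val twoHours = java.time.Duration.ofHours(2).toMillis
    val gen = new BoundedOutOfOrdernessModel(twoHours)
    println(gen.currentWatermark) // -9223372036854775808, no event seen yet
    gen.onEvent(1650000000000L)
    println(gen.currentWatermark) // 1649992799999
  }
}
```

In this model the value I see in the window is exactly what a generator
reports before it has observed a single event.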

This is the stream code:

stream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .reduce(_ + _)
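
For context, this is a rough plain-Scala sketch of when I would expect a
1-second tumbling event-time window to fire. `TumblingWindowModel`,
`windowStart`, and `shouldFire` are hypothetical helper names, and the model
assumes a window offset of 0 and non-negative timestamps.

```scala
object TumblingWindowModel {
  // Start of the tumbling window that contains `timestamp`
  // (assumes offset 0 and a non-negative timestamp).
  def windowStart(timestamp: Long, sizeMillis: Long): Long =
    timestamp - (timestamp % sizeMillis)

  // A window [start, start + size) is complete once the watermark reaches
  // its last included timestamp, start + size - 1.
  def shouldFire(windowStartMillis: Long, sizeMillis: Long, watermark: Long): Boolean =
    watermark >= windowStartMillis + sizeMillis - 1

  def main(args: Array[String]): Unit = {
    val size = 1000L
    val start = windowStart(1650000000500L, size)
    println(start)                                  // 1650000000000
    println(shouldFire(start, size, Long.MinValue)) // false
    println(shouldFire(start, size, start + 999L))  // true
  }
}
```

With the watermark pinned at `Long.MinValue`, the condition can never become
true, which matches the window never closing.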

The reduce uses this overloaded operator:

@JsonIgnoreProperties(ignoreUnknown = true)
case class StarscreamEventCounter_V1(
  envelopeTimestamp: Long,
  numberOfEvents: Int = 1
) {
  def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = {
    StarscreamEventCounter_V1(
      this.envelopeTimestamp.min(that.envelopeTimestamp),
      that.numberOfEvents + this.numberOfEvents)
  }
}

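As a sanity check of the operator outside Flink, here is a small
self-contained sketch that reduces a plain list with it. The Jackson
annotation is left out so the snippet has no dependencies, and `ReduceDemo`
and the sample timestamps are made up for illustration.

```scala
// Same fields and operator as above, without @JsonIgnoreProperties.
final case class StarscreamEventCounter_V1(
  envelopeTimestamp: Long,
  numberOfEvents: Int = 1
) {
  // Keep the earliest timestamp and sum the event counts.
  def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
    StarscreamEventCounter_V1(
      this.envelopeTimestamp.min(that.envelopeTimestamp),
      that.numberOfEvents + this.numberOfEvents)
}

object ReduceDemo {
  def main(args: Array[String]): Unit = {
    val events = List(
      StarscreamEventCounter_V1(3000L),
      StarscreamEventCounter_V1(1000L),
      StarscreamEventCounter_V1(2000L))
    // prints StarscreamEventCounter_V1(1000,3)
    println(events.reduce(_ + _))
  }
}
```

The operator itself behaves as expected here, so the reduce logic does not
look like the problem.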

*Attempt to Solve:*
1. Validate that the Watermark is set on the source
    a. Set a custom trigger to emit a watermark on each event just in case
2. Test with aggregate / process functions
    a. Both other functions work properly - window closes and emits to a
PrintSink
3. Change Watermark Generator to a custom generator
    a. Also change time horizons and let the job run for 1 day - the window
never closes because the watermark stays stuck at the min default. The sink
never receives the data, but the UI says there are records being output!

*Hypothesis:*
The output id of a reduce function is causing an upstream issue where the
watermark is no longer assigned to the Window. I haven't been able to pin
down what exactly is causing the issue, though. My thought is that it might
be a bug, given that it works for Aggregate/Process.
It could be a bug in the IndexedCombinedWatermarkStatus: the partial
watermarks should not be the min default value when I set the watermark per
event. This is what I will be looking into until I hear back. I validated
that the watermark is set correctly in CombinedWatermarkStatus.
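
To illustrate what I mean by partial watermarks, here is a simplified
plain-Scala model of min-combining. It assumes the combined watermark is the
minimum over all non-idle inputs; `CombinedWatermarkModel`,
`PartialWatermark`, and `combine` are hypothetical names and not the real
IndexedCombinedWatermarkStatus API.

```scala
object CombinedWatermarkModel {
  // One partial watermark per input, plus whether that input is marked idle.
  final case class PartialWatermark(watermark: Long, idle: Boolean = false)

  // Assumption: combined watermark = minimum over the non-idle inputs.
  // Returns None when every input is idle, so there is nothing to combine.
  def combine(inputs: Seq[PartialWatermark]): Option[Long] = {
    val active = inputs.filterNot(_.idle).map(_.watermark)
    if (active.isEmpty) None else Some(active.min)
  }

  def main(args: Array[String]): Unit = {
    val advanced = PartialWatermark(1649992799999L)
    val neverAdvanced = PartialWatermark(Long.MinValue)
    // Some(-9223372036854775808): one stuck input holds everything back
    println(combine(Seq(advanced, neverAdvanced)))
    // Some(1649992799999): the stuck input is ignored once it is idle
    println(combine(Seq(advanced, neverAdvanced.copy(idle = true))))
  }
}
```

Under this model, a single partial watermark that stays at the min default
is enough to keep the combined value there, which is why the partial values
are what I am checking.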

*Tools:*
- Flink 1.14.3
- Scala
- DataStream API

Any assistance would be great! Happy to provide more context or
clarification if something isn't clear!

Thanks!

Ryan van Huuksloot
Data Developer | Data Platform Engineering | Streaming Capabilities