This sounds like a bug indeed. Could you please create a ticket with a minimal test case?
The workaround is probably to use #aggregate, but it should be fixed nonetheless.

On Fri, Apr 1, 2022 at 9:58 AM r pp <pr123sha...@gmail.com> wrote:
> hi~ Can you send your full code?
>
> Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote on Thu, Mar 31, 2022 at 22:58:
>
>> Hello!
>>
>> *Problem:*
>> I am connecting to a Kafka source with the watermark strategy below.
>>
>> val watermarkStrategy = WatermarkStrategy
>>   // explicit type argument so Scala does not infer Nothing for T
>>   .forBoundedOutOfOrderness[StarscreamEventCounter_V1](Duration.of(2, ChronoUnit.HOURS))
>>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>>     override def extractTimestamp(element: StarscreamEventCounter_V1,
>>                                   recordTimestamp: Long): Long =
>>       element.envelopeTimestamp
>>   })
>>
>> The watermarks are being assigned correctly. However, when a reduce
>> function is used, the window never closes, because
>> `ctx.getCurrentWatermark()` keeps returning the default value of
>> `-9223372036854775808` (Long.MinValue) in perpetuity.
>>
>> This is the stream code:
>>
>> stream
>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
>>   .reduce(_ + _)
>>
>> The reduce uses this custom `+` operator:
>>
>> @JsonIgnoreProperties(ignoreUnknown = true)
>> case class StarscreamEventCounter_V1(
>>     envelopeTimestamp: Long,
>>     numberOfEvents: Int = 1
>> ) {
>>   // Combine two counters: keep the earliest timestamp, sum the event counts
>>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
>>     StarscreamEventCounter_V1(
>>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>>       that.numberOfEvents + this.numberOfEvents)
>> }
>>
>> *Attempt to Solve:*
>> 1. Validate that the watermark is set on the source
>>    a. Set a custom trigger to emit a watermark on each event, just in case
>> 2. Test with aggregate / process functions
>>    a. Both other functions work properly: the window closes and emits to a
>>       PrintSink
>> 3. Change the watermark generator to a custom generator
>>    a. Also change the time horizons and let it run for 1 day: the window
>>       never closes, because the watermark is stuck at the minimum default.
>>       The sink never receives the data, but the UI says records are being
>>       output!
>>
>> *Hypothesis:*
>> The output ID of the reduce function is causing an upstream issue where the
>> watermark is no longer assigned to the window. I haven't been able to pin
>> down exactly what is causing the issue, though. My thought is that it might
>> be a bug, given that it works for aggregate/process.
>> It could be a bug in IndexedCombinedWatermarkStatus: the partial
>> watermarks should not be the minimum default value when I set the watermark
>> per event. This is what I will be looking into until I hear back. I have
>> validated that the watermark is set correctly in CombinedWatermarkStatus.
>>
>> *Tools:*
>> - Flink 1.14.3
>> - Scala
>> - DataStream API
>>
>> Any assistance would be great! Happy to provide more context or
>> clarification if something isn't clear!
>>
>> Thanks!
>>
>> Ryan van Huuksloot
>> Data Developer | Data Platform Engineering | Streaming Capabilities
>
>
> --
> Best,
> pp
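A minimal, dependency-free sketch of the #aggregate workaround suggested at the top of the thread, written in Scala like the quoted code. Assumptions: `AggregateLike` is a hypothetical stand-in that only mirrors the four methods of Flink's `org.apache.flink.api.common.functions.AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`) so the logic runs without Flink on the classpath; `CountEvents` and `runPane` are illustrative names, and `runPane` merely simulates feeding one window pane through the accumulator. The semantics match the quoted `+` operator (earliest timestamp, summed count).

```scala
// Sketch only: AggregateLike is a hypothetical stand-in for Flink's
// AggregateFunction[IN, ACC, OUT], so this runs without Flink.
object AggregateWorkaroundSketch {

  // Same shape as the event class in the thread (Jackson annotation omitted).
  case class StarscreamEventCounter_V1(envelopeTimestamp: Long, numberOfEvents: Int = 1)

  // Mirrors the four methods of Flink's AggregateFunction.
  trait AggregateLike[IN, ACC, OUT] {
    def createAccumulator(): ACC
    def add(value: IN, accumulator: ACC): ACC
    def getResult(accumulator: ACC): OUT
    def merge(a: ACC, b: ACC): ACC
  }

  // Accumulator is (earliest timestamp seen, running event count).
  // Long.MaxValue means "no element yet", so min() is correct on the first add.
  object CountEvents
      extends AggregateLike[StarscreamEventCounter_V1, (Long, Int), StarscreamEventCounter_V1] {

    def createAccumulator(): (Long, Int) = (Long.MaxValue, 0)

    def add(value: StarscreamEventCounter_V1, accumulator: (Long, Int)): (Long, Int) =
      (accumulator._1.min(value.envelopeTimestamp), accumulator._2 + value.numberOfEvents)

    def getResult(accumulator: (Long, Int)): StarscreamEventCounter_V1 =
      StarscreamEventCounter_V1(accumulator._1, accumulator._2)

    // Combines two partial accumulators the same way add() combines elements.
    def merge(a: (Long, Int), b: (Long, Int)): (Long, Int) =
      (a._1.min(b._1), a._2 + b._2)
  }

  // Hypothetical helper: folds one pane's elements through the accumulator,
  // roughly what the window operator does before emitting the result.
  def runPane(events: Seq[StarscreamEventCounter_V1]): StarscreamEventCounter_V1 =
    CountEvents.getResult(
      events.foldLeft(CountEvents.createAccumulator())((acc, e) => CountEvents.add(e, acc)))
}
```

In the actual job, the function would implement Flink's `AggregateFunction` instead of the stand-in trait and be passed to `.aggregate(...)` on the `windowAll(...)` stream in place of `.reduce(_ + _)`.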