Sorry for the hassle. I ended up working through this with a colleague and found that the Kafka source had a single partition while the pipeline ran with a parallelism of 4, and the watermark strategy had no withIdleness configured. As a result, the watermark was set on the source output but never propagated to the window operator.
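
For anyone landing on this thread later, the effect can be sketched without Flink at all. The toy model below is not Flink code; it only assumes that a downstream operator takes the minimum watermark over its input channels and skips channels that have been marked idle. `WatermarkModel`, `Channel`, `combined` and the sample timestamp are hypothetical names and values made up for illustration.

```scala
// Toy model (NOT Flink code): a downstream operator's watermark is taken as
// the minimum over the watermarks of its non-idle input channels.
object WatermarkModel {
  // One entry per upstream source subtask. `idle` is a hypothetical flag
  // standing in for what an idleness timeout would mark on a silent subtask.
  final case class Channel(watermark: Long, idle: Boolean = false)

  def combined(channels: Seq[Channel]): Long = {
    val active = channels.filterNot(_.idle)
    // Assumption of this sketch: if every channel is idle, report the
    // initial value rather than modelling what Flink does in that case.
    if (active.isEmpty) Long.MinValue else active.map(_.watermark).min
  }

  def main(args: Array[String]): Unit = {
    // The one subtask that owns the single Kafka partition (sample timestamp).
    val reading = Channel(watermark = 1648735080000L)
    // A subtask with no partition never emits a watermark.
    val starved = Channel(watermark = Long.MinValue)

    // Parallelism 4, one partition, no idleness: the minimum stays at Long.MinValue.
    println(combined(Seq(reading, starved, starved, starved)))

    // Same layout with the three silent subtasks marked idle: the watermark advances.
    println(combined(Seq(reading) ++ Seq.fill(3)(starved.copy(idle = true))))
  }
}
```

The first line printed is `-9223372036854775808`, the same stuck value reported in the original message; the second is the reading subtask's watermark. In Flink 1.14 the real counterpart of the `idle` flag would be a call such as `.withIdleness(Duration.ofMinutes(1))` on the `WatermarkStrategy` (the one-minute timeout is an arbitrary choice here), or alternatively running the source with a parallelism that matches the partition count.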
Appreciate you both taking time - Thanks! Closing this ticket.

Ryan van Huuksloot
Data Developer | Data Platform Engineering | Streaming Capabilities
[image: Shopify]
<https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>


On Mon, Apr 4, 2022 at 8:18 AM Arvid Heise <ar...@apache.org> wrote:

> This sounds like a bug indeed. Could you please create a ticket with a
> minimal test case?
>
> The workaround is probably to use #aggregate but it should be fixed
> nonetheless.
>
> On Fri, Apr 1, 2022 at 9:58 AM r pp <pr123sha...@gmail.com> wrote:
>
>> Hi~ Can you send your full code?
>>
>> On Thu, Mar 31, 2022 at 22:58, Ryan van Huuksloot
>> <ryan.vanhuuksl...@shopify.com> wrote:
>>
>>> Hello!
>>>
>>> *Problem:*
>>> I am connecting to a Kafka Source with the Watermark Strategy below.
>>>
>>> val watermarkStrategy = WatermarkStrategy
>>>   .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
>>>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>>>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>>>       element.envelopeTimestamp
>>>   })
>>>
>>> The Watermarks are correctly getting assigned.
>>> However, when a reduce function is used the window never terminates
>>> because the `ctx.getCurrentWatermark()` returns the default value of
>>> `-9223372036854775808` in perpetuity.
>>>
>>> This is the stream code:
>>>
>>> stream
>>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>   .reduce(_ + _)
>>>
>>> The reduce uses this overloaded operator:
>>>
>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>> case class StarscreamEventCounter_V1(
>>>   envelopeTimestamp: Long,
>>>   numberOfEvents: Int = 1
>>> ) {
>>>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = {
>>>     StarscreamEventCounter_V1(
>>>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>>>       that.numberOfEvents + this.numberOfEvents)
>>>   }
>>> }
>>>
>>>
>>> *Attempt to Solve:*
>>> 1. Validate that the Watermark is set on the source
>>>    a. Set a custom trigger to emit a watermark on each event just in
>>>       case
>>> 2. Test with aggregate / process functions
>>>    a. Both other functions work properly - window closes and emits to a
>>>       PrintSink
>>> 3. Change Watermark Generator to a custom generator
>>>    a. Also change time horizons and let run for 1 day - window never
>>>       closes due to the watermark being stuck at the min default. The
>>>       sink never receives the data but the UI says there are records
>>>       being output!
>>>
>>> *Hypothesis:*
>>> The output id of a reduce function is causing an upstream issue where
>>> the watermark is no longer assigned to the Window. I haven't been able to
>>> lock down what exactly is causing the issue though. My thought is that it
>>> might be a bug given it works for Aggregate/Process.
>>> It could be a bug in the IndexedCombinedWatermarkStatus: the partial
>>> watermarks should not be the min default value when I set the watermark per
>>> event - this is what I will be looking into until I hear back. I validated
>>> that the watermark is set correctly in CombinedWatermarkStatus.
>>>
>>> *Tools:*
>>> - Flink 1.14.3
>>> - Scala
>>> - DataStream API
>>>
>>> Any assistance would be great! Happy to provide more context or
>>> clarification if something isn't clear!
>>>
>>> Thanks!
>>>
>>> Ryan van Huuksloot
>>> Data Developer | Data Platform Engineering | Streaming Capabilities
>>>
>>
>>
>> --
>> Best,
>> pp
>>
>