Glad to hear it! Thank you for the feedback!

Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote on Tue, Apr 5, 2022 at 06:26:
> Sorry for the hassle. I ended up working with a colleague and found that
> the Kafka Source had a single partition but the pipeline had a
> parallelism of 4, and there was no `withIdleness` configured. So the
> watermark was set on the output of the one source subtask reading the
> partition but never advanced at the window operator, because the three
> subtasks without a partition held the combined (minimum) watermark back.
>
> I appreciate you both taking the time. Thanks!
>
> Closing this ticket.
>
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
> [image: Shopify]
> <https://www.shopify.com/?utm_medium=salessignatures&utm_source=hs_email>
>
>
> On Mon, Apr 4, 2022 at 8:18 AM Arvid Heise <ar...@apache.org> wrote:
>
>> This sounds like a bug indeed. Could you please create a ticket with a
>> minimal test case?
>>
>> The workaround is probably to use #aggregate, but it should be fixed
>> nonetheless.
>>
>> On Fri, Apr 1, 2022 at 9:58 AM r pp <pr123sha...@gmail.com> wrote:
>>
>>> Hi! Can you send your full code?
>>>
>>> Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote on Thu, Mar 31,
>>> 2022 at 22:58:
>>>
>>>> Hello!
>>>>
>>>> *Problem:*
>>>> I am connecting to a Kafka Source with the watermark strategy below.
>>>>
>>>> import java.time.Duration
>>>> import java.time.temporal.ChronoUnit
>>>> import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
>>>>
>>>> val watermarkStrategy = WatermarkStrategy
>>>>   .forBoundedOutOfOrderness[StarscreamEventCounter_V1](Duration.of(2, ChronoUnit.HOURS))
>>>>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>>>>     override def extractTimestamp(element: StarscreamEventCounter_V1,
>>>>                                   recordTimestamp: Long): Long =
>>>>       element.envelopeTimestamp
>>>>   })
>>>>
>>>> The watermarks are being assigned correctly.
>>>> However, when a reduce function is used, the window never fires because
>>>> `ctx.getCurrentWatermark()` keeps returning the default value,
>>>> `-9223372036854775808` (`Long.MinValue`).
>>>>
>>>> This is the stream code:
>>>>
>>>> stream
>>>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
>>>>   .reduce(_ + _)
>>>>
>>>> The reduce uses this `+` operator defined on the case class:
>>>>
>>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>>> case class StarscreamEventCounter_V1(
>>>>   envelopeTimestamp: Long,
>>>>   numberOfEvents: Int = 1
>>>> ) {
>>>>   // Keep the earliest timestamp and sum the event counts.
>>>>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
>>>>     StarscreamEventCounter_V1(
>>>>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>>>>       this.numberOfEvents + that.numberOfEvents)
>>>> }
>>>>
>>>> *Attempt to Solve:*
>>>> 1. Validate that the watermark is set on the source.
>>>>    a. Set a custom trigger to emit a watermark on each event, just in
>>>>       case.
>>>> 2. Test with aggregate / process functions.
>>>>    a. Both of these work properly: the window closes and emits to a
>>>>       PrintSink.
>>>> 3. Change the watermark generator to a custom generator.
>>>>    a. Also changed the time horizons and let it run for 1 day: the
>>>>       window never closes because the watermark is stuck at the minimum
>>>>       default. The sink never receives the data, but the UI says
>>>>       records are being output!
>>>>
>>>> *Hypothesis:*
>>>> The output id of the reduce function is causing an upstream issue
>>>> where the watermark is no longer assigned to the window. I haven't been
>>>> able to pin down exactly what is causing it, though. Since it works for
>>>> aggregate/process, I suspect it might be a bug.
>>>> It could be a bug in `IndexedCombinedWatermarkStatus`: the partial
>>>> watermarks should not be the minimum default value when I set the
>>>> watermark per event. This is what I will be looking into until I hear
>>>> back. I validated that the watermark is set correctly in
>>>> `CombinedWatermarkStatus`.
>>>>
>>>> *Tools:*
>>>> - Flink 1.14.3
>>>> - Scala
>>>> - DataStream API
>>>>
>>>> Any assistance would be great! Happy to provide more context or
>>>> clarification if something isn't clear!
>>>>
>>>> Thanks!
>>>
>>> --
>>> Best,
>>> pp
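The root cause found in this thread (one Kafka partition, parallelism 4, no `withIdleness`) follows from how Flink combines watermarks: an operator's event-time clock is the minimum of the watermarks on its input channels, and channels marked idle are left out of that minimum. The sketch below is a simplified, self-contained model of that rule, not Flink's actual implementation; `ChannelState`, `WatermarkModel`, and `combinedWatermark` are hypothetical names, and the timestamp used for the active subtask is made up. It shows why three source subtasks with no partition pin the window operator's watermark at `Long.MinValue`, and why marking them idle (roughly what `WatermarkStrategy#withIdleness(Duration)` does once its timeout elapses) lets the watermark advance.

```scala
// Simplified model of per-channel watermark combining; NOT Flink's implementation.
// ChannelState, WatermarkModel and combinedWatermark are hypothetical names.
final case class ChannelState(watermark: Long, idle: Boolean)

object WatermarkModel {
  // Value of a channel that has never received a watermark (Long.MinValue).
  val InitialWatermark: Long = Long.MinValue

  // The operator's watermark is the minimum over the non-idle input channels.
  // Returns None when every channel is idle (the operator is then idle too).
  def combinedWatermark(channels: Seq[ChannelState]): Option[Long] = {
    val active = channels.filterNot(_.idle)
    if (active.isEmpty) None else Some(active.map(_.watermark).min)
  }

  def main(args: Array[String]): Unit = {
    // Parallelism 4: only one subtask is assigned the single Kafka partition.
    val reading = ChannelState(watermark = 1649000000000L, idle = false)
    val noPartition = ChannelState(watermark = InitialWatermark, idle = false)

    // Without idleness, the three empty channels stay at Long.MinValue.
    val withoutIdleness = Seq(reading) ++ Seq.fill(3)(noPartition)
    // With idleness detection, the three empty channels are marked idle.
    val withIdleness = Seq(reading) ++ Seq.fill(3)(noPartition.copy(idle = true))

    println(s"without idleness: ${combinedWatermark(withoutIdleness)}")
    println(s"with idleness:    ${combinedWatermark(withIdleness)}")
  }
}
```

Without idleness the minimum is `Long.MinValue`, the same `-9223372036854775808` reported in the original question; with the empty channels marked idle, the combined watermark follows the single active subtask. In the real job that corresponds to chaining something like `.withIdleness(Duration.ofMinutes(1))` onto the watermark strategy; the one-minute timeout is an assumed value, not one taken from the thread.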