Hi, can you send your full code?

Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote on Thu, Mar 31, 2022 at 22:58:

> Hello!
>
> *Problem:*
> I am connecting to a Kafka Source with the Watermark Strategy below.
>
> val watermarkStrategy = WatermarkStrategy
>   .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>       element.envelopeTimestamp
>   })
>
> The Watermarks are correctly getting assigned.
> However, when a reduce function is used, the window never terminates
> because `ctx.getCurrentWatermark()` returns the default value of
> `-9223372036854775808` in perpetuity.
>
> This is the stream code:
>
> stream
>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
>   .reduce(_ + _)
>
> The reduce uses this overloaded operator:
>
> @JsonIgnoreProperties(ignoreUnknown = true)
> case class StarscreamEventCounter_V1(
>   envelopeTimestamp: Long,
>   numberOfEvents: Int = 1
> ) {
>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = {
>     StarscreamEventCounter_V1(
>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>       that.numberOfEvents + this.numberOfEvents)
>   }
> }
>
> *Attempt to Solve:*
> 1. Validate that the Watermark is set on the source
>    a. Set a custom trigger to emit a watermark on each event just in case
> 2. Test with aggregate / process functions
>    a. Both other functions work properly - window closes and emits to a
>       PrintSink
> 3. Change Watermark Generator to a custom generator
>    a. Also change time horizons and let it run for 1 day - window never
>       closes due to the watermark being stuck at the min default. The sink
>       never receives the data but the UI says there are records being output!
>
> *Hypothesis:*
> The output id of a reduce function is causing an upstream issue where the
> watermark is no longer assigned to the Window. I haven't been able to lock
> down what exactly is causing the issue though. My thought is that it might
> be a bug, given that it works for Aggregate/Process.
> It could be a bug in the IndexedCombinedWatermarkStatus: the partial
> watermarks should not be the min default value when I set the watermark per
> event - this is what I will be looking into until I hear back. I validated
> that the watermark is set correctly in CombinedWatermarkStatus.
>
> *Tools:*
> - Flink 1.14.3
> - Scala
> - DataStream API
>
> Any assistance would be great! Happy to provide more context or
> clarification if something isn't clear!
>
> Thanks!
>
> Ryan van Huuksloot
> Data Developer | Data Platform Engineering | Streaming Capabilities
>

--
Best,
pp