This sounds like a bug indeed. Could you please create a ticket with a
minimal test case?

The workaround is probably to use #aggregate, but it should be fixed
nonetheless.
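
In case it helps, here is a rough sketch of what the #aggregate workaround
could look like (untested, names are illustrative, and it assumes the
StarscreamEventCounter_V1 case class from your mail):

import org.apache.flink.api.common.functions.AggregateFunction

class CountAggregate
  extends AggregateFunction[StarscreamEventCounter_V1,
                            StarscreamEventCounter_V1,
                            StarscreamEventCounter_V1] {

  // Start from Long.MaxValue so min() picks up the first real timestamp.
  override def createAccumulator(): StarscreamEventCounter_V1 =
    StarscreamEventCounter_V1(Long.MaxValue, 0)

  // Fold each incoming element into the accumulator.
  override def add(value: StarscreamEventCounter_V1,
                   acc: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
    StarscreamEventCounter_V1(
      acc.envelopeTimestamp.min(value.envelopeTimestamp),
      acc.numberOfEvents + value.numberOfEvents)

  override def getResult(acc: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = acc

  // Only needed for merging windows (e.g. session windows).
  override def merge(a: StarscreamEventCounter_V1,
                     b: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
    StarscreamEventCounter_V1(
      a.envelopeTimestamp.min(b.envelopeTimestamp),
      a.numberOfEvents + b.numberOfEvents)
}

// Then, instead of .reduce(_ + _):
stream
  .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
  .aggregate(new CountAggregate)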

On Fri, Apr 1, 2022 at 9:58 AM r pp <pr123sha...@gmail.com> wrote:

> Hi! Can you send your full code?
>
> Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote on Thu, Mar 31, 2022 at 22:58:
>
>> Hello!
>>
>> *Problem:*
>> I am connecting to a Kafka Source with the Watermark Strategy below.
>>
>> val watermarkStrategy = WatermarkStrategy
>>   .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
>>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>>       element.envelopeTimestamp
>>   })
>>
>> The watermarks are getting assigned correctly.
>> However, when a reduce function is used, the window never terminates
>> because `ctx.getCurrentWatermark()` returns the default value of
>> `-9223372036854775808` (Long.MIN_VALUE) in perpetuity.
>>
>> This is the stream code:
>>
>> stream
>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1)))
>>   .reduce(_ + _)
>>
>> The reduce uses this overloaded operator:
>>
>> @JsonIgnoreProperties(ignoreUnknown = true)
>> case class StarscreamEventCounter_V1(
>>   envelopeTimestamp: Long,
>>   numberOfEvents: Int = 1
>> ) {
>>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
>>     StarscreamEventCounter_V1(
>>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>>       this.numberOfEvents + that.numberOfEvents)
>> }
>>
>> *Attempt to Solve:*
>> 1. Validate that the watermark is set on the source
>>     a. Set a custom trigger to emit a watermark on each event, just in case
>> 2. Test with aggregate / process functions
>>     a. Both of those work properly - the window closes and emits to a
>> PrintSink
>> 3. Change the watermark generator to a custom generator (see the sketch
>> after this list)
>>     a. Also change the time horizons and let it run for 1 day - the window
>> never closes because the watermark is stuck at the min default. The sink
>> never receives the data, but the UI says there are records being output!
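>>
>> For (3), the custom generator was essentially a per-event (punctuated)
>> generator; a simplified sketch of that idea (illustrative, not the exact
>> code) looks like this:
>>
>> import org.apache.flink.api.common.eventtime._
>>
>> class PerEventWatermarks extends WatermarkGenerator[StarscreamEventCounter_V1] {
>>   // Emit a watermark for every single event.
>>   override def onEvent(event: StarscreamEventCounter_V1,
>>                        eventTimestamp: Long,
>>                        output: WatermarkOutput): Unit =
>>     output.emitWatermark(new Watermark(eventTimestamp))
>>
>>   // Nothing to do periodically; everything is emitted per event.
>>   override def onPeriodicEmit(output: WatermarkOutput): Unit = ()
>> }
>>
>> val customStrategy = WatermarkStrategy
>>   .forGenerator(new WatermarkGeneratorSupplier[StarscreamEventCounter_V1] {
>>     override def createWatermarkGenerator(
>>         context: WatermarkGeneratorSupplier.Context): WatermarkGenerator[StarscreamEventCounter_V1] =
>>       new PerEventWatermarks
>>   })
>>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>>       element.envelopeTimestamp
>>   })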
>>
>> *Hypothesis:*
>> The output of the reduce function is causing an upstream issue where the
>> watermark is no longer assigned to the window. I haven't been able to pin
>> down what exactly is causing the issue, though. My thought is that it might
>> be a bug, given that it works for Aggregate/Process.
>> It could be a bug in IndexedCombinedWatermarkStatus: the partial
>> watermarks should not be the min default value when I set the watermark per
>> event - this is what I will be looking into until I hear back. I validated
>> that the watermark is set correctly in CombinedWatermarkStatus.
>>
>> *Tools:*
>> - Flink 1.14.3
>> - Scala
>> - DataStream API
>>
>> Any assistance would be great! Happy to provide more context or
>> clarification if something isn't clear!
>>
>> Thanks!
>>
>> Ryan van Huuksloot
>> Data Developer | Data Platform Engineering | Streaming Capabilities
>>
>
>
> --
> Best,
>   pp
>
