Sorry for the hassle. I ended up working with a colleague and found that
the Kafka source had a single partition while the pipeline had a
parallelism of 4, and no withIdleness was configured, so the watermark was
emitted at the source but never propagated to the windowing operator.
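In other words, a downstream operator's event-time clock is the minimum watermark over all its input subtasks, so empty subtasks stuck at Long.MinValue hold it back indefinitely unless withIdleness marks them idle. A minimal pure-Scala sketch of that min-combination (a simplified model for illustration, not Flink's actual CombinedWatermarkStatus class):

```scala
// Simplified model of how a downstream operator combines per-subtask
// watermarks. Assumption: this mirrors the min-based combination Flink
// performs internally; it is not the real CombinedWatermarkStatus.
object WatermarkCombiner {
  // Watermark value before a subtask has emitted anything.
  val Uninitialized: Long = Long.MinValue // -9223372036854775808

  // The operator's event-time clock is the minimum across all inputs.
  def combine(subtaskWatermarks: Seq[Long]): Long = subtaskWatermarks.min
}

// Parallelism 4 but only 1 Kafka partition: three source subtasks never
// see a record, so the combined watermark stays pinned at Long.MinValue
// and event-time windows never fire.
val combined = WatermarkCombiner.combine(
  Seq(1648684800000L, Long.MinValue, Long.MinValue, Long.MinValue))
```

WatermarkStrategy#withIdleness(Duration) addresses this by letting subtasks that see no records be marked idle and excluded from the combination after the timeout.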

Appreciate you both taking time - Thanks!

Closing this ticket
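For reference, the #aggregate workaround mentioned in the quoted reply below can be sketched roughly like this. This is an illustrative sketch only: the method names mirror Flink's AggregateFunction contract (createAccumulator/add/getResult), and in a real job this logic would implement org.apache.flink.api.common.functions.AggregateFunction and be passed to windowAll(...).aggregate(...).

```scala
// The event type from the thread: earliest envelope timestamp plus a count.
case class StarscreamEventCounter_V1(
  envelopeTimestamp: Long,
  numberOfEvents: Int = 1
)

// Hedged sketch of the aggregate logic (names are illustrative).
object CountAggregate {
  // createAccumulator: empty count with a "no timestamp yet" sentinel.
  def createAccumulator(): StarscreamEventCounter_V1 =
    StarscreamEventCounter_V1(Long.MaxValue, 0)

  // add: fold one element into the accumulator, keeping the earliest
  // timestamp and summing the counts.
  def add(e: StarscreamEventCounter_V1,
          acc: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
    StarscreamEventCounter_V1(
      acc.envelopeTimestamp.min(e.envelopeTimestamp),
      acc.numberOfEvents + e.numberOfEvents)

  // getResult: the accumulator already has the window's output shape.
  def getResult(acc: StarscreamEventCounter_V1): StarscreamEventCounter_V1 = acc
}
```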

Ryan van Huuksloot
Data Developer | Data Platform Engineering | Streaming Capabilities


On Mon, Apr 4, 2022 at 8:18 AM Arvid Heise <ar...@apache.org> wrote:

> This sounds like a bug indeed. Could you please create a ticket with a
> minimal test case?
>
> The workaround is probably to use #aggregate but it should be fixed
> nonetheless.
>
> On Fri, Apr 1, 2022 at 9:58 AM r pp <pr123sha...@gmail.com> wrote:
>
>> Hi, can you send your full code?
>>
>> Ryan van Huuksloot <ryan.vanhuuksl...@shopify.com> wrote on Thu, Mar 31, 2022 at 22:58:
>>
>>> Hello!
>>>
>>> *Problem:*
>>> I am connecting to a Kafka Source with the Watermark Strategy below.
>>>
>>> val watermarkStrategy = WatermarkStrategy
>>>   .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS))
>>>   .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] {
>>>     override def extractTimestamp(element: StarscreamEventCounter_V1, recordTimestamp: Long): Long =
>>>       element.envelopeTimestamp
>>>   })
>>>
>>> The watermarks are correctly getting assigned. However, when a reduce
>>> function is used, the window never terminates because
>>> `ctx.getCurrentWatermark()` returns the default value of
>>> `-9223372036854775808` (Long.MinValue) in perpetuity.
>>>
>>> This is the stream code:
>>>
>>> stream
>>>   .windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).reduce(_ + _)
>>>
>>> The reduce uses this overloaded operator:
>>>
>>> @JsonIgnoreProperties(ignoreUnknown = true)
>>> case class StarscreamEventCounter_V1(
>>>   envelopeTimestamp: Long,
>>>   numberOfEvents: Int = 1
>>> ) {
>>>   def +(that: StarscreamEventCounter_V1): StarscreamEventCounter_V1 =
>>>     StarscreamEventCounter_V1(
>>>       this.envelopeTimestamp.min(that.envelopeTimestamp),
>>>       this.numberOfEvents + that.numberOfEvents
>>>     )
>>> }
>>>
>>>
>>> *Attempt to Solve:*
>>> 1. Validate that the Watermark is set on the source
>>>     a. Set a custom trigger to emit a watermark on each event just in
>>> case
>>> 2. Test with aggregate / process functions
>>>     a. Both other functions work properly - window closes and emits to a
>>> PrintSink
>>> 3. Change Watermark Generator to a custom generator
>>>     a. Also change time horizons and let run for 1 day - window never
>>> closes due to the watermark being stuck at the min default. The sink never
>>> receives the data but the UI says there are records being output!
>>>
>>> *Hypothesis:*
>>> The output of the reduce function is causing an upstream issue where the
>>> watermark is no longer propagated to the window. I haven't been able to
>>> pin down exactly what is causing the issue, though. My thought is that
>>> it might be a bug, given that it works for aggregate/process.
>>> It could be a bug in IndexedCombinedWatermarkStatus: the partial
>>> watermarks should not be the min default value when I set the watermark
>>> per event. This is what I will be looking into until I hear back. I
>>> validated that the watermark is set correctly in CombinedWatermarkStatus.
>>>
>>> *Tools:*
>>> - Flink 1.14.3
>>> - Scala
>>> - DataStream API
>>>
>>> Any assistance would be great! Happy to provide more context or
>>> clarification if something isn't clear!
>>>
>>> Thanks!
>>>
>>> Ryan van Huuksloot
>>> Data Developer | Data Platform Engineering | Streaming Capabilities
>>>
>>
>>
>> --
>> Best,
>>   pp
>>
>
