Re: [DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-04-29 Thread r pp
Glad to hear it! thank you for feedback ! Ryan van Huuksloot 于2022年4月5日周二 06:26写道: > Sorry for the hassle. I ended up working with a colleague and found that > the Kafka Source had a single partition but the pipeline had a > parallelism of 4 and there was no withIdleness associated so the Waterm

Re: [DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-04-04 Thread Ryan van Huuksloot
Sorry for the hassle. I ended up working with a colleague and found that the Kafka Source had a single partition but the pipeline had a parallelism of 4 and there was no withIdleness associated so the Watermark was set on the output but didn't persist to the operator. Appreciate you both taking ti

Re: [DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-04-04 Thread Arvid Heise
This sounds like a bug indeed. Could you please create a ticket with a minimal test case? The workaround is probably to use #aggregate but it should be fixed nonetheless. On Fri, Apr 1, 2022 at 9:58 AM r pp wrote: > hi~ Can you send your full code ? > > Ryan van Huuksloot 于2022年3月31日周四 22:58写道

Re: [DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-04-01 Thread r pp
hi~ Can you send your full code ? Ryan van Huuksloot 于2022年3月31日周四 22:58写道: > Hello! > > *Problem:* > I am connecting to a Kafka Source with the Watermark Strategy below. > > val watermarkStrategy = WatermarkStrategy > .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS)) > .withTimest

[DataStream API] Watermarks not closing TumblingEventTimeWindow with Reduce function

2022-03-31 Thread Ryan van Huuksloot
Hello! *Problem:* I am connecting to a Kafka Source with the Watermark Strategy below. val watermarkStrategy = WatermarkStrategy .forBoundedOutOfOrderness(Duration.of(2, ChronoUnit.HOURS)) .withTimestampAssigner(new SerializableTimestampAssigner[StarscreamEventCounter_V1] { override def e