WatermarkStrategy.forBoundedOutOfOrderness(Duration.of(1,
ChronoUnit.MINUTES)) returns a WatermarkStrategy<T>, but the exact type
is entirely dependent on the variable declaration (i.e., it is not
dependent on any argument).
So, when you assign the strategy to a variable then the compiler can
infer the generic type. Without a variable it is treated as a
WatermarkStrategy<Object>, because there is nothing to infer the type from.
On 08/07/2020 08:54, Niels Basjes wrote:
Hi,
I'm migrating some of my code to Flink 1.11 and I ran into something I
find strange.
This works
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES));
watermarkStrategy
.withTimestampAssigner((SerializableTimestampAssigner<String>) (element,
recordTimestamp) -> 42L);
However this does NOT work
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness(Duration.of(1, ChronoUnit.MINUTES))
.withTimestampAssigner((SerializableTimestampAssigner<String>) (element,
recordTimestamp) -> 42L);
When I try to compile this last one I get
Error:(109, 13) java: no suitable method found for
withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>)
method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>)
is not applicable
(argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>
cannot be converted to
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier<java.lang.Object>)
method
org.apache.flink.api.common.eventtime.WatermarkStrategy.withTimestampAssigner(org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>)
is not applicable
(argument mismatch;
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.String>
cannot be converted to
org.apache.flink.api.common.eventtime.SerializableTimestampAssigner<java.lang.Object>)
Why is that?
--
Best regards / Met vriendelijke groeten,
Niels Basjes