Andrew Roberts created FLINK-10167: -------------------------------------- Summary: SessionWindows not compatible with typed DataStreams in scala Key: FLINK-10167 URL: https://issues.apache.org/jira/browse/FLINK-10167 Project: Flink Issue Type: Bug Reporter: Andrew Roberts
I'm trying to construct a trivial job that uses session windows, and it looks like the data type parameter is hardcoded to `Object`/`AnyRef`. Due to the invariance of java classes in scala, this means that we can't use the provided SessionWindow helper classes in scala on typed streams. Example job: {code:java} import org.apache.flink.api.scala._ import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment} import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows import org.apache.flink.streaming.api.windowing.time.Time import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window} import org.apache.flink.util.Collector object TestJob { val jobName = "TestJob" def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env.fromCollection(Range(0, 100).toList) .keyBy(_ / 10) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))) .reduce( (a: Int, b: Int) => a + b, (key: Int, window: Window, items: Iterable[Int], out: Collector[String]) => s"${key}: ${items}" ) .map(println(_)) env.execute(jobName) } }{code} Compile error: {code:java} [error] found : org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows [error] required: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?] [error] Note: Object <: Any (and org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows <: org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]), but Java-defined class WindowAssigner is invariant in type T. [error] You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10) [error] .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)