[ https://issues.apache.org/jira/browse/FLINK-10167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589125#comment-16589125 ]

Andrew Roberts commented on FLINK-10167:
----------------------------------------

Interesting - I was making several assumptions that turned out to be wrong. I follow you now on the value-vs-object (AnyVal vs AnyRef) split. The underlying issue I was seeing was due to keying by a string that I thought had to be immutable because it contained only immutable objects and case classes, but changing the key to a "more immutable" version somehow fixed the issue.

Also, it's worth noting that switching the stream to a tuple of (key, value) also worked to "solve" the problem, if I keyed that stream by the first tuple element. I don't quite understand why, since it was the same string that didn't work when used directly as the key, but perhaps some kind of evaluation order weirdness was in play.

In any case, I've gotten my job to where I want it to be - thanks for your help!

> SessionWindows not compatible with typed DataStreams in scala
> -------------------------------------------------------------
>
>                 Key: FLINK-10167
>                 URL: https://issues.apache.org/jira/browse/FLINK-10167
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Andrew Roberts
>            Priority: Major
>
> I'm trying to construct a trivial job that uses session windows, and it looks
> like the data type parameter is hardcoded to `Object`/`AnyRef`. Due to the
> invariance of java classes in scala, this means that we can't use the
> provided SessionWindow helper classes in scala on typed streams.
>
> Example job:
> {code:java}
> import org.apache.flink.api.scala._
> import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
> import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
> import org.apache.flink.util.Collector
>
> object TestJob {
>   val jobName = "TestJob"
>
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.fromCollection(Range(0, 100).toList)
>       .keyBy(_ / 10)
>       .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
>       .reduce(
>         (a: Int, b: Int) => a + b,
>         (key: Int, window: Window, items: Iterable[Int], out: Collector[String]) => s"${key}: ${items}"
>       )
>       .map(println(_))
>     env.execute(jobName)
>   }
> }
> {code}
>
> Compile error:
> {code:java}
> [error]  found   : org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
> [error]  required: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
> [error] Note: Object <: Any (and org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows <: org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]), but Java-defined class WindowAssigner is invariant in type T.
> [error] You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
> [error]       .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)))
> {code}
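
The AnyVal-vs-AnyRef split mentioned in the comment can be reproduced without Flink. The sketch below is only an illustration: `Assigner`, `SessionAssigner`, and `window` are hypothetical stand-ins (not Flink classes) that mimic the shape of the `WindowAssigner[_ >: Int, ?]` requirement from the compile error, on the assumption that the session assigner's element type is fixed to `Object`/`AnyRef`, as the error message suggests.

```scala
// Hypothetical stand-ins, NOT Flink classes.
// Invariant in T, like a Java-defined generic class seen from Scala.
class Assigner[T]

// Element type fixed to AnyRef (java.lang.Object), as the error message
// reports for ProcessingTimeSessionWindows.
class SessionAssigner extends Assigner[AnyRef]

object VarianceSketch {
  // Same shape as the required type in the compile error: Assigner[_ >: T].
  def window[T](elements: Seq[T], assigner: Assigner[_ >: T]): Int =
    elements.size

  def main(args: Array[String]): Unit = {
    val session = new SessionAssigner

    // Compiles: String <: AnyRef, so Assigner[AnyRef] fits Assigner[_ >: String].
    println(window[String](Seq("a", "b"), session))

    // Compiles: the boxed java.lang.Integer is an AnyRef.
    println(window[java.lang.Integer](Seq(1, 2, 3).map(Int.box), session))

    // Does not compile: Int <: AnyVal, and AnyRef is not a supertype of Int.
    // window[Int](Seq(1, 2, 3), session)
  }
}
```

Under these assumptions, an element type that is an `AnyRef` (a `String`, a boxed integer, a case class, or a tuple) satisfies the lower bound, while a bare `Int` does not.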