[ https://issues.apache.org/jira/browse/FLINK-6423?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek updated FLINK-6423: ------------------------------------ Description: JoinedStreams.WithWindow.apply(...) can get a JoinFunction as a parameter, but not an equivalent lambda. Consider the following code, runWithFunction completes successfully, while runWithLambda throws "java.lang.ArrayIndexOutOfBoundsException: -1" As this might look like a very minor issue, the exception is not clear and might cause a developer to spend precious time while looking for the cause {code} public void runWithFunction() throws Exception { StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<String> stream1 = env.fromElements("a", "b", "c"); DataStream<String> stream2 = env.fromElements("A", "B", "C"); DataStream<String> joined = stream1.join(stream2) .where(String::toLowerCase).equalTo(String::toLowerCase) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))) .apply(new JoinFunction<String, String, String>() { @Override public String join(String s1, String s2) { return s1 + "_" + s2; } }); joined.print(); env.execute(); } public void runWithLambda() throws Exception { StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<String> stream1 = env.fromElements("a", "b", "c"); DataStream<String> stream2 = env.fromElements("A", "B", "C"); DataStream<String> joined = stream1.join(stream2) .where(String::toLowerCase).equalTo(String::toLowerCase) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))) .apply((JoinFunction<String, String, String>) (s1, s2) -> s1 + "_" + s2); joined.print(); env.execute(); } {code} was: JoinedStreams.WithWindow.apply(...) can get a JoinFunction as a parameter, but not an equivalent lambda. Consider the following code, runWithFunction completes successfully, while runWithLambda throws "java.lang.ArrayIndexOutOfBoundsException: -1" As this might look like a very minor issue, the exception is not clear and might cause a developer to spend precious time while looking for the cause public void runWithFunction() throws Exception { StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<String> stream1 = env.fromElements("a", "b", "c"); DataStream<String> stream2 = env.fromElements("A", "B", "C"); DataStream<String> joined = stream1.join(stream2) .where(String::toLowerCase).equalTo(String::toLowerCase) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))) .apply(new JoinFunction<String, String, String>() { @Override public String join(String s1, String s2) { return s1 + "_" + s2; } }); joined.print(); env.execute(); } public void runWithLambda() throws Exception { StreamExecutionEnvironment env = LocalStreamEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<String> stream1 = env.fromElements("a", "b", "c"); DataStream<String> stream2 = env.fromElements("A", "B", "C"); DataStream<String> joined = stream1.join(stream2) .where(String::toLowerCase).equalTo(String::toLowerCase) .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))) .apply((JoinFunction<String, String, String>) (s1, s2) -> s1 + "_" + s2); joined.print(); env.execute(); } > JoinFunction can not be replaced with lambda > -------------------------------------------- > > Key: FLINK-6423 > URL: https://issues.apache.org/jira/browse/FLINK-6423 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.2.0 > Reporter: Moshe Sayag > Priority: Minor > > JoinedStreams.WithWindow.apply(...) can get a JoinFunction as a parameter, > but not an equivalent lambda. > Consider the following code, runWithFunction completes successfully, while > runWithLambda throws "java.lang.ArrayIndexOutOfBoundsException: -1" > As this might look like a very minor issue, the exception is not clear and > might cause a developer to spend precious time while looking for the cause > {code} > public void runWithFunction() throws Exception { > StreamExecutionEnvironment env = > LocalStreamEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); > DataStream<String> stream1 = env.fromElements("a", "b", "c"); > DataStream<String> stream2 = env.fromElements("A", "B", "C"); > DataStream<String> joined = stream1.join(stream2) > .where(String::toLowerCase).equalTo(String::toLowerCase) > .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))) > .apply(new JoinFunction<String, String, String>() { > @Override > public String join(String s1, String s2) { > return s1 + "_" + s2; > } > }); > joined.print(); > env.execute(); > } > public void runWithLambda() throws Exception { > StreamExecutionEnvironment env = > LocalStreamEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); > DataStream<String> stream1 = env.fromElements("a", "b", "c"); > DataStream<String> stream2 = env.fromElements("A", "B", "C"); > DataStream<String> joined = stream1.join(stream2) > .where(String::toLowerCase).equalTo(String::toLowerCase) > .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1))) > .apply((JoinFunction<String, String, String>) (s1, s2) -> s1 > + "_" + s2); > joined.print(); > env.execute(); > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)