[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903626#comment-15903626 ]
ASF GitHub Bot commented on FLINK-5874: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/3501#discussion_r105238457 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java --- @@ -906,6 +919,256 @@ public void testChannelSelectors() { } ///////////////////////////////////////////////////////////// + // KeyBy testing + ///////////////////////////////////////////////////////////// + + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + @Test + public void testPrimitiveArrayKeyRejection() { + + KeySelector<Tuple2<Integer[], String>, int[]> keySelector = + new KeySelector<Tuple2<Integer[], String>, int[]>() { + + @Override + public int[] getKey(Tuple2<Integer[], String> value) throws Exception { + int[] ks = new int[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = value.f0[i]; + } + return ks; + } + }; + + testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + } + + @Test + public void testBasicArrayKeyRejection() { + + KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector = + new KeySelector<Tuple2<Integer[], String>, Integer[]>() { + + @Override + public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception { + return value.f0; + } + }; + + testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO); + } + + @Test + public void testObjectArrayKeyRejection() { + + KeySelector<Tuple2<Integer[], String>, TestClass[]> keySelector = + new KeySelector<Tuple2<Integer[], String>, TestClass[]>() { + + @Override + public TestClass[] getKey(Tuple2<Integer[], String> value) throws Exception { + TestClass[] ks = new TestClass[value.f0.length]; + for (int i = 0; i < ks.length; i++) { + ks[i] = new TestClass(value.f0[i]); + } + return ks; + } + }; + + ObjectArrayTypeInfo<TestClass[], TestClass> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor( + 
TestClass[].class, new GenericTypeInfo<>(TestClass.class)); + + testKeyRejection(keySelector, keyTypeInfo); + } + + private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) { + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<Integer[], String>> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "barfoo") + ); + + Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType())); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key."); + + input.keyBy(keySelector); + } + + // composite key tests : POJOs + + @Test + public void testPOJONestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<POJOwithHashCode> input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple1<int[]>>( + PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy("id"); + } + + @Test + public void testNestedArrayWorkArround() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<POJOwithHashCode> input = env.fromElements( + new POJOwithHashCode(new int[] {1, 2})); + + input.keyBy(new KeySelector<POJOwithHashCode, POJOwithHashCode>() { + @Override + public POJOwithHashCode getKey(POJOwithHashCode value) throws Exception { + return value; + } + }).addSink(new SinkFunction<POJOwithHashCode>() { + @Override + public void invoke(POJOwithHashCode value) throws Exception { + Assert.assertEquals(value.getId(), new 
int[]{1, 2}); + } + }); + } + + @Test + public void testPOJOnoHashCodeKeyRejection() { + + KeySelector<POJOnoHashCode, POJOnoHashCode> keySelector = + new KeySelector<POJOnoHashCode, POJOnoHashCode>() { + @Override + public POJOnoHashCode getKey(POJOnoHashCode value) throws Exception { + return value; + } + }; + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<POJOnoHashCode> input = env.fromElements( + new POJOnoHashCode(new int[] {1, 2})); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + + input.keyBy(keySelector); + } + + // composite key tests : Tuples + + @Test + public void testTupleNestedArrayKeyRejection() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStream<Tuple2<Integer[], String>> input = env.fromElements( + new Tuple2<>(new Integer[] {1, 2}, "test-test")); + + TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple2<Integer[], String>>( + BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO); + + // adjust the rule + expectedException.expect(InvalidProgramException.class); + expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key."); + + input.keyBy(new KeySelector<Tuple2<Integer[],String>, Tuple2<Integer[],String>>() { + @Override + public Tuple2<Integer[], String> getKey(Tuple2<Integer[], String> value) throws Exception { + return value; + } + }); + } + + @Test + public void testPrimitiveKeyRejection() throws Exception { --- End diff -- what do we actually verify here? 
> Reject arrays as keys in DataStream API to avoid inconsistent hashing > --------------------------------------------------------------------- > > Key: FLINK-5874 > URL: https://issues.apache.org/jira/browse/FLINK-5874 > Project: Flink > Issue Type: Bug > Components: DataStream API > Affects Versions: 1.2.0, 1.1.4 > Reporter: Robert Metzger > Assignee: Kostas Kloudas > Priority: Blocker > > This issue has been reported on the mailing list twice: > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html > - > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html > The problem is the following: We are using just Key[].hashCode() to compute > the hash when shuffling data. Java's default hashCode() implementation > doesn't take the array's contents into account, but the memory address. > This leads to different hash codes on the sender and receiver sides. > In Flink 1.1 this means that the data is shuffled randomly and not keyed, and > in Flink 1.2 the keygroups code detects a violation of the hashing. > The proper fix of the problem would be to rely on Flink's {{TypeComparator}} > class, which has a type-specific hashing function. But introducing this > change would break compatibility with existing code. > I'll file a JIRA for the 2.0 changes for that fix. > For 1.2.1 and 1.3.0 we should at least reject arrays as keys. -- This message was sent by Atlassian JIRA (v6.3.15#6346)