[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903637#comment-15903637 ]

ASF GitHub Bot commented on FLINK-5874:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105242176
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java ---
    @@ -906,6 +919,256 @@ public void testChannelSelectors() {
        }
     
        /////////////////////////////////////////////////////////////
    +   // KeyBy testing
    +   /////////////////////////////////////////////////////////////
    +
    +   @Rule
    +   public ExpectedException expectedException = ExpectedException.none();
    +
    +   @Test
    +   public void testPrimitiveArrayKeyRejection() {
    +
    +           KeySelector<Tuple2<Integer[], String>, int[]> keySelector =
    +                           new KeySelector<Tuple2<Integer[], String>, int[]>() {
    +
    +                   @Override
    +                   public int[] getKey(Tuple2<Integer[], String> value) throws Exception {
    +                           int[] ks = new int[value.f0.length];
    +                           for (int i = 0; i < ks.length; i++) {
    +                                   ks[i] = value.f0[i];
    +                           }
    +                           return ks;
    +                   }
    +           };
    +
    +           testKeyRejection(keySelector, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
    +   }
    +
    +   @Test
    +   public void testBasicArrayKeyRejection() {
    +
    +           KeySelector<Tuple2<Integer[], String>, Integer[]> keySelector =
    +                           new KeySelector<Tuple2<Integer[], String>, Integer[]>() {
    +
    +                   @Override
    +                   public Integer[] getKey(Tuple2<Integer[], String> value) throws Exception {
    +                           return value.f0;
    +                   }
    +           };
    +
    +           testKeyRejection(keySelector, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO);
    +   }
    +
    +   @Test
    +   public void testObjectArrayKeyRejection() {
    +
    +           KeySelector<Tuple2<Integer[], String>, TestClass[]> keySelector =
    +                           new KeySelector<Tuple2<Integer[], String>, TestClass[]>() {
    +
    +                                   @Override
    +                                   public TestClass[] getKey(Tuple2<Integer[], String> value) throws Exception {
    +                                           TestClass[] ks = new TestClass[value.f0.length];
    +                                           for (int i = 0; i < ks.length; i++) {
    +                                                   ks[i] = new TestClass(value.f0[i]);
    +                                           }
    +                                           return ks;
    +                                   }
    +                           };
    +
    +           ObjectArrayTypeInfo<TestClass[], TestClass> keyTypeInfo = ObjectArrayTypeInfo.getInfoFor(
    +                           TestClass[].class, new GenericTypeInfo<>(TestClass.class));
    +
    +           testKeyRejection(keySelector, keyTypeInfo);
    +   }
    +
    +   private <K> void testKeyRejection(KeySelector<Tuple2<Integer[], String>, K> keySelector, TypeInformation<K> expectedKeyType) {
    +
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataStream<Tuple2<Integer[], String>> input = env.fromElements(
    +                           new Tuple2<>(new Integer[] {1, 2}, "barfoo")
    +           );
    +
    +           Assert.assertEquals(expectedKeyType, TypeExtractor.getKeySelectorTypes(keySelector, input.getType()));
    +
    +           // adjust the rule
    +           expectedException.expect(InvalidProgramException.class);
    +           expectedException.expectMessage("This type (" + expectedKeyType + ") cannot be used as key.");
    +
    +           input.keyBy(keySelector);
    +   }
    +
    +   // composite key tests : POJOs
    +
    +   @Test
    +   public void testPOJONestedArrayKeyRejection() {
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataStream<POJOwithHashCode> input = env.fromElements(
    +                           new POJOwithHashCode(new int[] {1, 2}));
    +
    +           TypeInformation<?> expectedTypeInfo = new TupleTypeInfo<Tuple1<int[]>>(
    +                           PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
    +
    +           // adjust the rule
    +           expectedException.expect(InvalidProgramException.class);
    +           expectedException.expectMessage("This type (" + expectedTypeInfo + ") cannot be used as key.");
    +
    +           input.keyBy("id");
    +   }
    +
    +   @Test
    +   public void testNestedArrayWorkArround() {
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +           DataStream<POJOwithHashCode> input = env.fromElements(
    --- End diff --
    
    Based on its name, this test appears to verify that you can use an array as a key by wrapping it in a POJO that implements hashCode().
    
    We should probably focus more on the POJO-with-hashCode() part; it doesn't really matter what the POJO contains, since we never check that anyway.
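Here is a minimal plain-Java sketch of the wrapper approach the comment refers to. The class name `IntArrayKey` is hypothetical, because the definition of the diff's own `POJOwithHashCode` is not shown. The point is that `equals()` and `hashCode()` are computed from the array contents via `java.util.Arrays`, so equal keys hash identically wherever they are deserialized. In a real job the class would also have to be public for Flink to analyze it as a POJO.

```java
import java.util.Arrays;

// Hypothetical wrapper giving an int[] key content-based equality and hashing.
class IntArrayKey {

    // Public no-arg constructor and public field, as Flink's POJO rules expect;
    // in a real job the class itself must also be public.
    public int[] id;

    public IntArrayKey() {}

    public IntArrayKey(int[] id) {
        this.id = id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof IntArrayKey)) {
            return false;
        }
        // Compare element by element, not by array identity.
        return Arrays.equals(id, ((IntArrayKey) o).id);
    }

    @Override
    public int hashCode() {
        // Hash the contents, so two copies of the same key agree on every JVM.
        return Arrays.hashCode(id);
    }
}
```

Presumably the stream would then be keyed on the wrapper object as a whole (for example, via a `KeySelector` that returns it) rather than on its `id` field. `testPOJONestedArrayKeyRejection` expects `keyBy("id")` on an array field to be rejected.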


> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5874
>                 URL: https://issues.apache.org/jira/browse/FLINK-5874
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.4
>            Reporter: Robert Metzger
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: we are just using Key[].hashCode() to compute
> the hash when shuffling data. Java's default hashCode() implementation for
> arrays doesn't take the array's contents into account, only its object identity.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and
> in Flink 1.2 the key groups code detects a violation of the hashing.
> The proper fix for the problem would be to rely on Flink's {{TypeComparator}}
> class, which has a type-specific hashing function. But introducing this
> change would break compatibility with existing code.
> I'll file a JIRA issue to track that fix for 2.0.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.
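The hashing problem and the proposed rejection can be illustrated without Flink. In this sketch, the class `KeyCheck` and its method `checkKeyType` are hypothetical stand-ins. Judging by the tests in the diff, Flink's real check is triggered by `keyBy`, inspects the key's `TypeInformation`, and throws `InvalidProgramException`. Here an `IllegalArgumentException` with the same message format is used instead.

```java
import java.util.Arrays;

// Hypothetical, Flink-free illustration of FLINK-5874 and of rejecting array keys.
class KeyCheck {

    // Rejects array key types up front, reusing the error text from the tests.
    static void checkKeyType(Class<?> keyType) {
        if (keyType.isArray()) {
            throw new IllegalArgumentException(
                    "This type (" + keyType.getSimpleName() + ") cannot be used as key.");
        }
    }

    public static void main(String[] args) {
        // Two separate copies of the "same" key, as on the sender and the receiver.
        int[] senderKey = {1, 2};
        int[] receiverKey = {1, 2};

        // Arrays inherit Object.hashCode(), which is identity-based,
        // so the two copies will almost always disagree.
        System.out.println("identity hashes equal: "
                + (senderKey.hashCode() == receiverKey.hashCode()));

        // A content-based hash agrees whenever the elements agree.
        System.out.println("content hashes equal: "
                + (Arrays.hashCode(senderKey) == Arrays.hashCode(receiverKey)));

        try {
            checkKeyType(senderKey.getClass());
        } catch (IllegalArgumentException e) {
            // prints: This type (int[]) cannot be used as key.
            System.out.println(e.getMessage());
        }
    }
}
```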



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
