
ASF GitHub Bot commented on FLINK-5874:

Github user zentol commented on a diff in the pull request:

    --- Diff: 
    @@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, 
KeySelector<T, KEY> keySelector, Ty
                                new KeyGroupStreamPartitioner<>(keySelector, 
                this.keySelector = keySelector;
    -           this.keyType = keyType;
    +           this.keyType = validateKeyType(keyType);
    +   private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> 
keyType) {
    +           Stack<TypeInformation<?>> stack = new Stack<>();
    +           stack.push(keyType);
    +           while (!stack.isEmpty()) {
    +                   TypeInformation<?> typeInfo = stack.pop();
    +                   if (!validateKeyTypeIsHashable(typeInfo)) {
    +                           throw new InvalidProgramException("This type (" 
+ keyType + ") cannot be used as key.");
    +                   }
    +                   if (typeInfo instanceof TupleTypeInfoBase) {
    +                           for (int i = 0; i < typeInfo.getArity(); i++) {
    +                                   stack.push(((TupleTypeInfoBase) 
    +                           }
    +                   }
    +           }
    +           return keyType;
    +   }
    +   /**
    +    * Validates that a given type of element (as encoded by the provided 
{@link TypeInformation}) can be
    +    * used as a key in the {@code DataStream.keyBy()} operation.
    +    *
    +    * @return {@code false} if:
    --- End diff --
    I would shorten this to read ```returns true if the type overrides the 
hashcode implementation```. The details can be container in the general javadoc 
of the method.

> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>                 Key: FLINK-5874
>                 URL: https://issues.apache.org/jira/browse/FLINK-5874
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.4
>            Reporter: Robert Metzger
>            Assignee: Kostas Kloudas
>            Priority: Blocker
> This issue has been reported on the mailing list twice:
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute 
> the hash when shuffling data. Java's default hashCode() implementation 
> doesn't take the arrays contents into account, but the memory address.
> This leads to different hash code on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and 
> in Flink 1.2 the keygroups code detect a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}} 
> class, which has a type-specific hashing function. But introducing this 
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.

This message was sent by Atlassian JIRA

Reply via email to