[ https://issues.apache.org/jira/browse/FLINK-5874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15903627#comment-15903627 ]
ASF GitHub Bot commented on FLINK-5874:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3501#discussion_r105240820

    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java ---
    @@ -114,9 +121,53 @@ public KeyedStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector, Ty
                     dataStream.getTransformation(),
                     new KeyGroupStreamPartitioner<>(keySelector, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)));
             this.keySelector = keySelector;
    -        this.keyType = keyType;
    +        this.keyType = validateKeyType(keyType);
         }
    -
    +
    +    private TypeInformation<KEY> validateKeyType(TypeInformation<KEY> keyType) {
    +        Stack<TypeInformation<?>> stack = new Stack<>();
    +        stack.push(keyType);
    +
    +        while (!stack.isEmpty()) {
    +            TypeInformation<?> typeInfo = stack.pop();
    +
    +            if (!validateKeyTypeIsHashable(typeInfo)) {
    +                throw new InvalidProgramException("This type (" + keyType + ") cannot be used as key.");
    +            }
    +
    +            if (typeInfo instanceof TupleTypeInfoBase) {
    +                for (int i = 0; i < typeInfo.getArity(); i++) {
    +                    stack.push(((TupleTypeInfoBase) typeInfo).getTypeAt(i));
    +                }
    +            }
    +        }
    +        return keyType;
    +    }
    +
    +    /**
    +     * Validates that a given type of element (as encoded by the provided {@link TypeInformation}) can be
    +     * used as a key in the {@code DataStream.keyBy()} operation.
    +     *
    +     * @return {@code false} if:
    --- End diff --

    I would shorten this to read ```returns true if the type overrides the hashcode implementation```. The details can be contained in the general javadoc of the method.

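
    To illustrate what "overrides the hashcode implementation" means for a key type, here is a minimal, standalone sketch in plain Java. It is not the code from the pull request: the class name `ArrayKeyHashDemo` and the helpers `overridesHashCode` and `contentHash` are hypothetical names chosen for this example, and the reflection-based check is only one assumed way such a test could be written.

```java
import java.util.Arrays;

// Hypothetical demo class; not part of Flink.
final class ArrayKeyHashDemo {

    /**
     * Returns true if the given class declares its own hashCode()
     * instead of inheriting the identity-based one from Object.
     */
    static boolean overridesHashCode(Class<?> type) {
        try {
            return type.getMethod("hashCode").getDeclaringClass() != Object.class;
        } catch (NoSuchMethodException e) {
            // Primitives and interfaces expose no public hashCode() via reflection.
            return false;
        }
    }

    /** Content-based hash: equal array contents always give equal results. */
    static int contentHash(int[] key) {
        return Arrays.hashCode(key);
    }

    public static void main(String[] args) {
        int[] a = {1, 2, 3};
        int[] b = {1, 2, 3};

        // Identity-based: depends on the object instance, not its contents,
        // so the two values will usually differ within and across JVMs.
        System.out.println("a.hashCode() = " + a.hashCode());
        System.out.println("b.hashCode() = " + b.hashCode());

        // Content-based: the same for both arrays.
        System.out.println("contentHash(a) == contentHash(b): "
                + (contentHash(a) == contentHash(b)));

        // Arrays inherit Object.hashCode(); String declares its own.
        System.out.println("int[] overrides hashCode: " + overridesHashCode(int[].class));
        System.out.println("String overrides hashCode: " + overridesHashCode(String.class));
    }
}
```

    Because array classes inherit `Object.hashCode()`, a check along these lines reports them as not hashable by content, which is the property a key type needs for the sender and the receiver to agree on the hash.
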
> Reject arrays as keys in DataStream API to avoid inconsistent hashing
> ---------------------------------------------------------------------
>
>                 Key: FLINK-5874
>                 URL: https://issues.apache.org/jira/browse/FLINK-5874
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.1.4
>            Reporter: Robert Metzger
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>
> This issue has been reported on the mailing list twice:
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Previously-working-job-fails-on-Flink-1-2-0-td11741.html
> - http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Arrays-values-in-keyBy-td7530.html
> The problem is the following: We are using just Key[].hashCode() to compute
> the hash when shuffling data. Java's default hashCode() implementation
> doesn't take the array's contents into account, but is based on the object's
> identity (memory address) instead.
> This leads to different hash codes on the sender and receiver side.
> In Flink 1.1 this means that the data is shuffled randomly and not keyed, and
> in Flink 1.2 the key-groups code detects a violation of the hashing.
> The proper fix of the problem would be to rely on Flink's {{TypeComparator}}
> class, which has a type-specific hashing function. But introducing this
> change would break compatibility with existing code.
> I'll file a JIRA for the 2.0 changes for that fix.
> For 1.2.1 and 1.3.0 we should at least reject arrays as keys.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)