[ https://issues.apache.org/jira/browse/FLINK-2678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126635#comment-15126635 ]
ASF GitHub Bot commented on FLINK-2678: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1566#discussion_r51453182 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.Arrays; + +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private transient T[] reference; + + protected final boolean ascendingComparison; + + private final TypeSerializer<T[]> serializer; + + // For use by getComparators + @SuppressWarnings("rawtypes") + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public GenericArrayComparator(boolean ascending, TypeSerializer<T[]> serializer) { + this.ascendingComparison = ascending; + this.serializer = serializer; + } + + @Override + public void setReference(T[] reference) { + this.reference = reference; + } + + @Override + public boolean equalToReference(T[] candidate) { + return compare(this.reference, candidate) == 0; + } + + @Override + public int compareToReference(TypeComparator<T[]> referencedComparator) { + int comp = compare(((GenericArrayComparator<T>) referencedComparator).reference, reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + T[] firstArray = serializer.deserialize(firstSource); + T[] secondArray = serializer.deserialize(secondSource); + + int comp = compare(firstArray, secondArray); + return ascendingComparison ? 
comp : -comp; + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + @Override + public boolean supportsNormalizedKey() { + return false; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return 0; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public int hash(T[] record) { + return Arrays.hashCode(record); + } + + private int compareValues(Object first, Object second) { --- End diff -- This only works for primitive types. What about complex types? > DataSet API does not support multi-dimensional arrays as keys > ------------------------------------------------------------- > > Key: FLINK-2678 > URL: https://issues.apache.org/jira/browse/FLINK-2678 > Project: Flink > Issue Type: Wish > Components: DataSet API > Reporter: Till Rohrmann > Assignee: Subhobrata Dey > Priority: Minor > > The DataSet API does not support grouping/sorting on fields which are > multi-dimensional arrays. It could be helpful to also support these types. -- This message was sent by Atlassian JIRA (v6.3.4#6332)