[ https://issues.apache.org/jira/browse/FLINK-2678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126656#comment-15126656 ]
ASF GitHub Bot commented on FLINK-2678: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1566#discussion_r51453634 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArrayComparator.java --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.common.typeutils.base; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; + +import java.io.IOException; +import java.lang.reflect.Array; +import java.util.Arrays; + +public class GenericArrayComparator<T> extends TypeComparator<T[]> implements java.io.Serializable { + + private static final long serialVersionUID = 1L; + + private transient T[] reference; + + protected final boolean ascendingComparison; + + private final TypeSerializer<T[]> serializer; + + // For use by getComparators + @SuppressWarnings("rawtypes") + private final TypeComparator[] comparators = new TypeComparator[] {this}; + + public GenericArrayComparator(boolean ascending, TypeSerializer<T[]> serializer) { + this.ascendingComparison = ascending; + this.serializer = serializer; + } + + @Override + public void setReference(T[] reference) { + this.reference = reference; + } + + @Override + public boolean equalToReference(T[] candidate) { + return compare(this.reference, candidate) == 0; + } + + @Override + public int compareToReference(TypeComparator<T[]> referencedComparator) { + int comp = compare(((GenericArrayComparator<T>) referencedComparator).reference, reference); + return ascendingComparison ? comp : -comp; + } + + @Override + public int compareSerialized(DataInputView firstSource, DataInputView secondSource) throws IOException { + T[] firstArray = serializer.deserialize(firstSource); + T[] secondArray = serializer.deserialize(secondSource); + + int comp = compare(firstArray, secondArray); + return ascendingComparison ? 
comp : -comp; + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + target[index] = record; + return 1; + } + + @Override + public TypeComparator[] getFlatComparators() { + return comparators; + } + + @Override + public boolean supportsNormalizedKey() { + return false; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return 0; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public void putNormalizedKey(T[] record, MemorySegment target, int offset, int numBytes) { + throw new UnsupportedOperationException(); + } + + @Override + public void writeWithKeyNormalization(T[] record, DataOutputView target) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public T[] readWithKeyDenormalization(T[] reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public boolean invertNormalizedKey() { + return !ascendingComparison; + } + + @Override + public int hash(T[] record) { + return Arrays.hashCode(record); + } + + private int compareValues(Object first, Object second) { + if (first.getClass().equals(Boolean.class) && second.getClass().equals(Boolean.class)) { + return new BooleanComparator(true).compare((Boolean) first, (Boolean) second); + } + else if (first.getClass().equals(Byte.class) && second.getClass().equals(Byte.class)) { + return new ByteComparator(true).compare((Byte) first, (Byte) second); + } + else if (first.getClass().equals(Character.class) && second.getClass().equals(Character.class)) { + return new CharComparator(true).compare((Character) first, (Character) second); + } + else if (first.getClass().equals(Double.class) && second.getClass().equals(Double.class)) { + return new DoubleComparator(true).compare((Double) first, (Double) 
second); + } + else if (first.getClass().equals(Float.class) && second.getClass().equals(Float.class)) { + return new FloatComparator(true).compare((Float) first, (Float) second); + } + else if (first.getClass().equals(Integer.class) && second.getClass().equals(Integer.class)) { + return new IntComparator(true).compare((Integer) first, (Integer) second); + } + else if (first.getClass().equals(Long.class) && second.getClass().equals(Long.class)) { + return new LongComparator(true).compare((Long) first, (Long) second); + } + else if (first.getClass().equals(Short.class) && second.getClass().equals(Short.class)) { + return new ShortComparator(true).compare((Short) first, (Short) second); + } + else if (first.getClass().equals(String.class) && second.getClass().equals(String.class)) { + return new StringComparator(true).compare((String) first, (String) second); + } + return -1; + } + + @SuppressWarnings("unchecked") + private int parseGenericArray(Object firstArray, Object secondArray) { + int compareResult = 0; + if (firstArray.getClass().isArray() && secondArray.getClass().isArray()) { + int min = Array.getLength(firstArray); + int tempResult = 0; + + if (min < Array.getLength(secondArray)) { + tempResult = -1; + } + if (min > Array.getLength(secondArray)) { + min = Array.getLength(secondArray); + tempResult = 1; + } + + for(int i=0; i < min; i++) { + int val = parseGenericArray(Array.get(firstArray, i), Array.get(secondArray, i)); + if (val != 0 && compareResult == 0) { + compareResult = val; + } + } + + if (compareResult == 0) { + compareResult = tempResult; + } + } + else { + compareResult = compareValues(firstArray, secondArray); + } + return compareResult; + } + + @Override + public int compare(T[] first, T[] second) { --- End diff -- I think the compare method should only check that the length of `first` and `second` are the same and then apply the component comparator (derived from `ObjectArrayTypeInfo.componentInfo`) on the elements. 
> DataSet API does not support multi-dimensional arrays as keys > ------------------------------------------------------------- > > Key: FLINK-2678 > URL: https://issues.apache.org/jira/browse/FLINK-2678 > Project: Flink > Issue Type: Wish > Components: DataSet API > Reporter: Till Rohrmann > Assignee: Subhobrata Dey > Priority: Minor > > The DataSet API does not support grouping/sorting on fields which are > multi-dimensional arrays. It could be helpful to also support these types. -- This message was sent by Atlassian JIRA (v6.3.4#6332)