Github user tonycox commented on a diff in the pull request: https://github.com/apache/flink/pull/2968#discussion_r91547968 --- Diff: flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/RowComparator.java --- @@ -0,0 +1,698 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.CompositeTypeComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.types.KeyFieldOutOfBoundsException; +import org.apache.flink.types.Row; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.api.java.typeutils.runtime.NullMaskUtils.readIntoNullMask; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * Comparator for {@link Row} + */ +public class RowComparator extends CompositeTypeComparator<Row> { + + private static final long serialVersionUID = 1L; + /** The number of fields of the Row */ + private final int arity; + /** key positions describe which fields are keys in what order */ + private final int[] keyPositions; + /** null-aware comparators for the key fields, in the same order as the key fields */ + private final NullAwareComparator<Object>[] comparators; + /** serializers to deserialize the first n fields for comparison */ + private final TypeSerializer<Object>[] serializers; + /** auxiliary fields for normalized key support */ + private final int[] normalizedKeyLengths; + private final int numLeadingNormalizableKeys; + private final int normalizableKeyPrefixLen; + private final boolean invertNormKey; + + // null masks for serialized comparison + private final boolean[] nullMask1; + private final boolean[] nullMask2; + + // cache for the deserialized key field objects + transient private final Object[] deserializedKeyFields1; + transient private final Object[] deserializedKeyFields2; + + /** + * General constructor for RowComparator. + * + * @param arity the number of fields of the Row + * @param keyPositions key positions describe which fields are keys in what order + * @param comparators non-null-aware comparators for the key fields, in the same order as + * the key fields + * @param serializers serializers to deserialize the first n fields for comparison + * @param orders sorting orders for the fields + */ + public RowComparator( + int arity, + int[] keyPositions, + TypeComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers, + boolean[] orders) { + this(arity, keyPositions, makeNullAware(comparators, orders), serializers); + } + + + /** + * Intermediate constructor for creating auxiliary fields. + */ + private RowComparator( + int arity, + int[] keyPositions, + NullAwareComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers) { + this( + arity, + keyPositions, + comparators, + serializers, + createAuxiliaryFields(keyPositions, comparators)); + } + + /** + * Intermediate constructor for creating auxiliary fields. + */ + private RowComparator( + int arity, + int[] keyPositions, + NullAwareComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers, + Tuple4<int[], Integer, Integer, Boolean> auxiliaryFields) { + this( + arity, + keyPositions, + comparators, + serializers, + auxiliaryFields.f0, + auxiliaryFields.f1, + auxiliaryFields.f2, + auxiliaryFields.f3); + } + + /** + * Intermediate constructor for creating auxiliary fields. + */ + private RowComparator( + int arity, + int[] keyPositions, + NullAwareComparator<Object>[] comparators, + TypeSerializer<Object>[] serializers, + int[] normalizedKeyLengths, + int numLeadingNormalizableKeys, int normalizableKeyPrefixLen, boolean invertNormKey) { + this.arity = arity; + this.keyPositions = keyPositions; + this.comparators = comparators; + this.serializers = serializers; + this.normalizedKeyLengths = normalizedKeyLengths; + this.numLeadingNormalizableKeys = numLeadingNormalizableKeys; + this.normalizableKeyPrefixLen = normalizableKeyPrefixLen; + this.invertNormKey = invertNormKey; + this.nullMask1 = new boolean[arity]; + this.nullMask2 = new boolean[arity]; + deserializedKeyFields1 = instantiateDeserializationFields(); + deserializedKeyFields2 = instantiateDeserializationFields(); + } + + // -------------------------------------------------------------------------------------------- + // Comparator Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void getFlatComparator(List<TypeComparator> flatComparators) { + for (NullAwareComparator<Object> c : comparators) { + Collections.addAll(flatComparators, c.getFlatComparators()); + } + } + + @Override + public int hash(Row record) { + int code = 0; + int i = 0; + + try { + for (; i < keyPositions.length; i++) { + code *= TupleComparatorBase.HASH_SALT[i & 0x1F]; + Object element = record.getField(keyPositions[i]); // element can be null + code += comparators[i].hash(element); + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + + return code; + } + + @Override + public void setReference(Row toCompare) { + int i = 0; + try { + for (; i < keyPositions.length; i++) { + TypeComparator<Object> comparator = comparators[i]; + Object element = toCompare.getField(keyPositions[i]); + comparator.setReference(element); // element can be null + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + } + + @Override + public boolean equalToReference(Row candidate) { + int i = 0; + try { + for (; i < keyPositions.length; i++) { + TypeComparator<Object> comparator = comparators[i]; + Object element = candidate.getField(keyPositions[i]); // element can be null + // check if reference is not equal + if (!comparator.equalToReference(element)) { + return false; + } + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + return true; + } + + @Override + public int compareToReference(TypeComparator<Row> referencedComparator) { + RowComparator other = (RowComparator) referencedComparator; + int i = 0; + try { + for (; i < keyPositions.length; i++) { + int cmp = comparators[i].compareToReference(other.comparators[i]); + if (cmp != 0) { + return cmp; + } + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + return 0; + } + + @Override + public int compare(Row first, Row second) { + int i = 0; + try { + for (; i < keyPositions.length; i++) { + int keyPos = keyPositions[i]; + TypeComparator<Object> comparator = comparators[i]; + Object firstElement = first.getField(keyPos); // element can be null + Object secondElement = second.getField(keyPos); // element can be null + + int cmp = comparator.compare(firstElement, secondElement); + if (cmp != 0) { + return cmp; + } + } + } catch (IndexOutOfBoundsException e) { + throw new KeyFieldOutOfBoundsException(keyPositions[i]); + } + return 0; + } + + @Override + public int compareSerialized( + DataInputView firstSource, + DataInputView secondSource) throws IOException { + + int len = serializers.length; + int keyLen = keyPositions.length; + + readIntoNullMask(arity, firstSource, nullMask1); + readIntoNullMask(arity, secondSource, nullMask2); + + // deserialize + for (int i = 0; i < len; i++) { + TypeSerializer<Object> serializer = serializers[i]; + + // deserialize field 1 + if (!nullMask1[i]) { + deserializedKeyFields1[i] = serializer.deserialize( + deserializedKeyFields1[i], + firstSource); + } + + // deserialize field 2 + if (!nullMask2[i]) { + deserializedKeyFields2[i] = serializer.deserialize( + deserializedKeyFields2[i], + secondSource); + } + } + + // compare + for (int i = 0; i < keyLen; i++) { + int keyPos = keyPositions[i]; + TypeComparator<Object> comparator = comparators[i]; + + boolean isNull1 = nullMask1[keyPos]; + boolean isNull2 = nullMask2[keyPos]; + + int cmp = 0; + // both values are null -> equality + if (isNull1 && isNull2) { + cmp = 0; + } + // first value is null -> inequality + else if (isNull1) { + cmp = comparator.compare(null, deserializedKeyFields2[keyPos]); + } + // second value is null -> inequality + else if (isNull2) { + cmp = comparator.compare(deserializedKeyFields1[keyPos], null); + } + // no null values + else { + cmp = comparator.compare( + deserializedKeyFields1[keyPos], + deserializedKeyFields2[keyPos]); + } + + if (cmp != 0) { + return cmp; + } + } + + return 0; + } + + @Override + public boolean supportsNormalizedKey() { + return numLeadingNormalizableKeys > 0; + } + + @Override + public boolean supportsSerializationWithKeyNormalization() { + return false; + } + + @Override + public int getNormalizeKeyLen() { + return normalizableKeyPrefixLen; + } + + @Override + public boolean isNormalizedKeyPrefixOnly(int keyBytes) { + return numLeadingNormalizableKeys < keyPositions.length || + normalizableKeyPrefixLen == Integer.MAX_VALUE || + normalizableKeyPrefixLen > keyBytes; + } + + @Override + public void putNormalizedKey( + Row record, MemorySegment target, int offset, int numBytes) { + int bytesLeft = numBytes; + int currentOffset = offset; + + for (int i=0;i<numLeadingNormalizableKeys && bytesLeft > 0;i++){ + int len = normalizedKeyLengths[i]; + len = bytesLeft >= len ? len : bytesLeft; + + TypeComparator<Object> comparator = comparators[i]; + Object element = record.getField(keyPositions[i]); // element can be null + // write key + comparator.putNormalizedKey(element, target, currentOffset, len); + + bytesLeft -= len; + currentOffset += len; + } + + } + + @Override + public void writeWithKeyNormalization( + Row record, + DataOutputView target) throws IOException { + throw new UnsupportedOperationException( + "Record serialization with leading normalized keys not supported."); + + } + + @Override + public Row readWithKeyDenormalization(Row reuse, DataInputView source) throws IOException { + throw new UnsupportedOperationException( + "Record deserialization with leading normalized keys not supported."); + } + + @Override + public boolean invertNormalizedKey() { + return invertNormKey; + } + + @Override + public TypeComparator<Row> duplicate() { + NullAwareComparator<?>[] comparatorsCopy = new NullAwareComparator<?>[comparators.length]; + for (int i = 0; i < comparators.length; i++) { + comparatorsCopy[i] = (NullAwareComparator<?>) comparators[i].duplicate(); + } + + TypeSerializer<?>[] serializersCopy = new TypeSerializer<?>[serializers.length]; + for (int i = 0; i < serializers.length; i++) { + serializersCopy[i] = serializers[i].duplicate(); + } + + return new RowComparator( + arity, + keyPositions, + (NullAwareComparator<Object>[]) comparatorsCopy, + (TypeSerializer<Object>[]) serializersCopy, + normalizedKeyLengths, + numLeadingNormalizableKeys, + normalizableKeyPrefixLen, + invertNormKey); + } + + @Override + public int extractKeys(Object record, Object[] target, int index) { + int len = comparators.length; + int localIndex = index; + for(int i=0;i<len;i++){ --- End diff -- Please, add some spaces
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---