[ https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15068549#comment-15068549 ]
ASF GitHub Bot commented on FLINK-3140: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/1465#discussion_r48285315 --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RowComparator.scala --- @@ -0,0 +1,463 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.typeinfo + +import java.util + +import org.apache.flink.api.common.typeutils.base.BasicTypeComparator +import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, TypeComparator, TypeSerializer} +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase +import org.apache.flink.api.table.Row +import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask +import org.apache.flink.api.table.typeinfo.RowComparator.createAuxiliaryFields +import org.apache.flink.api.table.typeinfo.RowComparator.NullChecker +import org.apache.flink.core.memory.{MemorySegment, DataOutputView, DataInputView} +import org.apache.flink.types.{KeyFieldOutOfBoundsException, NullKeyFieldException} + +/** + * Comparator for [[Row]]. 
+ */ +class RowComparator private ( + /** key positions describe which fields are keys in what order */ + val keyPositions: Array[Int], + /** comparators for the key fields, in the same order as the key fields */ + val comparators: Array[TypeComparator[_]], + /** serializers to deserialize the first n fields for comparison */ + val serializers: Array[TypeSerializer[_]], + /** auxiliary fields for normalized key support */ + private val auxiliaryFields: (List[Int], Int, Int)) + extends CompositeTypeComparator[Row] with Serializable { + + // null masks for serialized comparison + private val nullMask1 = new Array[Boolean](serializers.length) + private val nullMask2 = new Array[Boolean](serializers.length) + + // cache for the deserialized key field objects + @transient + private lazy val deserializedKeyFields1: Array[Any] = instantiateDeserializationFields() + + @transient + private lazy val deserializedKeyFields2: Array[Any] = instantiateDeserializationFields() + + // null checker for reference comparison + private val nullChecker = new NullChecker() + + // create auxiliary fields + private val normalizedKeyLengths: Array[Int] = auxiliaryFields._1.toArray[Int] + private val numLeadingNormalizableKeys: Int = auxiliaryFields._2 + private val normalizableKeyPrefixLen: Int = auxiliaryFields._3 + + // first comparator decides invert the key direction + private val invertNormKey: Boolean = comparators(0).invertNormalizedKey() + + def this( + keyPositions: Array[Int], + comparators: Array[TypeComparator[_]], + serializers: Array[TypeSerializer[_]]) = { + this( + keyPositions, + comparators, + serializers, + createAuxiliaryFields(comparators)) + } + + private def instantiateDeserializationFields(): Array[Any] = { + val newFields = new Array[Any](serializers.length) + var i = 0 + while (i < serializers.length) { + newFields(i) = serializers(i).createInstance() + i += 1 + } + newFields + } + + // 
-------------------------------------------------------------------------------------------- + // Comparator Methods + // -------------------------------------------------------------------------------------------- + + override def getFlatComparator(flatComparators: util.List[TypeComparator[_]]): Unit = + comparators.foreach { + case ctc: CompositeTypeComparator[_] => ctc.getFlatComparator(flatComparators) + case c@_ => flatComparators.add(c) + } + + override def compareToReference(referencedComparator: TypeComparator[Row]): Int = { + val other: RowComparator = referencedComparator.asInstanceOf[RowComparator] + var i = 0 + try { + while (i < keyPositions.length) { + val comparator = comparators(i).asInstanceOf[TypeComparator[Any]] + val otherComparator = other.comparators(i).asInstanceOf[TypeComparator[Any]] + + val nullCheck = comparator.equalToReference(nullChecker) + val nullCheckOther = otherComparator.equalToReference(nullChecker) + + var cmp = 0 + // both values are null -> equality + if (nullCheck && nullCheckOther) { + cmp = 0 + } + // one value is null -> inequality + // order is considered for basic types + else if (nullCheck || nullCheckOther) { + if (comparator.isInstanceOf[BasicTypeComparator[_]]) { + val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]] + if (nullCheck) { + return if (basicComp.isAscendingComparison) 1 else -1 + } + else if (nullCheckOther) { + return if (basicComp.isAscendingComparison) -1 else 1 + } + } + else { + return if (nullCheck) 1 else -1 + } + } + else { + cmp = comparator.compareToReference(otherComparator) + } + + if (cmp != 0) { + return cmp + } + i = i + 1 + } + 0 + } + catch { + case npex: NullPointerException => + throw new NullKeyFieldException(keyPositions(i)) + case iobex: IndexOutOfBoundsException => + throw new KeyFieldOutOfBoundsException(keyPositions(i)) + } + } + + override def compareSerialized(firstSource: DataInputView, secondSource: DataInputView): Int = { + val len = serializers.length + val 
keyLen = keyPositions.length + + readIntoNullMask(len, firstSource, nullMask1) + readIntoNullMask(len, secondSource, nullMask2) + + // deserialize + var i = 0 + while (i < len) { + val serializer = serializers(i).asInstanceOf[TypeSerializer[Any]] + + // deserialize field 1 + if (nullMask1(i)) { + deserializedKeyFields1(i) = null + } + else if (deserializedKeyFields1(i) != null) { + deserializedKeyFields1(i) = serializer.deserialize(deserializedKeyFields1(i), firstSource) + } + else { + deserializedKeyFields1(i) = serializer.deserialize(firstSource) + } + + // deserialize field 2 + if (nullMask2(i)) { + deserializedKeyFields2(i) = null + } + else if (deserializedKeyFields2(i) != null) { + deserializedKeyFields2(i) = serializer.deserialize(deserializedKeyFields2(i), secondSource) + } + else { + deserializedKeyFields2(i) = serializer.deserialize(secondSource) + } + + i += 1 + } + + // compare + i = 0 + while (i < keyLen) { + val keyPos = keyPositions(i) + val comparator = comparators(i).asInstanceOf[TypeComparator[Any]] + + val firstElement = deserializedKeyFields1(keyPos) + val secondElement = deserializedKeyFields2(keyPos) + + var cmp = 0 + // both values are null -> equality + if (firstElement == null && secondElement == null) { + cmp = 0 + } + // one value is null -> inequality + // order is considered for basic types + else if (firstElement == null || secondElement == null) { + if (comparator.isInstanceOf[BasicTypeComparator[_]]) { + val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]] + if (firstElement == null) { + return if (basicComp.isAscendingComparison) -1 else 1 + } + else if (secondElement == null) { + return if (basicComp.isAscendingComparison) 1 else -1 + } + } + else { + return if (firstElement == null) -1 else 1 + } + } + else { + cmp = comparator.compare(firstElement, secondElement) + } + + if (cmp != 0) { + return cmp + } + + i += 1 + } + 0 + } + + override def supportsNormalizedKey(): Boolean = numLeadingNormalizableKeys > 0 + + 
override def getNormalizeKeyLen: Int = normalizableKeyPrefixLen + + override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean = + numLeadingNormalizableKeys < keyPositions.length || + normalizableKeyPrefixLen == Integer.MAX_VALUE || + normalizableKeyPrefixLen > keyBytes + + override def invertNormalizedKey(): Boolean = invertNormKey + + override def supportsSerializationWithKeyNormalization(): Boolean = false + + override def writeWithKeyNormalization(record: Row, target: DataOutputView): Unit = + throw new UnsupportedOperationException("Normalized keys not suppported for Rows.") + + override def readWithKeyDenormalization(reuse: Row, source: DataInputView): Row = + throw new UnsupportedOperationException("Normalized keys not suppported for Rows.") + + override def duplicate(): TypeComparator[Row] = { + // copy comparator and serializer factories + val comparatorsCopy = comparators.map(_.duplicate()) + val serializersCopy = serializers.map(_.duplicate()) + + new RowComparator( + keyPositions, + comparatorsCopy, + serializersCopy, + auxiliaryFields) + } + + override def hash(value: Row): Int = { + var code: Int = 0 + var i = 0 + try { + while(i < keyPositions.length) { + code *= TupleComparatorBase.HASH_SALT(i & 0x1F) + val element = value.productElement(keyPositions(i)) + if (element != null) { + code += comparators(i).asInstanceOf[TypeComparator[Any]].hash(element) + } + i += 1 + } + } catch { + case npex: NullPointerException => + throw new NullKeyFieldException(keyPositions(i)) + case iobex: IndexOutOfBoundsException => + throw new KeyFieldOutOfBoundsException(keyPositions(i)) + } + code + } + + override def setReference(toCompare: Row) { + var i = 0 + try { + while(i < keyPositions.length) { + val comparator = comparators(i).asInstanceOf[TypeComparator[Any]] + val element = toCompare.productElement(keyPositions(i)) + comparator.setReference(element) // element can be null + i += 1 + } + } catch { + case npex: NullPointerException => + throw new 
NullKeyFieldException(keyPositions(i)) + case iobex: IndexOutOfBoundsException => + throw new KeyFieldOutOfBoundsException(keyPositions(i)) + } + } + + override def equalToReference(candidate: Row): Boolean = { + var i = 0 + try { + while(i < keyPositions.length) { + val comparator = comparators(i).asInstanceOf[TypeComparator[Any]] + val element = candidate.productElement(keyPositions(i)) + // check if reference is null + if (element == null && !comparator.equalToReference(nullChecker)) { + return false + } + else if (element != null && !comparator.equalToReference(element)) { + return false + } + i += 1 + } + } catch { + case npex: NullPointerException => + throw new NullKeyFieldException(keyPositions(i)) + case iobex: IndexOutOfBoundsException => + throw new KeyFieldOutOfBoundsException(keyPositions(i)) + } + true + } + + override def compare(first: Row, second: Row): Int = { + var i = 0 + try { + while(i < keyPositions.length) { + val keyPos: Int = keyPositions(i) + val comparator = comparators(i).asInstanceOf[TypeComparator[Any]] + val firstElement = first.productElement(keyPos) + val secondElement = second.productElement(keyPos) + + var cmp = 0 + // both values are null -> equality + if (firstElement == null && secondElement == null) { + cmp = 0 + } + // one value is null -> inequality + // order is considered for basic types + else if (firstElement == null || secondElement == null) { + if (comparator.isInstanceOf[BasicTypeComparator[_]]) { + val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]] + if (firstElement == null) { + return if (basicComp.isAscendingComparison) -1 else 1 + } + else if (secondElement == null) { + return if (basicComp.isAscendingComparison) 1 else -1 + } + } + else { + return if (firstElement == null) -1 else 1 + } + } + else { + cmp = comparator.compare(firstElement, secondElement) + } + + if (cmp != 0) { + return cmp + } + i += 1 + } + } catch { + case npex: NullPointerException => + throw new 
NullKeyFieldException(keyPositions(i)) + case iobex: IndexOutOfBoundsException => + throw new KeyFieldOutOfBoundsException(keyPositions(i)) + } + 0 + } + + override def putNormalizedKey(record: Row, target: MemorySegment, offset: Int, numBytes: Int) + : Unit = { + var bytesLeft = numBytes + var currentOffset = offset + + var i = 0 + var j = 0 + while (i < numLeadingNormalizableKeys && bytesLeft > 0) { + var len = normalizedKeyLengths(i) + len = if (bytesLeft >= len) len else bytesLeft + + val comparator = comparators(i).asInstanceOf[TypeComparator[Any]] + val element = record.productElement(keyPositions(i)) + if (len > 0) { + // write a null byte with padding + if (element == null) { + target.putBoolean(currentOffset, true) + // write padding + j = 0 + while (j < len - 1) { + target.put(currentOffset + j, 0.toByte) + j += 1 + } + } + // write a non-null byte with key + else { + target.putBoolean(currentOffset, false) + // write key + comparator.putNormalizedKey(element, target, currentOffset, len - 1) + } + } + + bytesLeft -= len + currentOffset += len + i += 1 + } + } + + override def extractKeys(record: Any, target: Array[AnyRef], index: Int): Int = { + val len = comparators.length + var localIndex = index + var i = 0 + while (i < len) { + val element = record.asInstanceOf[Row].productElement(keyPositions(i)) + localIndex += comparators(i).extractKeys(element, target, localIndex) + i += 1 + } + localIndex - index + } + +} + +object RowComparator { + private class NullChecker extends Comparable[AnyRef] with Serializable { + override def compareTo(obj: AnyRef): Int = + throw new UnsupportedOperationException("This should never be called.") + override def equals(obj: Any): Boolean = + obj == null + } + + /** + * @return creates auxiliary fields for normalized key support + */ + private def createAuxiliaryFields( + comparators: Array[TypeComparator[_]]) + : (List[Int], Int, Int) = { + comparators.takeWhile { + c => + // As long as the leading keys support normalized 
keys, we can build up the composite key. + // The first comparator decides whether we need to invert the key direction. + // If a successor does not agree on the inversion direction, it cannot be part of the + // normalized key. + c.supportsNormalizedKey() && + c.invertNormalizedKey() == comparators(0).invertNormalizedKey() + }.foldLeft((List[Int](), 0, 0)) { + case ((normKeyLengths, numLeadingNormKeys, normKeyPrefixLen), comparator) => + val len = comparator.getNormalizeKeyLen + + if (len < 0) { + throw new RuntimeException("Comparator " + comparator.getClass.getName + + " specifies an invalid length for the normalized key: " + len) + } + + val newNormKeyLengths = (len + 1) :: normKeyLengths // add one for a null byte --- End diff -- `len` can be `Integer.MAX_VALUE` (for example, for `StringComparator`), in which case adding 1 overflows to a negative value. > NULL value data layout in Row Serializer/Comparator > --------------------------------------------------- > > Key: FLINK-3140 > URL: https://issues.apache.org/jira/browse/FLINK-3140 > Project: Flink > Issue Type: Sub-task > Components: Table API > Reporter: Chengxiang Li > Assignee: Timo Walther > > To store/materialize NULL values in Row objects, we need a new Row > Serializer/Comparator that is aware of NULL-valued fields. -- This message was sent by Atlassian JIRA (v6.3.4#6332)