
ASF GitHub Bot commented on FLINK-3140:

Github user fhueske commented on a diff in the pull request:

    --- Diff: 
    @@ -0,0 +1,463 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.typeinfo
    +import java.util
    +import org.apache.flink.api.common.typeutils.base.BasicTypeComparator
    +import org.apache.flink.api.common.typeutils.{CompositeTypeComparator, 
TypeComparator, TypeSerializer}
    +import org.apache.flink.api.java.typeutils.runtime.TupleComparatorBase
    +import org.apache.flink.api.table.Row
    +import org.apache.flink.api.table.typeinfo.NullMaskUtils.readIntoNullMask
    +import org.apache.flink.api.table.typeinfo.RowComparator.NullChecker
    +import org.apache.flink.core.memory.{MemorySegment, DataOutputView, 
    +import org.apache.flink.types.{KeyFieldOutOfBoundsException, 
    + * Comparator for [[Row]].
    + */
    +class RowComparator private (
    +    /** key positions describe which fields are keys in what order */
    +    val keyPositions: Array[Int],
    +    /** comparators for the key fields, in the same order as the key 
fields */
    +    val comparators: Array[TypeComparator[_]],
    +    /** serializers to deserialize the first n fields for comparison */
    +    val serializers: Array[TypeSerializer[_]],
    +    /** auxiliary fields for normalized key support */
    +    private val auxiliaryFields: (List[Int], Int, Int))
    +  extends CompositeTypeComparator[Row] with Serializable {
    +  // null masks for serialized comparison
    +  private val nullMask1 = new Array[Boolean](serializers.length)
    +  private val nullMask2 = new Array[Boolean](serializers.length)
    +  // cache for the deserialized key field objects
    +  @transient
    +  private lazy val deserializedKeyFields1: Array[Any] = 
    +  @transient
    +  private lazy val deserializedKeyFields2: Array[Any] = 
    +  // null checker for reference comparison
    +  private val nullChecker = new NullChecker()
    +  // create auxiliary fields
    +  private val normalizedKeyLengths: Array[Int] = 
    +  private val numLeadingNormalizableKeys: Int = auxiliaryFields._2
    +  private val normalizableKeyPrefixLen: Int = auxiliaryFields._3
    +  // the first comparator decides whether to invert the key direction
    +  private val invertNormKey: Boolean = comparators(0).invertNormalizedKey()
    +  def this(
    +      keyPositions: Array[Int],
    +      comparators: Array[TypeComparator[_]],
    +      serializers: Array[TypeSerializer[_]]) = {
    +    this(
    +      keyPositions,
    +      comparators,
    +      serializers,
    +      createAuxiliaryFields(comparators))
    +  }
    +  private def instantiateDeserializationFields(): Array[Any] = {
    +    val newFields = new Array[Any](serializers.length)
    +    var i = 0
    +    while (i < serializers.length) {
    +      newFields(i) = serializers(i).createInstance()
    +      i += 1
    +    }
    +    newFields
    +  }
    +  // 
    +  //  Comparator Methods
    +  // 
    +  override def getFlatComparator(flatComparators: 
util.List[TypeComparator[_]]): Unit =
    +    comparators.foreach {
    +      case ctc: CompositeTypeComparator[_] => 
    +      case c@_ => flatComparators.add(c)
    +    }
    +  override def compareToReference(referencedComparator: 
TypeComparator[Row]): Int = {
    +    val other: RowComparator = 
    +    var i = 0
    +    try {
    +      while (i < keyPositions.length) {
    +        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +        val otherComparator = 
    +        val nullCheck = comparator.equalToReference(nullChecker)
    +        val nullCheckOther = otherComparator.equalToReference(nullChecker)
    +        var cmp = 0
    +        // both values are null -> equality
    +        if (nullCheck && nullCheckOther) {
    +          cmp = 0
    +        }
    +        // one value is null -> inequality
    +        // order is considered for basic types
    +        else if (nullCheck || nullCheckOther) {
    +          if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
    +            val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
    +            if (nullCheck) {
    +              return if (basicComp.isAscendingComparison) 1 else -1
    +            }
    +            else if (nullCheckOther) {
    +              return if (basicComp.isAscendingComparison) -1 else 1
    +            }
    +          }
    +          else {
    +            return if (nullCheck) 1 else -1
    +          }
    +        }
    +        else {
    +          cmp = comparator.compareToReference(otherComparator)
    +        }
    +        if (cmp != 0) {
    +          return cmp
    +        }
    +        i = i + 1
    +      }
    +      0
    +    }
    +    catch {
    +      case npex: NullPointerException =>
    +        throw new NullKeyFieldException(keyPositions(i))
    +      case iobex: IndexOutOfBoundsException =>
    +        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    +    }
    +  }
    +  override def compareSerialized(firstSource: DataInputView, secondSource: 
DataInputView): Int = {
    +    val len = serializers.length
    +    val keyLen = keyPositions.length
    +    readIntoNullMask(len, firstSource, nullMask1)
    +    readIntoNullMask(len, secondSource, nullMask2)
    +    // deserialize
    +    var i = 0
    +    while (i < len) {
    +      val serializer = serializers(i).asInstanceOf[TypeSerializer[Any]]
    +      // deserialize field 1
    +      if (nullMask1(i)) {
    +        deserializedKeyFields1(i) = null
    +      }
    +      else if (deserializedKeyFields1(i) != null) {
    +        deserializedKeyFields1(i) = 
serializer.deserialize(deserializedKeyFields1(i), firstSource)
    +      }
    +      else {
    +        deserializedKeyFields1(i) = serializer.deserialize(firstSource)
    +      }
    +      // deserialize field 2
    +      if (nullMask2(i)) {
    +        deserializedKeyFields2(i) = null
    +      }
    +      else if (deserializedKeyFields2(i) != null) {
    +        deserializedKeyFields2(i) = 
serializer.deserialize(deserializedKeyFields2(i), secondSource)
    +      }
    +      else {
    +        deserializedKeyFields2(i) = serializer.deserialize(secondSource)
    +      }
    +      i += 1
    +    }
    +    // compare
    +    i = 0
    +    while (i < keyLen) {
    +      val keyPos = keyPositions(i)
    +      val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +      val firstElement = deserializedKeyFields1(keyPos)
    +      val secondElement = deserializedKeyFields2(keyPos)
    +      var cmp = 0
    +      // both values are null -> equality
    +      if (firstElement == null && secondElement == null) {
    +        cmp = 0
    +      }
    +      // one value is null -> inequality
    +      // order is considered for basic types
    +      else if (firstElement == null || secondElement == null) {
    +        if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
    +          val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
    +          if (firstElement == null) {
    +            return if (basicComp.isAscendingComparison) -1 else 1
    +          }
    +          else if (secondElement == null) {
    +            return if (basicComp.isAscendingComparison) 1 else -1
    +          }
    +        }
    +        else {
    +          return if (firstElement == null) -1 else 1
    +        }
    +      }
    +      else {
    +        cmp = comparator.compare(firstElement, secondElement)
    +      }
    +      if (cmp != 0) {
    +        return cmp
    +      }
    +      i += 1
    +    }
    +    0
    +  }
    +  override def supportsNormalizedKey(): Boolean = 
numLeadingNormalizableKeys > 0
    +  override def getNormalizeKeyLen: Int = normalizableKeyPrefixLen
    +  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
    +    numLeadingNormalizableKeys < keyPositions.length ||
    +      normalizableKeyPrefixLen == Integer.MAX_VALUE ||
    +      normalizableKeyPrefixLen > keyBytes
    +  override def invertNormalizedKey(): Boolean = invertNormKey
    +  override def supportsSerializationWithKeyNormalization(): Boolean = false
    +  override def writeWithKeyNormalization(record: Row, target: 
DataOutputView): Unit =
    +    throw new UnsupportedOperationException("Normalized keys not 
suppported for Rows.")
    +  override def readWithKeyDenormalization(reuse: Row, source: 
DataInputView): Row =
    +    throw new UnsupportedOperationException("Normalized keys not 
suppported for Rows.")
    +  override def duplicate(): TypeComparator[Row] = {
    +    // copy comparator and serializer factories
    +    val comparatorsCopy = comparators.map(_.duplicate())
    +    val serializersCopy = serializers.map(_.duplicate())
    +    new RowComparator(
    +      keyPositions,
    +      comparatorsCopy,
    +      serializersCopy,
    +      auxiliaryFields)
    +  }
    +  override def hash(value: Row): Int = {
    +    var code: Int = 0
    +    var i = 0
    +    try {
    +      while(i < keyPositions.length) {
    +        code *= TupleComparatorBase.HASH_SALT(i & 0x1F)
    +        val element = value.productElement(keyPositions(i))
    +        if (element != null) {
    +          code += 
    +        }
    +        i += 1
    +      }
    +    } catch {
    +      case npex: NullPointerException =>
    +        throw new NullKeyFieldException(keyPositions(i))
    +      case iobex: IndexOutOfBoundsException =>
    +        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    +    }
    +    code
    +  }
    +  override def setReference(toCompare: Row) {
    +    var i = 0
    +    try {
    +      while(i < keyPositions.length) {
    +        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +        val element = toCompare.productElement(keyPositions(i))
    +        comparator.setReference(element) // element can be null
    +        i += 1
    +      }
    +    } catch {
    +      case npex: NullPointerException =>
    +        throw new NullKeyFieldException(keyPositions(i))
    +      case iobex: IndexOutOfBoundsException =>
    +        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    +    }
    +  }
    +  override def equalToReference(candidate: Row): Boolean = {
    +    var i = 0
    +    try {
    +      while(i < keyPositions.length) {
    +        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +        val element = candidate.productElement(keyPositions(i))
    +        // check if reference is null
    +        if (element == null && !comparator.equalToReference(nullChecker)) {
    +          return false
    +        }
    +        else if (element != null && !comparator.equalToReference(element)) 
    +          return false
    +        }
    +        i += 1
    +      }
    +    } catch {
    +      case npex: NullPointerException =>
    +        throw new NullKeyFieldException(keyPositions(i))
    +      case iobex: IndexOutOfBoundsException =>
    +        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    +    }
    +    true
    +  }
    +  override def compare(first: Row, second: Row): Int = {
    +    var i = 0
    +    try {
    +      while(i < keyPositions.length) {
    +        val keyPos: Int = keyPositions(i)
    +        val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +        val firstElement = first.productElement(keyPos)
    +        val secondElement = second.productElement(keyPos)
    +        var cmp = 0
    +        // both values are null -> equality
    +        if (firstElement == null && secondElement == null) {
    +          cmp = 0
    +        }
    +        // one value is null -> inequality
    +        // order is considered for basic types
    +        else if (firstElement == null || secondElement == null) {
    +          if (comparator.isInstanceOf[BasicTypeComparator[_]]) {
    +            val basicComp = comparator.asInstanceOf[BasicTypeComparator[_]]
    +            if (firstElement == null) {
    +              return if (basicComp.isAscendingComparison) -1 else 1
    +            }
    +            else if (secondElement == null) {
    +              return if (basicComp.isAscendingComparison) 1 else -1
    +            }
    +          }
    +          else {
    +            return if (firstElement == null) -1 else 1
    +          }
    +        }
    +        else {
    +          cmp = comparator.compare(firstElement, secondElement)
    +        }
    +        if (cmp != 0) {
    +          return cmp
    +        }
    +        i += 1
    +      }
    +    } catch {
    +      case npex: NullPointerException =>
    +        throw new NullKeyFieldException(keyPositions(i))
    +      case iobex: IndexOutOfBoundsException =>
    +        throw new KeyFieldOutOfBoundsException(keyPositions(i))
    +    }
    +    0
    +  }
    +  override def putNormalizedKey(record: Row, target: MemorySegment, 
offset: Int, numBytes: Int)
    +      : Unit = {
    +    var bytesLeft = numBytes
    +    var currentOffset = offset
    +    var i = 0
    +    var j = 0
    +    while (i < numLeadingNormalizableKeys && bytesLeft > 0) {
    +      var len = normalizedKeyLengths(i)
    +      len = if (bytesLeft >= len) len else bytesLeft
    +      val comparator = comparators(i).asInstanceOf[TypeComparator[Any]]
    +      val element = record.productElement(keyPositions(i))
    +      if (len > 0) {
    +        // write a null byte with padding
    +        if (element == null) {
    +          target.putBoolean(currentOffset, true)
    +          // write padding
    +          j = 0
    +          while (j < len - 1) {
    +            target.put(currentOffset + j, 0.toByte)
    +            j += 1
    +          }
    +        }
    +        // write a non-null byte with key
    +        else {
    +          target.putBoolean(currentOffset, false)
    +          // write key
    +          comparator.putNormalizedKey(element, target, currentOffset, len 
- 1)
    +        }
    +      }
    +      bytesLeft -= len
    +      currentOffset += len
    +      i += 1
    +    }
    +  }
    +  override def extractKeys(record: Any, target: Array[AnyRef], index: 
Int): Int = {
    +    val len = comparators.length
    +    var localIndex = index
    +    var i = 0
    +    while (i < len) {
    +      val element = 
    +      localIndex += comparators(i).extractKeys(element, target, localIndex)
    +      i += 1
    +    }
    +    localIndex - index
    +  }
    +object RowComparator {
    +  private class NullChecker extends Comparable[AnyRef] with Serializable {
    +    override def compareTo(obj: AnyRef): Int =
    +      throw new UnsupportedOperationException("This should never be 
    +    override def equals(obj: Any): Boolean =
    +      obj == null
    +  }
    +  /**
    +   * @return creates auxiliary fields for normalized key support
    +   */
    +  private def createAuxiliaryFields(
    +      comparators: Array[TypeComparator[_]])
    +    : (List[Int], Int, Int) = {
    +    comparators.takeWhile {
    +      c =>
    +        // As long as the leading keys support normalized keys, we can 
build up the composite key.
    +        // The first comparator decides whether we need to invert the key 
    +        // If a successor does not agree on the inversion direction, it 
cannot be part of the
    +        // normalized key.
    +        c.supportsNormalizedKey() &&
    +          c.invertNormalizedKey() == comparators(0).invertNormalizedKey()
    +    }.foldLeft((List[Int](), 0, 0)) {
    +      case ((normKeyLengths, numLeadingNormKeys, normKeyPrefixLen), 
comparator) =>
    +        val len = comparator.getNormalizeKeyLen
    +        if (len < 0) {
    +          throw new RuntimeException("Comparator " + 
comparator.getClass.getName +
    +            " specifies an invalid length for the normalized key: " + len)
    +        }
    +        val newNormKeyLengths = (len + 1) :: normKeyLengths // add one for 
a null byte
    --- End diff --
    `len` can be `Integer.MAX_VALUE` (for example, for `StringComparator`). 
Adding 1 for the null byte then overflows to a negative value.

> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>                 Key: FLINK-3140
>                 URL: https://issues.apache.org/jira/browse/FLINK-3140
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Chengxiang Li
>            Assignee: Timo Walther
> To store/materialize NULL values in Row objects, we need a new Row 
> Serializer/Comparator that is aware of NULL-valued fields.

This message was sent by Atlassian JIRA

Reply via email to