[ 
https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15093747#comment-15093747
 ] 

ASF GitHub Bot commented on FLINK-3140:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1465#discussion_r49441097
  
    --- Diff: 
flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullAwareComparator.scala
 ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.typeinfo
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.core.memory.{MemorySegment, DataOutputView, 
DataInputView}
    +
    +/**
    + * Null-aware comparator that wraps a comparator which does not support 
null references.
    + */
    +class NullAwareComparator[T](
    +    val wrappedComparator: TypeComparator[T],
    +    val order: Boolean)
    +  extends TypeComparator[T] {
    +
    +  // stores the null for reference comparison
    +  private var nullReference = false
    +
    +  override def hash(record: T): Int = {
    +    if (record != null) {
    +      wrappedComparator.hash(record)
    +    }
    +    else {
    +      0
    +    }
    +  }
    +
    + override def getNormalizeKeyLen: Int = {
    +    val len = wrappedComparator.getNormalizeKeyLen
    +    if (len == Integer.MAX_VALUE) {
    +      Integer.MAX_VALUE
    +    }
    +    else {
    +      len + 1 // add one for a null byte
    +    }
    +  }
    +
    +  override def putNormalizedKey(
    +      record: T,
    +      target: MemorySegment,
    +      offset: Int,
    +      numBytes: Int)
    +    : Unit = {
    +    if (numBytes > 0) {
    +      // write a null byte with padding
    +      if (record == null) {
    +        target.putBoolean(offset, false)
    +        // write padding
    +        var j = 0
    +        while (j < numBytes - 1) {
    +          target.put(offset + 1 + j, 0.toByte)
    +          j += 1
    +        }
    +      }
    +      // write a non-null byte with key
    +      else {
    +        target.putBoolean(offset, true)
    +        // write key
    +        wrappedComparator.putNormalizedKey(record, target, offset + 1, 
numBytes - 1)
    +      }
    +    }
    +  }
    +
    +  override def invertNormalizedKey(): Boolean = 
wrappedComparator.invertNormalizedKey()
    +
    +  override def supportsSerializationWithKeyNormalization(): Boolean = false
    +
    +  override def writeWithKeyNormalization(record: T, target: 
DataOutputView): Unit =
    +    throw new UnsupportedOperationException("Normalized keys not 
supported.")
    +
    +  override def readWithKeyDenormalization(reuse: T, source: 
DataInputView): T =
    +    throw new UnsupportedOperationException("Normalized keys not 
supported.")
    +
    +  override def isNormalizedKeyPrefixOnly(keyBytes: Int): Boolean =
    +    wrappedComparator.isNormalizedKeyPrefixOnly(keyBytes)
    +
    +  override def setReference(toCompare: T): Unit = {
    +    if (toCompare == null) {
    +      nullReference = true
    +    }
    +    else {
    +      nullReference = false
    +      wrappedComparator.setReference(toCompare)
    +    }
    +  }
    +
    +  override def compare(first: T, second: T): Int = {
    +    // both values are null -> equality
    +    if (first == null && second == null) {
    +      0
    +    }
    +    // first value is null -> inequality
    +    // but order is considered
    +    else if (first == null) {
    +      if (order) -1 else 1
    +    }
    +    // second value is null -> inequality
    +    // but order is considered
    +    else if (second == null) {
    +      if (order) 1 else -1
    +    }
    +    // no null values
    +    else {
    +      wrappedComparator.compare(first, second)
    +    }
    +  }
    +
    +  override def compareToReference(referencedComparator: 
TypeComparator[T]): Int = {
    +    val otherComparator = 
referencedComparator.asInstanceOf[NullAwareComparator[T]]
    +    val otherNullReference = otherComparator.nullReference
    +    // both values are null -> equality
    +    if (nullReference && otherNullReference) {
    +      0
    +    }
    +    // first value is null -> inequality
    +    // but order is considered
    +    else if (nullReference) {
    +      if (order) 1 else -1
    +    }
    +    // second value is null -> inequality
    +    // but order is considered
    +    else if (otherNullReference) {
    +      if (order) -1 else 1
    +    }
    +    // no null values
    +    else {
    +      
wrappedComparator.compareToReference(otherComparator.wrappedComparator)
    +    }
    +  }
    +
    +  override def supportsNormalizedKey(): Boolean = 
wrappedComparator.supportsNormalizedKey()
    +
    +  override def equalToReference(candidate: T): Boolean = {
    +    // both values are null
    +    if (candidate == null && nullReference) {
    +      true
    +    }
    +    // one value is null
    +    else if (candidate == null || nullReference) {
    +      false
    +    }
    +    // no null value
    +    else {
    +      wrappedComparator.equalToReference(candidate)
    +    }
    +  }
    +
    +  override def duplicate(): TypeComparator[T] = {
    +    new NullAwareComparator[T](wrappedComparator.duplicate(), order)
    +  }
    +
    +  override def extractKeys(record: Any, target: Array[AnyRef], index: 
Int): Int = {
    +    if (record == null) {
    +      target(index) = null
    +      1
    +    }
    +    else {
    +      wrappedComparator.extractKeys(record, target, index)
    +    }
    +  }
    +
    +  // 
----------------------------------------------------------------------------------------------
    +
    +  override def getFlatComparators: Array[TypeComparator[_]] =
    --- End diff --
    
    OK, I see. Your implementation assumes that this class is only used within 
`RowComparator`. How about guarding against other uses and making this a 
private member class of `RowComparator`?


> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
>                 Key: FLINK-3140
>                 URL: https://issues.apache.org/jira/browse/FLINK-3140
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Chengxiang Li
>            Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row 
> Serializer/Comparator that is aware of NULL-valued fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to