[ https://issues.apache.org/jira/browse/FLINK-3140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15093743#comment-15093743 ]
ASF GitHub Bot commented on FLINK-3140:
---------------------------------------

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1465#discussion_r49440748

    --- Diff: flink-staging/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/NullAwareComparator.scala ---
    @@ -0,0 +1,186 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.api.table.typeinfo
    +
    +import org.apache.flink.api.common.typeutils.TypeComparator
    +import org.apache.flink.core.memory.{MemorySegment, DataOutputView, DataInputView}
    +
    +/**
    + * Null-aware comparator that wraps a comparator which does not support null references.
    + */
    +class NullAwareComparator[T](
    +    val wrappedComparator: TypeComparator[T],
    +    val order: Boolean)
    +  extends TypeComparator[T] {
    +
    +  // stores the null for reference comparison
    +  private var nullReference = false
    +
    +  override def hash(record: T): Int = {
    +    if (record != null) {
    +      wrappedComparator.hash(record)
    +    }
    +    else {
    +      0
    +    }
    +  }
    +
    +  override def getNormalizeKeyLen: Int = {
    +    val len = wrappedComparator.getNormalizeKeyLen
    +    if (len == Integer.MAX_VALUE) {
    +      Integer.MAX_VALUE
    +    }
    +    else {
    +      len + 1 // add one for a null byte
    +    }
    +  }
    +
    +  override def putNormalizedKey(
    +      record: T,
    +      target: MemorySegment,
    +      offset: Int,
    +      numBytes: Int)
    +    : Unit = {
    +    if (numBytes > 0) {
    +      // write a null byte with padding
    +      if (record == null) {
    +        target.putBoolean(offset, false)
    +        // write padding
    +        var j = 0
    +        while (j < numBytes - 1) {
    +          target.put(offset + 1 + j, 0.toByte)
    +          j += 1
    +        }
    +      }
    +      // write a non-null byte with key
    +      else {
    +        target.putBoolean(offset, true)
    +        // write key
    +        wrappedComparator.putNormalizedKey(record, target, offset + 1, numBytes - 1)
    +      }
    +    }
    +  }
    +
    +  override def invertNormalizedKey(): Boolean = wrappedComparator.invertNormalizedKey()
    +
    +  override def supportsSerializationWithKeyNormalization(): Boolean = false
    +
    +  override def writeWithKeyNormalization(record: T, target: DataOutputView): Unit =
    +    throw new UnsupportedOperationException("Normalized keys not supported.")
    +
    +  override def readWithKeyDenormalization(reuse: T, source: DataInputView): T =
    +    throw new UnsupportedOperationException("Normalized keys not supported.")
    --- End diff --

    Does it make sense to change it in the other Comparators as well? `TupleComparatorBase` doesn't even have an exception message.
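For context on the normalized-key layout in the diff above: the comparator prepends one flag byte (0 for null, 1 for non-null) and then either pads with zeros or delegates the remaining bytes to the wrapped comparator. The following is a minimal, self-contained sketch of that idea in plain Scala — it does not use Flink's MemorySegment or any Flink comparator; putKey, compareUnsigned, and the Option[Int] key type are hypothetical stand-ins — showing that byte-wise comparison of such keys orders null records before all non-null records.

    object NullPrefixLayoutSketch {

      // Hypothetical stand-in for putNormalizedKey: writes a flag byte followed by
      // the key bytes (or zero padding for null) into a plain byte array.
      def putKey(value: Option[Int], target: Array[Byte], offset: Int, numBytes: Int): Unit = {
        if (numBytes > 0) {
          value match {
            case None =>
              target(offset) = 0 // null flag
              var j = 0
              while (j < numBytes - 1) { target(offset + 1 + j) = 0; j += 1 } // padding
            case Some(v) =>
              target(offset) = 1 // non-null flag
              // flip the sign bit so that unsigned byte-wise order matches signed numeric order,
              // then write the int big-endian
              val flipped = v ^ Int.MinValue
              var j = 0
              while (j < math.min(4, numBytes - 1)) {
                target(offset + 1 + j) = (flipped >>> (24 - 8 * j)).toByte
                j += 1
              }
          }
        }
      }

      // Unsigned lexicographic comparison, as done on normalized key bytes.
      def compareUnsigned(a: Array[Byte], b: Array[Byte]): Int = {
        var i = 0
        while (i < a.length && i < b.length) {
          val cmp = (a(i) & 0xff) - (b(i) & 0xff)
          if (cmp != 0) return cmp
          i += 1
        }
        a.length - b.length
      }

      def main(args: Array[String]): Unit = {
        val keyLen = 5 // 1 flag byte + 4 key bytes
        val nullKey  = new Array[Byte](keyLen)
        val smallKey = new Array[Byte](keyLen)
        val largeKey = new Array[Byte](keyLen)
        putKey(None, nullKey, 0, keyLen)
        putKey(Some(-7), smallKey, 0, keyLen)
        putKey(Some(42), largeKey, 0, keyLen)
        // null sorts before -7, which sorts before 42
        println(compareUnsigned(nullKey, smallKey) < 0)  // true
        println(compareUnsigned(smallKey, largeKey) < 0) // true
      }
    }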
> NULL value data layout in Row Serializer/Comparator
> ---------------------------------------------------
>
>                 Key: FLINK-3140
>                 URL: https://issues.apache.org/jira/browse/FLINK-3140
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API
>            Reporter: Chengxiang Li
>            Assignee: Timo Walther
>
> To store/materialize NULL values in Row objects, we need a new Row Serializer/Comparator that is aware of NULL value fields.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
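Regarding the data layout the issue description above asks for, here is a rough, self-contained sketch — not the actual Flink RowSerializer; Row, serialize, and deserialize are hypothetical stand-ins — in which a per-field null mask is written ahead of the non-null field values, so that null fields consume no value bytes and can be restored on deserialization.

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

    object NullMaskLayoutSketch {

      // Hypothetical row type: a fixed-arity product of possibly-null string fields.
      type Row = Array[String]

      // Write one boolean "null mask" entry per field, then the values of the non-null fields.
      def serialize(row: Row, out: DataOutputStream): Unit = {
        row.foreach(f => out.writeBoolean(f == null))
        row.foreach(f => if (f != null) out.writeUTF(f))
      }

      // Read the mask first, then restore null fields without consuming any value bytes for them.
      def deserialize(arity: Int, in: DataInputStream): Row = {
        val isNull = Array.fill(arity)(in.readBoolean())
        Array.tabulate(arity)(i => if (isNull(i)) null else in.readUTF())
      }

      def main(args: Array[String]): Unit = {
        val buffer = new ByteArrayOutputStream()
        serialize(Array("a", null, "c"), new DataOutputStream(buffer))
        val restored =
          deserialize(3, new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
        println(restored.mkString("Row(", ", ", ")")) // prints: Row(a, null, c)
      }
    }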