asfgit closed pull request #6707: [FLINK-10157] [State TTL] Allow `null` user 
values in map state with TTL
URL: https://github.com/apache/flink/pull/6707
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md
index fb787763536..decf1dbe9de 100644
--- a/docs/dev/stream/state/state.md
+++ b/docs/dev/stream/state/state.md
@@ -350,6 +350,9 @@ will lead to compatibility failure and 
`StateMigrationException`.
 
 - The TTL configuration is not part of check- or savepoints but rather a way 
of how Flink treats it in the currently running job.
 
+- The map state with TTL currently supports null user values only if the user 
value serializer can handle null values. 
+If the serializer does not support null values, it can be wrapped with 
`NullableSerializer` at the cost of an extra byte in the serialized form.
+
 #### Cleanup of Expired State
 
 Currently, expired values are only removed when they are read out explicitly, 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
index 6eb8ddcd649..9b0094d50cd 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java
@@ -37,6 +37,12 @@
  * <p>To create keyed map state (on a KeyedStream), use
  * {@link 
org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}.
  *
+ * <p>Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link 
org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
+ *
  * @param <UK> The type of the keys that can be added to the map state.
  */
 @PublicEvolving
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index f4ed929bca9..42eaea4c482 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -32,6 +32,12 @@
 
 /**
  * Configuration of state TTL logic.
+ *
+ * <p>Note: The map state with TTL currently supports {@code null} user values
+ * only if the user value serializer can handle {@code null} values.
+ * If the serializer does not support {@code null} values,
+ * it can be wrapped with {@link 
org.apache.flink.api.java.typeutils.runtime.NullableSerializer}
+ * at the cost of an extra byte in the serialized form.
  */
 public class StateTtlConfig implements Serializable {
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
new file mode 100644
index 00000000000..fe392e4b1cb
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.CompatibilityUtil;
+import 
org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Serializer wrapper to add support of {@code null} value serialization.
+ *
+ * <p>If the target serializer does not support {@code null} values of its 
type,
+ * you can use this class to wrap this serializer.
+ * This is a generic treatment of {@code null} value serialization
+ * which comes with the cost of additional byte in the final serialized value.
+ * The {@code NullableSerializer} will intercept {@code null} value 
serialization case
+ * and prepend the target serialized value with a boolean flag marking whether 
it is {@code null} or not.
+ * <pre> {@code
+ * TypeSerializer<T> originalSerializer = ...;
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer, padNullValueIfFixedLen);
+ * // or
+ * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValueIfFixedLen);
+ * }</pre>
+ *
+ * @param <T> type to serialize
+ */
+public class NullableSerializer<T> extends TypeSerializer<T> {
+       private static final long serialVersionUID = 3335569358214720033L;
+       private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+       @Nonnull
+       private final TypeSerializer<T> originalSerializer;
+       private final byte[] padding;
+
+       private NullableSerializer(@Nonnull TypeSerializer<T> 
originalSerializer, boolean padNullValueIfFixedLen) {
+               this.originalSerializer = originalSerializer;
+               this.padding = createPadding(originalSerializer.getLength(), 
padNullValueIfFixedLen);
+
+       }
+
+       private static <T> byte[] createPadding(int originalSerializerLength, 
boolean padNullValueIfFixedLen) {
+               boolean padNullValue = originalSerializerLength > 0 && 
padNullValueIfFixedLen;
+               return padNullValue ? new byte[originalSerializerLength] : 
EMPTY_BYTE_ARRAY;
+       }
+
+       /**
+        * This method tries to serialize {@code null} value with the {@code originalSerializer}
+        * and wraps it if the attempt fails with an exception (see {@link #checkIfNullSupported(TypeSerializer)}), otherwise it returns the {@code originalSerializer}.
+        *
+        * @param originalSerializer serializer to wrap and add {@code null} 
support
+        * @param padNullValueIfFixedLen pad null value to preserve the fixed 
length of original serializer
+        * @return serializer which supports {@code null} values
+        */
+       public static <T> TypeSerializer<T> wrapIfNullIsNotSupported(
+               @Nonnull TypeSerializer<T> originalSerializer, boolean 
padNullValueIfFixedLen) {
+               return checkIfNullSupported(originalSerializer) ?
+                       originalSerializer : wrap(originalSerializer, 
padNullValueIfFixedLen);
+       }
+
+       /**
+        * This method checks if {@code serializer} supports {@code null} value.
+        *
+        * <p>The serializer is asked to serialize {@code null}. If that fails with an exception,
+        * {@code null} is considered unsupported. If it succeeds, the serializer is additionally required to
+        * keep its fixed length (if it declares one), to deserialize the written bytes back to {@code null}
+        * and to copy {@code null} to {@code null}.
+        *
+        * @param serializer serializer to check
+        * @return {@code true} if {@code null} can be serialized, {@code false} if serializing it fails
+        * @throws RuntimeException if {@code null} can be serialized but one of the additional checks fails
+        */
+       public static <T> boolean checkIfNullSupported(@Nonnull TypeSerializer<T> serializer) {
+               // read the declared length once; it sizes the buffer and is used by the length check below
+               int fixedLength = serializer.getLength();
+               DataOutputSerializer dos = new DataOutputSerializer(fixedLength > 0 ? fixedLength : 1);
+               try {
+                       serializer.serialize(null, dos);
+               } catch (IOException | RuntimeException e) {
+                       // the serializer rejects null, which is the regular "not supported" outcome
+                       return false;
+               }
+               Preconditions.checkArgument(
+                       fixedLength < 0 || fixedLength == dos.getCopyOfBuffer().length,
+                       "The serialized form of the null value should have the same length " +
+                               "as any other if the length is fixed in the serializer");
+               DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer());
+               try {
+                       // name the offending serializer, otherwise a failed round trip is hard to diagnose
+                       Preconditions.checkArgument(
+                               serializer.deserialize(dis) == null,
+                               "Serializer %s has to deserialize the serialized null value back to null",
+                               serializer.getClass().getName());
+               } catch (IOException e) {
+                       throw new RuntimeException(
+                               String.format("Unexpected failure to deserialize just serialized null value with %s",
+                                       serializer.getClass().getName()), e);
+               }
+               Preconditions.checkArgument(
+                       serializer.copy(null) == null,
+                       "Serializer %s has to be able properly copy null value if it can serialize it",
+                       serializer.getClass().getName());
+               return true;
+       }
+
+       private boolean padNullValue() {
+               return padding.length > 0;
+       }
+
+       /**
+        * This method wraps the {@code originalSerializer} with the {@code 
NullableSerializer} if not already wrapped.
+        *
+        * @param originalSerializer serializer to wrap and add {@code null} 
support
+        * @param padNullValueIfFixedLen pad null value to preserve the fixed 
length of original serializer
+        * @return wrapped serializer which supports {@code null} values
+        */
+       public static <T> TypeSerializer<T> wrap(
+               @Nonnull TypeSerializer<T> originalSerializer, boolean 
padNullValueIfFixedLen) {
+               return originalSerializer instanceof NullableSerializer ?
+                       originalSerializer : new 
NullableSerializer<>(originalSerializer, padNullValueIfFixedLen);
+       }
+
+       @Override
+       public boolean isImmutableType() {
+               return originalSerializer.isImmutableType();
+       }
+
+       /**
+        * Duplicates this serializer by duplicating the wrapped {@code originalSerializer}.
+        *
+        * @return {@code this} if the original serializer returns itself from {@code duplicate()},
+        *         otherwise a new {@code NullableSerializer} around the duplicate, created with the same padding flag
+        */
+       @Override
+       public TypeSerializer<T> duplicate() {
+               TypeSerializer<T> duplicateOriginalSerializer = originalSerializer.duplicate();
+               // reuse the duplicate created above instead of duplicating the original a second time
+               return duplicateOriginalSerializer == originalSerializer ?
+                       this : new NullableSerializer<>(duplicateOriginalSerializer, padNullValue());
+       }
+
+       @Override
+       public T createInstance() {
+               return originalSerializer.createInstance();
+       }
+
+       @Override
+       public T copy(T from) {
+               return from == null ? null : originalSerializer.copy(from);
+       }
+
+       @Override
+       public T copy(T from, T reuse) {
+               return from == null ? null :
+                       (reuse == null ? originalSerializer.copy(from) : 
originalSerializer.copy(from, reuse));
+       }
+
+       @Override
+       public int getLength() {
+               return padNullValue() ? 1 + padding.length : -1;
+       }
+
+       @Override
+       public void serialize(T record, DataOutputView target) throws 
IOException {
+               if (record == null) {
+                       target.writeBoolean(true);
+                       target.write(padding);
+               } else {
+                       target.writeBoolean(false);
+                       originalSerializer.serialize(record, target);
+               }
+       }
+
+       @Override
+       public T deserialize(DataInputView source) throws IOException {
+               boolean isNull = deserializeNull(source);
+               return isNull ? null : originalSerializer.deserialize(source);
+       }
+
+       @Override
+       public T deserialize(T reuse, DataInputView source) throws IOException {
+               boolean isNull = deserializeNull(source);
+               return isNull ? null : (reuse == null ?
+                       originalSerializer.deserialize(source) : 
originalSerializer.deserialize(reuse, source));
+       }
+
+       private boolean deserializeNull(DataInputView source) throws 
IOException {
+               boolean isNull = source.readBoolean();
+               if (isNull) {
+                       source.skipBytesToRead(padding.length);
+               }
+               return isNull;
+       }
+
+       /**
+        * Copies one serialized record from {@code source} to {@code target}.
+        *
+        * <p>For a {@code null} record the null flag and the padding (if any) are consumed from {@code source}
+        * and written to {@code target}; otherwise the copy is delegated to the original serializer.
+        *
+        * @param source view to read the serialized record from
+        * @param target view to write the copied record to
+        * @throws IOException if reading from {@code source} or writing to {@code target} fails
+        */
+       @Override
+       public void copy(DataInputView source, DataOutputView target) throws IOException {
+               // deserializeNull also skips the padding that follows the null flag; reading only the flag
+               // would leave the padding bytes in source and corrupt the read of the next record
+               boolean isNull = deserializeNull(source);
+               target.writeBoolean(isNull);
+               if (isNull) {
+                       target.write(padding);
+               } else {
+                       originalSerializer.copy(source, target);
+               }
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return obj == this ||
+                       (obj != null && obj.getClass() == getClass() &&
+                               originalSerializer.equals(((NullableSerializer) 
obj).originalSerializer));
+       }
+
+       @Override
+       public boolean canEqual(Object obj) {
+               return (obj != null && obj.getClass() == getClass() &&
+                       originalSerializer.canEqual(((NullableSerializer) 
obj).originalSerializer));
+       }
+
+       @Override
+       public int hashCode() {
+               return originalSerializer.hashCode();
+       }
+
+       @Override
+       public NullableSerializerConfigSnapshot<T> snapshotConfiguration() {
+               return new 
NullableSerializerConfigSnapshot<>(originalSerializer);
+       }
+
+       @Override
+       public CompatibilityResult<T> 
ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+               if (configSnapshot instanceof NullableSerializerConfigSnapshot) 
{
+                       List<Tuple2<TypeSerializer<?>, 
TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs =
+                               ((NullableSerializerConfigSnapshot) 
configSnapshot).getNestedSerializersAndConfigs();
+
+                       CompatibilityResult<T> compatResult = 
CompatibilityUtil.resolveCompatibilityResult(
+                               previousKvSerializersAndConfigs.get(0).f0,
+                               UnloadableDummyTypeSerializer.class,
+                               previousKvSerializersAndConfigs.get(0).f1,
+                               originalSerializer);
+
+                       if (!compatResult.isRequiresMigration()) {
+                               return CompatibilityResult.compatible();
+                       } else if (compatResult.getConvertDeserializer() != 
null) {
+                               return CompatibilityResult.requiresMigration(
+                                       new NullableSerializer<>(
+                                               new 
TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), 
padNullValue()));
+                       }
+               }
+
+               return CompatibilityResult.requiresMigration();
+       }
+
+       /**
+        * Configuration snapshot for serializers of nullable types, containing 
the
+        * configuration snapshot of its original serializer.
+        */
+       @Internal
+       public static class NullableSerializerConfigSnapshot<T> extends 
CompositeTypeSerializerConfigSnapshot {
+               private static final int VERSION = 1;
+
+               /** This empty nullary constructor is required for 
deserializing the configuration. */
+               @SuppressWarnings("unused")
+               public NullableSerializerConfigSnapshot() {}
+
+               NullableSerializerConfigSnapshot(TypeSerializer<T> 
originalSerializer) {
+                       super(originalSerializer);
+               }
+
+               @Override
+               public int getVersion() {
+                       return VERSION;
+               }
+       }
+}
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
index 57015c78be0..1997866fb3c 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java
@@ -32,6 +32,7 @@
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.flink.api.java.typeutils.runtime.NullableSerializer;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.InstantiationUtil;
@@ -53,23 +54,23 @@
  * internal state would be corrupt, which becomes evident when toString is 
called.
  */
 public abstract class SerializerTestBase<T> extends TestLogger {
-       
+
        protected abstract TypeSerializer<T> createSerializer();
 
        /**
         * Gets the expected length for the serializer's {@link 
TypeSerializer#getLength()} method.
-        * 
+        *
         * <p>The expected length should be positive, for fix-length data 
types, or {@code -1} for
         * variable-length types.
         */
        protected abstract int getLength();
-       
+
        protected abstract Class<T> getTypeClass();
-       
+
        protected abstract T[] getTestData();
 
        // 
--------------------------------------------------------------------------------------------
-       
+
        @Test
        public void testInstantiate() {
                try {
@@ -80,13 +81,13 @@ public void testInstantiate() {
                        }
                        T instance = serializer.createInstance();
                        assertNotNull("The created instance must not be null.", 
instance);
-                       
+
                        Class<T> type = getTypeClass();
                        assertNotNull("The test is corrupt: type class is 
null.", type);
 
                        if (!type.isAssignableFrom(instance.getClass())) {
                                fail("Type of the instantiated object is wrong. 
" +
-                                               "Expected Type: " + type + " 
present type " + instance.getClass());
+                                       "Expected Type: " + type + " present 
type " + instance.getClass());
                        }
                }
                catch (Exception e) {
@@ -127,7 +128,7 @@ public void testSnapshotConfigurationAndReconfigure() 
throws Exception {
                strategy = getSerializer().ensureCompatibility(new 
TestIncompatibleSerializerConfigSnapshot());
                assertTrue(strategy.isRequiresMigration());
        }
-       
+
        @Test
        public void testGetLength() {
                final int len = getLength();
@@ -146,16 +147,16 @@ public void testGetLength() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testCopy() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        for (T datum : testData) {
                                T copy = serializer.copy(datum);
-                               copy.toString();
+                               checkToString(copy);
                                deepEquals("Copied element is not equal to the 
original element.", datum, copy);
                        }
                }
@@ -165,16 +166,16 @@ public void testCopy() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testCopyIntoNewElements() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        for (T datum : testData) {
                                T copy = serializer.copy(datum, 
serializer.createInstance());
-                               copy.toString();
+                               checkToString(copy);
                                deepEquals("Copied element is not equal to the 
original element.", datum, copy);
                        }
                }
@@ -184,18 +185,18 @@ public void testCopyIntoNewElements() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testCopyIntoReusedElements() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        T target = serializer.createInstance();
-                       
+
                        for (T datum : testData) {
                                T copy = serializer.copy(datum, target);
-                               copy.toString();
+                               checkToString(copy);
                                deepEquals("Copied element is not equal to the 
original element.", datum, copy);
                                target = copy;
                        }
@@ -206,25 +207,25 @@ public void testCopyIntoReusedElements() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testSerializeIndividually() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        for (T value : testData) {
                                TestOutputView out = new TestOutputView();
                                serializer.serialize(value, out);
                                TestInputView in = out.getInputView();
-                               
+
                                assertTrue("No data available during 
deserialization.", in.available() > 0);
-                               
+
                                T deserialized = 
serializer.deserialize(serializer.createInstance(), in);
-                               deserialized.toString();
+                               checkToString(deserialized);
 
                                deepEquals("Deserialized value if wrong.", 
value, deserialized);
-                               
+
                                assertTrue("Trailing data available after 
deserialization.", in.available() == 0);
                        }
                }
@@ -241,23 +242,23 @@ public void testSerializeIndividuallyReusingValues() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        T reuseValue = serializer.createInstance();
-                       
+
                        for (T value : testData) {
                                TestOutputView out = new TestOutputView();
                                serializer.serialize(value, out);
                                TestInputView in = out.getInputView();
-                               
+
                                assertTrue("No data available during 
deserialization.", in.available() > 0);
-                               
+
                                T deserialized = 
serializer.deserialize(reuseValue, in);
-                               deserialized.toString();
+                               checkToString(deserialized);
 
                                deepEquals("Deserialized value if wrong.", 
value, deserialized);
-                               
+
                                assertTrue("Trailing data available after 
deserialization.", in.available() == 0);
-                               
+
                                reuseValue = deserialized;
                        }
                }
@@ -267,29 +268,29 @@ public void testSerializeIndividuallyReusingValues() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testSerializeAsSequenceNoReuse() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        TestOutputView out = new TestOutputView();
                        for (T value : testData) {
                                serializer.serialize(value, out);
                        }
-                       
+
                        TestInputView in = out.getInputView();
-                       
+
                        int num = 0;
                        while (in.available() > 0) {
                                T deserialized = serializer.deserialize(in);
-                               deserialized.toString();
+                               checkToString(deserialized);
 
                                deepEquals("Deserialized value if wrong.", 
testData[num], deserialized);
                                num++;
                        }
-                       
+
                        assertEquals("Wrong number of elements deserialized.", 
testData.length, num);
                }
                catch (Exception e) {
@@ -298,31 +299,31 @@ public void testSerializeAsSequenceNoReuse() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testSerializeAsSequenceReusingValues() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        TestOutputView out = new TestOutputView();
                        for (T value : testData) {
                                serializer.serialize(value, out);
                        }
-                       
+
                        TestInputView in = out.getInputView();
                        T reuseValue = serializer.createInstance();
-                       
+
                        int num = 0;
                        while (in.available() > 0) {
                                T deserialized = 
serializer.deserialize(reuseValue, in);
-                               deserialized.toString();
+                               checkToString(deserialized);
 
                                deepEquals("Deserialized value if wrong.", 
testData[num], deserialized);
                                reuseValue = deserialized;
                                num++;
                        }
-                       
+
                        assertEquals("Wrong number of elements deserialized.", 
testData.length, num);
                }
                catch (Exception e) {
@@ -331,30 +332,30 @@ public void testSerializeAsSequenceReusingValues() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testSerializedCopyIndividually() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        for (T value : testData) {
                                TestOutputView out = new TestOutputView();
                                serializer.serialize(value, out);
-                               
+
                                TestInputView source = out.getInputView();
                                TestOutputView target = new TestOutputView();
                                serializer.copy(source, target);
-                               
+
                                TestInputView toVerify = target.getInputView();
-                               
+
                                assertTrue("No data available copying.", 
toVerify.available() > 0);
-                               
+
                                T deserialized = 
serializer.deserialize(serializer.createInstance(), toVerify);
-                               deserialized.toString();
+                               checkToString(deserialized);
 
                                deepEquals("Deserialized value if wrong.", 
value, deserialized);
-                               
+
                                assertTrue("Trailing data available after 
deserialization.", toVerify.available() == 0);
                        }
                }
@@ -364,36 +365,36 @@ public void testSerializedCopyIndividually() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
-       
+
+
        @Test
        public void testSerializedCopyAsSequence() {
                try {
                        TypeSerializer<T> serializer = getSerializer();
                        T[] testData = getData();
-                       
+
                        TestOutputView out = new TestOutputView();
                        for (T value : testData) {
                                serializer.serialize(value, out);
                        }
-                       
+
                        TestInputView source = out.getInputView();
                        TestOutputView target = new TestOutputView();
                        for (int i = 0; i < testData.length; i++) {
                                serializer.copy(source, target);
                        }
-                       
+
                        TestInputView toVerify = target.getInputView();
                        int num = 0;
-                       
+
                        while (toVerify.available() > 0) {
                                T deserialized = 
serializer.deserialize(serializer.createInstance(), toVerify);
-                               deserialized.toString();
+                               checkToString(deserialized);
 
                                deepEquals("Deserialized value if wrong.", 
testData[num], deserialized);
                                num++;
                        }
-                       
+
                        assertEquals("Wrong number of elements copied.", 
testData.length, num);
                }
                catch (Exception e) {
@@ -402,7 +403,7 @@ public void testSerializedCopyAsSequence() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
        @Test
        public void testSerializabilityAndEquals() {
                try {
@@ -414,7 +415,7 @@ public void testSerializabilityAndEquals() {
                                fail("The serializer is not serializable: " + 
e);
                                return;
                        }
-                       
+
                        assertEquals("The copy of the serializer is not equal 
to the original one.", ser1, ser2);
                }
                catch (Exception e) {
@@ -423,10 +424,26 @@ public void testSerializabilityAndEquals() {
                        fail("Exception in test: " + e.getMessage());
                }
        }
-       
+
+       @Test
+       public void testNullability() {
+               TypeSerializer<T> serializer = getSerializer();
+               try {
+                       NullableSerializer.checkIfNullSupported(serializer);
+               } catch (Throwable t) {
+                       System.err.println(t.getMessage());
+                       t.printStackTrace();
+                       fail("Unexpected failure of null value handling: " + 
t.getMessage());
+               }
+       }
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        protected void deepEquals(String message, T should, T is) {
+               Assert.assertTrue((should == null && is == null) || (should != 
null && is != null));
+               if (should == null) {
+                       return;
+               }
                if (should.getClass().isArray()) {
                        if (should instanceof boolean[]) {
                                Assert.assertTrue(message, 
Arrays.equals((boolean[]) should, (boolean[]) is));
@@ -463,9 +480,9 @@ else if (should instanceof Throwable) {
                        assertEquals(message,  should, is);
                }
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        protected TypeSerializer<T> getSerializer() {
                TypeSerializer<T> serializer = createSerializer();
                if (serializer == null) {
@@ -473,7 +490,7 @@ else if (should instanceof Throwable) {
                }
                return serializer;
        }
-       
+
        private T[] getData() {
                T[] data = getTestData();
                if (data == null) {
@@ -481,15 +498,15 @@ else if (should instanceof Throwable) {
                }
                return data;
        }
-       
+
        // 
--------------------------------------------------------------------------------------------
-       
+
        private static final class TestOutputView extends DataOutputStream 
implements DataOutputView {
-               
+
                public TestOutputView() {
                        super(new ByteArrayOutputStream(4096));
                }
-               
+
                public TestInputView getInputView() {
                        ByteArrayOutputStream baos = (ByteArrayOutputStream) 
out;
                        return new TestInputView(baos.toByteArray());
@@ -509,8 +526,8 @@ public void write(DataInputView source, int numBytes) 
throws IOException {
                        write(buffer);
                }
        }
-       
-       
+
+
        private static final class TestInputView extends DataInputStream 
implements DataInputView {
 
                public TestInputView(byte[] data) {
@@ -542,4 +559,10 @@ public int hashCode() {
                        return getClass().hashCode();
                }
        }
+
+       private static <T> void checkToString(T value) {
+               if (value != null) {
+                       value.toString();
+               }
+       }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java
 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java
new file mode 100644
index 00000000000..3bd176a8fbc
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.java.typeutils.runtime;
+
+import org.apache.flink.api.common.typeutils.SerializerTestBase;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Unit tests for {@link NullableSerializer}. */
+@RunWith(Parameterized.class)
+public class NullableSerializerTest extends SerializerTestBase<Integer> {
+       private static final TypeSerializer<Integer> originalSerializer = 
IntSerializer.INSTANCE;
+
+       @Parameterized.Parameters(name = "{0}")
+       public static List<Boolean> whetherToPadNullValue() {
+               return Arrays.asList(true, false);
+       }
+
+       @Parameterized.Parameter
+       public boolean padNullValue;
+
+       private TypeSerializer<Integer> nullableSerializer;
+
+       @Before
+       public void init() {
+               nullableSerializer = 
NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue);
+       }
+
+       @Override
+       protected TypeSerializer<Integer> createSerializer() {
+               return 
NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue);
+       }
+
+       @Override
+       protected int getLength() {
+               return padNullValue ? 5 : -1;
+       }
+
+       @Override
+       protected Class<Integer> getTypeClass() {
+               return Integer.class;
+       }
+
+       @Override
+       protected Integer[] getTestData() {
+               return new Integer[] { 5, -1, 0, null };
+       }
+
+       @Test
+       public void testWrappingNotNeeded() {
+               
assertEquals(NullableSerializer.wrapIfNullIsNotSupported(StringSerializer.INSTANCE,
 padNullValue), StringSerializer.INSTANCE);
+       }
+
+       @Test
+       public void testWrappingNeeded() {
+               assertTrue(nullableSerializer instanceof NullableSerializer);
+               
assertEquals(NullableSerializer.wrapIfNullIsNotSupported(nullableSerializer, 
padNullValue), nullableSerializer);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 268f84aaa9b..3b0a99f782c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -78,6 +78,14 @@
        }
 
        <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> 
V getWithTtlCheckAndUpdate(
+               SupplierWithException<TtlValue<V>, SE> getter,
+               ThrowingConsumer<TtlValue<V>, CE> updater,
+               ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
+               TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, 
updater, stateClear);
+               return ttlValue == null ? null : ttlValue.getUserValue();
+       }
+
+       <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> 
TtlValue<V> getWrappedWithTtlCheckAndUpdate(
                SupplierWithException<TtlValue<V>, SE> getter,
                ThrowingConsumer<TtlValue<V>, CE> updater,
                ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
@@ -92,6 +100,6 @@
                } else if (updateTsOnRead) {
                        updater.accept(rewrapWithNewTs(ttlValue));
                }
-               return ttlValue.getUserValue();
+               return ttlValue;
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 160dbeb71e9..f84a2ee4820 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -54,7 +54,13 @@
 
        @Override
        public UV get(UK key) throws Exception {
-               return getWithTtlCheckAndUpdate(() -> original.get(key), v -> 
original.put(key, v), () -> original.remove(key));
+               TtlValue<UV> ttlValue = getWrapped(key);
+               return ttlValue == null ? null : ttlValue.getUserValue();
+       }
+
+       private TtlValue<UV> getWrapped(UK key) throws Exception {
+               return getWrappedWithTtlCheckAndUpdate(
+                       () -> original.get(key), v -> original.put(key, v), () 
-> original.remove(key));
        }
 
        @Override
@@ -83,7 +89,8 @@ public void remove(UK key) throws Exception {
 
        @Override
        public boolean contains(UK key) throws Exception {
-               return get(key) != null;
+               TtlValue<UV> ttlValue = getWrapped(key);
+               return ttlValue != null;
        }
 
        @Override
@@ -161,16 +168,16 @@ public void remove() {
                }
 
                private Map.Entry<UK, UV> 
getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
-                       UV unexpiredValue;
+                       TtlValue<UV> unexpiredValue;
                        try {
-                               unexpiredValue = getWithTtlCheckAndUpdate(
+                               unexpiredValue = 
getWrappedWithTtlCheckAndUpdate(
                                        e::getValue,
                                        v -> original.put(e.getKey(), v),
                                        originalIterator::remove);
                        } catch (Exception ex) {
                                throw new FlinkRuntimeException(ex);
                        }
-                       return unexpiredValue == null ? null : new 
AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue);
+                       return unexpiredValue == null ? null : new 
AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue.getUserValue());
                }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
index 773fe7c4474..be231c63428 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
@@ -44,6 +44,6 @@ private static long getExpirationTimestamp(long ts, long ttl) 
{
        }
 
        static <V> TtlValue<V> wrapWithTs(V value, long ts) {
-               return value == null ? null : new TtlValue<>(value, ts);
+               return new TtlValue<>(value, ts);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
index a8bcadf9d19..48435d567e8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state.ttl;
 
-import org.apache.flink.util.Preconditions;
+import javax.annotation.Nullable;
 
 import java.io.Serializable;
 
@@ -30,15 +30,16 @@
 class TtlValue<T> implements Serializable {
        private static final long serialVersionUID = 5221129704201125020L;
 
+       @Nullable
        private final T userValue;
        private final long lastAccessTimestamp;
 
-       TtlValue(T userValue, long lastAccessTimestamp) {
-               Preconditions.checkNotNull(userValue);
+       TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
                this.userValue = userValue;
                this.lastAccessTimestamp = lastAccessTimestamp;
        }
 
+       @Nullable
        T getUserValue() {
                return userValue;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
index 7fd61aaeb29..7294b4aa747 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
@@ -22,6 +22,7 @@
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -35,9 +36,9 @@
        void initTestValues() {
                emptyValue = Collections.emptySet();
 
-               updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), 
Tuple2.of(10, "10"));
-               updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
-               updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+               updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), 
Tuple2.of(23, null), Tuple2.of(10, "10"));
+               updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(24, 
null), Tuple2.of(7, "7"));
+               updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(25, null), 
Tuple2.of(4, "4"));
 
                getUpdateEmpty = updateEmpty.entrySet();
                getUnexpired = updateUnexpired.entrySet();
@@ -46,7 +47,9 @@ void initTestValues() {
 
        @SafeVarargs
        private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
-               return Arrays.stream(entries).collect(Collectors.toMap(t -> 
t.f0, t -> t.f1));
+               Map<UK, UV> map = new HashMap<>();
+               Arrays.stream(entries).forEach(t -> map.put(t.f0, t.f1));
+               return map;
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
index fb025afd9ad..a77c8ed958c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
@@ -43,7 +43,10 @@ void update(String value) throws Exception {
 
        @Override
        String get() throws Exception {
-               return ttlState.get(TEST_KEY);
+               String value = ttlState.get(TEST_KEY);
+               assert (getOriginal() == null && !ttlState.contains(TEST_KEY)) 
||
+                       (getOriginal() != null && ttlState.contains(TEST_KEY));
+               return value;
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
new file mode 100644
index 00000000000..6f8c70bcb63
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+class TtlMapStatePerNullElementTestContext extends 
TtlMapStatePerElementTestContext {
+       @Override
+       void initTestValues() {
+               updateEmpty = null;
+               updateUnexpired = null;
+               updateExpired = null;
+
+               getUpdateEmpty = null;
+               getUnexpired = null;
+               getUpdateExpired = null;
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index 6b3a15fdd30..f9f108a3a12 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -69,6 +69,7 @@ public void setup() {
                        new TtlListStateTestContext(),
                        new TtlMapStateAllEntriesTestContext(),
                        new TtlMapStatePerElementTestContext(),
+                       new TtlMapStatePerNullElementTestContext(),
                        new TtlAggregatingStateTestContext(),
                        new TtlReducingStateTestContext(),
                        new TtlFoldingStateTestContext());


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to