asfgit closed pull request #6707: [FLINK-10157] [State TTL] Allow `null` user values in map state with TTL
URL: https://github.com/apache/flink/pull/6707
This pull request was merged from a forked repository. Because GitHub hides the original diff of a merged pull request from a fork, the diff is reproduced below for the sake of provenance: diff --git a/docs/dev/stream/state/state.md b/docs/dev/stream/state/state.md index fb787763536..decf1dbe9de 100644 --- a/docs/dev/stream/state/state.md +++ b/docs/dev/stream/state/state.md @@ -350,6 +350,9 @@ will lead to compatibility failure and `StateMigrationException`. - The TTL configuration is not part of check- or savepoints but rather a way of how Flink treats it in the currently running job. +- The map state with TTL currently supports null user values only if the user value serializer can handle null values. +If the serializer does not support null values, it can be wrapped with `NullableSerializer` at the cost of an extra byte in the serialized form. + #### Cleanup of Expired State Currently, expired values are only removed when they are read out explicitly, diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java index 6eb8ddcd649..9b0094d50cd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MapStateDescriptor.java @@ -37,6 +37,12 @@ * <p>To create keyed map state (on a KeyedStream), use * {@link org.apache.flink.api.common.functions.RuntimeContext#getMapState(MapStateDescriptor)}. * + * <p>Note: The map state with TTL currently supports {@code null} user values + * only if the user value serializer can handle {@code null} values. + * If the serializer does not support {@code null} values, + * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer} + * at the cost of an extra byte in the serialized form. 
+ * * @param <UK> The type of the keys that can be added to the map state. */ @PublicEvolving diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java index f4ed929bca9..42eaea4c482 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java @@ -32,6 +32,12 @@ /** * Configuration of state TTL logic. + * + * <p>Note: The map state with TTL currently supports {@code null} user values + * only if the user value serializer can handle {@code null} values. + * If the serializer does not support {@code null} values, + * it can be wrapped with {@link org.apache.flink.api.java.typeutils.runtime.NullableSerializer} + * at the cost of an extra byte in the serialized form. */ public class StateTtlConfig implements Serializable { diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java new file mode 100644 index 00000000000..fe392e4b1cb --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializer.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.CompatibilityUtil; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.io.IOException; +import java.util.List; + +/** + * Serializer wrapper to add support of {@code null} value serialization. + * + * <p>If the target serializer does not support {@code null} values of its type, + * you can use this class to wrap this serializer. + * This is a generic treatment of {@code null} value serialization + * which comes with the cost of additional byte in the final serialized value. 
+ * The {@code NullableSerializer} will intercept {@code null} value serialization case + * and prepend the target serialized value with a boolean flag marking whether it is {@code null} or not. + * <pre> {@code + * TypeSerializer<T> originalSerializer = ...; + * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrap(originalSerializer); + * // or + * TypeSerializer<T> serializerWithNullValueSupport = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer); + * }}</pre> + * + * @param <T> type to serialize + */ +public class NullableSerializer<T> extends TypeSerializer<T> { + private static final long serialVersionUID = 3335569358214720033L; + private static final byte[] EMPTY_BYTE_ARRAY = new byte[0]; + + @Nonnull + private final TypeSerializer<T> originalSerializer; + private final byte[] padding; + + private NullableSerializer(@Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) { + this.originalSerializer = originalSerializer; + this.padding = createPadding(originalSerializer.getLength(), padNullValueIfFixedLen); + + } + + private static <T> byte[] createPadding(int originalSerializerLength, boolean padNullValueIfFixedLen) { + boolean padNullValue = originalSerializerLength > 0 && padNullValueIfFixedLen; + return padNullValue ? new byte[originalSerializerLength] : EMPTY_BYTE_ARRAY; + } + + /** + * This method tries to serialize {@code null} value with the {@code originalSerializer} + * and wraps it in case of {@link NullPointerException}, otherwise it returns the {@code originalSerializer}. 
+ * + * @param originalSerializer serializer to wrap and add {@code null} support + * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer + * @return serializer which supports {@code null} values + */ + public static <T> TypeSerializer<T> wrapIfNullIsNotSupported( + @Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) { + return checkIfNullSupported(originalSerializer) ? + originalSerializer : wrap(originalSerializer, padNullValueIfFixedLen); + } + + /** + * This method checks if {@code serializer} supports {@code null} value. + * + * @param serializer serializer to check + */ + public static <T> boolean checkIfNullSupported(@Nonnull TypeSerializer<T> serializer) { + int length = serializer.getLength() > 0 ? serializer.getLength() : 1; + DataOutputSerializer dos = new DataOutputSerializer(length); + try { + serializer.serialize(null, dos); + } catch (IOException | RuntimeException e) { + return false; + } + Preconditions.checkArgument( + serializer.getLength() < 0 || serializer.getLength() == dos.getCopyOfBuffer().length, + "The serialized form of the null value should have the same length " + + "as any other if the length is fixed in the serializer"); + DataInputDeserializer dis = new DataInputDeserializer(dos.getSharedBuffer()); + try { + Preconditions.checkArgument(serializer.deserialize(dis) == null); + } catch (IOException e) { + throw new RuntimeException( + String.format("Unexpected failure to deserialize just serialized null value with %s", + serializer.getClass().getName()), e); + } + Preconditions.checkArgument( + serializer.copy(null) == null, + "Serializer %s has to be able properly copy null value if it can serialize it", + serializer.getClass().getName()); + return true; + } + + private boolean padNullValue() { + return padding.length > 0; + } + + /** + * This method wraps the {@code originalSerializer} with the {@code NullableSerializer} if not already wrapped. 
+ * + * @param originalSerializer serializer to wrap and add {@code null} support + * @param padNullValueIfFixedLen pad null value to preserve the fixed length of original serializer + * @return wrapped serializer which supports {@code null} values + */ + public static <T> TypeSerializer<T> wrap( + @Nonnull TypeSerializer<T> originalSerializer, boolean padNullValueIfFixedLen) { + return originalSerializer instanceof NullableSerializer ? + originalSerializer : new NullableSerializer<>(originalSerializer, padNullValueIfFixedLen); + } + + @Override + public boolean isImmutableType() { + return originalSerializer.isImmutableType(); + } + + @Override + public TypeSerializer<T> duplicate() { + TypeSerializer<T> duplicateOriginalSerializer = originalSerializer.duplicate(); + return duplicateOriginalSerializer == originalSerializer ? + this : new NullableSerializer<>(originalSerializer.duplicate(), padNullValue()); + } + + @Override + public T createInstance() { + return originalSerializer.createInstance(); + } + + @Override + public T copy(T from) { + return from == null ? null : originalSerializer.copy(from); + } + + @Override + public T copy(T from, T reuse) { + return from == null ? null : + (reuse == null ? originalSerializer.copy(from) : originalSerializer.copy(from, reuse)); + } + + @Override + public int getLength() { + return padNullValue() ? 1 + padding.length : -1; + } + + @Override + public void serialize(T record, DataOutputView target) throws IOException { + if (record == null) { + target.writeBoolean(true); + target.write(padding); + } else { + target.writeBoolean(false); + originalSerializer.serialize(record, target); + } + } + + @Override + public T deserialize(DataInputView source) throws IOException { + boolean isNull = deserializeNull(source); + return isNull ? 
null : originalSerializer.deserialize(source); + } + + @Override + public T deserialize(T reuse, DataInputView source) throws IOException { + boolean isNull = deserializeNull(source); + return isNull ? null : (reuse == null ? + originalSerializer.deserialize(source) : originalSerializer.deserialize(reuse, source)); + } + + private boolean deserializeNull(DataInputView source) throws IOException { + boolean isNull = source.readBoolean(); + if (isNull) { + source.skipBytesToRead(padding.length); + } + return isNull; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + boolean isNull = source.readBoolean(); + target.writeBoolean(isNull); + if (isNull) { + target.write(padding); + } else { + originalSerializer.copy(source, target); + } + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + originalSerializer.equals(((NullableSerializer) obj).originalSerializer)); + } + + @Override + public boolean canEqual(Object obj) { + return (obj != null && obj.getClass() == getClass() && + originalSerializer.canEqual(((NullableSerializer) obj).originalSerializer)); + } + + @Override + public int hashCode() { + return originalSerializer.hashCode(); + } + + @Override + public NullableSerializerConfigSnapshot<T> snapshotConfiguration() { + return new NullableSerializerConfigSnapshot<>(originalSerializer); + } + + @Override + public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + if (configSnapshot instanceof NullableSerializerConfigSnapshot) { + List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousKvSerializersAndConfigs = + ((NullableSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs(); + + CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( + previousKvSerializersAndConfigs.get(0).f0, + UnloadableDummyTypeSerializer.class, + 
previousKvSerializersAndConfigs.get(0).f1, + originalSerializer); + + if (!compatResult.isRequiresMigration()) { + return CompatibilityResult.compatible(); + } else if (compatResult.getConvertDeserializer() != null) { + return CompatibilityResult.requiresMigration( + new NullableSerializer<>( + new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()), padNullValue())); + } + } + + return CompatibilityResult.requiresMigration(); + } + + /** + * Configuration snapshot for serializers of nullable types, containing the + * configuration snapshot of its original serializer. + */ + @Internal + public static class NullableSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { + private static final int VERSION = 1; + + /** This empty nullary constructor is required for deserializing the configuration. */ + @SuppressWarnings("unused") + public NullableSerializerConfigSnapshot() {} + + NullableSerializerConfigSnapshot(TypeSerializer<T> originalSerializer) { + super(originalSerializer); + } + + @Override + public int getVersion() { + return VERSION; + } + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index 57015c78be0..1997866fb3c 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.util.Arrays; +import org.apache.flink.api.java.typeutils.runtime.NullableSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; @@ -53,23 +54,23 @@ * internal state would be corrupt, which becomes evident when toString is called. 
*/ public abstract class SerializerTestBase<T> extends TestLogger { - + protected abstract TypeSerializer<T> createSerializer(); /** * Gets the expected length for the serializer's {@link TypeSerializer#getLength()} method. - * + * * <p>The expected length should be positive, for fix-length data types, or {@code -1} for * variable-length types. */ protected abstract int getLength(); - + protected abstract Class<T> getTypeClass(); - + protected abstract T[] getTestData(); // -------------------------------------------------------------------------------------------- - + @Test public void testInstantiate() { try { @@ -80,13 +81,13 @@ public void testInstantiate() { } T instance = serializer.createInstance(); assertNotNull("The created instance must not be null.", instance); - + Class<T> type = getTypeClass(); assertNotNull("The test is corrupt: type class is null.", type); if (!type.isAssignableFrom(instance.getClass())) { fail("Type of the instantiated object is wrong. " + - "Expected Type: " + type + " present type " + instance.getClass()); + "Expected Type: " + type + " present type " + instance.getClass()); } } catch (Exception e) { @@ -127,7 +128,7 @@ public void testSnapshotConfigurationAndReconfigure() throws Exception { strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot()); assertTrue(strategy.isRequiresMigration()); } - + @Test public void testGetLength() { final int len = getLength(); @@ -146,16 +147,16 @@ public void testGetLength() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void testCopy() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + for (T datum : testData) { T copy = serializer.copy(datum); - copy.toString(); + checkToString(copy); deepEquals("Copied element is not equal to the original element.", datum, copy); } } @@ -165,16 +166,16 @@ public void testCopy() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void 
testCopyIntoNewElements() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + for (T datum : testData) { T copy = serializer.copy(datum, serializer.createInstance()); - copy.toString(); + checkToString(copy); deepEquals("Copied element is not equal to the original element.", datum, copy); } } @@ -184,18 +185,18 @@ public void testCopyIntoNewElements() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void testCopyIntoReusedElements() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + T target = serializer.createInstance(); - + for (T datum : testData) { T copy = serializer.copy(datum, target); - copy.toString(); + checkToString(copy); deepEquals("Copied element is not equal to the original element.", datum, copy); target = copy; } @@ -206,25 +207,25 @@ public void testCopyIntoReusedElements() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void testSerializeIndividually() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + for (T value : testData) { TestOutputView out = new TestOutputView(); serializer.serialize(value, out); TestInputView in = out.getInputView(); - + assertTrue("No data available during deserialization.", in.available() > 0); - + T deserialized = serializer.deserialize(serializer.createInstance(), in); - deserialized.toString(); + checkToString(deserialized); deepEquals("Deserialized value if wrong.", value, deserialized); - + assertTrue("Trailing data available after deserialization.", in.available() == 0); } } @@ -241,23 +242,23 @@ public void testSerializeIndividuallyReusingValues() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + T reuseValue = serializer.createInstance(); - + for (T value : testData) { TestOutputView out = new TestOutputView(); serializer.serialize(value, out); TestInputView in = out.getInputView(); - + assertTrue("No data available during 
deserialization.", in.available() > 0); - + T deserialized = serializer.deserialize(reuseValue, in); - deserialized.toString(); + checkToString(deserialized); deepEquals("Deserialized value if wrong.", value, deserialized); - + assertTrue("Trailing data available after deserialization.", in.available() == 0); - + reuseValue = deserialized; } } @@ -267,29 +268,29 @@ public void testSerializeIndividuallyReusingValues() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void testSerializeAsSequenceNoReuse() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + TestOutputView out = new TestOutputView(); for (T value : testData) { serializer.serialize(value, out); } - + TestInputView in = out.getInputView(); - + int num = 0; while (in.available() > 0) { T deserialized = serializer.deserialize(in); - deserialized.toString(); + checkToString(deserialized); deepEquals("Deserialized value if wrong.", testData[num], deserialized); num++; } - + assertEquals("Wrong number of elements deserialized.", testData.length, num); } catch (Exception e) { @@ -298,31 +299,31 @@ public void testSerializeAsSequenceNoReuse() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void testSerializeAsSequenceReusingValues() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + TestOutputView out = new TestOutputView(); for (T value : testData) { serializer.serialize(value, out); } - + TestInputView in = out.getInputView(); T reuseValue = serializer.createInstance(); - + int num = 0; while (in.available() > 0) { T deserialized = serializer.deserialize(reuseValue, in); - deserialized.toString(); + checkToString(deserialized); deepEquals("Deserialized value if wrong.", testData[num], deserialized); reuseValue = deserialized; num++; } - + assertEquals("Wrong number of elements deserialized.", testData.length, num); } catch (Exception e) { @@ -331,30 +332,30 @@ public void 
testSerializeAsSequenceReusingValues() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void testSerializedCopyIndividually() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + for (T value : testData) { TestOutputView out = new TestOutputView(); serializer.serialize(value, out); - + TestInputView source = out.getInputView(); TestOutputView target = new TestOutputView(); serializer.copy(source, target); - + TestInputView toVerify = target.getInputView(); - + assertTrue("No data available copying.", toVerify.available() > 0); - + T deserialized = serializer.deserialize(serializer.createInstance(), toVerify); - deserialized.toString(); + checkToString(deserialized); deepEquals("Deserialized value if wrong.", value, deserialized); - + assertTrue("Trailing data available after deserialization.", toVerify.available() == 0); } } @@ -364,36 +365,36 @@ public void testSerializedCopyIndividually() { fail("Exception in test: " + e.getMessage()); } } - - + + @Test public void testSerializedCopyAsSequence() { try { TypeSerializer<T> serializer = getSerializer(); T[] testData = getData(); - + TestOutputView out = new TestOutputView(); for (T value : testData) { serializer.serialize(value, out); } - + TestInputView source = out.getInputView(); TestOutputView target = new TestOutputView(); for (int i = 0; i < testData.length; i++) { serializer.copy(source, target); } - + TestInputView toVerify = target.getInputView(); int num = 0; - + while (toVerify.available() > 0) { T deserialized = serializer.deserialize(serializer.createInstance(), toVerify); - deserialized.toString(); + checkToString(deserialized); deepEquals("Deserialized value if wrong.", testData[num], deserialized); num++; } - + assertEquals("Wrong number of elements copied.", testData.length, num); } catch (Exception e) { @@ -402,7 +403,7 @@ public void testSerializedCopyAsSequence() { fail("Exception in test: " + e.getMessage()); } } - + @Test public void 
testSerializabilityAndEquals() { try { @@ -414,7 +415,7 @@ public void testSerializabilityAndEquals() { fail("The serializer is not serializable: " + e); return; } - + assertEquals("The copy of the serializer is not equal to the original one.", ser1, ser2); } catch (Exception e) { @@ -423,10 +424,26 @@ public void testSerializabilityAndEquals() { fail("Exception in test: " + e.getMessage()); } } - + + @Test + public void testNullability() { + TypeSerializer<T> serializer = getSerializer(); + try { + NullableSerializer.checkIfNullSupported(serializer); + } catch (Throwable t) { + System.err.println(t.getMessage()); + t.printStackTrace(); + fail("Unexpected failure of null value handling: " + t.getMessage()); + } + } + // -------------------------------------------------------------------------------------------- - + protected void deepEquals(String message, T should, T is) { + Assert.assertTrue((should == null && is == null) || (should != null && is != null)); + if (should == null) { + return; + } if (should.getClass().isArray()) { if (should instanceof boolean[]) { Assert.assertTrue(message, Arrays.equals((boolean[]) should, (boolean[]) is)); @@ -463,9 +480,9 @@ else if (should instanceof Throwable) { assertEquals(message, should, is); } } - + // -------------------------------------------------------------------------------------------- - + protected TypeSerializer<T> getSerializer() { TypeSerializer<T> serializer = createSerializer(); if (serializer == null) { @@ -473,7 +490,7 @@ else if (should instanceof Throwable) { } return serializer; } - + private T[] getData() { T[] data = getTestData(); if (data == null) { @@ -481,15 +498,15 @@ else if (should instanceof Throwable) { } return data; } - + // -------------------------------------------------------------------------------------------- - + private static final class TestOutputView extends DataOutputStream implements DataOutputView { - + public TestOutputView() { super(new ByteArrayOutputStream(4096)); } - + 
public TestInputView getInputView() { ByteArrayOutputStream baos = (ByteArrayOutputStream) out; return new TestInputView(baos.toByteArray()); @@ -509,8 +526,8 @@ public void write(DataInputView source, int numBytes) throws IOException { write(buffer); } } - - + + private static final class TestInputView extends DataInputStream implements DataInputView { public TestInputView(byte[] data) { @@ -542,4 +559,10 @@ public int hashCode() { return getClass().hashCode(); } } + + private static <T> void checkToString(T value) { + if (value != null) { + value.toString(); + } + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java new file mode 100644 index 00000000000..3bd176a8fbc --- /dev/null +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/NullableSerializerTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.java.typeutils.runtime; + +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.IntSerializer; +import org.apache.flink.api.common.typeutils.base.StringSerializer; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Unit tests for {@link NullableSerializer}. */ +@RunWith(Parameterized.class) +public class NullableSerializerTest extends SerializerTestBase<Integer> { + private static final TypeSerializer<Integer> originalSerializer = IntSerializer.INSTANCE; + + @Parameterized.Parameters(name = "{0}") + public static List<Boolean> whetherToPadNullValue() { + return Arrays.asList(true, false); + } + + @Parameterized.Parameter + public boolean padNullValue; + + private TypeSerializer<Integer> nullableSerializer; + + @Before + public void init() { + nullableSerializer = NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue); + } + + @Override + protected TypeSerializer<Integer> createSerializer() { + return NullableSerializer.wrapIfNullIsNotSupported(originalSerializer, padNullValue); + } + + @Override + protected int getLength() { + return padNullValue ? 
5 : -1; + } + + @Override + protected Class<Integer> getTypeClass() { + return Integer.class; + } + + @Override + protected Integer[] getTestData() { + return new Integer[] { 5, -1, 0, null }; + } + + @Test + public void testWrappingNotNeeded() { + assertEquals(NullableSerializer.wrapIfNullIsNotSupported(StringSerializer.INSTANCE, padNullValue), StringSerializer.INSTANCE); + } + + @Test + public void testWrappingNeeded() { + assertTrue(nullableSerializer instanceof NullableSerializer); + assertEquals(NullableSerializer.wrapIfNullIsNotSupported(nullableSerializer, padNullValue), nullableSerializer); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java index 268f84aaa9b..3b0a99f782c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java @@ -78,6 +78,14 @@ } <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate( + SupplierWithException<TtlValue<V>, SE> getter, + ThrowingConsumer<TtlValue<V>, CE> updater, + ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE { + TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear); + return ttlValue == null ? 
null : ttlValue.getUserValue(); + } + + <SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate( SupplierWithException<TtlValue<V>, SE> getter, ThrowingConsumer<TtlValue<V>, CE> updater, ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE { @@ -92,6 +100,6 @@ } else if (updateTsOnRead) { updater.accept(rewrapWithNewTs(ttlValue)); } - return ttlValue.getUserValue(); + return ttlValue; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java index 160dbeb71e9..f84a2ee4820 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java @@ -54,7 +54,13 @@ @Override public UV get(UK key) throws Exception { - return getWithTtlCheckAndUpdate(() -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); + TtlValue<UV> ttlValue = getWrapped(key); + return ttlValue == null ? 
null : ttlValue.getUserValue(); + } + + private TtlValue<UV> getWrapped(UK key) throws Exception { + return getWrappedWithTtlCheckAndUpdate( + () -> original.get(key), v -> original.put(key, v), () -> original.remove(key)); } @Override @@ -83,7 +89,8 @@ public void remove(UK key) throws Exception { @Override public boolean contains(UK key) throws Exception { - return get(key) != null; + TtlValue<UV> ttlValue = getWrapped(key); + return ttlValue != null; } @Override @@ -161,16 +168,16 @@ public void remove() { } private Map.Entry<UK, UV> getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) { - UV unexpiredValue; + TtlValue<UV> unexpiredValue; try { - unexpiredValue = getWithTtlCheckAndUpdate( + unexpiredValue = getWrappedWithTtlCheckAndUpdate( e::getValue, v -> original.put(e.getKey(), v), originalIterator::remove); } catch (Exception ex) { throw new FlinkRuntimeException(ex); } - return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue); + return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue.getUserValue()); } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java index 773fe7c4474..be231c63428 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java @@ -44,6 +44,6 @@ private static long getExpirationTimestamp(long ts, long ttl) { } static <V> TtlValue<V> wrapWithTs(V value, long ts) { - return value == null ? 
null : new TtlValue<>(value, ts); + return new TtlValue<>(value, ts); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java index a8bcadf9d19..48435d567e8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.state.ttl; -import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; import java.io.Serializable; @@ -30,15 +30,16 @@ class TtlValue<T> implements Serializable { private static final long serialVersionUID = 5221129704201125020L; + @Nullable private final T userValue; private final long lastAccessTimestamp; - TtlValue(T userValue, long lastAccessTimestamp) { - Preconditions.checkNotNull(userValue); + TtlValue(@Nullable T userValue, long lastAccessTimestamp) { this.userValue = userValue; this.lastAccessTimestamp = lastAccessTimestamp; } + @Nullable T getUserValue() { return userValue; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java index 7fd61aaeb29..7294b4aa747 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -35,9 +36,9 @@ void initTestValues() { emptyValue = Collections.emptySet(); - updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10")); - updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7")); - 
updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4")); + updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(23, null), Tuple2.of(10, "10")); + updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(24, null), Tuple2.of(7, "7")); + updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(25, null), Tuple2.of(4, "4")); getUpdateEmpty = updateEmpty.entrySet(); getUnexpired = updateUnexpired.entrySet(); @@ -46,7 +47,9 @@ void initTestValues() { @SafeVarargs private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) { - return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1)); + Map<UK, UV> map = new HashMap<>(); + Arrays.stream(entries).forEach(t -> map.put(t.f0, t.f1)); + return map; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java index fb025afd9ad..a77c8ed958c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java @@ -43,7 +43,10 @@ void update(String value) throws Exception { @Override String get() throws Exception { - return ttlState.get(TEST_KEY); + String value = ttlState.get(TEST_KEY); + assert (getOriginal() == null && !ttlState.contains(TEST_KEY)) || + (getOriginal() != null && ttlState.contains(TEST_KEY)); + return value; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java new file mode 100644 index 00000000000..6f8c70bcb63 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerNullElementTestContext.java @@ -0,0 +1,32 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.ttl; + +class TtlMapStatePerNullElementTestContext extends TtlMapStatePerElementTestContext { + @Override + void initTestValues() { + updateEmpty = null; + updateUnexpired = null; + updateExpired = null; + + getUpdateEmpty = null; + getUnexpired = null; + getUpdateExpired = null; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java index 6b3a15fdd30..f9f108a3a12 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java @@ -69,6 +69,7 @@ public void setup() { new TtlListStateTestContext(), new TtlMapStateAllEntriesTestContext(), new TtlMapStatePerElementTestContext(), + new TtlMapStatePerNullElementTestContext(), new TtlAggregatingStateTestContext(), new TtlReducingStateTestContext(), new TtlFoldingStateTestContext()); ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log in to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services