asfgit closed pull request #7061: [FLINK-10827][tests] Add test for duplicate() to SerializerTestBase URL: https://github.com/apache/flink/pull/7061
This is a PR merged from a forked repository. Because GitHub hides the original diff of a foreign pull request (one from a fork) once it is merged, the diff is reproduced below for the sake of provenance: diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index a4d3839f66f..f1dd8fcbde9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -18,29 +18,17 @@ package org.apache.flink.api.java.typeutils.runtime; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.lang.reflect.Field; -import java.lang.reflect.Modifier; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Objects; - import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.GenericTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; import org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil; import 
org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TypeExtractor; @@ -52,6 +40,18 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.Preconditions; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Objects; + import static org.apache.flink.util.Preconditions.checkNotNull; @Internal @@ -181,11 +181,13 @@ public boolean isImmutableType() { } } - if (stateful) { - return new PojoSerializer<T>(clazz, duplicateFieldSerializers, fields, executionConfig); - } else { - return this; + if (!stateful) { + // as a small memory optimization, we can share the same object between instances + duplicateFieldSerializers = fieldSerializers; } + + // we must create a new instance, otherwise the subclassSerializerCache can create concurrency problems + return new PojoSerializer<>(clazz, duplicateFieldSerializers, fields, executionConfig); } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java index a4908f95e90..77099ef1cc9 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/SerializerTestBase.java @@ -18,32 +18,39 @@ package org.apache.flink.api.common.typeutils; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -import 
java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Arrays; - import org.apache.flink.api.java.typeutils.runtime.NullableSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.apache.commons.lang3.SerializationException; import org.apache.commons.lang3.SerializationUtils; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; +import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CyclicBarrier; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Abstract test base for serializers. 
* @@ -436,6 +443,34 @@ public void testNullability() { } } + @Test + public void testDuplicate() throws Exception { + final int numThreads = 10; + final TypeSerializer<T> serializer = getSerializer(); + final CyclicBarrier startLatch = new CyclicBarrier(numThreads); + final List<SerializerRunner<T>> concurrentRunners = new ArrayList<>(numThreads); + Assert.assertEquals(serializer, serializer.duplicate()); + + T[] testData = getData(); + final int testDataIterations = Math.max(1, 250 / testData.length); + + for (int i = 0; i < numThreads; ++i) { + SerializerRunner<T> runner = new SerializerRunner<>( + startLatch, + serializer.duplicate(), + testData, + testDataIterations); + + runner.start(); + concurrentRunners.add(runner); + } + + for (SerializerRunner<T> concurrentRunner : concurrentRunners) { + concurrentRunner.join(); + concurrentRunner.checkResult(); + } + } + // -------------------------------------------------------------------------------------------- protected void deepEquals(String message, T should, T is) { @@ -526,6 +561,56 @@ public void write(DataInputView source, int numBytes) throws IOException { } } + /** + * Runner to test serializer duplication via concurrency. + * @param <T> type of the test elements. 
+ */ + static class SerializerRunner<T> extends Thread { + final CyclicBarrier allReadyBarrier; + final TypeSerializer<T> serializer; + final T[] testData; + final int iterations; + Exception failure; + + SerializerRunner(CyclicBarrier allReadyBarrier, TypeSerializer<T> serializer, T[] testData, int iterations) { + this.allReadyBarrier = allReadyBarrier; + this.serializer = serializer; + this.testData = testData; + this.iterations = iterations; + this.failure = null; + } + + @Override + public void run() { + DataInputDeserializer dataInputDeserializer = new DataInputDeserializer(); + DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(128); + try { + allReadyBarrier.await(); + for (int repeat = 0; repeat < iterations; ++repeat) { + for (T testItem : testData) { + serializer.serialize(testItem, dataOutputSerializer); + dataInputDeserializer.setBuffer( + dataOutputSerializer.getSharedBuffer(), + 0, + dataOutputSerializer.length()); + T serdeTestItem = serializer.deserialize(dataInputDeserializer); + T copySerdeTestItem = serializer.copy(serdeTestItem); + dataOutputSerializer.clear(); + Preconditions.checkState(Objects.equals(testItem, copySerdeTestItem), + "Serialization/Deserialization cycle resulted in an object that are not equal to the original."); + } + } + } catch (Exception ex) { + failure = ex; + } + } + + void checkResult() throws Exception { + if (failure != null) { + throw failure; + } + } + } private static final class TestInputView extends DataInputStream implements DataInputView { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services