[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398267#comment-16398267 ]
ASF GitHub Bot commented on FLINK-8802: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r174385834 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateRegistryTest.java --- @@ -40,6 +54,81 @@ */ public class KvStateRegistryTest extends TestLogger { + @Test + public void testKvStateEntry() throws InterruptedException { + final int threads = 10; + + final CountDownLatch latch1 = new CountDownLatch(threads); + final CountDownLatch latch2 = new CountDownLatch(1); + + final List<KvStateInfo<?, ?, ?>> infos = Collections.synchronizedList(new ArrayList<>()); + + final JobID jobID = new JobID(); + + final JobVertexID jobVertexId = new JobVertexID(); + final KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1); + final String registrationName = "foobar"; + + final KvStateRegistry kvStateRegistry = new KvStateRegistry(); + final KvStateID stateID = kvStateRegistry.registerKvState( + jobID, + jobVertexId, + keyGroupRange, + registrationName, + new DummyKvState() + ); + + for (int i = 0; i < threads; i++) { + new Thread(() -> { + final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID); + final KvStateInfo<?, ?, ?> stateInfo = kvState.getInfoForCurrentThread(); + infos.add(stateInfo); + + latch1.countDown(); + try { + latch2.await(); + } catch (InterruptedException e) { + Assert.fail(e.getMessage()); + } + + }).start(); + } + + latch1.await(); + + final KvStateEntry<?, ?, ?> kvState = kvStateRegistry.getKvState(stateID); + + // verify that all the threads are done correctly. 
+ Assert.assertEquals(threads, infos.size()); + Assert.assertEquals(threads, kvState.getCacheSize()); + + latch2.countDown(); + + for (KvStateInfo<?, ?, ?> infoA: infos) { + boolean found = false; + for (KvStateInfo<?, ?, ?> infoB: infos) { + if (infoA == infoB) { + if (found) { --- End diff -- `found` needs a better name or a comment explaining what it means. > Concurrent serialization without duplicating serializers in state server. > ------------------------------------------------------------------------- > > Key: FLINK-8802 > URL: https://issues.apache.org/jira/browse/FLINK-8802 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Blocker > Fix For: 1.5.0 > > > `getSerializedValue()` may be called by multiple threads, but serializers > are not duplicated, which may lead to exceptions being thrown when a serializer is > stateful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)