[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16419114#comment-16419114 ]
ASF GitHub Bot commented on FLINK-8802: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5691#discussion_r177690430 --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java --- @@ -124,10 +134,56 @@ public void testListSerialization() throws Exception { .createListState(VoidNamespaceSerializer.INSTANCE, new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); - KvStateRequestSerializerTest.testListSerialization(key, listState); + testListSerialization(key, (RocksDBListState<Long, VoidNamespace, Long>) listState); longHeapKeyedStateBackend.dispose(); } + /** + * Verifies that the serialization of a list using the given list state + * matches the deserialization with {@link KvStateSerializer#deserializeList}. + * + * @param key + * key of the list state + * @param listState + * list state using the {@link VoidNamespace}, must also be a {@link RocksDBListState} instance + * + * @throws Exception + */ + private void testListSerialization( + final long key, + final RocksDBListState<Long, VoidNamespace, Long> listState) throws Exception { + + TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE; + listState.setCurrentNamespace(VoidNamespace.INSTANCE); + + // List + final int numElements = 10; + + final List<Long> expectedValues = new ArrayList<>(); + for (int i = 0; i < numElements; i++) { + final long value = ThreadLocalRandom.current().nextLong(); + expectedValues.add(value); + listState.add(value); + } + + final byte[] serializedKey = + KvStateSerializer.serializeKeyAndNamespace( + key, LongSerializer.INSTANCE, + VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE); + + final byte[] serializedValues = listState.getSerializedValue(serializedKey); + + List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer); + assertEquals(expectedValues, actualValues); + + // Single value + long expectedValue = ThreadLocalRandom.current().nextLong(); --- End diff -- nit: why is this using `ThreadLocalRandom`? > Concurrent serialization without duplicating serializers in state server. > ------------------------------------------------------------------------- > > Key: FLINK-8802 > URL: https://issues.apache.org/jira/browse/FLINK-8802 > Project: Flink > Issue Type: Bug > Components: Queryable State > Affects Versions: 1.5.0 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Priority: Blocker > Fix For: 1.5.0 > > > The `getSerializedValue()` may be called by multiple threads but serializers > are not duplicated, which may lead to exceptions thrown when a serializer is > stateful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)