[ https://issues.apache.org/jira/browse/FLINK-8802?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16419114#comment-16419114 ]

ASF GitHub Bot commented on FLINK-8802:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5691#discussion_r177690430
  
    --- Diff: flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java ---
    @@ -124,10 +134,56 @@ public void testListSerialization() throws Exception {
                        .createListState(VoidNamespaceSerializer.INSTANCE,
                                new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
     
    -           KvStateRequestSerializerTest.testListSerialization(key, listState);
    +           testListSerialization(key, (RocksDBListState<Long, VoidNamespace, Long>) listState);
                longHeapKeyedStateBackend.dispose();
        }
     
    +   /**
    +    * Verifies that the serialization of a list using the given list state
    +    * matches the deserialization with {@link KvStateSerializer#deserializeList}.
    +    *
    +    * @param key
    +    *              key of the list state
    +    * @param listState
    +    *              list state using the {@link VoidNamespace}, must also be a {@link RocksDBListState} instance
    +    *
    +    * @throws Exception
    +    */
    +   private void testListSerialization(
    +                   final long key,
    +                   final RocksDBListState<Long, VoidNamespace, Long> listState) throws Exception {
    +
    +           TypeSerializer<Long> valueSerializer = LongSerializer.INSTANCE;
    +           listState.setCurrentNamespace(VoidNamespace.INSTANCE);
    +
    +           // List
    +           final int numElements = 10;
    +
    +           final List<Long> expectedValues = new ArrayList<>();
    +           for (int i = 0; i < numElements; i++) {
    +                   final long value = ThreadLocalRandom.current().nextLong();
    +                   expectedValues.add(value);
    +                   listState.add(value);
    +           }
    +
    +           final byte[] serializedKey =
    +                           KvStateSerializer.serializeKeyAndNamespace(
    +                                           key, LongSerializer.INSTANCE,
    +                                           VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
    +
    +           final byte[] serializedValues = listState.getSerializedValue(serializedKey);
    +
    +           List<Long> actualValues = KvStateSerializer.deserializeList(serializedValues, valueSerializer);
    +           assertEquals(expectedValues, actualValues);
    +
    +           // Single value
    +           long expectedValue = ThreadLocalRandom.current().nextLong();
    --- End diff --
    
    nit: why is this using `ThreadLocalRandom`?


> Concurrent serialization without duplicating serializers in state server.
> -------------------------------------------------------------------------
>
>                 Key: FLINK-8802
>                 URL: https://issues.apache.org/jira/browse/FLINK-8802
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.5.0
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> `getSerializedValue()` may be called by multiple threads, but the serializers
> are not duplicated, which may lead to exceptions when a serializer is
> stateful.
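
For illustration only, the problem described above can be sketched without any Flink dependency. Every name below (`SerializerDuplicationSketch`, `StatefulLongSerializer`, `countMismatches`) is hypothetical and is not taken from the pull request. The sketch assumes a serializer that reuses an internal scratch buffer, which is what makes sharing it across threads unsafe, and shows each task working on its own copy obtained through a `duplicate()` method. It does not claim to reflect how the actual fix is implemented.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializerDuplicationSketch {

    /** Hypothetical stand-in for a stateful serializer; this is not Flink's TypeSerializer. */
    static final class StatefulLongSerializer {
        // Reused scratch buffer: the mutable state that makes a shared instance unsafe.
        private final ByteBuffer scratch = ByteBuffer.allocate(Long.BYTES);

        byte[] serialize(long value) {
            scratch.clear();
            scratch.putLong(value);
            return scratch.array().clone();
        }

        /** Returns an independent instance with its own scratch buffer. */
        StatefulLongSerializer duplicate() {
            return new StatefulLongSerializer();
        }
    }

    /**
     * Runs numTasks concurrent tasks; each duplicates the shared serializer once and
     * round-trips perTask values through its private copy. Returns the number of
     * values that did not survive the round trip.
     */
    static int countMismatches(StatefulLongSerializer shared, int numTasks, int perTask) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<Integer>> results = new ArrayList<>();
            for (int t = 0; t < numTasks; t++) {
                final long base = (long) t * perTask;
                Callable<Integer> task = () -> {
                    // Assumption: duplicating once per task is enough for this sketch.
                    StatefulLongSerializer local = shared.duplicate();
                    int bad = 0;
                    for (int i = 0; i < perTask; i++) {
                        long value = base + i;
                        byte[] bytes = local.serialize(value);
                        if (ByteBuffer.wrap(bytes).getLong() != value) {
                            bad++;
                        }
                    }
                    return bad;
                };
                results.add(pool.submit(task));
            }
            int total = 0;
            for (Future<Integer> result : results) {
                total += result.get();
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int mismatches = countMismatches(new StatefulLongSerializer(), 8, 1000);
        System.out.println("mismatches: " + mismatches);
    }
}
```

Because no task ever touches another task's scratch buffer, the round trips cannot interfere with each other. Calling `shared.serialize(...)` directly from every task instead would let concurrent writes to the single buffer corrupt each other's output.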



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
