[ https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327907#comment-16327907 ]

ASF GitHub Bot commented on FLINK-7938:
---------------------------------------

Github user bowenli86 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5281#discussion_r161905498
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
    @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
                        try {
                                writeCurrentKeyWithGroupAndNamespace();
                                byte[] key = keySerializationStream.toByteArray();
    -                           DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
     
    -                           List<byte[]> bytes = new ArrayList<>(values.size());
    -                           for (V value : values) {
    -                                   keySerializationStream.reset();
    -                                   valueSerializer.serialize(value, out);
    -                                   bytes.add(keySerializationStream.toByteArray());
    +                           byte[] premerge = getPreMergedValue(values);
    +                           if (premerge != null) {
    +                                   backend.db.put(columnFamily, writeOptions, key, premerge);
    +                           } else {
    +                                   throw new IOException("Failed pre-merge values in update()");
                                }
    +                   } catch (IOException | RocksDBException e) {
    +                           throw new RuntimeException("Error while updating data to RocksDB", e);
    +                   }
    +           }
    +   }
    +
    +   @Override
    +   public void addAll(List<V> values) throws Exception {
    +           if (values != null && !values.isEmpty()) {
    +                   try {
    +                           writeCurrentKeyWithGroupAndNamespace();
    +                           byte[] key = keySerializationStream.toByteArray();
     
    -                           byte[] premerge = MergeUtils.merge(bytes);
    +                           byte[] premerge = getPreMergedValue(values);
                                if (premerge != null) {
    -                                   backend.db.put(columnFamily, writeOptions, key, premerge);
    +                                   backend.db.merge(columnFamily, writeOptions, key, premerge);
                                } else {
    -                                   throw new IOException("Failed pre-merge values");
    +                                   throw new IOException("Failed pre-merge values in addAll()");
                                }
                        } catch (IOException | RocksDBException e) {
                                throw new RuntimeException("Error while updating data to RocksDB", e);
                        }
                }
        }
    +
    +   private byte[] getPreMergedValue(List<V> values) throws IOException {
    +           DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    +
    +           List<byte[]> bytes = new ArrayList<>(values.size());
    --- End diff --
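In the diff above, `update()` and `addAll()` now share one pre-merge step and differ only in the final RocksDB call: `update()` overwrites the key with `put`, while `addAll()` appends to it with `merge`. The following is a minimal in-memory sketch of those two semantics, not Flink's implementation. It assumes list elements are stored as serialized bytes joined by a one-byte `','` delimiter, in the style of RocksDB's string-append merge operator; `ListStoreSketch`, `preMerge`, and the `HashMap` "database" are hypothetical stand-ins, not Flink or RocksDB APIs.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a RocksDB column family holding list state. */
final class ListStoreSketch {
    // Assumption: elements are separated by a single ',' byte (string-append style).
    static final byte DELIMITER = ',';

    private final Map<String, byte[]> db = new HashMap<>();

    /** Joins already-serialized elements into one delimiter-separated value; null if empty. */
    static byte[] preMerge(List<byte[]> elements) {
        if (elements == null || elements.isEmpty()) {
            return null;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        boolean first = true;
        for (byte[] e : elements) {
            if (!first) {
                out.write(DELIMITER);
            }
            out.write(e, 0, e.length);
            first = false;
        }
        return out.toByteArray();
    }

    /** Mimics db.put: replaces whatever list was stored under the key. */
    void put(String key, byte[] value) {
        db.put(key, value);
    }

    /** Mimics db.merge with a string-append operator: appends to the existing list. */
    void merge(String key, byte[] value) {
        byte[] existing = db.get(key);
        if (existing == null) {
            db.put(key, value);
            return;
        }
        byte[] combined = new byte[existing.length + 1 + value.length];
        System.arraycopy(existing, 0, combined, 0, existing.length);
        combined[existing.length] = DELIMITER;
        System.arraycopy(value, 0, combined, existing.length + 1, value.length);
        db.put(key, combined);
    }

    String getAsString(String key) {
        byte[] v = db.get(key);
        return v == null ? null : new String(v, StandardCharsets.UTF_8);
    }

    /** Stand-in for a value serializer: encodes each string as UTF-8. */
    static List<byte[]> utf8(String... values) {
        List<byte[]> out = new ArrayList<>(values.length);
        for (String s : values) {
            out.add(s.getBytes(StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        ListStoreSketch store = new ListStoreSketch();
        // update(["a", "b"]): one pre-merged put overwrites the key.
        store.put("k", preMerge(utf8("a", "b")));
        // addAll(["c", "d"]): one pre-merged merge appends to the existing list.
        store.merge("k", preMerge(utf8("c", "d")));
        System.out.println(store.getAsString("k")); // a,b,c,d
        // update(["x"]): put discards the previous contents.
        store.put("k", preMerge(utf8("x")));
        System.out.println(store.getAsString("k")); // x
    }
}
```

Because both paths pre-merge on the client side, an `addAll()` of N elements costs one write instead of N; the trade-off is that the pre-merge step, the merge operator, and the reader must all agree on the same delimiter format.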
    
    @StefanRRichter I actually tried it before, but it didn't work out very well. I can give it another try.
    
    I don't think this PR should address this issue, because that code already exists and this PR only moves it into its own method. Besides, it would be great if we could get the new API in before I announce it at our Seattle Flink meetup on Wednesday evening :) (Thanks in advance if that takes extra work on your end!)
    
    I opened FLINK-8441 and I'll be working on it shortly after.


> support addAll() in ListState
> -----------------------------
>
>                 Key: FLINK-7938
>                 URL: https://issues.apache.org/jira/browse/FLINK-7938
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>
> support {{addAll()}} in {{ListState}}, so Flink can add elements to
> {{ListState}} in batches more efficiently. This should give much better
> performance, especially for {{ListState}} backed by RocksDB.
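The efficiency argument is that adding elements one at a time costs one backend write per element (in the RocksDB backend, roughly one serialization pass and one `merge` call each), whereas `addAll()` can hand the whole batch to the backend in a single write. The toy sketch below only counts writes to illustrate that difference; `CountingListState` and its write counter are hypothetical and do not model Flink's actual `ListState` interface or real RocksDB costs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical list state that counts how many writes reach its "backend". */
final class CountingListState<V> {
    private final List<V> backend = new ArrayList<>();
    private int writes = 0;

    /** One element per call: one backend write each time. */
    void add(V value) {
        backend.add(value);
        writes++;
    }

    /** Whole batch per call: a single backend write, like one pre-merged merge. */
    void addAll(List<V> values) {
        if (values == null || values.isEmpty()) {
            return; // nothing to write, mirroring the null/empty guard in the PR
        }
        backend.addAll(values);
        writes++;
    }

    int writes() {
        return writes;
    }

    List<V> get() {
        return new ArrayList<>(backend);
    }

    public static void main(String[] args) {
        List<Integer> batch = List.of(1, 2, 3, 4, 5);

        CountingListState<Integer> oneByOne = new CountingListState<>();
        for (Integer v : batch) {
            oneByOne.add(v);
        }

        CountingListState<Integer> batched = new CountingListState<>();
        batched.addAll(batch);

        // Same contents, different number of backend writes.
        System.out.println(oneByOne.get() + " via " + oneByOne.writes() + " writes"); // [1, 2, 3, 4, 5] via 5 writes
        System.out.println(batched.get() + " via " + batched.writes() + " write");    // [1, 2, 3, 4, 5] via 1 write
    }
}
```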



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
