[ https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16327907#comment-16327907 ]
ASF GitHub Bot commented on FLINK-7938: --------------------------------------- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5281#discussion_r161905498 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception { try { writeCurrentKeyWithGroupAndNamespace(); byte[] key = keySerializationStream.toByteArray(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - List<byte[]> bytes = new ArrayList<>(values.size()); - for (V value : values) { - keySerializationStream.reset(); - valueSerializer.serialize(value, out); - bytes.add(keySerializationStream.toByteArray()); + byte[] premerge = getPreMergedValue(values); + if (premerge != null) { + backend.db.put(columnFamily, writeOptions, key, premerge); + } else { + throw new IOException("Failed pre-merge values in update()"); } + } catch (IOException | RocksDBException e) { + throw new RuntimeException("Error while updating data to RocksDB", e); + } + } + } + + @Override + public void addAll(List<V> values) throws Exception { + if (values != null && !values.isEmpty()) { + try { + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); - byte[] premerge = MergeUtils.merge(bytes); + byte[] premerge = getPreMergedValue(values); if (premerge != null) { - backend.db.put(columnFamily, writeOptions, key, premerge); + backend.db.merge(columnFamily, writeOptions, key, premerge); } else { - throw new IOException("Failed pre-merge values"); + throw new IOException("Failed pre-merge values in addAll()"); } } catch (IOException | RocksDBException e) { throw new RuntimeException("Error while updating data to RocksDB", e); } } } + + private byte[] getPreMergedValue(List<V> values) throws IOException { + DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); + + List<byte[]> bytes = new ArrayList<>(values.size()); --- End diff -- @StefanRRichter I actually tried it before, but it didn't work out very well. I can give it another try. I don't think this PR should be addressing this issue, because that code is already there and this PR only move it to its own method. Besides, it will be great if we can get the new API in before I announce it in our Flink meetup at Seattle's Wednesday evening :) (Thanks in advance if that may take extra work from your end!) I opened FLINK-8441 and I'll be working on it shortly after. > support addAll() in ListState > ----------------------------- > > Key: FLINK-7938 > URL: https://issues.apache.org/jira/browse/FLINK-7938 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Reporter: Bowen Li > Assignee: Bowen Li > Priority: Major > Fix For: 1.5.0 > > > support {{addAll()}} in {{ListState}}, so Flink can be more efficient in > adding elements to {{ListState}} in batch. This should give us a much better > performance especially for {{ListState}} backed by RocksDB -- This message was sent by Atlassian JIRA (v7.6.3#76005)