[ https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16331289#comment-16331289 ]
ASF GitHub Bot commented on FLINK-7938: --------------------------------------- Github user bowenli86 commented on a diff in the pull request: https://github.com/apache/flink/pull/5281#discussion_r162479898 --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java --- @@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception { try { writeCurrentKeyWithGroupAndNamespace(); byte[] key = keySerializationStream.toByteArray(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - List<byte[]> bytes = new ArrayList<>(values.size()); - for (V value : values) { - keySerializationStream.reset(); - valueSerializer.serialize(value, out); - bytes.add(keySerializationStream.toByteArray()); + byte[] premerge = getPreMergedValue(values); + if (premerge != null) { + backend.db.put(columnFamily, writeOptions, key, premerge); + } else { + throw new IOException("Failed pre-merge values in update()"); } + } catch (IOException | RocksDBException e) { + throw new RuntimeException("Error while updating data to RocksDB", e); + } + } + } + + @Override + public void addAll(List<V> values) throws Exception { + if (values != null && !values.isEmpty()) { + try { + writeCurrentKeyWithGroupAndNamespace(); + byte[] key = keySerializationStream.toByteArray(); - byte[] premerge = MergeUtils.merge(bytes); + byte[] premerge = getPreMergedValue(values); if (premerge != null) { - backend.db.put(columnFamily, writeOptions, key, premerge); + backend.db.merge(columnFamily, writeOptions, key, premerge); } else { - throw new IOException("Failed pre-merge values"); + throw new IOException("Failed pre-merge values in addAll()"); } } catch (IOException | RocksDBException e) { throw new RuntimeException("Error while updating data to RocksDB", e); } } } + + private byte[] getPreMergedValue(List<V> values) throws IOException { + DataOutputViewStreamWrapper out = new 
DataOutputViewStreamWrapper(keySerializationStream); + + List<byte[]> bytes = new ArrayList<>(values.size()); --- End diff --

That'll be great, let's get this in. I will dive into FLINK-8441 in a couple of days. Thanks!

> support addAll() in ListState
> -----------------------------
>
> Key: FLINK-7938
> URL: https://issues.apache.org/jira/browse/FLINK-7938
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Bowen Li
> Assignee: Bowen Li
> Priority: Major
> Fix For: 1.5.0
>
>
> Support {{addAll()}} in {{ListState}}, so Flink can be more efficient in
> adding elements to {{ListState}} in batch. This should give us much better
> performance, especially for {{ListState}} backed by RocksDB.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)