[ https://issues.apache.org/jira/browse/FLINK-7938?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326911#comment-16326911 ]
ASF GitHub Bot commented on FLINK-7938:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5281#discussion_r161694419

--- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---
@@ -167,24 +167,48 @@ public void update(List<V> values) throws Exception {
 			try {
 				writeCurrentKeyWithGroupAndNamespace();
 				byte[] key = keySerializationStream.toByteArray();
-				DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-				List<byte[]> bytes = new ArrayList<>(values.size());
-				for (V value : values) {
-					keySerializationStream.reset();
-					valueSerializer.serialize(value, out);
-					bytes.add(keySerializationStream.toByteArray());
+				byte[] premerge = getPreMergedValue(values);
+				if (premerge != null) {
+					backend.db.put(columnFamily, writeOptions, key, premerge);
+				} else {
+					throw new IOException("Failed pre-merge values in update()");
 				}
+			} catch (IOException | RocksDBException e) {
+				throw new RuntimeException("Error while updating data to RocksDB", e);
+			}
+		}
+	}
+
+	@Override
+	public void addAll(List<V> values) throws Exception {
+		if (values != null && !values.isEmpty()) {
+			try {
+				writeCurrentKeyWithGroupAndNamespace();
+				byte[] key = keySerializationStream.toByteArray();
-				byte[] premerge = MergeUtils.merge(bytes);
+				byte[] premerge = getPreMergedValue(values);
 				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
+					backend.db.merge(columnFamily, writeOptions, key, premerge);
 				} else {
-					throw new IOException("Failed pre-merge values");
+					throw new IOException("Failed pre-merge values in addAll()");
 				}
 			} catch (IOException | RocksDBException e) {
 				throw new RuntimeException("Error while updating data to RocksDB", e);
 			}
 		}
 	}
+
+	private byte[] getPreMergedValue(List<V> values) throws IOException {
+		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
+
+		List<byte[]> bytes = new ArrayList<>(values.size());
--- End diff --

Why not just serialize all objects into the same stream and write the RocksDB separator byte between consecutive objects? This could improve performance (fewer temporary copies and objects) and might be at least as readable. From a performance point of view, even `#toByteArray()` results in an unnecessary copy: we could pass the internal array, plus offset and length, to the RocksDB insert instead. What do you think?

> support addAll() in ListState
> -----------------------------
>
>                 Key: FLINK-7938
>                 URL: https://issues.apache.org/jira/browse/FLINK-7938
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>            Reporter: Bowen Li
>            Assignee: Bowen Li
>            Priority: Major
>             Fix For: 1.5.0
>
>
> support {{addAll()}} in {{ListState}}, so Flink can be more efficient in
> adding elements to {{ListState}} in batch. This should give us much better
> performance, especially for {{ListState}} backed by RocksDB.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
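A minimal, self-contained Java sketch of the reviewer's suggestion follows. It serializes every value into one stream, writes a separator byte between consecutive values, and then passes the stream's internal buffer with an offset and a length downstream instead of calling `toByteArray()`. None of the following comes from the PR; each is an assumption made for illustration. `ValueSerializer` is a hypothetical stand-in for Flink's `TypeSerializer`. The separator is assumed to be `,`, the default delimiter of RocksDB's string-append merge operator. The `put`/`merge` helpers are a toy in-memory model of the two write paths in the diff, where `update()` replaces the stored value and `addAll()` appends to it. They are not the RocksDB API.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PreMergeSketch {

    // Hypothetical stand-in for Flink's TypeSerializer<V>.serialize(value, out).
    interface ValueSerializer<V> {
        void serialize(V value, DataOutputStream out) throws IOException;
    }

    // Assumption: ',' mirrors the default delimiter of RocksDB's string-append merge operator.
    static final byte DELIMITER = ',';

    // Exposes the internal buffer so callers can pass (buf, 0, count) downstream
    // instead of paying for the extra copy that toByteArray() makes.
    static final class ExposedByteArrayOutputStream extends ByteArrayOutputStream {
        byte[] internalBuffer() { return buf; }
        int length() { return count; }
    }

    // Serializes all values into ONE stream, with the delimiter between consecutive
    // values: no per-value byte[] and no intermediate List<byte[]>.
    static <V> ExposedByteArrayOutputStream preMerge(
            List<V> values, ValueSerializer<V> serializer) throws IOException {
        ExposedByteArrayOutputStream stream = new ExposedByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(stream);
        boolean first = true;
        for (V value : values) {
            if (!first) {
                out.write(DELIMITER);
            }
            serializer.serialize(value, out);
            first = false;
        }
        out.flush();
        return stream;
    }

    // Toy in-memory model (NOT RocksDB) of the two write paths in the diff.
    static final Map<String, byte[]> store = new HashMap<>();

    // update() path: put() replaces whatever is stored under the key.
    static void put(String key, byte[] buf, int off, int len) {
        store.put(key, Arrays.copyOfRange(buf, off, off + len));
    }

    // addAll() path: merge() appends the operand after a delimiter,
    // the way a string-append merge operator combines values.
    static void merge(String key, byte[] buf, int off, int len) {
        byte[] operand = Arrays.copyOfRange(buf, off, off + len);
        byte[] existing = store.get(key);
        if (existing == null) {
            store.put(key, operand);
            return;
        }
        byte[] combined = Arrays.copyOf(existing, existing.length + 1 + operand.length);
        combined[existing.length] = DELIMITER;
        System.arraycopy(operand, 0, combined, existing.length + 1, operand.length);
        store.put(key, combined);
    }

    public static void main(String[] args) throws IOException {
        ValueSerializer<String> utf8 = (v, out) -> out.write(v.getBytes(StandardCharsets.UTF_8));

        ExposedByteArrayOutputStream first = preMerge(List.of("a", "b"), utf8);
        put("k", first.internalBuffer(), 0, first.length());

        ExposedByteArrayOutputStream second = preMerge(List.of("c", "d"), utf8);
        merge("k", second.internalBuffer(), 0, second.length());

        System.out.println(new String(store.get("k"), StandardCharsets.UTF_8)); // prints a,b,c,d
    }
}
```

The toy store still copies the bytes it receives, as any store must eventually do. The saving the review targets is on the caller's side: the per-value arrays, the `List<byte[]>`, and the final `toByteArray()` copy all disappear.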