[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15922909#comment-15922909 ]
Stephan Ewen edited comment on FLINK-5756 at 3/13/17 8:52 PM:
--------------------------------------------------------------

Thanks for opening this and sharing the test results. I agree that the performance of RocksDB is not optimal here and that we would like to get better performance out of the state backend. In general, RocksDB is heavily optimized for writes and for small values; larger values (as you get with the merge) perform very badly.

Here are a few things we can do and have already started doing:

*Improve the other state backends*
- We are currently making the in-memory state backend (object data) much stronger, with async snapshots (see FLINK-5715)
- It makes sense to eventually build our own state backend that operates on serialized data with managed memory

*Optimize the RocksDB State Backend*
- We can try to avoid RocksDB's merge operation and instead use range iterators for ListState: store each list element under its own key (the original key plus an element index) and reassemble the list with a prefix scan on {{get()}}.
- A quick benchmark of the same task with that approach gives *91 ms* insert time and *35 ms* {{get()}} time. That looks worth exploring.

*Code for range-iterator mini-benchmark*
{code}
final File rocksDir = new File("/tmp/rdb");
FileUtils.deleteDirectory(rocksDir);

final Options options = new Options()
        .setCompactionStyle(CompactionStyle.LEVEL)
        .setLevelCompactionDynamicLevelBytes(true)
        .setIncreaseParallelism(4)
        .setUseFsync(false)
        .setMaxOpenFiles(-1)
        .setAllowOsBuffer(true)
        .setDisableDataSync(true)
        .setCreateIfMissing(true)
        .setMergeOperator(new StringAppendOperator());

final WriteOptions write_options = new WriteOptions()
        .setSync(false)
        .setDisableWAL(true);

final RocksDB rocksDB = RocksDB.open(options, rocksDir.getAbsolutePath());

final String key = "key";
final String value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321";

final byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
final byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);

// key template = original key bytes plus 4 bytes for the element index
final byte[] keyTemplate = Arrays.copyOf(keyBytes, keyBytes.length + 4);

final Unsafe unsafe = MemoryUtils.UNSAFE;
final long offset = unsafe.arrayBaseOffset(byte[].class) + keyTemplate.length - 4;

final int num = 50000;

System.out.println("begin insert");

final long beginInsert = System.nanoTime();
for (int i = 0; i < num; i++) {
    unsafe.putInt(keyTemplate, offset, i);
    rocksDB.put(write_options, keyTemplate, valueBytes);
}
final long endInsert = System.nanoTime();
System.out.println("end insert - duration: " + ((endInsert - beginInsert) / 1_000_000) + " ms");

final byte[] resultHolder = new byte[num * valueBytes.length];

final long beginGet = System.nanoTime();

final RocksIterator iterator = rocksDB.newIterator();
int pos = 0;

// seek to start
unsafe.putInt(keyTemplate, offset, 0);
iterator.seek(keyTemplate);

// mark end
unsafe.putInt(keyTemplate, offset, -1);

// iterate over all entries that still share the original key prefix
while (iterator.isValid()) {
    byte[] currKey = iterator.key();
    if (sameKey(keyBytes, currKey)) {
        byte[] currValue = iterator.value();
        System.arraycopy(currValue, 0, resultHolder, pos, currValue.length);
        pos += currValue.length;
        iterator.next();
    } else {
        break;
    }
}

final long endGet = System.nanoTime();
System.out.println("end get - duration: " + ((endGet - beginGet) / 1_000_000) + " ms");
{code}
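The {{sameKey(...)}} helper referenced in the loop above is not part of the snippet. A minimal sketch, assuming it simply checks that the iterator's current key still starts with the original key bytes (the name and signature are taken from the call site; the body is an assumption):

{code}
// Hypothetical helper, not included in the original snippet: returns true if
// 'currKey' starts with the 'keyBytes' prefix, i.e. the iterator is still
// positioned on an entry that belongs to this list state key.
private static boolean sameKey(byte[] keyBytes, byte[] currKey) {
    if (currKey.length < keyBytes.length) {
        return false;
    }
    for (int i = 0; i < keyBytes.length; i++) {
        if (keyBytes[i] != currKey[i]) {
            return false;
        }
    }
    return true;
}
{code}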
> When there are many values under the same key in ListState,
> RocksDBStateBackend performs poorly
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5756
>                 URL: https://issues.apache.org/jira/browse/FLINK-5756
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>         Environment: CentOS 7.2
>            Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operation performs very poorly.
> I also tried RocksDB version 4.11.2, and the performance is also very
> poor. The problem is likely related to RocksDB's own get() operation
> after using merge().
> The problem may influence the window operation's performance when the
> ListState is very large. I tried to merge 50000 values under the same key
> in RocksDB, and it took 120 seconds to execute the get() operation.
> ///////////////////////////////////////////////////////////////////////////////
> The Flink code is as follows:
> {code}
> class SEventSource extends RichSourceFunction[SEvent] {
>
>   private var count = 0L
>   private val alphabet =
>     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
>     while (true) {
>       for (i <- 0 until 5000) {
>         sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
>         count += 1L
>       }
>       Thread.sleep(1000)
>     }
>   }
> }
>
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
>     override def getCurrentWatermark: Watermark = {
>       new Watermark(System.currentTimeMillis())
>     }
>
>     override def extractTimestamp(t: SEvent, l: Long): Long = {
>       System.currentTimeMillis()
>     }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> ////////////////////////////////////
> The RocksDB test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
>
> val write_options = new WriteOptions
> write_options.setSync(false)
>
> val rocksDB = RocksDB.open(options, "/******/Data/")
> val key = "key"
> val value =
>   "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
>
> val beginmerge = System.currentTimeMillis()
> for (i <- 0 to 50000) {
>   rocksDB.merge(key.getBytes(), ("s" + i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
>
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
>
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)