[ https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962357#comment-15962357 ]
ASF GitHub Bot commented on FLINK-5756:
---------------------------------------

Github user SyinChwunLeo commented on the issue:

    https://github.com/apache/flink/pull/3704

    OK, is FRocksDB only available in the master branch, or can we also use it in flink-1.2.0?

> When there are many values under the same key in ListState, RocksDBStateBackend performances poor
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5756
>                 URL: https://issues.apache.org/jira/browse/FLINK-5756
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>         Environment: CentOS 7.2
>            Reporter: Syinchwun Leo
>
> When using RocksDB as the StateBackend, if there are many values under the
> same key in ListState, the windowState.get() operation performs very poorly.
> I also tested RocksDB version 4.11.2 directly, and the performance is also
> very poor. The problem is likely related to RocksDB's own get() operation
> after merge() has been used. It may affect the performance of window
> operations that use ListState when the window is very large. I tried merging
> 50000 values under the same key in RocksDB, and the get() operation took
> 120 seconds.
> ///////////////////////////////////////////////////////////////////////////////
> The Flink code is as follows:
> {code}
> class SEventSource extends RichSourceFunction[SEvent] {
>   private var count = 0L
>   private val alphabet =
>     "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"
>   override def run(sourceContext: SourceContext[SEvent]): Unit = {
>     while (true) {
>       for (i <- 0 until 5000) {
>         sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
>         count += 1L
>       }
>       Thread.sleep(1000)
>     }
>   }
> }
> env.addSource(new SEventSource)
>   .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
>     override def getCurrentWatermark: Watermark = {
>       new Watermark(System.currentTimeMillis())
>     }
>     override def extractTimestamp(t: SEvent, l: Long): Long = {
>       System.currentTimeMillis()
>     }
>   })
>   .keyBy(0)
>   .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
>   .apply(new WindowStatistic)
>   .map(x => (System.currentTimeMillis(), x))
>   .print()
> {code}
> ////////////////////////////////////
> The RocksDB test code:
> {code}
> val stringAppendOperator = new StringAppendOperator
> val options = new Options()
> options.setCompactionStyle(CompactionStyle.LEVEL)
>   .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
>   .setLevelCompactionDynamicLevelBytes(true)
>   .setIncreaseParallelism(4)
>   .setUseFsync(true)
>   .setMaxOpenFiles(-1)
>   .setCreateIfMissing(true)
>   .setMergeOperator(stringAppendOperator)
> val write_options = new WriteOptions
> write_options.setSync(false)
> val rocksDB = RocksDB.open(options, "/******/Data/")
> val key = "key"
> val value =
>   "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
> val beginmerge = System.currentTimeMillis()
> for (i <- 0 to 50000) {
>   rocksDB.merge(key.getBytes(), ("s" + i + value).getBytes())
>   //rocksDB.put(key.getBytes, value.getBytes)
> }
> println("finish")
> val begin = System.currentTimeMillis()
> rocksDB.get(key.getBytes)
> val end = System.currentTimeMillis()
> println("merge cost:" + (begin - beginmerge))
> println("Time consuming:" + (end - begin))
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
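
The issue text above suspects that get() becomes slow once many merge() operands have piled up under one key. One way to reason about that is a toy cost model, sketched below in plain Scala with no RocksDB dependency. It rests on assumptions that are not confirmed anywhere in this thread: that operands are only combined when the value is read, and that a string-append style merge may fold them pairwise, re-copying the accumulated value at every step, with one delimiter byte between operands. The object and method names (MergeCostSketch, pairwiseCopiedBytes, singlePassCopiedBytes) are hypothetical and exist only for illustration.

```scala
// Toy cost model only. This is NOT RocksDB's implementation; it just counts
// how many bytes would be copied under two hypothetical merge strategies.
object MergeCostSketch {

  // Assumed strategy A: operands are folded pairwise, so every step copies
  // the whole accumulated value plus the new operand (and one delimiter byte
  // between operands).
  def pairwiseCopiedBytes(operandSizes: Seq[Int]): Long = {
    var accumulated = 0L // size of the merged value so far
    var copied = 0L      // total bytes written across all steps
    for ((size, index) <- operandSizes.zipWithIndex) {
      val delimiter = if (index == 0) 0L else 1L
      accumulated += delimiter + size
      copied += accumulated
    }
    copied
  }

  // Assumed strategy B: all operands are appended into one buffer in a
  // single pass, so each byte is copied exactly once.
  def singlePassCopiedBytes(operandSizes: Seq[Int]): Long = {
    val delimiters = math.max(0, operandSizes.length - 1)
    operandSizes.map(_.toLong).sum + delimiters
  }

  def main(args: Array[String]): Unit = {
    val value =
      "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
    // Same operands as the RocksDB test in the issue: "s" + i + value.
    val sizes = (0 to 50000).map(i => ("s" + i + value).length)
    println("pairwise bytes copied:    " + pairwiseCopiedBytes(sizes))
    println("single-pass bytes copied: " + singlePassCopiedBytes(sizes))
  }
}
```

Under these assumptions the pairwise count grows quadratically with the number of operands while the single-pass count grows linearly, which would be consistent with a get() that degrades sharply at 50000 values. Whether this is the actual cause has to be checked against the RocksDB merge operator in use.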