[ 
https://issues.apache.org/jira/browse/FLINK-5756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen updated FLINK-5756:
--------------------------------
    Description: 
When using RocksDB as the StateBackend, if there are many values under the same 
key in ListState, the windowState.get() operation performs very poorly. I also 
tested RocksDB 4.11.2 directly, and its performance is just as poor. The 
problem is likely related to RocksDB's own get() operation after merge() has 
been used, and it can hurt window operator performance when a ListState grows 
very large. When I merge 50000 values under the same key in RocksDB, a single 
get() takes about 120 seconds.
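
For comparison, here is a minimal baseline sketch (not part of the original 
measurement; the object name, key, and path below are placeholders) that writes 
one value of roughly the same total size with a single put() and then times 
get() on it. If this get() is fast while get() on the merged key takes ~120 
seconds, the cost lies in collapsing the 50000 merge operands rather than in 
the raw amount of data read:

{code}
import org.rocksdb.{Options, RocksDB}

object PutGetBaseline {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()

    val options = new Options().setCreateIfMissing(true)
    val db = RocksDB.open(options, "/tmp/put-get-baseline") // placeholder path

    // One value of roughly the same total size as 50000 merged ~75-byte operands.
    val payload = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"
    val bigValue = Array.fill(50000)(payload).mkString
    db.put("single-key".getBytes, bigValue.getBytes)

    val begin = System.currentTimeMillis()
    db.get("single-key".getBytes)
    val end = System.currentTimeMillis()
    println("get() on one large value: " + (end - begin) + " ms")

    db.close()
    options.close()
  }
}
{code}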

///////////////////////////////////////////////////////////////////////////////
The Flink code is as follows:

{code}
class SEventSource extends RichSourceFunction[SEvent] {

  @volatile private var running = true
  private var count = 0L

  private val alphabet =
    "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321"

  override def run(sourceContext: SourceContext[SEvent]): Unit = {
    // Emit 5000 events per second, all sharing the same key (id = 1).
    while (running) {
      for (i <- 0 until 5000) {
        sourceContext.collect(SEvent(1, "hello-" + count, alphabet, 1))
        count += 1L
      }
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    running = false
  }
}

env.addSource(new SEventSource)
      .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] {
        override def getCurrentWatermark: Watermark = {
          new Watermark(System.currentTimeMillis())
        }

        override def extractTimestamp(t: SEvent, l: Long): Long = {
          System.currentTimeMillis()
        }
      })
      .keyBy(0)
      .window(SlidingEventTimeWindows.of(Time.seconds(20), Time.seconds(2)))
      .apply(new WindowStatistic)
      .map(x => (System.currentTimeMillis(), x))
      .print()
{code}
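
The SEvent type and the WindowStatistic function are not shown above. A minimal 
sketch of what they might look like (an assumption for illustration, not the 
actual code from this job); the relevant point is that any WindowFunction that 
iterates its input forces windowState.get() on the RocksDB-backed ListState, 
which is where the slowdown shows up:

{code}
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

// Assumed shape of the event type used by SEventSource above.
case class SEvent(id: Long, name: String, info: String, cnt: Int)

// Hypothetical window function: iterating `input` reads back every value that
// was merged under the window's key in RocksDB.
class WindowStatistic extends WindowFunction[SEvent, Int, Tuple, TimeWindow] {
  override def apply(key: Tuple, window: TimeWindow,
                     input: Iterable[SEvent], out: Collector[Int]): Unit = {
    out.collect(input.size)
  }
}
{code}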

////////////////////////////////////
The RocksDB test code is as follows:

{code}
import org.rocksdb.{CompactionStyle, CompressionType, Options, RocksDB, StringAppendOperator, WriteOptions}

// Standalone reproduction: merge 50000 operands under one key, then time get().
// Wrapper object added so the snippet compiles on its own; the name is arbitrary.
object RocksDBMergeGetTest {
  def main(args: Array[String]): Unit = {
    RocksDB.loadLibrary()

    val stringAppendOperator = new StringAppendOperator
    val options = new Options()
    options.setCompactionStyle(CompactionStyle.LEVEL)
      .setCompressionType(CompressionType.SNAPPY_COMPRESSION)
      .setLevelCompactionDynamicLevelBytes(true)
      .setIncreaseParallelism(4)
      .setUseFsync(true)
      .setMaxOpenFiles(-1)
      .setCreateIfMissing(true)
      .setMergeOperator(stringAppendOperator)

    val write_options = new WriteOptions
    write_options.setSync(false)

    val rocksDB = RocksDB.open(options, "/******/Data/")
    val key = "key"
    val value = "abcdefghijklmnopqrstuvwxyz0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ7890654321"

    // Write 50000 operands under the same key via merge().
    val beginmerge = System.currentTimeMillis()
    for (i <- 0 to 50000) {
      rocksDB.merge(key.getBytes(), ("s" + i + value).getBytes())
      // rocksDB.put(key.getBytes, value.getBytes)
    }
    println("finish")

    // Time a single get() on the merged key.
    val begin = System.currentTimeMillis()
    rocksDB.get(key.getBytes)
    val end = System.currentTimeMillis()

    println("merge cost:" + (begin - beginmerge))
    println("Time consuming:" + (end - begin))
  }
}
{code}


> When there are many values under the same key in ListState, 
> RocksDBStateBackend performs poorly
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-5756
>                 URL: https://issues.apache.org/jira/browse/FLINK-5756
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.2.0
>         Environment: CentOS 7.2
>            Reporter: Syinchwun Leo
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
