Hi

If you want to improve the performance of point lookups, you could try using an 
additional hash index. This feature requires a prefix extractor; however, the 
original interface is not exposed directly in the Java API.

You could try calling 
columnFamilyOptions.optimizeForPointLookup(blockCacheSizeMb), which uses the 
NoopTransform prefix extractor by default [1].
Please also only use this feature with Flink 1.10.2 or later, due to an 
internal RocksDB bug [2].
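
A minimal sketch of how this could be wired into the state backend, assuming 
Flink 1.10's RocksDBOptionsFactory interface and 
RocksDBStateBackend.setRocksDBOptions. The class name 
PointLookupOptionsFactory and the 512 MB block cache size are only 
illustrative, and whether this helps with keys that share a long prefix would 
still need to be measured:

```scala
import java.util.{Collection => JCollection}

import org.apache.flink.contrib.streaming.state.{RocksDBOptionsFactory, RocksDBStateBackend}
import org.rocksdb.{ColumnFamilyOptions, DBOptions}

// Hypothetical helper class; the name and the cache size are illustrative.
class PointLookupOptionsFactory(blockCacheSizeMb: Long) extends RocksDBOptionsFactory {

  // Leave the DB-level options untouched.
  override def createDBOptions(
      currentOptions: DBOptions,
      handlesToClose: JCollection[AutoCloseable]): DBOptions = currentOptions

  // Apply the point-lookup preset to the column family options.
  override def createColumnOptions(
      currentOptions: ColumnFamilyOptions,
      handlesToClose: JCollection[AutoCloseable]): ColumnFamilyOptions =
    currentOptions.optimizeForPointLookup(blockCacheSizeMb)
}

val be = new RocksDBStateBackend("file:///tmp")
// 512 MB is an arbitrary example value, not a recommendation.
be.setRocksDBOptions(new PointLookupOptionsFactory(512L))
```

With managed memory enabled, Flink may adjust the table and block cache 
settings itself, so it is worth checking the effective options in the RocksDB 
LOG file.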

[1] https://github.com/dataArtisans/frocksdb/blob/c724d41fab7f9f09f9676dfccc6d210a191da4d6/options/options.cc#L477
[2] https://issues.apache.org/jira/browse/FLINK-17800

Best
Yun Tang


________________________________
From: ירון שני <yaron.sh...@gmail.com>
Sent: Wednesday, September 23, 2020 23:56
To: user@flink.apache.org <user@flink.apache.org>
Subject: Poor performance with large keys using RocksDB and MapState

Hello,
I have a throughput problem, and I think I managed to reproduce it with the 
following code:

import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
import org.apache.flink.configuration.{Configuration, MemorySize, TaskManagerOptions}
import org.apache.flink.contrib.streaming.state.{RocksDBConfigurableOptions, RocksDBStateBackend}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

val conf = new Configuration()
conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))

val be = new RocksDBStateBackend("file:///tmp")
val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
  .setStateBackend(be)

env.setParallelism(3)
env.getConfig.enableObjectReuse()

val r = new scala.util.Random(31)
// Long prefix shared by every key.
val randStr = r.nextString(4992)
val s = env.fromElements(1).process((value: Int,
    ctx: ProcessFunction[Int, String]#Context,
    out: Collector[String]) => {
  // Emit 10 million keys: the shared prefix followed by 8 random characters.
  for (_ <- 1 to 1000 * 1000 * 10) {
    out.collect(randStr + r.nextString(8))
  }
}).keyBy(a => a).process(new ProcessFunction[String, String] {
  private var someState: MapState[String, String] = _

  override def open(parameters: Configuration): Unit = {
    someState = getRuntimeContext.getMapState(
      new MapStateDescriptor[String, String]("someState",
        createTypeInformation[String], createTypeInformation[String])
    )
  }

  override def processElement(value: String,
      ctx: ProcessFunction[String, String]#Context,
      out: Collector[String]): Unit = {
    // Point lookup on the large key, then insert it if absent.
    if (!someState.contains(value)) {
      someState.put(value, value)
    }
  }
})

env.execute()

This has really poor throughput.
Now changing
out.collect( randStr + r.nextString(8) )

to
out.collect( r.nextString(8) + randStr)
solves the issue.
Is there an easy way to fix this?
I tried to use a hash index, but it requires a RocksDB option called "prefix 
extractor", which I don't know how to set yet, and I have no idea whether it 
will fix the problem.
If anyone has encountered this before, I could really use some advice/help. 
Thanks!






