Hello,
I'm seeing poor throughput in a job that uses the RocksDB state backend, and I think I have managed to reproduce it with the following code:
    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.{Configuration, MemorySize, TaskManagerOptions}
    import org.apache.flink.contrib.streaming.state.{RocksDBConfigurableOptions, RocksDBStateBackend}
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object Repro {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()
        conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
        conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
        conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
        conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))

        val be = new RocksDBStateBackend("file:///tmp")
        val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
          .setStateBackend(be)
        env.setParallelism(3)
        env.getConfig.enableObjectReuse()

        val r = new scala.util.Random(31)
        // 4992 random characters, generated once and shared by every emitted record
        val randStr = r.nextString(4992)

        val s = env.fromElements(1)
          .process((value: Int, ctx: ProcessFunction[Int, String]#Context, out: Collector[String]) => {
            // Emit 10 million strings: the common 4992-char prefix followed by 8 random chars
            for (_ <- 1 to 1000 * 1000 * 10) {
              out.collect(randStr + r.nextString(8))
            }
          })
          .keyBy(a => a) // each string is its own key
          .process(new ProcessFunction[String, String] {
            private var someState: MapState[String, String] = _

            override def open(parameters: Configuration): Unit = {
              someState = getRuntimeContext.getMapState(
                new MapStateDescriptor[String, String]("someState",
                  createTypeInformation[String], createTypeInformation[String]))
            }

            override def processElement(value: String,
                                        ctx: ProcessFunction[String, String]#Context,
                                        out: Collector[String]): Unit = {
              // Store each string once, using the string itself as the map key
              if (!someState.contains(value)) {
                someState.put(value, value)
              }
            }
          })

        env.execute()
      }
    }
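This has really poor throughput.
However, if I change

    out.collect(randStr + r.nextString(8))

to

    out.collect(r.nextString(8) + randStr)

the problem goes away. In other words, it is slow when all keys share the same 4992-character prefix and fast when they differ in their first characters.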
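A minimal, Flink-independent sketch of one possible contributor to this difference. It assumes RocksDB orders keys with its default bytewise (lexicographic) comparator, so comparing two keys means scanning their shared prefix until the first differing byte. `PrefixCompareDemo`, `bytesExamined` and the ASCII `common` string (a hypothetical stand-in for `randStr`) are illustrative names only; the sketch counts bytes examined per comparison and does not show that comparisons are the actual bottleneck.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object PrefixCompareDemo {
  // Counts how many leading bytes a lexicographic (bytewise) comparison
  // must look at before it can decide the order of `a` and `b`.
  def bytesExamined(a: Array[Byte], b: Array[Byte]): Int = {
    val n = math.min(a.length, b.length)
    var i = 0
    while (i < n && a(i) == b(i)) i += 1
    if (i < n) i + 1 else n // +1 counts the first differing byte itself
  }

  // Hypothetical stand-in for `randStr`, ASCII so that 1 char == 1 byte.
  val common: String = "x" * 4992

  def main(args: Array[String]): Unit = {
    val (u, v) = ("aaaaaaaa", "baaaaaaa") // two distinct 8-char parts
    val sharedPrefix = bytesExamined((common + u).getBytes(UTF_8), (common + v).getBytes(UTF_8))
    val variableFirst = bytesExamined((u + common).getBytes(UTF_8), (v + common).getBytes(UTF_8))
    println(s"randStr + suffix: $sharedPrefix bytes examined") // 4993
    println(s"suffix + randStr: $variableFirst bytes examined") // 1
  }
}
```

With the shared prefix first, every comparison walks all 4992 common bytes; with the variable part first, the order is usually decided at the first byte.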
Is there an easy way to fix this?
I tried to use a hash index, but it requires a RocksDB option called "prefix extractor", which I don't yet know how to set, and I have no idea whether it would fix the problem.
If anyone has run into this before, I could really use some advice.
Thanks!