Hello, I have a poor throughput issue, and I think I managed to reproduce it using the following code:

    import org.apache.flink.api.common.state.{MapState, MapStateDescriptor}
    import org.apache.flink.configuration.{Configuration, MemorySize, TaskManagerOptions}
    import org.apache.flink.contrib.streaming.state.{RocksDBConfigurableOptions, RocksDBStateBackend}
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    val conf = new Configuration()
    conf.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.ofMebiBytes(6 * 1000))
    conf.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(8 * 1000))
    conf.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.ofMebiBytes(256))
    conf.set(RocksDBConfigurableOptions.BLOCK_SIZE, new MemorySize(8 * 1000))

    val be = new RocksDBStateBackend("file:///tmp")
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
      .setStateBackend(be)
    env.setParallelism(3)
    env.getConfig.enableObjectReuse()

    val r = new scala.util.Random(31)
    val randStr = r.nextString(4992)

    // Source: emit 10 million keys, each a 4992-char constant followed by 8 random chars.
    val s = env.fromElements(1).process((value: Int, ctx: ProcessFunction[Int, String]#Context, out: Collector[String]) => {
      for (a <- 1 to 1000 * 1000 * 10) {
        out.collect(randStr + r.nextString(8))
      }
    }).keyBy(a => a).process(new ProcessFunction[String, String] {
      private var someState: MapState[String, String] = _

      override def open(parameters: Configuration): Unit = {
        someState = getRuntimeContext.getMapState(
          new MapStateDescriptor[String, String]("someState", createTypeInformation[String], createTypeInformation[String])
        )
      }

      // Store each value in the map state the first time it is seen.
      override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
        if (!someState.contains(value)) {
          someState.put(value, value)
        }
      }
    })

    env.execute()

This has really poor throughput. However, changing

    out.collect(randStr + r.nextString(8))

to

    out.collect(r.nextString(8) + randStr)

solves the issue.

Is there an easy way to fix this? I tried to use a hash index, but that requires a RocksDB option called "prefix extractor", which I don't know how to set yet, and I have no idea whether it would fix the problem.
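
To make the difference between the two variants concrete, here is a minimal standalone sketch (it is not part of the job, and the names KeyLayoutSketch and commonPrefixLength are made up for illustration). It only counts how many leading characters two generated keys share in each layout. Whether that shared prefix is the actual cause of the slowdown is an assumption that I have not verified.

```scala
import scala.util.Random

object KeyLayoutSketch {
  // Hypothetical helper: number of leading characters two strings have in common.
  def commonPrefixLength(a: String, b: String): Int =
    a.zip(b).takeWhile { case (x, y) => x == y }.length

  def main(args: Array[String]): Unit = {
    val r = new Random(31)
    val randStr = r.nextString(4992)

    // Slow variant from the job: constant part first, random part last.
    val slowA = randStr + r.nextString(8)
    val slowB = randStr + r.nextString(8)

    // Fast variant from the job: random part first, constant part last.
    val fastA = r.nextString(8) + randStr
    val fastB = r.nextString(8) + randStr

    println(s"randStr first: ${commonPrefixLength(slowA, slowB)} of ${slowA.length} chars shared")
    println(s"randStr last:  ${commonPrefixLength(fastA, fastB)} of ${fastA.length} chars shared")
  }
}
```

In the first layout every pair of keys agrees on at least the first 4992 characters, while in the second layout two keys usually differ at the very first character. If keys are compared byte by byte from the start (assumed here, not confirmed for this setup), the first layout makes every comparison scan the whole shared part before it finds a difference.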
If anyone has encountered this before, I could really use some advice or help. Thanks!