Hi,
I'm running my flink job on one rather large machine (20 cores with hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
It does more or less:
read csv from kafka -> keyBy one of the fields -> some custom state processing.
Kafka topic has 24 partitions, so my parallelism is also 24

After some tweaks and upgrading to 1.0.2-rc3 (as I use RocksDB state backend) I reached a point when throughput is ~120-150k/s. One the same kafka and machine I reached > 500k/s with simple filtering job, so I wanted to see what's the bottleneck.

It turns out that quite often all of kafka threads are stuck waiting for buffer from pool: "Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x00007f77fd80d000 nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
        - locked <0x00000002eade3890> (a java.util.ArrayDeque)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92) - locked <0x00000002eb73cbd0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541) at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
        at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297) at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
        - locked <0x00000002eaf3eb50> (a java.lang.Object)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
        - locked <0x00000002eaf3eb50> (a java.lang.Object)

This seems a bit weird for me, as most of state processing threads are idle:

"My custom function -> (Sink: Unnamed, Map) (19/24)" #7353 daemon prio=5 os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on condition [0x00007f7bee8ed000]
   java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000002eb840c38> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078) at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467) at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415) at org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        at java.lang.Thread.run(Thread.java:745)


I tried with using more network buffers, but I doesn't seem to change anything - and if I understand correctly https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers I should not need more than 24^2 * 4 of them...

Does anybody encountered such problem? Or maybe it's just normal for such case...

thanks,
maciek

Reply via email to