Hi,
I'm running my flink job on one rather large machine (20 cores with
hyperthreading, 120GB RAM). Task manager has 20GB heap allocated.
It does more or less:
read csv from kafka -> keyBy one of the fields -> some custom state
processing.
Kafka topic has 24 partitions, so my parallelism is also 24
After some tweaks and upgrading to 1.0.2-rc3 (as I use RocksDB state
backend) I reached a point when throughput is ~120-150k/s.
One the same kafka and machine I reached > 500k/s with simple filtering
job, so I wanted to see what's the bottleneck.
It turns out that quite often all of kafka threads are stuck waiting for
buffer from pool:
"Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x00007f77fd80d000
nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- locked <0x00000002eade3890> (a java.util.ArrayDeque)
at
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
- locked <0x00000002eb73cbd0> (a
org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
at
org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
at scala.collection.immutable.List.foreach(List.scala:381)
at
org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
at
org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
at
org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
- locked <0x00000002eaf3eb50> (a java.lang.Object)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
- locked <0x00000002eaf3eb50> (a java.lang.Object)
This seems a bit weird for me, as most of state processing threads are idle:
"My custom function -> (Sink: Unnamed, Map) (19/24)" #7353 daemon prio=5
os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on condition
[0x00007f7bee8ed000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00000002eb840c38> (a
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
at
java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
at
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415)
at
org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
at
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
at java.lang.Thread.run(Thread.java:745)
I tried with using more network buffers, but I doesn't seem to change
anything - and if I understand correctly
https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
I should not need more than 24^2 * 4 of them...
Does anybody encountered such problem? Or maybe it's just normal for
such case...
thanks,
maciek