It could be different things actually, including the parts of the network
stack you mentioned.

1)

Regarding the TM config:
- It can help to increase the number of network buffers (you could go
ahead and give them 4 GB, i.e. 131072 buffers at 32 KB each).
- In general, you have way more memory available than you actually
give to Flink. I would increase the 20 GB heap size (a rough
flink-conf.yaml sketch follows below).
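
To make that concrete, here is a rough flink-conf.yaml sketch. The values
are only illustrative placeholders for your 120 GB machine, assuming the
default 32 KB segment size:

  # illustrative values only, adjust to your setup
  taskmanager.heap.mb: 61440                    # e.g. ~60 GB instead of the current 20 GB
  taskmanager.network.numberOfBuffers: 131072   # 131072 * 32 KB segments = 4 GB
  # taskmanager.memory.segment-size: 32768      # 32 KB is the default segment size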

As a first step you could address these two points and re-run your job.

2)

As a follow-up you could also try the file system state backend
(FsStateBackend), which keeps working state in memory (on-heap) and
writes checkpoints to files. That would show how much RocksDB is
slowing things down.
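
If you want to try it, here is a minimal Scala sketch (the checkpoint URI
is just a placeholder, the rest of the job stays unchanged):

  import org.apache.flink.runtime.state.filesystem.FsStateBackend
  import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // Working state stays on the TaskManager heap; checkpoints go to the given URI.
  env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))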


I'm curious about the results. Do you think you will have time to try this?

– Ufuk


On Wed, Apr 20, 2016 at 3:45 PM, Maciek Próchniak <m...@touk.pl> wrote:
> Hi,
> I'm running my Flink job on one rather large machine (20 cores with
> hyperthreading, 120 GB RAM). The task manager has 20 GB of heap allocated.
> It does more or less:
> read CSV from Kafka -> keyBy one of the fields -> some custom state
> processing.
> The Kafka topic has 24 partitions, so my parallelism is also 24.
>
> After some tweaks and upgrading to 1.0.2-rc3 (as I use the RocksDB state
> backend) I reached a point where throughput is ~120-150k/s.
> On the same Kafka and machine I reached > 500k/s with a simple filtering
> job, so I wanted to see where the bottleneck is.
>
> It turns out that quite often all of the Kafka threads are stuck waiting
> for a buffer from the pool:
> "Thread-6695" #7466 daemon prio=5 os_prio=0 tid=0x00007f77fd80d000
> nid=0x8118 in Object.wait() [0x00007f7ad54d9000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>         at java.lang.Object.wait(Native Method)
>         at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
>         - locked <0x00000002eade3890> (a java.util.ArrayDeque)
>         at
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>         at
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
>         - locked <0x00000002eb73cbd0> (a
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>         at
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>         at
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>         at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>         at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>         at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at
> org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
>         at
> org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>         at
> org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>         at
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:309)
>         at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:297)
>         at
> org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>         - locked <0x00000002eaf3eb50> (a java.lang.Object)
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>         - locked <0x00000002eaf3eb50> (a java.lang.Object)
>
> This seems a bit weird to me, as most of the state processing threads are idle:
>
> "My custom function -> (Sink: Unnamed, Map) (19/24)" #7353 daemon prio=5
> os_prio=0 tid=0x00007f7a7400e000 nid=0x80a7 waiting on condition
> [0x00007f7bee8ed000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>         at sun.misc.Unsafe.park(Native Method)
>         - parking to wait for  <0x00000002eb840c38> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>         at
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
>         at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
>         at
> java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
>         at
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:415)
>         at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:108)
>         at
> org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:175)
>         at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:65)
>         at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:225)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>         at java.lang.Thread.run(Thread.java:745)
>
>
> I tried using more network buffers, but it doesn't seem to change
> anything - and if I understand
> https://ci.apache.org/projects/flink/flink-docs-master/setup/config.html#configuring-the-network-buffers
> correctly, I should not need more than 24^2 * 4 of them...
>
> Has anybody encountered such a problem? Or maybe it's just normal for
> such a case...
>
> thanks,
> maciek
>
