Hi all,

I was debugging a curious problem with a streaming job that contained an iteration and several AsyncFunctions.
The entire job would stall out, with no progress being made. But when I checked back pressure, only one function showed it as being high; everything else was OK. And when I dumped threads, the only bit of my code that was running was indeed that one function with high back pressure, stuck while making a collect() call.

There were two issues here:

1. A downstream function in the iteration was (significantly) increasing the number of tuples - it would get one in, and sometimes emit 100+. The output would loop back as input via the iteration. This eventually caused the network buffers to fill up, and that's why the job got stuck. I had to add my own tracking/throttling in one of my custom functions, to avoid having too many "active" tuples. So maybe something to note in the documentation on iterations, if it's not there already.

2. The back pressure calculation doesn't take AsyncIO into account. When I double-checked the thread dump, there were actually a number of threads (one for each of my AsyncFunctions) that were stuck calling collect(). These were all named "AsyncIO-Emitter-Thread (<name of AsyncFunction>…)".
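(Going back to issue 1 for a moment: the tracking/throttling I added boils down to roughly the sketch below. This is my own helper, not anything from the Flink API - just a permit counter that the head of the loop acquires before injecting a new tuple, and that gets released once a tuple finally leaves the iteration.)

```java
import java.util.concurrent.Semaphore;

// Hypothetical helper: caps the number of tuples "active" inside an iteration.
// The idea is to block at the point where new tuples enter the loop, instead
// of letting amplified tuples fill up the network buffers.
class ActiveTupleThrottle {
    private final Semaphore permits;

    ActiveTupleThrottle(int maxActiveTuples) {
        this.permits = new Semaphore(maxActiveTuples);
    }

    // Call before emitting a brand-new tuple into the iteration; blocks
    // (applying back pressure at the source of the loop) when the cap is hit.
    void tupleEntered() throws InterruptedException {
        permits.acquire();
    }

    // Call when a tuple (and anything derived from it) has left the loop.
    void tupleExited() {
        permits.release();
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

The fiddly part in practice is deciding when a tuple has really "exited", since one input can fan out into 100+ outputs that each loop back; I had to account for derived tuples in my own bookkeeping.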
For example:

"AsyncIO-Emitter-Thread (MyAsyncFunction -> (<blah>)) (1/1))" #125 daemon prio=5 os_prio=31 tid=0x00007fb191025800 nid=0xac0b in Object.wait() [0x00007000123f0000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:224)
	- locked <0x0000000773cb3ec0> (a java.util.ArrayDeque)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
	- locked <0x0000000773b98020> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:85)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:140)
	at org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:42)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:132)
	- locked <0x0000000773b1bb70> (a java.lang.Object)
	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
	at java.lang.Thread.run(Thread.java:748)

I'm assuming that when my AsyncFunction calls collect(), this hands off the tuple to the AsyncIO-Emitter-Thread, which is why none of my code (either AsyncFunctions or threads in my pool doing async work) shows up in the thread dump. And I'm assuming that the back pressure calculation isn't associating these threads with the source function, which is why they don't show up in the GUI.

I'm hoping someone can confirm the above. If so, I'll file an issue.

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
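P.S. To sanity-check my mental model of the handoff, I mocked it up with plain Java - no Flink involved, and the bounded queue below just stands in for the network buffer pool. The point it demonstrates is that when the "buffers" fill up, the thread that blocks is the dedicated emitter thread, not the thread that produced the result, which matches what I saw in the dump:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model of the AsyncIO handoff (not Flink code): the "async function"
// side only enqueues completed results, while a dedicated emitter thread
// dequeues them and pushes downstream into a bounded queue standing in for
// the network buffer pool.
class EmitterHandoffDemo {

    // Runs the model until the fake buffer pool is full, then reports the
    // emitter thread's state. When it's parked inside put(), the state is
    // WAITING, mirroring the blocked Emitter.output() frame in the dump.
    static Thread.State runDemo() throws InterruptedException {
        BlockingQueue<String> completedResults = new ArrayBlockingQueue<>(100);
        BlockingQueue<String> networkBuffers = new ArrayBlockingQueue<>(4);

        Thread emitter = new Thread(() -> {
            try {
                while (true) {
                    // Blocks here once the "buffer pool" is full and
                    // nothing downstream is draining it.
                    networkBuffers.put(completedResults.take());
                }
            } catch (InterruptedException ignored) {
                // Demo shutdown path.
            }
        }, "AsyncIO-Emitter-Thread (demo)");
        emitter.setDaemon(true);
        emitter.start();

        // The "async function" side never touches the network: it just
        // hands results off and keeps going.
        for (int i = 0; i < 10; i++) {
            completedResults.put("result-" + i);
        }

        // Give the emitter time to fill the 4 "buffers" and block on the 5th.
        Thread.sleep(500);
        return emitter.getState();
    }
}
```

With only 4 slots and no consumer, the emitter ends up parked in put() while this caller finishes its loop untroubled - so a thread dump shows the emitter blocked, and the caller nowhere to be seen.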