Hi all,

I was debugging a curious problem with a streaming job that contained an 
iteration and several AsyncFunctions.

The entire job would stall out, with no progress being made.

But when I checked back pressure, only one function showed it as being high - 
everything else was OK.

And when I dumped threads, the only bit of my code that was running was indeed 
that one function w/high back pressure, stuck while making a collect() call.

There were two issues here:

1. A downstream function in the iteration was (significantly) increasing the 
number of tuples - it would get one in, and sometimes emit 100+.

The output would loop back as input via the iteration.

This eventually caused the network buffers to fill up, and that’s why the job 
got stuck.
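
To give a sense of the shape of that part of the job (this is just a sketch, 
not my actual code - MyRecord, expand() and isDone() are made-up stand-ins):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.util.Collector;

// 'input' is the DataStream<MyRecord> feeding the loop.
IterativeStream<MyRecord> loop = input.iterate();

DataStream<MyRecord> expanded = loop.flatMap(new FlatMapFunction<MyRecord, MyRecord>() {
    @Override
    public void flatMap(MyRecord in, Collector<MyRecord> out) {
        // One record in, sometimes 100+ out. Because this output is fed back
        // into the loop below, the number of in-flight records keeps growing
        // until the network buffers fill up and everything stalls.
        for (MyRecord child : expand(in)) {
            out.collect(child);
        }
    }
});

// Unfinished records go around again; finished ones leave the loop.
loop.closeWith(expanded.filter(r -> !r.isDone()));
DataStream<MyRecord> results = expanded.filter(r -> r.isDone());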

I had to add my own tracking/throttling in one of my custom functions, to avoid 
having too many “active” tuples.
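
The throttling was along these lines (again just a sketch - the counter and 
limit are made up, and a static AtomicInteger only works if the subtasks that 
need to see it share a JVM; the real version needs bookkeeping that fits your 
topology):

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

// Throttle records entering the loop so the fan-out inside the iteration
// can't fill up the network buffers.
public class ThrottledInjector extends RichFlatMapFunction<MyRecord, MyRecord> {

    // Hypothetical counter of records currently "active" inside the iteration.
    private static final AtomicInteger ACTIVE = new AtomicInteger();
    private static final int MAX_ACTIVE = 1000;

    @Override
    public void flatMap(MyRecord in, Collector<MyRecord> out) throws Exception {
        // Back off while too many records are already in flight.
        while (ACTIVE.get() >= MAX_ACTIVE) {
            Thread.sleep(10);
        }
        ACTIVE.incrementAndGet();
        out.collect(in);
    }

    // Called by whatever function sees a record finally leave the loop.
    public static void recordDone() {
        ACTIVE.decrementAndGet();
    }
}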

So maybe this is something to note in the documentation on iterations, if it's 
not there already.

2. The back pressure calculation doesn't take AsyncIO into account.

When I double-checked the thread dump, there were actually a number of threads 
(one for each of my AsyncFunctions) that were stuck calling collect().

These all were named "AsyncIO-Emitter-Thread (<name of AsyncFunction>…)". For 
example:

> "AsyncIO-Emitter-Thread (MyAsyncFunction -> (<blah>)) (1/1))" #125 daemon 
> prio=5 os_prio=31 tid=0x00007fb191025800 nid=0xac0b in Object.wait() 
> [0x00007000123f0000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>       at java.lang.Object.wait(Native Method)
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:224)
>       - locked <0x0000000773cb3ec0> (a java.util.ArrayDeque)
>       at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:193)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:132)
>       - locked <0x0000000773b98020> (a 
> org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>       at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
>       at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:85)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:101)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:83)
>       at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:41)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>       at 
> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>       at 
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>       at 
> org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:140)
>       at 
> org.apache.flink.streaming.api.collector.selector.DirectedOutput.collect(DirectedOutput.java:42)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>       at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>       at 
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>       at 
> org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:132)
>       - locked <0x0000000773b1bb70> (a java.lang.Object)
>       at 
> org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:83)
>       at java.lang.Thread.run(Thread.java:748)



I’m assuming that when my AsyncFunction calls collect(), this just hands the 
tuple off to that AsyncIO-Emitter thread, which is why none of my code (either 
the AsyncFunctions themselves or the threads in my pool doing the async work) 
shows up in the thread dump.
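
For reference, one of these functions is shaped roughly like the sketch below 
(the callback type depends on the Flink version - newer releases hand you a 
ResultFuture, older ones an AsyncCollector whose collect() you call, but the 
hand-off to the emitter thread is the same either way; callExternalService() 
is a made-up stand-in):

import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class MyAsyncFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService pool;

    @Override
    public void open(Configuration parameters) {
        pool = Executors.newFixedThreadPool(10);
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        pool.submit(() -> {
            String result = callExternalService(input);
            // My code ends here. Completing the result just queues it; the
            // downstream collect() happens later, on the
            // "AsyncIO-Emitter-Thread (MyAsyncFunction ...)" thread - the one
            // blocked in LocalBufferPool.requestBuffer() in the dump above.
            resultFuture.complete(Collections.singleton(result));
        });
    }

    private String callExternalService(String input) {
        return input; // placeholder for the real async work
    }
}

// Wired up with something like:
// AsyncDataStream.unorderedWait(stream, new MyAsyncFunction(), 30, TimeUnit.SECONDS, 100);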

And I’m assuming that the back pressure calculation isn’t associating these 
threads with the source function, which is why they don’t show up in the GUI.

I’m hoping someone can confirm the above. If so, I’ll file an issue.

Thanks,

— Ken

--------------------------
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr
