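Hi! As far as I know, this output.collect thingy is not thread safe, and you should never run your operator's main logic (from reading in the record to writing the results out) in a separate thread. Flink's runtime expects the whole operator chain to run in a single thread. When operators are chained, collect() directly invokes the downstream operator's processElement(), so calling it from a background thread runs downstream code concurrently with the task thread. That includes generated code that reuses non-thread-safe objects, such as the `out` row in the KeyProjection below, which could explain corrupted-row errors like the one you are seeing.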
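A minimal sketch of the alternative, written with plain java.util.concurrent instead of Flink classes (HandOffSketch and NonThreadSafeOutput are hypothetical stand-ins, not Flink APIs). The background worker never touches the output. It pushes results into a bounded queue, and put() blocks it when the queue is full, which gives the worker backpressure. Only the owner thread, playing the role of the task thread, drains the queue and calls collect().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: only the "task" thread ever calls collect();
// the background worker hands its results over through a bounded queue.
public class HandOffSketch {

    // Stand-in for Flink's Output: deliberately NOT thread-safe (plain ArrayList).
    static final class NonThreadSafeOutput {
        final List<String> emitted = new ArrayList<>();

        void collect(String record) {
            emitted.add(record);
        }
    }

    public static List<String> run(int n) throws InterruptedException {
        // Bounded queue: put() blocks the worker when the task thread falls behind.
        BlockingQueue<String> results = new ArrayBlockingQueue<>(16);
        NonThreadSafeOutput output = new NonThreadSafeOutput();

        Thread worker = new Thread(() -> {
            try {
                for (int i = 0; i < n; i++) {
                    // Compute in the background, but never call collect() here.
                    results.put("result-" + i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();

        // The calling thread plays the task thread: it alone drains and emits.
        for (int emitted = 0; emitted < n; emitted++) {
            output.collect(results.take());
        }
        worker.join();
        return output.emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> out = run(1000);
        System.out.println(out.size() + " " + out.get(999)); // prints "1000 result-999"
    }
}
```

Inside a real operator the task thread must not block, so the drain would have to use a non-blocking poll() from a point where the task thread already runs (for example processElement or a mailbox action); the blocking take() here only keeps the demo deterministic. If the background work is essentially asynchronous calls, Flink's Async I/O (AsyncDataStream with a capacity argument) may be worth a look, since it bounds in-flight requests for you.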
Yuval Itzchakov <yuva...@gmail.com> wrote on Tue, Aug 3, 2021 at 1:47 PM:

> Hi,
>
> Flink 1.13.1
> Scala 2.12.4
>
> I have an implementation of an AbstractStreamOperator whose
> processElement function enqueues an element to a queue which is polled by
> a background thread.
>
> When processing the elements in the background, I use the Output class to
> emit elements downstream with a timestamp.
>
> [image: image.png]
>
> Is there any problem using Output from a background thread? I didn't see
> anything in the docs, and looking at the implementation I didn't see
> anything that could be potentially problematic.
>
> The reason I'm asking is that I'm seeing some weird behavior,
> particularly runtime exceptions happening in Flink SQL auto-generated
> code, and I'm wondering if this may be causing issues. I was
> previously using a MailboxExecutor to emit elements downstream from the
> ASO, but that turned out to be problematic on its own, as it doesn't honor
> backpressure from downstream operators.
>
> For example, I'm seeing the following exception at runtime from one of my
> Flink SQL apps:
>
> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager java.lang.IndexOutOfBoundsException: null
>     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:375) ~[hunting-pipeline.jar:0.1-SNAPSHOT]
>     at org.apache.flink.table.data.binary.BinarySegmentUtils.bitGet(BinarySegmentUtils.java:461) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:157) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at KeyProjection$245.apply(Unknown Source) ~[?:?]
>     at KeyProjection$245.apply(Unknown Source) ~[?:?]
>     at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:49) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:28) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:526) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:514) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at java.lang.Thread.run(Thread.java:829) ~[?:?]
>
> And this is the runtime-generated code for KeyProjection:
>
> public class KeyProjection$56 implements
>     org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
>         org.apache.flink.table.data.binary.BinaryRowData> {
>
>   org.apache.flink.table.data.binary.BinaryRowData out =
>       new org.apache.flink.table.data.binary.BinaryRowData(1);
>   org.apache.flink.table.data.writer.BinaryRowWriter outWriter =
>       new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>
>   public KeyProjection$56(Object[] references) throws Exception {
>   }
>
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>     org.apache.flink.table.data.binary.BinaryStringData field$57;
>     boolean isNull$57;
>     outWriter.reset();
>     isNull$57 = in1.isNullAt(0);
>     field$57 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>     if (!isNull$57) {
>       field$57 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
>     }
>     if (isNull$57) {
>       outWriter.setNullAt(0);
>     } else {
>       outWriter.writeString(0, field$57);
>     }
>     outWriter.complete();
>     return out;
>   }
> }
>
> --
> Best Regards,
> Yuval Itzchakov.
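The generated KeyProjection writes every result into the same `out` row through the same `outWriter` and returns that one object, so each call overwrites the previous result. The hypothetical Java analogue below (a StringBuilder standing in for BinaryRowWriter; ReusingProjection is not a Flink class) shows the effect even on a single thread. With two threads calling apply() at once, the reset/write/complete steps on the shared buffer can additionally interleave.

```java
// Hypothetical analogue of the generated KeyProjection: one reused output
// buffer per instance, like `out`/`outWriter` in the generated code.
public class ReusingProjection {
    private final StringBuilder out = new StringBuilder(); // reused across calls

    // Like apply(RowData): reset, write the key field, return the SAME object.
    public CharSequence apply(String[] row) {
        out.setLength(0);   // roughly outWriter.reset()
        out.append(row[0]); // roughly outWriter.writeString(0, field)
        return out;         // roughly "return out;" (no defensive copy)
    }

    public static void main(String[] args) {
        ReusingProjection projection = new ReusingProjection();
        CharSequence first = projection.apply(new String[] {"alice"});
        CharSequence second = projection.apply(new String[] {"bob"});
        // Both references point at one buffer, so the first key is gone.
        System.out.println((first == second) + " " + first); // prints "true bob"
    }
}
```

This reuse is a deliberate allocation-saving choice that is only safe while a single thread owns the projection, which is exactly the assumption a background thread calling output.collect breaks.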