Hi Caizhi,

Thanks for the reply. Is this documented somewhere?

On Tue, Aug 3, 2021 at 10:56 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> As far as I know, this output.collect thingy is not thread safe, and you
> should never run your operator's main logic (from reading in the record to
> writing the results out) in a separate thread. Flink's runtime expects the
> whole operator chain to run in a single thread.
>
> Yuval Itzchakov <yuva...@gmail.com> wrote on Tue, Aug 3, 2021 at 1:47 PM:
>
>> Hi,
>>
>> Flink 1.13.1
>> Scala 2.12.4
>>
>> I have an implementation of an AbstractStreamOperator whose
>> processElement function enqueues an element to a queue which is polled from
>> a background thread.
>>
>> When processing the elements in the background, I use the Output class to
>> emit elements downstream with a timestamp.
>>
>> [image: image.png]
>>
>> Is there any problem using Output from a background thread? I didn't see
>> anything in the docs and looking in the implementation I didn't see
>> anything that could be potentially problematic.
>>
>> The reason I'm asking this is that I'm seeing some weird behavior,
>> particularly with runtime exceptions happening with Flink SQL auto
>> generated code and I'm wondering if this may be causing issues. I was
>> previously using a MailboxExecutor to emit elements downstream from the
>> ASO, but that turned out to be problematic on its own as it doesn't honor
>> backpressure from downstream operators.
>>
>> For example, I'm seeing the following exception at runtime from one of my
>> Flink SQL apps:
>>
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager java.lang.IndexOutOfBoundsException: null
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:375) ~[hunting-pipeline.jar:0.1-SNAPSHOT]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.table.data.binary.BinarySegmentUtils.bitGet(BinarySegmentUtils.java:461) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:157) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at KeyProjection$245.apply(Unknown Source) ~[?:?]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at KeyProjection$245.apply(Unknown Source) ~[?:?]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:49) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:28) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:526) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:514) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager at java.lang.Thread.run(Thread.java:829) ~[?:?]
>>
>> And this is the runtime generated code for KeyProjection:
>>
>> public class KeyProjection$56 implements
>>     org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
>>         org.apache.flink.table.data.binary.BinaryRowData> {
>>   org.apache.flink.table.data.binary.BinaryRowData out =
>>       new org.apache.flink.table.data.binary.BinaryRowData(1);
>>   org.apache.flink.table.data.writer.BinaryRowWriter outWriter =
>>       new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>>
>>   public KeyProjection$56(Object[] references) throws Exception { }
>>
>>   @Override
>>   public org.apache.flink.table.data.binary.BinaryRowData apply(org.apache.flink.table.data.RowData in1) {
>>     org.apache.flink.table.data.binary.BinaryStringData field$57;
>>     boolean isNull$57;
>>     outWriter.reset();
>>     isNull$57 = in1.isNullAt(0);
>>     field$57 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>>     if (!isNull$57) {
>>       field$57 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
>>     }
>>     if (isNull$57) {
>>       outWriter.setNullAt(0);
>>     } else {
>>       outWriter.writeString(0, field$57);
>>     }
>>     outWriter.complete();
>>     return out;
>>   }
>> }
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

--
Best Regards,
Yuval Itzchakov.
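
For reference, here is a minimal sketch of the single-thread rule from Caizhi's reply, written in plain Java rather than against Flink. A background worker only enqueues results, and the thread that owns the output is the only one that ever calls collect(). RecordingOutput and HandOffSketch are hypothetical names made up for this illustration and are not Flink classes. In a real operator the hand-off back to the task thread would have to go through Flink's own mechanisms, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a non-thread-safe downstream output (NOT Flink's Output).
final class RecordingOutput {
    final List<String> emitted = new ArrayList<>();   // plain ArrayList: not thread safe
    private final Thread owner = Thread.currentThread();

    void collect(String record) {
        // Fail loudly if anything other than the owning thread tries to emit.
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException("collect() called off the owning thread");
        }
        emitted.add(record);
    }
}

final class HandOffSketch {
    static List<String> run(int n) throws InterruptedException {
        RecordingOutput output = new RecordingOutput();            // owned by the calling thread
        BlockingQueue<String> results = new ArrayBlockingQueue<>(2); // bounded hand-off queue
        ExecutorService worker = Executors.newSingleThreadExecutor();

        // The background thread does the work but only enqueues results.
        for (int i = 0; i < n; i++) {
            final int id = i;
            worker.execute(() -> {
                try {
                    results.put("record-" + id);   // blocks while the queue is full
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        worker.shutdown();   // already submitted tasks still run to completion

        // The owning thread drains the queue and is the only caller of collect().
        for (int i = 0; i < n; i++) {
            output.collect(results.take());
        }
        return output.emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

Calling HandOffSketch.run(3) returns [record-0, record-1, record-2]. The bounded queue makes the worker block once two results are waiting, which is only a crude stand-in for backpressure and not how Flink propagates it.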