Hi!

As far as I know, this output.collect thingy is not thread safe, and you
should never run your operator's main logic (from reading in the record to
writing the results out) in a separate thread. Flink's runtime expects the
whole operator chain to run in a single thread.
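Roughly, the pattern I mean looks like the sketch below. It is plain Java with no Flink classes, and `Emitter`, `ListEmitter`, `produce` and `drainOnTaskThread` are made-up names for illustration, not Flink APIs. The background worker only computes results and puts them on a bounded queue; the single thread that owns the (non-thread-safe) emitter is the only one that ever calls `collect`. Because the queue is bounded, a slow consumer eventually blocks the producer, so pressure propagates back to the worker. How you would hook the draining step into a real operator's task thread is left open here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {

    /** Hypothetical stand-in for a non-thread-safe output such as an operator's Output. */
    interface Emitter<T> {
        void collect(T value);
    }

    /** Deliberately not thread safe: a plain ArrayList with no synchronization. */
    static final class ListEmitter implements Emitter<String> {
        final List<String> emitted = new ArrayList<>();

        @Override
        public void collect(String value) {
            emitted.add(value);
        }
    }

    // Bounded, so a slow consumer eventually blocks the background producer.
    private final BlockingQueue<String> results = new ArrayBlockingQueue<>(16);

    /** Runs on the background thread: computes results but never touches the emitter. */
    void produce(int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            results.put("result-" + i); // blocks while the queue is full
        }
    }

    /** Runs on the owning ("task") thread: the only place collect() is called. */
    int drainOnTaskThread(Emitter<String> out, int expected) throws InterruptedException {
        int drained = 0;
        while (drained < expected) {
            String next = results.poll(1, TimeUnit.SECONDS);
            if (next == null) {
                break; // producer stalled; stop instead of waiting forever
            }
            out.collect(next);
            drained++;
        }
        return drained;
    }

    public static void main(String[] args) throws Exception {
        HandOffSketch sketch = new HandOffSketch();
        ListEmitter out = new ListEmitter();
        Thread worker = new Thread(() -> {
            try {
                sketch.produce(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        int drained = sketch.drainOnTaskThread(out, 100);
        worker.join();
        System.out.println(drained + " " + out.emitted.size());
    }
}
```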

Yuval Itzchakov <yuva...@gmail.com> wrote on Tue, Aug 3, 2021 at 1:47 PM:

> Hi,
>
> Flink 1.13.1
> Scala 2.12.4
>
> I have an implementation of an AbstractStreamOperator whose
> processElement function enqueues an element to a queue which is polled by
> a background thread.
>
> When processing the elements in the background, I use the Output class to
> emit elements downstream with a timestamp.
>
> [image: image.png]
>
> Is there any problem using Output from a background thread? I didn't see
> anything in the docs, and looking at the implementation I didn't see
> anything that could be potentially problematic.
>
> The reason I'm asking this is that I'm seeing some weird behavior,
> particularly runtime exceptions in Flink SQL auto-generated code,
> and I'm wondering if this may be causing issues. I was
> previously using a MailboxExecutor to emit elements downstream from the
> ASO, but that turned out to be problematic on its own as it doesn't honor
> backpressure from downstream operators.
>
> For example, I'm seeing the following exception at runtime from one of my
> Flink SQL apps:
>
> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager java.lang.IndexOutOfBoundsException: null
>     at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:375) ~[hunting-pipeline.jar:0.1-SNAPSHOT]
>     at org.apache.flink.table.data.binary.BinarySegmentUtils.bitGet(BinarySegmentUtils.java:461) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:157) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at KeyProjection$245.apply(Unknown Source) ~[?:?]
>     at KeyProjection$245.apply(Unknown Source) ~[?:?]
>     at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:49) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:28) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:526) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:514) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>     at java.lang.Thread.run(Thread.java:829) ~[?:?]
>
> And this is the runtime generated code for KeyProjection:
>
> public class KeyProjection$56 implements
>     org.apache.flink.table.runtime.generated.Projection<org.apache.flink.table.data.RowData,
>         org.apache.flink.table.data.binary.BinaryRowData> {
>
>   org.apache.flink.table.data.binary.BinaryRowData out =
>       new org.apache.flink.table.data.binary.BinaryRowData(1);
>   org.apache.flink.table.data.writer.BinaryRowWriter outWriter =
>       new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>
>   public KeyProjection$56(Object[] references) throws Exception {
>   }
>
>   @Override
>   public org.apache.flink.table.data.binary.BinaryRowData apply(
>       org.apache.flink.table.data.RowData in1) {
>     org.apache.flink.table.data.binary.BinaryStringData field$57;
>     boolean isNull$57;
>     outWriter.reset();
>     isNull$57 = in1.isNullAt(0);
>     field$57 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>     if (!isNull$57) {
>       field$57 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
>     }
>     if (isNull$57) {
>       outWriter.setNullAt(0);
>     } else {
>       outWriter.writeString(0, field$57);
>     }
>     outWriter.complete();
>     return out;
>   }
> }
>
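Note that this generated projection keeps a single `out` row per instance and returns that same object from every `apply` call. Here is a plain-Java sketch of what that reuse implies; `MutableRow` and `ReusingProjection` are hypothetical classes that only mimic the shape of the generated code, not Flink's BinaryRowData/BinaryRowWriter. A caller holding on to a previous result sees it overwritten by the next call. If two threads drive the same operator chain at once, they share buffers like this without any synchronization, which is one plausible (unconfirmed) way to end up with an inconsistent row and an exception like the IndexOutOfBoundsException above.

```java
import java.util.Arrays;

public class ReuseSketch {

    /** Hypothetical mutable row, standing in for a reused binary row. */
    static final class MutableRow {
        final String[] fields;

        MutableRow(int arity) {
            fields = new String[arity];
        }
    }

    /** Mimics the generated projection: one output row per instance, reused on every call. */
    static final class ReusingProjection {
        private final MutableRow out = new MutableRow(1);

        MutableRow apply(String[] in) {
            Arrays.fill(out.fields, null); // analogous to outWriter.reset()
            out.fields[0] = in[0];         // analogous to outWriter.writeString(0, ...)
            return out;                    // always the same object
        }
    }

    public static void main(String[] args) {
        ReusingProjection projection = new ReusingProjection();
        MutableRow first = projection.apply(new String[] {"a"});
        MutableRow second = projection.apply(new String[] {"b"});
        // Both references point at the same row, so the "first" key now reads "b".
        System.out.println((first == second) + " " + first.fields[0]); // prints "true b"
    }
}
```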
> --
> Best Regards,
> Yuval Itzchakov.
>
