Hi Caizhi,

Thanks for the reply. Is this documented somewhere?

On Tue, Aug 3, 2021 at 10:56 AM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> As far as I know, output.collect is not thread-safe, and you
> should never run your operator's main logic (from reading in the record to
> writing the results out) in a separate thread. Flink's runtime expects the
> whole operator chain to run in a single thread.
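
A minimal plain-Java sketch of the single-thread rule described above, assuming nothing about Flink's internals. `Collector` is a hypothetical stand-in for a non-thread-safe output (it is not Flink's `Output`), and the class and method names are made up for illustration. The background thread only computes results and hands them back through a queue; the thread that owns the collector is the only one that emits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SingleThreadEmitSketch {

    /** Hypothetical non-thread-safe collector; NOT Flink's Output class. */
    static final class Collector {
        private final List<String> emitted = new ArrayList<>();
        private final Thread owner = Thread.currentThread();

        void collect(String record) {
            // Fail loudly if any thread other than the owner tries to emit.
            if (Thread.currentThread() != owner) {
                throw new IllegalStateException("collect called off the owner thread");
            }
            emitted.add(record);
        }

        List<String> emitted() {
            return emitted;
        }
    }

    /** Does the work on a background thread, but emits only on the calling thread. */
    static List<String> run(List<String> inputs) {
        Collector out = new Collector(); // owned by the calling thread
        BlockingQueue<String> results = new LinkedBlockingQueue<>();

        // The worker never touches the collector; it only publishes results.
        Thread worker = new Thread(() -> {
            for (String in : inputs) {
                results.add(in.toUpperCase());
            }
        });
        worker.start();

        try {
            // The owner thread drains the queue and is the only one that emits.
            for (int i = 0; i < inputs.size(); i++) {
                out.collect(results.take());
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while draining results", e);
        }
        return out.emitted();
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c"))); // prints [A, B, C]
    }
}
```

The queue is the only shared state, so the collector itself needs no locking. Whether and how a real operator can drain such a queue on the task thread depends on Flink's own mechanisms, which this sketch does not model.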
>
> Yuval Itzchakov <yuva...@gmail.com> wrote on Tue, Aug 3, 2021 at 1:47 PM:
>
>> Hi,
>>
>> Flink 1.13.1
>> Scala 2.12.4
>>
>> I have an implementation of an AbstractStreamOperator whose
>> processElement function enqueues each element onto a queue that is polled
>> by a background thread.
>>
>> When processing the elements in the background, I use the Output class to
>> emit elements downstream with a timestamp.
>>
>> [image: image.png]
>>
>> Is there any problem using Output from a background thread? I didn't see
>> anything in the docs and looking in the implementation I didn't see
>> anything that could be potentially problematic.
>>
>> The reason I'm asking is that I'm seeing some weird behavior,
>> particularly runtime exceptions thrown from Flink SQL auto-generated
>> code, and I'm wondering whether this may be the cause. I was
>> previously using a MailboxExecutor to emit elements downstream from the
>> AbstractStreamOperator, but that turned out to be problematic in its own
>> right, as it doesn't honor backpressure from downstream operators.
>>
>> For example, I'm seeing the following exception at runtime from one of my
>> Flink SQL apps:
>>
>> flink-eai-80503b991b894dc2ed76549f55683f05-b14acfcb-jm-f688b9b6 jobmanager
>> java.lang.IndexOutOfBoundsException: null
>>   at org.apache.flink.core.memory.MemorySegment.get(MemorySegment.java:375) ~[hunting-pipeline.jar:0.1-SNAPSHOT]
>>   at org.apache.flink.table.data.binary.BinarySegmentUtils.bitGet(BinarySegmentUtils.java:461) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.table.data.binary.BinaryRowData.isNullAt(BinaryRowData.java:157) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>>   at KeyProjection$245.apply(Unknown Source) ~[?:?]
>>   at KeyProjection$245.apply(Unknown Source) ~[?:?]
>>   at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:49) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.table.runtime.keyselector.BinaryRowDataKeySelector.getKey(BinaryRowDataKeySelector.java:28) ~[flink-table-blink_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement(AbstractStreamOperator.java:526) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator.setKeyContextElement1(AbstractStreamOperator.java:514) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566) ~[flink-dist_2.12-1.13.1.jar:1.13.1]
>>   at java.lang.Thread.run(Thread.java:829) ~[?:?]
>>
>> And this is the runtime generated code for KeyProjection:
>>
>> public class KeyProjection$56 implements
>>     org.apache.flink.table.runtime.generated.Projection<
>>         org.apache.flink.table.data.RowData,
>>         org.apache.flink.table.data.binary.BinaryRowData> {
>>
>>   org.apache.flink.table.data.binary.BinaryRowData out =
>>       new org.apache.flink.table.data.binary.BinaryRowData(1);
>>   org.apache.flink.table.data.writer.BinaryRowWriter outWriter =
>>       new org.apache.flink.table.data.writer.BinaryRowWriter(out);
>>
>>   public KeyProjection$56(Object[] references) throws Exception {
>>   }
>>
>>   @Override
>>   public org.apache.flink.table.data.binary.BinaryRowData apply(
>>       org.apache.flink.table.data.RowData in1) {
>>     org.apache.flink.table.data.binary.BinaryStringData field$57;
>>     boolean isNull$57;
>>     outWriter.reset();
>>     isNull$57 = in1.isNullAt(0);
>>     field$57 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
>>     if (!isNull$57) {
>>       field$57 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(0));
>>     }
>>     if (isNull$57) {
>>       outWriter.setNullAt(0);
>>     } else {
>>       outWriter.writeString(0, field$57);
>>     }
>>     outWriter.complete();
>>     return out;
>>   }
>> }
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

-- 
Best Regards,
Yuval Itzchakov.
