I found the following link about this.  Still looks applicable.  In my
case, I don't need to do a broadcast join.
https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839

On Thu, Oct 21, 2021 at 9:51 PM Dan Hill <quietgol...@gmail.com> wrote:

> Interesting.  Thanks, JING ZHANG!
>
> On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Dan,
>> > I'm guessing I violate the "The second operator needs to be
>> single-input (i.e. no TwoInputOp nor union() before)" part.
>> I think you are right.
>>
>> Do you want to remove the shuffle of the two inputs in your case? If so,
>> Flink has supported multiple-input operators since version 1.11. I think
>> that might satisfy your need. You can find more in [1].
>> However, this feature does not yet have a complete DataStream API
>> interface. To use it, you need to manually create a
>> MultipleInputTransformation and MultipleConnectedStreams.
>>
>>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>>    "My Operator",
>>>    new SumAllInputOperatorFactory(),
>>>    BasicTypeInfo.LONG_TYPE_INFO,
>>>    1);
>>>
>>> env.addOperator(transform
>>>    .addInput(source1.getTransformation())
>>>    .addInput(source2.getTransformation())
>>>    .addInput(source3.getTransformation()));
>>> new MultipleConnectedStreams(env)
>>>    .transform(transform)
>>>    .addSink(resultSink);
>>>
>>>
>> I would invite @Piotr to double-check this conclusion. He is more of an
>> expert on this topic.
>>
>> @Piotr, Would you please check Dan's question? Please correct me if I'm
>> wrong.
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>>
>> Best,
>> JING ZHANG
>>
>> Dan Hill <quietgol...@gmail.com> wrote on Sat, Oct 16, 2021 at 6:28 AM:
>>
>>> Thanks Thias and JING ZHANG!
>>>
>>> Here's a Google drive folder link
>>> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
>>> with the execution plan and two screenshots from the job graph.
>>>
>>> I'm guessing I violate the "The second operator needs to be
>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>>
>>> After I do a transformation on a KeyedStream (so it goes back to a
>>> SingleOutputStreamOperator), even if I do a simple map, it usually
>>> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
>>> doesn't work.
>>>
>>>
>>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com>
>>> wrote:
>>>
>>>> Hi Dan,
>>>> Sorry for the typos. I meant to ask you to provide the code to reproduce
>>>> the problem. If the current program is complex or confidential, maybe you
>>>> could try to simplify the code.
>>>> Besides, Matthias's guess is very reasonable. Could you please check
>>>> whether there is a network shuffle between your two operators? Were those
>>>> two operators chained into one vertex?
>>>>
>>>> Best,
>>>> JING ZHANG
>>>>
>>>> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021
>>>> at 2:57 PM:
>>>>
>>>>> … didn’t mean to hit the send button so soon 😊
>>>>>
>>>>>
>>>>>
>>>>> I guess we are getting closer to a solution
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Thias
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *From:* Schwalbe Matthias
>>>>> *Sent:* Friday, 15 October 2021 08:49
>>>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>>>> partitions?
>>>>>
>>>>>
>>>>>
>>>>> Hi Dan again 😊,
>>>>>
>>>>>
>>>>>
>>>>> I took a second look … from your call stack I conclude that you do
>>>>> indeed have a network shuffle between your two operators, in which case
>>>>> reinterpretAsKeyedStream wouldn’t work
>>>>>
>>>>> (StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>> indicates that the two operators are not chained)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> … just as a double-check could you please share both your
>>>>>
>>>>>    - Execution plan (call println(env.getExecutionPlan) right before
>>>>>    your call to env.execute) (json), and
>>>>>    - Your job plan (screenshot from flink dashboard)
>>>>>
>>>>>
>>>>>
>>>>> There are a number of preconditions for two operators to be chained,
>>>>> and probably one of them fails (see [1]):
>>>>>
>>>>>    - The two operators each need to allow chaining with the other (see
>>>>>    [2] … chaining strategy)
>>>>>    - We need a ForwardPartitioner in between
>>>>>    - We need to be in streaming mode
>>>>>    - Both operators need the same parallelism
>>>>>    - Chaining needs to be enabled for the streaming environment
>>>>>    - The second operator needs to be single-input (i.e. no TwoInputOp
>>>>>    nor union() before)
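
To make the checklist above concrete, here is a minimal sketch in plain Java. It is only a model of the list in this email, not Flink's actual code: the class, enum, and field names are made up for illustration, and the strategy rule (upstream must not be NEVER, downstream must be ALWAYS) is my reading of [2], so treat it as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the chaining checklist; none of these types are Flink classes.
final class ChainingSketch {
    enum Strategy { ALWAYS, HEAD, NEVER }
    enum Partitioner { FORWARD, HASH, REBALANCE }

    // One edge in the job graph: an upstream operator feeding a downstream operator.
    static final class Edge {
        final Strategy upstream;
        final Strategy downstream;
        final Partitioner partitioner;
        final int upstreamParallelism;
        final int downstreamParallelism;
        final int downstreamInputs; // 1 for map/filter, 2 for a co-process operator

        Edge(Strategy upstream, Strategy downstream, Partitioner partitioner,
             int upstreamParallelism, int downstreamParallelism, int downstreamInputs) {
            this.upstream = upstream;
            this.downstream = downstream;
            this.partitioner = partitioner;
            this.upstreamParallelism = upstreamParallelism;
            this.downstreamParallelism = downstreamParallelism;
            this.downstreamInputs = downstreamInputs;
        }
    }

    // Returns the preconditions that fail; an empty list means the edge could be chained.
    static List<String> failedPreconditions(Edge e, boolean chainingEnabled, boolean streamingMode) {
        List<String> failed = new ArrayList<>();
        if (e.upstream == Strategy.NEVER || e.downstream != Strategy.ALWAYS) {
            failed.add("chaining strategy does not allow it");
        }
        if (e.partitioner != Partitioner.FORWARD) {
            failed.add("no ForwardPartitioner in between");
        }
        if (!streamingMode) {
            failed.add("not in streaming mode");
        }
        if (e.upstreamParallelism != e.downstreamParallelism) {
            failed.add("parallelism differs");
        }
        if (!chainingEnabled) {
            failed.add("chaining disabled for the environment");
        }
        if (e.downstreamInputs != 1) {
            failed.add("downstream operator is not single-input");
        }
        return failed;
    }

    public static void main(String[] args) {
        // A map feeding a two-input operator, everything else favourable.
        Edge mapToCoProcess = new Edge(Strategy.ALWAYS, Strategy.ALWAYS,
                Partitioner.FORWARD, 10, 10, 2);
        System.out.println(failedPreconditions(mapToCoProcess, true, true));
    }
}
```

In this model, a map feeding a KeyedCoProcessFunction fails only the last check, which matches the guess that the single-input rule is the one being violated.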
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>>>
>>>>> [2]
>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> *From:* Dan Hill <quietgol...@gmail.com>
>>>>> *Sent:* Thursday, 14 October 2021 17:50
>>>>> *To:* user <user@flink.apache.org>
>>>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>>>> partitions?
>>>>>
>>>>>
>>>>>
>>>>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>>>>> avoid a shuffle.  When changing the number of partitions, I'm hitting an
>>>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>>>> 119 does not contain 186".  I'm using Flink v1.12.3.
>>>>>
>>>>>
>>>>>
>>>>> Any thoughts on this?  I don't know if there is a known issue
>>>>> with reinterpretAsKeyedStream.
>>>>>
>>>>>
>>>>>
>>>>> Rough steps:
>>>>>
>>>>> 1. I have a raw input stream of View records.  I keyBy the View using
>>>>> Tuple2<Long, String>(platform_id, log_user_id).
>>>>>
>>>>> 2. I do a small transformation of View to a TinyView.  I
>>>>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>>>>> The keys are the same.
>>>>>
>>>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>>>
>>>>>
>>>>>
>>>>> When I savepoint and start again with a different number of
>>>>> partitions, my KeyedCoProcessFunction hits an issue
>>>>> with registerEventTimeTimer and complains that "key group from 110 to 119
>>>>> does not contain 186".  I verified that the key does not change and that
>>>>> we use Tuple2 with primitives Long and String.
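
For reference, the range check behind the "key group from 110 to 119 does not contain 186" message can be sketched roughly as below. This is a stand-alone illustration, not Flink code. The arithmetic is what I understand Flink's KeyGroupRangeAssignment to do, which is an assumption on my part. The values maxParallelism=200 and parallelism=20 are made up so that one subtask owns exactly 110 to 119; they are not my job's real settings. The step that hashes a key into a key group is left out.

```java
// Hypothetical stand-alone sketch of key-group range ownership; not a Flink class.
final class KeyGroupSketch {

    // First key group owned by the given subtask.
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the given subtask.
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Subtask that is supposed to receive records of the given key group.
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The check that fails in the stack trace: the subtask must own the key group.
    static void checkOwned(int keyGroup, int maxParallelism, int parallelism, int subtaskIndex) {
        int start = rangeStart(maxParallelism, parallelism, subtaskIndex);
        int end = rangeEnd(maxParallelism, parallelism, subtaskIndex);
        if (keyGroup < start || keyGroup > end) {
            throw new IllegalArgumentException(
                    "key group from " + start + " to " + end + " does not contain " + keyGroup);
        }
    }

    public static void main(String[] args) {
        int maxParallelism = 200; // illustrative assumption
        int parallelism = 20;     // illustrative assumption
        // Which subtask should have received a record of key group 186?
        System.out.println("owner of 186: subtask " + ownerOf(186, maxParallelism, parallelism));
        try {
            // A record of key group 186 arriving at subtask 11 instead.
            checkOwned(186, maxParallelism, parallelism, 11);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The point of the sketch: if a record reaches a subtask without going through the keyBy partitioner for the current parallelism, the timer service can see a key group outside the range that subtask owns.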
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> 2021-10-14 08:17:07
>>>>> java.lang.IllegalArgumentException: view x insertion issue with registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120
>>>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>>> log_timestamp: 1634224329606
>>>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>>>
>>>>>         at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>>>         at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>>>         at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>>>         at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>         at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>         at java.lang.Thread.run(Thread.java:748)
>>>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>>>>>         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>>         at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>>         at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>>         at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>>         at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>>         at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>>         at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>>>         at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>>>         ... 17 more
>>>>>
>>>>
