> I found the following link about this.  Still looks applicable.  In my
> case, I don't need to do a broadcast join.
> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
This is a good document. It describes how to remove redundant shuffles in
batch jobs. You can find the operator implementation in
`BatchMultipleInputStreamOperator`[1]. It implements
`MultipleInputStreamOperator`[2], which we referred to in the last email.

> I'm not sure I understand why having two inputs is an issue in my case.
We suspect there is a network shuffle between your two operators because
your second operator has multiple inputs. An operator with multiple inputs
is not chained with its preceding operator; please see Thias's last email
for more information.


On Fri, Oct 22, 2021 at 1:34 PM Dan Hill <quietgol...@gmail.com> wrote:

> Probably worth restating.  I was hoping to avoid a lot of shuffles in my
> Flink job.  A bunch of the data is already keyed by the userId (and stays
> keyed by it most of the Flink job).
> I'm not sure I understand why having two inputs is an issue in my case.
> Does Flink send the same keys to different task managers depending on the
> operator (even if parallelism is the same)?  It's also weird that the output
> looks fine as is but fails when rescaling partitions.
> On Thu, Oct 21, 2021 at 10:23 PM Dan Hill <quietgol...@gmail.com> wrote:
>> I found the following link about this.  Still looks applicable.  In my
>> case, I don't need to do a broadcast join.
>> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
>> On Thu, Oct 21, 2021 at 9:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>>> Interesting.  Thanks, JING ZHANG!
>>> On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG <beyond1...@gmail.com>
>>> wrote:
>>>> Hi Dan,
>>>> > I'm guessing I violate the "The second operator needs to be
>>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>>> I think you are right.
>>>> Do you want to remove the shuffle on the two inputs in your case? If so,
>>>> Flink has supported multiple-input operators since version 1.11, which
>>>> might satisfy your need. You can find more in [1].
>>>> However, this feature does not yet provide a complete DataStream API
>>>> interface. To use it, you need to manually create a
>>>> MultipleInputTransformation and MultipleConnectedStreams, e.g.:
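>>>>> // Assumed imports (Flink 1.12 package layout):
>>>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>>>> import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
>>>>> import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
>>>>>
>>>>> // SumAllInputOperatorFactory is a placeholder for your own
>>>>> // StreamOperatorFactory<Long> that creates a MultipleInputStreamOperator.
>>>>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>>>>    "My Operator",                      // operator name
>>>>>    new SumAllInputOperatorFactory(),
>>>>>    BasicTypeInfo.LONG_TYPE_INFO,       // output type
>>>>>    1);                                 // parallelism
>>>>> // Wire every input into the same operator and register it with the env
>>>>> env.addOperator(transform
>>>>>    .addInput(source1.getTransformation())
>>>>>    .addInput(source2.getTransformation())
>>>>>    .addInput(source3.getTransformation()));
>>>>> new MultipleConnectedStreams(env)
>>>>>    .transform(transform)
>>>>>    .addSink(resultSink);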
>>>> I would invite @Piotr to double-check this conclusion. He is more of
>>>> an expert on this topic.
>>>> @Piotr, would you please check Dan's question? Please correct me if I'm
>>>> wrong.
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>>>> Best,
>>>> On Sat, Oct 16, 2021 at 6:28 AM Dan Hill <quietgol...@gmail.com> wrote:
>>>>> Thanks Thias and JING ZHANG!
>>>>> Here's a Google drive folder link
>>>>> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
>>>>> with the execution plan and two screenshots from the job graph.
>>>>> I'm guessing I violate the "The second operator needs to be
>>>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>>>> After I do a transformation on a KeyedStream (so it goes back to a
>>>>> SingleOutputStreamOperator), even if I do a simple map, it usually
>>>>> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
>>>>> doesn't work.
>>>>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com>
>>>>> wrote:
>>>>>> Hi Dan,
>>>>>> Sorry for the typos. I meant: could you provide code to reproduce the
>>>>>> problem? If the current program is complex or confidential, maybe you
>>>>>> could try to simplify the code.
>>>>>> Besides, Matthias's guess is very reasonable. Could you please check
>>>>>> whether there is a network shuffle between your two operators? Were
>>>>>> those two operators chained into one vertex?
>>>>>> Best,
>>>>>> On Fri, Oct 15, 2021 at 2:57 PM Schwalbe Matthias <matthias.schwa...@viseca.ch>
>>>>>> wrote:
>>>>>>> … didn’t mean to hit the send button so soon 😊
>>>>>>> I guess we are getting closer to a solution
>>>>>>> Thias
>>>>>>> *From:* Schwalbe Matthias
>>>>>>> *Sent:* Friday, 15 October 2021 08:49
>>>>>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>>>>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling partitions?
>>>>>>> Hi Dan again 😊,
>>>>>>> I took a second look … from your call stack I conclude that you do
>>>>>>> indeed have a network shuffle between your two operators, in which case
>>>>>>> reinterpretAsKeyedStream wouldn't work
>>>>>>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>>> indicates that the two operators are not chained).
>>>>>>> … just as a double-check, could you please share both your
>>>>>>>    - execution plan (JSON; call println(env.getExecutionPlan) right
>>>>>>>    before you call env.execute), and
>>>>>>>    - job plan (screenshot from the Flink dashboard)?
>>>>>>> There are a number of preconditions for two operators to get chained,
>>>>>>> and probably one of them fails (see [1]):
>>>>>>>    - The two operators need to allow chaining with each other (see
>>>>>>>    [2] … chaining strategy)
>>>>>>>    - We need a ForwardPartitioner in between
>>>>>>>    - We need to be in streaming mode
>>>>>>>    - Both operators need the same parallelism
>>>>>>>    - Chaining needs to be enabled for the streaming environment
>>>>>>>    - The second operator needs to be single-input (i.e. no
>>>>>>>    TwoInputOp nor union() before)
>>>>>>> [1]
>>>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>>>>> [2]
>>>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
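To make the precondition list above concrete, here is a hypothetical, simplified model of those checks in plain Java. It is not Flink's actual chaining logic (see [1] for that); the `Strategy` enum, the `Edge` class, and the partitioner strings are stand-ins invented for illustration. It shows why an edge into a two-input operator such as a `KeyedCoProcessFunction` fails the check even when the partitioner is forward and the parallelism matches.

```java
public class ChainingCheck {

    // Simplified stand-in for Flink's ChainingStrategy (hypothetical model).
    enum Strategy { ALWAYS, HEAD, NEVER }

    // Hypothetical description of the edge between an upstream and a downstream operator.
    static final class Edge {
        final Strategy upstream;
        final Strategy downstream;
        final String partitioner;        // e.g. "FORWARD", "HASH", "REBALANCE"
        final boolean streamingMode;
        final int upstreamParallelism;
        final int downstreamParallelism;
        final boolean chainingEnabled;
        final int downstreamInputs;      // number of inputs of the downstream operator

        Edge(Strategy upstream, Strategy downstream, String partitioner, boolean streamingMode,
             int upstreamParallelism, int downstreamParallelism, boolean chainingEnabled,
             int downstreamInputs) {
            this.upstream = upstream;
            this.downstream = downstream;
            this.partitioner = partitioner;
            this.streamingMode = streamingMode;
            this.upstreamParallelism = upstreamParallelism;
            this.downstreamParallelism = downstreamParallelism;
            this.chainingEnabled = chainingEnabled;
            this.downstreamInputs = downstreamInputs;
        }
    }

    // True only if every precondition from the list holds.
    static boolean isChainable(Edge e) {
        boolean strategiesAllow = e.downstream == Strategy.ALWAYS
                && (e.upstream == Strategy.ALWAYS || e.upstream == Strategy.HEAD);
        return strategiesAllow
                && "FORWARD".equals(e.partitioner)
                && e.streamingMode
                && e.upstreamParallelism == e.downstreamParallelism
                && e.chainingEnabled
                && e.downstreamInputs == 1;
    }

    public static void main(String[] args) {
        // map -> map over a forward edge: every precondition holds
        Edge mapToMap = new Edge(Strategy.ALWAYS, Strategy.ALWAYS, "FORWARD", true, 4, 4, true, 1);
        // map -> two-input co-process operator: fails only the single-input rule
        Edge mapToCoProcess = new Edge(Strategy.ALWAYS, Strategy.ALWAYS, "FORWARD", true, 4, 4, true, 2);
        System.out.println(isChainable(mapToMap));       // true
        System.out.println(isChainable(mapToCoProcess)); // false
    }
}
```

In this model the only difference between the two edges is the downstream input count, which mirrors the "single-input" bullet.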
>>>>>>> *From:* Dan Hill <quietgol...@gmail.com>
>>>>>>> *Sent:* Thursday, 14 October 2021 17:50
>>>>>>> *To:* user <user@flink.apache.org>
>>>>>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling partitions?
>>>>>>> I have a job that uses reinterpretAsKeyedStream across a simple map
>>>>>>> to avoid a shuffle.  When changing the number of partitions, I'm
>>>>>>> hitting an issue with registerEventTimeTimer complaining that "key
>>>>>>> group from 110 to 119 does not contain 186".  I'm using Flink v1.12.3.
>>>>>>> Any thoughts on this?  I don't know if there is a known issue
>>>>>>> with reinterpretAsKeyedStream.
>>>>>>> Rough steps:
>>>>>>> 1. I have a raw input stream of View records.  I keyBy the View
>>>>>>> using Tuple2<Long, String>(platform_id, log_user_id).
>>>>>>> 2. I do a small transformation of View to a TinyView.  I
>>>>>>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>>>>>>> The keys are the same.
>>>>>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>>>>> When I savepoint and start again with a different number of
>>>>>>> partitions, my KeyedCoProcessFunction hits an issue
>>>>>>> with registerEventTimeTimer and complains that "key group from 110 to
>>>>>>> 119 does not contain 186".  I verified that the key does not change and
>>>>>>> that we use Tuple2 with primitives Long and String.
>>>>>>> 2021-10-14 08:17:07
>>>>>>> java.lang.IllegalArgumentException: view x insertion issue with registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120
>>>>>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>>>>> log_timestamp: 1634224329606
>>>>>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>>>>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>>>>>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>>>>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>>>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>>>>>     ... 17 more
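For anyone hitting the same error: the root cause says a record whose key maps to key group 186 reached a subtask that only owns key groups 110 to 119. Below is a minimal sketch of the key-group arithmetic; the two formulas mirror `computeOperatorIndexForKeyGroup` and `computeKeyGroupRangeForOperatorIndex` in Flink's `KeyGroupRangeAssignment`, while the class and method names here are illustrative only. The key hashing step (`murmurHash(key.hashCode()) % maxParallelism` in Flink) is left out and replaced by a fixed key group. Which subtask owns a key group depends only on `maxParallelism` and the operator's parallelism, so keyed operators with the same settings agree on where a key lives; `reinterpretAsKeyedStream` only works if the upstream routing already matches this mapping.

```java
public class KeyGroupMath {

    // Index of the subtask that owns a key group
    // (same formula as KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup).
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive {start, end} key-group range owned by one subtask
    // (same formula as KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex).
    static int[] keyGroupRangeForOperator(int maxParallelism, int parallelism, int operatorIndex) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        // Fixed for illustration; Flink derives it from the key's hash.
        int keyGroup = 40;
        for (int parallelism : new int[] {4, 3}) {
            int owner = operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            int[] range = keyGroupRangeForOperator(maxParallelism, parallelism, owner);
            // parallelism=4: key group 40 -> subtask 1 owning [32, 63]
            // parallelism=3: key group 40 -> subtask 0 owning [0, 42]
            System.out.printf("parallelism=%d: key group %d -> subtask %d owning [%d, %d]%n",
                    parallelism, keyGroup, owner, range[0], range[1]);
        }
    }
}
```

With a max parallelism of 128, key group 40 belongs to subtask 1 at parallelism 4 but to subtask 0 at parallelism 3. Any routing that does not follow this mapping after a rescale can deliver a record to a subtask whose range does not contain its key group, which is the condition the `registerEventTimeTimer` check rejects.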
