Probably worth restating.  I was hoping to avoid a lot of shuffles in my
Flink job.  A bunch of the data is already keyed by the userId (and stays
keyed by it for most of the Flink job).

I'm not sure I understand why having two inputs is an issue in my case.
Does Flink send the same keys to different task managers depending on the
operator (even if parallelism is the same)?  It's also weird that the output
looks fine as is but fails when rescaling partitions.


On Thu, Oct 21, 2021 at 10:23 PM Dan Hill <quietgol...@gmail.com> wrote:

> I found the following link about this.  Still looks applicable.  In my
> case, I don't need to do a broadcast join.
>
> https://www.alibabacloud.com/blog/flink-how-to-optimize-sql-performance-using-multiple-input-operators_597839
>
> On Thu, Oct 21, 2021 at 9:51 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Interesting.  Thanks, JING ZHANG!
>>
>> On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Dan,
>>> > I'm guessing I violate the "The second operator needs to be
>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>> I think you are right.
>>>
>>> Do you want to remove the shuffle on the two inputs in your case? If yes,
>>> Flink has provided support for multiple-input operators since version 1.11.
>>> I think it might satisfy your need. You can find more in [1].
>>> However, at present this feature does not expose a complete DataStream API.
>>> If users want to use it, they need to manually create a
>>> MultipleInputTransformation and MultipleConnectedStreams.
>>>
>>>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>>>    "My Operator",
>>>>    new SumAllInputOperatorFactory(),
>>>>    BasicTypeInfo.LONG_TYPE_INFO,
>>>>    1);
>>>>
>>>> env.addOperator(transform
>>>>    .addInput(source1.getTransformation())
>>>>    .addInput(source2.getTransformation())
>>>>    .addInput(source3.getTransformation()));
>>>> new MultipleConnectedStreams(env)
>>>>    .transform(transform)
>>>>    .addSink(resultSink);
>>>>
>>>>
>>> I would invite @Piotr to double-check this conclusion. He has more
>>> expertise on this topic.
>>>
>>> @Piotr, Would you please check Dan's question? Please correct me if I'm
>>> wrong.
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Dan Hill <quietgol...@gmail.com> wrote on Sat, Oct 16, 2021 at 6:28 AM:
>>>
>>>> Thanks Thias and JING ZHANG!
>>>>
>>>> Here's a Google drive folder link
>>>> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
>>>> with the execution plan and two screenshots from the job graph.
>>>>
>>>> I'm guessing I violate the "The second operator needs to be
>>>> single-input (i.e. no TwoInputOp nor union() before)" part.
>>>>
>>>> After I do a transformation on a KeyedStream (so it goes back to a
>>>> SingleOutputStreamOperator), even if I do a simple map, it usually
>>>> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
>>>> doesn't work.
>>>>
>>>>
>>>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Dan,
>>>>> Sorry for the typos, I meant asking for the code to reproduce the
>>>>> problem. If the current program is complex or confidential, maybe you could
>>>>> try to simplify the code.
>>>>> Besides, Matthias's guess is very reasonable. Could you please check whether
>>>>> there is a network shuffle between your two operators? Are those two
>>>>> operators chained into one vertex?
>>>>>
>>>>> Best,
>>>>> JING ZHANG
>>>>>
>>>>> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>>>>>
>>>>>> … didn’t mean to hit the send button so soon 😊
>>>>>>
>>>>>> I guess we are getting closer to a solution.
>>>>>>
>>>>>> Thias
>>>>>>
>>>>>> *From:* Schwalbe Matthias
>>>>>> *Sent:* Friday, 15 October 2021 08:49
>>>>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>>>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>>>>> partitions?
>>>>>>
>>>>>>
>>>>>>
>>>>>> Hi Dan again 😊,
>>>>>>
>>>>>>
>>>>>>
>>>>>> I took a second look … from what I see in your call stack, I conclude
>>>>>> that indeed you have a network shuffle between your two operators, in
>>>>>> which case reinterpretAsKeyedStream wouldn’t work.
>>>>>>
>>>>>> (StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>> indicates that the two operators are not chained)
>>>>>>
>>>>>> … just as a double-check could you please share both your
>>>>>>
>>>>>>    - Execution plan (call println(env.getExecutionPlan) right before
>>>>>>    your call env.execute) (json), and
>>>>>>    - Your job plan (screenshot from flink dashboard)
>>>>>>
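>>>>>> In Java, that execution-plan call would look roughly like the sketch below
>>>>>> (the variable env is assumed to be your StreamExecutionEnvironment and the
>>>>>> job name is a placeholder):
>>>>>>
>>>>>>    // print the execution plan as JSON right before submitting the job
>>>>>>    System.out.println(env.getExecutionPlan());
>>>>>>    env.execute("my-job");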
>>>>>>
>>>>>>
>>>>>> There are a number of preconditions before two operators get chained,
>>>>>> and probably one of them fails (see [1]; a short sketch of the related
>>>>>> API knobs follows the list):
>>>>>>
>>>>>>    - The two operators need to allow chaining the resp. other (see
>>>>>>    [2] … chaining strategy)
>>>>>>    - We need a ForwardPartitioner in between
>>>>>>    - We need to be in streaming mode
>>>>>>    - Both operators need the same parallelism
>>>>>>    - Chaining needs to be enabled for the streaming environment
>>>>>>    - The second operator needs to be single-input (i.e. no
>>>>>>    TwoInputOp nor union() before)
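>>>>>>
>>>>>> As a rough illustration of a few of these knobs on the DataStream API side
>>>>>> (a sketch only, not taken from your job; stream, upstreamParallelism and
>>>>>> ToTinyView are made-up names):
>>>>>>
>>>>>>    // chaining can be switched off for the whole job ...
>>>>>>    env.disableOperatorChaining();
>>>>>>
>>>>>>    // ... or influenced per operator; unequal parallelism also breaks chaining
>>>>>>    stream
>>>>>>        .map(new ToTinyView())
>>>>>>        .setParallelism(upstreamParallelism)  // keep it equal to the upstream operator
>>>>>>        .startNewChain();                     // would deliberately start a new chain here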
>>>>>>
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>>>>
>>>>>> [2]
>>>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>>>>
>>>>>> *From:* Dan Hill <quietgol...@gmail.com>
>>>>>> *Sent:* Thursday, 14 October 2021 17:50
>>>>>> *To:* user <user@flink.apache.org>
>>>>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>>>>> partitions?
>>>>>>
>>>>>>
>>>>>>
>>>>>> I have a job that uses reinterpretAsKeyedStream across a simple map
>>>>>> to avoid a shuffle.  When changing the number of partitions, I'm hitting 
>>>>>> an
>>>>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>>>>> 119 does not contain 186".  I'm using Flink v1.12.3.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Any thoughts on this?  I don't know if there is a known issue
>>>>>> with reinterpretAsKeyedStream.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Rough steps:
>>>>>>
>>>>>> 1. I have a raw input stream of View records.  I keyBy the View using
>>>>>> Tuple2<Long, String>(platform_id, log_user_id).
>>>>>>
>>>>>> 2. I do a small transformation of View to a TinyView.  I
>>>>>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>>>>>> The keys are the same.
>>>>>>
>>>>>> 3. I use the TinyView in a KeyedCoProcessFunction.
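>>>>>>
>>>>>> A rough sketch of those three steps (class and method names such as View,
>>>>>> TinyView, getPlatformId, getLogUserId, ToTinyView, otherKeyedInput and
>>>>>> MyKeyedCoProcessFunction are assumptions, not the actual code;
>>>>>> DataStreamUtils is org.apache.flink.streaming.api.datastream.DataStreamUtils):
>>>>>>
>>>>>> TypeInformation<Tuple2<Long, String>> keyType = Types.TUPLE(Types.LONG, Types.STRING);
>>>>>> KeySelector<View, Tuple2<Long, String>> viewKey =
>>>>>>     v -> Tuple2.of(v.getPlatformId(), v.getLogUserId());
>>>>>> KeySelector<TinyView, Tuple2<Long, String>> tinyViewKey =
>>>>>>     tv -> Tuple2.of(tv.getPlatformId(), tv.getLogUserId());
>>>>>>
>>>>>> // 1. key the raw views by (platform_id, log_user_id)
>>>>>> KeyedStream<View, Tuple2<Long, String>> keyedViews = views.keyBy(viewKey, keyType);
>>>>>>
>>>>>> // 2. map View -> TinyView, then reinterpret the result as still keyed by the same key
>>>>>> SingleOutputStreamOperator<TinyView> tinyViews = keyedViews.map(new ToTinyView());
>>>>>> KeyedStream<TinyView, Tuple2<Long, String>> keyedTinyViews =
>>>>>>     DataStreamUtils.reinterpretAsKeyedStream(tinyViews, tinyViewKey, keyType);
>>>>>>
>>>>>> // 3. feed the TinyViews into a KeyedCoProcessFunction together with the other keyed input
>>>>>> keyedTinyViews
>>>>>>     .connect(otherKeyedInput)
>>>>>>     .process(new MyKeyedCoProcessFunction());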
>>>>>>
>>>>>>
>>>>>>
>>>>>> When I savepoint and start again with a different number of
>>>>>> partitions, my KeyedCoProcessFunction hits an issue
>>>>>> with registerEventTimeTimer and complains that "key group from 110 to 119
>>>>>> does not contain 186".  I verified that the key does not change and that 
>>>>>> we
>>>>>> use Tuple2 with primitives Long and String.
>>>>>>
>>>>>> 2021-10-14 08:17:07
>>>>>>
>>>>>> java.lang.IllegalArgumentException: view x insertion issue with
>>>>>> registerEventTimeTimer for 
>>>>>> key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174),
>>>>>> flat=platform_id: 120
>>>>>>
>>>>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>>>>
>>>>>> log_timestamp: 1634224329606
>>>>>>
>>>>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>>>>
>>>>>>
>>>>>>
>>>>>>                 at
>>>>>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>>>>
>>>>>>                 at
>>>>>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>>>>
>>>>>>                 at
>>>>>> ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>>>
>>>>>>                 at java.lang.Thread.run(Thread.java:748)
>>>>>>
>>>>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to
>>>>>> 119 does not contain 186
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>>>
>>>>>>                 at
>>>>>> org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>>>>
>>>>>>                 at
>>>>>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>>>>
>>>>>>                 ... 17 more
>>>>>
