Hi Dan,
> I'm guessing I violate the "The second operator needs to be single-input
(i.e. no TwoInputOp nor union() before)" part.
I think you are right.

Do you want to avoid the shuffle between the two inputs in your case? If so,
Flink has supported multiple-input operators since version 1.11, which might
meet your need. You can find more in [1].
However, this feature is not yet fully exposed in the DataStream API. To use
it, you currently have to create the MultipleInputTransformation and
MultipleConnectedStreams manually.

> MultipleInputTransformation<Long> transform = new 
> MultipleInputTransformation<>(
>    "My Operator",
>    new SumAllInputOperatorFactory(),
>    BasicTypeInfo.LONG_TYPE_INFO,
>    1);
>
> env.addOperator(transform
>    .addInput(source1.getTransformation())
>    .addInput(source2.getTransformation())
>    .addInput(source3.getTransformation()));
> new MultipleConnectedStreams(env)
>    .transform(transform)
>    .addSink(resultSink);
>
>
I would like to invite @Piotr to double-check this conclusion. He knows this
topic better than I do.

@Piotr, would you please take a look at Dan's question? Please correct me if
I'm wrong.

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR

Best,
JING ZHANG

Dan Hill <quietgol...@gmail.com> wrote on Sat, Oct 16, 2021 at 6:28 AM:

> Thanks Thias and JING ZHANG!
>
> Here's a Google drive folder link
> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
> with the execution plan and two screenshots from the job graph.
>
> I'm guessing I violate the "The second operator needs to be single-input
> (i.e. no TwoInputOp nor union() before)" part.
>
> After I do a transformation on a KeyedStream (so it goes back to a
> SingleOutputStreamOperator), even if I do a simple map, it usually
> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
> doesn't work.
>
>
> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Dan,
>> Sorry for the typos, I meant to ask you to provide the code to reproduce
>> the problem. If the current program is complex and confidential, maybe you
>> could try to simplify the code.
>> Besides, Matthias's guess is very reasonable. Could you please check
>> whether there is a network shuffle between your two operators? Were those
>> two operators chained into one vertex?
>>
>> Best,
>> JING ZHANG
>>
>> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>>
>>> … didn’t mean to hit the send button so soon 😊
>>>
>>> I guess we are getting closer to a solution
>>>
>>> Thias
>>>
>>> *From:* Schwalbe Matthias
>>> *Sent:* Friday, 15 October 2021 08:49
>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>> partitions?
>>>
>>>
>>>
>>> Hi Dan again 😊,
>>>
>>>
>>>
>>> I took a second look. From your call stack I conclude that you do indeed
>>> have a network shuffle between your two operators, in which case
>>> reinterpretAsKeyedStream won't work.
>>>
>>> (The frame
>>> StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>> indicates that the two operators are not chained.)
>>>
>>>
>>>
>>>
>>>
>>> … just as a double-check, could you please share both:
>>>
>>>    - Your execution plan as JSON (call println(env.getExecutionPlan) right
>>>    before you call env.execute), and
>>>    - Your job graph (screenshot from the Flink dashboard)
>>>
>>>
>>>
>>> There are a number of preconditions that must hold before two operators
>>> get chained, and probably one of them fails (see [1]):
>>>
>>>    - Each of the two operators needs to allow chaining with the other (see
>>>    [2], chaining strategy)
>>>    - There needs to be a ForwardPartitioner in between
>>>    - We need to be in streaming mode
>>>    - Both operators need the same parallelism
>>>    - Chaining needs to be enabled for the streaming environment
>>>    - The second operator needs to be single-input (i.e. no TwoInputOp
>>>    nor union() before)
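
The preconditions in the list above can be sketched as a small predicate. This
is only a toy model in plain Java: the Edge class, its fields and the
violations() helper are hypothetical stand-ins for illustration, not Flink's
StreamEdge or the real chaining check referenced in [1]. It may still help to
see which single condition breaks chaining for a two-input operator.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the chaining preconditions listed above. Not Flink code. */
public class ChainingCheck {

    /** Hypothetical summary of one edge between an upstream and a downstream operator. */
    public static final class Edge {
        public boolean upstreamAllowsChaining = true;
        public boolean downstreamAllowsChaining = true;
        public boolean forwardPartitioner = true;
        public boolean streamingMode = true;
        public boolean chainingEnabled = true;
        public int upstreamParallelism = 4;
        public int downstreamParallelism = 4;
        public int downstreamInputCount = 1;
    }

    /** Returns every violated precondition; an empty list means the edge could be chained. */
    public static List<String> violations(Edge e) {
        List<String> v = new ArrayList<>();
        if (!e.upstreamAllowsChaining || !e.downstreamAllowsChaining) {
            v.add("chaining strategy forbids it");
        }
        if (!e.forwardPartitioner) {
            v.add("no ForwardPartitioner in between");
        }
        if (!e.streamingMode) {
            v.add("not in streaming mode");
        }
        if (e.upstreamParallelism != e.downstreamParallelism) {
            v.add("parallelism differs");
        }
        if (!e.chainingEnabled) {
            v.add("chaining disabled for the environment");
        }
        if (e.downstreamInputCount != 1) {
            v.add("second operator is not single-input");
        }
        return v;
    }

    public static void main(String[] args) {
        Edge e = new Edge();
        // A two-input operator (for example one running a KeyedCoProcessFunction)
        // has two inputs, so the last precondition fails.
        e.downstreamInputCount = 2;
        System.out.println(violations(e)); // prints [second operator is not single-input]
    }
}
```

In this model a map feeding a two-input operator is never chained, whatever the
other settings are, which matches the guess discussed in this thread.
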
>>>
>>>
>>>
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>
>>> [2]
>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>
>>>
>>>
>>>
>>>
>>> *From:* Dan Hill <quietgol...@gmail.com>
>>> *Sent:* Thursday, 14 October 2021 17:50
>>> *To:* user <user@flink.apache.org>
>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>> partitions?
>>>
>>>
>>>
>>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>>> avoid a shuffle.  When changing the number of partitions, I'm hitting an
>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>> 119 does not contain 186".  I'm using Flink v1.12.3.
>>>
>>>
>>>
>>> Any thoughts on this?  I don't know if there is a known issue
>>> with reinterpretAsKeyedStream.
>>>
>>>
>>>
>>> Rough steps:
>>>
>>> 1. I have a raw input stream of View records.  I keyBy the View using
>>> Tuple2<Long, String>(platform_id, log_user_id).
>>>
>>> 2. I do a small transformation of View to a TinyView.  I
>>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>>> The keys are the same.
>>>
>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>
>>>
>>>
>>> When I savepoint and start again with a different number of partitions,
>>> my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
>>> complains that "key group from 110 to 119 does not contain 186".  I
>>> verified that the key does not change and that we use Tuple2 with
>>> primitives Long and String.
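
To make the error message easier to read, here is a minimal sketch of how key
groups map to subtasks. It assumes the arithmetic I recall from Flink's
KeyGroupRangeAssignment (a key's group is a murmur hash of key.hashCode()
modulo the max parallelism, and each subtask owns a contiguous range of
groups). The class and method names are hypothetical, and the values
maxParallelism = 200 and parallelism 20 and 25 are made up for illustration;
the real settings of the job are not stated in this thread.

```java
/** Sketch of key-group range arithmetic. All parameter values are hypothetical. */
public class KeyGroupRanges {

    /** First key group owned by the given subtask. */
    public static int firstKeyGroup(int maxParallelism, int parallelism, int subtask) {
        return (subtask * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given subtask. */
    public static int lastKeyGroup(int maxParallelism, int parallelism, int subtask) {
        return ((subtask + 1) * maxParallelism - 1) / parallelism;
    }

    /** Subtask that owns the given key group. */
    public static int owningSubtask(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 200; // hypothetical, chosen so that ranges have 10 groups
        int keyGroup = 186;       // the key group named in the error message

        for (int parallelism : new int[] {20, 25}) {
            int owner = owningSubtask(maxParallelism, parallelism, keyGroup);
            System.out.printf(
                    "parallelism=%d: key group %d belongs to subtask %d (range %d..%d)%n",
                    parallelism,
                    keyGroup,
                    owner,
                    firstKeyGroup(maxParallelism, parallelism, owner),
                    lastKeyGroup(maxParallelism, parallelism, owner));
        }

        // With parallelism 20, subtask 11 owns 110..119, so a record in key group
        // 186 that lands there fails the range check seen in the stack trace.
        System.out.printf(
                "subtask 11 at parallelism 20 owns %d..%d%n",
                firstKeyGroup(maxParallelism, 20, 11),
                lastKeyGroup(maxParallelism, 20, 11));
    }
}
```

Under these assumptions, "key group from 110 to 119 does not contain 186" means
a record reached a subtask that does not own its key group, which can happen
when the stream handed to reinterpretAsKeyedStream is not partitioned exactly
the way keyBy would partition it.
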
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 2021-10-14 08:17:07
>>>
>>> java.lang.IllegalArgumentException: view x insertion issue with
>>> registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174),
>>> flat=platform_id: 120
>>>
>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>
>>> log_timestamp: 1634224329606
>>>
>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>
>>>
>>>
>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>     ... 17 more
>>>
>>> This message is intended only for the named recipient and may contain
>>> confidential or privileged information. As the confidentiality of email
>>> communication cannot be guaranteed, we do not accept any responsibility for
>>> the confidentiality and the intactness of this message. If you have
>>> received it in error, please advise the sender by return e-mail and delete
>>> this message and any attachments. Any unauthorised use or dissemination of
>>> this information is strictly prohibited.
>>>
>>
