Interesting.  Thanks, JING ZHANG!

On Mon, Oct 18, 2021 at 12:16 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Dan,
> > I'm guessing I violate the "The second operator needs to be single-input
> (i.e. no TwoInputOp nor union() before)" part.
> I think you are right.
>
> Do you want to remove the shuffle between the two inputs in your case? If
> so, Flink has supported multiple-input operators since version 1.11, which
> might satisfy your need. You can find more in [1].
> However, this feature is not yet fully exposed through the DataStream API.
> To use it, you currently need to create the MultipleInputTransformation and
> MultipleConnectedStreams manually.
>
>> MultipleInputTransformation<Long> transform = new MultipleInputTransformation<>(
>>    "My Operator",
>>    new SumAllInputOperatorFactory(),
>>    BasicTypeInfo.LONG_TYPE_INFO,
>>    1);
>>
>> env.addOperator(transform
>>    .addInput(source1.getTransformation())
>>    .addInput(source2.getTransformation())
>>    .addInput(source3.getTransformation()));
>> new MultipleConnectedStreams(env)
>>    .transform(transform)
>>    .addSink(resultSink);
>>
>>
> I would like to invite @Piotr to double-check this conclusion. He is more
> of an expert on this topic.
>
> @Piotr, would you please check Dan's question? Please correct me if I'm
> wrong.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A+Add+N-Ary+Stream+Operator+in+Flink?spm=a2c65.11461447.0.0.786d2323FtzWaR
>
> Best,
> JING ZHANG
>
> Dan Hill <quietgol...@gmail.com> wrote on Sat, Oct 16, 2021 at 6:28 AM:
>
>> Thanks Thias and JING ZHANG!
>>
>> Here's a Google drive folder link
>> <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
>> with the execution plan and two screenshots from the job graph.
>>
>> I'm guessing I violate the "The second operator needs to be single-input
>> (i.e. no TwoInputOp nor union() before)" part.
>>
>> After I do a transformation on a KeyedStream (so it goes back to a
>> SingleOutputStreamOperator), even if I do a simple map, it usually
>> disallows operator chaining.  Even with reinterpretAsKeyedStream, it
>> doesn't work.
>>
>>
>> On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Dan,
>>> Sorry for the typos. I meant to ask you to provide the code to reproduce
>>> the problem. If the current program is complex or confidential, maybe you
>>> could try to simplify the code.
>>> Besides, Matthias's guess is very reasonable. Could you please check
>>> whether there is a network shuffle between your two operators? Were those
>>> two operators chained into one vertex?
>>>
>>> Best,
>>> JING ZHANG
>>>
>>> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>>>
>>>> … didn’t mean to hit the send button so soon 😊
>>>>
>>>> I guess we are getting closer to a solution
>>>>
>>>> Thias
>>>>
>>>> *From:* Schwalbe Matthias
>>>> *Sent:* Friday, 15 October 2021 08:49
>>>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>>>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>>>> partitions?
>>>>
>>>>
>>>>
>>>> Hi Dan again 😊,
>>>>
>>>>
>>>>
>>>> I took a second look … from what I see in your call stack, I conclude
>>>> that you do indeed have a network shuffle between your two operators,
>>>> in which case reinterpretAsKeyedStream won’t work.
>>>>
>>>> (StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>> indicates that the two operators are not chained.)
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> … just as a double-check, could you please share both your
>>>>
>>>>    - Execution plan (JSON; call println(env.getExecutionPlan) right
>>>>    before your call to env.execute), and
>>>>    - Job plan (screenshot from the Flink dashboard)
>>>>
>>>>
>>>>
>>>> There are a number of preconditions that must hold before two operators
>>>> get chained, and probably one of them fails (see [1]):
>>>>
>>>>    - Each of the two operators needs to allow chaining with the other
>>>>    (see [2] … chaining strategy)
>>>>    - We need a ForwardPartitioner in between
>>>>    - We need to be in streaming mode
>>>>    - Both operators need the same parallelism
>>>>    - Chaining needs to be enabled for the streaming environment
>>>>    - The second operator needs to be single-input (i.e. no TwoInputOp
>>>>    nor union() before)
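The checklist above can be read as a single boolean predicate over the edge between two operators. The following is only a plain-Java sketch of that reading: the class `ChainingCheck`, the `Edge` fields, and `isChainable` are hypothetical names made up for illustration, not Flink's actual classes, and the real check in StreamingJobGraphGenerator (see [1]) may differ in detail.

```java
public class ChainingCheck {

    /** Hypothetical, simplified view of the edge between two operators. */
    static final class Edge {
        boolean upstreamAllowsChaining = true;   // chaining strategy of the first operator
        boolean downstreamAllowsChaining = true; // chaining strategy of the second operator
        boolean forwardPartitioner = true;       // false after keyBy, rebalance, rescale, ...
        boolean streamingMode = true;
        int upstreamParallelism = 1;
        int downstreamParallelism = 1;
        int downstreamInputCount = 1;            // 2 for a two-input operator, >1 after union()
    }

    /** Returns true only if every precondition from the list holds. */
    static boolean isChainable(Edge e, boolean chainingEnabled) {
        return chainingEnabled
                && e.upstreamAllowsChaining
                && e.downstreamAllowsChaining
                && e.forwardPartitioner
                && e.streamingMode
                && e.upstreamParallelism == e.downstreamParallelism
                && e.downstreamInputCount == 1;
    }

    public static void main(String[] args) {
        // A simple map followed by another single-input operator.
        Edge mapToMap = new Edge();

        // A simple map feeding one input of a two-input operator
        // (for example a KeyedCoProcessFunction).
        Edge mapToCoProcess = new Edge();
        mapToCoProcess.downstreamInputCount = 2;

        System.out.println(isChainable(mapToMap, true));       // prints "true"
        System.out.println(isChainable(mapToCoProcess, true)); // prints "false"
    }
}
```

Under this reading, a map that feeds a two-input operator fails the last condition even when everything else matches, which would explain a network exchange between the two.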
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>>>
>>>> [2]
>>>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *From:* Dan Hill <quietgol...@gmail.com>
>>>> *Sent:* Thursday, 14 October 2021 17:50
>>>> *To:* user <user@flink.apache.org>
>>>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>>>> partitions?
>>>>
>>>>
>>>>
>>>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>>>> avoid a shuffle.  When changing the number of partitions, I'm hitting an
>>>> issue with registerEventTimeTimer complaining that "key group from 110 to
>>>> 119 does not contain 186".  I'm using Flink v1.12.3.
>>>>
>>>>
>>>>
>>>> Any thoughts on this?  I don't know if there is a known issue
>>>> with reinterpretAsKeyedStream.
>>>>
>>>>
>>>>
>>>> Rough steps:
>>>>
>>>> 1. I have a raw input stream of View records.  I keyBy the View using
>>>> Tuple2<Long, String>(platform_id, log_user_id).
>>>>
>>>> 2. I do a small transformation of View to a TinyView.  I
>>>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>>>> The keys are the same.
>>>>
>>>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>>>
>>>>
>>>>
>>>> When I savepoint and start again with a different number of partitions,
>>>> my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
>>>> complains that "key group from 110 to 119 does not contain 186".  I
>>>> verified that the key does not change and that we use Tuple2 with
>>>> primitives Long and String.
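To make the error message easier to read, here is a small plain-Java sketch of the key-group arithmetic. It is modeled on my understanding of Flink's KeyGroupRangeAssignment and should be treated as an approximation rather than the library's code. The class `KeyGroupMath` and its method names are hypothetical, and the values maxParallelism = 200 and parallelism = 20 are assumptions picked only because they reproduce the range 110 to 119; the real settings of the job are not given in this thread.

```java
public class KeyGroupMath {

    /** First key group (inclusive) owned by the given subtask. */
    static int rangeStart(int maxParallelism, int parallelism, int subtaskIndex) {
        return (subtaskIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    /** Last key group (inclusive) owned by the given subtask. */
    static int rangeEnd(int maxParallelism, int parallelism, int subtaskIndex) {
        return ((subtaskIndex + 1) * maxParallelism - 1) / parallelism;
    }

    /** Subtask that owns the given key group. */
    static int subtaskForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 200; // assumed for illustration only
        int parallelism = 20;     // assumed for illustration only

        int start = rangeStart(maxParallelism, parallelism, 11);
        int end = rangeEnd(maxParallelism, parallelism, 11);
        int owner = subtaskForKeyGroup(maxParallelism, parallelism, 186);

        // prints "subtask 11 owns key groups [110, 119]"
        System.out.println("subtask 11 owns key groups [" + start + ", " + end + "]");
        // prints "key group 186 belongs to subtask 18"
        System.out.println("key group 186 belongs to subtask " + owner);
    }
}
```

With these assumed numbers, a record whose key falls into key group 186 should be processed by subtask 18. If it reaches subtask 11 instead, registering a timer for it fails with exactly this kind of message, because the timer queue of that subtask only covers key groups 110 to 119.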
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> 2021-10-14 08:17:07
>>>>
>>>> java.lang.IllegalArgumentException: view x insertion issue with
>>>> registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174),
>>>> flat=platform_id: 120
>>>>
>>>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>>>
>>>> log_timestamp: 1634224329606
>>>>
>>>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>>>
>>>>
>>>>
>>>>                 at
>>>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>>>
>>>>                 at
>>>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>>>
>>>>                 at
>>>> ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>>>
>>>>                 at
>>>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>>>
>>>>                 at
>>>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>>>
>>>>                 at java.lang.Thread.run(Thread.java:748)
>>>>
>>>> Caused by: java.lang.IllegalArgumentException: key group from 110 to
>>>> 119 does not contain 186
>>>>
>>>>                 at
>>>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>>>
>>>>                 at
>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>>>
>>>>                 at
>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>>>
>>>>                 at
>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>>>
>>>>                 at
>>>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>>>
>>>>                 at
>>>> org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>>>
>>>>                 at
>>>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>>>
>>>>                 ... 17 more
>>>>
>>>> This message is intended only for the named recipient and may contain
>>>> confidential or privileged information. As the confidentiality of email
>>>> communication cannot be guaranteed, we do not accept any responsibility for
>>>> the confidentiality and the intactness of this message. If you have
>>>> received it in error, please advise the sender by return e-mail and delete
>>>> this message and any attachments. Any unauthorised use or dissemination of
>>>> this information is strictly prohibited.
>>>>
>>>
