Thanks Thias and JING ZHANG!

Here's a Google drive folder link
<https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing>
with the execution plan and two screenshots from the job graph.

I'm guessing I violate the "The second operator needs to be single-input
(i.e. no TwoInputOp nor union() before)" part.

After I do a transformation on a KeyedStream (so it goes back to a
SingleOutputStreamOperator), even a simple map usually disallows
operator chaining.  Even with reinterpretAsKeyedStream, it doesn't
work.


On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Dan,
> Sorry for the typos. I meant to ask you to provide code that reproduces
> the problem. If the current program is complex or confidential, maybe you
> could try to simplify the code.
> Besides, Matthias's guess is very reasonable. Could you please check
> whether there is a network shuffle between your two operators? Were those
> two operators chained into one vertex?
>
> Best,
> JING ZHANG
>
> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>
>> … didn’t mean to hit the send button so soon 😊
>>
>>
>>
>> I guess we are getting closer to a solution
>>
>>
>>
>>
>>
>> Thias
>>
>> *From:* Schwalbe Matthias
>> *Sent:* Friday, October 15, 2021 08:49
>> *To:* 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>> *Subject:* RE: Any issues with reinterpretAsKeyedStream when scaling
>> partitions?
>>
>>
>>
>> Hi Dan again 😊,
>>
>>
>>
>> I took a second look … from what I see in your call stack, I conclude
>> that you indeed have a network shuffle between your two operators,
>>
>> in which case reinterpretAsKeyedStream wouldn’t work
>>
>>
>>
>> (StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>> indicates that the two operators are not chained)
>>
>>
>>
>>
>>
>> … just as a double-check could you please share both your
>>
>>    - Execution plan (call println(env.getExecutionPlan) right before
>>    your call env.execute) (json), and
>>    - Your job plan (screenshot from flink dashboard)
>>
>>
>>
>> There are a number of preconditions before two operators get chained, and
>> probably one of them fails (see [1]):
>>
>>    - The two operators need to allow chaining the resp. other (see [2] …
>>    chaining strategy)
>>    - We need a ForwardPartitioner in between
>>    - We need to be in streaming mode
>>    - Both operators need the same parallelism
>>    - Chaining needs to be enabled for the streaming environment
>>    - The second operator needs to be single-input (i.e. no TwoInputOp
>>    nor union() before)
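>> The precondition list above can be sketched as a plain-Java model (an
>> illustrative, simplified re-implementation; the real check is
>> StreamingJobGraphGenerator#isChainable, see [1], and the types and names
>> below are made up for this sketch, not Flink's actual API):

```java
// Simplified, illustrative model of Flink's operator-chaining check.
// The types and field names are invented for this sketch; the real logic
// lives in StreamingJobGraphGenerator#isChainable (Flink 1.12).
public class ChainingSketch {

    enum ChainingStrategy { ALWAYS, HEAD, NEVER }
    enum Partitioner { FORWARD, HASH, REBALANCE }

    static final class Op {
        final ChainingStrategy strategy;
        final int parallelism;
        final int numInputs;
        Op(ChainingStrategy strategy, int parallelism, int numInputs) {
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.numInputs = numInputs;
        }
    }

    /** Mirrors the precondition list above (simplified). */
    static boolean isChainable(Op upstream, Op downstream,
                               Partitioner partitioner,
                               boolean streamingMode,
                               boolean chainingEnabled) {
        return downstream.numInputs == 1                          // single-input downstream
                && upstream.strategy != ChainingStrategy.NEVER    // upstream allows chaining
                && downstream.strategy == ChainingStrategy.ALWAYS // downstream chains to predecessor
                && partitioner == Partitioner.FORWARD             // ForwardPartitioner in between
                && streamingMode                                  // streaming, not batch
                && upstream.parallelism == downstream.parallelism // equal parallelism
                && chainingEnabled;                               // chaining enabled on the env
    }

    public static void main(String[] args) {
        Op keyedOp = new Op(ChainingStrategy.HEAD, 4, 1);
        Op map = new Op(ChainingStrategy.ALWAYS, 4, 1);
        Op coProcess = new Op(ChainingStrategy.ALWAYS, 4, 2);

        // A forward-connected map with equal parallelism chains fine.
        System.out.println(isChainable(keyedOp, map, Partitioner.FORWARD, true, true));
        // A two-input operator (e.g. a KeyedCoProcessFunction) never chains.
        System.out.println(isChainable(map, coProcess, Partitioner.FORWARD, true, true));
    }
}
```

>> If Dan's guess is right, it is the single-input check that fails here,
>> since a KeyedCoProcessFunction is a two-input operator.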
>>
>>
>>
>>
>>
>> [1]
>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>>
>> [2]
>> https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
>>
>>
>>
>>
>>
>> *From:* Dan Hill <quietgol...@gmail.com>
>> *Sent:* Thursday, October 14, 2021 17:50
>> *To:* user <user@flink.apache.org>
>> *Subject:* Any issues with reinterpretAsKeyedStream when scaling
>> partitions?
>>
>>
>>
>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>> avoid a shuffle.  When changing the number of partitions, I'm hitting an
>> issue with registerEventTimeTimer complaining that "key group from 110 to
>> 119 does not contain 186".  I'm using Flink v1.12.3.
>>
>>
>>
>> Any thoughts on this?  I don't know if there is a known issue
>> with reinterpretAsKeyedStream.
>>
>>
>>
>> Rough steps:
>>
>> 1. I have a raw input stream of View records.  I keyBy the View using
>> Tuple2<Long, String>(platform_id, log_user_id).
>>
>> 2. I do a small transformation of View to a TinyView.  I use
>> reinterpretAsKeyedStream to treat the TinyView stream as a KeyedStream
>> with the same key.
>>
>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>
>>
>>
>> When I savepoint and start again with a different number of partitions,
>> my KeyedCoProcessFunction hits an issue in registerEventTimeTimer and
>> complains that "key group from 110 to 119 does not contain 186".  I
>> verified that the key does not change and that we use a Tuple2 of Long
>> and String.
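>> The arithmetic behind that error can be sketched in plain Java
>> (assumption: a simplified re-implementation of Flink's
>> KeyGroupRangeAssignment formulas; the real code murmur-hashes
>> key.hashCode() before the modulo, which this sketch skips, so the
>> concrete numbers are illustrative only):

```java
// Simplified sketch of Flink's key-group arithmetic
// (org.apache.flink.runtime.state.KeyGroupRangeAssignment).
// The real implementation murmur-hashes key.hashCode() before the modulo;
// we skip that here for brevity.
public class KeyGroupSketch {

    // Key group a key hash falls into.
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // Operator subtask that owns a key group (Flink's formula).
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Key-group range [start, end] owned by one subtask (Flink's formula).
    static int rangeStart(int operatorIndex, int maxParallelism, int parallelism) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    static int rangeEnd(int operatorIndex, int maxParallelism, int parallelism) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        int keyGroup = keyGroupFor(70, maxParallelism); // 70

        // Before rescaling: parallelism 4 -> key group 70 is owned by subtask 2.
        int oldOwner = operatorIndexFor(keyGroup, maxParallelism, 4);
        // After rescaling: parallelism 8 -> key group 70 is owned by subtask 4.
        int newOwner = operatorIndexFor(keyGroup, maxParallelism, 8);

        // If records are forwarded (no shuffle) after rescaling, a record for
        // key group 70 can land on a subtask whose restored range no longer
        // contains it -> "key group from X to Y does not contain Z".
        System.out.println("old owner subtask " + oldOwner + ", range "
                + rangeStart(oldOwner, maxParallelism, 4) + ".."
                + rangeEnd(oldOwner, maxParallelism, 4));
        System.out.println("new owner subtask " + newOwner + ", range "
                + rangeStart(newOwner, maxParallelism, 8) + ".."
                + rangeEnd(newOwner, maxParallelism, 8));
    }
}
```

>> The point of the sketch: the owning subtask for a key group depends on
>> the current parallelism, so after rescaling, a record that is forwarded
>> instead of shuffled can reach a subtask whose restored key-group range
>> does not contain its key group, which is exactly the precondition
>> failure in registerEventTimeTimer.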
>>
>>
>>
>>
>>
>>
>>
>> 2021-10-14 08:17:07
>>
>> java.lang.IllegalArgumentException: view x insertion issue with
>> registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174),
>> flat=platform_id: 120
>>
>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>>
>> log_timestamp: 1634224329606
>>
>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>
>>
>>
>>                 at
>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>
>>                 at
>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>
>>                 at
>> ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>
>>                 at
>> org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>
>>                 at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>
>>                 at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>
>>                 at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>
>>                 at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>
>>                 at
>> org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>
>>                 at
>> org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>
>>                 at
>> org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>
>>                 at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>
>>                 at
>> org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>
>>                 at
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>
>>                 at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119
>> does not contain 186
>>
>>                 at
>> org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>
>>                 at
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>
>>                 at
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>
>>                 at
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>
>>                 at
>> org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>
>>                 at
>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>
>>                 at
>> org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>
>>                 at
>> ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>
>>                 ... 17 more
>>
>> This message is intended only for the named recipient and may contain
>> confidential or privileged information. As the confidentiality of email
>> communication cannot be guaranteed, we do not accept any responsibility for
>> the confidentiality and the intactness of this message. If you have
>> received it in error, please advise the sender by return e-mail and delete
>> this message and any attachments. Any unauthorised use or dissemination of
>> this information is strictly prohibited.
>>
>
