Thanks Thias and JING ZHANG! Here's a Google Drive folder link <https://drive.google.com/drive/folders/1chE0i3HyBWgREeRuM2qlGU8sK1jr4rLm?usp=sharing> with the execution plan and two screenshots of the job graph.
I'm guessing I violate the "The second operator needs to be single-input (i.e. no TwoInputOp nor union() before)" part. After I do a transformation on a KeyedStream (so it goes back to a SingleOutputStreamOperator), even a simple map usually disallows operator chaining. Even with reinterpretAsKeyedStream, it doesn't work.

On Fri, Oct 15, 2021 at 12:34 AM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Dan,
> Sorry for the typos. I meant to ask you to provide the code to reproduce the problem. If the current program is complex and confidential, maybe you could try to simplify the code.
> Besides, Matthias's guess is very reasonable. Could you please check whether there is a network shuffle between your two operators? Were those two operators chained into one vertex?
>
> Best,
> JING ZHANG
>
> Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote on Fri, Oct 15, 2021 at 2:57 PM:
>
>> … didn't mean to hit the send button so soon 😊
>>
>> I guess we are getting closer to a solution
>>
>> Thias
>>
>>
>> From: Schwalbe Matthias
>> Sent: Friday, October 15, 2021 08:49
>> To: 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
>> Subject: RE: Any issues with reinterpretAsKeyedStream when scaling partitions?
>>
>> Hi Dan again 😊,
>>
>> I took a second look … from what I see in your call stack, I conclude that you indeed have a network shuffle between your two operators, in which case reinterpretAsKeyedStream won't work.
>>
>> ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277) indicates that the two operators are not chained.)
>>
>> … just as a double-check, could you please share both your
>>
>> - execution plan (call println(env.getExecutionPlan()) right before your call to env.execute()) (JSON), and
>> - job plan (screenshot from the Flink dashboard)?
>>
>> There are a number of preconditions before two operators get chained, and probably one of them fails (see [1]):
>>
>> - The two operators need to allow chaining the resp. other (see [2] … chaining strategy)
>> - We need a ForwardPartitioner in between
>> - We need to be in streaming mode
>> - Both operators need the same parallelism
>> - Chaining needs to be enabled for the streaming environment
>> - The second operator needs to be single-input (i.e. no TwoInputOp nor union() before)
>>
>> [1] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
>> [2] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932
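As an aside, here is a minimal sketch of the println(env.getExecutionPlan()) check asked for above. The pipeline is a placeholder, not the job under discussion; the printed JSON can be pasted into the Flink plan visualizer (https://flink.apache.org/visualizer/), and the ship_strategy on each edge shows whether the connection between two operators is FORWARD (chainable) or a network shuffle such as HASH or REBALANCE.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DumpExecutionPlan {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder pipeline; replace with the real job graph under investigation.
        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                .print();

        // Print the execution plan (JSON) right before execute(), as suggested above.
        // The "ship_strategy" on each edge tells you whether the connection is
        // FORWARD (a chaining candidate) or a network shuffle such as HASH.
        System.out.println(env.getExecutionPlan());
        env.execute("dump-execution-plan");
    }
}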
>> From: Dan Hill <quietgol...@gmail.com>
>> Sent: Thursday, October 14, 2021 17:50
>> To: user <user@flink.apache.org>
>> Subject: Any issues with reinterpretAsKeyedStream when scaling partitions?
>>
>> I have a job that uses reinterpretAsKeyedStream across a simple map to avoid a shuffle. When changing the number of partitions, I'm hitting an issue with registerEventTimeTimer complaining that "key group from 110 to 119 does not contain 186". I'm using Flink v1.12.3.
>>
>> Any thoughts on this? I don't know if there is a known issue with reinterpretAsKeyedStream.
>>
>> Rough steps:
>>
>> 1. I have a raw input stream of View records. I keyBy the View using Tuple2<Long, String>(platform_id, log_user_id).
>> 2. I do a small transformation of View to a TinyView. I reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key. The keys are the same.
>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>
>> When I savepoint and start again with a different number of partitions, my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and complains that "key group from 110 to 119 does not contain 186". I verified that the key does not change and that the key is a Tuple2 of Long and String.
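For anyone trying to reproduce this, here is a minimal, self-contained sketch of the pipeline shape described in the rough steps above. The View/TinyView/Action classes, field names, and sources are placeholders, not the actual ai.promoted code; the point is only the keyBy -> map -> reinterpretAsKeyedStream -> KeyedCoProcessFunction structure whose timer registration fails in the stack trace below.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ReinterpretSketch {

    // Hypothetical record types standing in for the real View/TinyView/second-input schemas.
    public static class View { public long platformId; public String logUserId; }
    public static class TinyView { public long platformId; public String logUserId; }
    public static class Action { public long platformId; public String logUserId; }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        TypeInformation<Tuple2<Long, String>> keyType = Types.TUPLE(Types.LONG, Types.STRING);

        DataStream<View> views = env.fromElements(new View());       // stand-in source
        DataStream<Action> actions = env.fromElements(new Action()); // stand-in source

        // Step 1: key the raw views by (platform_id, log_user_id).
        KeyedStream<View, Tuple2<Long, String>> keyedViews =
                views.keyBy(v -> Tuple2.of(v.platformId, v.logUserId), keyType);

        // Step 2: small map View -> TinyView; the key fields are not modified.
        DataStream<TinyView> tinyViews = keyedViews
                .map(v -> {
                    TinyView t = new TinyView();
                    t.platformId = v.platformId;
                    t.logUserId = v.logUserId;
                    return t;
                })
                .returns(TinyView.class);

        // Re-declare the (unchanged) key instead of shuffling again.
        KeyedStream<TinyView, Tuple2<Long, String>> keyedTinyViews =
                DataStreamUtils.reinterpretAsKeyedStream(
                        tinyViews, t -> Tuple2.of(t.platformId, t.logUserId), keyType);

        // Step 3: use the TinyView in a KeyedCoProcessFunction together with a second keyed input.
        keyedTinyViews
                .connect(actions.keyBy(a -> Tuple2.of(a.platformId, a.logUserId), keyType))
                .process(new KeyedCoProcessFunction<Tuple2<Long, String>, TinyView, Action, String>() {
                    @Override
                    public void processElement1(TinyView v, Context ctx, Collector<String> out) {
                        // The timer registration is where the key-group error below surfaces.
                        ctx.timerService().registerEventTimeTimer(
                                ctx.timerService().currentWatermark() + 1);
                    }

                    @Override
                    public void processElement2(Action a, Context ctx, Collector<String> out) {}
                });

        env.execute("reinterpret-as-keyed-stream-sketch");
    }
}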
>> 2021-10-14 08:17:07
>> java.lang.IllegalArgumentException: view x insertion issue with registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120
>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>> log_timestamp: 1634224329606
>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>
>> at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>> at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>> at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>> at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>> at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>> at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>> at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>> at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>> at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>> ... 17 more
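For context on the "key group from 110 to 119 does not contain 186" message: every keyed subtask owns a contiguous range of key groups derived from the max parallelism and the operator parallelism, and each key hashes to exactly one key group. The standalone snippet below uses Flink's internal KeyGroupRangeAssignment utility from flink-runtime to show that mapping; the max parallelism of 1280, parallelism of 128, and subtask index 11 are assumptions chosen only because they yield the range 110-119, not values taken from the job above. Because reinterpretAsKeyedStream skips the hash re-partitioning that a keyBy performs, a record can reach a subtask whose range no longer contains its key group once the parallelism changes, which is exactly the precondition failure in the stack trace.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        // Assumed settings for illustration only; not taken from the job above.
        int maxParallelism = 1280;
        int parallelism = 128;
        int subtaskIndex = 11;

        Tuple2<Long, String> key = Tuple2.of(120L, "3bfd5b19-9d86-4455-a5a1-480f8596a174");

        // Key group of this key, in [0, maxParallelism).
        int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);

        // Contiguous key-group range owned by one subtask of the keyed operator
        // (with the assumed settings, subtask 11 owns key groups 110..119).
        KeyGroupRange range = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                maxParallelism, parallelism, subtaskIndex);

        // If the record arrived at this subtask without a real keyBy shuffle, this
        // containment check can fail, which is what the
        // "key group from 110 to 119 does not contain 186" error reports.
        System.out.println("key group " + keyGroup + " owned by subtask " + subtaskIndex
                + " (" + range + ")? " + range.contains(keyGroup));
    }
}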