… didn’t mean to hit the send button so soon 😊 I guess we are getting closer to a solution
Thias

From: Schwalbe Matthias
Sent: Friday, 15 October 2021 08:49
To: 'Dan Hill' <quietgol...@gmail.com>; user <user@flink.apache.org>
Subject: RE: Any issues with reinterpretAsKeyedStream when scaling partitions?

Hi Dan again 😊,

I took a second look … from what I see in your call stack, I conclude that you do indeed have a network shuffle between your two operators. In that case reinterpretAsKeyedStream wouldn't work ($StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277) indicates that the two operators are not chained) …

Just as a double-check, could you please share both your
* execution plan (JSON; call println(env.getExecutionPlan) right before your call to env.execute), and
* job plan (screenshot from the Flink dashboard)

There are a number of preconditions that must hold before two operators get chained, and probably one of them fails (see [1]):
* The two operators need to allow chaining with each other (see [2] … chaining strategy)
* There needs to be a ForwardPartitioner in between
* We need to be in streaming mode
* Both operators need the same parallelism
* Chaining needs to be enabled for the streaming environment
* The second operator needs to be single-input (i.e. no TwoInputOp and no union() before it)

[1] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L861-L873
[2] https://github.com/apache/flink/blob/2dabdd95c15ccae2a97a0e898d1acfc958a0f7f3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java#L903-L932

From: Dan Hill <quietgol...@gmail.com>
Sent: Thursday, 14 October 2021 17:50
To: user <user@flink.apache.org>
Subject: Any issues with reinterpretAsKeyedStream when scaling partitions?

I have a job that uses reinterpretAsKeyedStream across a simple map to avoid a shuffle.
When changing the number of partitions, I'm hitting an issue with registerEventTimeTimer complaining that "key group from 110 to 119 does not contain 186". I'm using Flink v1.12.3.

Any thoughts on this? I don't know if there is a known issue with reinterpretAsKeyedStream.

Rough steps:
1. I have a raw input stream of View records. I keyBy the View using Tuple2<Long, String>(platform_id, log_user_id).
2. I do a small transformation of View to a TinyView. I reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key. The keys are the same.
3. I use the TinyView in a KeyedCoProcessFunction.

When I savepoint and start again with a different number of partitions, my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and complains that "key group from 110 to 119 does not contain 186". I verified that the key does not change and that we use Tuple2 with the simple types Long and String.

2021-10-14 08:17:07
java.lang.IllegalArgumentException: view x insertion issue with registerEventTimeTimer for key=(120,3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120 log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174" log_timestamp: 1634224329606 view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
    at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
    at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
    at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
    at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
    at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
    at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
    at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
    ... 17 more
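For readers puzzling over the error message itself: Flink partitions keyed state and timers into key groups. Each key maps to exactly one key group, computed from the key's hash and the max parallelism, and each subtask owns a contiguous range of key groups that depends on the current parallelism. "key group from 110 to 119 does not contain 186" therefore means a subtask owning key groups 110 to 119 was asked to register a timer for a key whose key group is 186, i.e. the record reached a subtask that does not own its key. Below is a minimal standalone Java sketch of that arithmetic, re-implemented from my reading of Flink's KeyGroupRangeAssignment and MathUtils.murmurHash. It is not Flink's API: the class and method names are hypothetical, and the key, max parallelism, and parallelism values are made up for illustration (they do not reproduce the 186 from the trace).

```java
// Minimal sketch of Flink-style key-group arithmetic, re-implemented for illustration.
// Based on a reading of KeyGroupRangeAssignment and MathUtils.murmurHash; this is NOT
// Flink's API, and all names and values here are hypothetical.
public class KeyGroupDemo {

    // Scrambles key.hashCode() the way MathUtils.murmurHash(int) appears to.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        code = bitMix(code);
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // Final avalanche step of the hash.
    static int bitMix(int in) {
        in ^= in >>> 16;
        in *= 0x85ebca6b;
        in ^= in >>> 13;
        in *= 0xc2b2ae35;
        in ^= in >>> 16;
        return in;
    }

    // The key group depends only on the key and the max parallelism.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // The owning subtask additionally depends on the current parallelism.
    static int operatorIndexForKeyGroup(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    // Inclusive [start, end] key-group range owned by one subtask.
    static int[] keyGroupRange(int operatorIndex, int maxParallelism, int parallelism) {
        int start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
        int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
        return new int[] {start, end};
    }

    // Models the range check behind "key group from X to Y does not contain Z".
    static boolean ownsKeyGroup(int operatorIndex, int keyGroup, int maxParallelism, int parallelism) {
        int[] range = keyGroupRange(operatorIndex, maxParallelism, parallelism);
        return keyGroup >= range[0] && keyGroup <= range[1];
    }

    public static void main(String[] args) {
        int maxParallelism = 256;              // hypothetical; kept fixed across the rescale
        String key = "hypothetical-user-key";  // stand-in for a real key
        int keyGroup = assignToKeyGroup(key, maxParallelism);

        int oldParallelism = 20;               // hypothetical parallelism before the savepoint
        int newParallelism = 26;               // hypothetical parallelism after restoring
        int oldOwner = operatorIndexForKeyGroup(keyGroup, maxParallelism, oldParallelism);
        int newOwner = operatorIndexForKeyGroup(keyGroup, maxParallelism, newParallelism);
        int[] range = keyGroupRange(oldOwner, maxParallelism, newParallelism);

        System.out.printf("key group %d: subtask %d at p=%d, subtask %d at p=%d%n",
                keyGroup, oldOwner, oldParallelism, newOwner, newParallelism);
        System.out.printf("subtask %d at p=%d owns key groups %d to %d; contains %d: %b%n",
                oldOwner, newParallelism, range[0], range[1], keyGroup,
                ownsKeyGroup(oldOwner, keyGroup, maxParallelism, newParallelism));
    }
}
```

Because the key group depends only on the key and the max parallelism, while the owning subtask also depends on the parallelism, a record that is merely forwarded between subtasks (rather than re-partitioned by key) can, after a rescale, land on a subtask whose range does not contain its key group. That is the situation the precondition check in the trace reports.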