Hi Hill,

Would you please give more detail about "When I savepoint and start again with a different number of partitions"? Do you change max_parallelism or the Partitioner strategy? Also, does this problem always happen when you restore from the savepoint, or only occasionally? Could you please provide code to reproduce the problem?
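For context, the message in your stack trace comes from a range check: each parallel subtask owns a contiguous range of key groups, and the timer queue rejects a key whose key group falls outside that range. Below is a minimal sketch of that arithmetic in plain Java, with no Flink dependency. It mirrors, as far as I know, the integer formulas in Flink's KeyGroupRangeAssignment. The class name KeyGroupSketch and the values maxParallelism=190 and parallelism=19 are hypothetical, picked only so that the numbers line up with your error message; I don't know your actual settings. The step that maps a key to a key group (a murmur hash of key.hashCode() modulo the max parallelism) is left out.

```java
// Sketch of the key-group-to-subtask arithmetic (plain Java, no Flink dependency).
// The configuration values in main() are hypothetical, not taken from the job.
final class KeyGroupSketch {

    // First key group owned by the subtask with the given index.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group (inclusive) owned by the subtask with the given index.
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Index of the subtask that is expected to own the given key group.
    static int ownerOf(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The check that fails in the stack trace: is keyGroup inside the subtask's range?
    static boolean owns(int maxParallelism, int parallelism, int operatorIndex, int keyGroup) {
        return keyGroup >= rangeStart(maxParallelism, parallelism, operatorIndex)
                && keyGroup <= rangeEnd(maxParallelism, parallelism, operatorIndex);
    }

    public static void main(String[] args) {
        int maxParallelism = 190; // hypothetical
        int parallelism = 19;     // hypothetical
        int receivingSubtask = 11; // subtask that received the record
        int keyGroup = 186;        // key group of the record's key (from the error message)

        System.out.printf("subtask %d owns key groups %d to %d%n",
                receivingSubtask,
                rangeStart(maxParallelism, parallelism, receivingSubtask),
                rangeEnd(maxParallelism, parallelism, receivingSubtask));
        System.out.printf("key group %d is expected on subtask %d%n",
                keyGroup, ownerOf(maxParallelism, parallelism, keyGroup));
        System.out.printf("range check passes: %b%n",
                owns(maxParallelism, parallelism, receivingSubtask, keyGroup));
    }
}
```

If something like this is what happens in your job, a record is reaching a subtask that does not own its key group. That would suggest the stream passed to reinterpretAsKeyedStream is not partitioned the way keyBy would partition it at the new parallelism, which is why I am asking about the parallelism and partitioner settings.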
Best,
JING ZHANG

Dan Hill <quietgol...@gmail.com> wrote on Thu, Oct 14, 2021 at 11:51 PM:

> I have a job that uses reinterpretAsKeyedStream across a simple map to
> avoid a shuffle. When changing the number of partitions, I'm hitting an
> issue with registerEventTimeTimer complaining that "key group from 110 to
> 119 does not contain 186". I'm using Flink v1.12.3.
>
> Any thoughts on this? I don't know if there is a known issue
> with reinterpretAsKeyedStream.
>
> Rough steps:
> 1. I have a raw input stream of View records. I keyBy the View using
> Tuple2<Long, String>(platform_id, log_user_id).
> 2. I do a small transformation of View to a TinyView. I
> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
> The keys are the same.
> 3. I use the TinyView in a KeyedCoProcessFunction.
>
> When I savepoint and start again with a different number of partitions, my
> KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
> complains that "key group from 110 to 119 does not contain 186". I
> verified that the key does not change and that we use Tuple2 with
> primitives Long and String.
>
> 2021-10-14 08:17:07 java.lang.IllegalArgumentException: view x insertion
> issue with registerEventTimeTimer for key=(120,
> 3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120 log_user_id:
> "3bfd5b19-9d86-4455-a5a1-480f8596a174" log_timestamp: 1634224329606
> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>     ... 17 more
>