Hi JING ZHANG! I changed StreamExecutionEnvironment.setParallelism(...).
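For context, the job has roughly this shape (a minimal, self-contained sketch, not the actual code; View, TinyView, and the co-process function here are simplified placeholders, and names are approximate). The real functions are more involved, but the keyBy -> map -> reinterpretAsKeyedStream -> KeyedCoProcessFunction structure is the same.

// Minimal sketch of the job shape (NOT the real code; View, TinyView, and the
// co-process function are simplified placeholders for the actual classes).
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ReinterpretSketch {

  // Placeholder records; the real ones have more fields.
  public static class View {
    public long platformId;
    public String logUserId;
  }

  public static class TinyView {
    public long platformId;
    public String logUserId;
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The only change between the savepoint and the restore is this value.
    env.setParallelism(10);

    TypeInformation<Tuple2<Long, String>> keyType = Types.TUPLE(Types.LONG, Types.STRING);
    KeySelector<View, Tuple2<Long, String>> viewKey = v -> Tuple2.of(v.platformId, v.logUserId);
    KeySelector<TinyView, Tuple2<Long, String>> tinyKey = t -> Tuple2.of(t.platformId, t.logUserId);

    // In the real job this is the raw View input stream.
    DataStream<View> views = env.fromElements(new View());

    // 1. keyBy the View on (platform_id, log_user_id).
    KeyedStream<View, Tuple2<Long, String>> keyedViews = views.keyBy(viewKey, keyType);

    // 2. Small map View -> TinyView that preserves the key, then reinterpret the
    //    result as already keyed to avoid a shuffle.
    DataStream<TinyView> tinyViews = keyedViews
        .map(v -> {
          TinyView t = new TinyView();
          t.platformId = v.platformId;
          t.logUserId = v.logUserId;
          return t;
        })
        .returns(TinyView.class);
    KeyedStream<TinyView, Tuple2<Long, String>> keyedTinyViews =
        DataStreamUtils.reinterpretAsKeyedStream(tinyViews, tinyKey, keyType);

    // 3. Use the TinyView in a KeyedCoProcessFunction that registers event-time
    //    timers; registerEventTimeTimer is where the key-group error is thrown.
    keyedTinyViews
        .connect(keyedViews)
        .process(new KeyedCoProcessFunction<Tuple2<Long, String>, TinyView, View, String>() {
          @Override
          public void processElement1(TinyView value, Context ctx, Collector<String> out) {
            Long ts = ctx.timestamp();
            ctx.timerService().registerEventTimeTimer(ts == null ? 0L : ts + 1);
          }

          @Override
          public void processElement2(View value, Context ctx, Collector<String> out) {}
        })
        .print();

    env.execute("reinterpret-as-keyed-stream-sketch");
  }
}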
If I savepoint and start with the same parallelism, then it's fine. I hit the error with the other parallelism values that I've tried, and the failing rows change depending on the checkpoint that I use.

Is there a good job template that I can use to build a reproduction? It's too hard to build one from my current setup.

On Thu, Oct 14, 2021 at 8:07 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Hill,
> Would you please give more detail about "When I savepoint and start again
> with a different number of partitions"?
> Do you change max_parallelism or the Partitioner strategy?
> Besides, does this problem always happen, or does it happen occasionally
> when you restore from the savepoint?
> Would you please provide code to reproduce the problem?
>
> Best,
> JING ZHANG
>
> Dan Hill <quietgol...@gmail.com> wrote on Thu, Oct 14, 2021 at 11:51 PM:
>
>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>> avoid a shuffle. When changing the number of partitions, I'm hitting an
>> issue with registerEventTimeTimer complaining that "key group from 110
>> to 119 does not contain 186". I'm using Flink v1.12.3.
>>
>> Any thoughts on this? I don't know if there is a known issue
>> with reinterpretAsKeyedStream.
>>
>> Rough steps:
>> 1. I have a raw input stream of View records. I keyBy the View using
>> Tuple2<Long, String>(platform_id, log_user_id).
>> 2. I do a small transformation of View to a TinyView. I
>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same
>> key. The keys are identical.
>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>
>> When I savepoint and start again with a different number of partitions,
>> my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
>> complains that "key group from 110 to 119 does not contain 186". I
>> verified that the key does not change and that it is a Tuple2 of Long
>> and String.
>>
>> 2021-10-14 08:17:07
>> java.lang.IllegalArgumentException: view x insertion issue with
>> registerEventTimeTimer for key=(120, 3bfd5b19-9d86-4455-a5a1-480f8596a174),
>> flat=platform_id: 120
>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>> log_timestamp: 1634224329606
>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>     ... 17 more
>>