I have a job that uses reinterpretAsKeyedStream across a simple map to
avoid a shuffle.  When changing the number of partitions, I'm hitting an
issue with registerEventTimeTimer complaining that "key group from 110 to
119 does not contain 186".  I'm using Flink v1.12.3.

Any thoughts on this?  I don't know if there is a known issue
with reinterpretAsKeyedStream.

Rough steps:
1. I have a raw input stream of View records.  I keyBy the View using
Tuple2<Long, String>(platform_id, log_user_id).
2. I do a small transformation of View to a TinyView.  I
reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
The keys are the same.
3. I use the TinyView in a KeyedCoProcessFunction.

When I savepoint and start again with a different number of partitions, my
KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
complains that "key group from 110 to 119 does not contain 186". I verified
that the key does not change and that we use Tuple2 with primitives Long
and String.



2021-10-14 08:17:07 java.lang.IllegalArgumentException: view x insertion
issue with registerEventTimeTimer for key=(120,
3bfd5b19-9d86-4455-a5a1-480f8596a174), flat=platform_id: 120 log_user_id: "
3bfd5b19-9d86-4455-a5a1-480f8596a174" log_timestamp: 1634224329606 view_id:
"8fcdf922-7c79-4902-9778-3f20f39b0bc2" at
ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
at
ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
at
ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
at
org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) at
org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) at
java.lang.Thread.run(Thread.java:748) Caused by:
java.lang.IllegalArgumentException: key group from 110 to 119 does not
contain 186 at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
at
org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
at
ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
... 17 more

Reply via email to