Hi JING ZHANG!

I changed StreamExecutionEnvironment.setParallelism(...).

If I take a savepoint and restart with the same parallelism, it's fine.  I
hit the error with every changed parallelism value that I've tried.  The
failing rows change depending on which checkpoint I restore from.
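For reference, here's my understanding of the key-group arithmetic involved (a
standalone sketch of the formulas in Flink's KeyGroupRangeAssignment; it is
simplified — the real assignment also murmur-hashes key.hashCode() before the
modulo — and the maxParallelism=240 / parallelism=24 numbers are hypothetical
values that happen to produce a 110-119 range like the one in my stack trace,
not my actual settings):

```java
// Standalone sketch of Flink's key-group math (KeyGroupRangeAssignment),
// simplified: the real key-group assignment murmur-hashes key.hashCode()
// before taking the modulo. All numbers below are hypothetical.
public class KeyGroupMath {

    // Key group that a hashed key falls into (0 .. maxParallelism-1).
    static int keyGroupFor(int keyHash, int maxParallelism) {
        return Math.floorMod(keyHash, maxParallelism);
    }

    // First key group owned by one operator subtask.
    static int rangeStart(int maxParallelism, int parallelism, int operatorIndex) {
        return (operatorIndex * maxParallelism + parallelism - 1) / parallelism;
    }

    // Last key group owned by that subtask (inclusive).
    static int rangeEnd(int maxParallelism, int parallelism, int operatorIndex) {
        return ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
    }

    // Subtask that owns a given key group at the current parallelism.
    static int operatorIndexFor(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int max = 240; // hypothetical maxParallelism, not my real config
        // At parallelism 24, subtask 11 owns key groups 110-119 ...
        System.out.println(rangeStart(max, 24, 11) + "-" + rangeEnd(max, 24, 11));
        // ... while key group 186 belongs to subtask 18, so a timer
        // registered for it on subtask 11 trips the precondition.
        System.out.println(operatorIndexFor(186, max, 24));
        // Rescaling to parallelism 30 moves key group 186 to subtask 23.
        System.out.println(operatorIndexFor(186, max, 30));
    }
}
```

If I read this right, which subtask owns a key group depends on both
maxParallelism and the current parallelism, so restoring at a different
parallelism moves keys between subtasks.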

Is there a good job template that I can use to build a reproduction?  It's
too hard to build a reproduction from my current setup.

On Thu, Oct 14, 2021 at 8:07 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Hill,
> Would you please give more detail about "When I savepoint and start again
> with a different number of partitions" ?
> Do you change max_parallelism or the partitioner strategy?
> Besides, does this problem always happen, or does it happen occasionally
> when you restore from the savepoint?
> Would you please provide code to reproduce the problem?
>
> Best,
> JING ZHANG
>
> Dan Hill <quietgol...@gmail.com> wrote on Thu, Oct 14, 2021 at 11:51 PM:
>
>> I have a job that uses reinterpretAsKeyedStream across a simple map to
>> avoid a shuffle.  When changing the number of partitions, I'm hitting an
>> issue with registerEventTimeTimer complaining that "key group from 110
>> to 119 does not contain 186".  I'm using Flink v1.12.3.
>>
>> Any thoughts on this?  I don't know if there is a known issue
>> with reinterpretAsKeyedStream.
>>
>> Rough steps:
>> 1. I have a raw input stream of View records.  I keyBy the View using
>> Tuple2<Long, String>(platform_id, log_user_id).
>> 2. I do a small transformation of View to a TinyView, then
>> reinterpretAsKeyedStream the TinyView as a KeyedStream with the same key.
>> 3. I use the TinyView in a KeyedCoProcessFunction.
>>
>> When I savepoint and start again with a different number of partitions,
>> my KeyedCoProcessFunction hits an issue with registerEventTimeTimer and
>> complains that "key group from 110 to 119 does not contain 186". I
>> verified that the key does not change and that we use Tuple2 with
>> primitives Long and String.
>>
>>
>>
>> 2021-10-14 08:17:07
>> java.lang.IllegalArgumentException: view x insertion issue with
>> registerEventTimeTimer for key=(120, 3bfd5b19-9d86-4455-a5a1-480f8596a174),
>> flat=platform_id: 120
>> log_user_id: "3bfd5b19-9d86-4455-a5a1-480f8596a174"
>> log_timestamp: 1634224329606
>> view_id: "8fcdf922-7c79-4902-9778-3f20f39b0bc2"
>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:318)
>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:59)
>>     at ai.promoted.metrics.logprocessor.common.functions.LogSlowOnTimer.processElement1(LogSlowOnTimer.java:36)
>>     at org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator.processElement1(KeyedCoProcessOperator.java:78)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:199)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:164)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:277)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
>>     at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:95)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:396)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:191)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:617)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:581)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.IllegalArgumentException: key group from 110 to 119 does not contain 186
>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:191)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:186)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:179)
>>     at org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:114)
>>     at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:233)
>>     at org.apache.flink.streaming.api.SimpleTimerService.registerEventTimeTimer(SimpleTimerService.java:52)
>>     at ai.promoted.metrics.logprocessor.common.functions.inferred.BaseInferred.processElement1(BaseInferred.java:315)
>>     ... 17 more
>>
>
