Yordan Pavlov created FLINK-31970:
-------------------------------------

             Summary: "Key group 0 is not in KeyGroupRange" when using 
CheckpointedFunction
                 Key: FLINK-31970
                 URL: https://issues.apache.org/jira/browse/FLINK-31970
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.17.0
            Reporter: Yordan Pavlov
         Attachments: fill-topic.sh, main.scala
I am experiencing a problem where the following exception is thrown on 
Flink stop (stop with savepoint):

 
{code:java}
org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException: 
Key group 0 is not in KeyGroupRange{startKeyGroup=86, endKeyGroup=127}.{code}
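For context on where that range comes from: Flink partitions the key space into maxParallelism key groups and assigns each subtask a contiguous range of them. The following is a simplified sketch of the assignment arithmetic (my reading of {{KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex}}; the default maxParallelism of 128 and the job's parallelism of 3 are assumed), which reproduces the range in the exception:

```scala
// Sketch (not Flink source) of how a subtask's key-group range is computed,
// assuming maxParallelism = 128 (the default) and parallelism = 3.
object KeyGroupSketch {
  // Returns the inclusive (startKeyGroup, endKeyGroup) owned by one subtask.
  def computeKeyGroupRange(maxParallelism: Int, parallelism: Int, operatorIndex: Int): (Int, Int) = {
    val start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism
    val end   = ((operatorIndex + 1) * maxParallelism - 1) / parallelism
    (start, end)
  }

  def main(args: Array[String]): Unit = {
    // Subtask 2 of 3 owns key groups 86..127 -- the range in the exception.
    println(computeKeyGroupRange(128, 3, 2)) // prints: (86,127)
  }
}
```

So the failing subtask owns key groups 86..127, yet something routes an update for key group 0 to it.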
 

I do not have a non-deterministic keyBy() operator; in fact, I use
{code:java}
.keyBy(_ => 1){code}
I believe the problem is related to using RocksDB state along with a 
{{CheckpointedFunction}}.
My test program contains a commented-out parallelism reduction which, if 
enabled, makes the problem go away. I am attaching a standalone program that 
reproduces the problem, along with a script that generates the input data. For 
clarity, here is the essence of the job:

{code:scala}
env.fromSource(kafkaSource, watermarkStrategy, "KafkaSource")
  .setParallelism(3)
  .keyBy(_ => 1)
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MILLISECONDS)))
  .apply(new TestWindow())
  /* .setParallelism(1) this would prevent the problem */
  .uid("window tester")
  .name("window tester")
  .print()

class TestWindow() extends WindowFunction[(Long, Int), Long, Int, TimeWindow]
    with CheckpointedFunction {
  var state: ValueState[Long] = _
  var count = 0

  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    state.update(count)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val storeDescriptor =
      new ValueStateDescriptor[Long]("state-xrp-dex-pricer", createTypeInformation[Long])

    state = context.getKeyedStateStore.getState(storeDescriptor)
  }

  override def apply(key: Int, window: TimeWindow, input: Iterable[(Long, Int)],
      out: Collector[Long]): Unit = {
    count += input.size
    out.collect(count)
  }
}{code}
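To make the failing check concrete, here is a hypothetical mini-model (not Flink source) of the key-group guard in a keyed state backend. One plausible reading of the failure is that {{snapshotState()}} touches keyed state without a valid current key set, so the update resolves to key group 0, which the subtask owning 86..127 rejects with exactly the reported message:

```scala
// Hypothetical mini-model (not Flink source) of the key-group guard that
// produces the exception reported above.
object KeyGroupGuardSketch {
  final case class KeyGroupRange(startKeyGroup: Int, endKeyGroup: Int) {
    def contains(keyGroup: Int): Boolean =
      keyGroup >= startKeyGroup && keyGroup <= endKeyGroup
  }

  // Rejects any state access whose key group is outside the subtask's range.
  def checkKeyGroup(range: KeyGroupRange, keyGroup: Int): Unit =
    if (!range.contains(keyGroup))
      throw new IllegalArgumentException(
        s"Key group $keyGroup is not in KeyGroupRange{startKeyGroup=${range.startKeyGroup}, " +
          s"endKeyGroup=${range.endKeyGroup}}.")

  def main(args: Array[String]): Unit = {
    // The subtask owning key groups 86..127 rejects an update for key group 0.
    try checkKeyGroup(KeyGroupRange(86, 127), 0)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

With parallelism 1 a single subtask owns the whole range 0..127, so the guard never fires, which would be consistent with the commented-out {{.setParallelism(1)}} hiding the problem.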



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
