Yordan Pavlov created FLINK-31970:
-------------------------------------

             Summary: "Key group 0 is not in KeyGroupRange" when using CheckpointedFunction
                 Key: FLINK-31970
                 URL: https://issues.apache.org/jira/browse/FLINK-31970
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.17.0
            Reporter: Yordan Pavlov
         Attachments: fill-topic.sh, main.scala
I am experiencing a problem where the following exception is thrown on Flink stop (stop with savepoint):

{code:java}
org.apache.flink.util.SerializedThrowable: java.lang.IllegalArgumentException: Key group 0 is not in KeyGroupRange{startKeyGroup=86, endKeyGroup=127}.
{code}

I do not have a non-deterministic keyBy() operator; in fact, I use

{code:java}
.keyBy(_ => 1)
{code}

I believe the problem is related to using RocksDB state along with a {{CheckpointedFunction}}. In my test program I have commented out a call that reduces the parallelism and which would make the problem go away. I am attaching a standalone program which reproduces the problem, along with a script which generates the input data. For clarity, here is the essence of the job:

{code:scala}
env.fromSource(kafkaSource, watermarkStrategy, "KafkaSource")
  .setParallelism(3)
  .keyBy(_ => 1)
  .window(TumblingEventTimeWindows.of(Time.of(1, TimeUnit.MILLISECONDS)))
  .apply(new TestWindow())
  /* .setParallelism(1) this would prevent the problem */
  .uid("window tester")
  .name("window tester")
  .print()

class TestWindow() extends WindowFunction[(Long, Int), Long, Int, TimeWindow]
    with CheckpointedFunction {

  var state: ValueState[Long] = _
  var count: Long = 0

  override def snapshotState(functionSnapshotContext: FunctionSnapshotContext): Unit = {
    state.update(count)
  }

  override def initializeState(context: FunctionInitializationContext): Unit = {
    val storeDescriptor =
      new ValueStateDescriptor[Long]("state-xrp-dex-pricer", createTypeInformation[Long])
    state = context.getKeyedStateStore.getState(storeDescriptor)
  }

  override def apply(key: Int, window: TimeWindow, input: Iterable[(Long, Int)],
                     out: Collector[Long]): Unit = {
    count += input.size
    out.collect(count)
  }
}
{code}

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
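For reference, the key-group range in the exception message is consistent with subtask 2 of a parallelism-3 operator under Flink's default max parallelism of 128. A minimal sketch (re-implementing the range formula from Flink's {{KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex}}; treat this re-implementation as an assumption, not Flink's actual code):

```scala
object KeyGroupRanges {
  // Re-implementation (assumed) of Flink's
  // KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex:
  // each subtask owns a contiguous slice of the [0, maxParallelism) key groups.
  def rangeFor(maxParallelism: Int, parallelism: Int, operatorIndex: Int): (Int, Int) = {
    val start = (operatorIndex * maxParallelism + parallelism - 1) / parallelism
    val end   = ((operatorIndex + 1) * maxParallelism - 1) / parallelism
    (start, end)
  }

  def main(args: Array[String]): Unit = {
    val maxParallelism = 128 // Flink's default when parallelism <= 128
    val parallelism    = 3
    for (i <- 0 until parallelism) {
      val (s, e) = rangeFor(maxParallelism, parallelism, i)
      println(s"subtask $i owns key groups [$s, $e]")
    }
  }
}
```

This yields ranges [0, 42], [43, 85] and [86, 127], so KeyGroupRange{startKeyGroup=86, endKeyGroup=127} is subtask 2's slice, while key group 0 can only belong to subtask 0. The exception therefore suggests subtask 2 is touching state for a key group it does not own.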