Yangze Guo created FLINK-38558:
----------------------------------
Summary: Checkpoint rescale and restore fails with RocksDB Heap timer
Key: FLINK-38558
URL: https://issues.apache.org/jira/browse/FLINK-38558
Project: Flink
Issue Type: Bug
Components: Runtime / State Backends
Affects Versions: 1.20.3
Reporter: Yangze Guo
In Flink 1.20, when using the RocksDB state backend with timer service
configured as Heap (state.backend.rocksdb.timer-service.factory: Heap),
rescaling a job from a checkpoint fails during restore, throwing the following
exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:332)
    ...
Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=31} does not contain key group 32
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:160)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:149)
    ...
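Expressed as flink-conf.yaml entries, the setup that triggers this looks roughly like the following sketch (the checkpoint directory is a placeholder; the full programmatic configuration appears in the reproduction code below):

state.backend.type: rocksdb
state.checkpoints.dir: file:///path/to/checkpoints
execution.checkpointing.interval: 10s
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state.backend.rocksdb.timer-service.factory: HEAP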
Restoring from a savepoint, however, works fine with the exact same configuration.
Root Cause Analysis:
1. When state.backend.rocksdb.timer-service.factory=heap, timers are snapshotted
into the raw keyed state of checkpoints. During restore, these timers are added to
HeapPriorityQueueSet, which validates each timer against the current key group range.
However, timers are not pruned when the job is rescaled, so a mismatch can occur
(e.g., a timer belonging to key group 32 arriving at a subtask whose range is 0-31),
triggering the IllegalArgumentException (see the sketch after this list).
2. Savepoints work because of FLINK-21344, which stopped serializing heap timers
in the RocksDB state backend during savepoint creation, so the key group
validation is never hit during savepoint restore.
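To make the mismatch concrete, the following minimal sketch shows how the key group ranges shift across the rescale. KeyGroupRangeAssignment and KeyGroupRange are Flink's actual runtime classes; the max parallelism of 128 (Flink's default lower bound) and the timer's key group 32 are assumed values chosen to match the exception above:

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupMismatchSketch {
    public static void main(String[] args) {
        int maxParallelism = 128; // assumed; Flink's default for small jobs

        // Before rescaling (p = 2): subtask 0 owns key groups 0-63.
        KeyGroupRange before =
                KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 2, 0);
        // After rescaling (p = 4): subtask 0 owns only key groups 0-31.
        KeyGroupRange after =
                KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(maxParallelism, 4, 0);

        System.out.println(before); // KeyGroupRange{startKeyGroup=0, endKeyGroup=63}
        System.out.println(after);  // KeyGroupRange{startKeyGroup=0, endKeyGroup=31}

        // A timer snapshotted for key group 32 was valid for the old subtask 0,
        // but it is replayed unpruned from raw keyed state into the new subtask 0,
        // whose range no longer contains it. The containment check that
        // HeapPriorityQueueSet performs then throws IllegalArgumentException.
        int timerKeyGroup = 32; // assumed, matching the exception above
        System.out.println(before.contains(timerKeyGroup)); // true
        System.out.println(after.contains(timerKeyGroup));  // false
    }
}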
The following code can be used to reproduce the issue:
1. First, run the following job (parallelism = 2) and wait for it to complete a checkpoint:
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(AUTO_WATERMARK_INTERVAL, Duration.ofMillis(200));
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxxx");
config.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(10_000));
config.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, RETAIN_ON_CANCELLATION);
config.setString("state.backend.rocksdb.timer-service.factory", "HEAP");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.disableOperatorChaining();
env.setParallelism(2);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<Row> source =
        env.addSource(
                new SourceFunction<Row>() {
                    private volatile boolean running = true;
                    private int ts = 1000;
                    private final Random rnd = new Random();

                    @Override
                    public void run(SourceContext<Row> ctx) throws Exception {
                        while (running) {
                            Thread.sleep(1);
                            ts += rnd.nextInt(1);
                            ctx.collect(
                                    Row.ofKind(
                                            RowKind.INSERT,
                                            Instant.ofEpochMilli(ts),
                                            "USD",
                                            rnd.nextInt(300)));
                            ctx.collect(
                                    Row.ofKind(
                                            RowKind.INSERT,
                                            Instant.ofEpochMilli(ts),
                                            "CHD",
                                            rnd.nextInt(200)));
                            ctx.collect(
                                    Row.ofKind(
                                            RowKind.INSERT,
                                            Instant.ofEpochMilli(ts),
                                            "BST",
                                            rnd.nextInt(200)));
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                },
                TypeExtractor.getForObject(
                        Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(0), "USD", 1)));

// Create a table from the changelog stream
Table rateTable =
        tEnv.fromDataStream(
                        source,
                        Schema.newBuilder()
                                .column("f0", DataTypes.TIMESTAMP_LTZ(3))
                                .column("f1", DataTypes.STRING().notNull())
                                .column("f2", DataTypes.INT().notNull())
                                .watermark("f0", "f0 - INTERVAL '2' SECONDS")
                                .primaryKey("f1")
                                .build())
                .as("ts", "product", "amount");

// Register the table as a view so it is accessible under a name
tEnv.createTemporaryView("source", rateTable);

String query =
        "SELECT\n"
                + "  CAST(window_start AS STRING) as window_start,\n"
                + "  CAST(window_end AS STRING) as window_end,\n"
                + "  sum(amount) as pv,\n"
                + "  count(1) AS uv,\n"
                + "  `product`\n"
                + "FROM\n"
                + "  TABLE(\n"
                + "    CUMULATE(\n"
                + "      TABLE source,\n"
                + "      DESCRIPTOR(ts),\n"
                + "      INTERVAL '10' MINUTES,\n"
                + "      INTERVAL '60' MINUTES\n"
                + "    )\n"
                + "  )\n"
                + "GROUP BY\n"
                + "  window_start,\n"
                + "  window_end,\n"
                + "  product;";
tEnv.executeSql(query);
2. Second, restore from the retained checkpoint with a new parallelism (p = 4).
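For example (a sketch with placeholder paths and a hypothetical jar name repro.jar): change env.setParallelism(2) to env.setParallelism(4) in the job above (the parallelism set in code takes precedence over the -p flag), rebuild, and resubmit from the retained checkpoint with the standard -s option:

bin/flink run -s file:///path/to/checkpoints/<job-id>/chk-<n> repro.jar

The restore then fails with the IllegalArgumentException shown above.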