Yangze Guo created FLINK-38558:
----------------------------------

             Summary: Checkpoint rescale and restore fails with RocksDB Heap 
timer
                 Key: FLINK-38558
                 URL: https://issues.apache.org/jira/browse/FLINK-38558
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.20.3
            Reporter: Yangze Guo


In Flink 1.20, when the RocksDB state backend is used with the timer service 
configured as Heap (state.backend.rocksdb.timer-service.factory: Heap), 
rescaling a job from a checkpoint fails during restore with the following 
exception:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:332)
...
Caused by: java.lang.IllegalArgumentException: KeyGroupRange{startKeyGroup=0, endKeyGroup=31} does not contain key group 32
at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:160)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:160)
at 
org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:149)
...

Restoring from a savepoint, by contrast, works fine with the exact same configuration.

Root Cause Analysis:

1. When state.backend.rocksdb.timer-service.factory is set to Heap, timers are 
snapshotted into the checkpoint's raw keyed state (RawKeyedState). During 
restore, these timers are re-inserted into a HeapPriorityQueueSet, which 
validates each element against the subtask's current key group range. However, 
timers are not pruned during rescaling, so a subtask can receive a timer 
outside its new range (e.g., a timer belonging to key group 32 when the current 
range is 0-31), triggering the IllegalArgumentException. See the sketch after 
this list.
2. Savepoints work because of FLINK-21344, which stopped serializing heap 
timers in the RocksDB state backend during savepoint creation; with no raw 
timer state to restore, the key group validation is never hit.
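Below is a minimal sketch of the failing check, assuming the default max 
parallelism of 128: at p = 2, subtask 0 owns key groups [0, 63]; after 
rescaling to p = 4 it owns only [0, 31], while its restored raw timer state can 
still contain timers for groups 32-63. The class name is hypothetical; 
KeyGroupRange and Preconditions are the real Flink utilities, and the 
checkArgument call mirrors the one in 
HeapPriorityQueueSet.globalKeyGroupToLocalIndex.

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.util.Preconditions;

// Hypothetical stand-alone class; illustrates the check, not Flink source.
public class KeyGroupValidationSketch {
    public static void main(String[] args) {
        // After rescaling p=2 -> p=4 (max parallelism 128), subtask 0
        // shrinks from key groups [0, 63] to [0, 31].
        KeyGroupRange newRange = KeyGroupRange.of(0, 31);

        // A timer snapshotted at p=2 may belong to key group 32..63;
        // it is not pruned before re-insertion on restore.
        int restoredTimerKeyGroup = 32;

        // The same validation HeapPriorityQueueSet performs; this throws
        // "KeyGroupRange{startKeyGroup=0, endKeyGroup=31} does not contain
        // key group 32", as in the stack trace above.
        Preconditions.checkArgument(
                newRange.contains(restoredTimerKeyGroup),
                "%s does not contain key group %s",
                newRange,
                restoredTimerKeyGroup);
    }
}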

The following code reproduces the issue:
1. First, run the following job (parallelism = 2) and wait for it to complete a checkpoint:
// Static imports assumed: PipelineOptions.AUTO_WATERMARK_INTERVAL and
// CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION.
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(AUTO_WATERMARK_INTERVAL, Duration.ofMillis(200));
config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "xxxx");
config.set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL, Duration.ofMillis(10_000));
config.set(ExecutionCheckpointingOptions.EXTERNALIZED_CHECKPOINT, RETAIN_ON_CANCELLATION);
config.setString("state.backend.rocksdb.timer-service.factory", "HEAP");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(config);
env.disableOperatorChaining();
env.setParallelism(2);

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

DataStreamSource<Row> source =
        env.addSource(
                new SourceFunction<Row>() {
                    private volatile boolean running = true;
                    private int ts = 1000;
                    private Random rnd = new Random();

                    @Override
                    public void run(SourceContext<Row> ctx) throws Exception {
                        while (running) {
                            Thread.sleep(1);
                            ts += rnd.nextInt(1);
                            ctx.collect(
                                    Row.ofKind(
                                            RowKind.INSERT,
                                            Instant.ofEpochMilli(ts),
                                            "USD",
                                            rnd.nextInt(300)));
                            ctx.collect(
                                    Row.ofKind(
                                            RowKind.INSERT,
                                            Instant.ofEpochMilli(ts),
                                            "CHD",
                                            rnd.nextInt(200)));
                            ctx.collect(
                                    Row.ofKind(
                                            RowKind.INSERT,
                                            Instant.ofEpochMilli(ts),
                                            "BST",
                                            rnd.nextInt(200)));
                        }
                    }

                    @Override
                    public void cancel() {
                        running = false;
                    }
                },
                TypeExtractor.getForObject(
                        Row.ofKind(RowKind.INSERT, Instant.ofEpochMilli(0), "USD", 1)));

// Create a table from the changelog stream
Table rateTable =
        tEnv.fromDataStream(
                        source,
                        Schema.newBuilder()
                                .column("f0", DataTypes.TIMESTAMP_LTZ(3))
                                .column("f1", DataTypes.STRING().notNull())
                                .column("f2", DataTypes.INT().notNull())
                                .watermark("f0", "f0 - INTERVAL '2' SECONDS")
                                .primaryKey("f1")
                                .build())
                .as("ts", "product", "amount");

// Register the table as a view so it is accessible under a name
tEnv.createTemporaryView("source", rateTable);

String query =
        "SELECT\n"
                + "  CAST(window_start AS STRING) as window_start,\n"
                + "  CAST(window_end AS STRING) as window_end,\n"
                + "  sum(amount) as pv,\n"
                + "  count(1) AS uv,\n"
                + "  `product`\n"
                + "FROM\n"
                + "  TABLE(\n"
                + "    CUMULATE(\n"
                + "      TABLE source,\n"
                + "      DESCRIPTOR(ts),\n"
                + "      INTERVAL '10' MINUTES,\n"
                + "      INTERVAL '60' MINUTES\n"
                + "    )\n"
                + "  )\n"
                + "GROUP BY\n"
                + "  window_start,\n"
                + "  window_end,\n"
                + "  product;";
tEnv.executeSql(query);

2. Second, restore from the retained checkpoint with a new parallelism (p = 4), e.g. as sketched below.
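A sketch of step 2, with the checkpoint path as a placeholder: one way to 
trigger the restore is to set execution.savepoint.path (it accepts externalized 
checkpoint paths as well) and resubmit the same job with the higher 
parallelism; bin/flink run -s <checkpoint path> -p 4 is the CLI equivalent.

// Same pipeline as step 1; only the restore path and parallelism change.
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.setString("state.backend.rocksdb.timer-service.factory", "HEAP");
// Retained checkpoint from step 1 (placeholder path).
config.setString("execution.savepoint.path", "xxxx/chk-<n>");

StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setParallelism(4); // rescale 2 -> 4; restore fails with the exception above
// ... rebuild the source, table, and CUMULATE query exactly as in step 1 ...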



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
