Michał Fijołek created FLINK-32160: --------------------------------------
Summary: CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException
Key: FLINK-32160
URL: https://issues.apache.org/jira/browse/FLINK-32160
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.17.0, 1.16.0
Environment: Flink 1.17 on k8s (flink-kubernetes-operator v1.4.0), s3
Reporter: Michał Fijołek

Hello :) We have a Flink job (v1.17) on k8s (using the official flink-kubernetes-operator) that reads data from Kafka and writes it to s3 via Flink SQL, with auto-compaction enabled. The job sometimes fails and continues from a checkpoint just fine, but every couple of days we hit a crash loop: the job cannot continue from the latest checkpoint and fails with this exception:

{noformat}
java.util.NoSuchElementException
	at java.base/java.util.ArrayList$Itr.next(Unknown Source)
	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source)
{noformat}

Here's the relevant code: [https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]

It looks like `CompactOperator` calls `next()` on the iterator without checking `hasNext()` first - why is that? Is it a bug? Why does `context.getOperatorStateStore().getListState(metaDescriptor)` return an empty iterator? Is the latest checkpoint broken in that case?

We have an identical job, but without compaction, and it has been working smoothly for a couple of weeks now. The whole job is just a `select` from Kafka and an `insert` to s3.

{noformat}
CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (
  `foo_bar1` STRING,
  `foo_bar2` STRING,
  `foo_bar3` STRING,
  `foo_bar4` STRING
)
PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
STORED AS parquet
LOCATION 's3a://my/bucket/'
TBLPROPERTIES (
  'auto-compaction' = 'true',
  'compaction.file-size' = '128MB',
  'sink.parallelism' = '8',
  'format' = 'parquet',
  'parquet.compression' = 'SNAPPY',
  'sink.rolling-policy.rollover-interval' = '1 h',
  'sink.partition-commit.policy.kind' = 'metastore'
)
{noformat}

Checkpoint configuration:

{noformat}
Checkpointing Mode                   Exactly Once
Checkpoint Storage                   FileSystemCheckpointStorage
State Backend                        HashMapStateBackend
Interval                             20m 0s
Timeout                              10m 0s
Minimum Pause Between Checkpoints    0ms
Maximum Concurrent Checkpoints       1
Unaligned Checkpoints                Disabled
Persist Checkpoints Externally       Enabled (retain on cancellation)
Tolerable Failed Checkpoints         0
Checkpoints With Finished Tasks      Enabled
State Changelog                      Disabled
{noformat}

Is there something wrong with the given config, or is this some unhandled edge case? Currently our workaround is to restart the job without using the checkpoint - it then uses the state from Kafka, which in this case is fine.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
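For illustration, the failure mode described above (calling `next()` on a possibly-empty restored list state without a `hasNext()` guard) can be reproduced with a plain iterator. This is a hedged sketch only - `firstOrNull` is a hypothetical helper, not Flink API, and it stands in for the unconditional `next()` at `CompactOperator.java:114`:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class CompactStateRestoreSketch {
    // Hypothetical defensive variant: return null instead of throwing when
    // the restored state is empty, leaving the caller to decide how to proceed.
    static <T> T firstOrNull(Iterable<T> restoredState) {
        Iterator<T> it = restoredState.iterator();
        return it.hasNext() ? it.next() : null;
    }

    public static void main(String[] args) {
        // Unguarded next() on an empty collection throws, matching the stack trace:
        try {
            Collections.<String>emptyList().iterator().next();
            throw new AssertionError("expected NoSuchElementException");
        } catch (NoSuchElementException expected) {
            System.out.println("unguarded next() threw NoSuchElementException");
        }

        // The guarded variant degrades gracefully on an empty restored state:
        System.out.println("empty state -> " + firstOrNull(Collections.<String>emptyList()));
        System.out.println("non-empty state -> " + firstOrNull(Arrays.asList("task-0", "task-1")));
    }
}
```

Whether returning null (or skipping restore) is actually safe here depends on why the list state is empty in the first place, which is the open question of this report.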