Michał Fijołek created FLINK-32160:
--------------------------------------
Summary: CompactOperator cannot continue from checkpoint because
of java.util.NoSuchElementException
Key: FLINK-32160
URL: https://issues.apache.org/jira/browse/FLINK-32160
Project: Flink
Issue Type: Bug
Components: Connectors / FileSystem
Affects Versions: 1.17.0, 1.16.0
Environment: Flink 1.17 on k8s (flink-kubernetes-operator v.1.4.0), s3
Reporter: Michał Fijołek
Hello :) We have a Flink job (v1.17) on k8s (using the official
flink-kubernetes-operator) that reads data from Kafka and writes it to s3 via
Flink SQL with auto-compaction enabled. The job sometimes fails and restores
from a checkpoint just fine, but every couple of days we hit a crash loop: the
job cannot restore from the latest checkpoint and fails with this exception:
{noformat}
java.util.NoSuchElementException
	at java.base/java.util.ArrayList$Itr.next(Unknown Source)
	at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
	at java.base/java.lang.Thread.run(Unknown Source){noformat}
Here’s the relevant code:
[https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]
It looks like `CompactOperator` calls `next()` on the iterator without
checking `hasNext()` first - why is that? Is it a bug? Why does
`context.getOperatorStateStore().getListState(metaDescriptor)` return an empty
iterator here? Does that mean the latest checkpoint is broken in such a case?
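For illustration only, a minimal standalone sketch (not the Flink code itself) of the failure mode: calling `next()` on the iterator of an empty list throws `NoSuchElementException`, while guarding with `hasNext()` would let the operator handle empty restored state explicitly. The class and variable names are hypothetical:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class EmptyStateIteratorDemo {
    public static void main(String[] args) {
        // Stand-in for the restored ListState contents; empty here,
        // as apparently happens during restore in our crash loop.
        List<String> restoredState = new ArrayList<>();
        Iterator<String> it = restoredState.iterator();

        // Unguarded access, analogous to CompactOperator.initializeState:
        try {
            it.next(); // throws NoSuchElementException on an empty iterator
        } catch (NoSuchElementException e) {
            System.out.println("unguarded next() threw NoSuchElementException");
        }

        // A guarded alternative makes the empty-state case explicit:
        if (restoredState.iterator().hasNext()) {
            System.out.println("restored state present");
        } else {
            System.out.println("restored state empty");
        }
    }
}
```

This only demonstrates the `ArrayList$Itr.next` behavior from the stack trace; whether an empty iterator at that point indicates a corrupt checkpoint or a legitimate empty-state restore is exactly the question above.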
We have an identical job without compaction, and it has been running smoothly
for a couple of weeks now.
The whole job is just a `SELECT` from Kafka and an `INSERT` into s3.
{noformat}
CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (
  `foo_bar1` STRING,
  `foo_bar2` STRING,
  `foo_bar3` STRING,
  `foo_bar4` STRING
)
PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
STORED AS parquet
LOCATION 's3a://my/bucket/'
TBLPROPERTIES (
'auto-compaction' = 'true',
'compaction.file-size' = '128MB',
'sink.parallelism' = '8',
'format' = 'parquet',
'parquet.compression' = 'SNAPPY',
'sink.rolling-policy.rollover-interval' = '1 h',
'sink.partition-commit.policy.kind' = 'metastore'
){noformat}
Checkpoint configuration:
{noformat}
Checkpointing Mode Exactly Once
Checkpoint Storage FileSystemCheckpointStorage
State Backend HashMapStateBackend
Interval 20m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 0ms
Maximum Concurrent Checkpoints 1
Unaligned Checkpoints Disabled
Persist Checkpoints Externally Enabled (retain on cancellation)
Tolerable Failed Checkpoints 0
Checkpoints With Finished Tasks Enabled
State Changelog Disabled{noformat}
Is there something wrong with the given configuration, or is this an unhandled
edge case?
Our current workaround is to restart the job without the checkpoint; it then
falls back to the offsets stored in Kafka, which is acceptable in this case.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)