Michał Fijołek created FLINK-32160:
--------------------------------------

             Summary: CompactOperator cannot continue from checkpoint because of java.util.NoSuchElementException
                 Key: FLINK-32160
                 URL: https://issues.apache.org/jira/browse/FLINK-32160
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.17.0, 1.16.0
         Environment: Flink 1.17 on k8s (flink-kubernetes-operator v.1.4.0), s3
            Reporter: Michał Fijołek


Hello :) We have a Flink job (v1.17) on k8s (using the official 
flink-kubernetes-operator) that reads data from Kafka and writes it to s3 via 
Flink SQL with auto-compaction enabled. The job sometimes fails and continues 
from a checkpoint just fine, but once every couple of days we hit a crash 
loop: the job cannot continue from the latest checkpoint and fails with the 
following exception:
{noformat}
java.util.NoSuchElementException
 at java.base/java.util.ArrayList$Itr.next(Unknown Source)
 at org.apache.flink.connector.file.table.stream.compact.CompactOperator.initializeState(CompactOperator.java:114)
 at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:122)
 at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:274)
 at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:734)
 at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:709)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:675)
 at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
 at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:921)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
 at java.base/java.lang.Thread.run(Unknown Source){noformat}
Here’s the relevant code: 
[https://github.com/apache/flink/blob/release-1.17/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/table/stream/compact/CompactOperator.java#L114]

It looks like `CompactOperator` calls `next()` on the iterator without 
checking `hasNext()` first - why is that? Is it a bug? Why does 
`context.getOperatorStateStore().getListState(metaDescriptor)` return an 
empty iterator? Is the latest checkpoint broken in that case? 
We have an identical job without compaction, and it has been working 
smoothly for a couple of weeks now. 
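To illustrate the pattern (this is a minimal standalone sketch, not Flink's actual code - the helper names `readUnguarded`/`readGuarded` are hypothetical, and a plain `java.util.List` stands in for the `Iterable` returned by the restored `ListState.get()`), an unguarded `next()` on an empty restored state throws exactly this exception, whereas a `hasNext()` check would allow a fallback:

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class EmptyStateDemo {

    // Mirrors the pattern at CompactOperator.java:114 - calling next()
    // without hasNext(); throws NoSuchElementException if the restored
    // state is empty.
    static String readUnguarded(List<String> restoredState) {
        return restoredState.iterator().next();
    }

    // Defensive variant: check hasNext() and fall back to a default
    // when the checkpoint restored no elements for this operator.
    static String readGuarded(List<String> restoredState, String fallback) {
        Iterator<String> it = restoredState.iterator();
        return it.hasNext() ? it.next() : fallback;
    }

    public static void main(String[] args) {
        List<String> empty = Collections.emptyList();
        try {
            readUnguarded(empty);
        } catch (NoSuchElementException e) {
            System.out.println("unguarded read failed: NoSuchElementException");
        }
        System.out.println("guarded read returned: " + readGuarded(empty, "<none>"));
    }
}
```

Whether an empty restored list state is ever legitimate here (and should be guarded), or always indicates a corrupt/inconsistent checkpoint, is exactly the question for the Flink developers.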

The whole job is just `select` from kafka and `insert` to s3.
{noformat}
CREATE EXTERNAL TABLE IF NOT EXISTS hive.`foo`.`bar` (
  `foo_bar1` STRING,
  `foo_bar2` STRING,
  `foo_bar3` STRING,
  `foo_bar4` STRING
  )
  PARTITIONED BY (`foo_bar1` STRING, `foo_bar2` STRING, `foo_bar3` STRING)
  STORED AS parquet
  LOCATION 's3a://my/bucket/'
  TBLPROPERTIES (
    'auto-compaction' = 'true',
    'compaction.file-size' = '128MB',
    'sink.parallelism' = '8',
    'format' = 'parquet',
    'parquet.compression' = 'SNAPPY',
    'sink.rolling-policy.rollover-interval' = '1 h',
    'sink.partition-commit.policy.kind' = 'metastore'
  ){noformat}
Checkpoint configuration:
{noformat}
Checkpointing Mode Exactly Once
Checkpoint Storage FileSystemCheckpointStorage
State Backend HashMapStateBackend
Interval 20m 0s
Timeout 10m 0s
Minimum Pause Between Checkpoints 0ms
Maximum Concurrent Checkpoints 1
Unaligned Checkpoints Disabled
Persist Checkpoints Externally Enabled (retain on cancellation)
Tolerable Failed Checkpoints 0
Checkpoints With Finished Tasks Enabled
State Changelog Disabled{noformat}

Is there something wrong with the given config, or is this some unhandled edge case? 

Our current workaround is to restart the job without the checkpoint - it then 
picks up the committed offsets from Kafka, which is acceptable in this case.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)