Youjun Yuan created FLINK-24187:
-----------------------------------

             Summary: Could not commit s3 file after JM restart during state initialization
                 Key: FLINK-24187
                 URL: https://issues.apache.org/jira/browse/FLINK-24187
             Project: Flink
          Issue Type: Bug
          Components: FileSystems
    Affects Versions: 1.12.1
            Reporter: Youjun Yuan
We have a SQL job that consumes from Kafka and writes to a Hive table whose data is stored in S3. One day the ZooKeeper leader failed over, which caused the Flink job to restart. However, the job got stuck during state restore with the following error:

{code:java}
java.io.IOException: Could not commit file from s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/.part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371.inprogress.400506e4-23ea-428c-b8eb-9ff196eeca64 to s3://mybuck/hourly_account_activities/dt=2021-09-02/hh=21/activity_category=verification/part-33ef16e7-55b7-4abb-9d97-0cdc7529509c-0-22371
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:104) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.commitAfterRecovery(HadoopRenameFileCommitter.java:83) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.formats.hadoop.bulk.HadoopPathBasedPartFileWriter$HadoopPathBasedPendingFile.commitAfterRecovery(HadoopPathBasedPartFileWriter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.table.filesystem.stream.AbstractStreamingWriter.initializeState(AbstractStreamingWriter.java:120) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.table.filesystem.stream.StreamingFileWriter.initializeState(StreamingFileWriter.java:55) ~[flink-table-blink_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:111) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:425) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:535) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:93) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:525) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:565) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.11-1.12.1.jar:1.12.1]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_242]
Caused by: java.io.IOException: java.util.concurrent.CancellationException
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:171) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    ... 22 more
Caused by: java.util.concurrent.CancellationException
    at java.util.concurrent.FutureTask.report(FutureTask.java:121) ~[?:1.8.0_242]
    at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_242]
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:262) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager$3.call(MultipartCopyManager.java:249) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3.MultipartCopyManager.copy(MultipartCopyManager.java:169) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.copy(Jets3tNativeFileSystemStore.java:466) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.rename(S3NativeFileSystem.java:1122) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.rename(EmrFileSystem.java:326) ~[emrfs-hadoop-assembly-2.39.0.jar:?]
    at org.apache.flink.formats.hadoop.bulk.committer.HadoopRenameFileCommitter.rename(HadoopRenameFileCommitter.java:101) ~[flink-sql-connector-hive-2.3.6_2.11-1.12.0.jar:1.12.0]
    ... 22 more
{code}
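For context, the job is essentially a streaming Kafka-to-Hive insert. Below is a minimal sketch of that shape, assuming an already existing Hive table partitioned by dt / hh / activity_category with its location under s3://mybuck/hourly_account_activities; all table names, schemas, and connector options in the sketch are illustrative placeholders, not copied from our production job:

{code:java}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaToHiveJobSketch {
    public static void main(String[] args) {
        // Checkpointing drives the streaming file writer's pending-file commit cycle.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000L);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Illustrative Kafka source; topic, schema, and servers are assumptions.
        tEnv.executeSql(
                "CREATE TABLE account_activities (" +
                "  account_id STRING," +
                "  activity_category STRING," +
                "  event_time TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'account-activities'," +
                "  'properties.bootstrap.servers' = 'kafka:9092'," +
                "  'scan.startup.mode' = 'group-offsets'," +
                "  'format' = 'json'" +
                ")");

        // Hive catalog registration; 'hive-conf-dir' is an assumed path.
        tEnv.executeSql(
                "CREATE CATALOG myhive WITH (" +
                "  'type' = 'hive'," +
                "  'hive-conf-dir' = '/etc/hive/conf'" +
                ")");

        // Streaming insert into an existing Hive table whose location is on S3,
        // partitioned by dt / hh / activity_category (matching the failing paths above).
        tEnv.executeSql(
                "INSERT INTO myhive.mydb.hourly_account_activities " +
                "SELECT account_id, " +
                "       DATE_FORMAT(event_time, 'yyyy-MM-dd') AS dt, " +
                "       DATE_FORMAT(event_time, 'HH') AS hh, " +
                "       activity_category " +
                "FROM account_activities");
    }
}
{code}

With a job of this shape, the rename reported above is issued from Bucket#commitRecoveredPendingFiles while the writer's bucket state is being restored, i.e. HadoopRenameFileCommitter#commitAfterRecovery runs during initializeState, before the restarted job processes any new data.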