This issue has been going on for a while, but I couldn't figure it out no matter what I tried.

Some general info:

Flink 1.14.5 with checkpoint/HA storage in S3.

We have 3 jobs running identical code; the only difference is which Kafka topic is read and which prefix is used in the S3 sink. Each job has its own Kafka topic and its own prefix, so there are no shared paths.

The jobs work fine and data is written until some point in time (every couple of days on average) when a checkpoint fails, one of the MPUs (multipart uploads) in S3 is missing, and no completed file exists in its place. Once that happens, the only way to move forward is to cancel the job and resubmit it without a savepoint, so the job just starts writing from part 0.

For example, from our last job run: checkpoint 4733 expires and fails because subtask 3 didn't report any progress for 1 minute (which is both the checkpoint interval and the expiry time). I confirmed that the file named in the exception log is indeed missing and that no temp file for that part exists in the bucket. After job recovery, checkpoint 4734 completes successfully; that's when I assume the temporary files are being committed, and the job fails.

This is the failure log:

exception_message:Recovering commit failed for object topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz. Object does not exist and MultiPart Upload KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc- is not valid.

stacktrace:java.io.IOException: Recovering commit failed for object topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz. Object does not exist and MultiPart Upload KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc- is not valid.
    at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:123)
    at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:218)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466)
    at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:472)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
    at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:119)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: Completing multipart commit on topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz: com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: BNTAA71KECK5WBWN; S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=; Proxy: null), S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=:InvalidPart: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: BNTAA71KECK5WBWN; S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=; Proxy: null)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:226)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:271)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.commitMultiPartUpload(HadoopS3AccessHelper.java:95)
    at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:93)
    ... 25 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: BNTAA71KECK5WBWN; S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=; Proxy: null), S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
    at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3511)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:233)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 32 more

More logs from around the failure time:

---
Failed to commit after recovery topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz with MPU ID KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc-. Checking if file was committed before...
---
Object topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz not existing after failed recovery commit with MPU ID KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc-
---
No master state to restore
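
To cross-check a failure like this against the bucket, it may help to pull the object key and the multipart upload ID out of the exception message programmatically. The following is only a rough sketch: the function name `parse_commit_failure` is hypothetical, and the pattern assumes the message wording is exactly as in the logs above, which may differ in other Flink versions.

```python
import re

# Pattern derived from the "Recovering commit failed" message shown in the
# logs above. Assumption: the wording is stable for this Flink version.
COMMIT_FAILURE = re.compile(
    r"Recovering commit failed for object (?P<key>\S+)\. "
    r"Object does not exist and MultiPart Upload (?P<upload_id>\S+) is not valid\."
)


def parse_commit_failure(message):
    """Return (object_key, upload_id), or None if the message does not match."""
    match = COMMIT_FAILURE.search(message)
    if match is None:
        return None
    return match.group("key"), match.group("upload_id")
```

The resulting pair could then be compared with what S3 still knows about, for example via `aws s3api list-multipart-uploads` and `aws s3api list-parts` for the bucket in question (the bucket name is not shown in the logs, so it would have to be supplied).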