This issue has been going on for a while, but I couldn't figure it out no
matter what I tried.
Some general info:
Flink 1.14.5 with checkpoint/HA storage in S3.
We have 3 jobs running identical code. The only differences are which Kafka
topic is read and which prefix is used in the S3 sink.
This means that each job has its own Kafka topic and its own prefix, so there
are no shared paths.
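The "no shared paths" claim can be checked mechanically with a small sketch like the one below, which flags any job prefix that is a plain string prefix of another job's prefix. S3 keys are flat strings, so a prefix-based listing or filter on `topics/logging_adevent_log` would also match keys under a hypothetical `topics/logging_adevent_log_v2`. I'm assuming the sink prefix is the part of the key before the date path (`topics/logging_adevent_log`); any other topic names are made up, and `PrefixOverlapCheck` is a hypothetical helper, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): detects S3 sink prefixes that overlap as raw strings. */
final class PrefixOverlapCheck {

    private PrefixOverlapCheck() {}

    /**
     * Returns each ordered pair {prefix, overlappingPrefix} of distinct list entries
     * where the first is a string prefix of the second.
     */
    static List<String[]> findOverlaps(List<String> prefixes) {
        List<String[]> overlaps = new ArrayList<>();
        for (int i = 0; i < prefixes.size(); i++) {
            for (int j = 0; j < prefixes.size(); j++) {
                // Any key under prefixes[j] would also match a filter on prefixes[i].
                if (i != j && prefixes.get(j).startsWith(prefixes.get(i))) {
                    overlaps.add(new String[] {prefixes.get(i), prefixes.get(j)});
                }
            }
        }
        return overlaps;
    }
}
```

Ending each prefix with a `/` is usually enough to rule this out, since `topics/a/` is never a string prefix of `topics/a_v2/`.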

The jobs work fine and data is written until, at some point (every couple of
days on average), a checkpoint fails, one of the multipart uploads (MPUs) in
S3 goes missing, and no completed file exists for it either.
Once that happens, the only way forward is to cancel the job and resubmit it
without a savepoint, so the job simply starts writing from part 0 again.

For example, from our last job run:
Checkpoint 4733 expired and failed because subtask 3 didn't report any
progress for 1 minute (1 minute is both our checkpoint interval and our
checkpoint timeout). I confirmed that the file named in the exception log is
indeed missing and that no temporary file for that part exists in the bucket.
After the job recovered, checkpoint 4734 completed successfully; I assume that
is when the temporary files are committed, and that is when the job fails.
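The object key itself ties the missing file to a subtask. As far as I can tell, in Flink 1.14 the `StreamingFileSink` names part files `<partPrefix>-<subtaskIndex>-<partCounter><partSuffix>`, where the prefix (default `part`) and suffix are set through `OutputFileConfig`; here the suffix appears to be `.json.gz`. The sketch below is a hypothetical helper (`PartFileName` is not a Flink class) that splits such a key into bucket path, subtask index, and part counter, assuming that naming scheme.

```java
/**
 * Hypothetical helper (not a Flink class). Splits a key such as
 * "topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz", assuming the
 * "<partPrefix>-<subtaskIndex>-<partCounter><partSuffix>" naming scheme.
 */
final class PartFileName {
    final String bucketPath;
    final int subtaskIndex;
    final long partCounter;

    private PartFileName(String bucketPath, int subtaskIndex, long partCounter) {
        this.bucketPath = bucketPath;
        this.subtaskIndex = subtaskIndex;
        this.partCounter = partCounter;
    }

    static PartFileName parse(String objectKey, String partPrefix, String partSuffix) {
        int slash = objectKey.lastIndexOf('/');
        // Everything before the last '/' is the bucket path (here, the hourly date directory).
        String bucketPath = slash >= 0 ? objectKey.substring(0, slash) : "";
        String fileName = objectKey.substring(slash + 1);

        int begin = partPrefix.length() + 1; // skip "<partPrefix>-"
        int end = fileName.length() - partSuffix.length(); // drop "<partSuffix>"
        if (!fileName.startsWith(partPrefix + "-") || !fileName.endsWith(partSuffix) || end <= begin) {
            throw new IllegalArgumentException("Unexpected part file name: " + fileName);
        }

        // What remains is "<subtaskIndex>-<partCounter>".
        String core = fileName.substring(begin, end);
        int dash = core.indexOf('-');
        if (dash < 0) {
            throw new IllegalArgumentException("Missing part counter in: " + fileName);
        }
        return new PartFileName(
                bucketPath,
                Integer.parseInt(core.substring(0, dash)),
                Long.parseLong(core.substring(dash + 1)));
    }
}
```

For the key in the exception, `PartFileName.parse(key, "part", ".json.gz")` yields subtask index 3 and part counter 2458 under `topics/logging_adevent_log/2022/09/09/18`, which is consistent with subtask 3 being the one that stalled checkpoint 4733, assuming both numbers refer to the same 0-based subtask index. After a resubmit without a savepoint the counter restarts, so new files begin again at `part-<subtaskIndex>-0`.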
This is the failure log:
exception_message:Recovering commit failed for object topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz. Object does not exist and MultiPart Upload KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc- is not valid.
stacktrace:java.io.IOException: Recovering commit failed for object topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz. Object does not exist and MultiPart Upload KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc- is not valid.
	at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:123)
	at org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter$OutputStreamBasedPendingFile.commitAfterRecovery(OutputStreamBasedPartFileWriter.java:218)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.commitRecoveredPendingFiles(Bucket.java:160)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.<init>(Bucket.java:127)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.restore(Bucket.java:466)
	at org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl.restoreBucket(DefaultBucketFactoryImpl.java:67)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.handleRestoredBucketState(Buckets.java:192)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeActiveBuckets(Buckets.java:179)
	at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.initializeState(Buckets.java:163)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(StreamingFileSinkHelper.java:75)
	at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.initializeState(StreamingFileSink.java:472)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:94)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:119)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:286)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:109)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:711)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:687)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:654)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: Completing multipart commit on topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz: com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: BNTAA71KECK5WBWN; S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=; Proxy: null), S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=:InvalidPart: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: BNTAA71KECK5WBWN; S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=; Proxy: null)
	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
	at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
	at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
	at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
	at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
	at org.apache.hadoop.fs.s3a.WriteOperationHelper.finalizeMultipartUpload(WriteOperationHelper.java:226)
	at org.apache.hadoop.fs.s3a.WriteOperationHelper.completeMPUwithRetries(WriteOperationHelper.java:271)
	at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.commitMultiPartUpload(HadoopS3AccessHelper.java:95)
	at org.apache.flink.fs.s3.common.writer.S3Committer.commitAfterRecovery(S3Committer.java:93)
	... 25 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: One or more of the specified parts could not be found. The part may not have been uploaded, or the specified entity tag may not match the part's entity tag. (Service: Amazon S3; Status Code: 400; Error Code: InvalidPart; Request ID: BNTAA71KECK5WBWN; S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=; Proxy: null), S3 Extended Request ID: XU3BLgTWm1o1WWpoRlNAZYzCXXt406dtOodxhQyedau06Q70uar/wG++hgJP3/o5RvAgGeQUtpM=
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
	at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
	at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
	at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
	at com.amazonaws.services.s3.AmazonS3Client.completeMultipartUpload(AmazonS3Client.java:3511)
	at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$finalizeMultipartUpload$1(WriteOperationHelper.java:233)
	at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
	... 32 more
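The innermost cause in the trace is S3 rejecting `CompleteMultipartUpload` with `InvalidPart`. To complete an MPU, the client sends the list of part numbers and ETags it believes were uploaded (on recovery, presumably the list Flink restored from checkpoint state), and S3 refuses if any listed part is unknown or its ETag differs. The sketch below is a simplified, hypothetical model of that check, only meant to make the error message concrete; it is not how S3 is implemented, and `uploadedParts` stands in for whatever S3 still holds for the upload.

```java
import java.util.List;
import java.util.Map;

/**
 * Simplified model (not S3's implementation) of the part check behind the
 * InvalidPart error returned by CompleteMultipartUpload.
 */
final class CompleteMpuModel {

    private CompleteMpuModel() {}

    /** One entry of the part list sent with the complete request. */
    static final class RequestedPart {
        final int partNumber;
        final String eTag;

        RequestedPart(int partNumber, String eTag) {
            this.partNumber = partNumber;
            this.eTag = eTag;
        }
    }

    /**
     * Returns true if every requested part was uploaded with the same ETag;
     * false corresponds to S3 answering "InvalidPart".
     *
     * @param uploadedParts part number -> ETag of the parts S3 still holds for the upload
     */
    static boolean allPartsMatch(Map<Integer, String> uploadedParts, List<RequestedPart> requested) {
        for (RequestedPart part : requested) {
            String stored = uploadedParts.get(part.partNumber);
            // Either the part is unknown or its ETag differs: both surface as InvalidPart.
            if (stored == null || !stored.equals(part.eTag)) {
                return false;
            }
        }
        return true;
    }
}
```

Because a missing part and a mismatched ETag produce the same error code, the message alone does not say which of the two happened.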

More logs from around the time of the failure:
---
Failed to commit after recovery topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz with MPU ID KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc-. Checking if file was committed before...
---
Object topics/logging_adevent_log/2022/09/09/18/part-3-2458.json.gz not existing after failed recovery commit with MPU ID KlSLd1QvaqrWZ08l.8QwJDBdcUMEgCIeLmrAQ4o4zUdjOqqbxx0_7neUyXBwMDN3D8ezt0R9_PfvxVO2zxLow_qC6IuiJv3iMV1tsMFIXQbtkLkbY8Uf2aQYIhBOQd975CglzPuTPbJtHdGLmODvcl0A5GH87xFBLmPeSgjvRPc-
---
No master state to restore
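Read in order, these log lines suggest the recovery path in `S3Committer.commitAfterRecovery`: retry completing the MPU recorded in the checkpoint; if that fails, check whether the object already exists (i.e. the commit had gone through before the failure); and only if the object is absent, give up with the "Recovering commit failed ... Object does not exist and MultiPart Upload ... is not valid" exception. The sketch below is a simplified, hypothetical model of that decision inferred from the log messages, not Flink's source (the real committer may do further checks); the `ObjectStore` interface is invented for illustration.

```java
import java.io.IOException;

/** Hypothetical, simplified model of the recovery-commit decision suggested by the log messages. */
final class RecoveryCommitModel {

    private RecoveryCommitModel() {}

    /** Invented stand-in for the S3 calls involved; not a Flink or AWS SDK interface. */
    interface ObjectStore {
        /** Completes the multipart upload; throws if S3 rejects it (e.g. InvalidPart). */
        void completeMultipartUpload(String objectKey, String uploadId) throws IOException;

        /** Returns true if a committed object exists under the key. */
        boolean objectExists(String objectKey) throws IOException;
    }

    static void commitAfterRecovery(ObjectStore store, String objectKey, String uploadId)
            throws IOException {
        try {
            store.completeMultipartUpload(objectKey, uploadId);
        } catch (IOException commitFailure) {
            // "Failed to commit after recovery ... Checking if file was committed before..."
            if (store.objectExists(objectKey)) {
                return; // Treat the file as already committed by an earlier attempt.
            }
            // "Object ... not existing after failed recovery commit with MPU ID ..."
            throw new IOException(
                    "Recovering commit failed for object " + objectKey
                            + ". Object does not exist and MultiPart Upload " + uploadId
                            + " is not valid.",
                    commitFailure);
        }
    }
}
```

Under this model the only state in which recovery throws is the one in the logs: the MPU can no longer be completed and no object exists under the key. Restoring from the same checkpoint hits the same state again, which fits the observation that the job only moves forward after a resubmit without a savepoint.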
