Hello team, I’m building a system that sinks data to an immutable S3 bucket (Object Lock enabled) whose server-side encryption is set to SSE-KMS. The DataStream sink works perfectly fine against a regular bucket, but against the immutable bucket the multipart uploads fail with exceptions saying the Content-MD5 header (MD5 hashing) must be supplied for the PUT to go through.
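For context, the sink is wired up roughly like the sketch below. This is simplified: the bucket name, the socket source, the encoder and the rolling-policy values are placeholders, not our real job.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class S3SinkJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing drives the sink's multipart upload / commit cycle on S3.
        env.enableCheckpointing(60_000L);

        // Placeholder source; the real job consumes our event stream.
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path("s3a://my-immutable-bucket/raw_events"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withRolloverInterval(15 * 60 * 1000L)   // roll every 15 minutes
                        .withInactivityInterval(5 * 60 * 1000L)  // or after 5 minutes of inactivity
                        .withMaxPartSize(128L * 1024 * 1024)     // or once a part reaches 128 MB
                        .build())
                .build();

        events.addSink(sink);
        env.execute("s3-immutable-bucket-sink");
    }
}

Nothing in the job is bucket-specific beyond the s3a:// path; the Object Lock and SSE-KMS settings live on the bucket itself.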
Here’s the stack trace:

org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught exception while processing timer.
    at org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:1090)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:1058)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1520)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$null$10(StreamTask.java:1509)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:261)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.flink.streaming.runtime.tasks.TimerException: java.io.IOException: Uploading parts failed
    ... 11 common frames omitted
Caused by: java.io.IOException: Uploading parts failed
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartUploadToComplete(RecoverableMultiPartUploadImpl.java:231)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.awaitPendingPartsUpload(RecoverableMultiPartUploadImpl.java:215)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetRecoverable(RecoverableMultiPartUploadImpl.java:151)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:123)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.snapshotAndGetCommitter(RecoverableMultiPartUploadImpl.java:56)
    at org.apache.flink.fs.s3.common.writer.S3RecoverableFsDataOutputStream.closeForCommit(S3RecoverableFsDataOutputStream.java:167)
    at org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter.closeForCommit(PartFileWriter.java:71)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.closePartFile(Bucket.java:239)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.onProcessingTime(Bucket.java:338)
    at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(Buckets.java:304)
    at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.onProcessingTime(StreamingFileSink.java:439)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invokeProcessingTimeCallback(StreamTask.java:1518)
    ... 10 common frames omitted
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: upload part on raw_events/xxx/xxx/2020/07/15/20/archived-2-0.txt: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxx; S3 Extended Request ID: xxxx), S3 Extended Request ID: xxxxxx :InvalidRequest: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxx; S3 Extended Request ID: xxxx)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:212)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.uploadPart(WriteOperationHelper.java:471)
    at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.uploadPart(HadoopS3AccessHelper.java:73)
    at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl$UploadTask.run(RecoverableMultiPartUploadImpl.java:318)
    at org.apache.flink.fs.s3.common.utils.BackPressuringExecutor$SemaphoreReleasingRunnable.run(BackPressuringExecutor.java:92)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 common frames omitted
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Content-MD5 HTTP header is required for Put Part requests with Object Lock parameters (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: xxxxx; S3 Extended Request ID: xxxx)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
    at com.amazonaws.services.s3.AmazonS3Client.doUploadPart(AmazonS3Client.java:3306)
    at com.amazonaws.services.s3.AmazonS3Client.uploadPart(AmazonS3Client.java:3291)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.uploadPart(S3AFileSystem.java:1576)
    at org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$uploadPart$8(WriteOperationHelper.java:474)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 12 common frames omitted

My questions are:
1. Is the MD5 hash mandatory here? The first part of each file always uploads to S3 fine, but every part after that fails. According to the AWS S3 documentation for immutable buckets (with Object Lock) it is mandatory - https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html: "The Content-MD5 header is required for any request to upload an object with a retention period configured using Amazon S3 Object Lock. For more information about Amazon S3 Object Lock, see Amazon S3 Object Lock Overview <https://docs.aws.amazon.com/AmazonS3/latest/dev/object-lock-overview.html> in the Amazon Simple Storage Service Developer Guide."

2. If it is mandatory, how do I set this HTTP header from the sink? I went through most of the documentation and tried reading the source code too, but couldn't find any provision for setting headers on the requests the sink issues (see the sketch below of where I believe that header would have to be set).
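To make question 2 concrete, here is a rough sketch of where the header lives at the AWS SDK level, i.e. what I think HadoopS3AccessHelper / WriteOperationHelper would ultimately have to do for each part. The bucket name, key and upload id are dummy values, and I'm not claiming the Flink sink or the s3a filesystem exposes this today; it's only to illustrate which request field needs to be populated.

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.util.Md5Utils;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class UploadPartWithMd5 {
    public static void main(String[] args) {
        byte[] partBytes = "example part content".getBytes(StandardCharsets.UTF_8);

        // Dummy bucket/key/upload-id values, purely for illustration.
        UploadPartRequest request = new UploadPartRequest()
                .withBucketName("my-immutable-bucket")
                .withKey("raw_events/archived-2-0.txt")
                .withUploadId("example-multipart-upload-id")
                .withPartNumber(2)
                .withInputStream(new ByteArrayInputStream(partBytes))
                .withPartSize(partBytes.length)
                // The Content-MD5 value the Object Lock bucket insists on:
                // the base64-encoded MD5 of exactly this part's bytes.
                .withMD5Digest(Md5Utils.md5AsBase64(partBytes));

        AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
        UploadPartResult result = s3.uploadPart(request);
        System.out.println("Uploaded part with ETag " + result.getETag());
    }
}

If there is a configuration knob (flink-conf.yaml, fs.s3a.* option, or something else) that makes the stack above compute and attach this digest per part, that is exactly what I'm looking for.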