Ah, I figured it out after all. It turns out it was due to KMS encryption on the bucket: I needed to add KMS permissions to the IAM role, otherwise the writes fail with the access denied error below. Thanks for your help!
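For anyone who hits this later, here's roughly what the working policy looks like in Terraform. This is a sketch only: the bucket name and key ARN are placeholders, and the S3 action list is the broad set s3a tends to need for multipart uploads rather than a guaranteed minimum, so adjust it to your setup.

statement {
  # S3 actions for the bucket StreamingFileSink writes to; s3a needs the
  # multipart-upload actions in addition to the usual read/write ones.
  # "bucket-name" is a placeholder.
  actions = [
    "s3:GetObject",
    "s3:PutObject",
    "s3:DeleteObject",
    "s3:GetBucketLocation",
    "s3:ListBucket",
    "s3:ListBucketMultipartUploads",
    "s3:ListMultipartUploadParts",
    "s3:AbortMultipartUpload"
  ]
  resources = [
    "arn:aws:s3:::bucket-name",
    "arn:aws:s3:::bucket-name/*"
  ]
}

statement {
  # The missing piece in my case: the role also needs to use the KMS key
  # that encrypts the bucket (SSE-KMS). The key ARN is a placeholder.
  actions = [
    "kms:Decrypt",
    "kms:GenerateDataKey"
  ]
  resources = ["arn:aws:kms:REGION:ACCOUNT_ID:key/KEY_ID"]
}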
On Fri, Dec 6, 2019 at 2:34 AM Khachatryan Roman <khachatryan.ro...@gmail.com> wrote:

> Hey Li,
>
> > my permissions are as listed above
> As I understand it, it's a Terraform script above. But what are the actual
> permissions in AWS?
> It also makes sense to make sure that they are attached to the right role,
> and the role to the user.
>
> > Maybe I need to add the directory level as a resource?
> You don't have to.
>
> If it's possible in your setup, you can debug by granting all s3
> permissions to all objects, like this:
> actions = ["s3:*"]
> resources = ["*"]
>
> Regards,
> Roman
>
> On Fri, Dec 6, 2019 at 12:15 AM Li Peng <li.p...@doordash.com> wrote:
>
>> Hey Roman, my permissions are as listed above, and here's the
>> error message I get:
>>
>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>   at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>   at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>   at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>   at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>   at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>   at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>   at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>   at org.apache.flink.streaming.api.scala.DataStream$$anon$6.$anonfun$flatMap$1(DataStream.scala:675)
>>   at org.apache.flink.streaming.api.scala.DataStream$$anon$6.$anonfun$flatMap$1$adapted(DataStream.scala:675)
>>   at scala.collection.immutable.List.foreach(List.scala:388)
>>   at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:675)
>>   at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>   ... 13 more
>> Caused by: java.nio.file.AccessDeniedException: 2019-12-05--02/part-1-0: initiate MultiPartUpload on 2019-12-05--02/part-1-0: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: XXXX; S3 Extended Request ID: XXXX), S3 Extended Request ID: XXXX=:AccessDenied
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
>>   at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:66)
>>   at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:245)
>>   at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
>>   at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:76)
>>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
>>   at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
>>   at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
>>   at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
>>   at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>   at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>   ... 26 more
>> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: XXXX; S3 Extended Request ID: XXXX, S3 Extended Request ID: XXXX
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>>   at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3152)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199)
>>   at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>>   ... 42 more
>>
>> Maybe I need to add the directory level as a resource?
>>
>> resources = [
>>   "arn:aws:s3:::bucket-name",
>>   "arn:aws:s3:::bucket-name/",
>>   "arn:aws:s3:::bucket-name/*"
>> ]
>>
>> Thanks,
>> Li
>>
>> On Thu, Dec 5, 2019 at 6:11 AM r_khachatryan <khachatryan.ro...@gmail.com> wrote:
>>
>>> Hi Li,
>>>
>>> Could you please list the permissions you see and the error message you
>>> receive from AWS?
>>>
>>> Li Peng-2 wrote
>>> > Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles
>>> > for auth. Does anyone know what permissions the role should have for the
>>> > specified s3 bucket to work properly? I've been getting some auth errors,
>>> > and I suspect I'm missing some permissions:
>>>
>>> Regards,
>>> Roman
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/