Ah, I figured it out after all. It turned out to be KMS encryption on the
bucket: the IAM role also needed KMS permissions, otherwise S3 returns an
Access Denied (403) error. Thanks for your help!
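
For anyone who hits the same 403, here is a rough sketch of what such a KMS
statement could look like in Terraform, assuming the bucket uses SSE-KMS with
a customer-managed key. The data source name and the key ARN are placeholders,
not taken from the setup discussed in this thread. The two actions are the
ones AWS documents as required for multipart uploads to a KMS-encrypted
bucket; the exact set needed may differ in other setups.

```hcl
# Hypothetical example: "flink_s3_kms" and the key ARN are placeholders.
data "aws_iam_policy_document" "flink_s3_kms" {
  statement {
    effect = "Allow"

    # Needed so S3 can encrypt new parts and read back already uploaded
    # parts when completing a multipart upload on an SSE-KMS bucket.
    actions = [
      "kms:GenerateDataKey",
      "kms:Decrypt",
    ]

    # Assumption: scope this to the key that encrypts the bucket.
    resources = ["arn:aws:kms:REGION:ACCOUNT_ID:key/KEY_ID"]
  }
}
```

The statement would be attached to the same IAM role that already carries the
S3 permissions.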

On Fri, Dec 6, 2019 at 2:34 AM Khachatryan Roman <
khachatryan.ro...@gmail.com> wrote:

> Hey Li,
>
> > my permissions are as listed above
> As I understand it, that's a Terraform script. But what are the actual
> permissions in AWS?
> It also makes sense to check that they are attached to the right role, and
> that the role is attached to the right user.
>
> > Maybe I need to add the directory level as a resource?
> You don't have to.
>
> If it's possible in your setup, you can debug by temporarily granting all S3
> permissions on all resources, like this:
>     actions   = ["s3:*"]
>     resources = ["*"]
>
> Regards,
> Roman
>
>
> On Fri, Dec 6, 2019 at 12:15 AM Li Peng <li.p...@doordash.com> wrote:
>
>> Hey Roman, my permissions are as listed above, and here's the
>> error message I get:
>>
>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>> at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>> at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>> at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.$anonfun$flatMap$1(DataStream.scala:675)
>> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.$anonfun$flatMap$1$adapted(DataStream.scala:675)
>> at scala.collection.immutable.List.foreach(List.scala:388)
>> at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:675)
>> at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>> ... 13 more
>> Caused by: java.nio.file.AccessDeniedException: 2019-12-05--02/part-1-0: initiate MultiPartUpload on 2019-12-05--02/part-1-0: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: XXXX; S3 Extended Request ID: XXXX), S3 Extended Request ID: XXXX=:AccessDenied
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:260)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:256)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:231)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.retry(WriteOperationHelper.java:123)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.initiateMultiPartUpload(WriteOperationHelper.java:198)
>> at org.apache.flink.fs.s3hadoop.HadoopS3AccessHelper.startMultiPartUpload(HadoopS3AccessHelper.java:66)
>> at org.apache.flink.fs.s3.common.writer.RecoverableMultiPartUploadImpl.newUpload(RecoverableMultiPartUploadImpl.java:245)
>> at org.apache.flink.fs.s3.common.writer.S3RecoverableMultipartUploadFactory.getNewRecoverableUpload(S3RecoverableMultipartUploadFactory.java:68)
>> at org.apache.flink.fs.s3.common.writer.S3RecoverableWriter.open(S3RecoverableWriter.java:76)
>> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.rollPartFile(Bucket.java:221)
>> at org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.write(Bucket.java:212)
>> at org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onElement(Buckets.java:268)
>> at org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.invoke(StreamingFileSink.java:370)
>> at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>> ... 26 more
>> Caused by: org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; Request ID: XXXX; S3 Extended Request ID: XXXX), S3 Extended Request ID: XXXX
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1304)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1056)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:743)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:717)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:699)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:667)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:649)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:513)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4325)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:4272)
>> at org.apache.flink.fs.s3base.shaded.com.amazonaws.services.s3.AmazonS3Client.initiateMultipartUpload(AmazonS3Client.java:3152)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.S3AFileSystem.initiateMultipartUpload(S3AFileSystem.java:2597)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.WriteOperationHelper.lambda$initiateMultiPartUpload$0(WriteOperationHelper.java:199)
>> at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
>> ... 42 more
>>
>> Maybe I need to add the directory level as a resource?
>>
>> resources = [
>>  "arn:aws:s3:::bucket-name",
>>  "arn:aws:s3:::bucket-name/",
>>  "arn:aws:s3:::bucket-name/*"
>> ]
>>
>> Thanks,
>> Li
>>
>> On Thu, Dec 5, 2019 at 6:11 AM r_khachatryan <khachatryan.ro...@gmail.com>
>> wrote:
>>
>>> Hi Li,
>>>
>>> Could you please list the permissions you see and the error message you
>>> receive from AWS?
>>>
>>>
>>> Li Peng-2 wrote
>>> > Hey folks, I'm trying to use StreamingFileSink with s3, using IAM roles
>>> > for auth. Does anyone know what permissions the role should have for the
>>> > specified s3 bucket to work properly? I've been getting some auth errors,
>>> > and I suspect I'm missing some permissions:
>>>
>>> Regards,
>>> Roman
>>