Not sure @yangze ... other services deployed in the same place are able to access the S3 bucket. The link you shared is the recommended way; if we already have access to S3 (for example via an IAM role), shouldn't we be able to skip passing credentials?

On Wednesday, September 22, 2021, 02:59:05 AM EDT, Yangze Guo <karma...@gmail.com> wrote:

You might need to configure the access credential. [1]
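(If the pods really do have an IAM role attached, the S3 filesystems should pick credentials up from the default provider chain and no keys are needed. If not, the credentials from [1] can go in flink-conf.yaml; this is a sketch with placeholder values, not configuration taken from this cluster:)

```yaml
# flink-conf.yaml fragment; only needed when no IAM role / instance
# profile is available to the JobManager and TaskManager pods.
# Placeholder values; never commit real keys.
s3.access-key: YOUR_ACCESS_KEY_ID
s3.secret-key: YOUR_SECRET_ACCESS_KEY
```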
[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/filesystems/s3/#configure-access-credentials

Best,
Yangze Guo

On Wed, Sep 22, 2021 at 2:17 PM Dhiru <userdh...@yahoo.com> wrote:
>
> I see that at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326) the plugin is
> not able to create the folder; not sure if I need to change something.
> When we run from a local laptop and pass AWS credentials, it is able to
> create the folder and runs as expected.
>
> On Wednesday, September 22, 2021, 01:39:04 AM EDT, Dhiru <userdh...@yahoo.com> wrote:
>
> In the Flink image I have added both S3 plugins:
>
> FROM flink:1.11.3-scala_2.12-java11
> RUN mkdir ./plugins/flink-s3-fs-presto
> RUN cp ./opt/flink-s3-fs-presto-1.11.3.jar ./plugins/flink-s3-fs-presto/
> RUN mkdir ./plugins/flink-s3-fs-hadoop
> RUN cp ./opt/flink-s3-fs-hadoop-1.11.3.jar ./plugins/flink-s3-fs-hadoop/
>
> The relevant part of flink-conf.yaml (I tried with both s3a:// and s3://):
>
> # REQUIRED: set storage location for job metadata in remote storage
> state.backend: filesystem
> state.backend.fs.checkpointdir: s3a://msc-actigraph-test-bucket/flink-checkpointing/checkpoints
> state.checkpoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/externalized-checkpoints
> state.savepoints.dir: s3a://msc-actigraph-test-bucket/flink-checkpointing/savepoints
> high-availability.storageDir: s3a://msc-actigraph-test-bucket/flink-checkpointing/storagedir
> s3.path.style.access: true
>
> org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
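(A 403 on getFileStatus/mkdirs usually means the role can reach some of the bucket but lacks list or object permissions on the checkpoint prefix. As a sketch, an IAM policy along these lines should cover what the s3a checkpoint storage does; the bucket name is from the thread, but the statement structure is a standard pattern, not verified against this cluster:)

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ListCheckpointBucket",
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::msc-actigraph-test-bucket"
    },
    {
      "Sid": "ReadWriteCheckpointObjects",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::msc-actigraph-test-bucket/flink-checkpointing/*"
    }
  ]
}
```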
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:103)
> at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
> at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
> at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
> at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> at java.base/java.lang.Thread.run(Unknown Source)
> Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
> at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
> ... 7 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
> at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:81)
> at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:67)
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:100)
> ... 7 more
> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute job 'DeduplicationJob'.
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:78)
> ... 9 more
> Caused by: org.apache.flink.util.FlinkException: Failed to execute job 'DeduplicationJob'.
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1829)
> at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
> at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:76)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1700)
> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
> at io.epiphanous.flinkrunner.flink.BaseFlinkJob.run(BaseFlinkJob.scala:45)
> at io.epiphanous.flinkrunner.FlinkRunner.process1(FlinkRunner.scala:56)
> at io.epiphanous.flinkrunner.FlinkRunner.process(FlinkRunner.scala:33)
> at com.mdsol.flink.delivery_streams.Runner$.run(Runner.scala:25)
> at com.mdsol.flink.delivery_streams.Runner$.main(Runner.scala:7)
> at com.mdsol.flink.delivery_streams.Runner.main(Runner.scala)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
> ... 12 more
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$3(Dispatcher.java:362)
> at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
> at java.base/java.util.concurrent.CompletableFuture$Completion.run(Unknown Source)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not instantiate JobManager.
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:427)
> at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
> ... 6 more
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to create checkpoint storage at checkpoint coordinator side.
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:307)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:226)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
> at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:270)
> at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:244)
> at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:231)
> at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
> at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
> at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:290)
> at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:278)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
> at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
> at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
> at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:417)
> ... 7 more
> Caused by: java.nio.file.AccessDeniedException: s3a://msc-actigraph-test-bucket/flink-checkpointing/b3b19b338b6c2b2bf022b219051d6d1a/shared: getFileStatus on s3a://msc-actigraph-test-bucket/flink-checkpointing/b3b19b338b6c2b2bf022b219051d6d1a/shared: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: B99FWA0KE3JYKGN9; S3 Extended Request ID: OL+2LER+8Pofuv5sM7G6qvjHiTzmSce66URzMRvlw4VwkS4jKeU2/INZRj9UEAsGASjl8Ohn3OE=), S3 Extended Request ID: OL+2LER+8Pofuv5sM7G6qvjHiTzmSce66URzMRvlw4VwkS4jKeU2/INZRj9UEAsGASjl8Ohn3OE=:403 Forbidden
> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:218)
> at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:145)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2184)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2149)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2088)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2037)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2007)
> at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2326)
> at org.apache.flink.fs.s3hadoop.common.HadoopFileSystem.mkdirs(HadoopFileSystem.java:170)
> at org.apache.flink.core.fs.PluginFileSystemFactory$ClassLoaderFixingFileSystem.mkdirs(PluginFileSystemFactory.java:162)
> at org.apache.flink.runtime.state.filesystem.FsCheckpointStorage.initializeBaseLocations(FsCheckpointStorage.java:111)
> at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:305)
> ... 22 more
> Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: B99FWA0KE3JYKGN9; S3 Extended Request ID: OL+2LER+8Pofuv5sM7G6qvjHiTzmSce66URzMRvlw4VwkS4jKeU2/INZRj9UEAsGASjl8Ohn3OE=), S3 Extended Request ID: OL+2LER+8Pofuv5sM7G6qvjHiTzmSce66URzMRvlw4VwkS4jKeU2/INZRj9UEAsGASjl8Ohn3OE=
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1799)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1383)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1359)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
> at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5054)
> at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5000)
> at com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1335)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$4(S3AFileSystem.java:1235)
> at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:317)
> at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:280)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:1232)
> at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2169)
> ... 31 more
>
> I have tried some other sample code and I am able to read/write/create folders in the S3 bucket, but when I run from Flink I see it trying to create a new folder:
> s3a://msc-actigraph-test-bucket/flink-checkpointing/b3b19b338b6c2b2bf022b219051d6d1a/
> and that folder (b3b19b338b6c2b2bf022b219051d6d1a) is never created in S3. I am not sure whether this is the right way to write to the S3 bucket from Flink?
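(For anyone matching the denied path against an IAM policy: the trace is failing on <state.checkpoints.dir>/<job-id>/shared, so the policy has to cover both the bucket ARN for listing and the object ARN for that key prefix. A small sketch, plain Python with an illustrative helper name, of splitting such a URI into the bucket and key an IAM statement must cover:)

```python
from urllib.parse import urlparse

def split_s3_uri(uri: str):
    """Split an s3a:// (or s3://) URI into (bucket, key).

    The bucket maps to the arn:aws:s3:::<bucket> resource used by
    s3:ListBucket; the key maps to arn:aws:s3:::<bucket>/<key> used by
    object-level actions such as s3:GetObject and s3:PutObject.
    """
    parsed = urlparse(uri)
    return parsed.netloc, parsed.path.lstrip("/")

# The exact path the AccessDeniedException in the trace reports:
bucket, key = split_s3_uri(
    "s3a://msc-actigraph-test-bucket/flink-checkpointing/"
    "b3b19b338b6c2b2bf022b219051d6d1a/shared"
)
print(bucket)  # msc-actigraph-test-bucket
print(key)     # flink-checkpointing/b3b19b338b6c2b2bf022b219051d6d1a/shared
```

The `b3b19b338b6c2b2bf022b219051d6d1a` component is the job ID, which changes on every submission, so the policy should allow the whole `flink-checkpointing/*` prefix rather than a fixed path.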