Thanks Chesnay! That helped me resolve the issue.
On Fri, 6 Aug 2021 at 04:31, Chesnay Schepler <ches...@apache.org> wrote:

> The reason this doesn't work is that your application works directly
> against Hadoop. The filesystems in the plugins directory are only loaded
> via specific code paths, specifically when the Flink FileSystem class is
> used. Since you are using Hadoop directly, you are side-stepping the
> plugin mechanism.
>
> So you have to make sure that Hadoop and Hadoop's S3 filesystem are
> available to the client.
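To make that distinction concrete, a minimal sketch follows; the bucket and
file names are placeholders, not taken from the thread. Flink's own
FileSystem/Path classes resolve an s3:// URI through the plugin loader,
while Hadoop's classes only see the filesystem implementations on Hadoop's
own classpath:

    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.core.fs.Path;

    import org.apache.hadoop.conf.Configuration;

    public class S3AccessSketch {
        public static void main(String[] args) throws Exception {
            // Resolved via Flink's FileSystem factories, so the S3 plugins
            // enabled under /opt/flink/plugins can serve the s3:// scheme.
            Path flinkPath = new Path("s3://my-bucket/data/file.parquet");
            FileSystem flinkFs = flinkPath.getFileSystem();

            // Talks to Hadoop directly and side-steps the Flink plugin
            // mechanism; Hadoop only knows the filesystems on its own
            // classpath, hence "No FileSystem for scheme \"s3\"".
            org.apache.hadoop.fs.Path hadoopPath =
                    new org.apache.hadoop.fs.Path("s3://my-bucket/data/file.parquet");
            org.apache.hadoop.fs.FileSystem hadoopFs =
                    hadoopPath.getFileSystem(new Configuration());
        }
    }

The Parquet read in the stack trace below (HadoopInputFile.fromPath ->
Path.getFileSystem) follows the second path, which is why the enabled
plugins do not help there.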
> On 06/08/2021 08:02, tarun joshi wrote:
>
> Hey All,
>
> I am running Flink in Docker containers (image tag: flink:scala_2.11-java11)
> on EC2 and getting the exception below as I try to submit a job through the
> local ./opt/flink/bin:
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: No FileSystem for scheme "s3"
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
> Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: No
> FileSystem for scheme "s3"
>     at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3443)
>     at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
>     at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
>     at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
>     at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
>     at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
>     at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
>     at org.apache.parquet.hadoop.util.HadoopInputFile.fromPath(HadoopInputFile.java:38)
>     at org.apache.flink.examples.java.wordcount.WordCount.printParquetData(WordCount.java:142)
>     at org.apache.flink.examples.java.wordcount.WordCount.main(WordCount.java:83)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
>     at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
>     ... 8 more
>
> This is how I am enabling the Flink built-in S3 plugins for the JobManager
> and TaskManager:
>
> docker run \
>   --rm \
>   --volume /root/:/root/ \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=jobmanager \
>   --network flink-network \
>   --publish 8081:8081 \
>   flink:scala_2.11-java11 jobmanager &
>
> docker run \
>   --rm \
>   --env JOB_MANAGER_RPC_ADDRESS="${JOB_MANAGER_RPC_ADDRESS}" \
>   --env TASK_MANAGER_NUMBER_OF_TASK_SLOTS="${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" \
>   --env ENABLE_BUILT_IN_PLUGINS="flink-s3-fs-hadoop-1.13.1.jar;flink-s3-fs-presto-1.13.1.jar" \
>   --name=taskmanager_0 \
>   --network flink-network \
>   flink:scala_2.11-java11 taskmanager &
>
> This is how I am declaring dependencies in my pom.xml (I am working on top
> of the flink-examples project from the Flink GitHub repo):
>
> <dependencies>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-java</artifactId>
>         <version>${project.version}</version>
>         <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-scala_${scala.binary.version}</artifactId>
>         <version>${project.version}</version>
>         <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.flink</groupId>
>         <artifactId>flink-clients_${scala.binary.version}</artifactId>
>         <version>${project.version}</version>
>         <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>         <groupId>org.apache.parquet</groupId>
>         <artifactId>parquet-avro</artifactId>
>         <version>1.12.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.parquet</groupId>
>         <artifactId>parquet-column</artifactId>
>         <version>1.12.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.parquet</groupId>
>         <artifactId>parquet-hadoop</artifactId>
>         <version>1.12.0</version>
>     </dependency>
>     <dependency>
>         <groupId>org.apache.hadoop</groupId>
>         <artifactId>hadoop-common</artifactId>
>         <version>3.3.1</version>
>     </dependency>
> </dependencies>
>
> I can also see the plugins being loaded for the JobManager and TaskManager:
>
> Linking flink-s3-fs-hadoop-1.13.1.jar to plugin directory
> Successfully enabled flink-s3-fs-hadoop-1.13.1.jar
> Linking flink-s3-fs-presto-1.13.1.jar to plugin directory
> Successfully enabled flink-s3-fs-presto-1.13.1.jar
>
> Let me know if I am doing anything wrong.
>
> Thanks for the help!
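A minimal sketch of what Chesnay's suggestion (making Hadoop plus Hadoop's
S3 filesystem available to the client) can look like for the direct-Hadoop
code path in the stack trace above. The scheme mapping, the artifact choice
(org.apache.hadoop:hadoop-aws matching the hadoop-common version, together
with its AWS SDK dependency, on the client classpath) and the bucket/path
are assumptions for illustration, not details confirmed in this thread:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DirectHadoopS3Read {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Map the bare "s3" scheme to Hadoop's S3A implementation, which
            // ships in the hadoop-aws artifact; s3a:// URIs would not need
            // this mapping. (Assumed setup, not taken from the thread.)
            conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");

            // With hadoop-aws and its AWS SDK dependency on the classpath,
            // this resolves instead of throwing UnsupportedFileSystemException.
            Path path = new Path("s3://my-bucket/data/file.parquet");
            FileSystem fs = path.getFileSystem(conf);
            System.out.println("Resolved filesystem: " + fs.getClass().getName());
        }
    }

Credentials still have to come through the usual S3A channels (environment
variables, an EC2 instance profile, or fs.s3a.* settings); the flink-s3-fs-*
plugins only cover code that goes through Flink's own FileSystem class, as
explained above.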