Hi, I am facing the below exception while running the job from Flink Dashboard and from AKS Cluster : Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'wasb'. The scheme is directly supported by Flink through the following plugin: flink-fs-azure-hadoop.
In order to connect to the Blob storage we have done the below setup: 1. Put the flink-azure-fs-hadoop.jar in plugins folder in AKS cluster. 2. In the yaml file we have put this fs.azure.account.key.pjmsimt1pfdbsbsa.blob.core.windows.net: zSw+bQfhHIZsN0sJHsa8Qz4oJjNhktdwuFWHkfA3tjxgSodnniLfOQ2ZKLmFOHcme8DwWk3s0C8+2r1wzlqNmQ== 3. We have also tried restarting pods after yaml configurations. 4. Also, in code, tried writing to blob storage with stream.writeAsText and also with StreamingFileSink.forRowFormat but no luck. I have also attached the code and error for reference. Could you please suggest and help to get the connectivity of Flink job with Azure Blob Storage? Hoping to get your reply soon. Regards Simar
BlobStorageSink.scala
Description: BlobStorageSink.scala
Apache Flink Dashboard Overview Jobs Running Jobs Completed Jobs Task Managers Job Manager Submit New Job Version: 1.13.0 Commit: f06faf1 @ 2021-04-23T15:39:21+02:00 Message: 0 1 2 3 4 5 6 7 8 9 Job failed during initialization of JobManager org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: Collection Source -> Sink: Unnamed': Could not find a file system implementation for scheme 'wasb'. The scheme is directly supported by Flink through the following plugin: flink-fs-azure-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/. at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) ... 7 more Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: Collection Source -> Sink: Unnamed': Could not find a file system implementation for scheme 'wasb'. The scheme is directly supported by Flink through the following plugin: flink-fs-azure-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/. at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 7 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Cannot initialize task 'Source: Collection Source -> Sink: Unnamed': Could not find a file system implementation for scheme 'wasb'. The scheme is directly supported by Flink through the following plugin: flink-fs-azure-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/. at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:178) at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:107) at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120) at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) ... 8 more Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'wasb'. The scheme is directly supported by Flink through the following plugin: flink-fs-azure-hadoop. Please ensure that each plugin resides within its own subfolder within the plugins directory. See https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for more information. If you want to use a Hadoop file system for that scheme, please add the scheme to the configuration fs.allowed-fallback-filesystems. For a full list of supported file systems, please see https://ci.apache.org/projects/flink/flink-docs-stable/ops/filesystems/. at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:513) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:407) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) at org.apache.flink.api.common.io.FileOutputFormat.initializeGlobal(FileOutputFormat.java:288) at org.apache.flink.runtime.jobgraph.InputOutputFormatVertex.initializeOnMaster(InputOutputFormatVertex.java:110) at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:174) ... 19 more Server Response Message List