Flink fails to authenticate with adlsgen2 azure storage account using managed identities in an Azure Kubernetes Cluster. We receive the following error from flink when we try to configure managed identities to authenticate to adlsgen2.
Caused by: Unable to load key provider class. at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AbfsConfiguration.getTokenProvider(AbfsConfiguration.java:540) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.initializeClient(AzureBlobFileSystemStore.java:1136) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.<init>(AzureBlobFileSystemStore.java:174) at org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.initialize(AzureBlobFileSystem.java:110) at org.apache.flink.fs.azurefs.AbstractAzureFSFactory.create(AbstractAzureFSFactory.java:79) at org.apache.flink.core.fs.PluginFileSystemFactory.create(PluginFileSystemFactory.java:62) at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:508) at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:409) at org.apache.flink.core.fs.Path.getFileSystem(Path.java:274) at org.apache.flink.runtime.blob.BlobUtils.createFileSystemBlobStore(BlobUtils.java:99) ... 12 more We are trying to run flink in an azure Kubernetes cluster using the Flink Apache operator (https://github.com/apache/flink-kubernetes-operator) The following are the settings we are using in the spec.flinkConfigurations to update the flink-conf.yaml file to use azure managed identities based off this documentation (https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Authentication). The client Id we are using is the user assigned identity of the AKS agent pools /VM scalesets . We applied the storage data blob contributor role scoped to the adlsgen2 azure storage account for the client id. Using the storage account key is not recommended for adlsgen2. Any insights into this matter will be helpful as we would prefer to use managed identities. apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: kafkatest spec: flinkVersion: v1_15 flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.backend: rocksdb state.backend.fs.checkpointdir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-checkpoints/ state.checkpoints.dir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-externalized-checkpoints/ state.savepoints.dir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-savepoints/ high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory high-availability.storageDir: abfs://containern...@storageaccountname.dfs.core.windows.net/kafkatest-ha fs.azure.createRemoteFileSystemDuringInitialization: "true" fs.azure.account.auth.type.storageaccountname.dfs.core.windows.net: OAuth fs.azure.account.oauth.provider.type.storageaccountname.dfs.core.windows.net: org.apache.hadoop.fs.azurebfs.oauth2.MsiTokenProvider fs.azure.account.oauth2.msi.tenant.storageaccountname.dfs.core.windows.net: XXXXXXXXXXX fs.azure.account.oauth2.client.id.storageaccountname.dfs.core.windows.net: XXXXXXXXXXX fs.azure.account.oauth2.msi.endpoint.storageaccountname.dfs.core.windows.net: https://login.microsoftonline.com/<TENANT<https://login.microsoftonline.com/%3cTENANT> ID>/oauth2/token serviceAccount: workload-identity-sa podTemplate: metadata: name: test spec: securityContext: runAsUser: 9999 runAsGroup: 9999 fsGroup: 9999 runAsNonRoot: true containers: # Do not change the main container name - name: flink-main-container env: - name: ENABLE_BUILT_IN_PLUGINS value: flink-azure-fs-hadoop-1.15.2.jar