Hi all, I'm upgrading an application from Flink 1.16.1 to 1.17.0 and I noticed that delegation tokens (DTs) configuration [1] seems to have started to be mandatory. Is my understanding correct?
I found this announcement [2] saying that from 1.17.0 version DTs are enabled by default [3] but it would be good to have something related to it in the 1.17.0 release notes [4] if that's the case. Perhaps making it disabled by default would be better. For instance, if you try to run the TopSpeedWindowing streaming example [5] against a fresh downloaded 1.17.0 distribution, you should get an error message like this: 2023-04-07 09:18:32,814 [main] ERROR org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Failed to initialize delegation token provider s3 java.lang.IllegalStateException: Delegation token provider with service name {} has multiple implementations [s3] at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) ~[flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133) ~[flink-dist-1.17.0.jar:1.17.0] at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?] at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156) ~[flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111) ~[flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50) ~[flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392) ~[flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) ~[flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) ~[flink-dist-1.17.0.jar:1.17.0] at java.security.AccessController.doPrivileged(Native Method) ~[?:?] at javax.security.auth.Subject.doAs(Unknown Source) [?:?] at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) [hadoop-common-2.8.5.jar:?] at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) [flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) [flink-dist-1.17.0.jar:1.17.0] at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82) [flink-dist-1.17.0.jar:1.17.0] 2023-04-07 09:18:32,824 [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkRuntimeException: java.lang.IllegalStateException: Delegation token provider with service name {} has multiple implementations [s3] at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:151) at java.base/java.util.Iterator.forEachRemaining(Unknown Source) at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156) at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111) at org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) at java.base/java.security.AccessController.doPrivileged(Native Method) at java.base/javax.security.auth.Subject.doAs(Unknown Source) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82) Caused by: java.lang.IllegalStateException: Delegation token provider with service name {} has multiple implementations [s3] at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133) ... 14 more I only managed to make the example run by setting the configuration "security.delegation.token.provider.s3.enabled" to "false". Refs: [1] https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#auth-with-external-systems [2] https://flink.apache.org/2023/01/20/delegation-token-framework-obtain-distribute-and-use-temporary-credentials-automatically/ [3] https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#security-delegation-tokens-enabled [4] https://nightlies.apache.org/flink/flink-docs-release-1.17/release-notes/flink-1.17 [5] https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/standalone/overview/#application-mode Best regards, Arthur