Hi all,

I'm upgrading an application from Flink 1.16.1 to 1.17.0 and noticed that
configuring delegation tokens (DTs) [1] now seems to be mandatory. Is my
understanding correct?

I found this announcement [2] saying that DTs are enabled by default [3]
starting with version 1.17.0. If that's the case, it would be good to
mention it in the 1.17.0 release notes [4]. Perhaps disabling them by
default would be better.

For instance, if you run the TopSpeedWindowing streaming example [5]
against a freshly downloaded 1.17.0 distribution, you should get an error
message like this:

2023-04-07 09:18:32,814 [main] ERROR org.apache.flink.runtime.security.token.DefaultDelegationTokenManager [] - Failed to initialize delegation token provider s3
java.lang.IllegalStateException: Delegation token provider with service name {} has multiple implementations [s3]
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133) ~[flink-dist-1.17.0.jar:1.17.0]
    at java.util.Iterator.forEachRemaining(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282) ~[flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232) ~[flink-dist-1.17.0.jar:1.17.0]
    at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
    at javax.security.auth.Subject.doAs(Unknown Source) [?:?]
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844) [hadoop-common-2.8.5.jar:?]
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229) [flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729) [flink-dist-1.17.0.jar:1.17.0]
    at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82) [flink-dist-1.17.0.jar:1.17.0]
2023-04-07 09:18:32,824 [main] INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting StandaloneApplicationClusterEntryPoint down with application status FAILED. Diagnostics org.apache.flink.util.FlinkRuntimeException: java.lang.IllegalStateException: Delegation token provider with service name {} has multiple implementations [s3]
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:151)
    at java.base/java.util.Iterator.forEachRemaining(Unknown Source)
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.loadProviders(DefaultDelegationTokenManager.java:156)
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.<init>(DefaultDelegationTokenManager.java:111)
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManagerFactory.create(DefaultDelegationTokenManagerFactory.java:50)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.initializeServices(ClusterEntrypoint.java:392)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:282)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$1(ClusterEntrypoint.java:232)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/javax.security.auth.Subject.doAs(Unknown Source)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1844)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:229)
    at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:729)
    at org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint.main(StandaloneApplicationClusterEntryPoint.java:82)
Caused by: java.lang.IllegalStateException: Delegation token provider with service name {} has multiple implementations [s3]
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:215)
    at org.apache.flink.runtime.security.token.DefaultDelegationTokenManager.lambda$loadProviders$0(DefaultDelegationTokenManager.java:133)
    ... 14 more

I only managed to make the example run by setting the configuration
"security.delegation.token.provider.s3.enabled" to "false".
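
For reference, a minimal sketch of that workaround as a configuration
fragment. It assumes the option is set in the distribution's
conf/flink-conf.yaml; passing it another way should be equivalent. The
commented-out line is the global switch documented in [3]. I have not
tested whether it also avoids the error, so treat it as an assumption.

```yaml
# conf/flink-conf.yaml (assumed location of the cluster configuration)

# Workaround described above: disable only the s3 delegation token provider.
security.delegation.token.provider.s3.enabled: false

# Untested alternative (assumption): turn off the delegation token
# framework entirely, using the option documented in [3].
# security.delegation.tokens.enabled: false
```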

Refs:
[1] https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#auth-with-external-systems
[2] https://flink.apache.org/2023/01/20/delegation-token-framework-obtain-distribute-and-use-temporary-credentials-automatically/
[3] https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/config/#security-delegation-tokens-enabled
[4] https://nightlies.apache.org/flink/flink-docs-release-1.17/release-notes/flink-1.17
[5] https://nightlies.apache.org/flink/flink-docs-stable/docs/deployment/resource-providers/standalone/overview/#application-mode

Best regards,
Arthur
