Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/3057#discussion_r95265401 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java --- @@ -71,163 +64,93 @@ */ public static void install(SecurityConfiguration config) throws Exception { - if (!config.securityIsEnabled()) { - // do not perform any initialization if no Kerberos crendetails are provided - return; - } - - // establish the JAAS config - JaasConfiguration jaasConfig = new JaasConfiguration(config.keytab, config.principal); - javax.security.auth.login.Configuration.setConfiguration(jaasConfig); - - populateSystemSecurityProperties(config.flinkConf); - - // establish the UGI login user - UserGroupInformation.setConfiguration(config.hadoopConf); - - // only configure Hadoop security if we have security enabled - if (UserGroupInformation.isSecurityEnabled()) { - - final UserGroupInformation loginUser; - - if (config.keytab != null && !StringUtils.isBlank(config.principal)) { - String keytabPath = (new File(config.keytab)).getAbsolutePath(); - - UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath); - - loginUser = UserGroupInformation.getLoginUser(); - - // supplement with any available tokens - String fileLocation = System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION); - if (fileLocation != null) { - /* - * Use reflection API since the API semantics are not available in Hadoop1 profile. Below APIs are - * used in the context of reading the stored tokens from UGI. - * Credentials cred = Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf); - * loginUser.addCredentials(cred); - */ - try { - Method readTokenStorageFileMethod = Credentials.class.getMethod("readTokenStorageFile", - File.class, org.apache.hadoop.conf.Configuration.class); - Credentials cred = (Credentials) readTokenStorageFileMethod.invoke(null, new File(fileLocation), - config.hadoopConf); - Method addCredentialsMethod = UserGroupInformation.class.getMethod("addCredentials", - Credentials.class); - addCredentialsMethod.invoke(loginUser, cred); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - } - } else { - // login with current user credentials (e.g. ticket cache) - try { - //Use reflection API to get the login user object - //UserGroupInformation.loginUserFromSubject(null); - Method loginUserFromSubjectMethod = UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class); - Subject subject = null; - loginUserFromSubjectMethod.invoke(null, subject); - } catch (NoSuchMethodException e) { - LOG.warn("Could not find method implementations in the shaded jar. Exception: {}", e); - } - - // note that the stored tokens are read automatically - loginUser = UserGroupInformation.getLoginUser(); + // install the security modules + List<SecurityModule> modules = new ArrayList(); + try { + for (Class<? extends SecurityModule> moduleClass : config.getSecurityModules()) { + SecurityModule module = moduleClass.newInstance(); + module.install(config); + modules.add(module); } + } + catch(Exception ex) { + throw new Exception("unable to establish the security context", ex); + } + installedModules = modules; - LOG.info("Hadoop user set to {}", loginUser.toString()); + // install a security context + // use the Hadoop login user as the subject of the installed security context + if (!(installedContext instanceof NoOpSecurityContext)) { + LOG.warn("overriding previous security context"); + } + UserGroupInformation loginUser = UserGroupInformation.getLoginUser(); + installedContext = new HadoopSecurityContext(loginUser); + } - boolean delegationToken = false; - final Text HDFS_DELEGATION_KIND = new Text("HDFS_DELEGATION_TOKEN"); - Collection<Token<? extends TokenIdentifier>> usrTok = loginUser.getTokens(); - for (Token<? extends TokenIdentifier> token : usrTok) { - final Text id = new Text(token.getIdentifier()); - LOG.debug("Found user token " + id + " with " + token); - if (token.getKind().equals(HDFS_DELEGATION_KIND)) { - delegationToken = true; + static void uninstall() { + if(installedModules != null) { + for (SecurityModule module : Lists.reverse(installedModules)) { + try { + module.uninstall(); } - } - - if (!loginUser.hasKerberosCredentials()) { - //throw an error in non-yarn deployment if kerberos cache is not available - if (!delegationToken) { - LOG.error("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); - throw new RuntimeException("Hadoop Security is enabled but current login user does not have Kerberos Credentials"); + catch(UnsupportedOperationException e) { --- End diff -- The uninstall isn't used in production, only in test code. Throwing gives more information to the unit test. Hopefully getting rid of the warning will suffice for now.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---