[
https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15395687#comment-15395687
]
ASF GitHub Bot commented on FLINK-3929:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/2275#discussion_r72439915
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityContext.java
---
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Preconditions;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+/*
+ * Process-wide security context object which initializes UGI with
appropriate security credentials and also it
+ * creates in-memory JAAS configuration object which will serve
appropriate ApplicationConfigurationEntry for the
+ * connector login module implementation that authenticates Kerberos
identity using SASL/JAAS based mechanism.
+ */
+@Internal
+public class SecurityContext {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SecurityContext.class);
+
+ private static SecurityContext installedContext;
+
+ public static SecurityContext getInstalled() { return installedContext;
}
+
+ private UserGroupInformation ugi;
+
+ SecurityContext(UserGroupInformation ugi) {
+ this.ugi = ugi;
+ }
+
+ public <T> T runSecured(final FlinkSecuredRunner<T> runner) throws
Exception {
+ return ugi.doAs(new PrivilegedExceptionAction<T>() {
+ @Override
+ public T run() throws Exception {
+ return runner.run();
+ }
+ });
+ }
+
+ public static void install(SecurityConfiguration config) throws
Exception {
+
+ // perform static initialization of UGI, JAAS
+ if(installedContext != null) {
+ LOG.warn("overriding previous security context");
+ }
+
+ // establish the JAAS config
+ JaasConfiguration jaasConfig = new
JaasConfiguration(config.keytab, config.principal);
+
javax.security.auth.login.Configuration.setConfiguration(jaasConfig);
+
+ //hack since Kafka Login Handler explicitly look for the
property or else it throws an exception
+
//https://github.com/apache/kafka/blob/0.9.0/clients/src/main/java/org/apache/kafka/common/security/kerberos/Login.java#L289
+ System.setProperty("java.security.auth.login.config", "");
+
+ // establish the UGI login user
+ UserGroupInformation.setConfiguration(config.hadoopConf);
+ UserGroupInformation loginUser;
+ if(UserGroupInformation.isSecurityEnabled() && config.keytab !=
null && !Preconditions.isNullOrEmpty(config.principal)) {
+
+ String keytabPath = (new
File(config.keytab)).getAbsolutePath();
+ // login with keytab
+
UserGroupInformation.loginUserFromKeytab(config.principal, keytabPath);
+
+ loginUser = UserGroupInformation.getLoginUser();
+
+ // supplement with any available tokens
+ String fileLocation =
System.getenv(UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION);
+ if(fileLocation != null) {
+ /*
+ * Use reflection API since the API semantics
are not available in Hadoop1 profile. Below APIs are
+ * used in the context of reading the stored
tokens from UGI.
+ * Credentials cred =
Credentials.readTokenStorageFile(new File(fileLocation), config.hadoopConf);
+ * loginUser.addCredentials(cred);
+ */
+ try {
+ Method readTokenStorageFileMethod =
Credentials.class.getMethod("readTokenStorageFile",
+
File.class,
org.apache.hadoop.conf.Configuration.class);
+ Credentials cred = (Credentials)
readTokenStorageFileMethod.invoke(null,new File(fileLocation),
+
config.hadoopConf);
+ Method addCredentialsMethod =
UserGroupInformation.class.getMethod("addCredentials",
+
Credentials.class);
+
addCredentialsMethod.invoke(loginUser,cred);
+ } catch(NoSuchMethodException e) {
+ LOG.warn("Could not find method
implementations in the shaded jar. Exception: {}", e);
+ }
+ }
+ } else {
+ // login with current user credentials (e.g. ticket
cache)
+ try {
+ //Use reflection API to get the login user
object
+
//UserGroupInformation.loginUserFromSubject(null);
+ Method loginUserFromSubjectMethod =
UserGroupInformation.class.getMethod("loginUserFromSubject", Subject.class);
+ Subject subject = null;
+ loginUserFromSubjectMethod.invoke(null,subject);
+ } catch(NoSuchMethodException e) {
+ LOG.warn("Could not find method implementations
in the shaded jar. Exception: {}", e);
+ }
+
+ loginUser = UserGroupInformation.getLoginUser();
+ // note that the stored tokens are read automatically
+ }
+
+ if(UserGroupInformation.isSecurityEnabled() &&
!UserGroupInformation.getLoginUser().hasKerberosCredentials()) {
+ LOG.error("Hadoop Security is enabled but current login
user does not have Kerberos Credentials");
+ throw new RuntimeException("Hadoop Security is enabled
but current login user does not have Kerberos Credentials");
+ }
+
+ installedContext = new SecurityContext(loginUser);
+ }
+
+ /**
+ * Inputs for establishing the security context.
+ */
+ public static class SecurityConfiguration {
+
+ Configuration flinkConf = null;
+
+ org.apache.hadoop.conf.Configuration hadoopConf = new
org.apache.hadoop.conf.Configuration();
+
+ String keytab = null;
+
+ String principal = null;
+
+ public String getKeytab() {
+ return keytab;
+ }
+
+ public String getPrincipal() {
+ return principal;
+ }
+
+ public SecurityConfiguration
setFlinkConfiguration(Configuration flinkConf) {
--- End diff --
Why is the Flink configuration not passed upon Object creation of this
class?
> Support for Kerberos Authentication with Keytab Credential
> ----------------------------------------------------------
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
> Issue Type: New Feature
> Reporter: Eron Wright
> Assignee: Vijay Srinivasaraghavan
> Labels: kerberos, security
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data
> Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing]
> design doc._
> Add support for a keytab credential to be associated with the Flink cluster,
> to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)