XComp commented on a change in pull request #14499: URL: https://github.com/apache/flink/pull/14499#discussion_r555570200
########## File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java ########## @@ -115,4 +127,35 @@ public static boolean isDeclarativeResourceManagementEnabled(Configuration confi return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT) && !System.getProperties().containsKey("flink.tests.disable-declarative"); } + + /** The mode of how to handle user code attempting to exit JVM. */ + public enum UserSystemExitMode { + DISABLED("No check is enabled, that is allowing exit without any action"), Review comment: ```suggestion DISABLED("No check is enabled. Hence, exits are allowed without any action being triggered."), ``` I feel like the current version of the description might be ambiguous: at first, I read it as "there is no check which allows exiting without any action". I suggest the version above instead. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.security.Permission; +import java.util.function.Consumer; + +/** + * {@code FlinkSecurityManager} to control certain behaviors that can be captured by Java system + * security manager. It can be used to control unexpected user behaviors that potentially impact + * cluster availability, for example, it can warn or prevent user code from terminating JVM by + * System.exit or halt by logging or throwing an exception. This does not necessarily prevent + * malicious users who try to tweak security manager on their own, but more for being dependable + * against user mistakes by gracefully handling them informing users rather than causing silent + * unavailability. + */ +public class FlinkSecurityManager extends SecurityManager { + + static final Logger LOG = LoggerFactory.getLogger(FlinkSecurityManager.class); + + /** + * Security manager reference lastly set to system's security manager by public API. As system + * security manager can be reset with another but still chain-called into this manager properly, + * this reference may not be referenced by System.getSecurityManager, but we still need to + * control runtime check behaviors such as monitoring exit from user code. 
+ */ + private static FlinkSecurityManager flinkSecurityManager; + + private final SecurityManager originalSecurityManager; + private final ThreadLocal<Boolean> monitorUserSystemExit = new InheritableThreadLocal<>(); + private final ClusterOptions.UserSystemExitMode userSystemExitMode; + + /** The behavior to execute when the JVM exists. */ + private final Consumer<Integer> onExitBehavior; + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior) { + this(userSystemExitMode, onExitBehavior, System.getSecurityManager()); + } + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior, + SecurityManager originalSecurityManager) { + this.userSystemExitMode = Preconditions.checkNotNull(userSystemExitMode); + this.onExitBehavior = onExitBehavior; + this.originalSecurityManager = originalSecurityManager; + } + + /** + * Instantiate FlinkUserSecurityManager from configuration. Return null if no security manager + * check is needed, so that a caller can skip setting security manager avoiding runtime check + * cost, if there is no security check set up already. Use {@link #setFromConfiguration} helper, + * which handles disabled case. + * + * @param configuration to instantiate the security manager from + * @return FlinkUserSecurityManager instantiated based on configuration. Return null if + * disabled. + */ + @VisibleForTesting + static FlinkSecurityManager fromConfiguration(Configuration configuration) { + final ClusterOptions.UserSystemExitMode userSystemExitMode = + configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT); + + boolean haltOnSystemExit = configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT); + + // If no check is needed, return null so that caller can avoid setting security manager not + // to incur any runtime cost. 
+ if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED && !haltOnSystemExit) { + return null; + } + Consumer<Integer> onExitBehavior = null; + // If halt on system exit is configured, registers a custom SecurityManager which converts + // graceful exists calls using {@code System#exit} into forceful exit calls using + // {@code Runtime#halt}. The latter does not perform a clean shutdown using the registered + // shutdown hooks. This may be configured to prevent deadlocks with Java 8 and the G1 + // garbage collection, see https://issues.apache.org/jira/browse/FLINK-16510. + if (haltOnSystemExit) { + onExitBehavior = status -> Runtime.getRuntime().halt(status); + } + LOG.info( + "FlinkSecurityManager is created with {} user system exit mode and {} exit", + userSystemExitMode, + haltOnSystemExit ? "forceful" : "graceful"); + // Add more configuration parameters that need user security manager (currently only for + // system exit). + return new FlinkSecurityManager(userSystemExitMode, onExitBehavior); + } + + public static void setFromConfiguration(Configuration configuration) { + final FlinkSecurityManager flinkSecurityManager = + FlinkSecurityManager.fromConfiguration(configuration); + if (flinkSecurityManager != null) { + try { + System.setSecurityManager(flinkSecurityManager); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Could not register security manager due to no permission to " + + "set a SecurityManager. 
Either update your existing " + "SecurityManager to allow the permission or not using " + "security manager features (e.g., '%s: %s', '%s: %s')", + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.key(), + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.defaultValue(), + ClusterOptions.HALT_ON_SYSTEM_EXIT.key(), + ClusterOptions.HALT_ON_SYSTEM_EXIT.defaultValue(), + e)); + } + } + FlinkSecurityManager.flinkSecurityManager = flinkSecurityManager; + } + + public static void monitorUserSystemExitForCurrentThread() { + if (FlinkSecurityManager.flinkSecurityManager != null) { + FlinkSecurityManager.flinkSecurityManager.monitorUserSystemExit(); + } + } + + public static void unmonitorUserSystemExitForCurrentThread() { + if (FlinkSecurityManager.flinkSecurityManager != null) { + FlinkSecurityManager.flinkSecurityManager.unmonitorUserSystemExit(); + } + } + + @Override + public void checkPermission(Permission perm) { + if (originalSecurityManager != null) { + originalSecurityManager.checkPermission(perm); + } + } + + @Override + public void checkPermission(Permission perm, Object context) { + if (originalSecurityManager != null) { + originalSecurityManager.checkPermission(perm, context); + } + } + + @Override + public void checkExit(int status) { + if (userSystemExitMonitored()) { + switch (userSystemExitMode) { + case DISABLED: + break; + case LOG: + // Add exception trace log to help users to debug where exit came from. + LOG.warn( + "Exiting JVM with status {} is monitored, logging and exiting", + status, + new UserSystemExitException()); Review comment: Do we need to add this exception here? It does not add any value, does it? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { + TestExitSecurityManager existingSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(existingSecurityManager); + AtomicInteger customExitExecuted = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, customExitExecuted::set); + + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE)); + assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testExitBehaviorUnchangeOnThrowingUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail()); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testDisabledConfiguration() { + // Default case (no provided option) - allowing everything, so null security manager is + // expected. + Configuration configuration = new Configuration(); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // Disabled case (same as default) + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, + ClusterOptions.UserSystemExitMode.DISABLED); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // No halt (same as default) + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + } + + @Test + public void testLogConfiguration() { + // Enabled - log case (logging as warning but allowing exit) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.LOG); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + } + + @Test + public void 
testThrowConfiguration() { + // Enabled - throw case (disallowing by throwing exception) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + + // Test for disabled test to check if exit is still allowed (fromConfiguration gives null + // since currently + // there is only one option to have a valid security manager, so test with constructor). + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testHaltConfiguration() { + // Halt as forceful shutdown replacing graceful system exit + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + } + + @Test(expected = NullPointerException.class) + public void testInvalidConfiguration() { + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + } + + @Test + public void testExistingSecurityManagerRespected() { + // Don't set the following security 
manager directly to system, which makes test hang. + SecurityManager originalSecurityManager = + new SecurityManager() { + @Override + public void checkPermission(Permission perm) { + throw new SecurityException("not allowed"); + } + }; + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> Assert.fail(), + originalSecurityManager); + + assertThrows( + "not allowed", + SecurityException.class, + () -> { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + return null; + }); + } + + @Test + public void testRegistrationNotAllowedByExistingSecurityManager() { + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + + System.setSecurityManager( + new SecurityManager() { + + private boolean fired; + + @Override + public void checkPermission(Permission perm) { + if (!fired && perm.getName().equals("setSecurityManager")) { + try { + throw new SecurityException("not allowed"); + } finally { + // Allow removing this manager again + fired = true; + } + } + } + }); + + assertThrows( + "Could not register security manager", + IllegalConfigurationException.class, + () -> { + FlinkSecurityManager.setFromConfiguration(configuration); + return null; + }); + } + + @Test(expected = UserSystemExitException.class) + public void testMultiSecurityManagersWithSetFirstAndMonitored() { + Configuration configuration = new Configuration(); + + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + + FlinkSecurityManager.setFromConfiguration(configuration); + + TestExitSecurityManager newSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(newSecurityManager); + + FlinkSecurityManager.monitorUserSystemExitForCurrentThread(); + newSecurityManager.checkExit(TEST_EXIT_CODE); Review comment: You 
would have to catch the `UserSystemExitException` here instead of using JUnit's `expected` feature if you want to check afterwards whether `TestExitSecurityManager` was triggered. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { + TestExitSecurityManager existingSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(existingSecurityManager); + AtomicInteger customExitExecuted = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, customExitExecuted::set); + + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE)); + assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testExitBehaviorUnchangeOnThrowingUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail()); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testDisabledConfiguration() { + // Default case (no provided option) - allowing everything, so null security manager is + // expected. + Configuration configuration = new Configuration(); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // Disabled case (same as default) + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, + ClusterOptions.UserSystemExitMode.DISABLED); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // No halt (same as default) + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + } + + @Test + public void testLogConfiguration() { + // Enabled - log case (logging as warning but allowing exit) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.LOG); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + } + + @Test + public void 
testThrowConfiguration() { + // Enabled - throw case (disallowing by throwing exception) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + + // Test for disabled test to check if exit is still allowed (fromConfiguration gives null + // since currently + // there is only one option to have a valid security manager, so test with constructor). + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testHaltConfiguration() { + // Halt as forceful shutdown replacing graceful system exit + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + } + + @Test(expected = NullPointerException.class) + public void testInvalidConfiguration() { + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + } + + @Test + public void testExistingSecurityManagerRespected() { + // Don't set the following security 
manager directly to system, which makes test hang. + SecurityManager originalSecurityManager = + new SecurityManager() { + @Override + public void checkPermission(Permission perm) { + throw new SecurityException("not allowed"); + } + }; + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> Assert.fail(), + originalSecurityManager); + + assertThrows( + "not allowed", + SecurityException.class, + () -> { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + return null; + }); + } + + @Test + public void testRegistrationNotAllowedByExistingSecurityManager() { + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + + System.setSecurityManager( + new SecurityManager() { Review comment: You could add this feature to the testing implementation of `SecurityManager` instead of using an anonymous class. See [TestingDispatcherRunner](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/TestingDispatcherRunner.java) as an example `Testing*` implementation. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.security.Permission; +import java.util.function.Consumer; + +/** + * {@code FlinkSecurityManager} to control certain behaviors that can be captured by Java system + * security manager. It can be used to control unexpected user behaviors that potentially impact + * cluster availability, for example, it can warn or prevent user code from terminating JVM by + * System.exit or halt by logging or throwing an exception. This does not necessarily prevent + * malicious users who try to tweak security manager on their own, but more for being dependable + * against user mistakes by gracefully handling them informing users rather than causing silent + * unavailability. + */ +public class FlinkSecurityManager extends SecurityManager { + + static final Logger LOG = LoggerFactory.getLogger(FlinkSecurityManager.class); + + /** + * Security manager reference lastly set to system's security manager by public API. 
As system + * security manager can be reset with another but still chain-called into this manager properly, + * this reference may not be referenced by System.getSecurityManager, but we still need to + * control runtime check behaviors such as monitoring exit from user code. + */ + private static FlinkSecurityManager flinkSecurityManager; + + private final SecurityManager originalSecurityManager; + private final ThreadLocal<Boolean> monitorUserSystemExit = new InheritableThreadLocal<>(); + private final ClusterOptions.UserSystemExitMode userSystemExitMode; + + /** The behavior to execute when the JVM exists. */ + private final Consumer<Integer> onExitBehavior; + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior) { + this(userSystemExitMode, onExitBehavior, System.getSecurityManager()); + } + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior, + SecurityManager originalSecurityManager) { + this.userSystemExitMode = Preconditions.checkNotNull(userSystemExitMode); + this.onExitBehavior = onExitBehavior; + this.originalSecurityManager = originalSecurityManager; + } + + /** + * Instantiate FlinkUserSecurityManager from configuration. Return null if no security manager + * check is needed, so that a caller can skip setting security manager avoiding runtime check + * cost, if there is no security check set up already. Use {@link #setFromConfiguration} helper, + * which handles disabled case. + * + * @param configuration to instantiate the security manager from + * @return FlinkUserSecurityManager instantiated based on configuration. Return null if + * disabled. 
+ */ + @VisibleForTesting + static FlinkSecurityManager fromConfiguration(Configuration configuration) { + final ClusterOptions.UserSystemExitMode userSystemExitMode = + configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT); + + boolean haltOnSystemExit = configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT); + + // If no check is needed, return null so that caller can avoid setting security manager not + // to incur any runtime cost. + if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED && !haltOnSystemExit) { + return null; + } + Consumer<Integer> onExitBehavior = null; + // If halt on system exit is configured, registers a custom SecurityManager which converts + // graceful exists calls using {@code System#exit} into forceful exit calls using + // {@code Runtime#halt}. The latter does not perform a clean shutdown using the registered + // shutdown hooks. This may be configured to prevent deadlocks with Java 8 and the G1 + // garbage collection, see https://issues.apache.org/jira/browse/FLINK-16510. + if (haltOnSystemExit) { + onExitBehavior = status -> Runtime.getRuntime().halt(status); + } + LOG.info( + "FlinkSecurityManager is created with {} user system exit mode and {} exit", + userSystemExitMode, + haltOnSystemExit ? "forceful" : "graceful"); + // Add more configuration parameters that need user security manager (currently only for + // system exit). + return new FlinkSecurityManager(userSystemExitMode, onExitBehavior); + } + + public static void setFromConfiguration(Configuration configuration) { + final FlinkSecurityManager flinkSecurityManager = + FlinkSecurityManager.fromConfiguration(configuration); + if (flinkSecurityManager != null) { + try { + System.setSecurityManager(flinkSecurityManager); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Could not register security manager due to no permission to " + + "set a SecurityManager. 
Either update your existing " + + "SecurityManager to allow the permission or not using " Review comment: ```suggestion + "SecurityManager to allow the permission or do not use " ``` ########## File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java ########## @@ -88,21 +93,28 @@ .build()); @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER) - public static final ConfigOption<Boolean> HALT_ON_FATAL_ERROR = - key("cluster.processes.halt-on-fatal-error") + public static final ConfigOption<Boolean> HALT_ON_SYSTEM_EXIT = + key("cluster.processes.halt-on-system-exit") Review comment: Renaming an existing parameter has a bigger impact on users as they possibly have to change their existing configuration files when upgrading. Having said that, I would be ok with that change if we decide to merge both parameters into one as they are more or less just defining some special exit behavior. What do you think? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.security.Permission; +import java.util.function.Consumer; + +/** + * {@code FlinkSecurityManager} to control certain behaviors that can be captured by Java system + * security manager. It can be used to control unexpected user behaviors that potentially impact + * cluster availability, for example, it can warn or prevent user code from terminating JVM by + * System.exit or halt by logging or throwing an exception. This does not necessarily prevent + * malicious users who try to tweak security manager on their own, but more for being dependable + * against user mistakes by gracefully handling them informing users rather than causing silent + * unavailability. + */ +public class FlinkSecurityManager extends SecurityManager { + + static final Logger LOG = LoggerFactory.getLogger(FlinkSecurityManager.class); + + /** + * Security manager reference lastly set to system's security manager by public API. As system + * security manager can be reset with another but still chain-called into this manager properly, + * this reference may not be referenced by System.getSecurityManager, but we still need to + * control runtime check behaviors such as monitoring exit from user code. 
+ */ + private static FlinkSecurityManager flinkSecurityManager; + + private final SecurityManager originalSecurityManager; + private final ThreadLocal<Boolean> monitorUserSystemExit = new InheritableThreadLocal<>(); + private final ClusterOptions.UserSystemExitMode userSystemExitMode; + + /** The behavior to execute when the JVM exists. */ + private final Consumer<Integer> onExitBehavior; + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior) { + this(userSystemExitMode, onExitBehavior, System.getSecurityManager()); + } + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior, + SecurityManager originalSecurityManager) { + this.userSystemExitMode = Preconditions.checkNotNull(userSystemExitMode); + this.onExitBehavior = onExitBehavior; + this.originalSecurityManager = originalSecurityManager; + } + + /** + * Instantiate FlinkUserSecurityManager from configuration. Return null if no security manager + * check is needed, so that a caller can skip setting security manager avoiding runtime check + * cost, if there is no security check set up already. Use {@link #setFromConfiguration} helper, + * which handles disabled case. + * + * @param configuration to instantiate the security manager from + * @return FlinkUserSecurityManager instantiated based on configuration. Return null if + * disabled. + */ + @VisibleForTesting + static FlinkSecurityManager fromConfiguration(Configuration configuration) { + final ClusterOptions.UserSystemExitMode userSystemExitMode = + configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT); + + boolean haltOnSystemExit = configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT); + + // If no check is needed, return null so that caller can avoid setting security manager not + // to incur any runtime cost. 
+ if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED && !haltOnSystemExit) { + return null; + } + Consumer<Integer> onExitBehavior = null; + // If halt on system exit is configured, registers a custom SecurityManager which converts + // graceful exists calls using {@code System#exit} into forceful exit calls using + // {@code Runtime#halt}. The latter does not perform a clean shutdown using the registered + // shutdown hooks. This may be configured to prevent deadlocks with Java 8 and the G1 + // garbage collection, see https://issues.apache.org/jira/browse/FLINK-16510. + if (haltOnSystemExit) { + onExitBehavior = status -> Runtime.getRuntime().halt(status); + } + LOG.info( + "FlinkSecurityManager is created with {} user system exit mode and {} exit", + userSystemExitMode, + haltOnSystemExit ? "forceful" : "graceful"); + // Add more configuration parameters that need user security manager (currently only for + // system exit). + return new FlinkSecurityManager(userSystemExitMode, onExitBehavior); + } + + public static void setFromConfiguration(Configuration configuration) { + final FlinkSecurityManager flinkSecurityManager = + FlinkSecurityManager.fromConfiguration(configuration); + if (flinkSecurityManager != null) { + try { + System.setSecurityManager(flinkSecurityManager); + } catch (Exception e) { + throw new IllegalConfigurationException( + String.format( + "Could not register security manager due to no permission to " + + "set a SecurityManager. 
Either update your existing " + + "SecurityManager to allow the permission or not using " + + "security manager features (e.g., '%s: %s', '%s: %s')", + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.key(), + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.defaultValue(), + ClusterOptions.HALT_ON_SYSTEM_EXIT.key(), + ClusterOptions.HALT_ON_SYSTEM_EXIT.defaultValue(), + e)); + } + } + FlinkSecurityManager.flinkSecurityManager = flinkSecurityManager; + } + + public static void monitorUserSystemExitForCurrentThread() { + if (FlinkSecurityManager.flinkSecurityManager != null) { + FlinkSecurityManager.flinkSecurityManager.monitorUserSystemExit(); + } + } + + public static void unmonitorUserSystemExitForCurrentThread() { + if (FlinkSecurityManager.flinkSecurityManager != null) { + FlinkSecurityManager.flinkSecurityManager.unmonitorUserSystemExit(); + } + } + + @Override + public void checkPermission(Permission perm) { + if (originalSecurityManager != null) { + originalSecurityManager.checkPermission(perm); + } + } + + @Override + public void checkPermission(Permission perm, Object context) { + if (originalSecurityManager != null) { + originalSecurityManager.checkPermission(perm, context); + } + } + + @Override + public void checkExit(int status) { + if (userSystemExitMonitored()) { + switch (userSystemExitMode) { + case DISABLED: + break; + case LOG: + // Add exception trace log to help users to debug where exit came from. + LOG.warn( + "Exiting JVM with status {} is monitored, logging and exiting", Review comment: ```suggestion "Exiting JVM with status {} is monitored: The system will exit due to this call.", ``` I suggest a slight modification here... ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { + TestExitSecurityManager existingSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(existingSecurityManager); + AtomicInteger customExitExecuted = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, customExitExecuted::set); + + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE)); + assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testExitBehaviorUnchangeOnThrowingUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail()); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testDisabledConfiguration() { + // Default case (no provided option) - allowing everything, so null security manager is + // expected. + Configuration configuration = new Configuration(); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // Disabled case (same as default) + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, + ClusterOptions.UserSystemExitMode.DISABLED); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // No halt (same as default) + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + } + + @Test + public void testLogConfiguration() { + // Enabled - log case (logging as warning but allowing exit) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.LOG); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + } + + @Test + public void 
testThrowConfiguration() { + // Enabled - throw case (disallowing by throwing exception) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + + // Test for disabled test to check if exit is still allowed (fromConfiguration gives null + // since currently + // there is only one option to have a valid security manager, so test with constructor). + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testHaltConfiguration() { + // Halt as forceful shutdown replacing graceful system exit + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + } + + @Test(expected = NullPointerException.class) + public void testInvalidConfiguration() { + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); Review comment: ```suggestion flinkSecurityManager = FlinkSecurityManager.fromConfiguration(new 
Configuration()); ``` You're testing the `NullPointerException` being thrown by `Configuration.set(..)`. Calling `fromConfiguration` on an empty configuration would do it. But, I guess, this test is obsolete as there is no special behavior in `fromConfiguration` that needs to get tested. It should use the default value for `ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT`. ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java ########## @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.security.Permission; +import java.util.function.Consumer; + +/** + * {@code FlinkSecurityManager} to control certain behaviors that can be captured by Java system + * security manager. It can be used to control unexpected user behaviors that potentially impact + * cluster availability, for example, it can warn or prevent user code from terminating JVM by + * System.exit or halt by logging or throwing an exception. This does not necessarily prevent + * malicious users who try to tweak security manager on their own, but more for being dependable + * against user mistakes by gracefully handling them informing users rather than causing silent + * unavailability. + */ +public class FlinkSecurityManager extends SecurityManager { + + static final Logger LOG = LoggerFactory.getLogger(FlinkSecurityManager.class); + + /** + * Security manager reference lastly set to system's security manager by public API. As system + * security manager can be reset with another but still chain-called into this manager properly, + * this reference may not be referenced by System.getSecurityManager, but we still need to + * control runtime check behaviors such as monitoring exit from user code. 
+ */ + private static FlinkSecurityManager flinkSecurityManager; + + private final SecurityManager originalSecurityManager; + private final ThreadLocal<Boolean> monitorUserSystemExit = new InheritableThreadLocal<>(); + private final ClusterOptions.UserSystemExitMode userSystemExitMode; + + /** The behavior to execute when the JVM exists. */ + private final Consumer<Integer> onExitBehavior; + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior) { + this(userSystemExitMode, onExitBehavior, System.getSecurityManager()); + } + + @VisibleForTesting + FlinkSecurityManager( + ClusterOptions.UserSystemExitMode userSystemExitMode, + @Nullable Consumer<Integer> onExitBehavior, + SecurityManager originalSecurityManager) { + this.userSystemExitMode = Preconditions.checkNotNull(userSystemExitMode); + this.onExitBehavior = onExitBehavior; + this.originalSecurityManager = originalSecurityManager; + } + + /** + * Instantiate FlinkUserSecurityManager from configuration. Return null if no security manager + * check is needed, so that a caller can skip setting security manager avoiding runtime check + * cost, if there is no security check set up already. Use {@link #setFromConfiguration} helper, + * which handles disabled case. + * + * @param configuration to instantiate the security manager from + * @return FlinkUserSecurityManager instantiated based on configuration. Return null if + * disabled. + */ + @VisibleForTesting + static FlinkSecurityManager fromConfiguration(Configuration configuration) { Review comment: We have a special parameter combination where we might have to add (at least) a warning: `HALT_ON_SYSTEM_EXIT ` is implicitly disabled right now if `INTERCEPT_USER_SYSTEM_EXIT` is set to `THROW`. 
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static 
org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. */ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. 
+ CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. + future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. 
+ flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { + TestExitSecurityManager existingSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(existingSecurityManager); + AtomicInteger customExitExecuted = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, customExitExecuted::set); + + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(existingSecurityManager.getExitStatus(), is(TEST_EXIT_CODE)); + assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testExitBehaviorUnchangeOnThrowingUserExit() { Review comment: If I'm not mistaken, this test is redundant because it covers the same case as `testThrowConfiguration`. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { Review comment: ```suggestion public void testExitHandlerTriggered() { ``` I suggest this rename because "behaviorChanged" is too general to describe what the test actually verifies, namely that the custom exit handler is triggered. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { + TestExitSecurityManager existingSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(existingSecurityManager); + AtomicInteger customExitExecuted = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, customExitExecuted::set); + + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE)); + assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testExitBehaviorUnchangeOnThrowingUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail()); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testDisabledConfiguration() { + // Default case (no provided option) - allowing everything, so null security manager is + // expected. + Configuration configuration = new Configuration(); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // Disabled case (same as default) + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, + ClusterOptions.UserSystemExitMode.DISABLED); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // No halt (same as default) + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + } + + @Test + public void testLogConfiguration() { + // Enabled - log case (logging as warning but allowing exit) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.LOG); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + } + + @Test + public void 
testThrowConfiguration() { + // Enabled - throw case (disallowing by throwing exception) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + + // Test for disabled test to check if exit is still allowed (fromConfiguration gives null + // since currently + // there is only one option to have a valid security manager, so test with constructor). + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testHaltConfiguration() { + // Halt as forceful shutdown replacing graceful system exit + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + } + + @Test(expected = NullPointerException.class) + public void testInvalidConfiguration() { + Configuration configuration = new Configuration(); + configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + } + + @Test + public void testExistingSecurityManagerRespected() { + // Don't set the following security 
manager directly to system, which makes test hang. + SecurityManager originalSecurityManager = + new SecurityManager() { + @Override + public void checkPermission(Permission perm) { + throw new SecurityException("not allowed"); + } + }; + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> Assert.fail(), + originalSecurityManager); + + assertThrows( + "not allowed", + SecurityException.class, + () -> { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + return null; + }); + } + + @Test + public void testRegistrationNotAllowedByExistingSecurityManager() { + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + + System.setSecurityManager( + new SecurityManager() { + + private boolean fired; + + @Override + public void checkPermission(Permission perm) { + if (!fired && perm.getName().equals("setSecurityManager")) { + try { + throw new SecurityException("not allowed"); + } finally { + // Allow removing this manager again + fired = true; + } + } + } + }); + + assertThrows( + "Could not register security manager", + IllegalConfigurationException.class, + () -> { + FlinkSecurityManager.setFromConfiguration(configuration); + return null; + }); + } + + @Test(expected = UserSystemExitException.class) + public void testMultiSecurityManagersWithSetFirstAndMonitored() { + Configuration configuration = new Configuration(); + + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + + FlinkSecurityManager.setFromConfiguration(configuration); + + TestExitSecurityManager newSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(newSecurityManager); + + FlinkSecurityManager.monitorUserSystemExitForCurrentThread(); + newSecurityManager.checkExit(TEST_EXIT_CODE); + 
assertThat(newSecurityManager.getExitStatus(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testMultiSecurityManagersWithSetLastAndMonitored() { + Configuration configuration = new Configuration(); + + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + + TestExitSecurityManager oldSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(oldSecurityManager); + + FlinkSecurityManager.setFromConfiguration(configuration); + + FlinkSecurityManager.monitorUserSystemExitForCurrentThread(); + System.getSecurityManager().checkExit(TEST_EXIT_CODE); Review comment: Here as well: You would have to catch the `UserSystemExitException` here instead of using JUnit's `expected` feature if you want to check afterwards whether `TestExitSecurityManager` was triggered. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { + TestExitSecurityManager existingSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(existingSecurityManager); + AtomicInteger customExitExecuted = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, customExitExecuted::set); + + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE)); + assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testExitBehaviorUnchangeOnThrowingUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail()); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testDisabledConfiguration() { Review comment: We could split this test case up into three. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. */ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; Review comment: Is there a reason why we have `FlinkSecurityManager` as a member instead of local variables within each test? 
########## File path: flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java ########## @@ -88,21 +93,28 @@ .build()); @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER) - public static final ConfigOption<Boolean> HALT_ON_FATAL_ERROR = - key("cluster.processes.halt-on-fatal-error") + public static final ConfigOption<Boolean> HALT_ON_SYSTEM_EXIT = + key("cluster.processes.halt-on-system-exit") Review comment: Thinking about it again, I realize that the two options behave slightly differently. The `System.exit` interception only applies to specific code regions (i.e. user code), whereas the halt logic applies everywhere. @mxm The halt logic was intended to cover the whole code base, wasn't it? ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); Review comment: ```suggestion exitStatus::set); ``` ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { + TestExitSecurityManager existingSecurityManager = new TestExitSecurityManager(); + System.setSecurityManager(existingSecurityManager); + AtomicInteger customExitExecuted = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, customExitExecuted::set); + + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE)); + assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE)); + } + + @Test(expected = UserSystemExitException.class) + public void testExitBehaviorUnchangeOnThrowingUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail()); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testDisabledConfiguration() { + // Default case (no provided option) - allowing everything, so null security manager is + // expected. + Configuration configuration = new Configuration(); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // Disabled case (same as default) + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, + ClusterOptions.UserSystemExitMode.DISABLED); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + + // No halt (same as default) + configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNull(flinkSecurityManager); + } + + @Test + public void testLogConfiguration() { + // Enabled - log case (logging as warning but allowing exit) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.LOG); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + } + + @Test + public void 
testThrowConfiguration() { + // Enabled - throw case (disallowing by throwing exception) + Configuration configuration = new Configuration(); + configuration.set( + ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, ClusterOptions.UserSystemExitMode.THROW); + flinkSecurityManager = FlinkSecurityManager.fromConfiguration(configuration); + assertNotNull(flinkSecurityManager); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + flinkSecurityManager.monitorUserSystemExit(); + assertTrue(flinkSecurityManager.userSystemExitMonitored()); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + assertFalse(flinkSecurityManager.userSystemExitMonitored()); + + // Test for disabled test to check if exit is still allowed (fromConfiguration gives null + // since currently + // there is only one option to have a valid security manager, so test with constructor). + flinkSecurityManager = Review comment: This should be a separate test. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { Review comment: `testLogConfiguration` covers the same scenario - we could delete that test in my opinion. ########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java ########## @@ -0,0 +1,415 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.security; + +import org.apache.flink.configuration.ClusterOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.UserSystemExitException; +import org.apache.flink.util.TestLogger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.security.Permission; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Tests for {@code FlinkUserSecurityManager}. 
*/ +public class FlinkSecurityManagerTest extends TestLogger { + + private static final int TEST_EXIT_CODE = 123; + SecurityManager originalSecurityManager; + FlinkSecurityManager flinkSecurityManager; + + @Before + public void setUp() { + originalSecurityManager = System.getSecurityManager(); + } + + @After + public void tearDown() { + System.setSecurityManager(originalSecurityManager); + } + + @Test(expected = UserSystemExitException.class) + public void testThrowUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testToggleUserExit() { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + flinkSecurityManager.unmonitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testPerThreadThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + ExecutorService executorService = Executors.newSingleThreadExecutor(); + // Async thread test before enabling monitoring ensures it does not throw while prestarting + // worker thread, which is to be unmonitored and tested after enabling monitoring enabled. + CompletableFuture<Void> future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + // This threaded exit should be allowed as thread is not spawned while monitor is enabled. 
+ future = + CompletableFuture.runAsync( + () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), executorService); + future.get(); + } + + @Test + public void testInheritedThrowUserExit() throws Exception { + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null); + flinkSecurityManager.monitorUserSystemExit(); + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } + CheckedThread thread = + new CheckedThread() { + @Override + public void go() { + try { + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + fail(); + } catch (UserSystemExitException ignored) { + } catch (Throwable t) { + fail(); + } + } + }; + thread.start(); + thread.sync(); + } + + @Test + public void testLogUserExit() { + // Log mode enables monitor but only logging allowing exit, hence not expecting exception. + // NOTE - Do not specifically test warning logging. + flinkSecurityManager = + new FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null); + flinkSecurityManager.monitorUserSystemExit(); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + } + + @Test + public void testExitBehaviorChanged() { + AtomicInteger exitStatus = new AtomicInteger(0); + flinkSecurityManager = + new FlinkSecurityManager( + ClusterOptions.UserSystemExitMode.DISABLED, + status -> exitStatus.set(status)); + flinkSecurityManager.checkExit(TEST_EXIT_CODE); + assertThat(exitStatus.get(), is(TEST_EXIT_CODE)); + } + + @Test + public void testExitBehaviorChangedWithExistingSecurityManager() { Review comment: ```suggestion public void testExitHandlerTriggeredWithExistingSecurityManager() { ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org