XComp commented on a change in pull request #14499:
URL: https://github.com/apache/flink/pull/14499#discussion_r555570200



##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -115,4 +127,35 @@ public static boolean 
isDeclarativeResourceManagementEnabled(Configuration confi
         return configuration.get(ENABLE_DECLARATIVE_RESOURCE_MANAGEMENT)
                 && 
!System.getProperties().containsKey("flink.tests.disable-declarative");
     }
+
+    /** The mode of how to handle user code attempting to exit JVM. */
+    public enum UserSystemExitMode {
+        DISABLED("No check is enabled, that is allowing exit without any 
action"),

Review comment:
       ```suggestion
           DISABLED("No check is enabled. Hence, exits are allowed without any 
action being triggered."),
   ```
   I feel like the current version of the description might be ambiguous: 
at first, I read it as "there is no check which allows exiting without any 
action". I suggest the version above.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be 
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that 
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from 
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not 
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more 
for being dependable
+ * against user mistakes by gracefully handling them informing users rather 
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+    /**
+     * Security manager reference lastly set to system's security manager by 
public API. As system
+     * security manager can be reset with another but still chain-called into 
this manager properly,
+     * this reference may not be referenced by System.getSecurityManager, but 
we still need to
+     * control runtime check behaviors such as monitoring exit from user code.
+     */
+    private static FlinkSecurityManager flinkSecurityManager;
+
+    private final SecurityManager originalSecurityManager;
+    private final ThreadLocal<Boolean> monitorUserSystemExit = new 
InheritableThreadLocal<>();
+    private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+    /** The behavior to execute when the JVM exists. */
+    private final Consumer<Integer> onExitBehavior;
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior) {
+        this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+    }
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior,
+            SecurityManager originalSecurityManager) {
+        this.userSystemExitMode = 
Preconditions.checkNotNull(userSystemExitMode);
+        this.onExitBehavior = onExitBehavior;
+        this.originalSecurityManager = originalSecurityManager;
+    }
+
+    /**
+     * Instantiate FlinkUserSecurityManager from configuration. Return null if 
no security manager
+     * check is needed, so that a caller can skip setting security manager 
avoiding runtime check
+     * cost, if there is no security check set up already. Use {@link 
#setFromConfiguration} helper,
+     * which handles disabled case.
+     *
+     * @param configuration to instantiate the security manager from
+     * @return FlinkUserSecurityManager instantiated based on configuration. 
Return null if
+     *     disabled.
+     */
+    @VisibleForTesting
+    static FlinkSecurityManager fromConfiguration(Configuration configuration) 
{
+        final ClusterOptions.UserSystemExitMode userSystemExitMode =
+                configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT);
+
+        boolean haltOnSystemExit = 
configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT);
+
+        // If no check is needed, return null so that caller can avoid setting 
security manager not
+        // to incur any runtime cost.
+        if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED 
&& !haltOnSystemExit) {
+            return null;
+        }
+        Consumer<Integer> onExitBehavior = null;
+        // If halt on system exit is configured, registers a custom 
SecurityManager which converts
+        // graceful exists calls using {@code System#exit} into forceful exit 
calls using
+        // {@code Runtime#halt}. The latter does not perform a clean shutdown 
using the registered
+        // shutdown hooks. This may be configured to prevent deadlocks with 
Java 8 and the G1
+        // garbage collection, see 
https://issues.apache.org/jira/browse/FLINK-16510.
+        if (haltOnSystemExit) {
+            onExitBehavior = status -> Runtime.getRuntime().halt(status);
+        }
+        LOG.info(
+                "FlinkSecurityManager is created with {} user system exit mode 
and {} exit",
+                userSystemExitMode,
+                haltOnSystemExit ? "forceful" : "graceful");
+        // Add more configuration parameters that need user security manager 
(currently only for
+        // system exit).
+        return new FlinkSecurityManager(userSystemExitMode, onExitBehavior);
+    }
+
+    public static void setFromConfiguration(Configuration configuration) {
+        final FlinkSecurityManager flinkSecurityManager =
+                FlinkSecurityManager.fromConfiguration(configuration);
+        if (flinkSecurityManager != null) {
+            try {
+                System.setSecurityManager(flinkSecurityManager);
+            } catch (Exception e) {
+                throw new IllegalConfigurationException(
+                        String.format(
+                                "Could not register security manager due to no 
permission to "
+                                        + "set a SecurityManager. Either 
update your existing "
+                                        + "SecurityManager to allow the 
permission or not using "
+                                        + "security manager features (e.g., 
'%s: %s', '%s: %s')",
+                                
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.key(),
+                                
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.defaultValue(),
+                                ClusterOptions.HALT_ON_SYSTEM_EXIT.key(),
+                                
ClusterOptions.HALT_ON_SYSTEM_EXIT.defaultValue(),
+                                e));
+            }
+        }
+        FlinkSecurityManager.flinkSecurityManager = flinkSecurityManager;
+    }
+
+    public static void monitorUserSystemExitForCurrentThread() {
+        if (FlinkSecurityManager.flinkSecurityManager != null) {
+            FlinkSecurityManager.flinkSecurityManager.monitorUserSystemExit();
+        }
+    }
+
+    public static void unmonitorUserSystemExitForCurrentThread() {
+        if (FlinkSecurityManager.flinkSecurityManager != null) {
+            
FlinkSecurityManager.flinkSecurityManager.unmonitorUserSystemExit();
+        }
+    }
+
+    @Override
+    public void checkPermission(Permission perm) {
+        if (originalSecurityManager != null) {
+            originalSecurityManager.checkPermission(perm);
+        }
+    }
+
+    @Override
+    public void checkPermission(Permission perm, Object context) {
+        if (originalSecurityManager != null) {
+            originalSecurityManager.checkPermission(perm, context);
+        }
+    }
+
+    @Override
+    public void checkExit(int status) {
+        if (userSystemExitMonitored()) {
+            switch (userSystemExitMode) {
+                case DISABLED:
+                    break;
+                case LOG:
+                    // Add exception trace log to help users to debug where 
exit came from.
+                    LOG.warn(
+                            "Exiting JVM with status {} is monitored, logging 
and exiting",
+                            status,
+                            new UserSystemExitException());

Review comment:
       Do we need to add this exception here? It does not add any value, 
does it?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {
+        TestExitSecurityManager existingSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(existingSecurityManager);
+        AtomicInteger customExitExecuted = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED, 
customExitExecuted::set);
+
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE));
+        assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testExitBehaviorUnchangeOnThrowingUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testDisabledConfiguration() {
+        // Default case (no provided option) - allowing everything, so null 
security manager is
+        // expected.
+        Configuration configuration = new Configuration();
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // Disabled case (same as default)
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+                ClusterOptions.UserSystemExitMode.DISABLED);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // No halt (same as default)
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+    }
+
+    @Test
+    public void testLogConfiguration() {
+        // Enabled - log case (logging as warning but allowing exit)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.LOG);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+    }
+
+    @Test
+    public void testThrowConfiguration() {
+        // Enabled - throw case (disallowing by throwing exception)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+        // Test for disabled test to check if exit is still allowed 
(fromConfiguration gives null
+        // since currently
+        // there is only one option to have a valid security manager, so test 
with constructor).
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testHaltConfiguration() {
+        // Halt as forceful shutdown replacing graceful system exit
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testInvalidConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+    }
+
+    @Test
+    public void testExistingSecurityManagerRespected() {
+        // Don't set the following security manager directly to system, which 
makes test hang.
+        SecurityManager originalSecurityManager =
+                new SecurityManager() {
+                    @Override
+                    public void checkPermission(Permission perm) {
+                        throw new SecurityException("not allowed");
+                    }
+                };
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> Assert.fail(),
+                        originalSecurityManager);
+
+        assertThrows(
+                "not allowed",
+                SecurityException.class,
+                () -> {
+                    flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                    return null;
+                });
+    }
+
+    @Test
+    public void testRegistrationNotAllowedByExistingSecurityManager() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+
+        System.setSecurityManager(
+                new SecurityManager() {
+
+                    private boolean fired;
+
+                    @Override
+                    public void checkPermission(Permission perm) {
+                        if (!fired && 
perm.getName().equals("setSecurityManager")) {
+                            try {
+                                throw new SecurityException("not allowed");
+                            } finally {
+                                // Allow removing this manager again
+                                fired = true;
+                            }
+                        }
+                    }
+                });
+
+        assertThrows(
+                "Could not register security manager",
+                IllegalConfigurationException.class,
+                () -> {
+                    FlinkSecurityManager.setFromConfiguration(configuration);
+                    return null;
+                });
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testMultiSecurityManagersWithSetFirstAndMonitored() {
+        Configuration configuration = new Configuration();
+
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+
+        FlinkSecurityManager.setFromConfiguration(configuration);
+
+        TestExitSecurityManager newSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(newSecurityManager);
+
+        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+        newSecurityManager.checkExit(TEST_EXIT_CODE);

Review comment:
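       You would have to catch the `UserSystemExitException` here instead of 
using JUnit's `expected` feature if you want to check afterwards whether 
`TestExitSecurityManager` was triggered. With `expected`, the test method ends 
as soon as the exception is thrown, so any assertion placed after this call 
would never run.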

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {
+        TestExitSecurityManager existingSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(existingSecurityManager);
+        AtomicInteger customExitExecuted = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED, 
customExitExecuted::set);
+
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE));
+        assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testExitBehaviorUnchangeOnThrowingUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testDisabledConfiguration() {
+        // Default case (no provided option) - allowing everything, so null 
security manager is
+        // expected.
+        Configuration configuration = new Configuration();
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // Disabled case (same as default)
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+                ClusterOptions.UserSystemExitMode.DISABLED);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // No halt (same as default)
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+    }
+
+    @Test
+    public void testLogConfiguration() {
+        // Enabled - log case (logging as warning but allowing exit)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.LOG);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+    }
+
+    @Test
+    public void testThrowConfiguration() {
+        // Enabled - throw case (disallowing by throwing exception)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+        // Test for disabled test to check if exit is still allowed 
(fromConfiguration gives null
+        // since currently
+        // there is only one option to have a valid security manager, so test 
with constructor).
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testHaltConfiguration() {
+        // Halt as forceful shutdown replacing graceful system exit
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testInvalidConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+    }
+
+    @Test
+    public void testExistingSecurityManagerRespected() {
+        // Don't set the following security manager directly to system, which 
makes test hang.
+        SecurityManager originalSecurityManager =
+                new SecurityManager() {
+                    @Override
+                    public void checkPermission(Permission perm) {
+                        throw new SecurityException("not allowed");
+                    }
+                };
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> Assert.fail(),
+                        originalSecurityManager);
+
+        assertThrows(
+                "not allowed",
+                SecurityException.class,
+                () -> {
+                    flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                    return null;
+                });
+    }
+
+    @Test
+    public void testRegistrationNotAllowedByExistingSecurityManager() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+
+        System.setSecurityManager(
+                new SecurityManager() {

Review comment:
       You could add this feature to the testing implementation of 
`SecurityManager` instead of using an anonymous class. See 
[TestingDispatcherRunner](https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/runner/TestingDispatcherRunner.java)
 as an example `Testing*` implementation.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be 
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that 
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from 
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not 
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more 
for being dependable
+ * against user mistakes by gracefully handling them informing users rather 
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+    /**
+     * Security manager reference lastly set to system's security manager by 
public API. As system
+     * security manager can be reset with another but still chain-called into 
this manager properly,
+     * this reference may not be referenced by System.getSecurityManager, but 
we still need to
+     * control runtime check behaviors such as monitoring exit from user code.
+     */
+    private static FlinkSecurityManager flinkSecurityManager;
+
+    private final SecurityManager originalSecurityManager;
+    private final ThreadLocal<Boolean> monitorUserSystemExit = new 
InheritableThreadLocal<>();
+    private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+    /** The behavior to execute when the JVM exists. */
+    private final Consumer<Integer> onExitBehavior;
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior) {
+        this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+    }
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior,
+            SecurityManager originalSecurityManager) {
+        this.userSystemExitMode = 
Preconditions.checkNotNull(userSystemExitMode);
+        this.onExitBehavior = onExitBehavior;
+        this.originalSecurityManager = originalSecurityManager;
+    }
+
+    /**
+     * Instantiate FlinkUserSecurityManager from configuration. Return null if 
no security manager
+     * check is needed, so that a caller can skip setting security manager 
avoiding runtime check
+     * cost, if there is no security check set up already. Use {@link 
#setFromConfiguration} helper,
+     * which handles disabled case.
+     *
+     * @param configuration to instantiate the security manager from
+     * @return FlinkUserSecurityManager instantiated based on configuration. 
Return null if
+     *     disabled.
+     */
+    @VisibleForTesting
+    static FlinkSecurityManager fromConfiguration(Configuration configuration) 
{
+        final ClusterOptions.UserSystemExitMode userSystemExitMode =
+                configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT);
+
+        boolean haltOnSystemExit = 
configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT);
+
+        // If no check is needed, return null so that caller can avoid setting 
security manager not
+        // to incur any runtime cost.
+        if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED 
&& !haltOnSystemExit) {
+            return null;
+        }
+        Consumer<Integer> onExitBehavior = null;
+        // If halt on system exit is configured, registers a custom 
SecurityManager which converts
+        // graceful exists calls using {@code System#exit} into forceful exit 
calls using
+        // {@code Runtime#halt}. The latter does not perform a clean shutdown 
using the registered
+        // shutdown hooks. This may be configured to prevent deadlocks with 
Java 8 and the G1
+        // garbage collection, see 
https://issues.apache.org/jira/browse/FLINK-16510.
+        if (haltOnSystemExit) {
+            onExitBehavior = status -> Runtime.getRuntime().halt(status);
+        }
+        LOG.info(
+                "FlinkSecurityManager is created with {} user system exit mode 
and {} exit",
+                userSystemExitMode,
+                haltOnSystemExit ? "forceful" : "graceful");
+        // Add more configuration parameters that need user security manager 
(currently only for
+        // system exit).
+        return new FlinkSecurityManager(userSystemExitMode, onExitBehavior);
+    }
+
+    public static void setFromConfiguration(Configuration configuration) {
+        final FlinkSecurityManager flinkSecurityManager =
+                FlinkSecurityManager.fromConfiguration(configuration);
+        if (flinkSecurityManager != null) {
+            try {
+                System.setSecurityManager(flinkSecurityManager);
+            } catch (Exception e) {
+                throw new IllegalConfigurationException(
+                        String.format(
+                                "Could not register security manager due to no 
permission to "
+                                        + "set a SecurityManager. Either 
update your existing "
+                                        + "SecurityManager to allow the 
permission or not using "

Review comment:
       ```suggestion
                                           + "SecurityManager to allow the 
permission or do not use "
   ```

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -88,21 +93,28 @@
                                     .build());
 
     @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
-    public static final ConfigOption<Boolean> HALT_ON_FATAL_ERROR =
-            key("cluster.processes.halt-on-fatal-error")
+    public static final ConfigOption<Boolean> HALT_ON_SYSTEM_EXIT =
+            key("cluster.processes.halt-on-system-exit")

Review comment:
       Renaming an existing parameter has a bigger impact on users, because 
they may have to change their existing configuration files when upgrading. 
Having said that, I would be OK with that change if we decide to merge both 
parameters into one, since both more or less just define some special exit 
behavior. What do you think?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be 
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that 
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from 
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not 
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more 
for being dependable
+ * against user mistakes by gracefully handling them informing users rather 
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+    /**
+     * Security manager reference lastly set to system's security manager by 
public API. As system
+     * security manager can be reset with another but still chain-called into 
this manager properly,
+     * this reference may not be referenced by System.getSecurityManager, but 
we still need to
+     * control runtime check behaviors such as monitoring exit from user code.
+     */
+    private static FlinkSecurityManager flinkSecurityManager;
+
+    private final SecurityManager originalSecurityManager;
+    private final ThreadLocal<Boolean> monitorUserSystemExit = new 
InheritableThreadLocal<>();
+    private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+    /** The behavior to execute when the JVM exists. */
+    private final Consumer<Integer> onExitBehavior;
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior) {
+        this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+    }
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior,
+            SecurityManager originalSecurityManager) {
+        this.userSystemExitMode = 
Preconditions.checkNotNull(userSystemExitMode);
+        this.onExitBehavior = onExitBehavior;
+        this.originalSecurityManager = originalSecurityManager;
+    }
+
+    /**
+     * Instantiate FlinkUserSecurityManager from configuration. Return null if 
no security manager
+     * check is needed, so that a caller can skip setting security manager 
avoiding runtime check
+     * cost, if there is no security check set up already. Use {@link 
#setFromConfiguration} helper,
+     * which handles disabled case.
+     *
+     * @param configuration to instantiate the security manager from
+     * @return FlinkUserSecurityManager instantiated based on configuration. 
Return null if
+     *     disabled.
+     */
+    @VisibleForTesting
+    static FlinkSecurityManager fromConfiguration(Configuration configuration) 
{
+        final ClusterOptions.UserSystemExitMode userSystemExitMode =
+                configuration.get(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT);
+
+        boolean haltOnSystemExit = 
configuration.get(ClusterOptions.HALT_ON_SYSTEM_EXIT);
+
+        // If no check is needed, return null so that caller can avoid setting 
security manager not
+        // to incur any runtime cost.
+        if (userSystemExitMode == ClusterOptions.UserSystemExitMode.DISABLED 
&& !haltOnSystemExit) {
+            return null;
+        }
+        Consumer<Integer> onExitBehavior = null;
+        // If halt on system exit is configured, registers a custom 
SecurityManager which converts
+        // graceful exists calls using {@code System#exit} into forceful exit 
calls using
+        // {@code Runtime#halt}. The latter does not perform a clean shutdown 
using the registered
+        // shutdown hooks. This may be configured to prevent deadlocks with 
Java 8 and the G1
+        // garbage collection, see 
https://issues.apache.org/jira/browse/FLINK-16510.
+        if (haltOnSystemExit) {
+            onExitBehavior = status -> Runtime.getRuntime().halt(status);
+        }
+        LOG.info(
+                "FlinkSecurityManager is created with {} user system exit mode 
and {} exit",
+                userSystemExitMode,
+                haltOnSystemExit ? "forceful" : "graceful");
+        // Add more configuration parameters that need user security manager 
(currently only for
+        // system exit).
+        return new FlinkSecurityManager(userSystemExitMode, onExitBehavior);
+    }
+
+    public static void setFromConfiguration(Configuration configuration) {
+        final FlinkSecurityManager flinkSecurityManager =
+                FlinkSecurityManager.fromConfiguration(configuration);
+        if (flinkSecurityManager != null) {
+            try {
+                System.setSecurityManager(flinkSecurityManager);
+            } catch (Exception e) {
+                throw new IllegalConfigurationException(
+                        String.format(
+                                "Could not register security manager due to no 
permission to "
+                                        + "set a SecurityManager. Either 
update your existing "
+                                        + "SecurityManager to allow the 
permission or not using "
+                                        + "security manager features (e.g., 
'%s: %s', '%s: %s')",
+                                
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.key(),
+                                
ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT.defaultValue(),
+                                ClusterOptions.HALT_ON_SYSTEM_EXIT.key(),
+                                
ClusterOptions.HALT_ON_SYSTEM_EXIT.defaultValue(),
+                                e));
+            }
+        }
+        FlinkSecurityManager.flinkSecurityManager = flinkSecurityManager;
+    }
+
+    public static void monitorUserSystemExitForCurrentThread() {
+        if (FlinkSecurityManager.flinkSecurityManager != null) {
+            FlinkSecurityManager.flinkSecurityManager.monitorUserSystemExit();
+        }
+    }
+
+    public static void unmonitorUserSystemExitForCurrentThread() {
+        if (FlinkSecurityManager.flinkSecurityManager != null) {
+            
FlinkSecurityManager.flinkSecurityManager.unmonitorUserSystemExit();
+        }
+    }
+
+    @Override
+    public void checkPermission(Permission perm) {
+        if (originalSecurityManager != null) {
+            originalSecurityManager.checkPermission(perm);
+        }
+    }
+
+    @Override
+    public void checkPermission(Permission perm, Object context) {
+        if (originalSecurityManager != null) {
+            originalSecurityManager.checkPermission(perm, context);
+        }
+    }
+
+    @Override
+    public void checkExit(int status) {
+        if (userSystemExitMonitored()) {
+            switch (userSystemExitMode) {
+                case DISABLED:
+                    break;
+                case LOG:
+                    // Add exception trace log to help users to debug where 
exit came from.
+                    LOG.warn(
+                            "Exiting JVM with status {} is monitored, logging 
and exiting",

Review comment:
       ```suggestion
                               "Exiting JVM with status {} is monitored: The 
system will exit due to this call.",
   ```
   I suggest a slight modification here...

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {
+        TestExitSecurityManager existingSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(existingSecurityManager);
+        AtomicInteger customExitExecuted = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED, 
customExitExecuted::set);
+
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE));
+        assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testExitBehaviorUnchangeOnThrowingUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testDisabledConfiguration() {
+        // Default case (no provided option) - allowing everything, so null 
security manager is
+        // expected.
+        Configuration configuration = new Configuration();
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // Disabled case (same as default)
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+                ClusterOptions.UserSystemExitMode.DISABLED);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // No halt (same as default)
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+    }
+
+    @Test
+    public void testLogConfiguration() {
+        // Enabled - log case (logging as warning but allowing exit)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.LOG);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+    }
+
+    @Test
+    public void testThrowConfiguration() {
+        // Enabled - throw case (disallowing by throwing exception)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+        // Test for disabled test to check if exit is still allowed 
(fromConfiguration gives null
+        // since currently
+        // there is only one option to have a valid security manager, so test 
with constructor).
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testHaltConfiguration() {
+        // Halt as forceful shutdown replacing graceful system exit
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testInvalidConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);

Review comment:
       ```suggestion
           flinkSecurityManager = FlinkSecurityManager.fromConfiguration(new 
Configuration());
   ```
   As written, this test only exercises the `NullPointerException` thrown by 
`Configuration.set(..)` when it is given `null`. Calling `fromConfiguration` 
on an empty configuration (as suggested above) would be enough to exercise 
`fromConfiguration` itself. But, I guess, this test is obsolete anyway, as 
there is no special behavior in `fromConfiguration` that needs to be tested: 
it should simply use the default value for 
`ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT`.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/security/FlinkSecurityManager.java
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.security.Permission;
+import java.util.function.Consumer;
+
+/**
+ * {@code FlinkSecurityManager} to control certain behaviors that can be 
captured by Java system
+ * security manager. It can be used to control unexpected user behaviors that 
potentially impact
+ * cluster availability, for example, it can warn or prevent user code from 
terminating JVM by
+ * System.exit or halt by logging or throwing an exception. This does not 
necessarily prevent
+ * malicious users who try to tweak security manager on their own, but more 
for being dependable
+ * against user mistakes by gracefully handling them informing users rather 
than causing silent
+ * unavailability.
+ */
+public class FlinkSecurityManager extends SecurityManager {
+
+    static final Logger LOG = 
LoggerFactory.getLogger(FlinkSecurityManager.class);
+
+    /**
+     * Security manager reference lastly set to system's security manager by 
public API. As system
+     * security manager can be reset with another but still chain-called into 
this manager properly,
+     * this reference may not be referenced by System.getSecurityManager, but 
we still need to
+     * control runtime check behaviors such as monitoring exit from user code.
+     */
+    private static FlinkSecurityManager flinkSecurityManager;
+
+    private final SecurityManager originalSecurityManager;
+    private final ThreadLocal<Boolean> monitorUserSystemExit = new 
InheritableThreadLocal<>();
+    private final ClusterOptions.UserSystemExitMode userSystemExitMode;
+
+    /** The behavior to execute when the JVM exists. */
+    private final Consumer<Integer> onExitBehavior;
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior) {
+        this(userSystemExitMode, onExitBehavior, System.getSecurityManager());
+    }
+
+    @VisibleForTesting
+    FlinkSecurityManager(
+            ClusterOptions.UserSystemExitMode userSystemExitMode,
+            @Nullable Consumer<Integer> onExitBehavior,
+            SecurityManager originalSecurityManager) {
+        this.userSystemExitMode = 
Preconditions.checkNotNull(userSystemExitMode);
+        this.onExitBehavior = onExitBehavior;
+        this.originalSecurityManager = originalSecurityManager;
+    }
+
+    /**
+     * Instantiate FlinkUserSecurityManager from configuration. Return null if 
no security manager
+     * check is needed, so that a caller can skip setting security manager 
avoiding runtime check
+     * cost, if there is no security check set up already. Use {@link 
#setFromConfiguration} helper,
+     * which handles disabled case.
+     *
+     * @param configuration to instantiate the security manager from
+     * @return FlinkUserSecurityManager instantiated based on configuration. 
Return null if
+     *     disabled.
+     */
+    @VisibleForTesting
+    static FlinkSecurityManager fromConfiguration(Configuration configuration) 
{

Review comment:
       We have a special parameter combination where we might have to add (at 
least) a warning: `HALT_ON_SYSTEM_EXIT` is implicitly disabled right now if 
`INTERCEPT_USER_SYSTEM_EXIT` is set to `THROW`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {
+        TestExitSecurityManager existingSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(existingSecurityManager);
+        AtomicInteger customExitExecuted = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED, 
customExitExecuted::set);
+
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE));
+        assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testExitBehaviorUnchangeOnThrowingUserExit() {

Review comment:
       Isn't this test obsolete? If I'm not mistaken, it tests the same thing 
as `testThrowConfiguration`.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {

Review comment:
       ```suggestion
       public void testExitHandlerTriggered() {
   ```
   I suggest the name change because "behaviorChanged" feels too general to 
describe what the test is doing.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {
+        TestExitSecurityManager existingSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(existingSecurityManager);
+        AtomicInteger customExitExecuted = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED, 
customExitExecuted::set);
+
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE));
+        assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testExitBehaviorUnchangeOnThrowingUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testDisabledConfiguration() {
+        // Default case (no provided option) - allowing everything, so null 
security manager is
+        // expected.
+        Configuration configuration = new Configuration();
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // Disabled case (same as default)
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+                ClusterOptions.UserSystemExitMode.DISABLED);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // No halt (same as default)
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+    }
+
+    @Test
+    public void testLogConfiguration() {
+        // Enabled - log case (logging as warning but allowing exit)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.LOG);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+    }
+
+    @Test
+    public void testThrowConfiguration() {
+        // Enabled - throw case (disallowing by throwing exception)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+        // Test for disabled test to check if exit is still allowed 
(fromConfiguration gives null
+        // since currently
+        // there is only one option to have a valid security manager, so test 
with constructor).
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.DISABLED, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testHaltConfiguration() {
+        // Halt as forceful shutdown replacing graceful system exit
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, true);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void testInvalidConfiguration() {
+        Configuration configuration = new Configuration();
+        configuration.set(ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, null);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+    }
+
+    @Test
+    public void testExistingSecurityManagerRespected() {
+        // Don't set the following security manager directly to system, which 
makes test hang.
+        SecurityManager originalSecurityManager =
+                new SecurityManager() {
+                    @Override
+                    public void checkPermission(Permission perm) {
+                        throw new SecurityException("not allowed");
+                    }
+                };
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> Assert.fail(),
+                        originalSecurityManager);
+
+        assertThrows(
+                "not allowed",
+                SecurityException.class,
+                () -> {
+                    flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                    return null;
+                });
+    }
+
+    @Test
+    public void testRegistrationNotAllowedByExistingSecurityManager() {
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+
+        System.setSecurityManager(
+                new SecurityManager() {
+
+                    private boolean fired;
+
+                    @Override
+                    public void checkPermission(Permission perm) {
+                        if (!fired && 
perm.getName().equals("setSecurityManager")) {
+                            try {
+                                throw new SecurityException("not allowed");
+                            } finally {
+                                // Allow removing this manager again
+                                fired = true;
+                            }
+                        }
+                    }
+                });
+
+        assertThrows(
+                "Could not register security manager",
+                IllegalConfigurationException.class,
+                () -> {
+                    FlinkSecurityManager.setFromConfiguration(configuration);
+                    return null;
+                });
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testMultiSecurityManagersWithSetFirstAndMonitored() {
+        Configuration configuration = new Configuration();
+
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+
+        FlinkSecurityManager.setFromConfiguration(configuration);
+
+        TestExitSecurityManager newSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(newSecurityManager);
+
+        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+        newSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(newSecurityManager.getExitStatus(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testMultiSecurityManagersWithSetLastAndMonitored() {
+        Configuration configuration = new Configuration();
+
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+
+        TestExitSecurityManager oldSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(oldSecurityManager);
+
+        FlinkSecurityManager.setFromConfiguration(configuration);
+
+        FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+        System.getSecurityManager().checkExit(TEST_EXIT_CODE);

Review comment:
       Here as well: You would have to catch the `UserSystemExitException` here 
instead of using JUnit's `expected` feature if you want to check afterwards 
whether `TestExitSecurityManager` was triggered.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {
+        TestExitSecurityManager existingSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(existingSecurityManager);
+        AtomicInteger customExitExecuted = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED, 
customExitExecuted::set);
+
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE));
+        assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testExitBehaviorUnchangeOnThrowingUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testDisabledConfiguration() {

Review comment:
       We could split this test case up into three.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;

Review comment:
       Is there a reason why we have `FlinkSecurityManager` as a member instead 
of local variables within each test?

##########
File path: 
flink-core/src/main/java/org/apache/flink/configuration/ClusterOptions.java
##########
@@ -88,21 +93,28 @@
                                     .build());
 
     @Documentation.Section(Documentation.Sections.EXPERT_CLUSTER)
-    public static final ConfigOption<Boolean> HALT_ON_FATAL_ERROR =
-            key("cluster.processes.halt-on-fatal-error")
+    public static final ConfigOption<Boolean> HALT_ON_SYSTEM_EXIT =
+            key("cluster.processes.halt-on-system-exit")

Review comment:
       Thinking about it again, I realize that they behave in a slightly 
different way as the `System.exit` logic is only called in specific code 
regions (i.e. user code) whereas the halt logic applies anywhere. @mxm The halt 
logic was intended to work on the whole code base, wasn't it?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));

Review comment:
       ```suggestion
                           exitStatus::set);
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitor but only logging allowing exit, hence not 
expecting exception.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {
+        TestExitSecurityManager existingSecurityManager = new 
TestExitSecurityManager();
+        System.setSecurityManager(existingSecurityManager);
+        AtomicInteger customExitExecuted = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED, 
customExitExecuted::set);
+
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(existingSecurityManager.getExitStatus(), 
is(TEST_EXIT_CODE));
+        assertThat(customExitExecuted.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testExitBehaviorUnchangeOnThrowingUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, status -> fail());
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testDisabledConfiguration() {
+        // Default case (no provided option) - allowing everything, so null 
security manager is
+        // expected.
+        Configuration configuration = new Configuration();
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // Disabled case (same as default)
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT,
+                ClusterOptions.UserSystemExitMode.DISABLED);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+
+        // No halt (same as default)
+        configuration.set(ClusterOptions.HALT_ON_SYSTEM_EXIT, false);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNull(flinkSecurityManager);
+    }
+
+    @Test
+    public void testLogConfiguration() {
+        // Enabled - log case (logging as warning but allowing exit)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.LOG);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+    }
+
+    @Test
+    public void testThrowConfiguration() {
+        // Enabled - throw case (disallowing by throwing exception)
+        Configuration configuration = new Configuration();
+        configuration.set(
+                ClusterOptions.INTERCEPT_USER_SYSTEM_EXIT, 
ClusterOptions.UserSystemExitMode.THROW);
+        flinkSecurityManager = 
FlinkSecurityManager.fromConfiguration(configuration);
+        assertNotNull(flinkSecurityManager);
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+        flinkSecurityManager.monitorUserSystemExit();
+        assertTrue(flinkSecurityManager.userSystemExitMonitored());
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        assertFalse(flinkSecurityManager.userSystemExitMonitored());
+
+        // Test for disabled test to check if exit is still allowed 
(fromConfiguration gives null
+        // since currently
+        // there is only one option to have a valid security manager, so test 
with constructor).
+        flinkSecurityManager =

Review comment:
       This should be a separate test.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkUserSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // worker thread, which is to be unmonitored and tested after enabling 
monitoring enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as thread is not spawned while 
monitor is enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {

Review comment:
       `testLogConfiguration` covers the same scenario - we could delete that 
test in my opinion.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/security/FlinkSecurityManagerTest.java
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.security;
+
+import org.apache.flink.configuration.ClusterOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.runtime.UserSystemExitException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.security.Permission;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.core.testutils.CommonTestUtils.assertThrows;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Tests for {@code FlinkSecurityManager}. */
+public class FlinkSecurityManagerTest extends TestLogger {
+
+    private static final int TEST_EXIT_CODE = 123;
+    SecurityManager originalSecurityManager;
+    FlinkSecurityManager flinkSecurityManager;
+
+    @Before
+    public void setUp() {
+        originalSecurityManager = System.getSecurityManager();
+    }
+
+    @After
+    public void tearDown() {
+        System.setSecurityManager(originalSecurityManager);
+    }
+
+    @Test(expected = UserSystemExitException.class)
+    public void testThrowUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testToggleUserExit() {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        flinkSecurityManager.unmonitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testPerThreadThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+        // Async thread test before enabling monitoring ensures it does not 
throw while prestarting
+        // the worker thread, which stays unmonitored and is tested again after 
monitoring is enabled.
+        CompletableFuture<Void> future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        // This threaded exit should be allowed as the thread was not spawned 
while monitoring was enabled.
+        future =
+                CompletableFuture.runAsync(
+                        () -> flinkSecurityManager.checkExit(TEST_EXIT_CODE), 
executorService);
+        future.get();
+    }
+
+    @Test
+    public void testInheritedThrowUserExit() throws Exception {
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.THROW, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        try {
+            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+            fail();
+        } catch (UserSystemExitException ignored) {
+        }
+        CheckedThread thread =
+                new CheckedThread() {
+                    @Override
+                    public void go() {
+                        try {
+                            flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+                            fail();
+                        } catch (UserSystemExitException ignored) {
+                        } catch (Throwable t) {
+                            fail();
+                        }
+                    }
+                };
+        thread.start();
+        thread.sync();
+    }
+
+    @Test
+    public void testLogUserExit() {
+        // Log mode enables monitoring but only logs the exit attempt and allows 
it, hence no exception is expected.
+        // NOTE - Do not specifically test warning logging.
+        flinkSecurityManager =
+                new 
FlinkSecurityManager(ClusterOptions.UserSystemExitMode.LOG, null);
+        flinkSecurityManager.monitorUserSystemExit();
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+    }
+
+    @Test
+    public void testExitBehaviorChanged() {
+        AtomicInteger exitStatus = new AtomicInteger(0);
+        flinkSecurityManager =
+                new FlinkSecurityManager(
+                        ClusterOptions.UserSystemExitMode.DISABLED,
+                        status -> exitStatus.set(status));
+        flinkSecurityManager.checkExit(TEST_EXIT_CODE);
+        assertThat(exitStatus.get(), is(TEST_EXIT_CODE));
+    }
+
+    @Test
+    public void testExitBehaviorChangedWithExistingSecurityManager() {

Review comment:
       ```suggestion
       public void testExitHandlerTriggeredWithExistingSecurityManager() {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to