This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit ada9a1100aec9e10115d64a865e93b804f733204 Author: jackhalfalltrades <[email protected]> AuthorDate: Thu Aug 21 15:19:56 2025 -0500 ATLAS-5093: Ensure executors are avialable and alive before submitting import tasks (#429) Co-authored-by: Chandra Kanth Peravelli <[email protected]> (cherry picked from commit 959cb93cece17bcb91f978c41820f92d71a203b1) --- .../patches/UpdateCompositeIndexStatusPatch.java | 1 - .../store/graph/v2/AtlasEnumDefStoreV2.java | 1 - .../atlas/notification/ImportTaskListenerImpl.java | 46 +++++- .../notification/ImportTaskListenerImplTest.java | 167 +++++++++++++++++++++ 4 files changed, 206 insertions(+), 9 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java index c88853aa3..ce551ae1f 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java @@ -19,7 +19,6 @@ package org.apache.atlas.repository.patches; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.exception.AtlasBaseException; -import org.apache.atlas.repository.graphdb.AtlasGraph; import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java index 426a185c1..2eb92e2be 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java @@ -313,7 +313,6 @@ class AtlasEnumDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEnumDef> { Exception err = null; try { - // create property keys first for (AtlasEnumElementDef element : enumDef.getElementDefs()) { // Validate the enum element diff --git a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java index c7489debe..fa4ea8167 100644 --- a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java +++ b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java @@ -58,13 +58,14 @@ import static org.apache.atlas.AtlasErrorCode.IMPORT_QUEUEING_FAILED; @Order(8) @DependsOn(value = "notificationHookConsumer") public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler, ImportTaskListener { - private static final Logger LOG = LoggerFactory.getLogger(ImportTaskListenerImpl.class); + private static final Logger LOG = LoggerFactory.getLogger(ImportTaskListenerImpl.class); - private static final String THREADNAME_PREFIX = ImportTaskListener.class.getSimpleName(); - private static final int ASYNC_IMPORT_PERMITS = 1; // Only one asynchronous import task is permitted + private static final String THREADNAME_PREFIX = ImportTaskListener.class.getSimpleName(); + private static final int ASYNC_IMPORT_PERMITS = 1; // Only one asynchronous import task is permitted + private volatile boolean isActiveInstance = true; + private volatile ExecutorService executorService; // Single-thread executor for sequential processing private final BlockingQueue<String> requestQueue; // Blocking queue for requests - private final ExecutorService executorService; // Single-thread executor for sequential processing private final AsyncImportService asyncImportService; private final NotificationHookConsumer notificationHookConsumer; private final Semaphore asyncImportSemaphore; @@ -81,8 +82,6 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler this.requestQueue = requestQueue; this.asyncImportSemaphore = new Semaphore(ASYNC_IMPORT_PERMITS); this.applicationProperties = ApplicationProperties.get(); - this.executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d") - .setUncaughtExceptionHandler((thread, throwable) -> LOG.error("Uncaught exception in thread {}: {}", thread.getName(), throwable.getMessage(), throwable)).build()); } @Override @@ -109,11 +108,13 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler public void instanceIsActive() { LOG.info("Reacting to active state: initializing Kafka consumers"); + isActiveInstance = true; startInternal(); } @Override public void instanceIsPassive() { + isActiveInstance = false; try { stopImport(); } finally { @@ -166,6 +167,10 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler public void stopImport() { LOG.info("Shutting down import processor..."); + if (executorService == null) { + LOG.info("Executor service is already null, nothing to shut down."); + return; + } executorService.shutdown(); // Initiate an orderly shutdown try { @@ -217,6 +222,10 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler void startAsyncImportIfAvailable(String importId) { LOG.info("==> startAsyncImportIfAvailable()"); + if (!isActiveInstance) { + LOG.warn("Import processing attempted while instance is passive. Skipping import."); + return; + } try { if (!asyncImportSemaphore.tryAcquire()) { LOG.info("An async import is in progress, import request is queued"); @@ -232,7 +241,12 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler return; } - executorService.submit(() -> startImportConsumer(nextImport)); + ExecutorService exec = ensureExecutorAlive(); + if (exec != null) { + exec.submit(() -> startImportConsumer(nextImport)); + } else { + LOG.warn("No executor available to process import task (instance is passive)."); + } } catch (Exception e) { LOG.error("Error while starting the next import, releasing the lock if held", e); @@ -296,6 +310,24 @@ public class ImportTaskListenerImpl implements Service, ActiveStateChangeHandler (!ImportStatus.WAITING.equals(importRequest.getStatus()) && !ImportStatus.PROCESSING.equals(importRequest.getStatus())); } + @VisibleForTesting + ExecutorService ensureExecutorAlive() { + if (!isActiveInstance) { + LOG.warn("Attempted to create executor while instance is passive. No executor will be created."); + return null; + } + if (executorService == null || executorService.isShutdown() || executorService.isTerminated()) { + synchronized (this) { + if (executorService == null || executorService.isShutdown() || executorService.isTerminated()) { + executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d") + .setUncaughtExceptionHandler((thread, throwable) -> LOG.error("Uncaught exception in thread {}: {}", thread.getName(), throwable.getMessage(), throwable)).build()); + LOG.info("ExecutorService was recreated."); + } + } + } + return executorService; + } + void populateRequestQueue() { LOG.info("==> populateRequestQueue()"); diff --git a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java index 2f5281c0d..76020d2da 100644 --- a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java +++ b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java @@ -37,7 +37,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingDeque; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -61,7 +63,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNotSame; import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -286,6 +290,7 @@ public class ImportTaskListenerImplTest { @Test public void testStartImportConsumer_Successful() throws Exception { + Mockito.doReturn("import123").when(importRequest).getImportId(); when(importRequest.getStatus()).thenReturn(WAITING); when(importRequest.getTopicName()).thenReturn("topic1"); @@ -565,6 +570,168 @@ public class ImportTaskListenerImplTest { blockStartNextLatch.countDown(); } + @Test + public void testImportNotProcessedWhenPassive() throws Exception { + Mockito.doReturn("import123").when(importRequest).getImportId(); + when(importRequest.getStatus()).thenReturn(WAITING); + when(requestQueue.poll(anyLong(), any(TimeUnit.class))).thenReturn("import123"); + importTaskListener.instanceIsPassive(); + importTaskListener.onReceiveImportRequest(importRequest); + Thread.sleep(200); + verify(notificationHookConsumer, never()).startAsyncImportConsumer(any(), anyString(), anyString()); + } + + @Test + public void testExecutorNotRecreatedWhenPassive() throws Exception { + Mockito.doReturn("import123").when(importRequest).getImportId(); + when(importRequest.getStatus()).thenReturn(WAITING); + when(requestQueue.poll(anyLong(), any(TimeUnit.class))).thenReturn("import123"); + when(importRequest.getStatus()).thenReturn(WAITING); + when(requestQueue.poll(anyLong(), any(TimeUnit.class))).thenReturn("import123"); + importTaskListener.instanceIsPassive(); + Field executorField = ImportTaskListenerImpl.class.getDeclaredField("executorService"); + executorField.setAccessible(true); + ExecutorService exec = (ExecutorService) executorField.get(importTaskListener); + if (exec != null) { + exec.shutdownNow(); + } + importTaskListener.onReceiveImportRequest(importRequest); + Thread.sleep(200); + ExecutorService execAfter = (ExecutorService) executorField.get(importTaskListener); + // Should remain null when passive + assertTrue(execAfter == null); + } + + @Test + public void testExecutorRecreatedWhenActive() throws Exception { + when(importRequest.getStatus()).thenReturn(WAITING); + when(requestQueue.poll(anyLong(), any(TimeUnit.class))).thenReturn("import123"); + importTaskListener.instanceIsActive(); + Field executorField = ImportTaskListenerImpl.class.getDeclaredField("executorService"); + executorField.setAccessible(true); + ExecutorService exec = (ExecutorService) executorField.get(importTaskListener); + if (exec != null) { + exec.shutdownNow(); + } + importTaskListener.onReceiveImportRequest(importRequest); + Thread.sleep(200); + ExecutorService execAfter = (ExecutorService) executorField.get(importTaskListener); + assertNotNull(execAfter); + assertTrue(!execAfter.isShutdown() && !execAfter.isTerminated()); + } + + @Test + public void ensureExecutorAliveCreatesSingleInstanceUnderConcurrency() throws Exception { + // Ensure active mode and a clean executor state + importTaskListener.instanceIsActive(); + + Field execField = ImportTaskListenerImpl.class.getDeclaredField("executorService"); + execField.setAccessible(true); + execField.set(importTaskListener, null); + + int threads = 64; + CyclicBarrier start = new CyclicBarrier(threads); + ExecutorService callers = java.util.concurrent.Executors.newFixedThreadPool(threads); + + List<Future<ExecutorService>> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(callers.submit(() -> { + start.await(); + return importTaskListener.ensureExecutorAlive(); + })); + } + + ExecutorService first = null; + for (Future<ExecutorService> f : futures) { + ExecutorService es = f.get(10, TimeUnit.SECONDS); + assertNotNull(es, "Executor should be created"); + if (first == null) { + first = es; + } + else { + assertSame(first, es, "All callers must see the same instance"); + } + } + + callers.shutdownNow(); + first.shutdownNow(); + } + + @Test + public void ensureExecutorAliveRecreatesOnceIfShutdownUnderConcurrency() throws Exception { + // Ensure active mode + importTaskListener.instanceIsActive(); + + // First creation + ExecutorService first = importTaskListener.ensureExecutorAlive(); + assertNotNull(first); + + // Force recreate path: mark current as shutdown and ensure the field holds that value + first.shutdown(); + + Field execField = ImportTaskListenerImpl.class.getDeclaredField("executorService"); + execField.setAccessible(true); + execField.set(importTaskListener, first); + + int threads = 64; + CyclicBarrier start = new CyclicBarrier(threads); + ExecutorService callers = java.util.concurrent.Executors.newFixedThreadPool(threads); + + List<Future<ExecutorService>> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(callers.submit(() -> { + start.await(); + return importTaskListener.ensureExecutorAlive(); + })); + } + + ExecutorService second = null; + for (Future<ExecutorService> f : futures) { + ExecutorService es = f.get(10, TimeUnit.SECONDS); + assertNotNull(es); + if (second == null) { + second = es; + } + else { + assertSame(second, es, "All callers must see the same new instance"); + } + } + + assertNotSame(first, second, "Executor must be replaced after shutdown"); + callers.shutdownNow(); + second.shutdownNow(); + } + + @Test + public void ensureExecutorAliveReturnsNullWhenPassiveEvenUnderConcurrency() throws Exception { + // Put into passive mode (ensureExecutorAlive should early-return null) + importTaskListener.instanceIsPassive(); + + Field execField = ImportTaskListenerImpl.class.getDeclaredField("executorService"); + execField.setAccessible(true); + execField.set(importTaskListener, null); + + int threads = 32; + CyclicBarrier start = new CyclicBarrier(threads); + ExecutorService callers = java.util.concurrent.Executors.newFixedThreadPool(threads); + + List<Future<ExecutorService>> futures = new ArrayList<>(); + for (int i = 0; i < threads; i++) { + futures.add(callers.submit(() -> { + start.await(); + return importTaskListener.ensureExecutorAlive(); + })); + } + + for (Future<ExecutorService> f : futures) { + assertNull(f.get(5, TimeUnit.SECONDS), "No executor should be created in passive mode"); + } + + // Field should remain null + assertNull(execField.get(importTaskListener)); + callers.shutdownNow(); + } + private void setExecutorServiceAndSemaphore(ImportTaskListenerImpl importTaskListener, ExecutorService mockExecutor, Semaphore mockSemaphore) { try { Field executorField = ImportTaskListenerImpl.class.getDeclaredField("executorService");
