This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git

commit ada9a1100aec9e10115d64a865e93b804f733204
Author: jackhalfalltrades <[email protected]>
AuthorDate: Thu Aug 21 15:19:56 2025 -0500

    ATLAS-5093: Ensure executors are available and alive before submitting 
import tasks (#429)
    
    Co-authored-by: Chandra Kanth Peravelli 
<[email protected]>
    (cherry picked from commit 959cb93cece17bcb91f978c41820f92d71a203b1)
---
 .../patches/UpdateCompositeIndexStatusPatch.java   |   1 -
 .../store/graph/v2/AtlasEnumDefStoreV2.java        |   1 -
 .../atlas/notification/ImportTaskListenerImpl.java |  46 +++++-
 .../notification/ImportTaskListenerImplTest.java   | 167 +++++++++++++++++++++
 4 files changed, 206 insertions(+), 9 deletions(-)

diff --git 
a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
 
b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
index c88853aa3..ce551ae1f 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
@@ -19,7 +19,6 @@ package org.apache.atlas.repository.patches;
 
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
 import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
index 426a185c1..2eb92e2be 100644
--- 
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
+++ 
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
@@ -313,7 +313,6 @@ class AtlasEnumDefStoreV2 extends 
AtlasAbstractDefStoreV2<AtlasEnumDef> {
         Exception            err        = null;
 
         try {
-
             // create property keys first
             for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
                 // Validate the enum element
diff --git 
a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
 
b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
index c7489debe..fa4ea8167 100644
--- 
a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
+++ 
b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
@@ -58,13 +58,14 @@ import static 
org.apache.atlas.AtlasErrorCode.IMPORT_QUEUEING_FAILED;
 @Order(8)
 @DependsOn(value = "notificationHookConsumer")
 public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler, ImportTaskListener {
-    private static final Logger LOG = 
LoggerFactory.getLogger(ImportTaskListenerImpl.class);
+    private static final Logger            LOG                  = 
LoggerFactory.getLogger(ImportTaskListenerImpl.class);
 
-    private static final String THREADNAME_PREFIX    = 
ImportTaskListener.class.getSimpleName();
-    private static final int    ASYNC_IMPORT_PERMITS = 1; // Only one 
asynchronous import task is permitted
+    private static final String            THREADNAME_PREFIX    = 
ImportTaskListener.class.getSimpleName();
+    private static final int               ASYNC_IMPORT_PERMITS = 1; // Only 
one asynchronous import task is permitted
 
+    private volatile boolean               isActiveInstance     = true;
+    private volatile ExecutorService       executorService; // Single-thread 
executor for sequential processing
     private final BlockingQueue<String>    requestQueue;    // Blocking queue 
for requests
-    private final ExecutorService          executorService; // Single-thread 
executor for sequential processing
     private final AsyncImportService       asyncImportService;
     private final NotificationHookConsumer notificationHookConsumer;
     private final Semaphore                asyncImportSemaphore;
@@ -81,8 +82,6 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
         this.requestQueue             = requestQueue;
         this.asyncImportSemaphore     = new Semaphore(ASYNC_IMPORT_PERMITS);
         this.applicationProperties    = ApplicationProperties.get();
-        this.executorService          = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d")
-                .setUncaughtExceptionHandler((thread, throwable) -> 
LOG.error("Uncaught exception in thread {}: {}", thread.getName(), 
throwable.getMessage(), throwable)).build());
     }
 
     @Override
@@ -109,11 +108,13 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
     public void instanceIsActive() {
         LOG.info("Reacting to active state: initializing Kafka consumers");
 
+        isActiveInstance = true;
         startInternal();
     }
 
     @Override
     public void instanceIsPassive() {
+        isActiveInstance = false;
         try {
             stopImport();
         } finally {
@@ -166,6 +167,10 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
     public void stopImport() {
         LOG.info("Shutting down import processor...");
 
+        if (executorService == null) {
+            LOG.info("Executor service is already null, nothing to shut 
down.");
+            return;
+        }
         executorService.shutdown(); // Initiate an orderly shutdown
 
         try {
@@ -217,6 +222,10 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
     void startAsyncImportIfAvailable(String importId) {
         LOG.info("==> startAsyncImportIfAvailable()");
 
+        if (!isActiveInstance) {
+            LOG.warn("Import processing attempted while instance is passive. 
Skipping import.");
+            return;
+        }
         try {
             if (!asyncImportSemaphore.tryAcquire()) {
                 LOG.info("An async import is in progress, import request is 
queued");
@@ -232,7 +241,12 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
                 return;
             }
 
-            executorService.submit(() -> startImportConsumer(nextImport));
+            ExecutorService exec = ensureExecutorAlive();
+            if (exec != null) {
+                exec.submit(() -> startImportConsumer(nextImport));
+            } else {
+                LOG.warn("No executor available to process import task 
(instance is passive).");
+            }
         } catch (Exception e) {
             LOG.error("Error while starting the next import, releasing the 
lock if held", e);
 
@@ -296,6 +310,24 @@ public class ImportTaskListenerImpl implements Service, 
ActiveStateChangeHandler
                 (!ImportStatus.WAITING.equals(importRequest.getStatus()) && 
!ImportStatus.PROCESSING.equals(importRequest.getStatus()));
     }
 
+    @VisibleForTesting
+    ExecutorService ensureExecutorAlive() {
+        if (!isActiveInstance) {
+            LOG.warn("Attempted to create executor while instance is passive. 
No executor will be created.");
+            return null;
+        }
+        if (executorService == null || executorService.isShutdown() || 
executorService.isTerminated()) {
+            synchronized (this) {
+                if (executorService == null || executorService.isShutdown() || 
executorService.isTerminated()) {
+                    executorService = Executors.newSingleThreadExecutor(new 
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d")
+                            .setUncaughtExceptionHandler((thread, throwable) 
-> LOG.error("Uncaught exception in thread {}: {}", thread.getName(), 
throwable.getMessage(), throwable)).build());
+                    LOG.info("ExecutorService was recreated.");
+                }
+            }
+        }
+        return executorService;
+    }
+
     void populateRequestQueue() {
         LOG.info("==> populateRequestQueue()");
 
diff --git 
a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
 
b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
index 2f5281c0d..76020d2da 100644
--- 
a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
+++ 
b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
@@ -37,7 +37,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.BlockingDeque;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,7 +63,9 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNotSame;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
@@ -286,6 +290,7 @@ public class ImportTaskListenerImplTest {
 
     @Test
     public void testStartImportConsumer_Successful() throws Exception {
+        Mockito.doReturn("import123").when(importRequest).getImportId();
         when(importRequest.getStatus()).thenReturn(WAITING);
         when(importRequest.getTopicName()).thenReturn("topic1");
 
@@ -565,6 +570,168 @@ public class ImportTaskListenerImplTest {
         blockStartNextLatch.countDown();
     }
 
+    @Test
+    public void testImportNotProcessedWhenPassive() throws Exception {
+        Mockito.doReturn("import123").when(importRequest).getImportId();
+        when(importRequest.getStatus()).thenReturn(WAITING);
+        when(requestQueue.poll(anyLong(), 
any(TimeUnit.class))).thenReturn("import123");
+        importTaskListener.instanceIsPassive();
+        importTaskListener.onReceiveImportRequest(importRequest);
+        Thread.sleep(200);
+        verify(notificationHookConsumer, 
never()).startAsyncImportConsumer(any(), anyString(), anyString());
+    }
+
+    @Test
+    public void testExecutorNotRecreatedWhenPassive() throws Exception {
+        Mockito.doReturn("import123").when(importRequest).getImportId();
+        when(importRequest.getStatus()).thenReturn(WAITING);
+        when(requestQueue.poll(anyLong(), 
any(TimeUnit.class))).thenReturn("import123");
+        when(importRequest.getStatus()).thenReturn(WAITING);
+        when(requestQueue.poll(anyLong(), 
any(TimeUnit.class))).thenReturn("import123");
+        importTaskListener.instanceIsPassive();
+        Field executorField = 
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+        executorField.setAccessible(true);
+        ExecutorService exec = (ExecutorService) 
executorField.get(importTaskListener);
+        if (exec != null) {
+            exec.shutdownNow();
+        }
+        importTaskListener.onReceiveImportRequest(importRequest);
+        Thread.sleep(200);
+        ExecutorService execAfter = (ExecutorService) 
executorField.get(importTaskListener);
+        // Should remain null when passive
+        assertTrue(execAfter == null);
+    }
+
+    @Test
+    public void testExecutorRecreatedWhenActive() throws Exception {
+        when(importRequest.getStatus()).thenReturn(WAITING);
+        when(requestQueue.poll(anyLong(), 
any(TimeUnit.class))).thenReturn("import123");
+        importTaskListener.instanceIsActive();
+        Field executorField = 
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+        executorField.setAccessible(true);
+        ExecutorService exec = (ExecutorService) 
executorField.get(importTaskListener);
+        if (exec != null) {
+            exec.shutdownNow();
+        }
+        importTaskListener.onReceiveImportRequest(importRequest);
+        Thread.sleep(200);
+        ExecutorService execAfter = (ExecutorService) 
executorField.get(importTaskListener);
+        assertNotNull(execAfter);
+        assertTrue(!execAfter.isShutdown() && !execAfter.isTerminated());
+    }
+
+    @Test
+    public void ensureExecutorAliveCreatesSingleInstanceUnderConcurrency() 
throws Exception {
+        // Ensure active mode and a clean executor state
+        importTaskListener.instanceIsActive();
+
+        Field execField = 
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+        execField.setAccessible(true);
+        execField.set(importTaskListener, null);
+
+        int threads = 64;
+        CyclicBarrier start = new CyclicBarrier(threads);
+        ExecutorService callers = 
java.util.concurrent.Executors.newFixedThreadPool(threads);
+
+        List<Future<ExecutorService>> futures = new ArrayList<>();
+        for (int i = 0; i < threads; i++) {
+            futures.add(callers.submit(() -> {
+                start.await();
+                return importTaskListener.ensureExecutorAlive();
+            }));
+        }
+
+        ExecutorService first = null;
+        for (Future<ExecutorService> f : futures) {
+            ExecutorService es = f.get(10, TimeUnit.SECONDS);
+            assertNotNull(es, "Executor should be created");
+            if (first == null) {
+                first = es;
+            }
+            else {
+                assertSame(first, es, "All callers must see the same 
instance");
+            }
+        }
+
+        callers.shutdownNow();
+        first.shutdownNow();
+    }
+
+    @Test
+    public void ensureExecutorAliveRecreatesOnceIfShutdownUnderConcurrency() 
throws Exception {
+        // Ensure active mode
+        importTaskListener.instanceIsActive();
+
+        // First creation
+        ExecutorService first = importTaskListener.ensureExecutorAlive();
+        assertNotNull(first);
+
+        // Force recreate path: mark current as shutdown and ensure the field 
holds that value
+        first.shutdown();
+
+        Field execField = 
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+        execField.setAccessible(true);
+        execField.set(importTaskListener, first);
+
+        int threads = 64;
+        CyclicBarrier start = new CyclicBarrier(threads);
+        ExecutorService callers = 
java.util.concurrent.Executors.newFixedThreadPool(threads);
+
+        List<Future<ExecutorService>> futures = new ArrayList<>();
+        for (int i = 0; i < threads; i++) {
+            futures.add(callers.submit(() -> {
+                start.await();
+                return importTaskListener.ensureExecutorAlive();
+            }));
+        }
+
+        ExecutorService second = null;
+        for (Future<ExecutorService> f : futures) {
+            ExecutorService es = f.get(10, TimeUnit.SECONDS);
+            assertNotNull(es);
+            if (second == null) {
+                second = es;
+            }
+            else {
+                assertSame(second, es, "All callers must see the same new 
instance");
+            }
+        }
+
+        assertNotSame(first, second, "Executor must be replaced after 
shutdown");
+        callers.shutdownNow();
+        second.shutdownNow();
+    }
+
+    @Test
+    public void 
ensureExecutorAliveReturnsNullWhenPassiveEvenUnderConcurrency() throws 
Exception {
+        // Put into passive mode (ensureExecutorAlive should early-return null)
+        importTaskListener.instanceIsPassive();
+
+        Field execField = 
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+        execField.setAccessible(true);
+        execField.set(importTaskListener, null);
+
+        int threads = 32;
+        CyclicBarrier start = new CyclicBarrier(threads);
+        ExecutorService callers = 
java.util.concurrent.Executors.newFixedThreadPool(threads);
+
+        List<Future<ExecutorService>> futures = new ArrayList<>();
+        for (int i = 0; i < threads; i++) {
+            futures.add(callers.submit(() -> {
+                start.await();
+                return importTaskListener.ensureExecutorAlive();
+            }));
+        }
+
+        for (Future<ExecutorService> f : futures) {
+            assertNull(f.get(5, TimeUnit.SECONDS), "No executor should be 
created in passive mode");
+        }
+
+        // Field should remain null
+        assertNull(execField.get(importTaskListener));
+        callers.shutdownNow();
+    }
+
     private void setExecutorServiceAndSemaphore(ImportTaskListenerImpl 
importTaskListener, ExecutorService mockExecutor, Semaphore mockSemaphore) {
         try {
             Field executorField = 
ImportTaskListenerImpl.class.getDeclaredField("executorService");

Reply via email to