This is an automated email from the ASF dual-hosted git repository.
dishatalreja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 959cb93ce ATLAS-5093: Ensure executors are available and alive before
submitting import tasks (#429)
959cb93ce is described below
commit 959cb93cece17bcb91f978c41820f92d71a203b1
Author: jackhalfalltrades <[email protected]>
AuthorDate: Thu Aug 21 15:19:56 2025 -0500
ATLAS-5093: Ensure executors are available and alive before submitting
import tasks (#429)
Co-authored-by: Chandra Kanth Peravelli
<[email protected]>
---
.../patches/UpdateCompositeIndexStatusPatch.java | 1 -
.../store/graph/v2/AtlasEnumDefStoreV2.java | 1 -
.../atlas/notification/ImportTaskListenerImpl.java | 46 +++++-
.../notification/ImportTaskListenerImplTest.java | 167 +++++++++++++++++++++
4 files changed, 206 insertions(+), 9 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
index c88853aa3..ce551ae1f 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
@@ -19,7 +19,6 @@ package org.apache.atlas.repository.patches;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
-import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
index 426a185c1..2eb92e2be 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
@@ -313,7 +313,6 @@ class AtlasEnumDefStoreV2 extends
AtlasAbstractDefStoreV2<AtlasEnumDef> {
Exception err = null;
try {
-
// create property keys first
for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
// Validate the enum element
diff --git
a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
index c7489debe..fa4ea8167 100644
---
a/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
+++
b/webapp/src/main/java/org/apache/atlas/notification/ImportTaskListenerImpl.java
@@ -58,13 +58,14 @@ import static
org.apache.atlas.AtlasErrorCode.IMPORT_QUEUEING_FAILED;
@Order(8)
@DependsOn(value = "notificationHookConsumer")
public class ImportTaskListenerImpl implements Service,
ActiveStateChangeHandler, ImportTaskListener {
- private static final Logger LOG =
LoggerFactory.getLogger(ImportTaskListenerImpl.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(ImportTaskListenerImpl.class);
- private static final String THREADNAME_PREFIX =
ImportTaskListener.class.getSimpleName();
- private static final int ASYNC_IMPORT_PERMITS = 1; // Only one
asynchronous import task is permitted
+ private static final String THREADNAME_PREFIX =
ImportTaskListener.class.getSimpleName();
+ private static final int ASYNC_IMPORT_PERMITS = 1; // Only
one asynchronous import task is permitted
+ private volatile boolean isActiveInstance = true;
+ private volatile ExecutorService executorService; // Single-thread
executor for sequential processing
private final BlockingQueue<String> requestQueue; // Blocking queue
for requests
- private final ExecutorService executorService; // Single-thread
executor for sequential processing
private final AsyncImportService asyncImportService;
private final NotificationHookConsumer notificationHookConsumer;
private final Semaphore asyncImportSemaphore;
@@ -81,8 +82,6 @@ public class ImportTaskListenerImpl implements Service,
ActiveStateChangeHandler
this.requestQueue = requestQueue;
this.asyncImportSemaphore = new Semaphore(ASYNC_IMPORT_PERMITS);
this.applicationProperties = ApplicationProperties.get();
- this.executorService = Executors.newSingleThreadExecutor(new
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d")
- .setUncaughtExceptionHandler((thread, throwable) ->
LOG.error("Uncaught exception in thread {}: {}", thread.getName(),
throwable.getMessage(), throwable)).build());
}
@Override
@@ -109,11 +108,13 @@ public class ImportTaskListenerImpl implements Service,
ActiveStateChangeHandler
public void instanceIsActive() {
LOG.info("Reacting to active state: initializing Kafka consumers");
+ isActiveInstance = true;
startInternal();
}
@Override
public void instanceIsPassive() {
+ isActiveInstance = false;
try {
stopImport();
} finally {
@@ -166,6 +167,10 @@ public class ImportTaskListenerImpl implements Service,
ActiveStateChangeHandler
public void stopImport() {
LOG.info("Shutting down import processor...");
+ if (executorService == null) {
+ LOG.info("Executor service is already null, nothing to shut
down.");
+ return;
+ }
executorService.shutdown(); // Initiate an orderly shutdown
try {
@@ -217,6 +222,10 @@ public class ImportTaskListenerImpl implements Service,
ActiveStateChangeHandler
void startAsyncImportIfAvailable(String importId) {
LOG.info("==> startAsyncImportIfAvailable()");
+ if (!isActiveInstance) {
+ LOG.warn("Import processing attempted while instance is passive.
Skipping import.");
+ return;
+ }
try {
if (!asyncImportSemaphore.tryAcquire()) {
LOG.info("An async import is in progress, import request is
queued");
@@ -232,7 +241,12 @@ public class ImportTaskListenerImpl implements Service,
ActiveStateChangeHandler
return;
}
- executorService.submit(() -> startImportConsumer(nextImport));
+ ExecutorService exec = ensureExecutorAlive();
+ if (exec != null) {
+ exec.submit(() -> startImportConsumer(nextImport));
+ } else {
+ LOG.warn("No executor available to process import task
(instance is passive).");
+ }
} catch (Exception e) {
LOG.error("Error while starting the next import, releasing the
lock if held", e);
@@ -296,6 +310,24 @@ public class ImportTaskListenerImpl implements Service,
ActiveStateChangeHandler
(!ImportStatus.WAITING.equals(importRequest.getStatus()) &&
!ImportStatus.PROCESSING.equals(importRequest.getStatus()));
}
+ @VisibleForTesting
+ ExecutorService ensureExecutorAlive() {
+ if (!isActiveInstance) {
+ LOG.warn("Attempted to create executor while instance is passive.
No executor will be created.");
+ return null;
+ }
+ if (executorService == null || executorService.isShutdown() ||
executorService.isTerminated()) {
+ synchronized (this) {
+ if (executorService == null || executorService.isShutdown() ||
executorService.isTerminated()) {
+ executorService = Executors.newSingleThreadExecutor(new
ThreadFactoryBuilder().setNameFormat(THREADNAME_PREFIX + " thread-%d")
+ .setUncaughtExceptionHandler((thread, throwable)
-> LOG.error("Uncaught exception in thread {}: {}", thread.getName(),
throwable.getMessage(), throwable)).build());
+ LOG.info("ExecutorService was recreated.");
+ }
+ }
+ }
+ return executorService;
+ }
+
void populateRequestQueue() {
LOG.info("==> populateRequestQueue()");
diff --git
a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
index 2f5281c0d..76020d2da 100644
---
a/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
+++
b/webapp/src/test/java/org/apache/atlas/notification/ImportTaskListenerImplTest.java
@@ -37,7 +37,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -61,7 +63,9 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -286,6 +290,7 @@ public class ImportTaskListenerImplTest {
@Test
public void testStartImportConsumer_Successful() throws Exception {
+ Mockito.doReturn("import123").when(importRequest).getImportId();
when(importRequest.getStatus()).thenReturn(WAITING);
when(importRequest.getTopicName()).thenReturn("topic1");
@@ -565,6 +570,168 @@ public class ImportTaskListenerImplTest {
blockStartNextLatch.countDown();
}
+ @Test
+ public void testImportNotProcessedWhenPassive() throws Exception {
+ Mockito.doReturn("import123").when(importRequest).getImportId();
+ when(importRequest.getStatus()).thenReturn(WAITING);
+ when(requestQueue.poll(anyLong(),
any(TimeUnit.class))).thenReturn("import123");
+ importTaskListener.instanceIsPassive();
+ importTaskListener.onReceiveImportRequest(importRequest);
+ Thread.sleep(200);
+ verify(notificationHookConsumer,
never()).startAsyncImportConsumer(any(), anyString(), anyString());
+ }
+
+ @Test
+ public void testExecutorNotRecreatedWhenPassive() throws Exception {
+ Mockito.doReturn("import123").when(importRequest).getImportId();
+ when(importRequest.getStatus()).thenReturn(WAITING);
+ when(requestQueue.poll(anyLong(),
any(TimeUnit.class))).thenReturn("import123");
+ when(importRequest.getStatus()).thenReturn(WAITING);
+ when(requestQueue.poll(anyLong(),
any(TimeUnit.class))).thenReturn("import123");
+ importTaskListener.instanceIsPassive();
+ Field executorField =
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+ executorField.setAccessible(true);
+ ExecutorService exec = (ExecutorService)
executorField.get(importTaskListener);
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ importTaskListener.onReceiveImportRequest(importRequest);
+ Thread.sleep(200);
+ ExecutorService execAfter = (ExecutorService)
executorField.get(importTaskListener);
+ // Should remain null when passive
+ assertTrue(execAfter == null);
+ }
+
+ @Test
+ public void testExecutorRecreatedWhenActive() throws Exception {
+ when(importRequest.getStatus()).thenReturn(WAITING);
+ when(requestQueue.poll(anyLong(),
any(TimeUnit.class))).thenReturn("import123");
+ importTaskListener.instanceIsActive();
+ Field executorField =
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+ executorField.setAccessible(true);
+ ExecutorService exec = (ExecutorService)
executorField.get(importTaskListener);
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ importTaskListener.onReceiveImportRequest(importRequest);
+ Thread.sleep(200);
+ ExecutorService execAfter = (ExecutorService)
executorField.get(importTaskListener);
+ assertNotNull(execAfter);
+ assertTrue(!execAfter.isShutdown() && !execAfter.isTerminated());
+ }
+
+ @Test
+ public void ensureExecutorAliveCreatesSingleInstanceUnderConcurrency()
throws Exception {
+ // Ensure active mode and a clean executor state
+ importTaskListener.instanceIsActive();
+
+ Field execField =
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+ execField.setAccessible(true);
+ execField.set(importTaskListener, null);
+
+ int threads = 64;
+ CyclicBarrier start = new CyclicBarrier(threads);
+ ExecutorService callers =
java.util.concurrent.Executors.newFixedThreadPool(threads);
+
+ List<Future<ExecutorService>> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(callers.submit(() -> {
+ start.await();
+ return importTaskListener.ensureExecutorAlive();
+ }));
+ }
+
+ ExecutorService first = null;
+ for (Future<ExecutorService> f : futures) {
+ ExecutorService es = f.get(10, TimeUnit.SECONDS);
+ assertNotNull(es, "Executor should be created");
+ if (first == null) {
+ first = es;
+ }
+ else {
+ assertSame(first, es, "All callers must see the same
instance");
+ }
+ }
+
+ callers.shutdownNow();
+ first.shutdownNow();
+ }
+
+ @Test
+ public void ensureExecutorAliveRecreatesOnceIfShutdownUnderConcurrency()
throws Exception {
+ // Ensure active mode
+ importTaskListener.instanceIsActive();
+
+ // First creation
+ ExecutorService first = importTaskListener.ensureExecutorAlive();
+ assertNotNull(first);
+
+ // Force recreate path: mark current as shutdown and ensure the field
holds that value
+ first.shutdown();
+
+ Field execField =
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+ execField.setAccessible(true);
+ execField.set(importTaskListener, first);
+
+ int threads = 64;
+ CyclicBarrier start = new CyclicBarrier(threads);
+ ExecutorService callers =
java.util.concurrent.Executors.newFixedThreadPool(threads);
+
+ List<Future<ExecutorService>> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(callers.submit(() -> {
+ start.await();
+ return importTaskListener.ensureExecutorAlive();
+ }));
+ }
+
+ ExecutorService second = null;
+ for (Future<ExecutorService> f : futures) {
+ ExecutorService es = f.get(10, TimeUnit.SECONDS);
+ assertNotNull(es);
+ if (second == null) {
+ second = es;
+ }
+ else {
+ assertSame(second, es, "All callers must see the same new
instance");
+ }
+ }
+
+ assertNotSame(first, second, "Executor must be replaced after
shutdown");
+ callers.shutdownNow();
+ second.shutdownNow();
+ }
+
+ @Test
+ public void
ensureExecutorAliveReturnsNullWhenPassiveEvenUnderConcurrency() throws
Exception {
+ // Put into passive mode (ensureExecutorAlive should early-return null)
+ importTaskListener.instanceIsPassive();
+
+ Field execField =
ImportTaskListenerImpl.class.getDeclaredField("executorService");
+ execField.setAccessible(true);
+ execField.set(importTaskListener, null);
+
+ int threads = 32;
+ CyclicBarrier start = new CyclicBarrier(threads);
+ ExecutorService callers =
java.util.concurrent.Executors.newFixedThreadPool(threads);
+
+ List<Future<ExecutorService>> futures = new ArrayList<>();
+ for (int i = 0; i < threads; i++) {
+ futures.add(callers.submit(() -> {
+ start.await();
+ return importTaskListener.ensureExecutorAlive();
+ }));
+ }
+
+ for (Future<ExecutorService> f : futures) {
+ assertNull(f.get(5, TimeUnit.SECONDS), "No executor should be
created in passive mode");
+ }
+
+ // Field should remain null
+ assertNull(execField.get(importTaskListener));
+ callers.shutdownNow();
+ }
+
private void setExecutorServiceAndSemaphore(ImportTaskListenerImpl
importTaskListener, ExecutorService mockExecutor, Semaphore mockSemaphore) {
try {
Field executorField =
ImportTaskListenerImpl.class.getDeclaredField("executorService");