This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit b64ed679832c653a166ad7b6a29451a8c5446bb7 Author: jackhalfalltrades <[email protected]> AuthorDate: Tue Sep 23 12:35:11 2025 -0500 ATLAS-5101: Enable concurrent import requests and retry locking (#439) Co-authored-by: Chandra Kanth Peravelli <[email protected]> (cherry picked from commit 4eb05db53f3ff1b7803deb0a116e46b4398689ce) --- .../atlas/repository/impexp/ImportService.java | 9 +-- .../store/graph/v2/AsyncImportTaskExecutor.java | 76 +++++++++++++++++----- .../graph/v2/AsyncImportTaskExecutorTest.java | 55 ++++++++++++++++ .../apache/atlas/web/resources/AdminResource.java | 18 +---- 4 files changed, 123 insertions(+), 35 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java index f8e8b024f..0b502515c 100644 --- a/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java +++ b/repository/src/main/java/org/apache/atlas/repository/impexp/ImportService.java @@ -247,7 +247,7 @@ public class ImportService implements AsyncImporter { @Override public void onImportTypeDef(AtlasTypesDef typesDef, String importId) throws AtlasBaseException { - LOG.info("==> onImportTypeDef(typesDef={}, importId={})", typesDef, importId); + LOG.info("==> onImportTypeDef(importId={})", importId); AtlasAsyncImportRequest importRequest = asyncImportService.fetchImportRequestByImportId(importId); @@ -272,13 +272,13 @@ public class ImportService implements AsyncImporter { asyncImportService.updateImportRequest(importRequest); - LOG.info("<== onImportTypeDef(typesDef={}, importResult={})", typesDef, importRequest.getImportResult()); + LOG.info("<== onImportTypeDef()"); } } @Override public Boolean onImportEntity(AtlasEntityWithExtInfo entityWithExtInfo, String importId, int position) throws AtlasBaseException { - LOG.info("==> onImportEntity(entityWithExtInfo={}, importId={}, position={})", entityWithExtInfo, importId, position); + LOG.info("==> onImportEntity(importId={}, position={})", importId, position); AtlasAsyncImportRequest importRequest = asyncImportService.fetchImportRequestByImportId(importId); @@ -309,6 +309,7 @@ public class ImportService implements AsyncImporter { importRequest.getImportDetails().setImportProgress(resp.right); } catch (AtlasBaseException abe) { + LOG.warn("Failed to import entity: {} at position: {} for import: {}", entityWithExtInfo.getEntity().getGuid(), position, importId, abe); failedEntitiesCounter += 1; importRequest.getImportDetails().setFailedEntitiesCount(failedEntitiesCounter); @@ -324,7 +325,7 @@ public class ImportService implements AsyncImporter { asyncImportService.updateImportRequest(importRequest); - LOG.info("<== onImportEntity(entityWithExtInfo={}, importId={}, position={})", entityWithExtInfo, importId, position); + LOG.info("<== onImportEntity(importId={}, position={})", importId, position); } if (importRequest.getImportDetails().getPublishedEntityCount() <= diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java index a6f781cfb..cb6bebdee 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutor.java @@ -44,6 +44,7 @@ import javax.inject.Inject; import java.util.Collections; import java.util.List; +import java.util.concurrent.Callable; import static org.apache.atlas.notification.NotificationInterface.NotificationType.ASYNC_IMPORT; @@ -52,6 +53,8 @@ public class AsyncImportTaskExecutor { private static final Logger LOG = LoggerFactory.getLogger(AsyncImportTaskExecutor.class); private static final String MESSAGE_SOURCE = AsyncImportTaskExecutor.class.getSimpleName(); + private static final int MAX_RETRIES = 3; + private static final int BASE_BACKOFF_MS = 500; private final AsyncImportService importService; private final NotificationInterface notificationInterface; @@ -91,14 +94,14 @@ public class AsyncImportTaskExecutor { } public void publishTypeDefNotification(AtlasAsyncImportRequest importRequest, AtlasTypesDef atlasTypesDef) throws AtlasBaseException { - LOG.info("==> publishTypeDefNotification(importRequest={}, atlasTypesDef={})", importRequest, atlasTypesDef); + LOG.info("==> publishTypeDefNotification()"); try { HookNotification typeDefImportNotification = new ImportNotification.AtlasTypesDefImportNotification(importRequest.getImportId(), importRequest.getImportResult().getUserName(), atlasTypesDef); sendToTopic(importRequest.getTopicName(), typeDefImportNotification); } finally { - LOG.info("<== publishTypeDefNotification(atlasAsyncImportRequest={})", importRequest); + LOG.info("<== publishTypeDefNotification()"); } } @@ -144,13 +147,13 @@ public class AsyncImportTaskExecutor { } finally { notificationInterface.closeProducer(ASYNC_IMPORT, importRequest.getTopicName()); - LOG.info("<== publishImportRequest(atlasAsyncImportRequest={})", importRequest); + LOG.info("<== publishImportRequest()"); } } @VisibleForTesting void publishEntityNotification(AtlasAsyncImportRequest importRequest, EntityImportStream entityImportStream) { - LOG.info("==> publishEntityNotification(atlasAsyncImportRequest={})", importRequest); + LOG.info("==> publishEntityNotification()"); int publishedEntityCounter = importRequest.getImportDetails().getPublishedEntityCount(); int failedEntityCounter = importRequest.getImportDetails().getFailedEntitiesCount(); @@ -186,7 +189,7 @@ public class AsyncImportTaskExecutor { importService.updateImportRequest(importRequest); - LOG.info("<== publishEntityNotification(atlasAsyncImportRequest={})", importRequest); + LOG.info("<== publishEntityNotification()"); } } } @@ -195,13 +198,13 @@ public class AsyncImportTaskExecutor { void skipToStartEntityPosition(AtlasAsyncImportRequest importRequest, EntityImportStream entityImportStream) { int startEntityPosition = importRequest.getImportTrackingInfo().getStartEntityPosition(); - LOG.info("==> skipToStartEntityPosition(atlasAsyncImportRequest={}): position={}", importRequest, startEntityPosition); + LOG.info("==> skipToPosition(importId={}): startEntityPosition={}", importRequest.getImportId(), startEntityPosition); while (entityImportStream.hasNext() && startEntityPosition > entityImportStream.getPosition()) { entityImportStream.next(); } - LOG.info("<== skipToStartEntityPosition(atlasAsyncImportRequest={}): position={}", importRequest, startEntityPosition); + LOG.info("<== skipToPosition()"); } @VisibleForTesting @@ -223,20 +226,22 @@ public class AsyncImportTaskExecutor { newImportRequest.setReceivedTime(System.currentTimeMillis()); newImportRequest.getImportDetails().setTotalEntitiesCount(totalEntities); newImportRequest.getImportDetails().setCreationOrder(creationOrder); - - importService.saveImportRequest(newImportRequest); - - LOG.info("registerRequest(importId={}): registered new request {}", importId, newImportRequest); - - return newImportRequest; + return withRetry(() -> { + importService.saveImportRequest(newImportRequest); + LOG.info("registerRequest(importId={}): registered new request", importId); + return newImportRequest; }, importId); } else if (ObjectUtils.equals(existingImportRequest.getStatus(), ImportStatus.STAGING)) { // if we are resuming staging, we need to update the latest request received at existingImportRequest.setReceivedTime(System.currentTimeMillis()); - importService.updateImportRequest(existingImportRequest); + return withRetry(() -> { + importService.updateImportRequest(existingImportRequest); + LOG.info("registerRequest(importId={}): resumed {}", importId, existingImportRequest); + return existingImportRequest; + }, importId); } - // handle request in STAGING / WAITING / PROCESSING status as resume + // handle request in / WAITING / PROCESSING status as resume LOG.info("registerRequest(importId={}): not a new request, resuming {}", importId, existingImportRequest); return existingImportRequest; @@ -249,6 +254,47 @@ public class AsyncImportTaskExecutor { } } + // retry to handle JanusGraph locking conflicts + private <T> T withRetry(Callable<T> action, String importId) throws AtlasBaseException { + int attempt = 0; + + while (true) { + try { + return action.call(); + } catch (Exception e) { + // detect JanusGraph lock contention by walking the cause chain + boolean lockingConflict = false; + for (Throwable c = e; c != null; c = c.getCause()) { + if ("org.janusgraph.diskstorage.locking.PermanentLockingException" + .equals(c.getClass().getName())) { + lockingConflict = true; + break; + } + } + + boolean canRetry = lockingConflict && attempt < (MAX_RETRIES - 1); + if (canRetry) { + long backoff = (long) BASE_BACKOFF_MS * (attempt + 1); + LOG.warn("Lock conflict for importId={} on attempt {}/{}, backing off {} ms", + importId, attempt + 1, MAX_RETRIES, backoff); + try { + Thread.sleep(backoff); + } catch (InterruptedException ignored) { + } + attempt++; + continue; // next attempt + } + + // Non-retryable OR last attempt failed + LOG.error("Failed to process importId={} on attempt {}/{}", importId, attempt + 1, MAX_RETRIES, e); + if (e instanceof AtlasBaseException) { + throw (AtlasBaseException) e; + } + throw new AtlasBaseException(AtlasErrorCode.IMPORT_REGISTRATION_FAILED, e); + } + } + } + private void sendToTopic(String topic, HookNotification notification) throws AtlasBaseException { try { notificationInterface.send(topic, Collections.singletonList(notification), messageSource); diff --git a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java index 6c731d016..1fe38885e 100644 --- a/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java +++ b/repository/src/test/java/org/apache/atlas/repository/store/graph/v2/AsyncImportTaskExecutorTest.java @@ -30,6 +30,7 @@ import org.apache.atlas.notification.NotificationException; import org.apache.atlas.notification.NotificationInterface; import org.apache.atlas.repository.impexp.AsyncImportService; import org.apache.atlas.repository.store.graph.v2.asyncimport.ImportTaskListener; +import org.janusgraph.diskstorage.locking.PermanentLockingException; import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.Mockito; @@ -450,6 +451,60 @@ public class AsyncImportTaskExecutorTest { assertEquals(exception.getAtlasErrorCode(), AtlasErrorCode.IMPORT_REGISTRATION_FAILED); } + @Test + public void testWithRetrySucceedsAfterLockingConflict() throws Exception { + AtlasImportResult result = mock(AtlasImportResult.class); + AtlasAsyncImportRequest newRequest = new AtlasAsyncImportRequest(result); + + // First call fails with a PermanentLockingException, second succeeds + doThrow(new RuntimeException(new PermanentLockingException("lock conflict"))) + .doNothing() + .when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class)); + + when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null); + + AtlasAsyncImportRequest response = + asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList()); + + assertNotNull(response); + verify(importService, times(2)).saveImportRequest(any(AtlasAsyncImportRequest.class)); + } + + @Test + public void testWithRetryFailsAfterMaxLockingConflicts() throws Exception { + AtlasImportResult result = mock(AtlasImportResult.class); + + // Always fail with PermanentLockingException + doThrow(new RuntimeException(new PermanentLockingException("lock conflict"))) + .when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class)); + + when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null); + + AtlasBaseException ex = expectThrows( + AtlasBaseException.class, + () -> asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList())); + + assertEquals(ex.getAtlasErrorCode(), AtlasErrorCode.IMPORT_REGISTRATION_FAILED); + verify(importService, times(3)).saveImportRequest(any(AtlasAsyncImportRequest.class)); + } + + @Test + public void testWithRetryFailsOnNonLockingException() throws Exception { + AtlasImportResult result = mock(AtlasImportResult.class); + + // Fail with a different exception + doThrow(new RuntimeException("Unexpected error")) + .when(importService).saveImportRequest(any(AtlasAsyncImportRequest.class)); + + when(importService.fetchImportRequestByImportId("import-id")).thenReturn(null); + + AtlasBaseException ex = expectThrows( + AtlasBaseException.class, + () -> asyncImportTaskExecutor.registerRequest(result, "import-id", 5, Collections.emptyList())); + + assertEquals(ex.getAtlasErrorCode(), AtlasErrorCode.IMPORT_REGISTRATION_FAILED); + } + @Test public void testAbortAsyncImportRequest() throws AtlasBaseException { AtlasAsyncImportRequest mockImportRequest = mock(AtlasAsyncImportRequest.class); diff --git a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java index 7064cc0da..22e51f77c 100755 --- a/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java +++ b/webapp/src/main/java/org/apache/atlas/web/resources/AdminResource.java @@ -703,19 +703,9 @@ public class AdminResource { AtlasAuthorizationUtils.verifyAccess(new AtlasAdminAccessRequest(AtlasPrivilege.ADMIN_IMPORT), "asyncImportData"); AtlasAsyncImportRequest asyncImportRequest; - boolean releaseExportImportLockOnCompletion = false; - try { - AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class); - boolean preventMultipleRequests = request != null && request.getOptions() != null && !request.getOptions().containsKey(AtlasImportRequest.OPTION_KEY_REPLICATED_FROM); - - if (preventMultipleRequests) { - acquireExportImportLock("import"); - - releaseExportImportLockOnCompletion = true; - } - - asyncImportRequest = importService.run(request, inputStream, Servlets.getUserName(httpServletRequest), Servlets.getHostName(httpServletRequest), AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest)); + AtlasImportRequest request = AtlasType.fromJson(jsonData, AtlasImportRequest.class); + asyncImportRequest = importService.run(request, inputStream, Servlets.getUserName(httpServletRequest), Servlets.getHostName(httpServletRequest), AtlasAuthorizationUtils.getRequestIpAddress(httpServletRequest)); } catch (AtlasBaseException excp) { if (excp.getAtlasErrorCode().equals(AtlasErrorCode.IMPORT_ATTEMPTING_EMPTY_ZIP)) { LOG.info(excp.getMessage()); @@ -731,10 +721,6 @@ public class AdminResource { throw new AtlasBaseException(excp); } finally { - if (releaseExportImportLockOnCompletion) { - releaseExportImportLock(); - } - LOG.debug("<== AdminResource.importAsync(binary)"); }
