This is an automated email from the ASF dual-hosted git repository.

madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git


The following commit(s) were added to refs/heads/master by this push:
     new dea587dd0 ATLAS-5091: fix stale transaction warnings (#424)
dea587dd0 is described below

commit dea587dd0a0f4fc9f19b2802345a880811fd2e4b
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Wed Aug 13 23:16:45 2025 -0700

    ATLAS-5091: fix stale transaction warnings (#424)
---
 .../repository/graph/GraphBackedSearchIndexer.java | 13 ++++-
 .../repository/graph/IndexRecoveryService.java     | 24 +++++++++-
 .../patches/UpdateCompositeIndexStatusPatch.java   | 19 ++++++--
 .../store/graph/v2/AtlasEnumDefStoreV2.java        | 55 +++++++++++++++-------
 .../store/graph/v2/AtlasStructDefStoreV2.java      | 54 ++++++++++++++-------
 5 files changed, 126 insertions(+), 39 deletions(-)

diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 3abb3db3f..ee71843f7 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -256,7 +256,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
     public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException {
         LOG.debug("Processing changed typedefs {}", changedTypeDefs);
 
-        AtlasGraphManagement management = null;
+        AtlasGraphManagement management       = null;
+        boolean              isRollbackNeeded = true;
 
         try {
             management = provider.get().getManagementSystem();
@@ -288,12 +289,22 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
             createEdgeLabels(management, changedTypeDefs.getCreatedTypeDefs());
             createEdgeLabels(management, changedTypeDefs.getUpdatedTypeDefs());
 
+            isRollbackNeeded = false;
+
             //Commit indexes
             commit(management);
         } catch (RepositoryException | IndexException e) {
             LOG.error("Failed to update indexes for changed typedefs", e);
 
+            isRollbackNeeded = false;
+
             attemptRollback(changedTypeDefs, management);
+        } finally {
+            if (isRollbackNeeded) {
+                LOG.warn("onChange({}): was not committed. Rolling back...", changedTypeDefs);
+
+                attemptRollback(changedTypeDefs, management);
+            }
         }
 
         notifyChangeListeners(changedTypeDefs);
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index 277143336..659c967d8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -24,6 +24,7 @@ import org.apache.atlas.AtlasException;
 import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
 import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
 import org.apache.atlas.repository.graphdb.AtlasVertex;
 import org.apache.atlas.service.Service;
@@ -287,7 +288,28 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
         }
 
         private void printIndexRecoveryStats() {
-            this.graph.getManagementSystem().printIndexRecoveryStats(txRecoveryObject);
+            AtlasGraphManagement management       = this.graph.getManagementSystem();
+            boolean              isRollbackNeeded = true;
+
+            try {
+                management.printIndexRecoveryStats(txRecoveryObject);
+
+                isRollbackNeeded = false;
+
+                management.commit();
+            } catch (Exception e) {
+                LOG.error("Index Recovery: printIndexRecoveryStats() failed!", e);
+            } finally {
+                if (isRollbackNeeded) {
+                    LOG.warn("Index Recovery: printIndexRecoveryStats() failed. Rolling back...");
+
+                    try {
+                        management.rollback();
+                    } catch (Exception rollbackEx) {
+                        LOG.error("Index Recovery: printIndexRecoveryStats() rollback failed!", rollbackEx);
+                    }
+                }
+            }
         }
     }
 
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
index e3689e57f..c88853aa3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.patches;
 import org.apache.atlas.AtlasConfiguration;
 import org.apache.atlas.exception.AtlasBaseException;
 import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,13 +48,25 @@ public class UpdateCompositeIndexStatusPatch extends AtlasPatchHandler {
             return;
         }
 
-        AtlasGraph graph = context.getGraph();
+        AtlasGraphManagement management       = context.getGraph().getManagementSystem();
+        boolean              isRollbackNeeded = true;
 
         try {
             LOG.info("UpdateCompositeIndexStatusPatch: Starting...");
-            graph.getManagementSystem().updateSchemaStatus();
-        } finally {
+
+            management.updateSchemaStatus();
+
+            isRollbackNeeded = false;
+
+            management.commit();
+
             LOG.info("UpdateCompositeIndexStatusPatch: Done!");
+        } finally {
+            if (isRollbackNeeded) {
+                LOG.warn("UpdateCompositeIndexStatusPatch: was not committed. Rolling back...");
+
+                management.rollback();
+            }
         }
 
         setStatus(UNKNOWN);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
index 6977b737f..426a185c1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
@@ -309,31 +309,52 @@ class AtlasEnumDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEnumDef> {
 
     private void createPropertyKeys(AtlasEnumDef enumDef) throws AtlasBaseException {
         AtlasGraphManagement management = typeDefStore.atlasGraph.getManagementSystem();
+        boolean              isSuccess  = false;
+        Exception            err        = null;
 
-        // create property keys first
-        for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
-            // Validate the enum element
-            if (StringUtils.isEmpty(element.getValue()) || null == element.getOrdinal()) {
-                throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, enumDef.getName(), "elementValue");
-            }
+        try {
 
-            String elemKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, element.getValue());
+            // create property keys first
+            for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
+                // Validate the enum element
+                if (StringUtils.isEmpty(element.getValue()) || null == element.getOrdinal()) {
+                    throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, enumDef.getName(), "elementValue");
+                }
 
-            createPropertyKey(encodePropertyKey(elemKey), Integer.class, AtlasCardinality.SINGLE, management);
-        }
+                String elemKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, element.getValue());
 
-        String typeDefKey      = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef);
-        String defaultValueKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, "defaultValue");
+                createPropertyKey(encodePropertyKey(elemKey), Integer.class, AtlasCardinality.SINGLE, management);
+            }
 
-        createPropertyKey(encodePropertyKey(typeDefKey), Object.class, AtlasCardinality.SINGLE, management);
-        createPropertyKey(encodePropertyKey(defaultValueKey), String.class, AtlasCardinality.SINGLE, management);
+            String typeDefKey      = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef);
+            String defaultValueKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, "defaultValue");
 
-        try {
-            management.commit();
+            createPropertyKey(encodePropertyKey(typeDefKey), Object.class, AtlasCardinality.SINGLE, management);
+            createPropertyKey(encodePropertyKey(defaultValueKey), String.class, AtlasCardinality.SINGLE, management);
+
+            isSuccess = true;
         } catch (Exception e) {
-            LOG.error("PropertyKey creation failed", e);
+            err = e;
+        } finally {
+            try {
+                if (isSuccess) {
+                    management.commit();
+                } else {
+                    management.rollback();
+                }
+            } catch (Exception e) {
+                if (err == null) {
+                    err = new AtlasBaseException(new IndexException("Index " + (isSuccess ? "commit" : "rollback") + " failed", e));
+                } else {
+                    LOG.error("Index {} failed", (isSuccess ? "commit" : "rollback"), e);
+                }
+            }
+        }
+
+        if (err != null) {
+            LOG.error("PropertyKey creation failed for enum {}", enumDef.getName(), err);
 
-            throw new AtlasBaseException(new IndexException("Index commit failed", e));
+            throw (err instanceof AtlasBaseException) ? (AtlasBaseException) err : new AtlasBaseException(err);
         }
     }
 }
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
index 3c891bbcb..7ec68e707 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
@@ -647,32 +647,52 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe
 
     private static void createPropertyKeys(AtlasStructDef structDef, AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException {
         AtlasGraphManagement management = typeDefStore.atlasGraph.getManagementSystem();
+        boolean              isSuccess  = false;
+        Exception            err        = null;
 
-        for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
-            // Validate the mandatory features of an attribute (compatibility with legacy type system)
-            if (StringUtils.isEmpty(attributeDef.getName())) {
-                throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "name");
-            }
+        try {
+            for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
+                // Validate the mandatory features of an attribute (compatibility with legacy type system)
+                if (StringUtils.isEmpty(attributeDef.getName())) {
+                    throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "name");
+                }
 
-            if (StringUtils.isEmpty(attributeDef.getTypeName())) {
-                throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "typeName");
-            }
+                if (StringUtils.isEmpty(attributeDef.getTypeName())) {
+                    throw new AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE, structDef.getName(), "typeName");
+                }
 
-            String propertyKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef, attributeDef.getName());
+                String propertyKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef, attributeDef.getName());
 
-            createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(propertyKey), String.class, AtlasCardinality.SINGLE, management);
-        }
+                createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(propertyKey), String.class, AtlasCardinality.SINGLE, management);
+            }
 
-        String typeNamePropertyKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef);
+            String typeNamePropertyKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef);
 
-        createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(typeNamePropertyKey), Object.class, AtlasCardinality.SINGLE, management);
+            createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(typeNamePropertyKey), Object.class, AtlasCardinality.SINGLE, management);
 
-        try {
-            management.commit();
+            isSuccess = true;
         } catch (Exception e) {
-            LOG.error("PropertyKey creation failed", e);
+            err = e;
+        } finally {
+            try {
+                if (isSuccess) {
+                    management.commit();
+                } else {
+                    management.rollback();
+                }
+            } catch (Exception e) {
+                if (err == null) {
+                    err = new AtlasBaseException(new IndexException("Index " + (isSuccess ? "commit" : "rollback") + " failed", e));
+                } else {
+                    LOG.error("Index {} failed", (isSuccess ? "commit" : "rollback"), e);
+                }
+            }
+        }
+
+        if (err != null) {
+            LOG.error("PropertyKey creation failed for structDef: {}", structDef, err);
 
-            throw new AtlasBaseException(new IndexException("Index commit failed", e));
+            throw (err instanceof AtlasBaseException) ? (AtlasBaseException) err : new AtlasBaseException(err);
         }
     }
 }

Reply via email to