This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch atlas-2.5
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/atlas-2.5 by this push:
new 570dfbe63 ATLAS-5091: fix stale transaction warnings (#424)
570dfbe63 is described below
commit 570dfbe63575cdc483da9d1b649d6acdf4bccd31
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Wed Aug 13 23:16:45 2025 -0700
ATLAS-5091: fix stale transaction warnings (#424)
(cherry picked from commit dea587dd0a0f4fc9f19b2802345a880811fd2e4b)
---
.../repository/graph/GraphBackedSearchIndexer.java | 13 ++++-
.../repository/graph/IndexRecoveryService.java | 24 +++++++++-
.../patches/UpdateCompositeIndexStatusPatch.java | 19 ++++++--
.../store/graph/v2/AtlasEnumDefStoreV2.java | 55 +++++++++++++++-------
.../store/graph/v2/AtlasStructDefStoreV2.java | 54 ++++++++++++++-------
5 files changed, 126 insertions(+), 39 deletions(-)
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index 3abb3db3f..ee71843f7 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -256,7 +256,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
public void onChange(ChangedTypeDefs changedTypeDefs) throws
AtlasBaseException {
LOG.debug("Processing changed typedefs {}", changedTypeDefs);
- AtlasGraphManagement management = null;
+ AtlasGraphManagement management = null;
+ boolean isRollbackNeeded = true;
try {
management = provider.get().getManagementSystem();
@@ -288,12 +289,22 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang
createEdgeLabels(management, changedTypeDefs.getCreatedTypeDefs());
createEdgeLabels(management, changedTypeDefs.getUpdatedTypeDefs());
+ isRollbackNeeded = false;
+
//Commit indexes
commit(management);
} catch (RepositoryException | IndexException e) {
LOG.error("Failed to update indexes for changed typedefs", e);
+ isRollbackNeeded = false;
+
attemptRollback(changedTypeDefs, management);
+ } finally {
+ if (isRollbackNeeded) {
+ LOG.warn("onChange({}): was not committed. Rolling back...",
changedTypeDefs);
+
+ attemptRollback(changedTypeDefs, management);
+ }
}
notifyChangeListeners(changedTypeDefs);
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index 277143336..659c967d8 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -24,6 +24,7 @@ import org.apache.atlas.AtlasException;
import org.apache.atlas.ha.HAConfiguration;
import org.apache.atlas.listener.ActiveStateChangeHandler;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasGraphQuery;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.apache.atlas.service.Service;
@@ -287,7 +288,28 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler {
}
private void printIndexRecoveryStats() {
-
this.graph.getManagementSystem().printIndexRecoveryStats(txRecoveryObject);
+ AtlasGraphManagement management =
this.graph.getManagementSystem();
+ boolean isRollbackNeeded = true;
+
+ try {
+ management.printIndexRecoveryStats(txRecoveryObject);
+
+ isRollbackNeeded = false;
+
+ management.commit();
+ } catch (Exception e) {
+ LOG.error("Index Recovery: printIndexRecoveryStats() failed!",
e);
+ } finally {
+ if (isRollbackNeeded) {
+ LOG.warn("Index Recovery: printIndexRecoveryStats()
failed. Rolling back...");
+
+ try {
+ management.rollback();
+ } catch (Exception rollbackEx) {
+ LOG.error("Index Recovery: printIndexRecoveryStats()
rollback failed!", rollbackEx);
+ }
+ }
+ }
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
index e3689e57f..c88853aa3 100644
--- a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
+++ b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.patches;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,13 +48,25 @@ public class UpdateCompositeIndexStatusPatch extends AtlasPatchHandler {
return;
}
- AtlasGraph graph = context.getGraph();
+ AtlasGraphManagement management =
context.getGraph().getManagementSystem();
+ boolean isRollbackNeeded = true;
try {
LOG.info("UpdateCompositeIndexStatusPatch: Starting...");
- graph.getManagementSystem().updateSchemaStatus();
- } finally {
+
+ management.updateSchemaStatus();
+
+ isRollbackNeeded = false;
+
+ management.commit();
+
LOG.info("UpdateCompositeIndexStatusPatch: Done!");
+ } finally {
+ if (isRollbackNeeded) {
+ LOG.warn("UpdateCompositeIndexStatusPatch: was not committed.
Rolling back...");
+
+ management.rollback();
+ }
}
setStatus(UNKNOWN);
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
index 6977b737f..426a185c1 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
@@ -309,31 +309,52 @@ class AtlasEnumDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEnumDef> {
private void createPropertyKeys(AtlasEnumDef enumDef) throws
AtlasBaseException {
AtlasGraphManagement management =
typeDefStore.atlasGraph.getManagementSystem();
+ boolean isSuccess = false;
+ Exception err = null;
- // create property keys first
- for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
- // Validate the enum element
- if (StringUtils.isEmpty(element.getValue()) || null ==
element.getOrdinal()) {
- throw new
AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE,
enumDef.getName(), "elementValue");
- }
+ try {
- String elemKey = AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef,
element.getValue());
+ // create property keys first
+ for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
+ // Validate the enum element
+ if (StringUtils.isEmpty(element.getValue()) || null ==
element.getOrdinal()) {
+ throw new
AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE,
enumDef.getName(), "elementValue");
+ }
- createPropertyKey(encodePropertyKey(elemKey), Integer.class,
AtlasCardinality.SINGLE, management);
- }
+ String elemKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, element.getValue());
- String typeDefKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef);
- String defaultValueKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, "defaultValue");
+ createPropertyKey(encodePropertyKey(elemKey), Integer.class,
AtlasCardinality.SINGLE, management);
+ }
- createPropertyKey(encodePropertyKey(typeDefKey), Object.class,
AtlasCardinality.SINGLE, management);
- createPropertyKey(encodePropertyKey(defaultValueKey), String.class,
AtlasCardinality.SINGLE, management);
+ String typeDefKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef);
+ String defaultValueKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(enumDef, "defaultValue");
- try {
- management.commit();
+ createPropertyKey(encodePropertyKey(typeDefKey), Object.class,
AtlasCardinality.SINGLE, management);
+ createPropertyKey(encodePropertyKey(defaultValueKey),
String.class, AtlasCardinality.SINGLE, management);
+
+ isSuccess = true;
} catch (Exception e) {
- LOG.error("PropertyKey creation failed", e);
+ err = e;
+ } finally {
+ try {
+ if (isSuccess) {
+ management.commit();
+ } else {
+ management.rollback();
+ }
+ } catch (Exception e) {
+ if (err == null) {
+ err = new AtlasBaseException(new IndexException("Index " +
(isSuccess ? "commit" : "rollback") + " failed", e));
+ } else {
+ LOG.error("Index {} failed", (isSuccess ? "commit" :
"rollback"), e);
+ }
+ }
+ }
+
+ if (err != null) {
+ LOG.error("PropertyKey creation failed for enum {}",
enumDef.getName(), err);
- throw new AtlasBaseException(new IndexException("Index commit
failed", e));
+ throw (err instanceof AtlasBaseException) ? (AtlasBaseException)
err : new AtlasBaseException(err);
}
}
}
diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
index 3c891bbcb..7ec68e707 100644
--- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
+++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
@@ -647,32 +647,52 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe
private static void createPropertyKeys(AtlasStructDef structDef,
AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException {
AtlasGraphManagement management =
typeDefStore.atlasGraph.getManagementSystem();
+ boolean isSuccess = false;
+ Exception err = null;
- for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) {
- // Validate the mandatory features of an attribute (compatibility
with legacy type system)
- if (StringUtils.isEmpty(attributeDef.getName())) {
- throw new
AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE,
structDef.getName(), "name");
- }
+ try {
+ for (AtlasAttributeDef attributeDef :
structDef.getAttributeDefs()) {
+ // Validate the mandatory features of an attribute
(compatibility with legacy type system)
+ if (StringUtils.isEmpty(attributeDef.getName())) {
+ throw new
AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE,
structDef.getName(), "name");
+ }
- if (StringUtils.isEmpty(attributeDef.getTypeName())) {
- throw new
AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE,
structDef.getName(), "typeName");
- }
+ if (StringUtils.isEmpty(attributeDef.getTypeName())) {
+ throw new
AtlasBaseException(AtlasErrorCode.MISSING_MANDATORY_ATTRIBUTE,
structDef.getName(), "typeName");
+ }
- String propertyKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef, attributeDef.getName());
+ String propertyKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef, attributeDef.getName());
-
createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(propertyKey),
String.class, AtlasCardinality.SINGLE, management);
- }
+
createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(propertyKey),
String.class, AtlasCardinality.SINGLE, management);
+ }
- String typeNamePropertyKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef);
+ String typeNamePropertyKey =
AtlasGraphUtilsV2.getTypeDefPropertyKey(structDef);
-
createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(typeNamePropertyKey),
Object.class, AtlasCardinality.SINGLE, management);
+
createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(typeNamePropertyKey),
Object.class, AtlasCardinality.SINGLE, management);
- try {
- management.commit();
+ isSuccess = true;
} catch (Exception e) {
- LOG.error("PropertyKey creation failed", e);
+ err = e;
+ } finally {
+ try {
+ if (isSuccess) {
+ management.commit();
+ } else {
+ management.rollback();
+ }
+ } catch (Exception e) {
+ if (err == null) {
+ err = new AtlasBaseException(new IndexException("Index " +
(isSuccess ? "commit" : "rollback") + " failed", e));
+ } else {
+ LOG.error("Index {} failed", (isSuccess ? "commit" :
"rollback"), e);
+ }
+ }
+ }
+
+ if (err != null) {
+ LOG.error("PropertyKey creation failed for structDef: {}",
structDef, err);
- throw new AtlasBaseException(new IndexException("Index commit
failed", e));
+ throw (err instanceof AtlasBaseException) ? (AtlasBaseException)
err : new AtlasBaseException(err);
}
}
}