This is an automated email from the ASF dual-hosted git repository.
madhan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 0ad88da7d ATLAS-5091: ensure commit/rollback is called on
ManagementSystem instances (#426)
0ad88da7d is described below
commit 0ad88da7d8537ba5d6a3211c4d8479bc7266dab6
Author: Madhan Neethiraj <[email protected]>
AuthorDate: Tue Aug 26 07:24:01 2025 -0700
ATLAS-5091: ensure commit/rollback is called on ManagementSystem instances
(#426)
---
.../repository/graphdb/AtlasGraphManagement.java | 15 +--
.../repository/graphdb/janus/AtlasJanusGraph.java | 21 ++--
.../graphdb/janus/AtlasJanusGraphDatabase.java | 19 ++--
.../graphdb/janus/AtlasJanusGraphIndexClient.java | 19 ----
.../graphdb/janus/AtlasJanusGraphManagement.java | 53 +++++----
.../graphdb/janus/AbstractGraphDatabaseTest.java | 33 +++---
.../graphdb/janus/AtlasJanusDatabaseTest.java | 22 ++--
.../repository/graph/GraphBackedSearchIndexer.java | 118 ++++-----------------
.../repository/graph/IndexRecoveryService.java | 32 ++----
.../repository/patches/IndexConsistencyPatch.java | 10 +-
.../atlas/repository/patches/ReIndexPatch.java | 7 +-
.../repository/patches/UniqueAttributePatch.java | 19 +++-
.../patches/UpdateCompositeIndexStatusPatch.java | 17 +--
.../store/graph/v2/AtlasEnumDefStoreV2.java | 31 +-----
.../store/graph/v2/AtlasStructDefStoreV2.java | 31 +-----
.../java/org/apache/atlas/tools/RepairIndex.java | 65 ++++++++----
16 files changed, 205 insertions(+), 307 deletions(-)
diff --git
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
index a5060408d..ba0abf464 100644
---
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
+++
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
@@ -24,7 +24,7 @@ import java.util.List;
* Management interface for a graph.
*
*/
-public interface AtlasGraphManagement {
+public interface AtlasGraphManagement extends AutoCloseable {
/**
* Checks whether a property with the given key has been defined in the
graph schema.
*
@@ -33,17 +33,6 @@ public interface AtlasGraphManagement {
*/
boolean containsPropertyKey(String key);
- /**
- * Rolls back the changes that have been made to the management system.
- */
- void rollback();
-
- /**
- * Commits the changes that have been made to the management system.
- */
-
- void commit();
-
/**
* @param propertyName
* @param propertyClass
@@ -203,4 +192,6 @@ public interface AtlasGraphManagement {
* @param txRecoveryObject
*/
void printIndexRecoveryStats(Object txRecoveryObject);
+
+ void setIsSuccess(boolean isSuccess);
}
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index fb29e4cec..7153448a5 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -529,15 +529,22 @@ public class AtlasJanusGraph implements
AtlasGraph<AtlasJanusVertex, AtlasJanusE
}
private Set<String> getIndexKeys(Class<? extends Element>
janusGraphElementClass) {
- JanusGraphManagement mgmt = getGraph().openManagement();
- Iterable<JanusGraphIndex> indices =
mgmt.getGraphIndexes(janusGraphElementClass);
- Set<String> result = new HashSet<>();
+ Set<String> result = new HashSet<>();
+ JanusGraphManagement mgmt = null;
- for (JanusGraphIndex index : indices) {
- result.add(index.name());
- }
+ try {
+ mgmt = getGraph().openManagement();
- mgmt.commit();
+ Iterable<JanusGraphIndex> indices =
mgmt.getGraphIndexes(janusGraphElementClass);
+
+ for (JanusGraphIndex index : indices) {
+ result.add(index.name());
+ }
+ } finally {
+ if (mgmt != null) {
+ mgmt.commit();
+ }
+ }
return result;
}
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
index 77428ba99..e4b4f0ff7 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java
@@ -238,14 +238,21 @@ public class AtlasJanusGraphDatabase implements
GraphDatabase<AtlasJanusVertex,
}
static void validateIndexBackend(Configuration config) {
- String configuredIndexBackend =
config.getString(INDEX_BACKEND_CONF);
- JanusGraphManagement managementSystem =
getGraphInstance().openManagement();
- String currentIndexBackend =
managementSystem.get(INDEX_BACKEND_CONF);
+ JanusGraphManagement managementSystem = null;
+
+ try {
+ managementSystem = getGraphInstance().openManagement();
- managementSystem.commit();
+ String configuredIndexBackend =
config.getString(INDEX_BACKEND_CONF);
+ String currentIndexBackend =
managementSystem.get(INDEX_BACKEND_CONF);
- if (!configuredIndexBackend.equals(currentIndexBackend)) {
- throw new RuntimeException("Configured Index Backend " +
configuredIndexBackend + " differs from earlier configured Index Backend " +
currentIndexBackend + ". Aborting!");
+ if (!configuredIndexBackend.equals(currentIndexBackend)) {
+ throw new RuntimeException("Configured Index Backend " +
configuredIndexBackend + " differs from earlier configured Index Backend " +
currentIndexBackend + ". Aborting!");
+ }
+ } finally {
+ if (managementSystem != null) {
+ managementSystem.commit();
+ }
}
}
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
index 8edcf7433..25b504b8a 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java
@@ -24,7 +24,6 @@ import org.apache.atlas.model.discovery.AtlasAggregationEntry;
import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AggregationContext;
import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient;
-import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.type.AtlasStructType.AtlasAttribute;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -484,24 +483,6 @@ public class AtlasJanusGraphIndexClient implements
AtlasGraphIndexClient {
return client != null && client.indexExists(janusVertexIndex);
}
- private void graphManagementCommit(AtlasGraphManagement management) {
- try {
- management.commit();
- } catch (Exception ex) {
- LOG.warn("Graph transaction management commit failed; attempting
rollback", ex);
-
- graphManagementRollback(management);
- }
- }
-
- private void graphManagementRollback(AtlasGraphManagement management) {
- try {
- management.rollback();
- } catch (Exception ex) {
- LOG.warn("Graph transaction management rollback failed", ex);
- }
- }
-
private SolrResponse updateFreeTextRequestHandler(SolrClient solrClient,
String collectionName, Map<String, Integer> indexFieldName2SearchWeightMap,
Solr6Index.Mode mode) throws IOException, SolrServerException,
AtlasBaseException {
String searchWeightString =
generateSearchWeightString(indexFieldName2SearchWeightMap);
String payLoadString =
generatePayLoadForFreeText("update-requesthandler", searchWeightString);
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index a1db859b6..c9fa2e4cd 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -87,6 +87,7 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
private final AtlasJanusGraph graph;
private final JanusGraphManagement management;
private final Set<String> newMultProperties = new HashSet<>();
+ private boolean isSuccess = false;
public AtlasJanusGraphManagement(AtlasJanusGraph graph,
JanusGraphManagement managementSystem) {
this.management = managementSystem;
@@ -107,11 +108,19 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
try {
if (status == REGISTERED) {
- JanusGraphManagement management =
graph.openManagement();
- JanusGraphIndex indexToUpdate =
management.getGraphIndex(indexName);
+ JanusGraphManagement management = null;
- management.updateIndex(indexToUpdate,
ENABLE_INDEX).get();
- management.commit();
+ try {
+ management = graph.openManagement();
+
+ JanusGraphIndex indexToUpdate =
management.getGraphIndex(indexName);
+
+ management.updateIndex(indexToUpdate,
ENABLE_INDEX).get();
+ } finally {
+ if (management != null) {
+ management.commit();
+ }
+ }
GraphIndexStatusReport report =
ManagementSystem.awaitGraphIndexStatus(graph, indexName).status(ENABLED).call();
@@ -137,20 +146,22 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
}
@Override
- public boolean containsPropertyKey(String propertyName) {
- return management.containsPropertyKey(propertyName);
+ public void close() throws Exception {
+ if (isSuccess) {
+ commit();
+ } else {
+ rollback();
+ }
}
@Override
- public void rollback() {
- management.rollback();
+ public void setIsSuccess(boolean isSuccess) {
+ this.isSuccess = isSuccess;
}
@Override
- public void commit() {
- graph.addMultiProperties(newMultProperties);
- newMultProperties.clear();
- management.commit();
+ public boolean containsPropertyKey(String propertyName) {
+ return management.containsPropertyKey(propertyName);
}
@Override
@@ -325,12 +336,8 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
@Override
public void updateUniqueIndexesForConsistencyLock() {
- try {
- setConsistency(this.management, Vertex.class);
- setConsistency(this.management, Edge.class);
- } finally {
- commit();
- }
+ setConsistency(this.management, Vertex.class);
+ setConsistency(this.management, Edge.class);
}
@Override
@@ -423,6 +430,16 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
}
}
+ private void rollback() {
+ management.rollback();
+ }
+
+ private void commit() {
+ graph.addMultiProperties(newMultProperties);
+ newMultProperties.clear();
+ management.commit();
+ }
+
private static void checkName(String name) {
//for some reason, name checking was removed from
StandardPropertyKeyMaker.make()
//in Janus. For consistency, do the check here.
diff --git
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java
index bca3f32f9..b1502e3bc 100644
---
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java
+++
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java
@@ -56,24 +56,25 @@ public abstract class AbstractGraphDatabaseTest {
LocalSolrRunner.start();
}
- AtlasJanusGraphDatabase db = new AtlasJanusGraphDatabase();
- AtlasGraphManagement mgmt = db.getGraph().getManagementSystem();
+ AtlasJanusGraphDatabase db = new AtlasJanusGraphDatabase();
- if (mgmt.getGraphIndex(BACKING_INDEX_NAME) == null) {
- mgmt.createVertexMixedIndex(BACKING_INDEX_NAME,
Constants.BACKING_INDEX, Collections.emptyList());
+ try (AtlasGraphManagement mgmt = db.getGraph().getManagementSystem()) {
+ if (mgmt.getGraphIndex(BACKING_INDEX_NAME) == null) {
+ mgmt.createVertexMixedIndex(BACKING_INDEX_NAME,
Constants.BACKING_INDEX, Collections.emptyList());
+ }
+ mgmt.makePropertyKey("age13", Integer.class,
AtlasCardinality.SINGLE);
+
+ createIndices(mgmt, "name", String.class, false,
AtlasCardinality.SINGLE);
+ createIndices(mgmt, WEIGHT_PROPERTY, Integer.class, false,
AtlasCardinality.SINGLE);
+ createIndices(mgmt, "size15", String.class, false,
AtlasCardinality.SINGLE);
+ createIndices(mgmt, "typeName", String.class, false,
AtlasCardinality.SINGLE);
+ createIndices(mgmt, "__type", String.class, false,
AtlasCardinality.SINGLE);
+ createIndices(mgmt, Constants.GUID_PROPERTY_KEY, String.class,
true, AtlasCardinality.SINGLE);
+ createIndices(mgmt, Constants.TRAIT_NAMES_PROPERTY_KEY,
String.class, false, AtlasCardinality.SET);
+ createIndices(mgmt, Constants.SUPER_TYPES_PROPERTY_KEY,
String.class, false, AtlasCardinality.SET);
+
+ mgmt.setIsSuccess(true);
}
- mgmt.makePropertyKey("age13", Integer.class, AtlasCardinality.SINGLE);
-
- createIndices(mgmt, "name", String.class, false,
AtlasCardinality.SINGLE);
- createIndices(mgmt, WEIGHT_PROPERTY, Integer.class, false,
AtlasCardinality.SINGLE);
- createIndices(mgmt, "size15", String.class, false,
AtlasCardinality.SINGLE);
- createIndices(mgmt, "typeName", String.class, false,
AtlasCardinality.SINGLE);
- createIndices(mgmt, "__type", String.class, false,
AtlasCardinality.SINGLE);
- createIndices(mgmt, Constants.GUID_PROPERTY_KEY, String.class, true,
AtlasCardinality.SINGLE);
- createIndices(mgmt, Constants.TRAIT_NAMES_PROPERTY_KEY, String.class,
false, AtlasCardinality.SET);
- createIndices(mgmt, Constants.SUPER_TYPES_PROPERTY_KEY, String.class,
false, AtlasCardinality.SET);
-
- mgmt.commit();
}
@AfterClass
diff --git
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java
index 9f8e0bc07..5ee4203f9 100644
---
a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java
+++
b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java
@@ -391,21 +391,21 @@ public class AtlasJanusDatabaseTest {
atlasGraph = db.getGraph();
- AtlasGraphManagement mgmt = atlasGraph.getManagementSystem();
+ try (AtlasGraphManagement mgmt = atlasGraph.getManagementSystem())
{
+ // create the index (which defines these properties as being
mult
+ // many)
+ for (String propertyName : new String[] {"__superTypeNames",
"__traitNames"}) {
+ AtlasPropertyKey propertyKey =
mgmt.getPropertyKey(propertyName);
- // create the index (which defines these properties as being mult
- // many)
- for (String propertyName : new String[] {"__superTypeNames",
"__traitNames"}) {
- AtlasPropertyKey propertyKey =
mgmt.getPropertyKey(propertyName);
+ if (propertyKey == null) {
+ propertyKey = mgmt.makePropertyKey(propertyName,
String.class, AtlasCardinality.SET);
- if (propertyKey == null) {
- propertyKey = mgmt.makePropertyKey(propertyName,
String.class, AtlasCardinality.SET);
-
- mgmt.createVertexCompositeIndex(propertyName, false,
Collections.singletonList(propertyKey));
+ mgmt.createVertexCompositeIndex(propertyName, false,
Collections.singletonList(propertyKey));
+ }
}
- }
- mgmt.commit();
+ mgmt.setIsSuccess(true);
+ }
}
return (AtlasGraph<V, E>) atlasGraph;
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
index ee71843f7..191dea810 100755
---
a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java
@@ -256,12 +256,7 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
public void onChange(ChangedTypeDefs changedTypeDefs) throws
AtlasBaseException {
LOG.debug("Processing changed typedefs {}", changedTypeDefs);
- AtlasGraphManagement management = null;
- boolean isRollbackNeeded = true;
-
- try {
- management = provider.get().getManagementSystem();
-
+ try (AtlasGraphManagement management =
provider.get().getManagementSystem()) {
// Update index for newly created types
if
(CollectionUtils.isNotEmpty(changedTypeDefs.getCreatedTypeDefs())) {
for (AtlasBaseTypeDef typeDef :
changedTypeDefs.getCreatedTypeDefs()) {
@@ -289,22 +284,11 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
createEdgeLabels(management, changedTypeDefs.getCreatedTypeDefs());
createEdgeLabels(management, changedTypeDefs.getUpdatedTypeDefs());
- isRollbackNeeded = false;
-
- //Commit indexes
- commit(management);
- } catch (RepositoryException | IndexException e) {
+ management.setIsSuccess(true);
+ } catch (Exception e) {
LOG.error("Failed to update indexes for changed typedefs", e);
-
- isRollbackNeeded = false;
-
- attemptRollback(changedTypeDefs, management);
} finally {
- if (isRollbackNeeded) {
- LOG.warn("onChange({}): was not committed. Rolling back...",
changedTypeDefs);
-
- attemptRollback(changedTypeDefs, management);
- }
+ recomputeIndexedKeys = true;
}
notifyChangeListeners(changedTypeDefs);
@@ -319,33 +303,26 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
typeDefs.addAll(typeRegistry.getAllEntityDefs());
typeDefs.addAll(typeRegistry.getAllBusinessMetadataDefs());
- ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(null, new
ArrayList<>(typeDefs), null);
- AtlasGraphManagement management = null;
-
- try {
- management = provider.get().getManagementSystem();
+ ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(null, new
ArrayList<>(typeDefs), null);
+ try (AtlasGraphManagement management =
provider.get().getManagementSystem()) {
//resolve index fields names
resolveIndexFieldNames(management, changedTypeDefs);
//Commit indexes
- commit(management);
+ management.setIsSuccess(true);
notifyInitializationCompletion(changedTypeDefs);
- } catch (RepositoryException | IndexException e) {
+ } catch (Exception e) {
LOG.error("Failed to update indexes for changed typedefs", e);
-
- attemptRollback(changedTypeDefs, management);
+ } finally {
+ recomputeIndexedKeys = true;
}
}
public Set<String> getVertexIndexKeys() {
if (recomputeIndexedKeys) {
- AtlasGraphManagement management = null;
-
- try {
- management = provider.get().getManagementSystem();
-
+ try (AtlasGraphManagement management =
provider.get().getManagementSystem()) {
if (management != null) {
AtlasGraphIndex vertexIndex =
management.getGraphIndex(VERTEX_INDEX);
@@ -361,18 +338,10 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
recomputeIndexedKeys = false;
}
- management.commit();
+ management.setIsSuccess(true);
}
} catch (Exception excp) {
LOG.error("getVertexIndexKeys(): failed to get indexedKeys
from graph", excp);
-
- if (management != null) {
- try {
- management.rollback();
- } catch (Exception e) {
- LOG.error("getVertexIndexKeys(): rollback failed", e);
- }
- }
}
}
@@ -381,11 +350,7 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
public Set<String> getEdgeIndexKeys() {
if (recomputeEdgeIndexedKeys) {
- AtlasGraphManagement management = null;
-
- try {
- management = provider.get().getManagementSystem();
-
+ try (AtlasGraphManagement management =
provider.get().getManagementSystem()) {
if (management != null) {
AtlasGraphIndex edgeIndex =
management.getGraphIndex(EDGE_INDEX);
@@ -401,18 +366,10 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
recomputeEdgeIndexedKeys = false;
}
- management.commit();
+ management.setIsSuccess(true);
}
} catch (Exception excp) {
LOG.error("getEdgeIndexKeys(): failed to get indexedKeys from
graph", excp);
-
- if (management != null) {
- try {
- management.rollback();
- } catch (Exception e) {
- LOG.error("getEdgeIndexKeys(): rollback failed", e);
- }
- }
}
}
@@ -494,6 +451,8 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
createVertexCompositeIndexWithTypeName(management,
propertyClass, propertyKey, uniqueKind == UniqueKind.PER_TYPE_UNIQUE);
createVertexCompositeIndexWithSuperTypeName(management,
propertyClass, propertyKey);
}
+
+ recomputeIndexedKeys = true;
} else {
LOG.warn("Index not created for {}: propertyKey is null",
propertyName);
}
@@ -528,30 +487,6 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
}
}
- public void commit(AtlasGraphManagement management) throws IndexException {
- try {
- management.commit();
-
- recomputeIndexedKeys = true;
- } catch (Exception e) {
- LOG.error("Index commit failed", e);
-
- throw new IndexException("Index commit failed ", e);
- }
- }
-
- public void rollback(AtlasGraphManagement management) throws
IndexException {
- try {
- management.rollback();
-
- recomputeIndexedKeys = true;
- } catch (Exception e) {
- LOG.error("Index rollback failed ", e);
-
- throw new IndexException("Index rollback failed ", e);
- }
- }
-
/**
* Initializes the indices for the graph - create indices for Global
AtlasVertex Keys
*/
@@ -563,9 +498,7 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
* Initializes the indices for the graph - create indices for Global
AtlasVertex and AtlasEdge Keys
*/
private void initialize(AtlasGraph graph) throws RepositoryException,
IndexException {
- AtlasGraphManagement management = graph.getManagementSystem();
-
- try {
+ try (AtlasGraphManagement management = graph.getManagementSystem()) {
LOG.info("Creating indexes for graph.");
if (management.getGraphIndex(VERTEX_INDEX) == null) {
@@ -670,14 +603,15 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
createPropertyKey(management, RELATIONSHIPTYPE_LABEL_KEY,
String.class, SINGLE);
createPropertyKey(management,
RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, String.class, SINGLE);
- commit(management);
+ management.setIsSuccess(true);
LOG.info("Index creation for global keys complete.");
} catch (Throwable t) {
LOG.error("GraphBackedSearchIndexer.initialize() failed", t);
- rollback(management);
throw new RepositoryException(t);
+ } finally {
+ recomputeIndexedKeys = true;
}
}
@@ -1093,18 +1027,6 @@ public class GraphBackedSearchIndexer implements
SearchIndexer, ActiveStateChang
return !(INDEX_EXCLUSION_CLASSES.contains(propertyClass) ||
cardinality.isMany());
}
- private void attemptRollback(ChangedTypeDefs changedTypeDefs,
AtlasGraphManagement management) throws AtlasBaseException {
- if (null != management) {
- try {
- rollback(management);
- } catch (IndexException e) {
- LOG.error("Index rollback has failed", e);
-
- throw new
AtlasBaseException(AtlasErrorCode.INDEX_ROLLBACK_FAILED, e,
changedTypeDefs.toString());
- }
- }
- }
-
private void updateIndexForTypeDef(AtlasGraphManagement management,
AtlasBaseTypeDef typeDef) {
checkNotNull(typeDef, "Cannot index on null typedefs");
diff --git
a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
index 659c967d8..04a8d841d 100644
---
a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
+++
b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java
@@ -250,11 +250,13 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
return;
}
- try {
- txRecoveryObject =
this.graph.getManagementSystem().startIndexRecovery(startTime);
+ try (AtlasGraphManagement management =
this.graph.getManagementSystem()) {
+ txRecoveryObject = management.startIndexRecovery(startTime);
printIndexRecoveryStats();
+ management.setIsSuccess(true);
+
LOG.info("Index Recovery: Started! Recovery time: {}",
Instant.ofEpochMilli(startTime));
} catch (Exception e) {
LOG.error("Index Recovery with recovery time: {} failed",
Instant.ofEpochMilli(startTime), e);
@@ -276,10 +278,12 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
}
private void stopIndexRecovery() {
- try {
-
this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject);
+ try (AtlasGraphManagement management =
this.graph.getManagementSystem()) {
+ management.stopIndexRecovery(txRecoveryObject);
printIndexRecoveryStats();
+
+ management.setIsSuccess(true);
} catch (Exception e) {
LOG.info("Index Recovery: Stopped! Error!", e);
} finally {
@@ -288,27 +292,11 @@ public class IndexRecoveryService implements Service,
ActiveStateChangeHandler {
}
private void printIndexRecoveryStats() {
- AtlasGraphManagement management =
this.graph.getManagementSystem();
- boolean isRollbackNeeded = true;
-
- try {
+ try (AtlasGraphManagement management =
this.graph.getManagementSystem()) {
management.printIndexRecoveryStats(txRecoveryObject);
-
- isRollbackNeeded = false;
-
- management.commit();
+ management.setIsSuccess(true);
} catch (Exception e) {
LOG.error("Index Recovery: printIndexRecoveryStats() failed!",
e);
- } finally {
- if (isRollbackNeeded) {
- LOG.warn("Index Recovery: printIndexRecoveryStats()
failed. Rolling back...");
-
- try {
- management.rollback();
- } catch (Exception rollbackEx) {
- LOG.error("Index Recovery: printIndexRecoveryStats()
rollback failed!", rollbackEx);
- }
- }
}
}
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java
index 8d0113582..b08c942be 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java
@@ -20,6 +20,7 @@ package org.apache.atlas.repository.patches;
import org.apache.atlas.AtlasConfiguration;
import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,10 +50,15 @@ public class IndexConsistencyPatch extends
AtlasPatchHandler {
AtlasGraph graph = context.getGraph();
- try {
+ try (AtlasGraphManagement management = graph.getManagementSystem()) {
LOG.info("IndexConsistencyPatch: Starting...");
-
graph.getManagementSystem().updateUniqueIndexesForConsistencyLock();
+ management.updateUniqueIndexesForConsistencyLock();
+ management.setIsSuccess(true);
+ } catch (Exception excp) {
+ LOG.warn("IndexConsistencyPatch: failed", excp);
+
+ throw (excp instanceof AtlasBaseException) ? (AtlasBaseException)
excp : new AtlasBaseException("IndexConsistencyPatch failed", excp);
} finally {
LOG.info("IndexConsistencyPatch: Done!");
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
index 8077c5f9c..2dbbf7c50 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
@@ -26,6 +26,7 @@ import org.apache.atlas.repository.Constants;
import org.apache.atlas.repository.graphdb.AtlasEdge;
import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -188,8 +189,10 @@ public class ReIndexPatch extends AtlasPatchHandler {
private void attemptCommit() {
for (String indexName : indexNames) {
- try {
- this.graph.getManagementSystem().reindex(indexName, list);
+ try (AtlasGraphManagement management =
this.graph.getManagementSystem()) {
+ management.reindex(indexName, list);
+
+ management.setIsSuccess(true);
} catch (IllegalStateException e) {
LOG.error("IllegalStateException: Exception", e);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
index cd0137bc8..cf364a01d 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java
@@ -134,9 +134,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler
{
}
private void createIndexForUniqueAttributes(String typeName,
Collection<AtlasAttribute> attributes) {
- try {
- AtlasGraphManagement management =
getGraph().getManagementSystem();
+ boolean isSuccess = false;
+ try (AtlasGraphManagement management =
getGraph().getManagementSystem()) {
for (AtlasAttribute attribute : attributes) {
String uniquePropertyName =
attribute.getVertexUniquePropertyName();
@@ -160,12 +160,21 @@ public class UniqueAttributePatch extends
AtlasPatchHandler {
AtlasAttributeDef.IndexType.STRING.equals(attribute.getIndexType()));
}
- getIndexer().commit(management);
- getGraph().commit();
+ management.setIsSuccess(true);
+
+ isSuccess = true;
LOG.info("Unique attributes: type: {}: Registered!", typeName);
- } catch (IndexException e) {
+ } catch (Exception e) {
LOG.error("Error creating index: type: {}", typeName, e);
+
+ isSuccess = false;
+ } finally {
+ if (isSuccess) {
+ getGraph().commit();
+ } else {
+ getGraph().rollback();
+ }
}
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
index ce551ae1f..f875a4d73 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java
@@ -47,25 +47,18 @@ public class UpdateCompositeIndexStatusPatch extends
AtlasPatchHandler {
return;
}
- AtlasGraphManagement management =
context.getGraph().getManagementSystem();
- boolean isRollbackNeeded = true;
-
- try {
+ try (AtlasGraphManagement management =
context.getGraph().getManagementSystem()) {
LOG.info("UpdateCompositeIndexStatusPatch: Starting...");
management.updateSchemaStatus();
- isRollbackNeeded = false;
-
- management.commit();
+ management.setIsSuccess(true);
LOG.info("UpdateCompositeIndexStatusPatch: Done!");
- } finally {
- if (isRollbackNeeded) {
- LOG.warn("UpdateCompositeIndexStatusPatch: was not committed.
Rolling back...");
+ } catch (Exception excp) {
+ LOG.warn("UpdateCompositeIndexStatusPatch: failed", excp);
- management.rollback();
- }
+ throw (excp instanceof AtlasBaseException) ? (AtlasBaseException)
excp : new AtlasBaseException(excp);
}
setStatus(UNKNOWN);
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
index 2eb92e2be..83f1635e7 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java
@@ -25,7 +25,6 @@ import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef;
import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -308,11 +307,7 @@ class AtlasEnumDefStoreV2 extends
AtlasAbstractDefStoreV2<AtlasEnumDef> {
}
private void createPropertyKeys(AtlasEnumDef enumDef) throws
AtlasBaseException {
- AtlasGraphManagement management =
typeDefStore.atlasGraph.getManagementSystem();
- boolean isSuccess = false;
- Exception err = null;
-
- try {
+ try (AtlasGraphManagement management =
typeDefStore.atlasGraph.getManagementSystem()) {
// create property keys first
for (AtlasEnumElementDef element : enumDef.getElementDefs()) {
// Validate the enum element
@@ -331,29 +326,11 @@ class AtlasEnumDefStoreV2 extends
AtlasAbstractDefStoreV2<AtlasEnumDef> {
createPropertyKey(encodePropertyKey(typeDefKey), Object.class,
AtlasCardinality.SINGLE, management);
createPropertyKey(encodePropertyKey(defaultValueKey),
String.class, AtlasCardinality.SINGLE, management);
- isSuccess = true;
+ management.setIsSuccess(true);
} catch (Exception e) {
- err = e;
- } finally {
- try {
- if (isSuccess) {
- management.commit();
- } else {
- management.rollback();
- }
- } catch (Exception e) {
- if (err == null) {
- err = new AtlasBaseException(new IndexException("Index " +
(isSuccess ? "commit" : "rollback") + " failed", e));
- } else {
- LOG.error("Index {} failed", (isSuccess ? "commit" :
"rollback"), e);
- }
- }
- }
-
- if (err != null) {
- LOG.error("PropertyKey creation failed for enum {}",
enumDef.getName(), err);
+ LOG.error("PropertyKey creation failed for enum {}",
enumDef.getName(), e);
- throw (err instanceof AtlasBaseException) ? (AtlasBaseException)
err : new AtlasBaseException(err);
+ throw (e instanceof AtlasBaseException) ? (AtlasBaseException) e :
new AtlasBaseException(e);
}
}
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
index 7ec68e707..a77840802 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java
@@ -28,7 +28,6 @@ import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef;
import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef;
import org.apache.atlas.repository.Constants;
-import org.apache.atlas.repository.IndexException;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasGraphManagement;
import org.apache.atlas.repository.graphdb.AtlasVertex;
@@ -646,11 +645,7 @@ public class AtlasStructDefStoreV2 extends
AtlasAbstractDefStoreV2<AtlasStructDe
}
private static void createPropertyKeys(AtlasStructDef structDef,
AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException {
- AtlasGraphManagement management =
typeDefStore.atlasGraph.getManagementSystem();
- boolean isSuccess = false;
- Exception err = null;
-
- try {
+ try (AtlasGraphManagement management =
typeDefStore.atlasGraph.getManagementSystem()) {
for (AtlasAttributeDef attributeDef :
structDef.getAttributeDefs()) {
// Validate the mandatory features of an attribute
(compatibility with legacy type system)
if (StringUtils.isEmpty(attributeDef.getName())) {
@@ -670,29 +665,11 @@ public class AtlasStructDefStoreV2 extends
AtlasAbstractDefStoreV2<AtlasStructDe
createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(typeNamePropertyKey),
Object.class, AtlasCardinality.SINGLE, management);
- isSuccess = true;
+ management.setIsSuccess(true);
} catch (Exception e) {
- err = e;
- } finally {
- try {
- if (isSuccess) {
- management.commit();
- } else {
- management.rollback();
- }
- } catch (Exception e) {
- if (err == null) {
- err = new AtlasBaseException(new IndexException("Index " +
(isSuccess ? "commit" : "rollback") + " failed", e));
- } else {
- LOG.error("Index {} failed", (isSuccess ? "commit" :
"rollback"), e);
- }
- }
- }
-
- if (err != null) {
- LOG.error("PropertyKey creation failed for structDef: {}",
structDef, err);
+ LOG.error("PropertyKey creation failed for structDef: {}",
structDef, e);
- throw (err instanceof AtlasBaseException) ? (AtlasBaseException)
err : new AtlasBaseException(err);
+ throw e instanceof AtlasBaseException ? (AtlasBaseException) e :
new AtlasBaseException(e);
}
}
}
diff --git
a/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java
b/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java
index 4d362cdbd..f3992ebff 100644
---
a/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java
+++
b/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java
@@ -143,12 +143,21 @@ public class RepairIndex {
private void restoreAll() throws Exception {
for (String indexName : getIndexes()) {
displayCrlf("Restoring: " + indexName);
- long startTime = System.currentTimeMillis();
- ManagementSystem mgmt = (ManagementSystem) graph.openManagement();
- JanusGraphIndex index = mgmt.getGraphIndex(indexName);
- mgmt.updateIndex(index, SchemaAction.REINDEX).get();
- mgmt.commit();
+ long startTime = System.currentTimeMillis();
+ ManagementSystem mgmt = null;
+
+ try {
+ mgmt = (ManagementSystem) graph.openManagement();
+
+ JanusGraphIndex index = mgmt.getGraphIndex(indexName);
+
+ mgmt.updateIndex(index, SchemaAction.REINDEX).get();
+ } finally {
+ if (mgmt != null) {
+ mgmt.commit();
+ }
+ }
ManagementSystem.awaitGraphIndexStatus(graph,
indexName).status(SchemaStatus.ENABLED).call();
@@ -175,27 +184,37 @@ public class RepairIndex {
}
private static void reindexVertex(String indexName, IndexSerializer
indexSerializer, Set<String> entityGUIDs) throws Exception {
- Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new
java.util.HashMap<>();
- ManagementSystem mgmt =
(ManagementSystem) graph.openManagement();
- StandardJanusGraphTx tx =
mgmt.getWrappedTx();
- BackendTransaction mutator =
tx.getTxHandle();
- JanusGraphIndex index =
mgmt.getGraphIndex(indexName);
- MixedIndexType indexType =
(MixedIndexType) mgmt.getSchemaVertex(index).asIndexType();
-
- for (String entityGuid : entityGUIDs) {
- for (int attemptCount = 1; attemptCount <= MAX_TRIES_ON_FAILURE;
attemptCount++) {
- AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entityGuid);
- try {
- indexSerializer.reindexElement(vertex.getWrappedElement(),
indexType, documentsPerStore);
- break;
- } catch (Exception e) {
- displayCrlf("Exception: " + e.getMessage());
- displayCrlf("Pausing before retry..");
- Thread.sleep(2000 * attemptCount);
+ ManagementSystem mgmt = null;
+
+ try {
+ mgmt = (ManagementSystem) graph.openManagement();
+
+ StandardJanusGraphTx tx = mgmt.getWrappedTx();
+ BackendTransaction mutator = tx.getTxHandle();
+ JanusGraphIndex index = mgmt.getGraphIndex(indexName);
+ MixedIndexType indexType = (MixedIndexType)
mgmt.getSchemaVertex(index).asIndexType();
+
+ Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new
java.util.HashMap<>();
+
+ for (String entityGuid : entityGUIDs) {
+ for (int attemptCount = 1; attemptCount <=
MAX_TRIES_ON_FAILURE; attemptCount++) {
+ AtlasVertex vertex =
AtlasGraphUtilsV2.findByGuid(entityGuid);
+ try {
+
indexSerializer.reindexElement(vertex.getWrappedElement(), indexType,
documentsPerStore);
+ break;
+ } catch (Exception e) {
+ displayCrlf("Exception: " + e.getMessage());
+ displayCrlf("Pausing before retry..");
+ Thread.sleep(2000 * attemptCount);
+ }
}
}
+
mutator.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore);
+ } finally {
+ if (mgmt != null) {
+ mgmt.commit();
+ }
}
-
mutator.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore);
}
private static Set<String> getEntityAndReferenceGuids(String guid) throws
Exception {