This is an automated email from the ASF dual-hosted git repository. madhan pushed a commit to branch atlas-2.5 in repository https://gitbox.apache.org/repos/asf/atlas.git
commit 1273c79647ac9688811805b7a2c4c7922a3192e0 Author: Madhan Neethiraj <[email protected]> AuthorDate: Tue Aug 26 07:24:01 2025 -0700 ATLAS-5091: ensure commit/rollback is called on ManagementSystem instances (#426) (cherry picked from commit 0ad88da7d8537ba5d6a3211c4d8479bc7266dab6) --- .../repository/graphdb/AtlasGraphManagement.java | 15 +-- .../repository/graphdb/janus/AtlasJanusGraph.java | 21 ++-- .../graphdb/janus/AtlasJanusGraphDatabase.java | 19 ++-- .../graphdb/janus/AtlasJanusGraphIndexClient.java | 19 ---- .../graphdb/janus/AtlasJanusGraphManagement.java | 53 +++++---- .../graphdb/janus/AbstractGraphDatabaseTest.java | 33 +++--- .../graphdb/janus/AtlasJanusDatabaseTest.java | 22 ++-- .../repository/graph/GraphBackedSearchIndexer.java | 118 ++++----------------- .../repository/graph/IndexRecoveryService.java | 32 ++---- .../repository/patches/IndexConsistencyPatch.java | 10 +- .../atlas/repository/patches/ReIndexPatch.java | 7 +- .../repository/patches/UniqueAttributePatch.java | 19 +++- .../patches/UpdateCompositeIndexStatusPatch.java | 17 +-- .../store/graph/v2/AtlasEnumDefStoreV2.java | 31 +----- .../store/graph/v2/AtlasStructDefStoreV2.java | 31 +----- .../java/org/apache/atlas/tools/RepairIndex.java | 65 ++++++++---- 16 files changed, 205 insertions(+), 307 deletions(-) diff --git a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java index a5060408d..ba0abf464 100644 --- a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java +++ b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java @@ -24,7 +24,7 @@ import java.util.List; * Management interface for a graph. * */ -public interface AtlasGraphManagement { +public interface AtlasGraphManagement extends AutoCloseable { /** * Checks whether a property with the given key has been defined in the graph schema. * @@ -33,17 +33,6 @@ public interface AtlasGraphManagement { */ boolean containsPropertyKey(String key); - /** - * Rolls back the changes that have been made to the management system. - */ - void rollback(); - - /** - * Commits the changes that have been made to the management system. - */ - - void commit(); - /** * @param propertyName * @param propertyClass @@ -203,4 +192,6 @@ public interface AtlasGraphManagement { * @param txRecoveryObject */ void printIndexRecoveryStats(Object txRecoveryObject); + + void setIsSuccess(boolean isSuccess); } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java index eeca84f54..980588556 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java @@ -517,15 +517,22 @@ public class AtlasJanusGraph implements AtlasGraph<AtlasJanusVertex, AtlasJanusE } private Set<String> getIndexKeys(Class<? extends Element> janusGraphElementClass) { - JanusGraphManagement mgmt = getGraph().openManagement(); - Iterable<JanusGraphIndex> indices = mgmt.getGraphIndexes(janusGraphElementClass); - Set<String> result = new HashSet<>(); + Set<String> result = new HashSet<>(); + JanusGraphManagement mgmt = null; - for (JanusGraphIndex index : indices) { - result.add(index.name()); - } + try { + mgmt = getGraph().openManagement(); - mgmt.commit(); + Iterable<JanusGraphIndex> indices = mgmt.getGraphIndexes(janusGraphElementClass); + + for (JanusGraphIndex index : indices) { + result.add(index.name()); + } + } finally { + if (mgmt != null) { + mgmt.commit(); + } + } return result; } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java index 77428ba99..e4b4f0ff7 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphDatabase.java @@ -238,14 +238,21 @@ public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, } static void validateIndexBackend(Configuration config) { - String configuredIndexBackend = config.getString(INDEX_BACKEND_CONF); - JanusGraphManagement managementSystem = getGraphInstance().openManagement(); - String currentIndexBackend = managementSystem.get(INDEX_BACKEND_CONF); + JanusGraphManagement managementSystem = null; + + try { + managementSystem = getGraphInstance().openManagement(); - managementSystem.commit(); + String configuredIndexBackend = config.getString(INDEX_BACKEND_CONF); + String currentIndexBackend = managementSystem.get(INDEX_BACKEND_CONF); - if (!configuredIndexBackend.equals(currentIndexBackend)) { - throw new RuntimeException("Configured Index Backend " + configuredIndexBackend + " differs from earlier configured Index Backend " + currentIndexBackend + ". Aborting!"); + if (!configuredIndexBackend.equals(currentIndexBackend)) { + throw new RuntimeException("Configured Index Backend " + configuredIndexBackend + " differs from earlier configured Index Backend " + currentIndexBackend + ". Aborting!"); + } + } finally { + if (managementSystem != null) { + managementSystem.commit(); + } } } diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java index 8edcf7433..25b504b8a 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphIndexClient.java @@ -24,7 +24,6 @@ import org.apache.atlas.model.discovery.AtlasAggregationEntry; import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AggregationContext; import org.apache.atlas.repository.graphdb.AtlasGraphIndexClient; -import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.apache.atlas.type.AtlasStructType.AtlasAttribute; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.MapUtils; @@ -484,24 +483,6 @@ public class AtlasJanusGraphIndexClient implements AtlasGraphIndexClient { return client != null && client.indexExists(janusVertexIndex); } - private void graphManagementCommit(AtlasGraphManagement management) { - try { - management.commit(); - } catch (Exception ex) { - LOG.warn("Graph transaction management commit failed; attempting rollback", ex); - - graphManagementRollback(management); - } - } - - private void graphManagementRollback(AtlasGraphManagement management) { - try { - management.rollback(); - } catch (Exception ex) { - LOG.warn("Graph transaction management rollback failed", ex); - } - } - private SolrResponse updateFreeTextRequestHandler(SolrClient solrClient, String collectionName, Map<String, Integer> indexFieldName2SearchWeightMap, Solr6Index.Mode mode) throws IOException, SolrServerException, AtlasBaseException { String searchWeightString = generateSearchWeightString(indexFieldName2SearchWeightMap); String payLoadString = generatePayLoadForFreeText("update-requesthandler", searchWeightString); diff --git a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java index a1db859b6..c9fa2e4cd 100644 --- a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java +++ b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java @@ -87,6 +87,7 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { private final AtlasJanusGraph graph; private final JanusGraphManagement management; private final Set<String> newMultProperties = new HashSet<>(); + private boolean isSuccess = false; public AtlasJanusGraphManagement(AtlasJanusGraph graph, JanusGraphManagement managementSystem) { this.management = managementSystem; @@ -107,11 +108,19 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { try { if (status == REGISTERED) { - JanusGraphManagement management = graph.openManagement(); - JanusGraphIndex indexToUpdate = management.getGraphIndex(indexName); + JanusGraphManagement management = null; - management.updateIndex(indexToUpdate, ENABLE_INDEX).get(); - management.commit(); + try { + management = graph.openManagement(); + + JanusGraphIndex indexToUpdate = management.getGraphIndex(indexName); + + management.updateIndex(indexToUpdate, ENABLE_INDEX).get(); + } finally { + if (management != null) { + management.commit(); + } + } GraphIndexStatusReport report = ManagementSystem.awaitGraphIndexStatus(graph, indexName).status(ENABLED).call(); @@ -137,20 +146,22 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { } @Override - public boolean containsPropertyKey(String propertyName) { - return management.containsPropertyKey(propertyName); + public void close() throws Exception { + if (isSuccess) { + commit(); + } else { + rollback(); + } } @Override - public void rollback() { - management.rollback(); + public void setIsSuccess(boolean isSuccess) { + this.isSuccess = isSuccess; } @Override - public void commit() { - graph.addMultiProperties(newMultProperties); - newMultProperties.clear(); - management.commit(); + public boolean containsPropertyKey(String propertyName) { + return management.containsPropertyKey(propertyName); } @Override @@ -325,12 +336,8 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { @Override public void updateUniqueIndexesForConsistencyLock() { - try { - setConsistency(this.management, Vertex.class); - setConsistency(this.management, Edge.class); - } finally { - commit(); - } + setConsistency(this.management, Vertex.class); + setConsistency(this.management, Edge.class); } @Override @@ -423,6 +430,16 @@ public class AtlasJanusGraphManagement implements AtlasGraphManagement { } } + private void rollback() { + management.rollback(); + } + + private void commit() { + graph.addMultiProperties(newMultProperties); + newMultProperties.clear(); + management.commit(); + } + private static void checkName(String name) { //for some reason, name checking was removed from StandardPropertyKeyMaker.make() //in Janus. For consistency, do the check here. diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java index bca3f32f9..b1502e3bc 100644 --- a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AbstractGraphDatabaseTest.java @@ -56,24 +56,25 @@ public abstract class AbstractGraphDatabaseTest { LocalSolrRunner.start(); } - AtlasJanusGraphDatabase db = new AtlasJanusGraphDatabase(); - AtlasGraphManagement mgmt = db.getGraph().getManagementSystem(); + AtlasJanusGraphDatabase db = new AtlasJanusGraphDatabase(); - if (mgmt.getGraphIndex(BACKING_INDEX_NAME) == null) { - mgmt.createVertexMixedIndex(BACKING_INDEX_NAME, Constants.BACKING_INDEX, Collections.emptyList()); + try (AtlasGraphManagement mgmt = db.getGraph().getManagementSystem()) { + if (mgmt.getGraphIndex(BACKING_INDEX_NAME) == null) { + mgmt.createVertexMixedIndex(BACKING_INDEX_NAME, Constants.BACKING_INDEX, Collections.emptyList()); + } + mgmt.makePropertyKey("age13", Integer.class, AtlasCardinality.SINGLE); + + createIndices(mgmt, "name", String.class, false, AtlasCardinality.SINGLE); + createIndices(mgmt, WEIGHT_PROPERTY, Integer.class, false, AtlasCardinality.SINGLE); + createIndices(mgmt, "size15", String.class, false, AtlasCardinality.SINGLE); + createIndices(mgmt, "typeName", String.class, false, AtlasCardinality.SINGLE); + createIndices(mgmt, "__type", String.class, false, AtlasCardinality.SINGLE); + createIndices(mgmt, Constants.GUID_PROPERTY_KEY, String.class, true, AtlasCardinality.SINGLE); + createIndices(mgmt, Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, AtlasCardinality.SET); + createIndices(mgmt, Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, AtlasCardinality.SET); + + mgmt.setIsSuccess(true); } - mgmt.makePropertyKey("age13", Integer.class, AtlasCardinality.SINGLE); - - createIndices(mgmt, "name", String.class, false, AtlasCardinality.SINGLE); - createIndices(mgmt, WEIGHT_PROPERTY, Integer.class, false, AtlasCardinality.SINGLE); - createIndices(mgmt, "size15", String.class, false, AtlasCardinality.SINGLE); - createIndices(mgmt, "typeName", String.class, false, AtlasCardinality.SINGLE); - createIndices(mgmt, "__type", String.class, false, AtlasCardinality.SINGLE); - createIndices(mgmt, Constants.GUID_PROPERTY_KEY, String.class, true, AtlasCardinality.SINGLE); - createIndices(mgmt, Constants.TRAIT_NAMES_PROPERTY_KEY, String.class, false, AtlasCardinality.SET); - createIndices(mgmt, Constants.SUPER_TYPES_PROPERTY_KEY, String.class, false, AtlasCardinality.SET); - - mgmt.commit(); } @AfterClass diff --git a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java index 9f8e0bc07..5ee4203f9 100644 --- a/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java +++ b/graphdb/janus/src/test/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusDatabaseTest.java @@ -391,21 +391,21 @@ public class AtlasJanusDatabaseTest { atlasGraph = db.getGraph(); - AtlasGraphManagement mgmt = atlasGraph.getManagementSystem(); + try (AtlasGraphManagement mgmt = atlasGraph.getManagementSystem()) { + // create the index (which defines these properties as being mult + // many) + for (String propertyName : new String[] {"__superTypeNames", "__traitNames"}) { + AtlasPropertyKey propertyKey = mgmt.getPropertyKey(propertyName); - // create the index (which defines these properties as being mult - // many) - for (String propertyName : new String[] {"__superTypeNames", "__traitNames"}) { - AtlasPropertyKey propertyKey = mgmt.getPropertyKey(propertyName); + if (propertyKey == null) { + propertyKey = mgmt.makePropertyKey(propertyName, String.class, AtlasCardinality.SET); - if (propertyKey == null) { - propertyKey = mgmt.makePropertyKey(propertyName, String.class, AtlasCardinality.SET); - - mgmt.createVertexCompositeIndex(propertyName, false, Collections.singletonList(propertyKey)); + mgmt.createVertexCompositeIndex(propertyName, false, Collections.singletonList(propertyKey)); + } } - } - mgmt.commit(); + mgmt.setIsSuccess(true); + } } return (AtlasGraph<V, E>) atlasGraph; diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java index ee71843f7..191dea810 100755 --- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphBackedSearchIndexer.java @@ -256,12 +256,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang public void onChange(ChangedTypeDefs changedTypeDefs) throws AtlasBaseException { LOG.debug("Processing changed typedefs {}", changedTypeDefs); - AtlasGraphManagement management = null; - boolean isRollbackNeeded = true; - - try { - management = provider.get().getManagementSystem(); - + try (AtlasGraphManagement management = provider.get().getManagementSystem()) { // Update index for newly created types if (CollectionUtils.isNotEmpty(changedTypeDefs.getCreatedTypeDefs())) { for (AtlasBaseTypeDef typeDef : changedTypeDefs.getCreatedTypeDefs()) { @@ -289,22 +284,11 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang createEdgeLabels(management, changedTypeDefs.getCreatedTypeDefs()); createEdgeLabels(management, changedTypeDefs.getUpdatedTypeDefs()); - isRollbackNeeded = false; - - //Commit indexes - commit(management); - } catch (RepositoryException | IndexException e) { + management.setIsSuccess(true); + } catch (Exception e) { LOG.error("Failed to update indexes for changed typedefs", e); - - isRollbackNeeded = false; - - attemptRollback(changedTypeDefs, management); } finally { - if (isRollbackNeeded) { - LOG.warn("onChange({}): was not committed. Rolling back...", changedTypeDefs); - - attemptRollback(changedTypeDefs, management); - } + recomputeIndexedKeys = true; } notifyChangeListeners(changedTypeDefs); @@ -319,33 +303,26 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang typeDefs.addAll(typeRegistry.getAllEntityDefs()); typeDefs.addAll(typeRegistry.getAllBusinessMetadataDefs()); - ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(null, new ArrayList<>(typeDefs), null); - AtlasGraphManagement management = null; - - try { - management = provider.get().getManagementSystem(); + ChangedTypeDefs changedTypeDefs = new ChangedTypeDefs(null, new ArrayList<>(typeDefs), null); + try (AtlasGraphManagement management = provider.get().getManagementSystem()) { //resolve index fields names resolveIndexFieldNames(management, changedTypeDefs); //Commit indexes - commit(management); + management.setIsSuccess(true); notifyInitializationCompletion(changedTypeDefs); - } catch (RepositoryException | IndexException e) { + } catch (Exception e) { LOG.error("Failed to update indexes for changed typedefs", e); - - attemptRollback(changedTypeDefs, management); + } finally { + recomputeIndexedKeys = true; } } public Set<String> getVertexIndexKeys() { if (recomputeIndexedKeys) { - AtlasGraphManagement management = null; - - try { - management = provider.get().getManagementSystem(); - + try (AtlasGraphManagement management = provider.get().getManagementSystem()) { if (management != null) { AtlasGraphIndex vertexIndex = management.getGraphIndex(VERTEX_INDEX); @@ -361,18 +338,10 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang recomputeIndexedKeys = false; } - management.commit(); + management.setIsSuccess(true); } } catch (Exception excp) { LOG.error("getVertexIndexKeys(): failed to get indexedKeys from graph", excp); - - if (management != null) { - try { - management.rollback(); - } catch (Exception e) { - LOG.error("getVertexIndexKeys(): rollback failed", e); - } - } } } @@ -381,11 +350,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang public Set<String> getEdgeIndexKeys() { if (recomputeEdgeIndexedKeys) { - AtlasGraphManagement management = null; - - try { - management = provider.get().getManagementSystem(); - + try (AtlasGraphManagement management = provider.get().getManagementSystem()) { if (management != null) { AtlasGraphIndex edgeIndex = management.getGraphIndex(EDGE_INDEX); @@ -401,18 +366,10 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang recomputeEdgeIndexedKeys = false; } - management.commit(); + management.setIsSuccess(true); } } catch (Exception excp) { LOG.error("getEdgeIndexKeys(): failed to get indexedKeys from graph", excp); - - if (management != null) { - try { - management.rollback(); - } catch (Exception e) { - LOG.error("getEdgeIndexKeys(): rollback failed", e); - } - } } } @@ -494,6 +451,8 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang createVertexCompositeIndexWithTypeName(management, propertyClass, propertyKey, uniqueKind == UniqueKind.PER_TYPE_UNIQUE); createVertexCompositeIndexWithSuperTypeName(management, propertyClass, propertyKey); } + + recomputeIndexedKeys = true; } else { LOG.warn("Index not created for {}: propertyKey is null", propertyName); } @@ -528,30 +487,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang } } - public void commit(AtlasGraphManagement management) throws IndexException { - try { - management.commit(); - - recomputeIndexedKeys = true; - } catch (Exception e) { - LOG.error("Index commit failed", e); - - throw new IndexException("Index commit failed ", e); - } - } - - public void rollback(AtlasGraphManagement management) throws IndexException { - try { - management.rollback(); - - recomputeIndexedKeys = true; - } catch (Exception e) { - LOG.error("Index rollback failed ", e); - - throw new IndexException("Index rollback failed ", e); - } - } - /** * Initializes the indices for the graph - create indices for Global AtlasVertex Keys */ @@ -563,9 +498,7 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang * Initializes the indices for the graph - create indices for Global AtlasVertex and AtlasEdge Keys */ private void initialize(AtlasGraph graph) throws RepositoryException, IndexException { - AtlasGraphManagement management = graph.getManagementSystem(); - - try { + try (AtlasGraphManagement management = graph.getManagementSystem()) { LOG.info("Creating indexes for graph."); if (management.getGraphIndex(VERTEX_INDEX) == null) { @@ -670,14 +603,15 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang createPropertyKey(management, RELATIONSHIPTYPE_LABEL_KEY, String.class, SINGLE); createPropertyKey(management, RELATIONSHIPTYPE_TAG_PROPAGATION_KEY, String.class, SINGLE); - commit(management); + management.setIsSuccess(true); LOG.info("Index creation for global keys complete."); } catch (Throwable t) { LOG.error("GraphBackedSearchIndexer.initialize() failed", t); - rollback(management); throw new RepositoryException(t); + } finally { + recomputeIndexedKeys = true; } } @@ -1093,18 +1027,6 @@ public class GraphBackedSearchIndexer implements SearchIndexer, ActiveStateChang return !(INDEX_EXCLUSION_CLASSES.contains(propertyClass) || cardinality.isMany()); } - private void attemptRollback(ChangedTypeDefs changedTypeDefs, AtlasGraphManagement management) throws AtlasBaseException { - if (null != management) { - try { - rollback(management); - } catch (IndexException e) { - LOG.error("Index rollback has failed", e); - - throw new AtlasBaseException(AtlasErrorCode.INDEX_ROLLBACK_FAILED, e, changedTypeDefs.toString()); - } - } - } - private void updateIndexForTypeDef(AtlasGraphManagement management, AtlasBaseTypeDef typeDef) { checkNotNull(typeDef, "Cannot index on null typedefs"); diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java index 659c967d8..04a8d841d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java +++ b/repository/src/main/java/org/apache/atlas/repository/graph/IndexRecoveryService.java @@ -250,11 +250,13 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { return; } - try { - txRecoveryObject = this.graph.getManagementSystem().startIndexRecovery(startTime); + try (AtlasGraphManagement management = this.graph.getManagementSystem()) { + txRecoveryObject = management.startIndexRecovery(startTime); printIndexRecoveryStats(); + management.setIsSuccess(true); + LOG.info("Index Recovery: Started! Recovery time: {}", Instant.ofEpochMilli(startTime)); } catch (Exception e) { LOG.error("Index Recovery with recovery time: {} failed", Instant.ofEpochMilli(startTime), e); @@ -276,10 +278,12 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } private void stopIndexRecovery() { - try { - this.graph.getManagementSystem().stopIndexRecovery(txRecoveryObject); + try (AtlasGraphManagement management = this.graph.getManagementSystem()) { + management.stopIndexRecovery(txRecoveryObject); printIndexRecoveryStats(); + + management.setIsSuccess(true); } catch (Exception e) { LOG.info("Index Recovery: Stopped! Error!", e); } finally { @@ -288,27 +292,11 @@ public class IndexRecoveryService implements Service, ActiveStateChangeHandler { } private void printIndexRecoveryStats() { - AtlasGraphManagement management = this.graph.getManagementSystem(); - boolean isRollbackNeeded = true; - - try { + try (AtlasGraphManagement management = this.graph.getManagementSystem()) { management.printIndexRecoveryStats(txRecoveryObject); - - isRollbackNeeded = false; - - management.commit(); + management.setIsSuccess(true); } catch (Exception e) { LOG.error("Index Recovery: printIndexRecoveryStats() failed!", e); - } finally { - if (isRollbackNeeded) { - LOG.warn("Index Recovery: printIndexRecoveryStats() failed. Rolling back..."); - - try { - management.rollback(); - } catch (Exception rollbackEx) { - LOG.error("Index Recovery: printIndexRecoveryStats() rollback failed!", rollbackEx); - } - } } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java index 8d0113582..b08c942be 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/IndexConsistencyPatch.java @@ -20,6 +20,7 @@ package org.apache.atlas.repository.patches; import org.apache.atlas.AtlasConfiguration; import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,10 +50,15 @@ public class IndexConsistencyPatch extends AtlasPatchHandler { AtlasGraph graph = context.getGraph(); - try { + try (AtlasGraphManagement management = graph.getManagementSystem()) { LOG.info("IndexConsistencyPatch: Starting..."); - graph.getManagementSystem().updateUniqueIndexesForConsistencyLock(); + management.updateUniqueIndexesForConsistencyLock(); + management.setIsSuccess(true); + } catch (Exception excp) { + LOG.warn("IndexConsistencyPatch: failed", excp); + + throw (excp instanceof AtlasBaseException) ? (AtlasBaseException) excp : new AtlasBaseException("IndexConsistencyPatch failed", excp); } finally { LOG.info("IndexConsistencyPatch: Done!"); } diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java index 8077c5f9c..2dbbf7c50 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java @@ -26,6 +26,7 @@ import org.apache.atlas.repository.Constants; import org.apache.atlas.repository.graphdb.AtlasEdge; import org.apache.atlas.repository.graphdb.AtlasElement; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.apache.atlas.repository.graphdb.AtlasVertex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -188,8 +189,10 @@ public class ReIndexPatch extends AtlasPatchHandler { private void attemptCommit() { for (String indexName : indexNames) { - try { - this.graph.getManagementSystem().reindex(indexName, list); + try (AtlasGraphManagement management = this.graph.getManagementSystem()) { + management.reindex(indexName, list); + + management.setIsSuccess(true); } catch (IllegalStateException e) { LOG.error("IllegalStateException: Exception", e); diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java index cd0137bc8..cf364a01d 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UniqueAttributePatch.java @@ -134,9 +134,9 @@ public class UniqueAttributePatch extends AtlasPatchHandler { } private void createIndexForUniqueAttributes(String typeName, Collection<AtlasAttribute> attributes) { - try { - AtlasGraphManagement management = getGraph().getManagementSystem(); + boolean isSuccess = false; + try (AtlasGraphManagement management = getGraph().getManagementSystem()) { for (AtlasAttribute attribute : attributes) { String uniquePropertyName = attribute.getVertexUniquePropertyName(); @@ -160,12 +160,21 @@ public class UniqueAttributePatch extends AtlasPatchHandler { AtlasAttributeDef.IndexType.STRING.equals(attribute.getIndexType())); } - getIndexer().commit(management); - getGraph().commit(); + management.setIsSuccess(true); + + isSuccess = true; LOG.info("Unique attributes: type: {}: Registered!", typeName); - } catch (IndexException e) { + } catch (Exception e) { LOG.error("Error creating index: type: {}", typeName, e); + + isSuccess = false; + } finally { + if (isSuccess) { + getGraph().commit(); + } else { + getGraph().rollback(); + } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java index ce551ae1f..f875a4d73 100644 --- a/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java +++ b/repository/src/main/java/org/apache/atlas/repository/patches/UpdateCompositeIndexStatusPatch.java @@ -47,25 +47,18 @@ public class UpdateCompositeIndexStatusPatch extends AtlasPatchHandler { return; } - AtlasGraphManagement management = context.getGraph().getManagementSystem(); - boolean isRollbackNeeded = true; - - try { + try (AtlasGraphManagement management = context.getGraph().getManagementSystem()) { LOG.info("UpdateCompositeIndexStatusPatch: Starting..."); management.updateSchemaStatus(); - isRollbackNeeded = false; - - management.commit(); + management.setIsSuccess(true); LOG.info("UpdateCompositeIndexStatusPatch: Done!"); - } finally { - if (isRollbackNeeded) { - LOG.warn("UpdateCompositeIndexStatusPatch: was not committed. Rolling back..."); + } catch (Exception excp) { + LOG.warn("UpdateCompositeIndexStatusPatch: failed", excp); - management.rollback(); - } + throw (excp instanceof AtlasBaseException) ? (AtlasBaseException) excp : new AtlasBaseException(excp); } setStatus(UNKNOWN); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java index 2eb92e2be..83f1635e7 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEnumDefStoreV2.java @@ -25,7 +25,6 @@ import org.apache.atlas.exception.AtlasBaseException; import org.apache.atlas.model.typedef.AtlasEnumDef; import org.apache.atlas.model.typedef.AtlasEnumDef.AtlasEnumElementDef; import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.IndexException; import org.apache.atlas.repository.graphdb.AtlasCardinality; import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.apache.atlas.repository.graphdb.AtlasVertex; @@ -308,11 +307,7 @@ class AtlasEnumDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEnumDef> { } private void createPropertyKeys(AtlasEnumDef enumDef) throws AtlasBaseException { - AtlasGraphManagement management = typeDefStore.atlasGraph.getManagementSystem(); - boolean isSuccess = false; - Exception err = null; - - try { + try (AtlasGraphManagement management = typeDefStore.atlasGraph.getManagementSystem()) { // create property keys first for (AtlasEnumElementDef element : enumDef.getElementDefs()) { // Validate the enum element @@ -331,29 +326,11 @@ class AtlasEnumDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasEnumDef> { createPropertyKey(encodePropertyKey(typeDefKey), Object.class, AtlasCardinality.SINGLE, management); createPropertyKey(encodePropertyKey(defaultValueKey), String.class, AtlasCardinality.SINGLE, management); - isSuccess = true; + management.setIsSuccess(true); } catch (Exception e) { - err = e; - } finally { - try { - if (isSuccess) { - management.commit(); - } else { - management.rollback(); - } - } catch (Exception e) { - if (err == null) { - err = new AtlasBaseException(new IndexException("Index " + (isSuccess ? "commit" : "rollback") + " failed", e)); - } else { - LOG.error("Index {} failed", (isSuccess ? "commit" : "rollback"), e); - } - } - } - - if (err != null) { - LOG.error("PropertyKey creation failed for enum {}", enumDef.getName(), err); + LOG.error("PropertyKey creation failed for enum {}", enumDef.getName(), e); - throw (err instanceof AtlasBaseException) ? (AtlasBaseException) err : new AtlasBaseException(err); + throw (e instanceof AtlasBaseException) ? (AtlasBaseException) e : new AtlasBaseException(e); } } } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java index 7ec68e707..a77840802 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasStructDefStoreV2.java @@ -28,7 +28,6 @@ import org.apache.atlas.model.typedef.AtlasStructDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasAttributeDef; import org.apache.atlas.model.typedef.AtlasStructDef.AtlasConstraintDef; import org.apache.atlas.repository.Constants; -import org.apache.atlas.repository.IndexException; import org.apache.atlas.repository.graphdb.AtlasCardinality; import org.apache.atlas.repository.graphdb.AtlasGraphManagement; import org.apache.atlas.repository.graphdb.AtlasVertex; @@ -646,11 +645,7 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe } private static void createPropertyKeys(AtlasStructDef structDef, AtlasTypeDefGraphStoreV2 typeDefStore) throws AtlasBaseException { - AtlasGraphManagement management = typeDefStore.atlasGraph.getManagementSystem(); - boolean isSuccess = false; - Exception err = null; - - try { + try (AtlasGraphManagement management = typeDefStore.atlasGraph.getManagementSystem()) { for (AtlasAttributeDef attributeDef : structDef.getAttributeDefs()) { // Validate the mandatory features of an attribute (compatibility with legacy type system) if (StringUtils.isEmpty(attributeDef.getName())) { @@ -670,29 +665,11 @@ public class AtlasStructDefStoreV2 extends AtlasAbstractDefStoreV2<AtlasStructDe createPropertyKey(AtlasGraphUtilsV2.encodePropertyKey(typeNamePropertyKey), Object.class, AtlasCardinality.SINGLE, management); - isSuccess = true; + management.setIsSuccess(true); } catch (Exception e) { - err = e; - } finally { - try { - if (isSuccess) { - management.commit(); - } else { - management.rollback(); - } - } catch (Exception e) { - if (err == null) { - err = new AtlasBaseException(new IndexException("Index " + (isSuccess ? "commit" : "rollback") + " failed", e)); - } else { - LOG.error("Index {} failed", (isSuccess ? "commit" : "rollback"), e); - } - } - } - - if (err != null) { - LOG.error("PropertyKey creation failed for structDef: {}", structDef, err); + LOG.error("PropertyKey creation failed for structDef: {}", structDef, e); - throw (err instanceof AtlasBaseException) ? (AtlasBaseException) err : new AtlasBaseException(err); + throw e instanceof AtlasBaseException ? (AtlasBaseException) e : new AtlasBaseException(e); } } } diff --git a/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java b/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java index 4d362cdbd..f3992ebff 100644 --- a/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java +++ b/tools/atlas-index-repair/src/main/java/org/apache/atlas/tools/RepairIndex.java @@ -143,12 +143,21 @@ public class RepairIndex { private void restoreAll() throws Exception { for (String indexName : getIndexes()) { displayCrlf("Restoring: " + indexName); - long startTime = System.currentTimeMillis(); - ManagementSystem mgmt = (ManagementSystem) graph.openManagement(); - JanusGraphIndex index = mgmt.getGraphIndex(indexName); - mgmt.updateIndex(index, SchemaAction.REINDEX).get(); - mgmt.commit(); + long startTime = System.currentTimeMillis(); + ManagementSystem mgmt = null; + + try { + mgmt = (ManagementSystem) graph.openManagement(); + + JanusGraphIndex index = mgmt.getGraphIndex(indexName); + + mgmt.updateIndex(index, SchemaAction.REINDEX).get(); + } finally { + if (mgmt != null) { + mgmt.commit(); + } + } ManagementSystem.awaitGraphIndexStatus(graph, indexName).status(SchemaStatus.ENABLED).call(); @@ -175,27 +184,37 @@ public class RepairIndex { } private static void reindexVertex(String indexName, IndexSerializer indexSerializer, Set<String> entityGUIDs) throws Exception { - Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new java.util.HashMap<>(); - ManagementSystem mgmt = (ManagementSystem) graph.openManagement(); - StandardJanusGraphTx tx = mgmt.getWrappedTx(); - BackendTransaction mutator = tx.getTxHandle(); - JanusGraphIndex index = mgmt.getGraphIndex(indexName); - MixedIndexType indexType = (MixedIndexType) mgmt.getSchemaVertex(index).asIndexType(); - - for (String entityGuid : entityGUIDs) { - for (int attemptCount = 1; attemptCount <= MAX_TRIES_ON_FAILURE; attemptCount++) { - AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entityGuid); - try { - indexSerializer.reindexElement(vertex.getWrappedElement(), indexType, documentsPerStore); - break; - } catch (Exception e) { - displayCrlf("Exception: " + e.getMessage()); - displayCrlf("Pausing before retry.."); - Thread.sleep(2000 * attemptCount); + ManagementSystem mgmt = null; + + try { + mgmt = (ManagementSystem) graph.openManagement(); + + StandardJanusGraphTx tx = mgmt.getWrappedTx(); + BackendTransaction mutator = tx.getTxHandle(); + JanusGraphIndex index = mgmt.getGraphIndex(indexName); + MixedIndexType indexType = (MixedIndexType) mgmt.getSchemaVertex(index).asIndexType(); + + Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new java.util.HashMap<>(); + + for (String entityGuid : entityGUIDs) { + for (int attemptCount = 1; attemptCount <= MAX_TRIES_ON_FAILURE; attemptCount++) { + AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(entityGuid); + try { + indexSerializer.reindexElement(vertex.getWrappedElement(), indexType, documentsPerStore); + break; + } catch (Exception e) { + displayCrlf("Exception: " + e.getMessage()); + displayCrlf("Pausing before retry.."); + Thread.sleep(2000 * attemptCount); + } } } + mutator.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore); + } finally { + if (mgmt != null) { + mgmt.commit(); + } } - mutator.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore); } private static Set<String> getEntityAndReferenceGuids(String guid) throws Exception {
