This is an automated email from the ASF dual-hosted git repository.
amestry pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new dc9e268 ATLAS-4015: Add Re-indexing as JAVA_PATCH.
dc9e268 is described below
commit dc9e268966fa27df400e827fd6e57a5098d7d478
Author: Ashutosh Mestry <[email protected]>
AuthorDate: Tue Nov 10 09:47:00 2020 -0800
ATLAS-4015: Add Re-indexing as JAVA_PATCH.
---
.../repository/graphdb/AtlasGraphManagement.java | 8 +
.../graphdb/janus/AtlasJanusGraphManagement.java | 64 +++++++
.../java/org/apache/atlas/AtlasConfiguration.java | 3 +-
.../repository/patches/AtlasPatchManager.java | 3 +-
.../patches/ConcurrentPatchProcessor.java | 6 +-
.../atlas/repository/patches/ReIndexPatch.java | 199 +++++++++++++++++++++
6 files changed, 278 insertions(+), 5 deletions(-)
diff --git
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
index f7d2e27..7e3b2f4 100644
---
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
+++
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraphManagement.java
@@ -172,4 +172,12 @@ public interface AtlasGraphManagement {
* Set consistency to ConsistencyModifier.LOCK for all vertex and edge
indexes.
*/
void updateUniqueIndexesForConsistencyLock();
+
+ /**
+ * Re-index the given elements against the named index.
+ * @param indexName name of the index that needs to be operated on.
+ * @param elements elements to be re-indexed.
+ * @throws Exception declared so that implementations can propagate failures raised while re-indexing.
+ */
+ void reindex(String indexName, List<AtlasElement> elements) throws
Exception;
}
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
index 2a2ef92..1cc7f8b 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraphManagement.java
@@ -19,14 +19,22 @@ package org.apache.atlas.repository.graphdb.janus;
import com.google.common.base.Preconditions;
import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.exception.AtlasBaseException;
import org.apache.atlas.repository.graphdb.AtlasEdgeDirection;
+import org.apache.atlas.repository.graphdb.AtlasElement;
import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.janusgraph.core.Cardinality;
import org.janusgraph.core.EdgeLabel;
+import org.janusgraph.core.JanusGraphElement;
import org.janusgraph.core.PropertyKey;
import org.janusgraph.core.schema.*;
import org.janusgraph.core.schema.JanusGraphManagement.IndexBuilder;
+import org.janusgraph.diskstorage.BackendTransaction;
+import org.janusgraph.diskstorage.indexing.IndexEntry;
+import org.janusgraph.graphdb.database.IndexSerializer;
+import org.janusgraph.graphdb.database.StandardJanusGraph;
+import org.janusgraph.graphdb.database.management.ManagementSystem;
import org.janusgraph.graphdb.internal.Token;
import org.apache.atlas.repository.graphdb.AtlasCardinality;
import org.apache.atlas.repository.graphdb.AtlasEdgeLabel;
@@ -36,11 +44,16 @@ import org.apache.atlas.repository.graphdb.AtlasPropertyKey;
import org.apache.commons.lang.StringUtils;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.janusgraph.graphdb.transaction.StandardJanusGraphTx;
+import org.janusgraph.graphdb.types.IndexType;
+import org.janusgraph.graphdb.types.MixedIndexType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -308,4 +321,55 @@ public class AtlasJanusGraphManagement implements
AtlasGraphManagement {
LOG.info("setConsistency: {}: {}: Done!",
elementType.getSimpleName(), count);
}
}
+
+    /**
+     * Re-indexes the given elements against the named index.
+     *
+     * Nothing is re-indexed (the method logs and returns) when the index cannot be found,
+     * when the management/graph objects are not the JanusGraph implementations this code
+     * relies on, or when the index is not a mixed index. The management transaction is
+     * committed in every case, including when re-indexing throws.
+     *
+     * @param indexName name of the index to operate on.
+     * @param elements  elements to be re-indexed.
+     * @throws Exception any failure raised while re-indexing or committing.
+     */
+    @Override
+    public void reindex(String indexName, List<AtlasElement> elements) throws Exception {
+        try {
+            JanusGraphIndex index = management.getGraphIndex(indexName);
+            if (index == null) {
+                LOG.error("Could not retrieve index for name: {} ", indexName);
+                return;
+            }
+
+            // Reported separately: here the index exists, it is the implementation types that are unsupported.
+            if (!(management instanceof ManagementSystem) || !(graph.getGraph() instanceof StandardJanusGraph)) {
+                LOG.error("Index: {}: re-index is supported only with ManagementSystem on a StandardJanusGraph", indexName);
+                return;
+            }
+
+            ManagementSystem managementSystem = (ManagementSystem) management;
+            IndexType        indexType        = managementSystem.getSchemaVertex(index).asIndexType();
+            if (!(indexType instanceof MixedIndexType)) {
+                LOG.warn("Index: {}: Not of MixedIndexType ", indexName);
+                return;
+            }
+
+            IndexSerializer indexSerializer = ((StandardJanusGraph) graph.getGraph()).getIndexSerializer();
+            reindexElement(managementSystem, indexSerializer, (MixedIndexType) indexType, elements);
+        } finally {
+            // Always close the management transaction, whether or not re-indexing succeeded.
+            management.commit();
+        }
+    }
+
+    /**
+     * Builds index documents for the given elements and hands them to the backing index.
+     *
+     * Null elements and elements without a wrapped JanusGraph element are skipped. A failure
+     * on a single element is logged at WARN and does not stop the loop. Whatever documents
+     * were collected are passed to the index transaction's restore() in the finally block.
+     */
+    private void reindexElement(ManagementSystem managementSystem, IndexSerializer indexSerializer, MixedIndexType indexType, List<AtlasElement> elements) throws Exception {
+        Map<String, Map<String, List<IndexEntry>>> documentsPerStore = new HashMap<>();
+        BackendTransaction                         txHandle          = managementSystem.getWrappedTx().getTxHandle();
+
+        try {
+            for (AtlasElement element : elements) {
+                try {
+                    if (element != null) {
+                        JanusGraphElement wrapped = element.getWrappedElement();
+
+                        if (wrapped != null) {
+                            indexSerializer.reindexElement(wrapped, indexType, documentsPerStore);
+                        }
+                    }
+                } catch (Exception e) {
+                    LOG.warn("{}: Exception: {}:{}", indexType.getName(), e.getClass().getSimpleName(), e.getMessage());
+                }
+            }
+        } finally {
+            if (txHandle != null) {
+                txHandle.getIndexTransaction(indexType.getBackingIndexName()).restore(documentsPerStore);
+            }
+        }
+    }
}
diff --git a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
index 1c79158..2d3dfdf 100644
--- a/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
+++ b/intg/src/main/java/org/apache/atlas/AtlasConfiguration.java
@@ -73,7 +73,8 @@ public enum AtlasConfiguration {
LINEAGE_USING_GREMLIN("atlas.lineage.query.use.gremlin", false),
HTTP_HEADER_SERVER_VALUE("atlas.http.header.server.value","Apache Atlas"),
-
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled",
true);
+
STORAGE_CONSISTENCY_LOCK_ENABLED("atlas.graph.storage.consistency-lock.enabled",
true),
+ REINDEX_PATCH_ENABLED("atlas.patch.reindex.enabled", false);
private static final Configuration APPLICATION_PROPERTIES;
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
index b142a2a..478376b 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/AtlasPatchManager.java
@@ -54,7 +54,8 @@ public class AtlasPatchManager {
new ClassificationTextPatch(context),
new FreeTextRequestHandlerPatch(context),
new SuggestionsRequestHandlerPatch(context),
- new IndexConsistencyPatch(context)
+ new IndexConsistencyPatch(context),
+ new ReIndexPatch(context)
};
try {
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
index c6f0e64..e5dcb2e 100644
---
a/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/ConcurrentPatchProcessor.java
@@ -43,8 +43,8 @@ public abstract class ConcurrentPatchProcessor {
private static final String BATCH_SIZE_PROPERTY = "atlas.patch.batchSize";
private static final String ATLAS_SOLR_SHARDS = "ATLAS_SOLR_SHARDS";
private static final String WORKER_NAME_PREFIX = "patchWorkItem";
- private static final int NUM_WORKERS;
- private static final int BATCH_SIZE;
+ public static final int NUM_WORKERS;
+ public static final int BATCH_SIZE;
private final EntityGraphMapper entityGraphMapper;
private final AtlasGraph graph;
@@ -61,7 +61,7 @@ public abstract class ConcurrentPatchProcessor {
numWorkers = config.getInt(NUM_WORKERS_PROPERTY,
config.getInt(ATLAS_SOLR_SHARDS, 1) * 3);
batchSize = config.getInt(BATCH_SIZE_PROPERTY, 300);
- LOG.info("UniqueAttributePatch: {}={}, {}={}",
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
+ LOG.info("ConcurrentPatchProcessor: {}={}, {}={}",
NUM_WORKERS_PROPERTY, numWorkers, BATCH_SIZE_PROPERTY, batchSize);
} catch (Exception e) {
LOG.error("Error retrieving configuration.", e);
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
new file mode 100644
index 0000000..a47a2cc
--- /dev/null
+++
b/repository/src/main/java/org/apache/atlas/repository/patches/ReIndexPatch.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.atlas.repository.patches;
+
+import org.apache.atlas.AtlasConfiguration;
+import org.apache.atlas.exception.AtlasBaseException;
+import org.apache.atlas.pc.WorkItemBuilder;
+import org.apache.atlas.pc.WorkItemConsumer;
+import org.apache.atlas.pc.WorkItemManager;
+import org.apache.atlas.repository.Constants;
+import org.apache.atlas.repository.graphdb.AtlasElement;
+import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+
+import static org.apache.atlas.model.patches.AtlasPatch.PatchStatus.UNKNOWN;
+
+/**
+ * Java patch that re-indexes every vertex and edge of the graph.
+ *
+ * The patch only does work when {@code AtlasConfiguration.REINDEX_PATCH_ENABLED} is true.
+ * Elements are streamed to a pool of workers, each of which collects a batch and asks the
+ * graph's management system to re-index that batch for every configured index name.
+ */
+public class ReIndexPatch extends AtlasPatchHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(ReIndexPatch.class);
+
+    private static final String PATCH_ID          = "JAVA_PATCH_0000_006";
+    private static final String PATCH_DESCRIPTION = "Performs reindex on all the indexes.";
+
+    private final PatchContext context;
+
+    public ReIndexPatch(PatchContext context) {
+        super(context.getPatchRegistry(), PATCH_ID, PATCH_DESCRIPTION);
+        this.context = context;
+    }
+
+    /**
+     * Runs the vertex and edge repair passes when the patch is enabled.
+     *
+     * Failures are logged rather than propagated, and the status is set to UNKNOWN whether
+     * or not the repair succeeded.
+     * NOTE(review): UNKNOWN presumably keeps the patch eligible to run again - confirm.
+     */
+    @Override
+    public void apply() throws AtlasBaseException {
+        if (!AtlasConfiguration.REINDEX_PATCH_ENABLED.getBoolean()) {
+            return;
+        }
+
+        try {
+            LOG.info("ReIndexPatch: Starting...");
+            ReindexPatchProcessor reindexPatchProcessor = new ReindexPatchProcessor(context);
+
+            reindexPatchProcessor.repairVertices();
+            reindexPatchProcessor.repairEdges();
+        } catch (Exception exception) {
+            LOG.error("Error while reindexing.", exception);
+        } finally {
+            LOG.info("ReIndexPatch: Done!");
+        }
+
+        setStatus(UNKNOWN);
+
+        LOG.info("ReIndexPatch.apply(): patchId={}, status={}", getPatchId(), getStatus());
+    }
+
+    /**
+     * Feeds all vertices, then all edges, of the graph to re-index workers.
+     */
+    public static class ReindexPatchProcessor {
+        private static final String[] VERTEX_INDEX_NAMES = { Constants.VERTEX_INDEX, Constants.FULLTEXT_INDEX };
+        private static final String[] EDGE_INDEX_NAMES   = { Constants.EDGE_INDEX };
+        private static final String   WORKER_PREFIX      = "reindex";
+
+        private final PatchContext context;
+
+        public ReindexPatchProcessor(PatchContext context) {
+            this.context = context;
+        }
+
+        /** Re-indexes every vertex against the vertex and full-text indexes. */
+        public void repairVertices() {
+            repairElements(ReindexPatchProcessor::vertices, VERTEX_INDEX_NAMES);
+        }
+
+        /** Re-indexes every edge against the edge index. */
+        public void repairEdges() {
+            repairElements(ReindexPatchProcessor::edges, EDGE_INDEX_NAMES);
+        }
+
+        /**
+         * Creates a worker pool for the given index names, lets {@code action} produce the
+         * elements, waits for the queue to drain and always shuts the pool down.
+         */
+        private void repairElements(BiConsumer<WorkItemManager, AtlasGraph> action, String[] indexNames) {
+            WorkItemManager manager = new WorkItemManager(new ReindexConsumerBuilder(context.getGraph(), indexNames),
+                    WORKER_PREFIX, ConcurrentPatchProcessor.BATCH_SIZE, ConcurrentPatchProcessor.NUM_WORKERS, false);
+
+            // A String[] passed as the sole logger argument is spread as the varargs array,
+            // so only the first name would be printed; join the names explicitly.
+            String indexList = String.join(", ", indexNames);
+
+            try {
+                LOG.info("repairElements.execute(): {}: Starting...", indexList);
+                action.accept(manager, context.getGraph());
+                manager.drain();
+            } finally {
+                try {
+                    manager.shutdown();
+                } catch (InterruptedException e) {
+                    LOG.error("repairElements.execute(): interrupted during WorkItemManager shutdown.", e);
+                    // Keep the interrupt visible to callers further up the stack.
+                    Thread.currentThread().interrupt();
+                }
+
+                LOG.info("repairElements.execute(): {}: Done!", indexList);
+            }
+        }
+
+        /** Queues every edge of the graph for re-indexing. */
+        private static void edges(WorkItemManager manager, AtlasGraph graph) {
+            Iterable<Object> edges = graph.getEdges();
+            for (Object edge : edges) {
+                manager.checkProduce(edge);
+            }
+        }
+
+        /** Queues every vertex of the graph for re-indexing. */
+        private static void vertices(WorkItemManager manager, AtlasGraph graph) {
+            Iterable<AtlasVertex> vertices = graph.getVertices();
+            for (AtlasVertex vertex : vertices) {
+                manager.checkProduce(vertex);
+            }
+        }
+    }
+
+    /** Creates one {@link ReindexConsumer} per worker queue. */
+    private static class ReindexConsumerBuilder implements WorkItemBuilder<ReindexConsumer, AtlasElement> {
+        private final AtlasGraph graph;
+        private final String[]   indexNames;
+
+        public ReindexConsumerBuilder(AtlasGraph graph, String[] indexNames) {
+            this.graph      = graph;
+            this.indexNames = indexNames;
+        }
+
+        @Override
+        public ReindexConsumer build(BlockingQueue queue) {
+            return new ReindexConsumer(queue, this.graph, this.indexNames);
+        }
+    }
+
+    /**
+     * Worker that collects queued elements into a batch and re-indexes the batch once it
+     * reaches {@code ConcurrentPatchProcessor.BATCH_SIZE}, or when asked to commit dirty work.
+     */
+    private static class ReindexConsumer extends WorkItemConsumer<AtlasElement> {
+        private final List<AtlasElement> list = new ArrayList<>();
+        private final AtlasGraph         graph;
+        private final String[]           indexNames;
+        private final AtomicLong         counter;
+
+        public ReindexConsumer(BlockingQueue queue, AtlasGraph graph, String[] indexNames) {
+            super(queue);
+            this.graph      = graph;
+            this.indexNames = indexNames;
+            this.counter    = new AtomicLong(0);
+        }
+
+        @Override
+        protected void doCommit() {
+            if (list.size() >= ConcurrentPatchProcessor.BATCH_SIZE) {
+                attemptCommit();
+            }
+        }
+
+        @Override
+        protected void commitDirty() {
+            attemptCommit();
+
+            LOG.info("Total: Commit: {}", counter.get());
+            super.commitDirty();
+        }
+
+        /**
+         * Re-indexes the current batch for every index name, then clears the batch.
+         * An IllegalStateException aborts the attempt and leaves the batch in place;
+         * any other exception is logged and the remaining indexes are still processed.
+         */
+        private void attemptCommit() {
+            for (String indexName : indexNames) {
+                try {
+                    this.graph.getManagementSystem().reindex(indexName, list);
+                } catch (IllegalStateException e) {
+                    LOG.error("IllegalStateException: Exception", e);
+                    return;
+                } catch (Exception exception) {
+                    LOG.error("Exception: {}", indexName, exception);
+                }
+            }
+
+            list.clear();
+            LOG.info("Processed: {}", counter.get());
+        }
+
+        @Override
+        protected void processItem(AtlasElement item) {
+            counter.incrementAndGet();
+            list.add(item);
+            commit();
+        }
+    }
+}