This is an automated email from the ASF dual-hosted git repository.
mandarambawane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new ab3257e7b ATLAS-5006 : Atlas incremental export takes too much time
(almost equivalent to bootstrap export with the entire data) (#312)
ab3257e7b is described below
commit ab3257e7ba6548099639132e37c954986e49ea03
Author: mandarambawane <[email protected]>
AuthorDate: Mon Apr 14 14:55:13 2025 +0530
ATLAS-5006 : Atlas incremental export takes too much time (almost
equivalent to bootstrap export with the entire data) (#312)
---
.../atlas/repository/graphdb/AtlasGraph.java | 8 +++
.../repository/graphdb/janus/AtlasJanusGraph.java | 12 ++++
.../atlas/repository/impexp/ExportService.java | 83 +++++++++++++++++++++-
.../store/graph/v2/EntityGraphRetriever.java | 4 ++
4 files changed, 105 insertions(+), 2 deletions(-)
diff --git
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
index c1f7109ab..1c3e94924 100644
---
a/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
+++
b/graphdb/api/src/main/java/org/apache/atlas/repository/graphdb/AtlasGraph.java
@@ -27,6 +27,7 @@ import javax.script.ScriptException;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -359,4 +360,11 @@ public interface AtlasGraph<V, E> {
* @throws AtlasException when error encountered in creating the client.
*/
AtlasGraphIndexClient getGraphIndexClient() throws AtlasException;
+
+ /**
+ * Returns the vertices that are directly connected to the given vertex by an edge.
+ * NOTE(review): the JanusGraph implementation follows edges in both directions and
+ * does not filter or de-duplicate the result - confirm other implementations agree.
+ *
+ * @param vertex the vertex whose neighbouring vertices are requested
+ * @return the list of vertices found at the other end of the vertex's edges
+ */
+ List<AtlasVertex> getAllEdgesVertices(AtlasVertex vertex);
}
diff --git
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
index eeca84f54..fb29e4cec 100644
---
a/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
+++
b/graphdb/janus/src/main/java/org/apache/atlas/repository/graphdb/janus/AtlasJanusGraph.java
@@ -78,6 +78,7 @@ import javax.script.ScriptException;
import java.io.IOException;
import java.io.OutputStream;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
@@ -419,6 +420,17 @@ public class AtlasJanusGraph implements
AtlasGraph<AtlasJanusVertex, AtlasJanusE
return this.janusGraph;
}
+    /**
+     * Collects every vertex adjacent to {@code vertex}, following edges in both
+     * directions, and wraps each one as an Atlas vertex.
+     *
+     * @param vertex the vertex whose neighbours are collected
+     * @return a new list holding one wrapped vertex per traversal result
+     */
+    @Override
+    public List<AtlasVertex> getAllEdgesVertices(AtlasVertex vertex) {
+        List<AtlasVertex> neighbours = new ArrayList<>();
+        GraphTraversal    adjacent   = V(vertex.getId()).both();
+
+        // drain the traversal, wrapping each TinkerPop vertex as it is produced
+        while (adjacent.hasNext()) {
+            neighbours.add(GraphDbObjectFactory.createVertex(this, (Vertex) adjacent.next()));
+        }
+
+        return neighbours;
+    }
+
public Iterable<AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>
wrapVertices(Iterable<? extends Vertex> it) {
return Iterables.transform(it, (Function<Vertex,
AtlasVertex<AtlasJanusVertex, AtlasJanusEdge>>) input ->
GraphDbObjectFactory.createVertex(AtlasJanusGraph.this, input));
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 54b5aba97..8ccc63c1b 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -33,7 +33,10 @@ import org.apache.atlas.model.typedef.AtlasEnumDef;
import org.apache.atlas.model.typedef.AtlasRelationshipDef;
import org.apache.atlas.model.typedef.AtlasStructDef;
import org.apache.atlas.model.typedef.AtlasTypesDef;
+import org.apache.atlas.repository.graph.GraphHelper;
import org.apache.atlas.repository.graphdb.AtlasGraph;
+import org.apache.atlas.repository.graphdb.AtlasVertex;
+import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever;
import org.apache.atlas.repository.util.UniqueList;
import org.apache.atlas.type.AtlasTypeRegistry;
@@ -56,6 +59,8 @@ import java.util.Set;
import static
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_CONNECTED;
import static org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_FULL;
import static
org.apache.atlas.model.impexp.AtlasExportRequest.FETCH_TYPE_INCREMENTAL;
+import static org.apache.atlas.repository.Constants.GUID_PROPERTY_KEY;
+import static
org.apache.atlas.repository.Constants.MODIFICATION_TIMESTAMP_PROPERTY_KEY;
@Component
public class ExportService {
@@ -69,6 +74,7 @@ public class ExportService {
private final GlossaryService glossaryService;
private final AuditsWriter auditsWriter;
private ExportTypeProcessor exportTypeProcessor;
+ private static final String ATLAS_TYPE_HIVE_DB =
"hive_db";
@Inject
public ExportService(final AtlasTypeRegistry typeRegistry, AtlasGraph
graph, AuditsWriter auditsWriter, HdfsPathEntityCreator hdfsPathEntityCreator,
GlossaryService glossaryService) {
@@ -239,6 +245,9 @@ public class ExportService {
entitiesExtractor.setExtractor(typeRegistry.getEntityDefByName(item.getTypeName()));
for (String guid : entityGuids) {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
+ String typeName = GraphHelper.getTypeName(vertex);
+ context.startingEntityType = typeName;
processEntityGuid(guid, context);
}
@@ -283,13 +292,72 @@ public class ExportService {
return;
}
- AtlasEntityWithExtInfo entityWithExtInfo =
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+ if (context.fetchType == ExportFetchType.INCREMENTAL &&
context.startingEntityType.equals(ATLAS_TYPE_HIVE_DB) && !context.skipLineage) {
+ AtlasVertex vertex = AtlasGraphUtilsV2.findByGuid(guid);
- processEntity(entityWithExtInfo, context);
+ processVertex(context, vertex, guid);
+ } else {
+ AtlasEntityWithExtInfo entityWithExtInfo =
entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+ processEntity(entityWithExtInfo, context);
+ }
LOG.debug("<== processEntityGuid({})", guid);
}
+    /**
+     * Exports a single entity starting from its graph vertex and queues the
+     * entities connected to it for later processing.
+     *
+     * @param context the state of the export currently in progress
+     * @param vertex  the graph vertex of the entity being exported
+     * @param guid    the guid of the entity being exported
+     * @throws AtlasBaseException if adding the entity to the export fails
+     */
+    public void processVertex(ExportContext context, AtlasVertex vertex, String guid) throws AtlasBaseException {
+        if (!MapUtils.isEmpty(context.termsGlossary)) {
+            addGlossaryEntities(context);
+        }
+
+        addVertex(vertex, guid, context);
+
+        // mark the guid as processed before looking at neighbours, which consult guidsProcessed
+        context.guidsProcessed.add(guid);
+
+        extractConnectedVertices(vertex, context);
+    }
+
+    /**
+     * Queues the guids of all entity vertices connected to {@code vertex} that have
+     * not been processed yet.
+     *
+     * @param vertex  the vertex whose neighbours are inspected
+     * @param context the export state; its {@code guidsToProcess} receives the new guids
+     */
+    public void extractConnectedVertices(AtlasVertex vertex, ExportContext context) {
+        List<AtlasVertex> neighbours = entityGraphRetriever.findAllConnectedVertices(vertex);
+
+        if (CollectionUtils.isEmpty(neighbours)) {
+            return;
+        }
+
+        for (AtlasVertex neighbour : neighbours) {
+            String neighbourType = GraphHelper.getTypeName(neighbour);
+
+            // only vertices whose type is a registered entity type are exported
+            if (typeRegistry.getEntityTypeByName(neighbourType) == null) {
+                continue;
+            }
+
+            String neighbourGuid = AtlasGraphUtilsV2.getEncodedProperty(neighbour, GUID_PROPERTY_KEY, String.class);
+
+            if (context.guidsProcessed.contains(neighbourGuid)) {
+                continue;
+            }
+
+            context.guidsToProcess.add(neighbourGuid);
+        }
+    }
+
+    /**
+     * Writes the entity identified by {@code guid} to the export sink, provided the
+     * sink does not already contain it and the vertex passes the timestamp check.
+     *
+     * @param vertex  the graph vertex of the entity, used for the timestamp check
+     * @param guid    the guid of the entity to export
+     * @param context the state of the export currently in progress
+     * @throws AtlasBaseException if retrieving or writing the entity fails
+     */
+    private void addVertex(AtlasVertex vertex, String guid, ExportContext context) throws AtlasBaseException {
+        if (context.sink.hasEntity(guid)) {
+            return;
+        }
+
+        LOG.info("export: Guid in process: {}", guid);
+
+        if (!context.doesTimestampQualify(vertex)) {
+            // nothing to write for this vertex, but it still counts towards progress
+            context.reportProgress();
+
+            return;
+        }
+
+        AtlasEntityWithExtInfo extInfo = entityGraphRetriever.toAtlasEntityWithExtInfo(guid);
+
+        exportTypeProcessor.addTypes(extInfo.getEntity(), context);
+        context.addToSink(extInfo);
+
+        String entityCounter = String.format("entity:%s", extInfo.getEntity().getTypeName());
+
+        context.result.incrementMeticsCounter(entityCounter);
+
+        // referred entities are counted per type as well
+        if (extInfo.getReferredEntities() != null) {
+            for (AtlasEntity referred : extInfo.getReferredEntities().values()) {
+                String referredCounter = String.format("entity:%s", referred.getTypeName());
+
+                context.result.incrementMeticsCounter(referredCounter);
+            }
+        }
+
+        context.result.incrementMeticsCounter("entity:withExtInfo");
+
+        context.reportProgress();
+    }
+
private void addGlossaryEntities(ExportContext context) {
try {
for (String termGuid : context.termsGlossary.keySet()) {
@@ -403,6 +471,7 @@ public class ExportService {
boolean isSkipConnectedFetch;
private int progressReportCount;
+ public String startingEntityType;
ExportContext(AtlasExportResult result, ZipSink sink) {
this.result = result;
@@ -442,6 +511,7 @@ public class ExportService {
guidsToProcess.clear();
guidsProcessed.clear();
guidDirection.clear();
+ startingEntityType = null;
}
public void addToBeProcessed(boolean isSuperTypeProcess, String guid,
TraversalDirection direction) {
@@ -470,6 +540,15 @@ public class ExportService {
return changeMarker <= entity.getUpdateTime().getTime();
}
+        /**
+         * Decides whether the entity behind {@code vertex} should be part of this export,
+         * based on the modification timestamp stored on the vertex.
+         *
+         * @param vertex the graph vertex of the entity being considered
+         * @return {@code true} when the export is not incremental, when the vertex carries
+         *         no modification timestamp, or when its timestamp is at or after the
+         *         change marker; {@code false} otherwise
+         */
+        public boolean doesTimestampQualify(AtlasVertex vertex) {
+            if (fetchType != ExportFetchType.INCREMENTAL) {
+                return true;
+            }
+
+            Long updatedTime = AtlasGraphUtilsV2.getEncodedProperty(vertex, MODIFICATION_TIMESTAMP_PROPERTY_KEY, Long.class);
+
+            // Comparing a null Long would throw a NullPointerException on unboxing. A vertex
+            // without a modification timestamp cannot be proven unchanged, so it is exported
+            // rather than silently dropped.
+            if (updatedTime == null) {
+                return true;
+            }
+
+            return changeMarker <= updatedTime;
+        }
+
public boolean getSkipLineage() {
return skipLineage;
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
index 7f98da30e..a6aa1695f 100644
---
a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
+++
b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphRetriever.java
@@ -482,6 +482,10 @@ public class EntityGraphRetriever {
return ret;
}
+    /**
+     * Looks up the vertices connected to {@code vertex} by delegating to the graph.
+     *
+     * @param vertex the vertex whose connected vertices are requested
+     * @return the list returned by the graph's {@code getAllEdgesVertices}
+     */
+    public List<AtlasVertex> findAllConnectedVertices(AtlasVertex vertex) {
+        final List<AtlasVertex> connected = graph.getAllEdgesVertices(vertex);
+
+        return connected;
+    }
+
public Map<String, Object> getEntityUniqueAttribute(AtlasVertex
entityVertex) throws AtlasBaseException {
Map<String, Object> ret = null;
String typeName =
AtlasGraphUtilsV2.getTypeName(entityVertex);