This is an automated email from the ASF dual-hosted git repository.
pinal pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 3aebe0cb1 ATLAS-4842 : Export/Import: fetchType as incremental does
full export instead of connected
3aebe0cb1 is described below
commit 3aebe0cb1e1937a58f144b16b6616fe24e55f7eb
Author: priyanshi-shah26 <[email protected]>
AuthorDate: Fri Apr 5 14:06:55 2024 +0530
ATLAS-4842 : Export/Import: fetchType as incremental does full export
instead of connected
Signed-off-by: Pinal Shah <[email protected]>
---
.../atlas/repository/impexp/EntitiesExtractor.java | 8 ++-
.../atlas/repository/impexp/ExportService.java | 13 ++--
.../repository/impexp/ExportIncrementalTest.java | 73 +++++++++++++++++++++-
.../typedef-new-classification-T3.json | 23 +++++++
.../typesdef-new-classification-T2.json | 23 +++++++
5 files changed, 131 insertions(+), 9 deletions(-)
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
index da5cf37c4..4f603cb8b 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/EntitiesExtractor.java
@@ -55,8 +55,12 @@ public class EntitiesExtractor {
if (context.isHiveDBIncrementalSkipLineage()) {
extractors.get(INCREMENTAL_EXTRACT).fullFetch(entity,
context);
break;
- } else if (context.isHiveTableIncrementalSkipLineage()) {
- extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity,
context);
+ } else if (context.isHiveTableIncremental()) {
+ if (context.skipLineage) {
+
extractors.get(INCREMENTAL_EXTRACT).connectedFetch(entity, context);
+ } else {
+ extractor.connectedFetch(entity, context);
+ }
break;
}
diff --git
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
index 65d7a1872..4615c6c2f 100644
---
a/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
+++
b/repository/src/main/java/org/apache/atlas/repository/impexp/ExportService.java
@@ -408,7 +408,7 @@ public class ExportService {
skipLineage = result.getRequest().getSkipLineageOptionValue();
this.changeMarker =
result.getRequest().getChangeTokenFromOptions();
this.isHiveDBIncremental =
checkHiveDBIncrementalSkipLineage(result.getRequest());
- this.isHiveTableIncremental =
checkHiveTableIncrementalSkipLineage(result.getRequest());
+ this.isHiveTableIncremental =
checkHiveTableIncremental(result.getRequest());
this.isSkipConnectedFetch = false;
}
@@ -422,14 +422,13 @@ public class ExportService {
request.getSkipLineageOptionValue();
}
- private boolean
checkHiveTableIncrementalSkipLineage(AtlasExportRequest request) {
- if(CollectionUtils.isEmpty(request.getItemsToExport())) {
+ private boolean checkHiveTableIncremental(AtlasExportRequest request) {
+ if (CollectionUtils.isEmpty(request.getItemsToExport())) {
return false;
}
return
request.getItemsToExport().get(0).getTypeName().equalsIgnoreCase(ATLAS_TYPE_HIVE_TABLE)
&&
-
request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL)
&&
- request.getSkipLineageOptionValue();
+
request.getFetchTypeOptionValue().equalsIgnoreCase(AtlasExportRequest.FETCH_TYPE_INCREMENTAL);
}
public List<AtlasEntity>
getEntitiesWithModifiedTimestamp(AtlasEntityWithExtInfo entityWithExtInfo) {
@@ -501,6 +500,10 @@ public class ExportService {
return isHiveTableIncremental;
}
+ public boolean isHiveTableIncremental() {
+ return isHiveTableIncremental;
+ }
+
public void addToEntityCreationOrder(String guid) {
entityCreationOrder.add(guid);
}
diff --git
a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
index 0e3955dcd..bbdab3b63 100644
---
a/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
+++
b/repository/src/test/java/org/apache/atlas/repository/impexp/ExportIncrementalTest.java
@@ -62,6 +62,7 @@ import static
org.apache.atlas.repository.impexp.ZipFileResourceTestUtils.runImp
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
@Guice(modules = TestModules.TestOnlyModule.class)
public class ExportIncrementalTest extends AtlasTestBase {
@@ -83,22 +84,27 @@ public class ExportIncrementalTest extends AtlasTestBase {
private final String EXPORT_REQUEST_INCREMENTAL = "export-incremental";
private final String EXPORT_REQUEST_CONNECTED = "export-connected";
private AtlasClassificationType classificationTypeT1;
+ private AtlasClassificationType classificationTypeT2;
+ private AtlasClassificationType classificationTypeT3;
private long nextTimestamp;
private static final String EXPORT_INCREMENTAL = "incremental";
private static final String QUALIFIED_NAME_DB = "db_test_1@02052019";
private static final String QUALIFIED_NAME_TABLE_LINEAGE =
"db_test_1.test_tbl_ctas_2@02052019";
-
-
+ private static final String QUALIFIED_NAME_TABLE_2 =
"db_test_1.test_tbl_2@02052019";
private static final String GUID_DB =
"f0b72ab4-7452-4e42-ac74-2aee7728cce4";
private static final String GUID_TABLE_2 =
"8d0b834c-61ce-42d8-8f66-6fa51c36bccb";
private static final String GUID_TABLE_CTAS_2 =
"eaec545b-3ac7-4e1b-a497-bd4a2b6434a2";
+ private static final String GUID_HIVE_PROCESS =
"bd3138b2-f29e-4226-b859-de25eaa1c18b";
+ private static final String GUID_TABLE_1 =
"4d5adf00-2c9b-4877-ad23-c41fd7319150";
@BeforeClass
public void setup() throws IOException, AtlasBaseException {
basicSetup(typeDefStore, typeRegistry);
RequestContext.get().setImportInProgress(true);
classificationTypeT1 = createNewClassification();
+ classificationTypeT2 = createNewClassificationT2();
+ classificationTypeT3 = createNewClassificationT3();
createEntities(entityStore, ENTITIES_SUB_DIR, new String[] { "db",
"table-columns"});
final String[] entityGuids = {DB_GUID, TABLE_GUID};
@@ -163,6 +169,15 @@ public class ExportIncrementalTest extends AtlasTestBase {
return typeRegistry.getClassificationTypeByName("T1");
}
+ private AtlasClassificationType createNewClassificationT2() {
+ createTypes(typeDefStore,
ENTITIES_SUB_DIR,"typesdef-new-classification-T2");
+ return typeRegistry.getClassificationTypeByName("T2");
+ }
+ private AtlasClassificationType createNewClassificationT3() {
+ createTypes(typeDefStore,
ENTITIES_SUB_DIR,"typedef-new-classification-T3");
+ return typeRegistry.getClassificationTypeByName("T3");
+ }
+
@Test(dependsOnMethods =
"atT1_NewClassificationAttachedToTable_ReturnsChangedTable")
public void atT2_NewClassificationAttachedToColumn_ReturnsChangedColumn()
throws AtlasBaseException, IOException {
final int expectedEntityCount = 1;
@@ -245,6 +260,54 @@ public class ExportIncrementalTest extends AtlasTestBase {
verifyExpectedEntities(getFileNames(getZipSourceCopy(source)),
GUID_TABLE_CTAS_2);
}
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableIncrementalForParentEntity() throws
AtlasBaseException, IOException {
+ InputStream source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0,
false));
+
+ ZipSource sourceCopy = getZipSourceCopy(source);
+ verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB,
GUID_HIVE_PROCESS, GUID_TABLE_2, GUID_TABLE_CTAS_2);
+
+ source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL, 0,
false));
+ verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)),
GUID_TABLE_1);
+
+ nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
+
+ try {
+ source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL,
nextTimestamp, false));
+ } catch (SkipException e) {
+ throw e;
+ }
+
+ entityStore.addClassifications(GUID_TABLE_1,
ImmutableList.of(classificationTypeT2.createDefaultValue()));
+ entityStore.addClassifications(GUID_TABLE_2,
ImmutableList.of(classificationTypeT2.createDefaultValue()));
+
+ source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL,
nextTimestamp, false));
+ verifyExpectedEntities(getFileNames(getZipSourceCopy(source)),
GUID_TABLE_2);
+
+ source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_2, EXPORT_INCREMENTAL,
nextTimestamp, false));
+ verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)),
GUID_TABLE_1);
+
+ }
+
+ @Test(dependsOnMethods = "importHiveDb")
+ public void exportTableIncremental() throws AtlasBaseException,
IOException {
+ InputStream source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL,
0, true));
+
+ ZipSource sourceCopy = getZipSourceCopy(source);
+ verifyExpectedEntities(getFileNames(sourceCopy), GUID_DB,
GUID_TABLE_CTAS_2);
+
+ nextTimestamp = updateTimesampForNextIncrementalExport(sourceCopy);
+
+ entityStore.addClassifications(GUID_TABLE_1,
ImmutableList.of(classificationTypeT3.createDefaultValue()));
+ entityStore.addClassifications(GUID_TABLE_CTAS_2,
ImmutableList.of(classificationTypeT3.createDefaultValue()));
+
+ source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL,
nextTimestamp, false));
+ verifyExpectedEntities(getFileNames(getZipSourceCopy(source)),
GUID_TABLE_CTAS_2);
+
+ source = runExportWithParameters(exportService,
getExportRequestForHiveTable(QUALIFIED_NAME_TABLE_LINEAGE, EXPORT_INCREMENTAL,
nextTimestamp, false));
+ verifyUnExpectedEntities(getFileNames(getZipSourceCopy(source)),
GUID_TABLE_1);
+ }
+
private AtlasExportRequest getIncrementalRequest(long timestamp) {
try {
@@ -292,6 +355,12 @@ public class ExportIncrementalTest extends AtlasTestBase {
}
}
+ private void verifyUnExpectedEntities(List<String> fileNames, String...
guids){
+ for (String guid : guids) {
+ assertFalse(fileNames.contains(guid.toLowerCase()));
+ }
+ }
+
private List<String> getFileNames(ZipSource zipSource){
List<String> ret = new ArrayList<>();
assertTrue(zipSource.hasNext());
diff --git
a/repository/src/test/resources/json/stocksDB-Entities/typedef-new-classification-T3.json
b/repository/src/test/resources/json/stocksDB-Entities/typedef-new-classification-T3.json
new file mode 100644
index 000000000..534a0b39d
--- /dev/null
+++
b/repository/src/test/resources/json/stocksDB-Entities/typedef-new-classification-T3.json
@@ -0,0 +1,23 @@
+{
+ "classificationDefs": [
+ {
+ "category": "CLASSIFICATION",
+ "guid": "6hee5e9f-e703-447a-b23b-0b831dc8a933",
+ "createdBy": "admin",
+ "updatedBy": "admin",
+ "createTime": 1711991058600,
+ "updateTime": 1711991058600,
+ "version": 1,
+ "name": "T3",
+ "description": "T3",
+ "typeVersion": "1.0",
+ "attributeDefs": [],
+ "superTypes": [],
+ "entityTypes": [],
+ "subTypes": []
+ }
+ ],
+ "entityDefs": [],
+ "enumDefs": [],
+ "structDefs": []
+}
diff --git
a/repository/src/test/resources/json/stocksDB-Entities/typesdef-new-classification-T2.json
b/repository/src/test/resources/json/stocksDB-Entities/typesdef-new-classification-T2.json
new file mode 100644
index 000000000..967bec161
--- /dev/null
+++
b/repository/src/test/resources/json/stocksDB-Entities/typesdef-new-classification-T2.json
@@ -0,0 +1,23 @@
+{
+ "classificationDefs": [
+ {
+ "category": "CLASSIFICATION",
+ "guid": "2e135e9f-e703-447a-b23b-0b831dc8a933",
+ "createdBy": "admin",
+ "updatedBy": "admin",
+ "createTime": 1711991058474,
+ "updateTime": 1711991058474,
+ "version": 1,
+ "name": "T2",
+ "description": "T2",
+ "typeVersion": "1.0",
+ "attributeDefs": [],
+ "superTypes": [],
+ "entityTypes": [],
+ "subTypes": []
+ }
+ ],
+ "entityDefs": [],
+ "enumDefs": [],
+ "structDefs": []
+}