This is an automated email from the ASF dual-hosted git repository.
mandarambawane pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 18d7f9dcc ATLAS-5069:Improve Unit Test Coverage for Impala-bridge
Module (#422)
18d7f9dcc is described below
commit 18d7f9dccf5658988d32e387339948286810f0a8
Author: prashantdev88 <[email protected]>
AuthorDate: Mon Sep 1 13:04:51 2025 +0530
ATLAS-5069:Improve Unit Test Coverage for Impala-bridge Module (#422)
---
.../apache/atlas/impala/ImpalaLineageToolTest.java | 436 +++++++++++++++
.../impala/hook/AtlasImpalaHookContextTest.java | 262 +++++++++
.../impala/hook/ImpalaOperationParserTest.java | 92 ++++
.../impala/hook/events/BaseImpalaEventTest.java | 601 +++++++++++++++++++++
.../hook/events/CreateImpalaProcessTest.java | 457 ++++++++++++++++
5 files changed, 1848 insertions(+)
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java
new file mode 100644
index 000000000..51ae1b002
--- /dev/null
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/ImpalaLineageToolTest.java
@@ -0,0 +1,436 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+**/
+package org.apache.atlas.impala;
+
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+ import static org.testng.Assert.*;
+
+public class ImpalaLineageToolTest {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ImpalaLineageToolTest.class);
+
+ @Mock
+ private ImpalaLineageHook mockImpalaLineageHook;
+
+ private Path tempDir;
+ private final PrintStream originalOut = System.out;
+ private ByteArrayOutputStream testOut;
+
+ @BeforeMethod
+ public void setUp() throws IOException {
+ MockitoAnnotations.initMocks(this);
+
+ tempDir = Files.createTempDirectory("impala-lineage-test");
+
+ testOut = new ByteArrayOutputStream();
+ System.setOut(new PrintStream(testOut));
+ }
+
+ @AfterMethod
+ public void tearDown() throws IOException {
+
+ if (tempDir != null && Files.exists(tempDir)) {
+ Files.walk(tempDir)
+ .sorted(Comparator.reverseOrder())
+ .map(Path::toFile)
+ .forEach(File::delete);
+ }
+
+ System.setOut(originalOut);
+ }
+
+ @Test
+ public void testConstructorWithValidArguments() {
+ String[] args = {"-d", "/test/directory", "-p", "test_prefix"};
+
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ assertNotNull(tool);
+ }
+
+ @Test(expectedExceptions = RuntimeException.class)
+ public void testConstructorWithInvalidArguments() {
+ String[] args = {"-invalid", "argument"};
+
+ new ImpalaLineageTool(args);
+ }
+
+ @Test
+ public void testGetCurrentFilesWithNoFiles() throws IOException {
+ String[] args = {"-d", tempDir.toString(), "-p", "nonexistent"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ File[] files = tool.getCurrentFiles();
+
+ assertNotNull(files);
+ assertEquals(files.length, 0);
+ }
+
+ @Test
+ public void testGetCurrentFilesWithSingleFile() throws IOException {
+
+ File testFile = new File(tempDir.toFile(), "test_lineage.log");
+ testFile.createNewFile();
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ File[] files = tool.getCurrentFiles();
+
+ assertNotNull(files);
+ assertEquals(files.length, 1);
+ assertEquals(files[0].getName(), "test_lineage.log");
+ }
+
+ @Test
+ public void testGetCurrentFilesWithMultipleFilesSorted() throws
IOException {
+
+ File file1 = new File(tempDir.toFile(), "test_file1.log");
+ File file2 = new File(tempDir.toFile(), "test_file2.log");
+ File file3 = new File(tempDir.toFile(), "test_file3.log");
+
+ file1.createNewFile();
+ file2.createNewFile();
+ file3.createNewFile();
+
+
+ long baseTime = System.currentTimeMillis();
+ file1.setLastModified(baseTime - 3000); // 3 seconds ago
+ file2.setLastModified(baseTime - 2000); // 2 seconds ago
+ file3.setLastModified(baseTime - 1000); // 1 second ago
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ File[] files = tool.getCurrentFiles();
+
+ assertNotNull(files);
+ assertEquals(files.length, 3);
+
+ assertTrue(files[0].lastModified() <= files[1].lastModified());
+ assertTrue(files[1].lastModified() <= files[2].lastModified());
+ }
+
+ @Test
+ public void testGetCurrentFilesWithNonExistentDirectory() {
+ String[] args = {"-d", "/nonexistent/directory", "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ File[] files = tool.getCurrentFiles();
+
+ assertNotNull(files);
+ assertEquals(files.length, 0);
+ }
+
+ @Test
+ public void testDeleteLineageAndWalSuccess() throws IOException {
+ File lineageFile = new File(tempDir.toFile(), "test_lineage.log");
+ File walFile = new File(tempDir.toFile(), "test_wal.wal");
+
+ lineageFile.createNewFile();
+ walFile.createNewFile();
+
+ assertTrue(lineageFile.exists());
+ assertTrue(walFile.exists());
+
+ ImpalaLineageTool.deleteLineageAndWal(lineageFile,
walFile.getAbsolutePath());
+
+ assertFalse(lineageFile.exists());
+ assertFalse(walFile.exists());
+ }
+
+ @Test
+ public void testDeleteLineageAndWalNonExistentFiles() {
+ File nonExistentFile = new File(tempDir.toFile(), "nonexistent.log");
+ String nonExistentWalPath =
tempDir.resolve("nonexistent.wal").toString();
+
+ ImpalaLineageTool.deleteLineageAndWal(nonExistentFile,
nonExistentWalPath);
+ }
+
+ @Test
+ public void testImportHImpalaEntitiesWithNewWalFile() throws Exception {
+
+ File lineageFile = new File(tempDir.toFile(), "test_lineage.log");
+ String testContent = "test lineage content";
+
+ try (FileWriter writer = new FileWriter(lineageFile)) {
+ writer.write(testContent);
+ }
+
+ String walPath = tempDir.resolve("test.wal").toString();
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+
+ doNothing().when(mockImpalaLineageHook).process(anyString());
+
+ tool.importHImpalaEntities(mockImpalaLineageHook,
lineageFile.getAbsolutePath(), walPath);
+
+ verify(mockImpalaLineageHook, times(1)).process(testContent);
+
+ File walFile = new File(walPath);
+ assertTrue(walFile.exists());
+ }
+
+ @Test
+ public void testImportHImpalaEntitiesWithExistingWalFile() throws
Exception {
+
+ File lineageFile = new File(tempDir.toFile(), "test_lineage.log");
+ String testContent = "test lineage content";
+
+ try (FileWriter writer = new FileWriter(lineageFile)) {
+ writer.write(testContent);
+ }
+
+
+ String walPath = tempDir.resolve("test.wal").toString();
+ File walFile = new File(walPath);
+ try (FileWriter writer = new FileWriter(walFile)) {
+ writer.write("0, "+ lineageFile.getAbsolutePath());
+ }
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ doNothing().when(mockImpalaLineageHook).process(anyString());
+
+ tool.importHImpalaEntities(mockImpalaLineageHook,
lineageFile.getAbsolutePath(), walPath);
+
+ verify(mockImpalaLineageHook, times(1)).process(testContent);
+ }
+
+ @Test
+ public void testImportHImpalaEntitiesWithProcessingFailure() throws
Exception {
+
+ File lineageFile = new File(tempDir.toFile(), "test_lineage.log");
+ String testContent = "test lineage content";
+
+ try (FileWriter writer = new FileWriter(lineageFile)) {
+ writer.write(testContent);
+ }
+
+ String walPath = tempDir.resolve("test.wal").toString();
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ doThrow(new RuntimeException("Processing
failed")).when(mockImpalaLineageHook).process(anyString());
+
+ tool.importHImpalaEntities(mockImpalaLineageHook,
lineageFile.getAbsolutePath(), walPath);
+
+ verify(mockImpalaLineageHook, times(1)).process(testContent);
+
+ File walFile = new File(walPath);
+ assertTrue(walFile.exists());
+ String walContent = new String(Files.readAllBytes(walFile.toPath()));
+ assertEquals(walContent.trim(), "0, "+ lineageFile.getAbsolutePath());
+ }
+
+
+ @Test
+ public void testMainWithIncorrectNumberOfArguments() {
+ String[] args = {"-d", tempDir.toString()};
+
+ ImpalaLineageTool.main(args);
+
+ String output = testOut.toString();
+ assertTrue(output.contains("wrong number of arguments"));
+ assertTrue(output.contains("Usage: import-impala.sh"));
+ }
+
+
+ @Test
+ public void testRunWithMultipleFiles() throws IOException {
+ // Create multiple test files
+ File file1 = new File(tempDir.toFile(), "test_file1.log");
+ File file2 = new File(tempDir.toFile(), "test_file2.log");
+ File file3 = new File(tempDir.toFile(), "test_file3.log");
+
+ file1.createNewFile();
+ try (FileWriter writer = new FileWriter(file1)) {
+ writer.write("content1");
+ }
+
+ file2.createNewFile();
+ try (FileWriter writer = new FileWriter(file2)) {
+ writer.write("content2");
+ }
+
+ file3.createNewFile();
+ try (FileWriter writer = new FileWriter(file3)) {
+ writer.write("content3");
+ }
+
+ long baseTime = System.currentTimeMillis();
+ file1.setLastModified(baseTime - 3000);
+ file2.setLastModified(baseTime - 2000);
+ file3.setLastModified(baseTime - 1000);
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ try {
+ tool.run();
+ } catch (Exception e) {
+ LOG.info("Error running test case
ImpalaLineageToolTest.testRunWithMultipleFiles()");
+ }
+
+ assertFalse(file1.exists());
+ assertFalse(file2.exists());
+ assertTrue(file3.exists());
+ }
+
+ @Test
+ public void testRunWithNoFiles() {
+ String[] args = {"-d", tempDir.toString(), "-p", "nonexistent"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ tool.run();
+ }
+
+ @Test
+ public void testRunWithSingleFile() throws IOException {
+ File testFile = new File(tempDir.toFile(), "test_single.log");
+ testFile.createNewFile();
+ try (FileWriter writer = new FileWriter(testFile)) {
+ writer.write("single file content");
+ }
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ try {
+ tool.run();
+ } catch (Exception e) {
+ LOG.info("Error running test case
ImpalaLineageToolTest.testRunWithSingleFile()");
+ }
+
+ assertTrue(testFile.exists());
+ }
+
+ @Test
+ public void testProcessImpalaLineageHookSuccess() throws Exception {
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ java.lang.reflect.Method method =
ImpalaLineageTool.class.getDeclaredMethod(
+ "processImpalaLineageHook",
+ ImpalaLineageHook.class,
+ java.util.List.class
+ );
+ method.setAccessible(true);
+
+ doNothing().when(mockImpalaLineageHook).process(anyString());
+
+ java.util.List<String> lineageList = java.util.Arrays.asList("query1",
"query2", "query3");
+ boolean result = (Boolean) method.invoke(tool, mockImpalaLineageHook,
lineageList);
+
+ assertTrue(result);
+ verify(mockImpalaLineageHook, times(3)).process(anyString());
+ }
+
+ @Test
+ public void testProcessImpalaLineageHookWithFailures() throws Exception {
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ // Use reflection to access the private method
+ java.lang.reflect.Method method =
ImpalaLineageTool.class.getDeclaredMethod(
+ "processImpalaLineageHook",
+ ImpalaLineageHook.class,
+ java.util.List.class
+ );
+ method.setAccessible(true);
+
+ doNothing().when(mockImpalaLineageHook).process("query1");
+ doThrow(new RuntimeException("Processing
failed")).when(mockImpalaLineageHook).process("query2");
+ doNothing().when(mockImpalaLineageHook).process("query3");
+
+ java.util.List<String> lineageList = java.util.Arrays.asList("query1",
"query2", "query3");
+ boolean result = (Boolean) method.invoke(tool, mockImpalaLineageHook,
lineageList);
+
+ assertFalse(result);
+ verify(mockImpalaLineageHook, times(3)).process(anyString());
+ }
+
+ @Test
+ public void testProcessImpalaLineageHookWithEmptyList() throws Exception {
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ java.lang.reflect.Method method =
ImpalaLineageTool.class.getDeclaredMethod(
+ "processImpalaLineageHook",
+ ImpalaLineageHook.class,
+ java.util.List.class
+ );
+ method.setAccessible(true);
+
+ java.util.List<String> emptyList = java.util.Collections.emptyList();
+ boolean result = (Boolean) method.invoke(tool, mockImpalaLineageHook,
emptyList);
+
+ assertTrue(result);
+ verify(mockImpalaLineageHook, never()).process(anyString());
+ }
+
+ @Test
+ public void testGetCurrentFilesWithMultipleFilesSortedByModificationTime()
throws IOException, InterruptedException {
+ // Create multiple test files and ensure different modification times
+ File file1 = new File(tempDir.toFile(), "test_old.log");
+ file1.createNewFile();
+ file1.setLastModified(System.currentTimeMillis() - 3000); // 3 seconds
ago
+
+ File file2 = new File(tempDir.toFile(), "test_medium.log");
+ file2.createNewFile();
+ file2.setLastModified(System.currentTimeMillis() - 2000); // 2 seconds
ago
+
+ File file3 = new File(tempDir.toFile(), "test_new.log");
+ file3.createNewFile();
+ file3.setLastModified(System.currentTimeMillis() - 1000); // 1 second
ago
+
+ String[] args = {"-d", tempDir.toString(), "-p", "test"};
+ ImpalaLineageTool tool = new ImpalaLineageTool(args);
+
+ File[] files = tool.getCurrentFiles();
+
+ assertNotNull(files);
+ assertEquals(files.length, 3);
+ assertEquals(files[0].getName(), "test_old.log");
+ assertEquals(files[1].getName(), "test_medium.log");
+ assertEquals(files[2].getName(), "test_new.log");
+ }
+
+}
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java
new file mode 100644
index 000000000..386ff0108
--- /dev/null
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/AtlasImpalaHookContextTest.java
@@ -0,0 +1,262 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook;
+
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertThrows;
+
+public class AtlasImpalaHookContextTest {
+
+
+ @Mock
+ private ImpalaLineageHook impalaLineageHook;
+
+ @Mock
+ private ImpalaOperationType impalaOperationType;
+
+ @Mock
+ private ImpalaQuery impalaQuery;
+
+
+ @BeforeMethod
+ public void initializeMocks() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+
+ @Test
+ public void testGetQualifiedNameForTableWithFullTableName() throws
Exception {
+
+ String database = "testDatabase";
+ String table = "testTable";
+ String metadataNamespace = "testNamespace";
+ String expectedTableQualifiedName = (database +
AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
metadataNamespace;
+ String fullTableName =
database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table;
+
+
when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace);
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedTableQualifiedName =
impalaHook.getQualifiedNameForTable(fullTableName);
+ assertEquals(expectedTableQualifiedName,receivedTableQualifiedName);
+ }
+
+ @Test
+ public void testGetQualifiedNameForTableWithNullTableName() throws
Exception {
+
+ String fullTableName = null;
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+ assertThrows(IllegalArgumentException.class, () ->
impalaHook.getQualifiedNameForTable(fullTableName));
+ }
+
+ @Test
+ public void testGetQualifiedNameForTableWithPartialTableName() throws
Exception {
+
+ String tableName = "testTableName";
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+ assertThrows(IllegalArgumentException.class, () ->
impalaHook.getQualifiedNameForTable(tableName));
+ }
+
+ @Test
+ public void
testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadata()
throws Exception {
+
+ String database = "testDatabase";
+ String table = "testTable";
+ String column = "testColumn";
+ String metadataNamespace = "testNamespace";
+ String fullTableName =
database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table;
+ String fullColumnName =
database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column;
+ LineageVertex vertex = new LineageVertex();
+ LineageVertexMetadata metadata = new LineageVertexMetadata();
+
+ String expectedColumnQualifiedName = (database +
AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table +
AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME +
+ column +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
metadataNamespace;
+
+ metadata.setTableName(fullTableName);
+ vertex.setMetadata(metadata);
+ vertex.setVertexId(fullColumnName);
+
+ when(impalaLineageHook.getMetadataNamespace())
+ .thenReturn(metadataNamespace);
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedColumnQualifiedName =
impalaHook.getQualifiedNameForColumn(vertex);
+ assertEquals(expectedColumnQualifiedName,receivedColumnQualifiedName);
+ }
+
+
+
+ @Test
+ public void
testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadataAsNull()
throws Exception {
+
+ String database = "testDatabase";
+ String table = "testTable";
+ String column = "testColumn";
+ String metadataNamespace = "testNamespace";
+ String fullColumnName =
database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column;
+ LineageVertex vertex = new LineageVertex();
+
+ String expectedColumnQualifiedName = (database +
AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table +
AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME +
+ column +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
metadataNamespace;
+
+ vertex.setMetadata(null);
+ vertex.setVertexId(fullColumnName);
+
+ when(impalaLineageHook.getMetadataNamespace())
+ .thenReturn(metadataNamespace);
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedColumnQualifiedName =
impalaHook.getQualifiedNameForColumn(vertex);
+ assertEquals(expectedColumnQualifiedName,receivedColumnQualifiedName);
+ }
+
+ @Test
+ public void
testGetQualifiedNameForColumnUsingLineageVertexAndLineageVertexMetadataTableAsNull()
throws Exception {
+
+ String database = "testDatabase";
+ String table = "testTable";
+ String column = "testColumn";
+ LineageVertex vertex = new LineageVertex();
+ LineageVertexMetadata metadata = new LineageVertexMetadata();
+
+ vertex.setMetadata(metadata);
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ assertThrows(IllegalArgumentException.class, () ->
impalaHook.getQualifiedNameForColumn(vertex));
+ }
+
+ @Test
+ public void testGetQualifiedNameForColumn() throws Exception {
+
+ String database = "testDatabase";
+ String table = "testTable";
+ String column = "testColumn";
+ String metadataNamespace = "testNamespace";
+ String expectedColumnQualifiedName = (database +
AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME + table +
AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME +
+ column +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
metadataNamespace;
+ String fullColumnName =
database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column;
+
+ when(impalaLineageHook.getMetadataNamespace())
+ .thenReturn(metadataNamespace);
+
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedColumnQualifiedName =
impalaHook.getQualifiedNameForColumn(fullColumnName);
+ assertEquals(expectedColumnQualifiedName,receivedColumnQualifiedName);
+ }
+
+
+ @Test
+ public void testGetTableNameFromColumn() throws Exception {
+
+ String table = "testTable";
+ String column = "testColumn";
+ String expectedTableName = table;
+ String fullTableName =
table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column;
+
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedTableName =
impalaHook.getTableNameFromColumn(fullTableName);
+ assertEquals(expectedTableName,receivedTableName);
+ }
+
+ @Test
+ public void testGetDatabaseNameFromTable() throws Exception {
+
+ String table = "testTable";
+ String database = "testDatabase";
+ String expectedDatabaseName = database;
+ String fullTableName =
database+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+table;
+
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedDatabaseName =
impalaHook.getDatabaseNameFromTable(fullTableName);
+ assertEquals(expectedDatabaseName,receivedDatabaseName);
+ }
+
+
+ @Test
+ public void testGetColumnNameOnlyWithFullCOlumnName() throws Exception {
+
+ String table = "testTable";
+ String column = "testColumn";
+ String expectedColumnName = column;
+ String fullColumnName =
table+AtlasImpalaHookContext.QNAME_SEP_ENTITY_NAME+column;
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedColumnName =
impalaHook.getColumnNameOnly(fullColumnName);
+ assertEquals(expectedColumnName,receivedColumnName);
+ }
+
+
+ @Test
+ public void testGetColumnNameOnlyWithNullValue() throws Exception {
+ String fullColumnName = null;
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ assertThrows(IllegalArgumentException.class, () ->
impalaHook.getColumnNameOnly(fullColumnName));
+ }
+
+ @Test
+ public void testGetColumnNameOnlyWithPartialColumnName() throws Exception {
+
+ String table = "testTable";
+ String column = "testColumn";
+ String expectedColumnName = column;
+ String columnName = column;
+
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+ String receivedColumnName = impalaHook.getColumnNameOnly(columnName);
+ assertEquals(expectedColumnName,receivedColumnName);
+ }
+
+ @Test
+ public void testGetQualifiedNameForDb() throws Exception {
+
+ String database = "testDatabase";
+ String metadataNamespace = "testNamespace";
+ String expectedDatabaseName = (database +
AtlasImpalaHookContext.QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
metadataNamespace;
+ AtlasImpalaHookContext impalaHook = new
AtlasImpalaHookContext(impalaLineageHook, impalaOperationType, impalaQuery);
+
+
when(impalaLineageHook.getMetadataNamespace()).thenReturn(metadataNamespace);
+
+ String receivedDatabaseName =
impalaHook.getQualifiedNameForDb(database);
+ assertEquals(expectedDatabaseName,receivedDatabaseName);
+ }
+
+
+}
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java
new file mode 100644
index 000000000..f1662f7c8
--- /dev/null
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/ImpalaOperationParserTest.java
@@ -0,0 +1,92 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook;
+
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.testng.annotations.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ImpalaOperationParserTest {
+
+ @Test
+ public void testGetImpalaOperationTypeCREATEVIEW() {
+ String viewQuery = "create view test_view as select * from test_table";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationType(viewQuery);
+ assertEquals(ImpalaOperationType.CREATEVIEW,operationType);
+ }
+
+ @Test
+ public void testGetImpalaOperationTypeCREATETABLE_AS_SELECT() {
+ String selectAsQuery = "create table test_table as select * from
test_table_1";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationType(selectAsQuery);
+ assertEquals(ImpalaOperationType.CREATETABLE_AS_SELECT,operationType);
+ }
+
+ @Test
+ public void testGetImpalaOperationTypeALTERVIEW_AS() {
+ String alterViewQuery = "ALTER VIEW test_view AS SELECT * FROM
test_table_1;";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationType(alterViewQuery);
+ assertEquals(ImpalaOperationType.ALTERVIEW_AS,operationType);
+ }
+
+ @Test
+ public void testGetImpalaOperationTypeQUERY() {
+ String query = "INSERT INTO test_table SELECT * FROM
test_source_table;";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationType(query);
+ assertEquals(ImpalaOperationType.QUERY,operationType);
+ }
+
+
+ @Test
+ public void testGetImpalaOperationTypeQUERY_WITH_CLAUSE() {
+ String queryWithClause = "WITH test_table_2 AS (SELECT id FROM
test_table)INSERT INTO test_table_1 SELECT id FROM test_table_2";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationType(queryWithClause);
+ assertEquals(ImpalaOperationType.QUERY_WITH_CLAUSE,operationType);
+ }
+
+ @Test
+ public void testGetImpalaOperationTypeUNKNOWN() {
+ String unknowQuery = "SELECT * from test_table";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationType(unknowQuery);
+ assertEquals(ImpalaOperationType.UNKNOWN,operationType);
+ }
+
+ @Test
+ public void testGetImpalaOperationSubTypeWithInvalidQuery() {
+ String unknowQuery = "SELECT * from test_table";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationSubType(ImpalaOperationType.QUERY_WITH_CLAUSE,unknowQuery);
+ assertEquals(ImpalaOperationType.UNKNOWN,operationType);
+ }
+
+ @Test
+ public void testGetImpalaOperationSubTypeINSERT() {
+ String query = "INSERT INTO test_table SELECT * FROM
test_source_table;";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationSubType(ImpalaOperationType.QUERY,query);
+ assertEquals(ImpalaOperationType.INSERT,operationType);
+ }
+
+ @Test
+ public void testGetImpalaOperationSubTypeINSERT_OVERWRITE() {
+ String query = "INSERT OVERWRITE TABLE test_table\n" +
+ "SELECT region, SUM(amount) AS test_table_1\n" +
+ "FROM test_table_2\n" +
+ "GROUP BY region;\n";
+ ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationSubType(ImpalaOperationType.QUERY,query);
+ assertEquals(ImpalaOperationType.INSERT_OVERWRITE,operationType);
+ }
+}
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java
new file mode 100644
index 000000000..0d5c4fc91
--- /dev/null
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/BaseImpalaEventTest.java
@@ -0,0 +1,601 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook.events;
+
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.model.ImpalaDataType;
+import org.apache.atlas.impala.model.ImpalaNode;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.instance.AtlasObjectId;
+import org.apache.atlas.model.notification.HookNotification;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.atlas.impala.hook.events.BaseImpalaEvent.*;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+
+public class BaseImpalaEventTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(BaseImpalaEventTest.class);
+
+ @Mock
+ private AtlasImpalaHookContext mockContext;
+
+ @Mock
+ private ImpalaQuery mockQuery;
+
+ private static final String CLUSTER_NAME = "test_cluster";
+ private static final String DB_NAME = "test_database";
+ private static final String TABLE_NAME = "test_table";
+ private static final String COLUMN_NAME = "test_column";
+ private static final String USER_NAME = "test_user";
+ private static final String HOST_NAME = "test_host";
+ private static final String QUERY_TEXT = "SELECT * FROM test_table";
+ private static final String QUERY_ID = "test_query_123";
+ private static final long TIMESTAMP = 1554750072L;
+ private static final long END_TIME = 1554750554L;
+
+ private static class TestBaseImpalaEvent extends BaseImpalaEvent {
+ public TestBaseImpalaEvent(AtlasImpalaHookContext context) {
+ super(context);
+ }
+
+ @Override
+ public List<HookNotification> getNotificationMessages() throws
Exception {
+ return Collections.emptyList();
+ }
+ }
+
+ private TestBaseImpalaEvent baseImpalaEvent;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ when(mockContext.getUserName()).thenReturn(USER_NAME);
+ when(mockContext.getHostName()).thenReturn(HOST_NAME);
+ when(mockContext.getMetadataNamespace()).thenReturn(CLUSTER_NAME);
+ when(mockContext.getQueryStr()).thenReturn(QUERY_TEXT);
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+ when(mockContext.getLineageQuery()).thenReturn(mockQuery);
+
+ when(mockQuery.getQueryId()).thenReturn(QUERY_ID);
+ when(mockQuery.getTimestamp()).thenReturn(TIMESTAMP);
+ when(mockQuery.getEndTime()).thenReturn(END_TIME);
+
+ setupQualifiedNameMocks();
+
+ baseImpalaEvent = new TestBaseImpalaEvent(mockContext);
+ }
+
+ private void setupQualifiedNameMocks() {
+
when(mockContext.getQualifiedNameForDb(anyString())).thenAnswer(invocation -> {
+ String dbName = invocation.getArgument(0);
+ return dbName.toLowerCase() +"@"+ CLUSTER_NAME;
+ });
+
+
when(mockContext.getQualifiedNameForTable(anyString())).thenAnswer(invocation
-> {
+ String tableQualifiedName = invocation.getArgument(0);
+ return tableQualifiedName.toLowerCase() +"@"+ CLUSTER_NAME;
+ });
+
+
when(mockContext.getQualifiedNameForColumn(any(LineageVertex.class))).thenAnswer(invocation
-> {
+ LineageVertex vertex = invocation.getArgument(0);
+ return vertex.getVertexId().toLowerCase() +"@"+ CLUSTER_NAME;
+ });
+
+
when(mockContext.getDatabaseNameFromTable(anyString())).thenAnswer(invocation
-> {
+ String tableName = invocation.getArgument(0);
+ if (tableName.contains(".")) {
+ return tableName.split("\\.")[0];
+ }
+ return DB_NAME;
+ });
+
+ when(mockContext.getColumnNameOnly(anyString())).thenAnswer(invocation
-> {
+ String columnName = invocation.getArgument(0);
+ if (columnName.contains(".")) {
+ String[] parts = columnName.split("\\.");
+ return parts[parts.length - 1];
+ }
+ return columnName;
+ });
+
+
when(mockContext.getTableNameFromColumn(anyString())).thenAnswer(invocation -> {
+ String columnName = invocation.getArgument(0);
+ if (columnName.contains(".") && columnName.split("\\.").length >=
2) {
+ String[] parts = columnName.split("\\.");
+ return parts[0] +"."+ parts[1];
+ }
+ return DB_NAME +"."+ TABLE_NAME;
+ });
+ }
+
+ @Test
+ public void testGetUserName() {
+ assertEquals(baseImpalaEvent.getUserName(), USER_NAME);
+ }
+
+ @Test
+ public void testGetQualifiedNameWithImpalaNode() throws Exception {
+ // Create a test ImpalaNode with database vertex
+ LineageVertex dbVertex = createDatabaseVertex(DB_NAME);
+ ImpalaNode dbNode = new ImpalaNode(dbVertex);
+
+ String qualifiedName = baseImpalaEvent.getQualifiedName(dbNode);
+
+ assertNotNull(qualifiedName);
+ assertEquals(qualifiedName, DB_NAME.toLowerCase() +"@"+
CLUSTER_NAME);
+ }
+
+ @Test
+ public void testGetQualifiedNameWithLineageVertex() throws Exception {
+ // Test database vertex
+ LineageVertex dbVertex = createDatabaseVertex(DB_NAME);
+ String dbQualifiedName = baseImpalaEvent.getQualifiedName(dbVertex);
+ assertEquals(dbQualifiedName, DB_NAME.toLowerCase() +"@"+
CLUSTER_NAME);
+
+ // Test table vertex
+ LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME);
+ String tableQualifiedName =
baseImpalaEvent.getQualifiedName(tableVertex);
+ assertEquals(tableQualifiedName, DB_NAME+"."+TABLE_NAME.toLowerCase()
+"@"+ CLUSTER_NAME);
+
+ // Test column vertex
+ LineageVertex columnVertex = createColumnVertex(COLUMN_NAME,
TABLE_NAME);
+ String columnQualifiedName =
baseImpalaEvent.getQualifiedName(columnVertex);
+ System.out.println("columnQualifiedName : "+columnQualifiedName);
+ assertEquals(columnQualifiedName,
DB_NAME+"."+TABLE_NAME+"."+COLUMN_NAME.toLowerCase() +"@"+ CLUSTER_NAME);
+ }
+
+ @Test
+ public void testGetQualifiedNameWithNullVertex() {
+ try {
+ baseImpalaEvent.getQualifiedName((LineageVertex) null);
+ fail("Expected IllegalArgumentException for null vertex");
+ } catch (IllegalArgumentException e) {
+ assertEquals(e.getMessage(), "node is null");
+ }
+ }
+
+ @Test
+ public void testGetQualifiedNameWithNullVertexType() {
+ LineageVertex vertex = new LineageVertex();
+ vertex.setVertexId("test");
+ vertex.setVertexType(null);
+
+ String qualifiedName = baseImpalaEvent.getQualifiedName(vertex);
+ assertNull(qualifiedName);
+ }
+
+ @Test
+ public void testGetQualifiedNameWithNullVertexId() {
+ LineageVertex vertex = new LineageVertex();
+ vertex.setVertexType(ImpalaVertexType.DATABASE);
+ vertex.setVertexId(null);
+
+ String qualifiedName = baseImpalaEvent.getQualifiedName(vertex);
+ assertNull(qualifiedName);
+ }
+
+ @Test
+ public void testGetTableNameFromVertex() {
+ // Test with column vertex that has metadata
+ LineageVertex columnVertex = createColumnVertex(COLUMN_NAME,
TABLE_NAME);
+ LineageVertexMetadata metadata = new LineageVertexMetadata();
+ metadata.setTableName(TABLE_NAME);
+ columnVertex.setMetadata(metadata);
+
+ String tableName =
baseImpalaEvent.getTableNameFromVertex(columnVertex);
+ assertEquals(tableName, TABLE_NAME);
+
+ // Test with non-column vertex
+ LineageVertex dbVertex = createDatabaseVertex(DB_NAME);
+ String tableNameFromDb =
baseImpalaEvent.getTableNameFromVertex(dbVertex);
+ assertEquals(tableNameFromDb, DB_NAME +"."+ TABLE_NAME);
+ }
+
+ @Test
+ public void testGetTableNameFromColumn() {
+ String columnName = DB_NAME +"."+ TABLE_NAME +"."+ COLUMN_NAME;
+ String tableName = baseImpalaEvent.getTableNameFromColumn(columnName);
+ assertEquals(tableName, DB_NAME +"."+ TABLE_NAME);
+ }
+
+ @Test
+ public void testToDbEntityWithString() throws Exception {
+
+ when(mockContext.getEntity(anyString())).thenReturn(null);
+
+ AtlasEntity dbEntity = baseImpalaEvent.toDbEntity(DB_NAME);
+
+ assertNotNull(dbEntity);
+ assertEquals(dbEntity.getTypeName(), HIVE_TYPE_DB);
+ assertEquals(dbEntity.getAttribute(ATTRIBUTE_NAME),
DB_NAME.toLowerCase());
+ assertEquals(dbEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),
DB_NAME.toLowerCase() +"@"+ CLUSTER_NAME);
+ assertEquals(dbEntity.getAttribute(ATTRIBUTE_CLUSTER_NAME),
CLUSTER_NAME);
+ assertNull(dbEntity.getGuid());
+ }
+
+
+ @Test
+ public void testGetColumnEntities() throws Exception {
+
+ LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME);
+ tableVertex.setId(1L);
+ tableVertex.setCreateTime(TIMESTAMP);
+ ImpalaNode tableNode = new ImpalaNode(tableVertex);
+ tableNode.addChild(tableVertex);
+ AtlasEntity entity = new AtlasEntity(HIVE_TYPE_COLUMN);
+ entity.setGuid("test-guid");
+ entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.qualified.name");
+
+ AtlasObjectId objectId = BaseImpalaEvent.getObjectId(entity);
+
+ when(mockContext.getEntity(anyString())).thenReturn(null);
+ when(mockContext.getUserName()).thenReturn(USER_NAME);
+
+ List<AtlasEntity> entityList =
baseImpalaEvent.getColumnEntities(objectId ,tableNode);
+
+ assertNotNull(entityList.get(0));
+ assertEquals(entityList.get(0).getTypeName(), HIVE_TYPE_COLUMN);
+ assertEquals(entityList.get(0).getAttribute(ATTRIBUTE_TABLE),objectId);
+ }
+
+ @Test
+ public void testToTableEntity() throws Exception {
+
+ LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME);
+ tableVertex.setId(1L);
+ tableVertex.setCreateTime(TIMESTAMP);
+ ImpalaNode tableNode = new ImpalaNode(tableVertex);
+ tableNode.addChild(tableVertex);
+ AtlasEntity entity = new AtlasEntity(HIVE_TYPE_TABLE);
+ entity.setGuid("test-guid");
+
+ AtlasObjectId objectId = BaseImpalaEvent.getObjectId(entity);
+
+ when(mockContext.getEntity(anyString())).thenReturn(null);
+ when(mockContext.getUserName()).thenReturn(USER_NAME);
+
+ AtlasEntity atlasEntity = baseImpalaEvent.toTableEntity(objectId
,tableNode,null);
+
+
+ assertEquals(atlasEntity.getTypeName(),HIVE_TYPE_TABLE);
+
assertEquals(atlasEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME),DB_NAME+"."+TABLE_NAME+"@"+CLUSTER_NAME);
+ }
+
+
+
+ @Test
+ public void testToDbEntityWithCachedEntity() throws Exception {
+ // Setup entity cache to return existing entity
+ AtlasEntity cachedEntity = new AtlasEntity(HIVE_TYPE_DB);
+ cachedEntity.setAttribute(ATTRIBUTE_NAME, DB_NAME);
+ when(mockContext.getEntity(anyString())).thenReturn(cachedEntity);
+
+ AtlasEntity dbEntity = baseImpalaEvent.toDbEntity(DB_NAME);
+
+ assertNotNull(dbEntity);
+ assertEquals(dbEntity, cachedEntity);
+ }
+
+
+ @Test
+ public void testToTableEntityWithNullNode() {
+ try {
+ baseImpalaEvent.toTableEntity((ImpalaNode) null,
(AtlasEntitiesWithExtInfo) null);
+ fail("Expected IllegalArgumentException for null table node");
+ } catch (Exception e) {
+ assertTrue(e instanceof IllegalArgumentException);
+ assertTrue(e.getMessage().contains("table is null"));
+ }
+ }
+
+ @Test
+ public void testGetObjectId() {
+ AtlasEntity entity = new AtlasEntity(HIVE_TYPE_TABLE);
+ entity.setGuid("test-guid");
+ entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.qualified.name");
+
+ AtlasObjectId objectId = BaseImpalaEvent.getObjectId(entity);
+
+ assertNotNull(objectId);
+ assertEquals(objectId.getGuid(), "test-guid");
+ assertEquals(objectId.getTypeName(), HIVE_TYPE_TABLE);
+
assertEquals(objectId.getUniqueAttributes().get(ATTRIBUTE_QUALIFIED_NAME),
"test.qualified.name");
+ }
+
+ @Test
+ public void testGetObjectIds() {
+ List<AtlasEntity> entities = new ArrayList<>();
+
+ AtlasEntity entity1 = new AtlasEntity(HIVE_TYPE_TABLE);
+ entity1.setGuid("test-guid-1");
+ entity1.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.table1");
+ entities.add(entity1);
+
+ AtlasEntity entity2 = new AtlasEntity(HIVE_TYPE_COLUMN);
+ entity2.setGuid("test-guid-2");
+ entity2.setAttribute(ATTRIBUTE_QUALIFIED_NAME, "test.column1");
+ entities.add(entity2);
+
+ List<AtlasObjectId> objectIds = BaseImpalaEvent.getObjectIds(entities);
+
+ assertNotNull(objectIds);
+ assertEquals(objectIds.size(), 2);
+ assertEquals(objectIds.get(0).getGuid(), "test-guid-1");
+ assertEquals(objectIds.get(1).getGuid(), "test-guid-2");
+ }
+
+ @Test
+ public void testGetObjectIdsWithEmptyList() {
+ List<AtlasObjectId> objectIds =
BaseImpalaEvent.getObjectIds(Collections.emptyList());
+ assertNotNull(objectIds);
+ assertTrue(objectIds.isEmpty());
+ }
+
+ @Test
+ public void testGetTableCreateTimeWithNode() {
+ LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME);
+ tableVertex.setCreateTime(TIMESTAMP);
+ ImpalaNode tableNode = new ImpalaNode(tableVertex);
+
+ long createTime = BaseImpalaEvent.getTableCreateTime(tableNode);
+ assertEquals(createTime, TIMESTAMP * MILLIS_CONVERT_FACTOR);
+ }
+
+ @Test
+ public void testGetTableCreateTimeWithVertex() {
+ LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME);
+ tableVertex.setCreateTime(TIMESTAMP);
+
+ long createTime = BaseImpalaEvent.getTableCreateTime(tableVertex);
+ assertEquals(createTime, TIMESTAMP * MILLIS_CONVERT_FACTOR);
+ }
+
+ @Test
+ public void testGetTableCreateTimeWithNullTime() {
+ LineageVertex tableVertex = createTableVertex(DB_NAME+"."+TABLE_NAME);
+ tableVertex.setCreateTime(null);
+
+ long createTime = BaseImpalaEvent.getTableCreateTime(tableVertex);
+ assertTrue(createTime > 0);
+ }
+
+ @Test
+ public void testGetImpalaProcessEntity() throws Exception {
+ List<AtlasEntity> inputs = createMockEntities("input_table");
+ List<AtlasEntity> outputs = createMockEntities("output_table");
+
+ AtlasEntity processEntity =
baseImpalaEvent.getImpalaProcessEntity(inputs, outputs);
+
+ assertNotNull(processEntity);
+ assertEquals(processEntity.getTypeName(),
ImpalaDataType.IMPALA_PROCESS.getName());
+ assertNotNull(processEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ assertNotNull(processEntity.getAttribute(ATTRIBUTE_INPUTS));
+ assertNotNull(processEntity.getAttribute(ATTRIBUTE_OUTPUTS));
+ assertEquals(processEntity.getAttribute(ATTRIBUTE_OPERATION_TYPE),
ImpalaOperationType.QUERY);
+ assertEquals(processEntity.getAttribute(ATTRIBUTE_START_TIME),
TIMESTAMP * MILLIS_CONVERT_FACTOR);
+ assertEquals(processEntity.getAttribute(ATTRIBUTE_END_TIME), END_TIME
* MILLIS_CONVERT_FACTOR);
+ assertEquals(processEntity.getAttribute(ATTRIBUTE_USER_NAME),
EMPTY_ATTRIBUTE_VALUE);
+ assertEquals(processEntity.getAttribute(ATTRIBUTE_QUERY_TEXT),
EMPTY_ATTRIBUTE_VALUE);
+ }
+
+ @Test
+ public void testGetImpalaProcessExecutionEntity() throws Exception {
+ List<AtlasEntity> inputs = createMockEntities("input_table");
+ List<AtlasEntity> outputs = createMockEntities("output_table");
+ AtlasEntity processEntity =
baseImpalaEvent.getImpalaProcessEntity(inputs, outputs);
+
+ AtlasEntity executionEntity =
baseImpalaEvent.getImpalaProcessExecutionEntity(processEntity);
+
+ assertNotNull(executionEntity);
+ assertEquals(executionEntity.getTypeName(),
ImpalaDataType.IMPALA_PROCESS_EXECUTION.getName());
+ assertNotNull(executionEntity.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ assertEquals(executionEntity.getAttribute(ATTRIBUTE_START_TIME),
TIMESTAMP * MILLIS_CONVERT_FACTOR);
+ assertEquals(executionEntity.getAttribute(ATTRIBUTE_END_TIME),
END_TIME * MILLIS_CONVERT_FACTOR);
+ assertEquals(executionEntity.getAttribute(ATTRIBUTE_USER_NAME),
USER_NAME);
+ assertEquals(executionEntity.getAttribute(ATTRIBUTE_QUERY_TEXT),
QUERY_TEXT.toLowerCase().trim());
+ assertEquals(executionEntity.getAttribute(ATTRIBUTE_QUERY_ID),
QUERY_ID);
+ assertEquals(executionEntity.getAttribute(ATTRIBUTE_HOSTNAME),
HOST_NAME);
+ }
+
+ @Test
+ public void testCreateTableNode() {
+ Long createTime = TIMESTAMP;
+ ImpalaNode tableNode = baseImpalaEvent.createTableNode(TABLE_NAME,
createTime);
+
+ assertNotNull(tableNode);
+ assertEquals(tableNode.getOwnVertex().getVertexType(),
ImpalaVertexType.TABLE);
+ assertEquals(tableNode.getOwnVertex().getVertexId(), TABLE_NAME);
+ assertEquals(tableNode.getOwnVertex().getCreateTime(), createTime);
+ }
+
+ @Test
+ public void testCreateHiveDDLEntityWithDb() {
+ AtlasEntity dbEntity = new AtlasEntity(HIVE_TYPE_DB);
+ dbEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
"test.db.qualified.name");
+
+ AtlasEntity ddlEntity = baseImpalaEvent.createHiveDDLEntity(dbEntity);
+
+ assertNotNull(ddlEntity);
+ assertEquals(ddlEntity.getTypeName(),
ImpalaDataType.HIVE_DB_DDL.getName());
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_SERVICE_TYPE), "impala");
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_EXEC_TIME), TIMESTAMP *
MILLIS_CONVERT_FACTOR);
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_QUERY_TEXT), QUERY_TEXT);
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME);
+ assertNotNull(ddlEntity.getAttribute(ATTRIBUTE_DB));
+ }
+
+ @Test
+ public void testCreateHiveDDLEntityWithTable() {
+ AtlasEntity tableEntity = new AtlasEntity(HIVE_TYPE_TABLE);
+ tableEntity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
"test.table.qualified.name");
+
+ AtlasEntity ddlEntity =
baseImpalaEvent.createHiveDDLEntity(tableEntity);
+
+ assertNotNull(ddlEntity);
+ assertEquals(ddlEntity.getTypeName(),
ImpalaDataType.HIVE_TABLE_DDL.getName());
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_SERVICE_TYPE), "impala");
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_EXEC_TIME), TIMESTAMP *
MILLIS_CONVERT_FACTOR);
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_QUERY_TEXT), QUERY_TEXT);
+ assertEquals(ddlEntity.getAttribute(ATTRIBUTE_USER_NAME), USER_NAME);
+ assertNotNull(ddlEntity.getAttribute(ATTRIBUTE_TABLE));
+ }
+
+ @Test
+ public void testIsDdlOperation() {
+
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATEVIEW);
+ assertTrue(baseImpalaEvent.isDdlOperation());
+
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.ALTERVIEW_AS);
+ assertTrue(baseImpalaEvent.isDdlOperation());
+
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATETABLE_AS_SELECT);
+ assertTrue(baseImpalaEvent.isDdlOperation());
+
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+ assertFalse(baseImpalaEvent.isDdlOperation());
+ }
+
+ @Test
+ public void testGetCreateTimeInVertex() {
+
+ LineageVertex vertex = createDatabaseVertex(DB_NAME);
+ vertex.setCreateTime(TIMESTAMP);
+
+ Long createTime = baseImpalaEvent.getCreateTimeInVertex(vertex);
+ assertEquals(createTime, Long.valueOf(TIMESTAMP));
+
+ Long createTimeNull = baseImpalaEvent.getCreateTimeInVertex(null);
+ assertTrue(createTimeNull > 0); // Should return current time in
seconds
+
+ LineageVertex columnVertex = createColumnVertex(COLUMN_NAME,
TABLE_NAME);
+ columnVertex.setCreateTime(null);
+ LineageVertexMetadata metadata = new LineageVertexMetadata();
+ metadata.setTableCreateTime(TIMESTAMP);
+ columnVertex.setMetadata(metadata);
+
+ Long metadataCreateTime =
baseImpalaEvent.getCreateTimeInVertex(columnVertex);
+ assertEquals(metadataCreateTime, Long.valueOf(TIMESTAMP));
+ }
+
+ @Test(dataProvider = "qualifiedNameDataProvider")
+ public void testGetQualifiedNameForProcess(ImpalaOperationType
operationType,
+ List<AtlasEntity> inputs,
+ List<AtlasEntity> outputs,
+ boolean shouldThrowException)
throws Exception {
+ when(mockContext.getImpalaOperationType()).thenReturn(operationType);
+
+ if (shouldThrowException) {
+ try {
+ baseImpalaEvent.getQualifiedName(inputs, outputs);
+ fail("Expected IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("unexpected operation
type"));
+ }
+ } else {
+ String qualifiedName = baseImpalaEvent.getQualifiedName(inputs,
outputs);
+ assertNotNull(qualifiedName);
+ if (operationType == ImpalaOperationType.CREATEVIEW ||
+ operationType == ImpalaOperationType.CREATETABLE_AS_SELECT
||
+ operationType == ImpalaOperationType.ALTERVIEW_AS) {
+ assertTrue(qualifiedName.contains("@"));
+ } else {
+ assertTrue(qualifiedName.contains("->"));
+ }
+ }
+ }
+
+ @DataProvider(name = "qualifiedNameDataProvider")
+ public Object[][] qualifiedNameDataProvider() {
+ List<AtlasEntity> inputs = createMockEntities("input_table");
+ List<AtlasEntity> outputs = createMockEntities("output_table");
+
+ return new Object[][] {
+ {ImpalaOperationType.CREATEVIEW, inputs, outputs, false},
+ {ImpalaOperationType.CREATETABLE_AS_SELECT, inputs, outputs,
false},
+ {ImpalaOperationType.ALTERVIEW_AS, inputs, outputs, false},
+ {ImpalaOperationType.QUERY, inputs, outputs, false},
+ {ImpalaOperationType.QUERY_WITH_CLAUSE, inputs, outputs,
false},
+ {ImpalaOperationType.INSERT, inputs, outputs, true} // Should
throw exception
+ };
+ }
+
+ // Helper methods for creating test data
+
+ private LineageVertex createDatabaseVertex(String dbName) {
+ LineageVertex vertex = new LineageVertex();
+ vertex.setVertexType(ImpalaVertexType.DATABASE);
+ vertex.setVertexId(dbName);
+ vertex.setCreateTime(TIMESTAMP);
+ return vertex;
+ }
+
+ private LineageVertex createTableVertex(String tableName) {
+ LineageVertex vertex = new LineageVertex();
+ vertex.setVertexType(ImpalaVertexType.TABLE);
+ vertex.setVertexId(tableName);
+ vertex.setCreateTime(TIMESTAMP);
+ return vertex;
+ }
+
+ private LineageVertex createColumnVertex(String columnName, String
tableName) {
+ LineageVertex vertex = new LineageVertex();
+ vertex.setVertexType(ImpalaVertexType.COLUMN);
+ vertex.setVertexId(DB_NAME+"."+tableName+"."+columnName);
+ vertex.setCreateTime(TIMESTAMP);
+
+ LineageVertexMetadata metadata = new LineageVertexMetadata();
+ metadata.setTableName(tableName);
+ metadata.setTableCreateTime(TIMESTAMP);
+ vertex.setMetadata(metadata);
+
+ return vertex;
+ }
+
+ private List<AtlasEntity> createMockEntities(String tableBaseName) {
+ List<AtlasEntity> entities = new ArrayList<>();
+
+ AtlasEntity entity = new AtlasEntity(HIVE_TYPE_TABLE);
+ entity.setGuid("test-guid-" + tableBaseName);
+ entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME, tableBaseName +"@"+
CLUSTER_NAME);
+ entity.setAttribute(ATTRIBUTE_NAME, tableBaseName);
+ entity.setAttribute(ATTRIBUTE_CREATE_TIME, TIMESTAMP *
MILLIS_CONVERT_FACTOR);
+
+ entities.add(entity);
+ return entities;
+ }
+}
diff --git
a/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java
new file mode 100644
index 000000000..3d776fb7f
--- /dev/null
+++
b/addons/impala-bridge/src/test/java/org/apache/atlas/impala/hook/events/CreateImpalaProcessTest.java
@@ -0,0 +1,457 @@
+/** Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.atlas.impala.hook.events;
+
+import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
+import org.apache.atlas.impala.hook.ImpalaLineageHook;
+import org.apache.atlas.impala.model.ImpalaDependencyType;
+import org.apache.atlas.impala.model.ImpalaOperationType;
+import org.apache.atlas.impala.model.ImpalaQuery;
+import org.apache.atlas.impala.model.ImpalaVertexType;
+import org.apache.atlas.impala.model.LineageEdge;
+import org.apache.atlas.impala.model.LineageVertex;
+import org.apache.atlas.impala.model.LineageVertexMetadata;
+import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.model.instance.AtlasEntity.AtlasEntitiesWithExtInfo;
+import org.apache.atlas.model.notification.HookNotification;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_NAME;
+import static
org.apache.atlas.impala.hook.events.BaseImpalaEvent.ATTRIBUTE_QUALIFIED_NAME;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.*;
+import static org.testng.AssertJUnit.assertTrue;
+
+public class CreateImpalaProcessTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(CreateImpalaProcessTest.class);
+
+ @Mock
+ private ImpalaLineageHook mockHook;
+
+ @Mock
+ private AtlasImpalaHookContext mockContext;
+
+ @Mock
+ private ImpalaLineageHook impalaLineageHook;
+
+ @Mock
+ private ImpalaOperationType impalaOperationType;
+
+ @Mock
+ private ImpalaQuery impalaQuery;
+
+ @Mock
+ private AtlasEntity mockOutputEntity;
+
+ @Mock
+ private LineageVertexMetadata mockMetadata;
+
+ @Mock
+ private LineageEdge mockLineageEdge;
+ @Mock
+ private ImpalaQuery mockLineageQuery;
+
+ @Mock
+ private AtlasEntity mockInputEntity;
+
+
+ private CreateImpalaProcess createImpalaProcess;
+
+ private static final String TEST_CLUSTER_NAME = "test_cluster";
+ private static final String TEST_DB_NAME = "test_db";
+ private static final String CLUSTER_NAME = "testcluster";
+ private static final String DB_NAME = "testdb";
+ private static final String TABLE_NAME_SOURCE = "source_table";
+ private static final String TABLE_NAME_TARGET = "target_table";
+ private static final String COLUMN_NAME_ID = "id";
+ private static final String COLUMN_NAME_NAME = "name";
+ private static final String USER_NAME = "testuser";
+ private static final String HOST_NAME = "testhost";
+ private static final String QUERY_TEXT = "CREATE VIEW target_table AS
SELECT id, name FROM source_db.source_table";
+ private static final String QUERY_ID = "test_query_id_123";
+ private static final long TIMESTAMP = 1554750072L;
+ private static final long END_TIME = 1554750554L;
+
+ private static final String TEST_TABLE_NAME = "test_table";
+ private static final String TEST_COLUMN_NAME = "test_column";
+ private static final String TEST_QUALIFIED_NAME =
"test_db.test_table@test_cluster";
+ private static final String TEST_QUERY_TEXT = "SELECT * FROM test_table";
+ private static final String TEST_USER_NAME = "test_user";
+ private static final long TEST_TIMESTAMP = 1234567890L;
+ private static final long TEST_END_TIME = 1234567900L;
+ private static final long TEST_VERTEX_ID = 123L;
+
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ MockitoAnnotations.initMocks(this);
+
+ when(mockHook.getMetadataNamespace()).thenReturn(CLUSTER_NAME);
+ when(mockHook.getHostName()).thenReturn(HOST_NAME);
+ when(mockHook.isConvertHdfsPathToLowerCase()).thenReturn(false);
+
+ when(mockContext.getUserName()).thenReturn(USER_NAME);
+ when(mockContext.getHostName()).thenReturn(HOST_NAME);
+ when(mockContext.getMetadataNamespace()).thenReturn(CLUSTER_NAME);
+ when(mockContext.getQueryStr()).thenReturn(QUERY_TEXT);
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.CREATEVIEW);
+
+ createImpalaProcess = new CreateImpalaProcess(mockContext);
+ }
+
+
+ @Test
+ public void testGetNotificationMessagesWithNoEntities() throws Exception {
+
+ ImpalaQuery emptyQuery = new ImpalaQuery();
+ emptyQuery.setVertices(new ArrayList<>());
+ emptyQuery.setEdges(new ArrayList<>());
+ emptyQuery.setQueryText(QUERY_TEXT);
+ emptyQuery.setQueryId(QUERY_ID);
+ emptyQuery.setUser(USER_NAME);
+ emptyQuery.setTimestamp(TIMESTAMP);
+ emptyQuery.setEndTime(END_TIME);
+
+ when(mockContext.getLineageQuery()).thenReturn(emptyQuery);
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+ List<HookNotification> notifications =
process.getNotificationMessages();
+
+ assertNull(notifications);
+ }
+
+ @Test
+ public void testGetNotificationMessagesWithEntities() throws Exception {
+
+ String sourceTableFullName = "testdb.source_table";
+ String targetTableFullName = "testdb.target_table";
+ ImpalaQuery query = createTestQuery();
+ when(mockContext.getLineageQuery()).thenReturn(query);
+
+ AtlasEntity mockInputEntity = createMockTableEntity(TABLE_NAME_SOURCE);
+ AtlasEntity mockOutputEntity =
createMockTableEntity(TABLE_NAME_TARGET);
+
+
when(mockContext.getQualifiedNameForTable(sourceTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_SOURCE+"@"+TEST_CLUSTER_NAME);
+
when(mockContext.getDatabaseNameFromTable(sourceTableFullName)).thenReturn(TEST_DB_NAME);
+
when(mockContext.getQualifiedNameForTable(targetTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_TARGET+"@"+TEST_CLUSTER_NAME);
+
when(mockContext.getDatabaseNameFromTable(targetTableFullName)).thenReturn(TEST_DB_NAME);
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+ when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_SOURCE)))
+ .thenReturn(mockInputEntity);
+ when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_TARGET)))
+ .thenReturn(mockOutputEntity);
+
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+ AtlasEntitiesWithExtInfo entities = process.getEntities();
+ assertEquals(entities.getEntities().size(),2);
+
assertEquals(entities.getEntities().get(0).getTypeName(),"impala_process");
+
assertEquals(entities.getEntities().get(1).getTypeName(),"impala_process_execution");
+
assertEquals(entities.getEntities().get(0).getAttribute(ATTRIBUTE_QUALIFIED_NAME),"QUERY:"+TEST_DB_NAME+"."+TABLE_NAME_SOURCE+"@"+TEST_CLUSTER_NAME+":"+"1554750072000->:"+TEST_DB_NAME+"."+TABLE_NAME_TARGET+"@"+TEST_CLUSTER_NAME+":1554750072000");
+ }
+
+
+ @Test
+ public void testGetEntitiesWithValidData() throws Exception {
+
+ String sourceTableFullName = "testdb.source_table";
+ String targetTableFullName = "testdb.target_table";
+ ImpalaQuery query = createTestQuery();
+ when(mockContext.getLineageQuery()).thenReturn(query);
+
+ AtlasEntity mockInputEntity = createMockTableEntity(TABLE_NAME_SOURCE);
+ AtlasEntity mockOutputEntity =
createMockTableEntity(TABLE_NAME_TARGET);
+
+
when(mockContext.getQualifiedNameForTable(sourceTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_SOURCE+"@"+TEST_CLUSTER_NAME);
+
when(mockContext.getDatabaseNameFromTable(sourceTableFullName)).thenReturn(TEST_DB_NAME);
+
when(mockContext.getQualifiedNameForTable(targetTableFullName)).thenReturn(TEST_DB_NAME+"."+TABLE_NAME_TARGET+"@"+TEST_CLUSTER_NAME);
+
when(mockContext.getDatabaseNameFromTable(targetTableFullName)).thenReturn(TEST_DB_NAME);
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+ when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_SOURCE)))
+ .thenReturn(mockInputEntity);
+ when(mockContext.getEntity(getQualifiedTableName(TABLE_NAME_TARGET)))
+ .thenReturn(mockOutputEntity);
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+ List<HookNotification> hookMsgList =
process.getNotificationMessages();
+
+
assertEquals(hookMsgList.get(0).getType().ordinal(),HookNotification.HookNotificationType.ENTITY_CREATE_V2.ordinal());
+ }
+
+ @Test
+ public void testGetEntitiesWithEmptyInputsAndOutputs() throws Exception {
+ ImpalaQuery query = new ImpalaQuery();
+ query.setVertices(new ArrayList<>());
+ query.setEdges(new ArrayList<>());
+ query.setQueryText(QUERY_TEXT);
+ query.setQueryId(QUERY_ID);
+ query.setUser(USER_NAME);
+ query.setTimestamp(TIMESTAMP);
+ query.setEndTime(END_TIME);
+
+ when(mockContext.getLineageQuery()).thenReturn(query);
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+ AtlasEntitiesWithExtInfo entities = process.getEntities();
+
+ assertNull(entities);
+ }
+
+ @Test
+ public void testUserName() {
+ assertEquals(createImpalaProcess.getUserName(), USER_NAME);
+ }
+
+ private ImpalaQuery createTestQuery() {
+ ImpalaQuery query = new ImpalaQuery();
+ query.setQueryText(QUERY_TEXT);
+ query.setQueryId(QUERY_ID);
+ query.setUser(USER_NAME);
+ query.setTimestamp(TIMESTAMP);
+ query.setEndTime(END_TIME);
+
+ List<LineageVertex> vertices = new ArrayList<>();
+
+
+ LineageVertex sourceCol1 = createColumnVertex(0L, DB_NAME +"."+
TABLE_NAME_SOURCE +"."+ COLUMN_NAME_ID,
+ TABLE_NAME_SOURCE, COLUMN_NAME_ID);
+ LineageVertex sourceCol2 = createColumnVertex(1L, DB_NAME +"."+
TABLE_NAME_SOURCE +"." + COLUMN_NAME_NAME,
+ TABLE_NAME_SOURCE, COLUMN_NAME_NAME);
+
+
+ LineageVertex targetCol1 = createColumnVertex(2L, DB_NAME +"."+
TABLE_NAME_TARGET +"."+ COLUMN_NAME_ID,
+ TABLE_NAME_TARGET, COLUMN_NAME_ID);
+ LineageVertex targetCol2 = createColumnVertex(3L, DB_NAME +"."+
TABLE_NAME_TARGET +"." +COLUMN_NAME_NAME,
+ TABLE_NAME_TARGET, COLUMN_NAME_NAME);
+
+ vertices.add(sourceCol1);
+ vertices.add(sourceCol2);
+ vertices.add(targetCol1);
+ vertices.add(targetCol2);
+ query.setVertices(vertices);
+
+ List<LineageEdge> edges = new ArrayList<>();
+
+ LineageEdge edge1 = new LineageEdge();
+ edge1.setSources(Arrays.asList(0L));
+ edge1.setTargets(Arrays.asList(2L));
+ edge1.setEdgeType(ImpalaDependencyType.PROJECTION);
+ edges.add(edge1);
+
+ LineageEdge edge2 = new LineageEdge();
+ edge2.setSources(Arrays.asList(1L));
+ edge2.setTargets(Arrays.asList(3L));
+ edge2.setEdgeType(ImpalaDependencyType.PROJECTION);
+ edges.add(edge2);
+
+ query.setEdges(edges);
+
+ return query;
+ }
+
+
+ private LineageVertex createColumnVertex(Long id, String vertexId, String
tableName, String columnName) {
+ LineageVertex vertex = new LineageVertex();
+ vertex.setId(id);
+ vertex.setVertexId(vertexId);
+ vertex.setVertexType(ImpalaVertexType.COLUMN);
+
+ LineageVertexMetadata metadata = new LineageVertexMetadata();
+ metadata.setTableName(DB_NAME +"."+ tableName);
+ metadata.setTableCreateTime(TIMESTAMP);
+ vertex.setMetadata(metadata);
+
+ return vertex;
+ }
+
+
+ private AtlasEntity createMockTableEntity(String tableName) {
+ AtlasEntity entity = new AtlasEntity(BaseImpalaEvent.HIVE_TYPE_TABLE);
+ entity.setAttribute(ATTRIBUTE_NAME, tableName);
+ entity.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
getQualifiedTableName(tableName));
+ return entity;
+ }
+
+
+ private String getQualifiedTableName(String tableName) {
+ return (DB_NAME +"."+ tableName +"@"+ CLUSTER_NAME).toLowerCase();
+ }
+
+ @Test
+ public void testCreateImpalaProcessWithColumnLineage() throws Exception {
+
+ ImpalaQuery query = createTestQuery();
+ when(mockContext.getLineageQuery()).thenReturn(query);
+
+ AtlasEntity mockInputCol1 = new
AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+ mockInputCol1.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_ID);
+ AtlasEntity mockInputCol2 = new
AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+ mockInputCol2.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_NAME);
+
+ AtlasEntity mockOutputCol1 = new
AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+ mockOutputCol1.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_ID);
+ AtlasEntity mockOutputCol2 = new
AtlasEntity(BaseImpalaEvent.HIVE_TYPE_COLUMN);
+ mockOutputCol2.setAttribute(ATTRIBUTE_NAME, COLUMN_NAME_NAME);
+
+ when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_SOURCE +"."+
COLUMN_NAME_ID))
+ .thenReturn(mockInputCol1);
+ when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_SOURCE +"."+
COLUMN_NAME_NAME))
+ .thenReturn(mockInputCol2);
+ when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_TARGET +"."+
COLUMN_NAME_ID))
+ .thenReturn(mockOutputCol1);
+ when(mockContext.getEntity(DB_NAME +"."+ TABLE_NAME_TARGET +"."+
COLUMN_NAME_NAME))
+ .thenReturn(mockOutputCol2);
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+
+ AtlasEntitiesWithExtInfo entities = process.getEntities();
+
+ }
+
+ @Test
+ public void testProcessWithPredicateEdgeType() throws Exception {
+ ImpalaQuery query = createTestQuery();
+
+ for (LineageEdge edge : query.getEdges()) {
+ edge.setEdgeType(ImpalaDependencyType.PREDICATE);
+ }
+
+ when(mockContext.getLineageQuery()).thenReturn(query);
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+
+ AtlasEntitiesWithExtInfo entities = process.getEntities();
+
+ }
+
+ @Test
+ public void testProcessWithMixedEdgeTypes() throws Exception {
+ ImpalaQuery query = createTestQuery();
+
+ LineageEdge predicateEdge = new LineageEdge();
+ predicateEdge.setSources(Arrays.asList(0L));
+ predicateEdge.setTargets(Arrays.asList(2L));
+ predicateEdge.setEdgeType(ImpalaDependencyType.PREDICATE);
+ query.getEdges().add(predicateEdge);
+
+ when(mockContext.getLineageQuery()).thenReturn(query);
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+ AtlasEntitiesWithExtInfo entities = process.getEntities();
+
+ LOG.info("Mixed edge types test completed");
+ }
+
+ @Test
+ public void testProcessWithNullVertexMetadata() throws Exception {
+ ImpalaQuery query = createTestQuery();
+
+ query.getVertices().get(0).setMetadata(null);
+
+ when(mockContext.getLineageQuery()).thenReturn(query);
+
+ CreateImpalaProcess process = new CreateImpalaProcess(mockContext);
+
+ AtlasEntitiesWithExtInfo entities = process.getEntities();
+
+ }
+
+ private void setupBasicMocks() {
+ when(mockContext.getLineageQuery()).thenReturn(mockLineageQuery);
+ when(mockContext.getUserName()).thenReturn(TEST_USER_NAME);
+
when(mockContext.getImpalaOperationType()).thenReturn(ImpalaOperationType.QUERY);
+ when(mockLineageQuery.getQueryText()).thenReturn(TEST_QUERY_TEXT);
+ when(mockLineageQuery.getTimestamp()).thenReturn(TEST_TIMESTAMP);
+ when(mockLineageQuery.getEndTime()).thenReturn(TEST_END_TIME);
+ }
+
+ private void setupOutputVertices() throws Exception {
+ java.lang.reflect.Field verticesMapField =
BaseImpalaEvent.class.getDeclaredField("verticesMap");
+ verticesMapField.setAccessible(true);
+ Map<Long, LineageVertex> verticesMap = (Map<Long, LineageVertex>)
verticesMapField.get(createImpalaProcess);
+
+ LineageVertex outputVertex1 = mock(LineageVertex.class);
+ LineageVertex inputVertex = mock(LineageVertex.class);
+
+ when(outputVertex1.getVertexId()).thenReturn("output_column1");
+
when(outputVertex1.getVertexType()).thenReturn(ImpalaVertexType.COLUMN);
+ when(outputVertex1.getMetadata()).thenReturn(mockMetadata);
+
+ when(inputVertex.getVertexId()).thenReturn("input_column");
+ when(inputVertex.getVertexType()).thenReturn(ImpalaVertexType.COLUMN);
+ when(inputVertex.getMetadata()).thenReturn(mockMetadata);
+
+ when(mockMetadata.getTableName()).thenReturn(TEST_TABLE_NAME);
+
+ verticesMap.put(TEST_VERTEX_ID, outputVertex1);
+ verticesMap.put(TEST_VERTEX_ID + 2, inputVertex);
+ }
+
+ private AtlasEntity createMockProcessEntity() {
+ AtlasEntity process = mock(AtlasEntity.class);
+
when(process.getAttribute("qualifiedName")).thenReturn(TEST_DB_NAME+"."+TEST_TABLE_NAME+"."+TEST_COLUMN_NAME+"@"+TEST_CLUSTER_NAME);
+ when(process.getAttribute("name")).thenReturn("test_process");
+ return process;
+ }
+
+ @Test
+ public void testProcessColumnLineageWithOutputColumn() throws Exception {
+
+ String outputColumnName = "output_column";
+ setupBasicMocks();
+
+ List<LineageEdge> edges = Arrays.asList(mockLineageEdge);
+ when(mockLineageQuery.getEdges()).thenReturn(edges);
+
when(mockLineageEdge.getEdgeType()).thenReturn(ImpalaDependencyType.PROJECTION);
+
when(mockLineageEdge.getTargets()).thenReturn(Arrays.asList(TEST_VERTEX_ID));
+
when(mockContext.getQualifiedNameForColumn(any(LineageVertex.class))).thenReturn(TEST_DB_NAME+"."+TEST_TABLE_NAME+"."+TEST_COLUMN_NAME+"@"+TEST_CLUSTER_NAME);
+
when(mockLineageEdge.getSources()).thenReturn(Arrays.asList(TEST_VERTEX_ID));
+
+ setupOutputVertices();
+
+ when(mockContext.getEntity(anyString())).thenReturn(mockOutputEntity,
mockOutputEntity, mockInputEntity);
+
when(mockOutputEntity.getAttribute("name")).thenReturn(outputColumnName);
+
+ AtlasEntitiesWithExtInfo entities = new AtlasEntitiesWithExtInfo();
+ AtlasEntity process = createMockProcessEntity();
+
+ Method processColumnLineageMethod =
CreateImpalaProcess.class.getDeclaredMethod("processColumnLineage",
AtlasEntity.class, AtlasEntitiesWithExtInfo.class);
+ processColumnLineageMethod.setAccessible(true);
+
+ processColumnLineageMethod.invoke(createImpalaProcess, process,
entities);
+
+ assertTrue(entities.getEntities().size() > 0);
+
assertEquals(entities.getEntities().get(0).getTypeName(),"impala_column_lineage");
+
assertEquals(entities.getEntities().get(0).getAttribute(ATTRIBUTE_QUALIFIED_NAME),TEST_DB_NAME+"."+TEST_TABLE_NAME+"."+TEST_COLUMN_NAME+"@"+TEST_CLUSTER_NAME+":"+outputColumnName);
+ }
+
+
+}
\ No newline at end of file