This is an automated email from the ASF dual-hosted git repository.
chaitalithombare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/atlas.git
The following commit(s) were added to refs/heads/master by this push:
new 54befb37f ATLAS-4974: impala-bridge, impala-bridge-shim,
impala-hook-api modules: update for code readability improvements (#378)
54befb37f is described below
commit 54befb37f569c69391fe01f532bb54422fb847f4
Author: chaitalicod <[email protected]>
AuthorDate: Tue Jun 17 16:09:45 2025 +0530
ATLAS-4974: impala-bridge, impala-bridge-shim, impala-hook-api modules:
update for code readability improvements (#378)
Co-authored-by: chaitalithombare <[email protected]>
---
.../org/apache/atlas/impala/ImpalaLineageTool.java | 277 +++++++++++----------
.../atlas/impala/hook/AtlasImpalaHookContext.java | 39 +--
.../atlas/impala/hook/ImpalaIdentifierParser.java | 108 ++++----
.../atlas/impala/hook/ImpalaLineageHook.java | 37 ++-
.../atlas/impala/hook/ImpalaOperationParser.java | 11 +-
.../atlas/impala/hook/events/BaseImpalaEvent.java | 76 +++---
.../impala/hook/events/CreateImpalaProcess.java | 46 ++--
7 files changed, 296 insertions(+), 298 deletions(-)
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
index 6e6d6f1ee..d10a7bf07 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/ImpalaLineageTool.java
@@ -19,13 +19,8 @@
package org.apache.atlas.impala;
import org.apache.atlas.impala.hook.ImpalaLineageHook;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
@@ -35,141 +30,148 @@ import org.apache.commons.io.filefilter.PrefixFileFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
/**
* Entry point of actual implementation of Impala lineage tool. It reads the
lineage records in
* lineage log. It then calls instance of ImpalaLineageHook to convert lineage
records to
* lineage notifications and send them to Atlas.
*/
public class ImpalaLineageTool {
- private static final Logger LOG =
LoggerFactory.getLogger(ImpalaLineageTool.class);
- private static final String WAL_FILE_EXTENSION = ".wal";
- private static final String WAL_FILE_PREFIX = "WAL";
- private String directoryName;
- private String prefix;
-
- public ImpalaLineageTool(String[] args) {
- try {
- Options options = new Options();
- options.addOption("d", "directory", true, "the lineage files' folder");
- options.addOption("p", "prefix", true, "the prefix of the lineage
files");
-
- CommandLine cmd = new DefaultParser().parse(options, args);
- directoryName = cmd.getOptionValue("d");
- prefix = cmd.getOptionValue("p");
- } catch(ParseException e) {
- LOG.warn("Failed to parse command arguments. Error: ", e.getMessage());
- printUsage();
-
- throw new RuntimeException(e);
+ private static final Logger LOG =
LoggerFactory.getLogger(ImpalaLineageTool.class);
+ private static final String WAL_FILE_EXTENSION = ".wal";
+ private static final String WAL_FILE_PREFIX = "WAL";
+ private String directoryName;
+ private String prefix;
+
+ public ImpalaLineageTool(String[] args) {
+ try {
+ Options options = new Options();
+ options.addOption("d", "directory", true, "the lineage files'
folder");
+ options.addOption("p", "prefix", true, "the prefix of the lineage
files");
+
+ CommandLine cmd = new DefaultParser().parse(options, args);
+ directoryName = cmd.getOptionValue("d");
+ prefix = cmd.getOptionValue("p");
+ } catch (ParseException e) {
+ LOG.warn("Failed to parse command arguments. Error: ",
e.getMessage());
+ printUsage();
+
+ throw new RuntimeException(e);
+ }
}
- }
-
- public void run() {
- ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
- File[] currentFiles = getCurrentFiles();
- int fileNum = currentFiles.length;
+ public void run() {
+ ImpalaLineageHook impalaLineageHook = new ImpalaLineageHook();
- for(int i = 0; i < fileNum; i++) {
- String filename = currentFiles[i].getAbsolutePath();
- String walFilename = directoryName + WAL_FILE_PREFIX +
currentFiles[i].getName() + WAL_FILE_EXTENSION;
+ File[] currentFiles = getCurrentFiles();
+ int fileNum = currentFiles.length;
- LOG.info("Importing: {}", filename);
- importHImpalaEntities(impalaLineageHook, filename, walFilename);
+ for (int i = 0; i < fileNum; i++) {
+ String filename = currentFiles[i].getAbsolutePath();
+ String walFilename = directoryName + WAL_FILE_PREFIX +
currentFiles[i].getName() + WAL_FILE_EXTENSION;
+ LOG.info("Importing: {}", filename);
+ importHImpalaEntities(impalaLineageHook, filename, walFilename);
- if(i != fileNum - 1) {
- deleteLineageAndWal(currentFiles[i], walFilename);
- }
- }
- LOG.info("Impala bridge processing: Done! ");
- }
-
- public static void main(String[] args) {
- if (args != null && args.length != 4) {
- // The lineage file location and prefix should be input as the parameters
- System.out.println("Impala bridge: wrong number of arguments. Please try
again");
- printUsage();
- return;
+ if (i != fileNum - 1) {
+ deleteLineageAndWal(currentFiles[i], walFilename);
+ }
+ }
+ LOG.info("Impala bridge processing: Done! ");
}
- ImpalaLineageTool instance = new ImpalaLineageTool(args);
- instance.run();
- }
+ public static void main(String[] args) {
+ if (args != null && args.length != 4) {
+ // The lineage file location and prefix should be input as the
parameters
+ System.out.println("Impala bridge: wrong number of arguments.
Please try again");
+ printUsage();
+ return;
+ }
+
+ ImpalaLineageTool instance = new ImpalaLineageTool(args);
+ instance.run();
+ }
/**
* Delete the used lineage file and wal file
* @param currentFile The current file
* @param wal The wal file
*/
- public static void deleteLineageAndWal(File currentFile, String wal) {
- if(currentFile.exists() && currentFile.delete()) {
- LOG.info("Lineage file {} is deleted successfully",
currentFile.getPath());
- } else {
- LOG.info("Failed to delete the lineage file {}", currentFile.getPath());
+ public static void deleteLineageAndWal(File currentFile, String wal) {
+ if (currentFile.exists() && currentFile.delete()) {
+ LOG.info("Lineage file {} is deleted successfully",
currentFile.getPath());
+ } else {
+ LOG.info("Failed to delete the lineage file {}",
currentFile.getPath());
+ }
+
+ File file = new File(wal);
+
+ if (file.exists() && file.delete()) {
+ LOG.info("Wal file {} deleted successfully", wal);
+ } else {
+ LOG.info("Failed to delete the wal file {}", wal);
+ }
}
- File file = new File(wal);
-
- if(file.exists() && file.delete()) {
- LOG.info("Wal file {} deleted successfully", wal);
- } else {
- LOG.info("Failed to delete the wal file {}", wal);
+ private static void printUsage() {
+ System.out.println();
+ System.out.println();
+ System.out.println("Usage: import-impala.sh [-d <directory>] [-p
<prefix>]");
+ System.out.println("Imports specified lineage files by given directory
and file prefix.");
+ System.out.println();
}
- }
-
- private static void printUsage() {
- System.out.println();
- System.out.println();
- System.out.println("Usage: import-impala.sh [-d <directory>] [-p
<prefix>]" );
- System.out.println(" Imports specified lineage files by given directory
and file prefix.");
- System.out.println();
- }
/**
* This function figures out the right lineage file path+name to process
sorted by the last
* time they are modified. (old -> new)
* @return get the lineage files from given directory with given prefix.
*/
- public File[] getCurrentFiles() {
- try {
- LOG.info("Scanning: " + directoryName);
- File folder = new File(directoryName);
- File[] listOfFiles = folder.listFiles((FileFilter) new
PrefixFileFilter(prefix, IOCase.SENSITIVE));
-
- if ((listOfFiles == null) || (listOfFiles.length == 0)) {
- LOG.info("Found no lineage files.");
+ public File[] getCurrentFiles() {
+ try {
+ LOG.info("Scanning: " + directoryName);
+ File folder = new File(directoryName);
+ File[] listOfFiles = folder.listFiles((FileFilter) new
PrefixFileFilter(prefix, IOCase.SENSITIVE));
+
+ if ((listOfFiles == null) || (listOfFiles.length == 0)) {
+ LOG.info("Found no lineage files.");
+ return new File[0];
+ }
+
+ if (listOfFiles.length > 1) {
+ Arrays.sort(listOfFiles,
LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
+ }
+
+ LOG.info("Found {} lineage files" + listOfFiles.length);
+ return listOfFiles;
+ } catch (Exception e) {
+ LOG.error("Import lineage file failed.", e);
+ }
return new File[0];
- }
+ }
- if(listOfFiles.length > 1) {
- Arrays.sort(listOfFiles,
LastModifiedFileComparator.LASTMODIFIED_COMPARATOR);
- }
+ private boolean processImpalaLineageHook(ImpalaLineageHook
impalaLineageHook, List<String> lineageList) {
+ boolean allSucceed = true;
- LOG.info("Found {} lineage files" + listOfFiles.length);
- return listOfFiles;
- } catch(Exception e) {
- LOG.error("Import lineage file failed.", e);
- }
- return new File[0];
- }
-
- private boolean processImpalaLineageHook(ImpalaLineageHook
impalaLineageHook, List<String> lineageList) {
- boolean allSucceed = true;
-
- // returns true if successfully sent to Atlas
- for (String lineageRecord : lineageList) {
- try {
- impalaLineageHook.process(lineageRecord);
- } catch (Exception ex) {
- String errorMessage = String.format("Exception at query {} \n",
lineageRecord);
- LOG.error(errorMessage, ex);
-
- allSucceed = false;
- }
- }
+ // returns true if successfully sent to Atlas
+ for (String lineageRecord : lineageList) {
+ try {
+ impalaLineageHook.process(lineageRecord);
+ } catch (Exception ex) {
+ String errorMessage = String.format("Exception at query {}
\n", lineageRecord);
+ LOG.error(errorMessage, ex);
- return allSucceed;
- }
+ allSucceed = false;
+ }
+ }
+
+ return allSucceed;
+ }
/**
* Create a list of lineage queries based on the lineage file and the wal
file
@@ -177,40 +179,39 @@ public class ImpalaLineageTool {
* @param walfile
* @return
*/
- public void importHImpalaEntities(ImpalaLineageHook impalaLineageHook,
String name, String walfile) {
- List<String> lineageList = new ArrayList<>();
+ public void importHImpalaEntities(ImpalaLineageHook impalaLineageHook,
String name, String walfile) {
+ List<String> lineageList = new ArrayList<>();
- try {
- File lineageFile = new File(name); //use current file length to minus
the offset
- File walFile = new File(walfile);
+ try {
+ File lineageFile = new File(name); //use current file length to
minus the offset
+ File walFile = new File(walfile);
// if the wal file does not exist, create one with 0 byte read, else,
read the number
- if(!walFile.exists()) {
- BufferedWriter writer = new BufferedWriter(new FileWriter(walfile));
- writer.write("0, " + name);
- writer.close();
- }
+ if (!walFile.exists()) {
+ BufferedWriter writer = new BufferedWriter(new
FileWriter(walfile));
+ writer.write("0, " + name);
+ writer.close();
+ }
- LOG.debug("Reading: " + name);
- String lineageRecord = FileUtils.readFileToString(lineageFile, "UTF-8");
+ LOG.debug("Reading: " + name);
+ String lineageRecord = FileUtils.readFileToString(lineageFile,
"UTF-8");
- lineageList.add(lineageRecord);
+ lineageList.add(lineageRecord);
// call instance of ImpalaLineageHook to process the list of Impala
lineage record
- if(processImpalaLineageHook(impalaLineageHook, lineageList)) {
+ if (processImpalaLineageHook(impalaLineageHook, lineageList)) {
// write how many bytes the current file is to the wal file
- FileWriter newWalFile = new FileWriter(walfile, true);
- BufferedWriter newWalFileBuf = new BufferedWriter(newWalFile);
- newWalFileBuf.newLine();
- newWalFileBuf.write(String.valueOf(lineageFile.length()) + "," + name);
-
- newWalFileBuf.close();
- newWalFile.close();
- } else {
- LOG.error("Error sending some of impala lineage records to
ImpalaHook");
- }
- } catch (Exception e) {
- LOG.error("Error in processing lineage records. Exception: " +
e.getMessage());
+ FileWriter newWalFile = new FileWriter(walfile, true);
+ BufferedWriter newWalFileBuf = new BufferedWriter(newWalFile);
+ newWalFileBuf.newLine();
+ newWalFileBuf.write(String.valueOf(lineageFile.length()) + ","
+ name);
+
+ newWalFileBuf.close();
+ newWalFile.close();
+ } else {
+ LOG.error("Error sending some of impala lineage records to
ImpalaHook");
+ }
+ } catch (Exception e) {
+ LOG.error("Error in processing lineage records. Exception: " +
e.getMessage());
+ }
}
- }
-
-}
\ No newline at end of file
+}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
index 51b2f832e..e29f814ad 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/AtlasImpalaHookContext.java
@@ -18,9 +18,6 @@
package org.apache.atlas.impala.hook;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex;
@@ -28,10 +25,13 @@ import org.apache.atlas.impala.model.LineageVertexMetadata;
import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.commons.lang.StringUtils;
-
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
/**
* Contain the info related to an linear record from Impala
*/
+
public class AtlasImpalaHookContext {
public static final char QNAME_SEP_METADATA_NAMESPACE = '@';
public static final char QNAME_SEP_ENTITY_NAME = '.';
@@ -43,17 +43,19 @@ public class AtlasImpalaHookContext {
private final Map<String, AtlasEntity> qNameEntityMap = new HashMap<>();
public AtlasImpalaHookContext(ImpalaLineageHook hook, ImpalaOperationType
operationType,
- ImpalaQuery lineageQuery) throws Exception {
+ ImpalaQuery lineageQuery) throws Exception {
this.hook = hook;
this.impalaOperation = operationType;
this.lineageQuery = lineageQuery;
-
}
public ImpalaQuery getLineageQuery() {
return lineageQuery;
}
- public String getQueryStr() { return lineageQuery.getQueryText(); }
+
+ public String getQueryStr() {
+ return lineageQuery.getQueryText();
+ }
public ImpalaOperationType getImpalaOperationType() {
return impalaOperation;
@@ -67,7 +69,9 @@ public class AtlasImpalaHookContext {
return qNameEntityMap.get(qualifiedName);
}
- public Collection<AtlasEntity> getEntities() { return
qNameEntityMap.values(); }
+ public Collection<AtlasEntity> getEntities() {
+ return qNameEntityMap.values();
+ }
public String getMetadataNamespace() {
return hook.getMetadataNamespace();
@@ -96,7 +100,7 @@ public class AtlasImpalaHookContext {
throw new IllegalArgumentException(fullTableName + " does not
contain database name");
}
- return getQualifiedNameForTable(fullTableName.substring(0, sepPos),
fullTableName.substring(sepPos+1));
+ return getQualifiedNameForTable(fullTableName.substring(0, sepPos),
fullTableName.substring(sepPos + 1));
}
public String getQualifiedNameForTable(String dbName, String tableName) {
@@ -131,12 +135,12 @@ public class AtlasImpalaHookContext {
int sepPosLast = columnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
if (isSeparatorIndexValid(sepPosLast)) {
- columnName = columnName.substring(sepPosLast+1);
+ columnName = columnName.substring(sepPosLast + 1);
}
return getQualifiedNameForColumn(
fullTableName.substring(0, sepPos),
- fullTableName.substring(sepPos+1),
+ fullTableName.substring(sepPos + 1),
columnName);
}
@@ -149,7 +153,7 @@ public class AtlasImpalaHookContext {
int sepPosLast = fullColumnName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
if (!isSeparatorIndexValid(sepPosFirst) ||
!isSeparatorIndexValid(sepPosLast) ||
- sepPosFirst == sepPosLast) {
+ sepPosFirst == sepPosLast) {
throw new IllegalArgumentException(
String.format("fullColumnName {} does not contain database
name or table name",
fullColumnName));
@@ -157,8 +161,8 @@ public class AtlasImpalaHookContext {
return getQualifiedNameForColumn(
fullColumnName.substring(0, sepPosFirst),
- fullColumnName.substring(sepPosFirst+1, sepPosLast),
- fullColumnName.substring(sepPosLast+1));
+ fullColumnName.substring(sepPosFirst + 1, sepPosLast),
+ fullColumnName.substring(sepPosLast + 1));
}
public String getColumnNameOnly(String fullColumnName) throws
IllegalArgumentException {
@@ -172,7 +176,7 @@ public class AtlasImpalaHookContext {
return fullColumnName;
}
- return fullColumnName.substring(sepPosLast+1);
+ return fullColumnName.substring(sepPosLast + 1);
}
public String getQualifiedNameForColumn(String dbName, String tableName,
String columnName) {
@@ -181,7 +185,9 @@ public class AtlasImpalaHookContext {
columnName + QNAME_SEP_METADATA_NAMESPACE).toLowerCase() +
getMetadataNamespace();
}
- public String getUserName() { return lineageQuery.getUser(); }
+ public String getUserName() {
+ return lineageQuery.getUser();
+ }
public String getDatabaseNameFromTable(String fullTableName) {
int sepPos = fullTableName.lastIndexOf(QNAME_SEP_ENTITY_NAME);
@@ -209,5 +215,4 @@ public class AtlasImpalaHookContext {
public boolean isSeparatorIndexValid(int index) {
return index > 0;
}
-
}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
index 33e44f729..9be87d62b 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaIdentifierParser.java
@@ -18,11 +18,11 @@
package org.apache.atlas.impala.hook;
+import org.apache.commons.lang.StringUtils;
+
import java.util.Arrays;
import java.util.HashSet;
-
import java.util.Set;
-import org.apache.commons.lang.StringUtils;
/**
* Check if a string is a valid Impala table identifier.
@@ -35,6 +35,10 @@ public class ImpalaIdentifierParser {
// add "." to allow <dbName>.<tableName>
public static final String VALID_IMPALA_IDENTIFIER_REGEX =
"^[a-zA-Z][a-zA-Z0-9_.]{0,127}$";
+ private ImpalaIdentifierParser() {
+ throw new UnsupportedOperationException("ImpalaIdentifierParser");
+ }
+
public static boolean isTableNameValid(String inTableName) {
if (StringUtils.isEmpty(inTableName)) {
return false;
@@ -74,7 +78,6 @@ public class ImpalaIdentifierParser {
// keywords.
static Set<String> reservedWords;
-
public static void init() {
// initilize keywords
keywordMap = new HashSet<>();
@@ -312,7 +315,6 @@ public class ImpalaIdentifierParser {
tokenIdMap.add("COMMENTED_PLAN_HINT_END");
tokenIdMap.add("Unexpected character");
-
// For impala 3.0, reserved words = keywords + sql16ReservedWords -
builtinFunctions
// - whitelist
// unused reserved words = reserved words - keywords. These words are
reserved for
@@ -320,50 +322,50 @@ public class ImpalaIdentifierParser {
reservedWords = new HashSet<>(keywordMap);
// Add SQL:2016 reserved words
reservedWords.addAll(Arrays.asList(new String[] {
- "abs", "acos", "allocate", "any", "are", "array_agg",
"array_max_cardinality",
- "asensitive", "asin", "asymmetric", "at", "atan", "atomic", "avg",
"begin",
- "begin_frame", "begin_partition", "blob", "both", "call",
"called", "cardinality",
- "cascaded", "ceil", "ceiling", "char_length", "character",
"character_length",
- "check", "classifier", "clob", "close", "coalesce", "collate",
"collect",
- "commit", "condition", "connect", "constraint", "contains",
"convert", "copy",
- "corr", "corresponding", "cos", "cosh", "count", "covar_pop",
"covar_samp",
- "cube", "cume_dist", "current_catalog", "current_date",
- "current_default_transform_group", "current_path", "current_path",
"current_role",
- "current_role", "current_row", "current_schema", "current_time",
- "current_timestamp", "current_transform_group_for_type",
"current_user", "cursor",
- "cycle", "day", "deallocate", "dec", "decfloat", "declare",
"define",
- "dense_rank", "deref", "deterministic", "disconnect", "dynamic",
"each",
- "element", "empty", "end-exec", "end_frame", "end_partition",
"equals", "escape",
- "every", "except", "exec", "execute", "exp", "extract", "fetch",
"filter",
- "first_value", "floor", "foreign", "frame_row", "free", "fusion",
"get", "global",
- "grouping", "groups", "hold", "hour", "identity", "indicator",
"initial", "inout",
- "insensitive", "integer", "intersect", "intersection",
"json_array",
- "json_arrayagg", "json_exists", "json_object", "json_objectagg",
"json_query",
- "json_table", "json_table_primitive", "json_value", "lag",
"language", "large",
- "last_value", "lateral", "lead", "leading", "like_regex",
"listagg", "ln",
- "local", "localtime", "localtimestamp", "log", "log10 ", "lower",
"match",
- "match_number", "match_recognize", "matches", "max", "member",
"merge", "method",
- "min", "minute", "mod", "modifies", "module", "month", "multiset",
"national",
- "natural", "nchar", "nclob", "new", "no", "none", "normalize",
"nth_value",
- "ntile", "nullif", "numeric", "occurrences_regex", "octet_length",
"of", "old",
- "omit", "one", "only", "open", "out", "overlaps", "overlay",
"parameter",
- "pattern", "per", "percent", "percent_rank", "percentile_cont",
"percentile_disc",
- "period", "portion", "position", "position_regex", "power",
"precedes",
- "precision", "prepare", "procedure", "ptf", "rank", "reads",
"real", "recursive",
- "ref", "references", "referencing", "regr_avgx", "regr_avgy",
"regr_count",
- "regr_intercept", "regr_r2", "regr_slope", "regr_sxx", "regr_sxy",
"regr_syy",
- "release", "result", "return", "rollback", "rollup", "row_number",
"running",
- "savepoint", "scope", "scroll", "search", "second", "seek",
"sensitive",
- "session_user", "similar", "sin", "sinh", "skip", "some",
"specific",
- "specifictype", "sql", "sqlexception", "sqlstate", "sqlwarning",
"sqrt", "start",
- "static", "stddev_pop", "stddev_samp", "submultiset", "subset",
"substring",
- "substring_regex", "succeeds", "sum", "symmetric", "system",
"system_time",
- "system_user", "tan", "tanh", "time", "timezone_hour",
"timezone_minute",
- "trailing", "translate", "translate_regex", "translation",
"treat", "trigger",
- "trim", "trim_array", "uescape", "unique", "unknown", "unnest",
"update ",
- "upper", "user", "value", "value_of", "var_pop", "var_samp",
"varbinary",
- "varying", "versioning", "whenever", "width_bucket", "window",
"within",
- "without", "year"}));
+ "abs", "acos", "allocate", "any", "are", "array_agg",
"array_max_cardinality",
+ "asensitive", "asin", "asymmetric", "at", "atan", "atomic",
"avg", "begin",
+ "begin_frame", "begin_partition", "blob", "both", "call",
"called", "cardinality",
+ "cascaded", "ceil", "ceiling", "char_length", "character",
"character_length",
+ "check", "classifier", "clob", "close", "coalesce", "collate",
"collect",
+ "commit", "condition", "connect", "constraint", "contains",
"convert", "copy",
+ "corr", "corresponding", "cos", "cosh", "count", "covar_pop",
"covar_samp",
+ "cube", "cume_dist", "current_catalog", "current_date",
+ "current_default_transform_group", "current_path",
"current_path", "current_role",
+ "current_role", "current_row", "current_schema",
"current_time",
+ "current_timestamp", "current_transform_group_for_type",
"current_user", "cursor",
+ "cycle", "day", "deallocate", "dec", "decfloat", "declare",
"define",
+ "dense_rank", "deref", "deterministic", "disconnect",
"dynamic", "each",
+ "element", "empty", "end-exec", "end_frame", "end_partition",
"equals", "escape",
+ "every", "except", "exec", "execute", "exp", "extract",
"fetch", "filter",
+ "first_value", "floor", "foreign", "frame_row", "free",
"fusion", "get", "global",
+ "grouping", "groups", "hold", "hour", "identity", "indicator",
"initial", "inout",
+ "insensitive", "integer", "intersect", "intersection",
"json_array",
+ "json_arrayagg", "json_exists", "json_object",
"json_objectagg", "json_query",
+ "json_table", "json_table_primitive", "json_value", "lag",
"language", "large",
+ "last_value", "lateral", "lead", "leading", "like_regex",
"listagg", "ln",
+ "local", "localtime", "localtimestamp", "log", "log10 ",
"lower", "match",
+ "match_number", "match_recognize", "matches", "max", "member",
"merge", "method",
+ "min", "minute", "mod", "modifies", "module", "month",
"multiset", "national",
+ "natural", "nchar", "nclob", "new", "no", "none", "normalize",
"nth_value",
+ "ntile", "nullif", "numeric", "occurrences_regex",
"octet_length", "of", "old",
+ "omit", "one", "only", "open", "out", "overlaps", "overlay",
"parameter",
+ "pattern", "per", "percent", "percent_rank",
"percentile_cont", "percentile_disc",
+ "period", "portion", "position", "position_regex", "power",
"precedes",
+ "precision", "prepare", "procedure", "ptf", "rank", "reads",
"real", "recursive",
+ "ref", "references", "referencing", "regr_avgx", "regr_avgy",
"regr_count",
+ "regr_intercept", "regr_r2", "regr_slope", "regr_sxx",
"regr_sxy", "regr_syy",
+ "release", "result", "return", "rollback", "rollup",
"row_number", "running",
+ "savepoint", "scope", "scroll", "search", "second", "seek",
"sensitive",
+ "session_user", "similar", "sin", "sinh", "skip", "some",
"specific",
+ "specifictype", "sql", "sqlexception", "sqlstate",
"sqlwarning", "sqrt", "start",
+ "static", "stddev_pop", "stddev_samp", "submultiset",
"subset", "substring",
+ "substring_regex", "succeeds", "sum", "symmetric", "system",
"system_time",
+ "system_user", "tan", "tanh", "time", "timezone_hour",
"timezone_minute",
+ "trailing", "translate", "translate_regex", "translation",
"treat", "trigger",
+ "trim", "trim_array", "uescape", "unique", "unknown",
"unnest", "update ",
+ "upper", "user", "value", "value_of", "var_pop", "var_samp",
"varbinary",
+ "varying", "versioning", "whenever", "width_bucket", "window",
"within",
+ "without", "year"}));
// TODO: Remove impala builtin function names. Need to find content of
// BuiltinsDb.getInstance().getAllFunctions()
//reservedWords.removeAll(BuiltinsDb.getInstance().getAllFunctions().keySet());
@@ -371,11 +373,11 @@ public class ImpalaIdentifierParser {
// Remove whitelist words. These words might be heavily used in
production, and
// impala is unlikely to implement SQL features around these words in
the near future.
reservedWords.removeAll(Arrays.asList(new String[] {
- // time units
- "year", "month", "day", "hour", "minute", "second",
- "begin", "call", "check", "classifier", "close", "identity",
"language",
- "localtime", "member", "module", "new", "nullif", "old", "open",
"parameter",
- "period", "result", "return", "sql", "start", "system", "time",
"user", "value"
+ // time units
+ "year", "month", "day", "hour", "minute", "second",
+ "begin", "call", "check", "classifier", "close", "identity",
"language",
+ "localtime", "member", "module", "new", "nullif", "old",
"open", "parameter",
+ "period", "result", "return", "sql", "start", "system",
"time", "user", "value"
}));
}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
index 023e2bb4a..5798aa5ad 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaLineageHook.java
@@ -18,10 +18,7 @@
package org.apache.atlas.impala.hook;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import com.google.common.collect.Sets;
-import java.io.IOException;
import org.apache.atlas.hook.AtlasHook;
import org.apache.atlas.impala.hook.events.BaseImpalaEvent;
import org.apache.atlas.impala.hook.events.CreateImpalaProcess;
@@ -32,8 +29,13 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import javax.security.auth.Subject;
import javax.security.auth.kerberos.KerberosPrincipal;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.HashSet;
import static org.apache.atlas.repository.Constants.IMPALA_SOURCE;
@@ -63,9 +65,7 @@ public class ImpalaLineageHook extends AtlasHook {
}
}
- public ImpalaLineageHook() {
-
- }
+ public ImpalaLineageHook() {}
public String getMessageSource() {
return IMPALA_SOURCE;
@@ -99,17 +99,17 @@ public class ImpalaLineageHook extends AtlasHook {
try {
ImpalaOperationType operationType =
ImpalaOperationParser.getImpalaOperationType(lineageQuery.getQueryText());
AtlasImpalaHookContext context =
- new AtlasImpalaHookContext(this, operationType, lineageQuery);
+ new AtlasImpalaHookContext(this, operationType,
lineageQuery);
BaseImpalaEvent event = null;
switch (operationType) {
- case CREATEVIEW:
- case CREATETABLE_AS_SELECT:
- case ALTERVIEW_AS:
- case QUERY:
- case QUERY_WITH_CLAUSE:
- event = new CreateImpalaProcess(context);
- break;
+ case CREATEVIEW:
+ case CREATETABLE_AS_SELECT:
+ case ALTERVIEW_AS:
+ case QUERY:
+ case QUERY_WITH_CLAUSE:
+ event = new CreateImpalaProcess(context);
+ break;
default:
if (LOG.isDebugEnabled()) {
LOG.debug("HiveHook.run({}): operation ignored",
lineageQuery.getQueryText());
@@ -125,9 +125,8 @@ public class ImpalaLineageHook extends AtlasHook {
super.notifyEntities(event.getNotificationMessages(), ugi);
}
} catch (Throwable t) {
-
LOG.error("ImpalaLineageHook.process(): failed to process query
{}",
- AtlasType.toJson(lineageQuery), t);
+ AtlasType.toJson(lineageQuery), t);
}
if (LOG.isDebugEnabled()) {
@@ -140,9 +139,9 @@ public class ImpalaLineageHook extends AtlasHook {
}
private UserGroupInformation getUgiFromUserName(String userName) throws
IOException {
- String userPrincipal = userName.contains(REALM_SEPARATOR)? userName :
userName + "@" + getRealm();
+ String userPrincipal = userName.contains(REALM_SEPARATOR) ? userName :
userName + "@" + getRealm();
Subject userSubject = new Subject(false, Sets.newHashSet(
- new KerberosPrincipal(userPrincipal)), new HashSet<Object>(),new
HashSet<Object>());
+ new KerberosPrincipal(userPrincipal)), new HashSet<Object>(),
new HashSet<Object>());
return UserGroupInformation.getUGIFromSubject(userSubject);
}
@@ -153,4 +152,4 @@ public class ImpalaLineageHook extends AtlasHook {
public boolean isConvertHdfsPathToLowerCase() {
return convertHdfsPathToLowerCase;
}
-}
\ No newline at end of file
+}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java
index fbc57b698..1fece6de9 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/ImpalaOperationParser.java
@@ -20,13 +20,13 @@ package org.apache.atlas.impala.hook;
import org.apache.atlas.impala.model.ImpalaOperationType;
import org.apache.commons.lang.StringUtils;
+
import java.util.regex.Pattern;
/**
* Parse an Impala query text and output the impala operation type
*/
public class ImpalaOperationParser {
-
private static final Pattern COMMENT_PATTERN =
Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);
private static final Pattern CREATE_VIEW_PATTERN =
@@ -43,9 +43,8 @@ public class ImpalaOperationParser {
private static final Pattern WITH_CLAUSE_INSERT_SELECT_FROM_PATTERN =
Pattern.compile("^[ ]*(\\bwith\\b.*)?\\s*\\binsert\\b.*\\b(into|overwrite)\\b.*\\bselect\\b.*\\bfrom\\b.*",
Pattern.DOTALL | Pattern.CASE_INSENSITIVE);
-
- public ImpalaOperationParser() {
- }
+
+ private ImpalaOperationParser() {}
public static ImpalaOperationType getImpalaOperationType(String queryText)
{
// Impala does no generate lineage record for command "LOAD DATA IN PATH"
@@ -60,7 +59,6 @@ public class ImpalaOperationParser {
return ImpalaOperationType.QUERY;
} else if (doesMatch(queryTextWithNoComments,
WITH_CLAUSE_INSERT_SELECT_FROM_PATTERN)) {
return ImpalaOperationType.QUERY_WITH_CLAUSE;
-
}
return ImpalaOperationType.UNKNOWN;
@@ -81,5 +79,4 @@ public class ImpalaOperationParser {
private static boolean doesMatch(final String queryText, final Pattern
pattern) {
return pattern.matcher(queryText).matches();
}
-
-}
\ No newline at end of file
+}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
index 82e126abe..d6c1c8575 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/BaseImpalaEvent.java
@@ -18,18 +18,6 @@
package org.apache.atlas.impala.hook.events;
-import static
org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.hook.ImpalaOperationParser;
import org.apache.atlas.impala.model.ImpalaDataType;
@@ -45,12 +33,23 @@ import
org.apache.atlas.model.instance.AtlasEntity.AtlasEntityWithExtInfo;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.atlas.model.notification.HookNotification;
import org.apache.atlas.type.AtlasTypeUtil;
-
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static
org.apache.atlas.impala.hook.AtlasImpalaHookContext.QNAME_SEP_PROCESS;
+
/**
* The base class for generating notification event to Atlas server
* Most code is copied from BaseHiveEvent to avoid depending on
org.apache.atlas.hive.hook
@@ -99,7 +98,6 @@ public abstract class BaseImpalaEvent {
protected final Map<Long, LineageVertex> verticesMap;
public BaseImpalaEvent(AtlasImpalaHookContext context) {
-
this.context = context;
vertexNameMap = new HashMap<>();
verticesMap = new HashMap<>();
@@ -111,7 +109,9 @@ public abstract class BaseImpalaEvent {
public abstract List<HookNotification> getNotificationMessages() throws
Exception;
- public String getUserName() { return context.getUserName(); }
+ public String getUserName() {
+ return context.getUserName();
+ }
public String getTableNameFromVertex(LineageVertex vertex) {
if (vertex.getVertexType() == ImpalaVertexType.COLUMN) {
@@ -130,7 +130,6 @@ public abstract class BaseImpalaEvent {
}
public String getQualifiedName(ImpalaNode node) throws
IllegalArgumentException {
-
return getQualifiedName(node.getOwnVertex());
}
@@ -172,8 +171,8 @@ public abstract class BaseImpalaEvent {
static final class AtlasEntityComparator implements
Comparator<AtlasEntity> {
@Override
public int compare(AtlasEntity entity1, AtlasEntity entity2) {
- String name1 =
(String)entity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
- String name2 =
(String)entity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+ String name1 = (String)
entity1.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+ String name2 = (String)
entity2.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (name1 == null) {
return -1;
@@ -193,17 +192,17 @@ public abstract class BaseImpalaEvent {
ImpalaOperationType operation = context.getImpalaOperationType();
if (operation == ImpalaOperationType.CREATEVIEW ||
- operation == ImpalaOperationType.CREATETABLE_AS_SELECT ||
- operation == ImpalaOperationType.ALTERVIEW_AS) {
+ operation == ImpalaOperationType.CREATETABLE_AS_SELECT ||
+ operation == ImpalaOperationType.ALTERVIEW_AS) {
List<? extends AtlasEntity> sortedEntities = new
ArrayList<>(outputs);
Collections.sort(sortedEntities, entityComparator);
for (AtlasEntity entity : sortedEntities) {
if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) {
- Long createTime =
(Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME);
+ Long createTime = (Long)
entity.getAttribute(ATTRIBUTE_CREATE_TIME);
- return
(String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS +
createTime;
+ return (String)
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME) + QNAME_SEP_PROCESS + createTime;
}
}
}
@@ -228,7 +227,6 @@ public abstract class BaseImpalaEvent {
qualifiedName = sb.toString();
}
-
return qualifiedName;
}
@@ -249,10 +247,10 @@ public abstract class BaseImpalaEvent {
String qualifiedName = null;
long createTime = 0;
- qualifiedName =
(String)entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+ qualifiedName = (String)
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (entity.getTypeName().equalsIgnoreCase(HIVE_TYPE_TABLE)) {
- Long createTimeObj =
(Long)entity.getAttribute(ATTRIBUTE_CREATE_TIME);
+ Long createTimeObj = (Long)
entity.getAttribute(ATTRIBUTE_CREATE_TIME);
if (createTimeObj != null) {
createTime = createTimeObj;
}
@@ -266,17 +264,17 @@ public abstract class BaseImpalaEvent {
boolean addWriteType = false;
ImpalaOperationType subType =
ImpalaOperationParser.getImpalaOperationSubType(operation, queryText);
- switch (subType) {
- // Impala does not generate lineage for UPDATE and DELETE
- case INSERT:
- case INSERT_OVERWRITE:
- addWriteType = true;
- break;
- }
-
- if (addWriteType) {
- processQualifiedName.append(QNAME_SEP_PROCESS).append(subType.name());
- }
+ switch (subType) {
+ // Impala does not generate lineage for UPDATE and DELETE
+ case INSERT:
+ case INSERT_OVERWRITE:
+ addWriteType = true;
+ break;
+ }
+
+ if (addWriteType) {
+ processQualifiedName.append(QNAME_SEP_PROCESS).append(subType.name());
+ }
}
processQualifiedName.append(QNAME_SEP_PROCESS).append(qualifiedName.toLowerCase().replaceAll("/", ""));
@@ -296,7 +294,7 @@ public abstract class BaseImpalaEvent {
case DFS_DIR: {
ret = toAtlasEntity(node, entityExtInfo);
}
- break;
+ break;
}
return ret;
@@ -436,10 +434,8 @@ public abstract class BaseImpalaEvent {
}
}
}
-
ret.setAttribute(ATTRIBUTE_COLUMNS, getObjectIds(columns));
-
context.putEntity(tblQualifiedName, ret);
return ret;
@@ -448,7 +444,7 @@ public abstract class BaseImpalaEvent {
public static AtlasObjectId getObjectId(AtlasEntity entity) {
String qualifiedName = (String)
entity.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
AtlasObjectId ret = new AtlasObjectId(entity.getGuid(),
entity.getTypeName(), Collections
- .singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
+ .singletonMap(ATTRIBUTE_QUALIFIED_NAME, qualifiedName));
return ret;
}
diff --git
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
index 5e6ea5a55..ac2347011 100644
---
a/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
+++
b/addons/impala-bridge/src/main/java/org/apache/atlas/impala/hook/events/CreateImpalaProcess.java
@@ -18,20 +18,13 @@
package org.apache.atlas.impala.hook.events;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
import org.apache.atlas.impala.hook.AtlasImpalaHookContext;
import org.apache.atlas.impala.model.ImpalaDataType;
import org.apache.atlas.impala.model.ImpalaDependencyType;
import org.apache.atlas.impala.model.ImpalaNode;
+import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.ImpalaVertexType;
import org.apache.atlas.impala.model.LineageEdge;
-import org.apache.atlas.impala.model.ImpalaQuery;
import org.apache.atlas.impala.model.LineageVertex;
import org.apache.atlas.impala.model.LineageVertexMetadata;
import org.apache.atlas.model.instance.AtlasEntity;
@@ -42,6 +35,14 @@ import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
public class CreateImpalaProcess extends BaseImpalaEvent {
private static final Logger LOG =
LoggerFactory.getLogger(CreateImpalaProcess.class);
@@ -117,10 +118,10 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (!inputs.isEmpty() || !outputs.isEmpty()) {
AtlasEntity process = getImpalaProcessEntity(inputs, outputs);
- if (process!= null) {
+ if (process != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("get process entity with qualifiedName: {}",
- process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ process.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
ret.addEntity(process);
@@ -129,7 +130,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (processExecution != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("get process executition entity with qualifiedName: {}",
- processExecution.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
+ processExecution.getAttribute(ATTRIBUTE_QUALIFIED_NAME));
}
ret.addEntity(processExecution);
@@ -142,8 +143,6 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
} else {
ret = null;
}
-
-
return ret;
}
@@ -158,7 +157,6 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
final Set<String> processedOutputCols = new HashSet<>();
for (LineageEdge edge : edges) {
-
if (!edge.getEdgeType().equals(ImpalaDependencyType.PROJECTION)) {
// Impala dependency type can only be predicate or projection.
// Impala predicate dependency: This is a dependency between a
set of target
@@ -176,7 +174,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (LOG.isDebugEnabled()) {
LOG.debug("processColumnLineage(): target id = {}, target column name = {}",
- targetId, outputColName);
+ targetId, outputColName);
}
if (outputColumn == null) {
@@ -215,8 +213,8 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
AtlasEntity columnLineageProcess = new
AtlasEntity(ImpalaDataType.IMPALA_COLUMN_LINEAGE.getName());
- String columnQualifiedName =
(String)impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) +
- AtlasImpalaHookContext.QNAME_SEP_PROCESS +
outputColumns.get(0).getAttribute(ATTRIBUTE_NAME);
+ String columnQualifiedName = (String)
impalaProcess.getAttribute(ATTRIBUTE_QUALIFIED_NAME) +
+ AtlasImpalaHookContext.QNAME_SEP_PROCESS +
outputColumns.get(0).getAttribute(ATTRIBUTE_NAME);
columnLineageProcess.setAttribute(ATTRIBUTE_NAME,
columnQualifiedName);
columnLineageProcess.setAttribute(ATTRIBUTE_QUALIFIED_NAME,
columnQualifiedName);
columnLineageProcess.setAttribute(ATTRIBUTE_INPUTS,
getObjectIds(inputColumns));
@@ -240,7 +238,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
}
for (AtlasEntity columnLineage : columnLineages) {
- String columnQualifiedName =
(String)columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
+ String columnQualifiedName = (String)
columnLineage.getAttribute(ATTRIBUTE_QUALIFIED_NAME);
if (LOG.isDebugEnabled()) {
LOG.debug("get column lineage entity with qualifiedName: {}",
columnQualifiedName);
}
@@ -253,9 +251,9 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
// Then organize the vertices into hierarchical structure: put all column
vertices of a table
// as children of a ImpalaNode representing that table.
private void getInputOutList(ImpalaQuery lineageQuery, List<ImpalaNode>
inputNodes,
- List<ImpalaNode> outputNodes) {
- // get vertex map with key being its id and
- // ImpalaNode map with its own vertex's vertexId as its key
+ List<ImpalaNode> outputNodes) {
+ // get vertex map with key being its id and
+ // ImpalaNode map with its own vertex's vertexId as its key
for (LineageVertex vertex : lineageQuery.getVertices()) {
updateVertexMap(vertex);
}
@@ -308,7 +306,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
* @return the table name to ImpalaNode map, whose table node contains its
columns
*/
private Map<String, ImpalaNode> buildInputOutputList(Set<Long> idSet,
Map<Long, LineageVertex> vertexMap,
- Map<String, ImpalaNode> vertexNameMap) {
+ Map<String, ImpalaNode> vertexNameMap) {
Map<String, ImpalaNode> returnTableMap = new HashMap<>();
for (Long id : idSet) {
@@ -323,7 +321,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
String tableName = getTableNameFromVertex(vertex);
if (tableName == null) {
LOG.warn("cannot find tableName for vertex with id: {}, column name : {}",
- id, vertex.getVertexId() == null? "null" :
vertex.getVertexId());
+ id, vertex.getVertexId() == null ? "null" :
vertex.getVertexId());
continue;
}
@@ -335,7 +333,7 @@ public class CreateImpalaProcess extends BaseImpalaEvent {
if (tableNode == null) {
LOG.warn("cannot find table node for vertex with id: {}, column name : {}",
- id, vertex.getVertexId());
+ id, vertex.getVertexId());
tableNode = createTableNode(tableName,
getCreateTimeInVertex(null));
vertexNameMap.put(tableName, tableNode);