Github user ijokarumawak commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2162#discussion_r180055717
--- Diff:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractDatabaseFetchProcessor.java
---
@@ -291,20 +291,30 @@ public void setup(final ProcessContext context,
boolean shouldCleanCache, FlowFi
if (shouldCleanCache){
columnTypeMap.clear();
}
+
+ final List<String> maxValueColumnNameList =
Arrays.asList(maxValueColumnNames.toLowerCase().split(","));
+ final List<String> maxValueQualifiedColumnNameList =
new ArrayList<>();
+
+ for(String maxValueColumn:maxValueColumnNameList){
+ String colKey = getStateKey(tableName,
maxValueColumn.trim());
+ maxValueQualifiedColumnNameList.add(colKey);
+ }
+
for (int i = 1; i <= numCols; i++) {
String colName =
resultSetMetaData.getColumnName(i).toLowerCase();
String colKey = getStateKey(tableName, colName);
+
+ //only include columns that are part of the
maximum value tracking column list
+
if(!maxValueQualifiedColumnNameList.contains(colKey)){
+ continue;
+ }
+
int colType = resultSetMetaData.getColumnType(i);
columnTypeMap.putIfAbsent(colKey, colType);
}
- List<String> maxValueColumnNameList =
Arrays.asList(maxValueColumnNames.split(","));
-
- for(String maxValueColumn:maxValueColumnNameList){
- String colKey = getStateKey(tableName,
maxValueColumn.trim().toLowerCase());
- if(!columnTypeMap.containsKey(colKey)){
- throw new ProcessException("Column not found
in the table/query specified: " + maxValueColumn);
- }
+ if(maxValueQualifiedColumnNameList.size() > 0 &&
columnTypeMap.size() != maxValueQualifiedColumnNameList.size()){
--- End diff --
@patricker This check should be implemented as the previous commit. The
size of columnTypeMap can be different with GenerateTableFetch when it's
configured to resolve table and column names dynamically with FlowFile EL and
deals with multiple tables.
---