simonbence commented on a change in pull request #4721:
URL: https://github.com/apache/nifi/pull/4721#discussion_r540944725
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -207,6 +225,39 @@
.defaultValue(BY_TIMESTAMPS.getValue())
.build();
+ public static final PropertyDescriptor TIME_ADJUSTMENT = new Builder()
+ .name("time-adjustment")
+ .displayName("Time Adjustment")
+ .description("If the system hosting the files is in a different time zone than NiFi, either it's timezone or the numerical difference should be set here." +
+ " If a timezone is specified, NiFi tries to calculate the time difference." +
+ " If a numeric value is set, its value can be either a single integer (milliseconds) or in HH:mm/HH:mm:ss format." +
+ " EXAMPLE: NiFi is hosted in UTC, File Server is hosted in EST. In this case 'Time Adjustment' value should be -05:00:00 or 18000000." +
Review comment:
I suggest using "file source" instead of "File Server", as implementations of this abstract class may also work with the local file system. "File Server" suggests an additional concept that is not introduced in the documentation.
Note: I understand that this property is only used by ListFTP and ListSFTP, and that the class structure makes it necessary to have it here rather than in ListFileTransfer, but as it is in the common place, and no remark about the this
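The EXAMPLE in the description can be checked with java.time. This standalone sketch is not part of the PR, and treating "EST" as the America/New_York zone is an assumption. It computes a zone's UTC offset in milliseconds: in January the offset is -18000000 ms (so -05:00:00 corresponds to -18000000, with a negative sign), and in July daylight saving time moves it to -14400000 ms (-04:00:00), which is the mid-year change that a fixed numeric value cannot follow.

```java
import java.time.Instant;
import java.time.ZoneId;

class TimeAdjustmentOffsets {
    // Offset of the given zone from UTC at the given instant, in milliseconds.
    static long utcOffsetMillis(ZoneId zone, Instant at) {
        return zone.getRules().getOffset(at).getTotalSeconds() * 1000L;
    }

    public static void main(String[] args) {
        // Assumption: "EST" in the description stands for the America/New_York zone.
        ZoneId fileSource = ZoneId.of("America/New_York");
        // NiFi runs in UTC (offset 0), so the adjustment equals the file source's own offset.
        System.out.println(utcOffsetMillis(fileSource, Instant.parse("2021-01-15T12:00:00Z"))); // -18000000 (-05:00:00)
        System.out.println(utcOffsetMillis(fileSource, Instant.parse("2021-07-15T12:00:00Z"))); // -14400000 (-04:00:00, DST)
    }
}
```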
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -207,6 +225,39 @@
.defaultValue(BY_TIMESTAMPS.getValue())
.build();
+ public static final PropertyDescriptor TIME_ADJUSTMENT = new Builder()
+ .name("time-adjustment")
+ .displayName("Time Adjustment")
+ .description("If the system hosting the files is in a different time zone than NiFi, either it's timezone or the numerical difference should be set here." +
+ " If a timezone is specified, NiFi tries to calculate the time difference." +
+ " If a numeric value is set, its value can be either a single integer (milliseconds) or in HH:mm/HH:mm:ss format." +
+ " EXAMPLE: NiFi is hosted in UTC, File Server is hosted in EST. In this case 'Time Adjustment' value should be -05:00:00 or 18000000." +
+ " If the locations were reversed i.e. NiFi is hosted in EST, File Server is hosted in UTC, the value should be 05:00:00 or 18000000." +
+ " NOTE: Any mid-year changes (due to daylight saving for example) requires manual re-adjustment in this case."
+ )
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(false)
+ .addValidator(new Validator() {
+ Pattern signed_integer_or_signed_HHmm_or_HHmmss = Pattern.compile("-?(\\d{2}:\\d{2}(:\\d{2})?)|-?\\d+");
Review comment:
Minor: this could be moved into a constant to keep the validator code slightly smaller and cleaner.
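Moving the pattern into a static final constant might look like the sketch below. The constant name mirrors the local variable in the diff. isNumericAdjustment is a hypothetical helper that stands in for the numeric part of the validator. The rest of the validator, for example time zone handling, is not shown in the hunk.

```java
import java.util.regex.Pattern;

class TimeAdjustmentPatterns {
    // Compiled once as a class-level constant instead of a field of the anonymous Validator.
    static final Pattern SIGNED_INTEGER_OR_SIGNED_HHMM_OR_HHMMSS =
            Pattern.compile("-?(\\d{2}:\\d{2}(:\\d{2})?)|-?\\d+");

    // Hypothetical helper: true for a signed integer or a signed HH:mm / HH:mm:ss value.
    static boolean isNumericAdjustment(String input) {
        return input != null && SIGNED_INTEGER_OR_SIGNED_HHMM_OR_HHMMSS.matcher(input).matches();
    }
}
```

Because matches() requires the whole input to match one of the two alternatives, values such as "1:02" or "01:02:3" are rejected.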
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) {
+ listByAdjustedSlidingTimeWindow(context, session);
+
} else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
}
}
+ public void listByAdjustedSlidingTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+ try {
+ final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
+ Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
+ .map(Long::parseLong)
+ .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+
+ justElectedPrimaryNode = false;
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
+ context.yield();
+ return;
+ }
+ }
+
+ long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+ long upperBoundExclusiveTimestamp;
+
+ long currentTime = getCurrentTime();
+
+ final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+ try {
+ List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);
+
+ boolean targetSystemHasMilliseconds = false;
+ boolean targetSystemHasSeconds = false;
+ for (final T entity : entityList) {
+ final long entityTimestampMillis = entity.getTimestamp();
+ if (!targetSystemHasMilliseconds) {
+ targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
+ }
+ if (!targetSystemHasSeconds) {
+ targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
+ }
+ }
+
+ // Determine target system time precision.
+ String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+ if (StringUtils.isBlank(specifiedPrecision)) {
+ // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value.
+ specifiedPrecision = getDefaultTimePrecision();
+ }
+ final TimeUnit targetSystemTimePrecision
+ = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
+ : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
+ final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
+
+ upperBoundExclusiveTimestamp = getAdjustedCurrentTimestamp(context, currentTime) - listingLagMillis;
+
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp);
+ getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
+ }
+ entityList
+ .stream()
+ .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
+ .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
+ .forEach(entity -> orderedEntries
+ .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
+ .add(entity)
+ );
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("orderedEntries: " +
+ orderedEntries.values().stream()
+ .flatMap(List::stream)
+ .map(entity -> entity.getName() + "_" + entity.getTimestamp())
+ .collect(Collectors.joining(", "))
+ );
+ }
+ } catch (final IOException e) {
+ getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
+ context.yield();
+ return;
+ }
+
+ if (orderedEntries.isEmpty()) {
+ getLogger().debug("There is no data to list. Yielding.");
+ context.yield();
+ return;
+ }
+
+ final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
Review comment:
Same question as in the case of the target precision.
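The window logic in listByAdjustedSlidingTimeWindow keeps only the entries whose timestamp falls in [lower bound, upper bound). The lower bound is the last persisted timestamp. The upper bound is the adjusted current time minus the listing lag for the target precision. The kept entries are grouped by timestamp in a TreeMap, so they come out in ascending order. After emitting, the upper bound is persisted and becomes the next lower bound. Below is a simplified standalone sketch. Entry and the numbers are hypothetical stand-ins for T, the persisted state and LISTING_LAG_MILLIS.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;

class SlidingWindowSketch {
    // Hypothetical stand-in for a listed entity.
    static final class Entry {
        final String name;
        final long timestamp;

        Entry(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Keeps entries in [lowerInclusive, upperExclusive) and groups them by timestamp in ascending order.
    static TreeMap<Long, List<Entry>> window(List<Entry> entries, long lowerInclusive, long upperExclusive) {
        TreeMap<Long, List<Entry>> ordered = new TreeMap<>();
        for (Entry e : entries) {
            if (e.timestamp >= lowerInclusive && e.timestamp < upperExclusive) {
                ordered.computeIfAbsent(e.timestamp, t -> new ArrayList<>()).add(e);
            }
        }
        return ordered;
    }

    public static void main(String[] args) {
        long adjustedNow = 10_000L; // plays the role of getAdjustedCurrentTimestamp(...)
        long listingLag = 1_000L;   // assumed lag for the detected precision
        long lower = 2_000L;        // last persisted upper bound
        long upper = adjustedNow - listingLag;
        List<Entry> listed = Arrays.asList(
                new Entry("old", 1_500L), new Entry("a", 2_000L),
                new Entry("b", 5_000L), new Entry("c", 5_000L), new Entry("too-new", 9_500L));
        System.out.println(window(listed, lower, upper).keySet()); // [2000, 5000]
    }
}
```

Entries newer than the upper bound ("too-new" here) are left for a later run, once the lag has passed.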
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/list/TestAbstractListProcessor.java
##########
@@ -105,6 +111,174 @@ public void setup() {
@Rule
public final TemporaryFolder testFolder = new TemporaryFolder();
+ @Test
+ public void testGetAdjustedCurrentTimestampWith0() throws Exception {
+ // GIVEN
+ String timeAdjustment = "0";
+ long currentTime = 100;
+
+ long expected = 100;
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+
+ }
+
+ @Test
+ public void testGetAdjustedCurrentTimestampWith15() throws Exception {
+ // GIVEN
+ String timeAdjustment = "15";
+ long currentTime = 100;
+
+ long expected = 100 + 15;
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+
+ }
+
+ @Test
+ public void testGetAdjustedCurrentTimestampWithMinus15() throws Exception {
+ // GIVEN
+ String timeAdjustment = "-15";
+ long currentTime = 100;
+
+ long expected = 100 - 15;
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+
+ }
+
+ @Test
+ public void testGetAdjustedCurrentTimestampWithMinus_01_02() throws Exception {
+ // GIVEN
+ String timeAdjustment = "-01:02";
+ long currentTime = 100;
+
+ long expected = 100
+ - 1 * 60 * 60 * 1000
+ - 2 * 60 * 1000;
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+ }
+
+ @Test
+ public void testGetAdjustedCurrentTimestampWith_01_02() throws Exception {
+ // GIVEN
+ String timeAdjustment = "01:02";
+ long currentTime = 100;
+
+ long expected = 100
+ + 1 * 60 * 60 * 1000
+ + 2 * 60 * 1000;
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+ }
+
+ @Test
+ public void testGetAdjustedCurrentTimestampWith_01_02_34() throws Exception {
+ // GIVEN
+ String timeAdjustment = "01:02:34";
+ long currentTime = 100;
+
+ long expected = 100
+ + 1 * 60 * 60 * 1000
+ + 2 * 60 * 1000
+ + 34 * 1000;
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+ }
+
+ @Test
+ public void testGetAdjustedCurrentTimestampWithMinus_01_02_34() throws Exception {
+ // GIVEN
+ String timeAdjustment = "-01:02:34";
+ long currentTime = 100;
+
+ long expected = 100
+ - 1 * 60 * 60 * 1000
+ - 2 * 60 * 1000
+ - 34 * 1000;
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+ }
+
+ @Test
+ public void testGetAdjustedCurrentTimestampWithEST() throws Exception {
+ // GIVEN
+ String timeAdjustment = "EST";
+ long currentTime = System.currentTimeMillis();
+
+ TimeZone targetTimeZone = TimeZone.getTimeZone(timeAdjustment);
+ TimeZone localTimeZone = Calendar.getInstance().getTimeZone();
+ long expected = currentTime + targetTimeZone.getOffset(currentTime) - localTimeZone.getOffset(currentTime);
+
+ // WHEN
+ // THEN
+ testGetAdjustedCurrentTimestamp(timeAdjustment, expected, currentTime);
+ }
+
+ private void testGetAdjustedCurrentTimestamp(String timeAdjustment, long expected, final long currentTime) {
Review comment:
As there are a lot of abstract methods that need to be implemented, it might be possible to use Mockito.spy.
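Besides Mockito.spy, another option would be to move the parsing into a static helper, so the tests above need no processor instance at all. Below is a hypothetical standalone sketch of the behavior these tests expect. The class name, the method name and the localZone parameter are assumptions. The PR reads the value from the TIME_ADJUSTMENT property and uses the JVM's default zone. The sketch keeps the optional "-" inside the regular expressions and lets Long.parseLong handle the sign of the millisecond form. java.util.TimeZone.getTimeZone silently falls back to GMT for unknown IDs, so the sketch rejects unrecognized zone IDs explicitly.

```java
import java.util.Arrays;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class SignedTimeAdjustment {
    // The optional leading "-" is part of each pattern instead of being stripped up front.
    private static final Pattern SIGNED_MILLIS = Pattern.compile("-?\\d+");
    private static final Pattern SIGNED_HHMM_OR_HHMMSS = Pattern.compile("(-?)(\\d{2}):(\\d{2})(?::(\\d{2}))?");

    // Hypothetical standalone equivalent of getAdjustedCurrentTimestamp; localZone replaces the JVM default.
    static long adjust(String adjustment, long currentTime, TimeZone localZone) {
        if (SIGNED_MILLIS.matcher(adjustment).matches()) {
            return currentTime + Long.parseLong(adjustment); // parseLong handles the sign
        }
        Matcher m = SIGNED_HHMM_OR_HHMMSS.matcher(adjustment);
        if (m.matches()) {
            long millis = Long.parseLong(m.group(2)) * 3_600_000L
                    + Long.parseLong(m.group(3)) * 60_000L
                    + (m.group(4) == null ? 0L : Long.parseLong(m.group(4)) * 1_000L);
            return currentTime + ("-".equals(m.group(1)) ? -millis : millis);
        }
        // Anything else must be a known time zone ID, so input like "-GMT" is rejected here.
        if (!Arrays.asList(TimeZone.getAvailableIDs()).contains(adjustment)) {
            throw new IllegalArgumentException("Unsupported time adjustment: " + adjustment);
        }
        TimeZone target = TimeZone.getTimeZone(adjustment);
        return currentTime + target.getOffset(currentTime) - localZone.getOffset(currentTime);
    }
}
```

For example, adjust("01:02:34", 100, ...) returns 100 + 3754000, the same value testGetAdjustedCurrentTimestampWith_01_02_34 expects.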
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -207,6 +225,39 @@
.defaultValue(BY_TIMESTAMPS.getValue())
.build();
+ public static final PropertyDescriptor TIME_ADJUSTMENT = new Builder()
Review comment:
This property appears second in the property list, before hostname, port, etc. As this is an optional parameter that is only used in certain cases, I think it should be somewhere near the bottom of the list.
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) {
+ listByAdjustedSlidingTimeWindow(context, session);
+
} else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
}
}
+ public void listByAdjustedSlidingTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+ try {
+ final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
+ Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
+ .map(Long::parseLong)
+ .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+
+ justElectedPrimaryNode = false;
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
+ context.yield();
+ return;
+ }
+ }
+
+ long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+ long upperBoundExclusiveTimestamp;
+
+ long currentTime = getCurrentTime();
+
+ final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+ try {
+ List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);
+
+ boolean targetSystemHasMilliseconds = false;
+ boolean targetSystemHasSeconds = false;
+ for (final T entity : entityList) {
Review comment:
This larger block dealing with the precision of the target system looks very similar to the corresponding part of the listByAdjustedSlidingTimeWindow. Would it be possible to extract it?
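One way to remove the duplication is to pull both steps into helpers that each listing method can call. The first step detects the precision from the listed timestamps. The second maps the configured value to a TimeUnit. The sketch below uses plain strings. The constant values and method names are hypothetical. In the processor, the values would come from PRECISION_AUTO_DETECT, PRECISION_MILLIS, PRECISION_SECONDS and getDefaultTimePrecision().

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

class TargetPrecision {
    // Hypothetical stand-ins for the PRECISION_AUTO_DETECT, PRECISION_MILLIS and PRECISION_SECONDS values.
    static final String AUTO_DETECT = "auto-detect";
    static final String MILLIS = "millis";
    static final String SECONDS = "seconds";

    // Infers the precision from the listed timestamps, as the loop in the diff does.
    static TimeUnit detect(List<Long> timestampsMillis) {
        boolean hasMillis = false;
        boolean hasSeconds = false;
        for (long t : timestampsMillis) {
            hasMillis |= t % 1000 > 0;
            hasSeconds |= t % 60_000 > 0;
        }
        return hasMillis ? TimeUnit.MILLISECONDS : hasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES;
    }

    // Single place that maps the configured value (or the default when blank) to a TimeUnit.
    static TimeUnit resolve(String specified, String defaultPrecision, List<Long> timestampsMillis) {
        String precision = (specified == null || specified.trim().isEmpty()) ? defaultPrecision : specified;
        if (AUTO_DETECT.equals(precision)) {
            return detect(timestampsMillis);
        }
        if (MILLIS.equals(precision)) {
            return TimeUnit.MILLISECONDS;
        }
        return SECONDS.equals(precision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
    }
}
```

The nested ternary from the diff then becomes a single call, and the result can be passed to LISTING_LAG_MILLIS.get(...) unchanged.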
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) {
+ listByAdjustedSlidingTimeWindow(context, session);
+
} else {
throw new ProcessException("Unknown listing strategy: " + listingStrategy);
}
}
+ public void listByAdjustedSlidingTimeWindow(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ if (this.lastListedLatestEntryTimestampMillis == null || justElectedPrimaryNode) {
+ try {
+ final StateMap stateMap = context.getStateManager().getState(getStateScope(context));
+ Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
+ .map(Long::parseLong)
+ .ifPresent(lastTimestamp -> this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+
+ justElectedPrimaryNode = false;
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
+ context.yield();
+ return;
+ }
+ }
+
+ long lowerBoundInclusiveTimestamp = Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+ long upperBoundExclusiveTimestamp;
+
+ long currentTime = getCurrentTime();
+
+ final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+ try {
+ List<T> entityList = performListing(context, lowerBoundInclusiveTimestamp);
+
+ boolean targetSystemHasMilliseconds = false;
+ boolean targetSystemHasSeconds = false;
+ for (final T entity : entityList) {
+ final long entityTimestampMillis = entity.getTimestamp();
+ if (!targetSystemHasMilliseconds) {
+ targetSystemHasMilliseconds = entityTimestampMillis % 1000 > 0;
+ }
+ if (!targetSystemHasSeconds) {
+ targetSystemHasSeconds = entityTimestampMillis % 60_000 > 0;
+ }
+ }
+
+ // Determine target system time precision.
+ String specifiedPrecision = context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+ if (StringUtils.isBlank(specifiedPrecision)) {
+ // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by the Processor, then specifiedPrecision can be null, instead of its default value.
+ specifiedPrecision = getDefaultTimePrecision();
+ }
+ final TimeUnit targetSystemTimePrecision
+ = PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS : targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ? TimeUnit.MILLISECONDS
+ : PRECISION_SECONDS.getValue().equals(specifiedPrecision) ? TimeUnit.SECONDS : TimeUnit.MINUTES;
+ final Long listingLagMillis = LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
+
+ upperBoundExclusiveTimestamp = getAdjustedCurrentTimestamp(context, currentTime) - listingLagMillis;
+
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("interval: " + lowerBoundInclusiveTimestamp + " - " + upperBoundExclusiveTimestamp);
+ getLogger().trace("entityList: " + entityList.stream().map(entity -> entity.getName() + "_" + entity.getTimestamp()).collect(Collectors.joining(", ")));
+ }
+ entityList
+ .stream()
+ .filter(entity -> entity.getTimestamp() >= lowerBoundInclusiveTimestamp)
+ .filter(entity -> entity.getTimestamp() < upperBoundExclusiveTimestamp)
+ .forEach(entity -> orderedEntries
+ .computeIfAbsent(entity.getTimestamp(), __ -> new ArrayList<>())
+ .add(entity)
+ );
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("orderedEntries: " +
+ orderedEntries.values().stream()
+ .flatMap(List::stream)
+ .map(entity -> entity.getName() + "_" + entity.getTimestamp())
+ .collect(Collectors.joining(", "))
+ );
+ }
+ } catch (final IOException e) {
+ getLogger().error("Failed to perform listing on remote host due to {}", new Object[]{e.getMessage()}, e);
+ context.yield();
+ return;
+ }
+
+ if (orderedEntries.isEmpty()) {
+ getLogger().debug("There is no data to list. Yielding.");
+ context.yield();
+ return;
+ }
+
+ final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
+ if (writerSet) {
+ try {
+ createRecordsForEntities(context, session, orderedEntries);
+ } catch (final IOException | SchemaNotFoundException e) {
+ getLogger().error("Failed to write listing to FlowFile", e);
+ context.yield();
+ return;
+ }
+ } else {
+ createFlowFilesForEntities(context, session, orderedEntries);
+ }
+
+ try {
+ if (getLogger().isTraceEnabled()) {
+ getLogger().info("this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + " = " + upperBoundExclusiveTimestamp);
+ }
+ this.lastListedLatestEntryTimestampMillis = upperBoundExclusiveTimestamp;
+ persist(upperBoundExclusiveTimestamp, upperBoundExclusiveTimestamp, latestIdentifiersProcessed, context.getStateManager(), getStateScope(context));
+ } catch (final IOException ioe) {
+ getLogger().warn("Unable to save state due to {}. If NiFi is restarted before state is saved, or "
+ + "if another node begins executing this Processor, data duplication may occur.", ioe);
+ }
+ }
+
+ protected long getAdjustedCurrentTimestamp(ProcessContext context, long currentTime) {
+ String timeAdjustmentString = context.getProperty(TIME_ADJUSTMENT).evaluateAttributeExpressions().getValue();
+
+ long positiveOrNegative;
+ long timeAdjustment;
+
+ if (timeAdjustmentString.startsWith("-")) {
+ positiveOrNegative = -1L;
+ timeAdjustmentString = timeAdjustmentString.substring(1);
+ } else {
+ positiveOrNegative = 1L;
+ }
+
+ if (timeAdjustmentString.matches("\\d{2}:\\d{2}(:\\d{2})?")) {
Review comment:
Why not include the "-" in the regular expressions? parseLong can handle the original string, and, hypothetically, the current approach would allow input like "-GMT" (although that should be caught by the validator).
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
##########
@@ -69,7 +72,10 @@
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue(".")
.build();
-
+ public static final PropertyDescriptor FILE_TANSFER_LISTING_STRATEGY = new PropertyDescriptor.Builder()
Review comment:
Typo: FILE_TRANSFER...
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFileTransfer.java
##########
@@ -138,4 +144,19 @@ protected boolean isListingResetNecessary(final PropertyDescriptor property) {
protected abstract FileTransfer getFileTransfer(final ProcessContext context);
protected abstract String getProtocolName();
+
+ protected void validateAdjustedTimeWindow(ValidationContext validationContext, Collection<ValidationResult> results) {
+ if (
+ BY_ADJUSTED_TIME_WINDOW.getValue().equals(validationContext.getProperty(LISTING_STRATEGY).getValue())
Review comment:
Would it make sense to add a negative case as well, e.g. when the listing strategy is not the adjusted time window, Time Adjustment should _not_ be set?
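A two-way check could look like the sketch below. It assumes that the existing validation requires Time Adjustment when BY_ADJUSTED_TIME_WINDOW is selected, since getAdjustedCurrentTimestamp reads the value unconditionally. The strategy value and the messages are hypothetical. In the processor, each message would become a ValidationResult added to results.

```java
import java.util.ArrayList;
import java.util.List;

class ListingStrategyValidation {
    // Hypothetical value standing in for BY_ADJUSTED_TIME_WINDOW.getValue().
    static final String BY_ADJUSTED_TIME_WINDOW = "adjusted-time-window";

    // Returns problems with the (strategy, time adjustment) combination, checked in both directions.
    static List<String> validate(String listingStrategy, String timeAdjustment) {
        List<String> problems = new ArrayList<>();
        boolean adjustedWindow = BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy);
        boolean adjustmentSet = timeAdjustment != null && !timeAdjustment.trim().isEmpty();
        if (adjustedWindow && !adjustmentSet) {
            problems.add("Time Adjustment must be set when the adjusted time window strategy is used");
        } else if (!adjustedWindow && adjustmentSet) {
            // The negative case: the property would be silently ignored by the other strategies.
            problems.add("Time Adjustment is only used with the adjusted time window strategy");
        }
        return problems;
    }
}
```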
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListFTP.java
##########
@@ -72,7 +72,8 @@
final PropertyDescriptor port = new PropertyDescriptor.Builder().fromPropertyDescriptor(UNDEFAULTED_PORT).defaultValue("21").build();
final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(LISTING_STRATEGY);
+ properties.add(FILE_TANSFER_LISTING_STRATEGY);
+ properties.add(TIME_ADJUSTMENT);
Review comment:
As mentioned above, I think this should be lower in the list.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]