tpalfy commented on a change in pull request #4721:
URL: https://github.com/apache/nifi/pull/4721#discussion_r542476852
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) {
+ listByAdjustedSlidingTimeWindow(context, session);
+
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ public void listByAdjustedSlidingTimeWindow(final ProcessContext context,
final ProcessSession session) throws ProcessException {
+ if (this.lastListedLatestEntryTimestampMillis == null ||
justElectedPrimaryNode) {
+ try {
+ final StateMap stateMap =
context.getStateManager().getState(getStateScope(context));
+
Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
+ .map(Long::parseLong)
+ .ifPresent(lastTimestamp ->
this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+
+ justElectedPrimaryNode = false;
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve timestamp of last
listing from the State Manager. Will not perform listing until this is
accomplished.");
+ context.yield();
+ return;
+ }
+ }
+
+ long lowerBoundInclusiveTimestamp =
Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+ long upperBoundExclusiveTimestamp;
+
+ long currentTime = getCurrentTime();
+
+ final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+ try {
+ List<T> entityList = performListing(context,
lowerBoundInclusiveTimestamp);
+
+ boolean targetSystemHasMilliseconds = false;
+ boolean targetSystemHasSeconds = false;
+ for (final T entity : entityList) {
+ final long entityTimestampMillis = entity.getTimestamp();
+ if (!targetSystemHasMilliseconds) {
+ targetSystemHasMilliseconds = entityTimestampMillis % 1000
> 0;
+ }
+ if (!targetSystemHasSeconds) {
+ targetSystemHasSeconds = entityTimestampMillis % 60_000 >
0;
+ }
+ }
+
+ // Determine target system time precision.
+ String specifiedPrecision =
context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+ if (StringUtils.isBlank(specifiedPrecision)) {
+ // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by
the Processor, then specifiedPrecision can be null, instead of its default
value.
+ specifiedPrecision = getDefaultTimePrecision();
+ }
+ final TimeUnit targetSystemTimePrecision
+ =
PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS
: targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ?
TimeUnit.MILLISECONDS
+ : PRECISION_SECONDS.getValue().equals(specifiedPrecision)
? TimeUnit.SECONDS : TimeUnit.MINUTES;
+ final Long listingLagMillis =
LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
+
+ upperBoundExclusiveTimestamp =
getAdjustedCurrentTimestamp(context, currentTime) - listingLagMillis;
+
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("interval: " + lowerBoundInclusiveTimestamp
+ " - " + upperBoundExclusiveTimestamp);
+ getLogger().trace("entityList: " +
entityList.stream().map(entity -> entity.getName() + "_" +
entity.getTimestamp()).collect(Collectors.joining(", ")));
+ }
+ entityList
+ .stream()
+ .filter(entity -> entity.getTimestamp() >=
lowerBoundInclusiveTimestamp)
+ .filter(entity -> entity.getTimestamp() <
upperBoundExclusiveTimestamp)
+ .forEach(entity -> orderedEntries
+ .computeIfAbsent(entity.getTimestamp(), __ -> new
ArrayList<>())
+ .add(entity)
+ );
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("orderedEntries: " +
+ orderedEntries.values().stream()
+ .flatMap(List::stream)
+ .map(entity -> entity.getName() + "_" +
entity.getTimestamp())
+ .collect(Collectors.joining(", "))
+ );
+ }
+ } catch (final IOException e) {
+ getLogger().error("Failed to perform listing on remote host due to
{}", new Object[]{e.getMessage()}, e);
+ context.yield();
+ return;
+ }
+
+ if (orderedEntries.isEmpty()) {
+ getLogger().debug("There is no data to list. Yielding.");
+ context.yield();
+ return;
+ }
+
+ final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
Review comment:
Same answer.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]