tpalfy commented on a change in pull request #4721:
URL: https://github.com/apache/nifi/pull/4721#discussion_r542481674
##########
File path:
nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/list/AbstractListProcessor.java
##########
@@ -431,11 +482,157 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
} else if (BY_ENTITIES.equals(listingStrategy)) {
listByTrackingEntities(context, session);
+ } else if (BY_ADJUSTED_TIME_WINDOW.equals(listingStrategy)) {
+ listByAdjustedSlidingTimeWindow(context, session);
+
} else {
throw new ProcessException("Unknown listing strategy: " +
listingStrategy);
}
}
+ public void listByAdjustedSlidingTimeWindow(final ProcessContext context,
final ProcessSession session) throws ProcessException {
+ if (this.lastListedLatestEntryTimestampMillis == null ||
justElectedPrimaryNode) {
+ try {
+ final StateMap stateMap =
context.getStateManager().getState(getStateScope(context));
+
Optional.ofNullable(stateMap.get(LATEST_LISTED_ENTRY_TIMESTAMP_KEY))
+ .map(Long::parseLong)
+ .ifPresent(lastTimestamp ->
this.lastListedLatestEntryTimestampMillis = lastTimestamp);
+
+ justElectedPrimaryNode = false;
+ } catch (final IOException ioe) {
+ getLogger().error("Failed to retrieve timestamp of last
listing from the State Manager. Will not perform listing until this is
accomplished.");
+ context.yield();
+ return;
+ }
+ }
+
+ long lowerBoundInclusiveTimestamp =
Optional.ofNullable(this.lastListedLatestEntryTimestampMillis).orElse(0L);
+ long upperBoundExclusiveTimestamp;
+
+ long currentTime = getCurrentTime();
+
+ final TreeMap<Long, List<T>> orderedEntries = new TreeMap<>();
+ try {
+ List<T> entityList = performListing(context,
lowerBoundInclusiveTimestamp);
+
+ boolean targetSystemHasMilliseconds = false;
+ boolean targetSystemHasSeconds = false;
+ for (final T entity : entityList) {
+ final long entityTimestampMillis = entity.getTimestamp();
+ if (!targetSystemHasMilliseconds) {
+ targetSystemHasMilliseconds = entityTimestampMillis % 1000
> 0;
+ }
+ if (!targetSystemHasSeconds) {
+ targetSystemHasSeconds = entityTimestampMillis % 60_000 >
0;
+ }
+ }
+
+ // Determine target system time precision.
+ String specifiedPrecision =
context.getProperty(TARGET_SYSTEM_TIMESTAMP_PRECISION).getValue();
+ if (StringUtils.isBlank(specifiedPrecision)) {
+ // If TARGET_SYSTEM_TIMESTAMP_PRECISION is not supported by
the Processor, then specifiedPrecision can be null, instead of its default
value.
+ specifiedPrecision = getDefaultTimePrecision();
+ }
+ final TimeUnit targetSystemTimePrecision
+ =
PRECISION_AUTO_DETECT.getValue().equals(specifiedPrecision)
+ ? targetSystemHasMilliseconds ? TimeUnit.MILLISECONDS
: targetSystemHasSeconds ? TimeUnit.SECONDS : TimeUnit.MINUTES
+ : PRECISION_MILLIS.getValue().equals(specifiedPrecision) ?
TimeUnit.MILLISECONDS
+ : PRECISION_SECONDS.getValue().equals(specifiedPrecision)
? TimeUnit.SECONDS : TimeUnit.MINUTES;
+ final Long listingLagMillis =
LISTING_LAG_MILLIS.get(targetSystemTimePrecision);
+
+ upperBoundExclusiveTimestamp =
getAdjustedCurrentTimestamp(context, currentTime) - listingLagMillis;
+
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("interval: " + lowerBoundInclusiveTimestamp
+ " - " + upperBoundExclusiveTimestamp);
+ getLogger().trace("entityList: " +
entityList.stream().map(entity -> entity.getName() + "_" +
entity.getTimestamp()).collect(Collectors.joining(", ")));
+ }
+ entityList
+ .stream()
+ .filter(entity -> entity.getTimestamp() >=
lowerBoundInclusiveTimestamp)
+ .filter(entity -> entity.getTimestamp() <
upperBoundExclusiveTimestamp)
+ .forEach(entity -> orderedEntries
+ .computeIfAbsent(entity.getTimestamp(), __ -> new
ArrayList<>())
+ .add(entity)
+ );
+ if (getLogger().isTraceEnabled()) {
+ getLogger().trace("orderedEntries: " +
+ orderedEntries.values().stream()
+ .flatMap(List::stream)
+ .map(entity -> entity.getName() + "_" +
entity.getTimestamp())
+ .collect(Collectors.joining(", "))
+ );
+ }
+ } catch (final IOException e) {
+ getLogger().error("Failed to perform listing on remote host due to
{}", new Object[]{e.getMessage()}, e);
+ context.yield();
+ return;
+ }
+
+ if (orderedEntries.isEmpty()) {
+ getLogger().debug("There is no data to list. Yielding.");
+ context.yield();
+ return;
+ }
+
+ final boolean writerSet = context.getProperty(RECORD_WRITER).isSet();
+ if (writerSet) {
+ try {
+ createRecordsForEntities(context, session, orderedEntries);
+ } catch (final IOException | SchemaNotFoundException e) {
+ getLogger().error("Failed to write listing to FlowFile", e);
+ context.yield();
+ return;
+ }
+ } else {
+ createFlowFilesForEntities(context, session, orderedEntries);
+ }
+
+ try {
+ if (getLogger().isTraceEnabled()) {
+ getLogger().info("this.lastListedLatestEntryTimestampMillis =
upperBoundExclusiveTimestamp: " + this.lastListedLatestEntryTimestampMillis + "
= " + upperBoundExclusiveTimestamp);
+ }
+ this.lastListedLatestEntryTimestampMillis =
upperBoundExclusiveTimestamp;
+ persist(upperBoundExclusiveTimestamp,
upperBoundExclusiveTimestamp, latestIdentifiersProcessed,
context.getStateManager(), getStateScope(context));
+ } catch (final IOException ioe) {
+ getLogger().warn("Unable to save state due to {}. If NiFi is
restarted before state is saved, or "
+ + "if another node begins executing this Processor, data
duplication may occur.", ioe);
+ }
+ }
+
+ protected long getAdjustedCurrentTimestamp(ProcessContext context, long
currentTime) {
+ String timeAdjustmentString =
context.getProperty(TIME_ADJUSTMENT).evaluateAttributeExpressions().getValue();
+
+ long positiveOrNegative;
+ long timeAdjustment;
+
+ if (timeAdjustmentString.startsWith("-")) {
+ positiveOrNegative = -1L;
+ timeAdjustmentString = timeAdjustmentString.substring(1);
+ } else {
+ positiveOrNegative = 1L;
+ }
+
+ if (timeAdjustmentString.matches("\\d{2}:\\d{2}(:\\d{2})?")) {
Review comment:
It wouldn't work with `LocalTime.parse`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org