tpalfy commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r585783440



##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -54,9 +56,13 @@
 @ReadsAttributes({
         @ReadsAttribute(attribute = "splunk.acknowledgement.id", description = 
"The indexing acknowledgement id provided by Splunk."),
         @ReadsAttribute(attribute = "splunk.responded.at", description = "The 
time of the response of put request for Splunk.")})
+@WritesAttributes({
+        @WritesAttribute(attribute = "ack.checked.at.splunk", description = 
"Contains a boolean value representing whether Splunk acknowledgement check has 
happened. If not set considered as false.")
+})
 @SeeAlso(PutSplunkHTTP.class)
 public class QuerySplunkIndexingStatus extends SplunkAPICall {
     private static final String ENDPOINT = "/services/collector/ack";
+    private static final String ACK_CHECKED_ATTRIBUTE = 
"ack.checked.at.splunk";

Review comment:
       ```suggestion
   ```

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -162,12 +168,13 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
         for (final FlowFile flowFile : flowFiles)  {
             final Optional<Long> sentAt = 
extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
             final Optional<Long> ackId = 
extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+            final boolean ackChecked = isAlreadyChecked(flowFile);
 
             if (!sentAt.isPresent() || !ackId.isPresent()) {
                 getLogger().error("Flow file ({}) attributes {} and {} are 
expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), 
SplunkAPICall.RESPONDED_AT_ATTRIBUTE, 
SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
+            } else if (ackChecked && sentAt.get() + ttl < currentTime) {
                 session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);

Review comment:
       ```suggestion
   ```

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -195,6 +202,9 @@ public void onTrigger(final ProcessContext context, final 
ProcessSession session
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {

Review comment:
       ```suggestion
                                   
splunkResponse.getAcks().forEach((flowFileId, isAcknowledged) -> {
                       final FlowFile toTransfer = undetermined.get(flowFileId);
   
                       if (isAcknowledged) {
                           session.transfer(toTransfer, 
RELATIONSHIP_ACKNOWLEDGED);
                       } else {
                           final Long sentAt = 
extractLong(toTransfer.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)).get();
                           if (sentAt + ttl < currentTime) {
                               session.transfer(toTransfer, 
RELATIONSHIP_UNACKNOWLEDGED);
                           } else {
                               session.penalize(toTransfer);
                               session.transfer(toTransfer, 
RELATIONSHIP_UNDETERMINED);
                           }
                       }
                   });
   ```

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +244,18 @@ private String generateContent(final Map<Long, FlowFile> 
undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static boolean isAlreadyChecked(FlowFile flowFile) {
+        return extractBoolean(flowFile.getAttribute(ACK_CHECKED_ATTRIBUTE));
+    }
+
+    private static boolean extractBoolean(final String value) {
+        return Boolean.parseBoolean(value);
+    }
+
+    private void setAckCheckedToTrue(final ProcessSession session, final 
FlowFile flowFile) {
+        final Map<String, String> attributes = new 
HashMap<>(flowFile.getAttributes());
+        attributes.put(ACK_CHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));
+        session.putAllAttributes(flowFile, attributes);
+    }

Review comment:
       ```suggestion
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to