simonbence commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r576630600



##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -57,6 +58,7 @@
 @SeeAlso(PutSplunkHTTP.class)
 public class QuerySplunkIndexingStatus extends SplunkAPICall {
     private static final String ENDPOINT = "/services/collector/ack";
+    private static final String ACKCHECKED_ATTRIBUTE = "ack.checked";

Review comment:
       I would prefix this with "splunk." or something similar, because later 
processors will have no context about what kind of acknowledgement this is.

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());

Review comment:
       I would go with an approach that keeps `final` in place. For example, 
introduce a helper method that extracts the toTransfer flow file from the 
undetermined map, enriches it with the attribute internally, and returns it.
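
A minimal sketch of such a helper, under stated assumptions: `FlowFile` and `Session` below are small stand-ins for the NiFi types (only attributes are modelled), the helper name `getCheckedFlowFile` and the attribute name are hypothetical, and the toy session mimics the way `ProcessSession.putAttribute` hands back an updated FlowFile rather than mutating the one passed in.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class MarkCheckedSketch {
    // Hypothetical attribute name; the PR defines its own constant.
    static final String ACK_CHECKED = "splunk.ack.checked";

    /** Stand-in for NiFi's FlowFile: immutable, only attributes are modelled. */
    static final class FlowFile {
        private final Map<String, String> attributes;

        FlowFile(final Map<String, String> attributes) {
            this.attributes = Collections.unmodifiableMap(new HashMap<>(attributes));
        }

        String getAttribute(final String key) {
            return attributes.get(key);
        }
    }

    /** Stand-in for the single session method the helper needs. */
    interface Session {
        FlowFile putAttribute(FlowFile flowFile, String key, String value);
    }

    /** Toy session that returns a modified copy instead of mutating its input. */
    static final Session COPYING_SESSION = (flowFile, key, value) -> {
        final Map<String, String> updated = new HashMap<>(flowFile.attributes);
        updated.put(key, value);
        return new FlowFile(updated);
    };

    /** Looks up the flow file for an ack id and returns it with the flag set. */
    static FlowFile getCheckedFlowFile(final Session session,
                                       final Map<Long, FlowFile> undetermined,
                                       final long ackId) {
        final FlowFile original = undetermined.get(ackId);
        if (Boolean.parseBoolean(original.getAttribute(ACK_CHECKED))) {
            return original; // already flagged, nothing to enrich
        }
        return session.putAttribute(original, ACK_CHECKED, "true");
    }

    public static void main(final String[] args) {
        final Map<Long, FlowFile> undetermined = new HashMap<>();
        undetermined.put(1L, new FlowFile(Collections.emptyMap()));
        // The caller's local variable can stay final.
        final FlowFile toTransfer = getCheckedFlowFile(COPYING_SESSION, undetermined, 1L);
        System.out.println(toTransfer.getAttribute(ACK_CHECKED));
    }
}
```

With a helper like this, the lambda body only needs one assignment, so the reassignment that forced `final` to be dropped goes away.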

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());

Review comment:
       If you care about the HashMap size (which is a good thing :) ), add 1 to 
the size of flowFile.getAttributes() up front, as you will extend the map. ;)
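
One way to read this suggestion, as a sketch only: size the copy for one extra entry before filling it. The class and method names are hypothetical. Because `HashMap` resizes once it passes its default load factor of 0.75, the capacity below is derived from the expected entry count rather than set to exactly `size() + 1`; that refinement is an assumption on top of the comment, not something the reviewer asked for.

```java
import java.util.HashMap;
import java.util.Map;

class AttributeCopySketch {
    /** Copies the attributes and adds one entry without an intermediate resize. */
    static Map<String, String> copyWithExtra(final Map<String, String> source,
                                             final String key,
                                             final String value) {
        final int expectedEntries = source.size() + 1;
        // Keep expectedEntries below 75% of the capacity (the default load factor).
        final int capacity = (int) (expectedEntries / 0.75f) + 1;
        final Map<String, String> copy = new HashMap<>(capacity);
        copy.putAll(source);
        copy.put(key, value);
        return copy;
    }

    public static void main(final String[] args) {
        final Map<String, String> attributes = new HashMap<>();
        attributes.put("filename", "event.json");
        System.out.println(copyWithExtra(attributes, "splunk.ack.checked", "true"));
    }
}
```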

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);

Review comment:
       This extraction logic looks the same as above. It would be better not 
to duplicate it (in case we want to change the behaviour later). I would suggest 
a method that simply tells whether a flow file has the flag set. The value is 
irrelevant now anyway. Something like: `private static boolean 
isAlreadyChecked(FlowFile ff)`.
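
A possible shape for that method, as a sketch: the nested `FlowFile` interface is a one-method stand-in for the NiFi type, and the attribute name is hypothetical. It uses `Boolean.parseBoolean`, which returns false for null, so a missing attribute and an explicit "false" are treated alike. Since only "true" is ever written, a plain null check would behave the same here.

```java
class AckCheckedSketch {
    // Hypothetical attribute name; the PR defines its own constant.
    static final String ACK_CHECKED = "splunk.ack.checked";

    /** Stand-in for the one FlowFile method the check needs. */
    interface FlowFile {
        String getAttribute(String key);
    }

    /** Single place that decides whether a flow file was already checked. */
    static boolean isAlreadyChecked(final FlowFile ff) {
        return Boolean.parseBoolean(ff.getAttribute(ACK_CHECKED));
    }

    public static void main(final String[] args) {
        final FlowFile unchecked = key -> null;
        final FlowFile checked = key -> ACK_CHECKED.equals(key) ? "true" : null;
        System.out.println(isAlreadyChecked(unchecked));
        System.out.println(isAlreadyChecked(checked));
    }
}
```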

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -194,7 +197,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
                 final EventIndexStatusResponse splunkResponse = unmarshallResult(responseMessage.getContent(), EventIndexStatusResponse.class);
 
                 splunkResponse.getAcks().entrySet().forEach(result -> {
-                    final FlowFile toTransfer = undetermined.get(result.getKey());
+                    FlowFile toTransfer = undetermined.get(result.getKey());
+                    if (!extractBoolean(toTransfer.getAttribute(ACKCHECKED_ATTRIBUTE)).orElse(Boolean.FALSE)) {
+                        toTransfer = setAckCheckedToTrue(session, toTransfer);
+                        session.getProvenanceReporter().modifyAttributes(toTransfer, "ackChecked attribute has been modified to true.");

Review comment:
       I am not sure we need to store this as a provenance event (though it 
might not be an issue).

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/test/java/org/apache/nifi/processors/splunk/TestQuerySplunkIndexingStatus.java
##########
@@ -121,16 +121,44 @@ public void testMoreIncomingFlowFileThanQueryLimit() throws Exception {
     }
 
     @Test
-    public void testTimedOutEvents() throws Exception {
+    public void testAckCheckedIsTrueAndFlowFileWithTimedOutEvents() throws Exception {
         // when
-        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2)));
+        testRunner.enqueue(givenFlowFile(1, System.currentTimeMillis() - TimeUnit.HOURS.toMillis(2), true));
         testRunner.run();
 
         // then
         Mockito.verify(service, Mockito.never()).send(Mockito.anyString(), Mockito.any(RequestMessage.class));
         testRunner.assertAllFlowFilesTransferred(QuerySplunkIndexingStatus.RELATIONSHIP_UNACKNOWLEDGED, 1);
     }
 
+    @Test
+    public void testAckCheckedIsFalseAndTimedOutEventFlowFileWithAcknowledgeResponse() throws Exception {
+        // when
+        final Map<Integer, Boolean> acks = new HashMap<>();

Review comment:
       Minor: you can make this shorter and more readable with 
Collections.singletonMap.
       Also: this should be part of the given block.
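
For illustration, a small sketch of building the test fixture with `Collections.singletonMap`. The class and method names are hypothetical, and the real test would place the call under a `// given` comment before the `// when` step. Note that the returned map is immutable, which is usually what a fixture wants.

```java
import java.util.Collections;
import java.util.Map;

class SingletonMapSketch {
    /** Builds the one-entry ack map in a single expression. */
    static Map<Integer, Boolean> givenAcks(final int ackId, final boolean acknowledged) {
        return Collections.singletonMap(ackId, acknowledged);
    }

    public static void main(final String[] args) {
        // given
        final Map<Integer, Boolean> acks = givenAcks(1, true);
        // when / then would follow in the real test
        System.out.println(acks);
    }
}
```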

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +241,14 @@ private String generateContent(final Map<Long, FlowFile> undetermined) throws IO
             return Optional.empty();
         }
     }
+
+    private static Optional<Boolean> extractBoolean(final String value) {
+        return Objects.nonNull(value) ? Optional.of(value).map(Boolean::valueOf) : Optional.empty();
+    }
+
+    private FlowFile setAckCheckedToTrue(final ProcessSession session, final FlowFile flowFile) {
+        final Map<String, String> attributes = new HashMap<>(flowFile.getAttributes());
+        attributes.put( ACKCHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));

Review comment:
       Minor: unnecessary space after `put(`

##########
File path: 
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -162,12 +164,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
         for (final FlowFile flowFile : flowFiles)  {
             final Optional<Long> sentAt = extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
             final Optional<Long> ackId = extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+            final Optional<Boolean> ackChecked = extractBoolean(flowFile.getAttribute(ACKCHECKED_ATTRIBUTE));
 
             if (!sentAt.isPresent() || !ackId.isPresent()) {
                 getLogger().error("Flow file ({}) attributes {} and {} are expected to be set using 64-bit integer values!",
                         new Object[]{flowFile.getId(), SplunkAPICall.RESPONDED_AT_ATTRIBUTE, SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
                 session.transfer(flowFile, RELATIONSHIP_FAILURE);
-            } else if (sentAt.get() + ttl < currentTime) {
+            } else if (ackChecked.orElse(Boolean.FALSE) && sentAt.get() + ttl < currentTime) {

Review comment:
       For readability, I would suggest something simpler in the condition. 
Maybe the simplest way would be not to use Optional when you define 
ackChecked, but to convert it into a boolean in the first place (in other words: 
treat a non-existent attribute as false, not as an empty Optional).
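
A sketch of what the simplified condition could look like, pulled out into a standalone method so it can be exercised on its own. The class and method names are hypothetical. `Boolean.parseBoolean` returns false for a null argument, which gives the "missing attribute means false" behaviour without an Optional.

```java
class TimeoutCheckSketch {
    /**
     * Mirrors the changed branch: a flow file counts as timed out only if it
     * was already checked once and its time to live has elapsed.
     */
    static boolean isTimedOut(final String ackCheckedAttribute,
                              final long sentAt,
                              final long ttl,
                              final long currentTime) {
        // A null (missing) attribute parses to false.
        final boolean ackChecked = Boolean.parseBoolean(ackCheckedAttribute);
        return ackChecked && sentAt + ttl < currentTime;
    }

    public static void main(final String[] args) {
        final long now = System.currentTimeMillis();
        System.out.println(isTimedOut(null, now - 10_000L, 1_000L, now));
        System.out.println(isTimedOut("true", now - 10_000L, 1_000L, now));
    }
}
```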



