tpalfy commented on a change in pull request #4824:
URL: https://github.com/apache/nifi/pull/4824#discussion_r585783440
##########
File path:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -54,9 +56,13 @@
@ReadsAttributes({
@ReadsAttribute(attribute = "splunk.acknowledgement.id", description =
"The indexing acknowledgement id provided by Splunk."),
@ReadsAttribute(attribute = "splunk.responded.at", description = "The
time of the response of put request for Splunk.")})
+@WritesAttributes({
+ @WritesAttribute(attribute = "ack.checked.at.splunk", description =
"Contains a boolean value representing whether Splunk acknowledgement check has
happened. If not set considered as false.")
+})
@SeeAlso(PutSplunkHTTP.class)
public class QuerySplunkIndexingStatus extends SplunkAPICall {
private static final String ENDPOINT = "/services/collector/ack";
+ private static final String ACK_CHECKED_ATTRIBUTE =
"ack.checked.at.splunk";
Review comment:
```suggestion
```
##########
File path:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -162,12 +168,13 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
for (final FlowFile flowFile : flowFiles) {
final Optional<Long> sentAt =
extractLong(flowFile.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE));
final Optional<Long> ackId =
extractLong(flowFile.getAttribute(SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE));
+ final boolean ackChecked = isAlreadyChecked(flowFile);
if (!sentAt.isPresent() || !ackId.isPresent()) {
getLogger().error("Flow file ({}) attributes {} and {} are
expected to be set using 64-bit integer values!",
new Object[]{flowFile.getId(),
SplunkAPICall.RESPONDED_AT_ATTRIBUTE,
SplunkAPICall.ACKNOWLEDGEMENT_ID_ATTRIBUTE});
session.transfer(flowFile, RELATIONSHIP_FAILURE);
- } else if (sentAt.get() + ttl < currentTime) {
+ } else if (ackChecked && sentAt.get() + ttl < currentTime) {
session.transfer(flowFile, RELATIONSHIP_UNACKNOWLEDGED);
Review comment:
```suggestion
```
##########
File path:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -195,6 +202,9 @@ public void onTrigger(final ProcessContext context, final
ProcessSession session
splunkResponse.getAcks().entrySet().forEach(result -> {
Review comment:
```suggestion
splunkResponse.getAcks().forEach((flowFileId, isAcknowledged) -> {
final FlowFile toTransfer = undetermined.get(flowFileId);
if (isAcknowledged) {
session.transfer(toTransfer,
RELATIONSHIP_ACKNOWLEDGED);
} else {
final Long sentAt =
extractLong(toTransfer.getAttribute(SplunkAPICall.RESPONDED_AT_ATTRIBUTE)).get();
if (sentAt + ttl < currentTime) {
session.transfer(toTransfer,
RELATIONSHIP_UNACKNOWLEDGED);
} else {
session.penalize(toTransfer);
session.transfer(toTransfer,
RELATIONSHIP_UNDETERMINED);
}
}
});
```
##########
File path:
nifi-nar-bundles/nifi-splunk-bundle/nifi-splunk-processors/src/main/java/org/apache/nifi/processors/splunk/QuerySplunkIndexingStatus.java
##########
@@ -234,4 +244,18 @@ private String generateContent(final Map<Long, FlowFile>
undetermined) throws IO
return Optional.empty();
}
}
+
+ private static boolean isAlreadyChecked(FlowFile flowFile) {
+ return extractBoolean(flowFile.getAttribute(ACK_CHECKED_ATTRIBUTE));
+ }
+
+ private static boolean extractBoolean(final String value) {
+ return Boolean.parseBoolean(value);
+ }
+
+ private void setAckCheckedToTrue(final ProcessSession session, final
FlowFile flowFile) {
+ final Map<String, String> attributes = new
HashMap<>(flowFile.getAttributes());
+ attributes.put(ACK_CHECKED_ATTRIBUTE, String.valueOf(Boolean.TRUE));
+ session.putAllAttributes(flowFile, attributes);
+ }
Review comment:
```suggestion
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org