Github user jvwing commented on a diff in the pull request:
https://github.com/apache/nifi/pull/2409#discussion_r162796710
--- Diff:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/stream/PutKinesisStream.java
---
@@ -111,64 +112,80 @@ public void onTrigger(final ProcessContext context,
final ProcessSession session
for (int i = 0; i < flowFiles.size(); i++) {
FlowFile flowFile = flowFiles.get(i);
+ String streamName =
context.getProperty(KINESIS_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();;
+
final ByteArrayOutputStream baos = new
ByteArrayOutputStream();
session.exportTo(flowFile, baos);
PutRecordsRequestEntry record = new
PutRecordsRequestEntry().withData(ByteBuffer.wrap(baos.toByteArray()));
String partitionKey =
context.getProperty(PutKinesisStream.KINESIS_PARTITION_KEY)
-
.evaluateAttributeExpressions(flowFiles.get(i)).getValue();
+
.evaluateAttributeExpressions(flowFiles.get(i)).getValue();
if ( ! StringUtils.isBlank(partitionKey) ) {
record.setPartitionKey(partitionKey);
} else {
record.setPartitionKey(Integer.toString(randomParitionKeyGenerator.nextInt()));
}
- records.add(record);
+ if ( !recordHash.containsKey(streamName) ) {
--- End diff --
Would you also please fix these `if`s? I completely understand they were
already that way.
---