Github user jvwing commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2409#discussion_r162796692
  
    --- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java
 ---
    @@ -89,63 +89,80 @@ public void onTrigger(final ProcessContext context, 
final ProcessSession session
     
             final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
             final long maxBufferSizeBytes = 
context.getProperty(MAX_MESSAGE_BUFFER_SIZE_MB).asDataSize(DataUnit.B).longValue();
    -        final String firehoseStreamName = 
context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).getValue();
     
    -        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, 
batchSize, maxBufferSizeBytes, firehoseStreamName,
    -            AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
    +        List<FlowFile> flowFiles = filterMessagesByMaxSize(session, 
batchSize, maxBufferSizeBytes, AWS_KINESIS_FIREHOSE_ERROR_MESSAGE);
    +        HashMap<String, List<FlowFile>> hashFlowFiles = new HashMap<>();
    +        HashMap<String, List<Record>> recordHash = new HashMap<String, 
List<Record>>();
     
             final AmazonKinesisFirehoseClient client = getClient();
     
             try {
    -            List<Record> records = new ArrayList<>();
    -
                 List<FlowFile> failedFlowFiles = new ArrayList<>();
                 List<FlowFile> successfulFlowFiles = new ArrayList<>();
     
                 // Prepare batch of records
                 for (int i = 0; i < flowFiles.size(); i++) {
                     FlowFile flowFile = flowFiles.get(i);
     
    +                final String firehoseStreamName = 
context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue();
    +
                     final ByteArrayOutputStream baos = new 
ByteArrayOutputStream();
                     session.exportTo(flowFile, baos);
    -                records.add(new 
Record().withData(ByteBuffer.wrap(baos.toByteArray())));
    +
    +                if ( !recordHash.containsKey(firehoseStreamName) )
    --- End diff --
    
    Thank you for following the pre-existing code style in this class file.  
That's usually a good practice.  However, `if` statements in NiFi code 
typically have braces even for single-line contents, and there is typically no 
space between parenthesis:
    ```
    if (somevar == true) {
        doStuff();
    }
    ```
    Would you please fix these up?


---

Reply via email to