dan-s1 commented on code in PR #9807:
URL: https://github.com/apache/nifi/pull/9807#discussion_r1998979958


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -365,23 +363,53 @@ public void onStopped() {
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
         final KafkaConsumerService consumerService = 
getConsumerService(context);
 
+        final long maxUncommittedMillis = 
context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        final long stopTime = System.currentTimeMillis() + 
maxUncommittedMillis;
+        final OffsetTracker offsetTracker = new OffsetTracker();
+
         try {
-            final Iterator<ByteRecord> consumerRecords = 
consumerService.poll().iterator();
-            if (!consumerRecords.hasNext()) {
-                getLogger().debug("No Kafka Records consumed: {}", 
pollingContext);
-                return;
+            while (System.currentTimeMillis() < stopTime) {
+                try {
+                    final Duration maxWaitDuration = 
Duration.ofMillis(stopTime - System.currentTimeMillis());
+                    final Iterator<ByteRecord> consumerRecords = 
consumerService.poll(maxWaitDuration).iterator();
+                    if (!consumerRecords.hasNext()) {
+                        getLogger().debug("No Kafka Records consumed: {}", 
pollingContext);
+                        continue;
+                    }
+
+                    processConsumerRecords(context, session, offsetTracker, 
consumerRecords);
+                } catch (final Exception e) {
+                    getLogger().error("Failed to consume Kafka Records", e);
+                    consumerService.rollback();
+
+                    try {
+                        consumerService.close();
+                    } catch (final IOException ex) {
+                        getLogger().warn("Failed to close Kafka Consumer 
Service", ex);
+                    }

Review Comment:
   You can use try-with-resources to close `consumerService`, although you will 
end up with an empty `try` block. Ditto for lines 406-410
   ```suggestion
               try (consumerService) {
               } catch (IOException ex) {
                   getLogger().warn("Failed to close Kafka Consumer Service", ex);
               }
   ```



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/consumer/convert/RecordStreamKafkaMessageConverter.java:
##########
@@ -72,129 +71,121 @@ public RecordStreamKafkaMessageConverter(
             final KeyEncoding keyEncoding,
             final boolean commitOffsets,
             final OffsetTracker offsetTracker,
-            final Runnable onSuccess,
-            final ComponentLog logger) {
+            final ComponentLog logger,
+            final String brokerUri) {
         this.readerFactory = readerFactory;
         this.writerFactory = writerFactory;
         this.headerEncoding = headerEncoding;
         this.headerNamePattern = headerNamePattern;
         this.keyEncoding = keyEncoding;
         this.commitOffsets = commitOffsets;
         this.offsetTracker = offsetTracker;
-        this.onSuccess = onSuccess;
         this.logger = logger;
+        this.brokerUri = brokerUri;
     }
 
     @Override
     public void toFlowFiles(final ProcessSession session, final 
Iterator<ByteRecord> consumerRecords) {
-        try {
-            final Map<RecordGroupCriteria, RecordGroup> recordGroups = new 
HashMap<>();
-
-            String topic = null;
-            int partition = 0;
-            while (consumerRecords.hasNext()) {
-                final ByteRecord consumerRecord = consumerRecords.next();
-                if (topic == null) {
-                    partition = consumerRecord.getPartition();
-                    topic = consumerRecord.getTopic();
-                }
+        final Map<RecordGroupCriteria, RecordGroup> recordGroups = new 
HashMap<>();
+
+        while (consumerRecords.hasNext()) {
+            final ByteRecord consumerRecord = consumerRecords.next();
+            final String topic = consumerRecord.getTopic();
+            final int partition = consumerRecord.getPartition();
 
-                final byte[] value = consumerRecord.getValue();
-                final Map<String, String> headers = 
getRelevantHeaders(consumerRecord, headerNamePattern);
+            final byte[] value = consumerRecord.getValue();
+            final Map<String, String> headers = 
getRelevantHeaders(consumerRecord, headerNamePattern);
 
-                final Map<String, String> attributes = KafkaUtils.toAttributes(
+            final Map<String, String> attributes = KafkaUtils.toAttributes(
                     consumerRecord, keyEncoding, headerNamePattern, 
headerEncoding, commitOffsets);
 
-                try (final InputStream in = new ByteArrayInputStream(value);
-                     final RecordReader valueRecordReader = 
readerFactory.createRecordReader(attributes, in, value.length, logger)) {
+            try (final InputStream in = new ByteArrayInputStream(value);
+                    final RecordReader valueRecordReader = 
readerFactory.createRecordReader(attributes, in, value.length, logger)) {
 
-                    int recordCount = 0;
-                    while (true) {
-                        final Record record = valueRecordReader.nextRecord();
-                        // If we get a KafkaRecord that has no value, we still 
need to process it.
-                        if (recordCount++ > 0 && record == null) {
-                            break;
-                        }
+                int recordCount = 0;
+                while (true) {
+                    final Record record = valueRecordReader.nextRecord();
+                    // If we get a KafkaRecord that has no value, we still 
need to process it.
+                    if (recordCount++ > 0 && record == null) {
+                        break;
+                    }
 
-                        final RecordSchema recordSchema = record == null ? 
EMPTY_SCHEMA : record.getSchema();
-                        final RecordSchema writeSchema = 
writerFactory.getSchema(attributes, recordSchema);
+                    final RecordSchema recordSchema = record == null ? 
EMPTY_SCHEMA : record.getSchema();
+                    final RecordSchema writeSchema = 
writerFactory.getSchema(attributes, recordSchema);
 
-                        // Get/Register the Record Group that is associated 
with the schema for this Kafka Record
-                        final RecordGroupCriteria groupCriteria = new 
RecordGroupCriteria(writeSchema, headers);
-                        RecordGroup recordGroup = 
recordGroups.get(groupCriteria);
-                        if (recordGroup == null) {
-                            FlowFile flowFile = session.create();
-                            final Map<String, String> groupAttributes = Map.of(
+                    // Get/Register the Record Group that is associated with 
the schema, topic and
+                    // partition id for this Kafka Record
+                    final RecordGroupCriteria groupCriteria = new 
RecordGroupCriteria(writeSchema, headers, topic, partition);
+                    RecordGroup recordGroup = recordGroups.get(groupCriteria);
+                    if (recordGroup == null) {
+                        FlowFile flowFile = session.create();
+                        final Map<String, String> groupAttributes = Map.of(
                                 KafkaFlowFileAttribute.KAFKA_TOPIC, 
consumerRecord.getTopic(),
-                                KafkaFlowFileAttribute.KAFKA_PARTITION, 
Long.toString(consumerRecord.getPartition())
-                            );
-                            flowFile = session.putAllAttributes(flowFile, 
groupAttributes);
-
-                            final OutputStream out = session.write(flowFile);
-                            final RecordSetWriter writer;
-                            try {
-                                writer = writerFactory.createWriter(logger, 
writeSchema, out, attributes);
-                                writer.beginRecordSet();
-                            } catch (final Exception e) {
-                                out.close();
-                                throw e;
-                            }
-
-                            recordGroup = new RecordGroup(flowFile, writer, 
topic, partition);
-                            recordGroups.put(groupCriteria, recordGroup);
+                                KafkaFlowFileAttribute.KAFKA_PARTITION, 
Long.toString(consumerRecord.getPartition()));
+                        flowFile = session.putAllAttributes(flowFile, 
groupAttributes);
+
+                        final OutputStream out = session.write(flowFile);
+                        final RecordSetWriter writer;
+                        try {
+                            writer = writerFactory.createWriter(logger, 
writeSchema, out, attributes);
+                            writer.beginRecordSet();
+                        } catch (final Exception e) {
+                            out.close();
+                            throw e;

Review Comment:
   Couldn't all this (including lines 137-138) be placed in a try-with-resources block?
   ```suggestion
                          try (OutputStream out = session.write(flowFile);
                                    RecordSetWriter writer = 
writerFactory.createWriter(logger, writeSchema, out, attributes)) {
                                   writer.beginRecordSet();
                                   recordGroup = new RecordGroup(flowFile, 
writer, topic, partition);
                                   recordGroups.put(groupCriteria, recordGroup);
                               }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to