sfc-gh-mpayne commented on code in PR #10908:
URL: https://github.com/apache/nifi/pull/10908#discussion_r2842942360


##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final 
ProcessContext context) {
 
         getLogger().info("No Kafka Consumer Service available; creating a new 
one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = 
connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
+            final ProcessSession session = currentSession.get();
+            final OffsetTracker offsetTracker = currentOffsetTracker.get();
+            if (session == null) {
+                getLogger().debug("No active session during rebalance 
callback, nothing to commit");
+                return;
+            }
+
+            getLogger().info("Rebalance callback invoked for {} revoked 
partitions, committing session synchronously",
+                    revokedPartitions.size());
+
+            try {
+                session.commit();

Review Comment:
   We should avoid `session.commit` and instead ensure that we're using 
`session.commitAsync` with callbacks



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final 
ProcessContext context) {
 
         getLogger().info("No Kafka Consumer Service available; creating a new 
one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = 
connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {

Review Comment:
   We should avoid any lambdas over 3-5 lines long in favor of anonymous inner 
classes.



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -291,6 +296,170 @@ void 
testNoDataLossWhenRebalanceOccursBeforeProcessingComplete() throws Exceptio
                 " records since processing was never completed.");
     }
 
+    /**
+     * Tests that a REAL Kafka rebalance (triggered by a second consumer 
joining) does not cause
+     * duplicate message processing.
+     *
+     * This test reproduces the real-world scenario where:
+     * 1. Consumer 1 is actively polling and processing messages (with slow 
processing)
+     * 2. Consumer 2 joins the same group, triggering a Kafka rebalance
+     * 3. During Consumer 1's poll(), onPartitionsRevoked() is called 
internally by Kafka
+     * 4. The RebalanceCallback is invoked, allowing the processor to commit 
its session
+     * 5. Kafka offsets are committed synchronously while still in 
onPartitionsRevoked()
+     * 6. Rebalance completes successfully with no duplicates
+     *
+     * The fix commits offsets INSIDE the onPartitionsRevoked() callback, 
which is the
+     * only time when the consumer is still in a valid state to commit. This 
is similar to how
+     * NiFi 1.x handled rebalances in ConsumerLease.
+     */
+    @Test
+    @Timeout(value = 120, unit = TimeUnit.SECONDS)
+    void testRealRebalanceDoesNotCauseDuplicates() throws Exception {
+        final String topic = "real-rebalance-test-" + UUID.randomUUID();
+        final String groupId = "real-rebalance-group-" + UUID.randomUUID();
+        final int numPartitions = 6;
+        final int messagesPerPartition = 500; // More messages to ensure 
overlap
+        final int totalMessages = numPartitions * messagesPerPartition;
+
+        createTopic(topic, numPartitions);
+        produceMessagesToTopic(topic, numPartitions, messagesPerPartition);
+
+        // Track all consumed message IDs across both consumers
+        final Set<String> allConsumedMessages = ConcurrentHashMap.newKeySet();
+        final AtomicInteger duplicateCount = new AtomicInteger(0);
+        final AtomicInteger rebalanceCount = new AtomicInteger(0);
+        final CountDownLatch consumer1Started = new CountDownLatch(1);
+        final CountDownLatch consumer2Started = new CountDownLatch(1);
+        final CountDownLatch testComplete = new CountDownLatch(2);
+        final AtomicInteger consumer1Count = new AtomicInteger(0);
+        final AtomicInteger consumer2Count = new AtomicInteger(0);
+
+        final ComponentLog mockLog = mock(ComponentLog.class);
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+        // Rebalance callback that simulates processor committing its session
+        // In a real processor, this would commit FlowFiles; here we just log 
and allow the commit
+        final RebalanceCallback callback = revokedPartitions -> {
+            rebalanceCount.incrementAndGet();
+            System.out.println("Rebalance callback invoked for partitions: " + 
revokedPartitions);
+        };
+
+        try {
+            // Consumer 1: Start consuming with simulated slow processing
+            executor.submit(() -> {
+                final Properties props1 = getConsumerProperties(groupId);
+                // Fetch fewer records per poll to slow down consumption
+                props1.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
+                try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new 
KafkaConsumer<>(props1)) {
+                    final Subscription subscription = new 
Subscription(groupId, Collections.singletonList(topic), 
AutoOffsetReset.EARLIEST);
+                    // Use the constructor with callback to enable synchronous 
commit during rebalance
+                    final Kafka3ConsumerService service1 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription, callback);
+                    consumer1Started.countDown();
+
+                    int emptyPolls = 0;
+                    while (emptyPolls < 15 && allConsumedMessages.size() < 
totalMessages) {
+                        boolean hasRecords = false;
+                        for (ByteRecord record : 
service1.poll(Duration.ofSeconds(1))) {
+                            hasRecords = true;
+                            final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                            if (!allConsumedMessages.add(messageId)) {
+                                duplicateCount.incrementAndGet();
+                            }
+                            consumer1Count.incrementAndGet();
+                        }
+
+                        if (hasRecords) {
+                            emptyPolls = 0;
+                            // Simulate slow processing
+                            Thread.sleep(50);
+                        } else {
+                            emptyPolls++;
+                        }
+                    }
+                    service1.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    testComplete.countDown();
+                }
+            });
+
+            // Wait for consumer 1 to start
+            assertTrue(consumer1Started.await(30, TimeUnit.SECONDS), "Consumer 
1 did not start");
+
+            // Wait a bit then start consumer 2 to trigger rebalance while 
consumer 1 is actively consuming
+            Thread.sleep(200);
+
+            // Consumer 2: Join the group to trigger rebalance
+            executor.submit(() -> {
+                final Properties props2 = getConsumerProperties(groupId);
+                props2.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
+                try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new 
KafkaConsumer<>(props2)) {
+                    final Subscription subscription = new 
Subscription(groupId, Collections.singletonList(topic), 
AutoOffsetReset.EARLIEST);
+                    // Use the constructor with callback to enable synchronous 
commit during rebalance
+                    final Kafka3ConsumerService service2 = new 
Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription, callback);
+                    consumer2Started.countDown();
+
+                    int emptyPolls = 0;
+                    while (emptyPolls < 15 && allConsumedMessages.size() < 
totalMessages) {
+                        boolean hasRecords = false;
+                        for (ByteRecord record : 
service2.poll(Duration.ofSeconds(1))) {
+                            hasRecords = true;
+                            final String messageId = record.getTopic() + "-" + 
record.getPartition() + "-" + record.getOffset();
+                            if (!allConsumedMessages.add(messageId)) {
+                                duplicateCount.incrementAndGet();
+                            }
+                            consumer2Count.incrementAndGet();
+                        }
+
+                        if (hasRecords) {
+                            emptyPolls = 0;
+                            Thread.sleep(50);
+                        } else {
+                            emptyPolls++;
+                        }
+                    }
+                    service2.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    testComplete.countDown();
+                }
+            });
+
+            // Wait for consumer 2 to start (confirms rebalance was triggered)
+            assertTrue(consumer2Started.await(30, TimeUnit.SECONDS), "Consumer 
2 did not start");
+
+            // Wait for both consumers to finish
+            assertTrue(testComplete.await(90, TimeUnit.SECONDS), "Test did not 
complete in time");
+
+        } finally {
+            executor.shutdownNow();
+        }
+
+        // Log results for debugging
+        System.out.println("Consumer 1 polled: " + consumer1Count.get() + " 
records");
+        System.out.println("Consumer 2 polled: " + consumer2Count.get() + " 
records");
+        System.out.println("Total unique messages: " + 
allConsumedMessages.size());
+        System.out.println("Duplicate count: " + duplicateCount.get());
+        System.out.println("Rebalance count: " + rebalanceCount.get());

Review Comment:
   We should be using loggers here, not `System.out.println`



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final 
ProcessContext context) {
 
         getLogger().info("No Kafka Consumer Service available; creating a new 
one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = 
context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = 
connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
+            final ProcessSession session = currentSession.get();
+            final OffsetTracker offsetTracker = currentOffsetTracker.get();

Review Comment:
   Not sure that I understand the intention behind the ThreadLocal here. This 
should just be passed into the `createRebalanceCallback` method (piped through 
the `getConsumerService` method)



##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -348,6 +349,8 @@ public class ConsumeKafka extends AbstractProcessor 
implements VerifiableProcess
 
     private final Queue<KafkaConsumerService> consumerServices = new 
LinkedBlockingQueue<>();
     private final AtomicInteger activeConsumerCount = new AtomicInteger();
+    private final ThreadLocal<ProcessSession> currentSession = new 
ThreadLocal<>();
+    private final ThreadLocal<OffsetTracker> currentOffsetTracker = new 
ThreadLocal<>();

Review Comment:
   We generally want to avoid `ThreadLocal` in processors because of the shared 
thread pool.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to