sfc-gh-mpayne commented on code in PR #10908:
URL: https://github.com/apache/nifi/pull/10908#discussion_r2842942360
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
         getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
+            final ProcessSession session = currentSession.get();
+            final OffsetTracker offsetTracker = currentOffsetTracker.get();
+            if (session == null) {
+                getLogger().debug("No active session during rebalance callback, nothing to commit");
+                return;
+            }
+
+            getLogger().info("Rebalance callback invoked for {} revoked partitions, committing session synchronously",
+                    revokedPartitions.size());
+
+            try {
+                session.commit();
Review Comment:
We should avoid `session.commit` and instead ensure that we're using `session.commitAsync` with callbacks.
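For illustration, a minimal sketch of the suggested pattern: since `onPartitionsRevoked` must not return until the offsets are safe to hand off, the asynchronous commit can be bridged with a latch. The `Session` interface below is a hypothetical stand-in for NiFi's `ProcessSession` and its `commitAsync(Runnable, Consumer<Throwable>)` variant; the real wiring would live inside the rebalance callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CommitAsyncSketch {
    // Hypothetical stand-in for ProcessSession#commitAsync(Runnable, Consumer<Throwable>).
    interface Session {
        void commitAsync(Runnable onSuccess, Consumer<Throwable> onFailure);
    }

    // onPartitionsRevoked must block until the commit resolves, so the
    // success/failure callbacks release a latch that this method waits on.
    static boolean commitDuringRevocation(final Session session) {
        final CountDownLatch committed = new CountDownLatch(1);
        final boolean[] success = new boolean[1];
        session.commitAsync(
                () -> { success[0] = true; committed.countDown(); },
                failure -> committed.countDown());
        try {
            if (!committed.await(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Commit did not complete before the rebalance deadline");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while awaiting commit", e);
        }
        return success[0];
    }

    public static void main(final String[] args) {
        // Fake session whose "async" commit completes inline; NiFi would complete it later.
        final Session fakeSession = (onSuccess, onFailure) -> onSuccess.run();
        System.out.println("commit succeeded: " + commitDuringRevocation(fakeSession));
    }
}
```
The latch keeps the non-blocking `commitAsync` contract intact on the session side while still giving the Kafka rebalance listener the synchronous guarantee it needs.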
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
         getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
Review Comment:
We should avoid any lambdas over 3-5 lines long in favor of anonymous inner classes.
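For illustration, a sketch of the suggested shape, using a hypothetical stand-in for the PR's `RebalanceCallback` interface:

```java
import java.util.List;

public class AnonymousCallbackSketch {
    // Hypothetical stand-in for the PR's RebalanceCallback interface.
    interface RebalanceCallback {
        void onPartitionsRevoked(List<Integer> revokedPartitions);
    }

    static int committed = 0;

    // An anonymous inner class instead of a multi-line lambda: the explicit
    // method signature and braces make a non-trivial body easier to scan,
    // and stack traces get a named frame.
    static RebalanceCallback createRebalanceCallback() {
        return new RebalanceCallback() {
            @Override
            public void onPartitionsRevoked(final List<Integer> revokedPartitions) {
                if (revokedPartitions.isEmpty()) {
                    return; // nothing to commit
                }
                committed += revokedPartitions.size();
            }
        };
    }

    public static void main(final String[] args) {
        createRebalanceCallback().onPartitionsRevoked(List.of(1, 2, 3));
        System.out.println("committed=" + committed);
    }
}
```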
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-3-integration/src/test/java/org/apache/nifi/kafka/processors/ConsumeKafkaRebalanceIT.java:
##########
@@ -291,6 +296,170 @@ void testNoDataLossWhenRebalanceOccursBeforeProcessingComplete() throws Exceptio
             " records since processing was never completed.");
     }
+    /**
+     * Tests that a REAL Kafka rebalance (triggered by a second consumer joining) does not cause
+     * duplicate message processing.
+     *
+     * This test reproduces the real-world scenario where:
+     * 1. Consumer 1 is actively polling and processing messages (with slow processing)
+     * 2. Consumer 2 joins the same group, triggering a Kafka rebalance
+     * 3. During Consumer 1's poll(), onPartitionsRevoked() is called internally by Kafka
+     * 4. The RebalanceCallback is invoked, allowing the processor to commit its session
+     * 5. Kafka offsets are committed synchronously while still in onPartitionsRevoked()
+     * 6. Rebalance completes successfully with no duplicates
+     *
+     * The fix commits offsets INSIDE the onPartitionsRevoked() callback, which is the
+     * only time when the consumer is still in a valid state to commit. This is similar to how
+     * NiFi 1.x handled rebalances in ConsumerLease.
+     */
+    @Test
+    @Timeout(value = 120, unit = TimeUnit.SECONDS)
+    void testRealRebalanceDoesNotCauseDuplicates() throws Exception {
+        final String topic = "real-rebalance-test-" + UUID.randomUUID();
+        final String groupId = "real-rebalance-group-" + UUID.randomUUID();
+        final int numPartitions = 6;
+        final int messagesPerPartition = 500; // More messages to ensure overlap
+        final int totalMessages = numPartitions * messagesPerPartition;
+
+        createTopic(topic, numPartitions);
+        produceMessagesToTopic(topic, numPartitions, messagesPerPartition);
+
+        // Track all consumed message IDs across both consumers
+        final Set<String> allConsumedMessages = ConcurrentHashMap.newKeySet();
+        final AtomicInteger duplicateCount = new AtomicInteger(0);
+        final AtomicInteger rebalanceCount = new AtomicInteger(0);
+        final CountDownLatch consumer1Started = new CountDownLatch(1);
+        final CountDownLatch consumer2Started = new CountDownLatch(1);
+        final CountDownLatch testComplete = new CountDownLatch(2);
+        final AtomicInteger consumer1Count = new AtomicInteger(0);
+        final AtomicInteger consumer2Count = new AtomicInteger(0);
+
+        final ComponentLog mockLog = mock(ComponentLog.class);
+        final ExecutorService executor = Executors.newFixedThreadPool(2);
+
+        // Rebalance callback that simulates processor committing its session
+        // In a real processor, this would commit FlowFiles; here we just log and allow the commit
+        final RebalanceCallback callback = revokedPartitions -> {
+            rebalanceCount.incrementAndGet();
+            System.out.println("Rebalance callback invoked for partitions: " + revokedPartitions);
+        };
+
+        try {
+            // Consumer 1: Start consuming with simulated slow processing
+            executor.submit(() -> {
+                final Properties props1 = getConsumerProperties(groupId);
+                // Fetch fewer records per poll to slow down consumption
+                props1.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
+                try (KafkaConsumer<byte[], byte[]> kafkaConsumer1 = new KafkaConsumer<>(props1)) {
+                    final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+                    // Use the constructor with callback to enable synchronous commit during rebalance
+                    final Kafka3ConsumerService service1 = new Kafka3ConsumerService(mockLog, kafkaConsumer1, subscription, callback);
+                    consumer1Started.countDown();
+
+                    int emptyPolls = 0;
+                    while (emptyPolls < 15 && allConsumedMessages.size() < totalMessages) {
+                        boolean hasRecords = false;
+                        for (ByteRecord record : service1.poll(Duration.ofSeconds(1))) {
+                            hasRecords = true;
+                            final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+                            if (!allConsumedMessages.add(messageId)) {
+                                duplicateCount.incrementAndGet();
+                            }
+                            consumer1Count.incrementAndGet();
+                        }
+
+                        if (hasRecords) {
+                            emptyPolls = 0;
+                            // Simulate slow processing
+                            Thread.sleep(50);
+                        } else {
+                            emptyPolls++;
+                        }
+                    }
+                    service1.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    testComplete.countDown();
+                }
+            });
+
+            // Wait for consumer 1 to start
+            assertTrue(consumer1Started.await(30, TimeUnit.SECONDS), "Consumer 1 did not start");
+
+            // Wait a bit then start consumer 2 to trigger rebalance while consumer 1 is actively consuming
+            Thread.sleep(200);
+
+            // Consumer 2: Join the group to trigger rebalance
+            executor.submit(() -> {
+                final Properties props2 = getConsumerProperties(groupId);
+                props2.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
+                try (KafkaConsumer<byte[], byte[]> kafkaConsumer2 = new KafkaConsumer<>(props2)) {
+                    final Subscription subscription = new Subscription(groupId, Collections.singletonList(topic), AutoOffsetReset.EARLIEST);
+                    // Use the constructor with callback to enable synchronous commit during rebalance
+                    final Kafka3ConsumerService service2 = new Kafka3ConsumerService(mockLog, kafkaConsumer2, subscription, callback);
+                    consumer2Started.countDown();
+
+                    int emptyPolls = 0;
+                    while (emptyPolls < 15 && allConsumedMessages.size() < totalMessages) {
+                        boolean hasRecords = false;
+                        for (ByteRecord record : service2.poll(Duration.ofSeconds(1))) {
+                            hasRecords = true;
+                            final String messageId = record.getTopic() + "-" + record.getPartition() + "-" + record.getOffset();
+                            if (!allConsumedMessages.add(messageId)) {
+                                duplicateCount.incrementAndGet();
+                            }
+                            consumer2Count.incrementAndGet();
+                        }
+
+                        if (hasRecords) {
+                            emptyPolls = 0;
+                            Thread.sleep(50);
+                        } else {
+                            emptyPolls++;
+                        }
+                    }
+                    service2.close();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                } finally {
+                    testComplete.countDown();
+                }
+            });
+
+            // Wait for consumer 2 to start (confirms rebalance was triggered)
+            assertTrue(consumer2Started.await(30, TimeUnit.SECONDS), "Consumer 2 did not start");
+
+            // Wait for both consumers to finish
+            assertTrue(testComplete.await(90, TimeUnit.SECONDS), "Test did not complete in time");
+
+        } finally {
+            executor.shutdownNow();
+        }
+
+        // Log results for debugging
+        System.out.println("Consumer 1 polled: " + consumer1Count.get() + " records");
+        System.out.println("Consumer 2 polled: " + consumer2Count.get() + " records");
+        System.out.println("Total unique messages: " + allConsumedMessages.size());
+        System.out.println("Duplicate count: " + duplicateCount.get());
+        System.out.println("Rebalance count: " + rebalanceCount.get());
Review Comment:
We should be using loggers here, not `System.out.println`
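A minimal sketch of the suggested change; `java.util.logging` keeps the example dependency-free here, although the project would typically obtain an SLF4J logger via `LoggerFactory.getLogger(...)`. The `summary` helper is hypothetical:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class RebalanceTestLogging {
    // The real test would use SLF4J; java.util.logging is used here only to
    // keep the sketch self-contained.
    private static final Logger LOGGER = Logger.getLogger(RebalanceTestLogging.class.getName());

    // Hypothetical helper that renders the test's result counters.
    static String summary(final int c1, final int c2, final int unique,
                          final int duplicates, final int rebalances) {
        return String.format(
                "consumer1=%d consumer2=%d unique=%d duplicates=%d rebalances=%d",
                c1, c2, unique, duplicates, rebalances);
    }

    public static void main(final String[] args) {
        // Parameterized logging instead of System.out.println keeps test
        // output inside the build's logging framework and level controls.
        LOGGER.log(Level.INFO, "Rebalance test results: {0}",
                summary(1480, 1520, 3000, 0, 1));
    }
}
```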
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -600,7 +611,38 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
         getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount);
         final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
-        return connectionService.getConsumerService(pollingContext);
+        final KafkaConsumerService newService = connectionService.getConsumerService(pollingContext);
+        newService.setRebalanceCallback(createRebalanceCallback());
+        return newService;
+    }
+
+    private RebalanceCallback createRebalanceCallback() {
+        return revokedPartitions -> {
+            final ProcessSession session = currentSession.get();
+            final OffsetTracker offsetTracker = currentOffsetTracker.get();
Review Comment:
Not sure that I understand the intention behind the ThreadLocal here. This should just be passed into the `createRebalanceCallback` method (piped through the `getConsumerService` method).
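A sketch of the suggested wiring, with hypothetical `String` stand-ins for `ProcessSession` and `OffsetTracker`: the callback captures the values passed in as method parameters, so no processor-level `ThreadLocal` is needed.

```java
public class CallbackWiringSketch {
    // Hypothetical stand-in for the PR's RebalanceCallback interface.
    interface RebalanceCallback {
        void onPartitionsRevoked(java.util.List<Integer> revokedPartitions);
    }

    static final StringBuilder LOG = new StringBuilder();

    // The session and tracker are piped in explicitly (in the real processor,
    // through getConsumerService) and captured by the returned callback.
    static RebalanceCallback createRebalanceCallback(final String session, final String offsetTracker) {
        return revokedPartitions -> LOG
                .append("committing ").append(session)
                .append(" for ").append(revokedPartitions.size())
                .append(" partitions; tracker=").append(offsetTracker);
    }

    public static void main(final String[] args) {
        final RebalanceCallback cb = createRebalanceCallback("session-1", "tracker-1");
        cb.onPartitionsRevoked(java.util.List.of(0, 1, 2));
        System.out.println(LOG);
    }
}
```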
##########
nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java:
##########
@@ -348,6 +349,8 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
     private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
     private final AtomicInteger activeConsumerCount = new AtomicInteger();
+    private final ThreadLocal<ProcessSession> currentSession = new ThreadLocal<>();
+    private final ThreadLocal<OffsetTracker> currentOffsetTracker = new ThreadLocal<>();
Review Comment:
We generally want to avoid `ThreadLocal` in processors because of the shared thread pool.
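A self-contained sketch of why that matters: with a shared pool, a `ThreadLocal` set by one task and not cleared leaks into whatever unrelated task the pool schedules on that thread next.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadLocalLeakDemo {
    private static final ThreadLocal<String> CURRENT_SESSION = new ThreadLocal<>();

    // Runs two unrelated tasks on the same single pooled thread: the first
    // stashes a value in the ThreadLocal and "forgets" to clean up, and the
    // second observes the stale leftover value.
    public static String staleValueSeenBySecondTask() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.submit(() -> CURRENT_SESSION.set("session-from-task-1")).get();
            return pool.submit(CURRENT_SESSION::get).get();
        } catch (final Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println("Second task saw: " + staleValueSeenBySecondTask());
    }
}
```
In a processor this means one onTrigger invocation could observe the session or offset tracker of a previous invocation that ran on the same pooled thread.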
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]