[ https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378231#comment-16378231 ]

ASF GitHub Bot commented on KAFKA-6534:
---------------------------------------

guozhangwang closed pull request #4544: KAFKA-6534: Enforce a rebalance in the
next poll call when encountering task migration
URL: https://github.com/apache/kafka/pull/4544

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6884ff0dff7..b39f52c0552 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -767,20 +767,20 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> futu
                 future.complete(null);
             } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                     || error == Errors.NOT_COORDINATOR) {
-                log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
+                log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.",
                         coordinator());
                 coordinatorDead();
                 future.raise(error);
             } else if (error == Errors.REBALANCE_IN_PROGRESS) {
-                log.debug("Attempt to heartbeat failed since group is rebalancing");
+                log.info("Attempt to heartbeat failed since group is rebalancing");
                 requestRejoin();
                 future.raise(Errors.REBALANCE_IN_PROGRESS);
             } else if (error == Errors.ILLEGAL_GENERATION) {
-                log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
+                log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                 resetGeneration();
                 future.raise(Errors.ILLEGAL_GENERATION);
             } else if (error == Errors.UNKNOWN_MEMBER_ID) {
-                log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
+                log.info("Attempt to heartbeat failed since member id {} is not valid.", generation.memberId);
                 resetGeneration();
                 future.raise(Errors.UNKNOWN_MEMBER_ID);
             } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index d9c827fff52..a8f7e652da9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -52,6 +52,7 @@
     final Logger log;
     final LogContext logContext;
     boolean taskInitialized;
+    boolean taskClosed;
     final StateDirectory stateDirectory;
 
     InternalProcessorContext processorContext;
@@ -256,6 +257,9 @@ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateExcep
         }
     }
 
+    public boolean isClosed() {
+        return taskClosed;
+    }
 
     public boolean hasStateStores() {
         return !topology.stateStores().isEmpty();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
index 7b05f6488e7..f98e6356a22 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java
@@ -95,6 +95,8 @@ int process() {
                     processed++;
                 }
             } catch (final TaskMigratedException e) {
+                log.info("Failed to process stream task {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
@@ -125,6 +127,8 @@ int punctuate() {
                     punctuated++;
                 }
             } catch (final TaskMigratedException e) {
+                log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 2cd82f461dd..8529c9eca88 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -200,6 +200,8 @@ private RuntimeException suspendTasks(final Collection<T> tasks) {
                 suspended.put(task.id(), task);
             } catch (final TaskMigratedException closeAsZombieAndSwallow) {
                 // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on
+                log.info("Failed to suspend {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie and moving on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
                 it.remove();
             } catch (final RuntimeException e) {
@@ -216,7 +218,6 @@ private RuntimeException suspendTasks(final Collection<T> tasks) {
     }
 
     RuntimeException closeZombieTask(final T task) {
-        log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id());
         try {
             task.close(false, true);
         } catch (final RuntimeException e) {
@@ -242,11 +243,12 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition>
                 try {
                     task.resume();
                 } catch (final TaskMigratedException e) {
+                    log.info("Failed to resume {} {} since it got migrated to another thread already. " +
+                            "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                     final RuntimeException fatalException = closeZombieTask(task);
                     if (fatalException != null) {
                         throw fatalException;
                     }
-                    suspended.remove(taskId);
                     throw e;
                 }
                 transitionToRunning(task, new HashSet<TopicPartition>());
@@ -368,14 +370,14 @@ void applyToRunningTasks(final TaskAction<T> action) {
             try {
                 action.apply(task);
             } catch (final TaskMigratedException e) {
+                log.info("Failed to commit {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id());
                 final RuntimeException fatalException = closeZombieTask(task);
                 if (fatalException != null) {
                     throw fatalException;
                 }
                 it.remove();
-                if (firstException == null) {
-                    firstException = e;
-                }
+                throw e;
             } catch (final RuntimeException t) {
                 log.error("Failed to {} {} {} due to the following error:",
                           action.name(),
@@ -416,6 +418,8 @@ void close(final boolean clean) {
             try {
                 task.close(clean, false);
             } catch (final TaskMigratedException e) {
+                log.info("Failed to close {} {} since it got migrated to another thread already. " +
+                        "Closing it as zombie and moving on.", taskTypeName, task.id());
                 firstException.compareAndSet(null, closeZombieTask(task));
             } catch (final RuntimeException t) {
                 log.error("Failed while closing {} {} due to the following error:",
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 39d34d799d2..861556cded3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -144,6 +144,8 @@ public void close(final boolean clean,
         } finally {
             closeStateManager(committedSuccessfully);
         }
+
+        taskClosed = true;
     }
 
     @Override
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6bca02ad9bc..b8777ad5521 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -541,6 +541,8 @@ public void close(boolean clean,
         }
 
         closeSuspended(clean, isZombie, firstException);
+
+        taskClosed = true;
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 61a22be5a15..cda04e9efc0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -757,7 +757,12 @@ private void runLoop() {
             } catch (final TaskMigratedException ignoreAndRejoinGroup) {
                 log.warn("Detected task {} that got migrated to another thread. " +
                     "This implies that this thread missed a rebalance and dropped out of the consumer group. " +
-                    "Trying to rejoin the consumer group now. Below is the detailed description of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+                    "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}",
+                        ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">"));
+
+                // re-subscribe to enforce a rebalance in the next poll call
+                consumer.unsubscribe();
+                consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener);
             }
         }
     }
@@ -898,6 +903,13 @@ private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) {
 
         for (final TopicPartition partition : records.partitions()) {
             final StreamTask task = taskManager.activeTask(partition);
+
+            if (task.isClosed()) {
+                log.warn("Stream task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                        "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                throw new TaskMigratedException(task);
+            }
+
             numAddedRecords += task.addRecords(partition, records.records(partition));
         }
         streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs);
@@ -1024,6 +1036,13 @@ private void maybeUpdateStandbyTasks(final long now) {
                         List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue();
                         if (remaining != null) {
                             final StandbyTask task = taskManager.standbyTask(partition);
+
+                            if (task.isClosed()) {
+                                log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                                        "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                                throw new TaskMigratedException(task);
+                            }
+
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
                                 remainingStandbyRecords.put(partition, remaining);
@@ -1051,6 +1070,12 @@ private void maybeUpdateStandbyTasks(final long now) {
                             throw new StreamsException(logPrefix + "Missing standby task for partition " + partition);
                         }
 
+                        if (task.isClosed()) {
+                            log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                                    "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                            throw new TaskMigratedException(task);
+                        }
+
                         final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
                         if (remaining != null) {
                             restoreConsumer.pause(singleton(partition));
@@ -1063,6 +1088,13 @@ private void maybeUpdateStandbyTasks(final long now) {
                 final Set<TopicPartition> partitions = recoverableException.partitions();
                 for (final TopicPartition partition : partitions) {
                     final StandbyTask task = taskManager.standbyTask(partition);
+
+                    if (task.isClosed()) {
+                        log.warn("Standby task {} is already closed, probably because it got unexpectedly migrated to another thread already. " +
+                                "Notifying the thread to trigger a new rebalance immediately.", task.id());
+                        throw new TaskMigratedException(task);
+                    }
+
                     log.info("Reinitializing StandbyTask {}", task);
                     task.reinitializeStateStoresForPartitions(recoverableException.partitions());
                 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 62ddacfb33c..9f02834dd75 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -300,7 +300,6 @@ StreamTask activeTask(final TopicPartition partition) {
         return active.runningTaskFor(partition);
     }
 
-
     StandbyTask standbyTask(final TopicPartition partition) {
         return standby.runningTaskFor(partition);
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 6c7b2b43c9d..c4ea9647a8d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -61,6 +61,7 @@
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -78,18 +79,18 @@
     });
 
     private static String applicationId;
+    private final static int NUM_TOPIC_PARTITIONS = 2;
     private final static String CONSUMER_GROUP_ID = "readCommitted";
     private final static String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic";
     private final static String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic";
     private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic";
-    private final static int NUM_TOPIC_PARTITIONS = 2;
     private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic";
     private final static String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic";
    private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic";
     private final String storeName = "store";
 
     private AtomicBoolean errorInjected;
-    private AtomicBoolean injectGC;
+    private AtomicBoolean gcInjected;
     private volatile boolean doGC = true;
     private AtomicInteger commitRequested;
     private Throwable uncaughtException;
@@ -153,7 +154,6 @@ private void runSimpleCopyTest(final int numberOfRestarts,
         output.to(outputTopic);
 
         for (int i = 0; i < numberOfRestarts; ++i) {
-            final long factor = i;
             final KafkaStreams streams = new KafkaStreams(
                 builder.build(),
                 StreamsTestUtils.getStreamsConfig(
@@ -171,7 +171,7 @@ private void runSimpleCopyTest(final int numberOfRestarts,
             try {
                 streams.start();
 
-                final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L);
+                final List<KeyValue<Long, Long>> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L);
 
                 IntegrationTestUtils.produceKeyValuesSynchronously(
                     inputTopic,
@@ -510,7 +510,7 @@ public boolean conditionMet() {
             checkResultPerKey(committedRecords, committedDataBeforeGC);
             checkResultPerKey(uncommittedRecords, dataBeforeGC);
 
-            injectGC.set(true);
+            gcInjected.set(true);
             writeInputData(dataToTriggerFirstRebalance);
 
             TestUtils.waitForCondition(new TestCondition() {
@@ -577,7 +577,7 @@ public boolean conditionMet() {
     private KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) {
         commitRequested = new AtomicInteger(0);
         errorInjected = new AtomicBoolean(false);
-        injectGC = new AtomicBoolean(false);
+        gcInjected = new AtomicBoolean(false);
         final StreamsBuilder builder = new StreamsBuilder();
 
         String[] storeNames = null;
@@ -614,7 +614,7 @@ public void init(final ProcessorContext context) {
                             // only tries to fail once on one of the task
                             throw new RuntimeException("Injected test exception.");
                         }
-                        if (injectGC.compareAndSet(true, false)) {
+                        if (gcInjected.compareAndSet(true, false)) {
                             while (doGC) {
                                 try {
                                     Thread.sleep(100);
@@ -779,6 +779,8 @@ private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Lon
             }
         }
 
+        assertNotNull(store);
+
        final KeyValueIterator<Long, Long> it = store.all();
         while (it.hasNext()) {
             assertTrue(expectedStoreContent.remove(it.next()));
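
The heart of the fix is the StreamThread hunk above: when a TaskMigratedException surfaces, the thread no longer just hopes the next poll happens to rebalance; it drops and re-creates its subscription so that the next poll is guaranteed to rejoin the group. Below is a minimal sketch of that pattern against the plain consumer API (the class, method, and variable names are illustrative, not the ones used in Streams):

{code:java}
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

public final class ForcedRebalanceSketch {
    // Re-subscribing wipes the consumer's subscription state, so its next
    // poll() sends a JoinGroup request immediately instead of waiting for a
    // heartbeat error to reveal that the group has already rebalanced.
    static void enforceRebalance(final Consumer<byte[], byte[]> consumer,
                                 final Pattern sourceTopicPattern,
                                 final ConsumerRebalanceListener listener) {
        consumer.unsubscribe();
        consumer.subscribe(sourceTopicPattern, listener);
    }
}
{code}

In the PR this runs inside runLoop()'s catch block, with builder.sourceTopicPattern() and the thread's existing rebalanceListener as the arguments, so the zombie task's partitions are revoked and reassigned before any further buffered records can be processed.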


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer.poll may not trigger rebalance in time when there is a task migration
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6534
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> When Streams detects a task migration event in one of its threads, today it 
> always relies on the next {{consumer.poll}} call, hoping it will trigger the 
> rebalance and hence clean up the records buffered from partitions that are 
> no longer owned. However, because the rebalance is driven by heartbeat 
> responses, which leave a window for a race, it is not always guaranteed to 
> be triggered when task migration happens. As a result, records buffered in 
> the consumer may not be cleaned up and may later be processed by Streams, 
> which then realizes they no longer belong to the thread, causing:
> {code:java}
> java.lang.IllegalStateException: Record's partition does not belong to this 
> partition-group.
> {code}
> Note this issue is only relevant when EOS is turned on, and based on the 
> default heartbeat.interval.ms value (3 sec), the race likelihood should not 
> be high.
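
The 3-second figure above is the consumer's heartbeat.interval.ms default: the coordinator signals a required rejoin mainly through heartbeat responses (as in the AbstractCoordinator hunk), so roughly one heartbeat interval can pass before a member learns it must rejoin. Shrinking the interval narrows that window but cannot close it; a hedged sketch of the relevant consumer knobs (the values are illustrative, not recommendations):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public final class HeartbeatTuningSketch {
    static Properties tunedConsumerProps() {
        final Properties props = new Properties();
        // Default is 3000 ms; a smaller value shortens the race window at the
        // cost of more heartbeat traffic, and must stay well below the
        // session timeout.
        props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        return props;
    }
}
{code}

The merged patch closes the window deterministically instead: once a migration is detected, the thread forces a rejoin rather than waiting for the next heartbeat response.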



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
