[ https://issues.apache.org/jira/browse/KAFKA-6534?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16378231#comment-16378231 ]
ASF GitHub Bot commented on KAFKA-6534: --------------------------------------- guozhangwang closed pull request #4544: KAFKA-6534: Enforce a rebalance in the next poll call when encounter task migration URL: https://github.com/apache/kafka/pull/4544 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 6884ff0dff7..b39f52c0552 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -767,20 +767,20 @@ public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> futu future.complete(null); } else if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR) { - log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.", + log.info("Attempt to heartbeat failed since coordinator {} is either not started or not valid.", coordinator()); coordinatorDead(); future.raise(error); } else if (error == Errors.REBALANCE_IN_PROGRESS) { - log.debug("Attempt to heartbeat failed since group is rebalancing"); + log.info("Attempt to heartbeat failed since group is rebalancing"); requestRejoin(); future.raise(Errors.REBALANCE_IN_PROGRESS); } else if (error == Errors.ILLEGAL_GENERATION) { - log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId); + log.info("Attempt to heartbeat failed since generation {} is not current", generation.generationId); resetGeneration(); future.raise(Errors.ILLEGAL_GENERATION); } else if (error == Errors.UNKNOWN_MEMBER_ID) { - log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId); + log.info("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId); resetGeneration(); future.raise(Errors.UNKNOWN_MEMBER_ID); } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index d9c827fff52..a8f7e652da9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -52,6 +52,7 @@ final Logger log; final LogContext logContext; boolean taskInitialized; + boolean taskClosed; final StateDirectory stateDirectory; InternalProcessorContext processorContext; @@ -256,6 +257,9 @@ void closeStateManager(final boolean writeCheckpoint) throws ProcessorStateExcep } } + public boolean isClosed() { + return taskClosed; + } public boolean hasStateStores() { return !topology.stateStores().isEmpty(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java index 7b05f6488e7..f98e6356a22 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedStreamsTasks.java @@ -95,6 +95,8 @@ int process() { processed++; } } catch (final TaskMigratedException e) { + log.info("Failed to process stream task {} since it got migrated to another thread already. " + + "Closing it as zombie before triggering a new rebalance.", task.id()); final RuntimeException fatalException = closeZombieTask(task); if (fatalException != null) { throw fatalException; @@ -125,6 +127,8 @@ int punctuate() { punctuated++; } } catch (final TaskMigratedException e) { + log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " + + "Closing it as zombie before triggering a new rebalance.", task.id()); final RuntimeException fatalException = closeZombieTask(task); if (fatalException != null) { throw fatalException; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index 2cd82f461dd..8529c9eca88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -200,6 +200,8 @@ private RuntimeException suspendTasks(final Collection<T> tasks) { suspended.put(task.id(), task); } catch (final TaskMigratedException closeAsZombieAndSwallow) { // as we suspend a task, we are either shutting down or rebalancing, thus, we swallow and move on + log.info("Failed to suspend {} {} since it got migrated to another thread already. " + + "Closing it as zombie and move on.", taskTypeName, task.id()); firstException.compareAndSet(null, closeZombieTask(task)); it.remove(); } catch (final RuntimeException e) { @@ -216,7 +218,6 @@ private RuntimeException suspendTasks(final Collection<T> tasks) { } RuntimeException closeZombieTask(final T task) { - log.warn("{} {} got migrated to another thread already. Closing it as zombie.", taskTypeName, task.id()); try { task.close(false, true); } catch (final RuntimeException e) { @@ -242,11 +243,12 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> try { task.resume(); } catch (final TaskMigratedException e) { + log.info("Failed to resume {} {} since it got migrated to another thread already. " + + "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); final RuntimeException fatalException = closeZombieTask(task); if (fatalException != null) { throw fatalException; } - suspended.remove(taskId); throw e; } transitionToRunning(task, new HashSet<TopicPartition>()); @@ -368,14 +370,14 @@ void applyToRunningTasks(final TaskAction<T> action) { try { action.apply(task); } catch (final TaskMigratedException e) { + log.info("Failed to commit {} {} since it got migrated to another thread already. " + + "Closing it as zombie before triggering a new rebalance.", taskTypeName, task.id()); final RuntimeException fatalException = closeZombieTask(task); if (fatalException != null) { throw fatalException; } it.remove(); - if (firstException == null) { - firstException = e; - } + throw e; } catch (final RuntimeException t) { log.error("Failed to {} {} {} due to the following error:", action.name(), @@ -416,6 +418,8 @@ void close(final boolean clean) { try { task.close(clean, false); } catch (final TaskMigratedException e) { + log.info("Failed to close {} {} since it got migrated to another thread already. 
" + + "Closing it as zombie and move on.", taskTypeName, task.id()); firstException.compareAndSet(null, closeZombieTask(task)); } catch (final RuntimeException t) { log.error("Failed while closing {} {} due to the following error:", diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 39d34d799d2..861556cded3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -144,6 +144,8 @@ public void close(final boolean clean, } finally { closeStateManager(committedSuccessfully); } + + taskClosed = true; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 6bca02ad9bc..b8777ad5521 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -541,6 +541,8 @@ public void close(boolean clean, } closeSuspended(clean, isZombie, firstException); + + taskClosed = true; } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 61a22be5a15..cda04e9efc0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -757,7 +757,12 @@ private void runLoop() { } catch (final TaskMigratedException ignoreAndRejoinGroup) { log.warn("Detected task {} that got migrated to another thread. " + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + - "Trying to rejoin the consumer group now. Below is the detailed description of the task:\n{}", ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">")); + "Will try to rejoin the consumer group. Below is the detailed description of the task:\n{}", + ignoreAndRejoinGroup.migratedTask().id(), ignoreAndRejoinGroup.migratedTask().toString(">")); + + // re-subscribe to enforce a rebalance in the next poll call + consumer.unsubscribe(); + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); } } } @@ -898,6 +903,13 @@ private void addRecordsToTasks(final ConsumerRecords<byte[], byte[]> records) { for (final TopicPartition partition : records.partitions()) { final StreamTask task = taskManager.activeTask(partition); + + if (task.isClosed()) { + log.warn("Stream task {} is already closed, probably because it got unexpectly migrated to another thread already. 
" + + "Notifying the thread to trigger a new rebalance immediately.", task.id()); + throw new TaskMigratedException(task); + } + numAddedRecords += task.addRecords(partition, records.records(partition)); } streamsMetrics.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs); @@ -1024,6 +1036,13 @@ private void maybeUpdateStandbyTasks(final long now) { List<ConsumerRecord<byte[], byte[]>> remaining = entry.getValue(); if (remaining != null) { final StandbyTask task = taskManager.standbyTask(partition); + + if (task.isClosed()) { + log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " + + "Notifying the thread to trigger a new rebalance immediately.", task.id()); + throw new TaskMigratedException(task); + } + remaining = task.update(partition, remaining); if (remaining != null) { remainingStandbyRecords.put(partition, remaining); @@ -1051,6 +1070,12 @@ private void maybeUpdateStandbyTasks(final long now) { throw new StreamsException(logPrefix + "Missing standby task for partition " + partition); } + if (task.isClosed()) { + log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. " + + "Notifying the thread to trigger a new rebalance immediately.", task.id()); + throw new TaskMigratedException(task); + } + final List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition)); if (remaining != null) { restoreConsumer.pause(singleton(partition)); @@ -1063,6 +1088,13 @@ private void maybeUpdateStandbyTasks(final long now) { final Set<TopicPartition> partitions = recoverableException.partitions(); for (final TopicPartition partition : partitions) { final StandbyTask task = taskManager.standbyTask(partition); + + if (task.isClosed()) { + log.warn("Standby task {} is already closed, probably because it got unexpectly migrated to another thread already. 
" + + "Notifying the thread to trigger a new rebalance immediately.", task.id()); + throw new TaskMigratedException(task); + } + log.info("Reinitializing StandbyTask {}", task); task.reinitializeStateStoresForPartitions(recoverableException.partitions()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 62ddacfb33c..9f02834dd75 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -300,7 +300,6 @@ StreamTask activeTask(final TopicPartition partition) { return active.runningTaskFor(partition); } - StandbyTask standbyTask(final TopicPartition partition) { return standby.runningTaskFor(partition); } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 6c7b2b43c9d..c4ea9647a8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -61,6 +61,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -78,18 +79,18 @@ }); private static String applicationId; + private final static int NUM_TOPIC_PARTITIONS = 2; private final static String CONSUMER_GROUP_ID = "readCommitted"; private final static String SINGLE_PARTITION_INPUT_TOPIC = "singlePartitionInputTopic"; private final static String SINGLE_PARTITION_THROUGH_TOPIC = "singlePartitionThroughTopic"; private final static String SINGLE_PARTITION_OUTPUT_TOPIC = "singlePartitionOutputTopic"; - private final static int NUM_TOPIC_PARTITIONS = 2; private final static String MULTI_PARTITION_INPUT_TOPIC = "multiPartitionInputTopic"; private final static String MULTI_PARTITION_THROUGH_TOPIC = "multiPartitionThroughTopic"; private final static String MULTI_PARTITION_OUTPUT_TOPIC = "multiPartitionOutputTopic"; private final String storeName = "store"; private AtomicBoolean errorInjected; - private AtomicBoolean injectGC; + private AtomicBoolean gcInjected; private volatile boolean doGC = true; private AtomicInteger commitRequested; private Throwable uncaughtException; @@ -153,7 +154,6 @@ private void runSimpleCopyTest(final int numberOfRestarts, output.to(outputTopic); for (int i = 0; i < numberOfRestarts; ++i) { - final long factor = i; final KafkaStreams streams = new KafkaStreams( builder.build(), StreamsTestUtils.getStreamsConfig( @@ -171,7 +171,7 @@ private void runSimpleCopyTest(final int numberOfRestarts, try { streams.start(); - final List<KeyValue<Long, Long>> inputData = prepareData(factor * 100, factor * 100 + 10L, 0L, 1L); + final List<KeyValue<Long, Long>> inputData = prepareData(i * 100, i * 100 + 10L, 0L, 1L); IntegrationTestUtils.produceKeyValuesSynchronously( inputTopic, @@ -510,7 +510,7 @@ public boolean conditionMet() { checkResultPerKey(committedRecords, committedDataBeforeGC); checkResultPerKey(uncommittedRecords, dataBeforeGC); - injectGC.set(true); + gcInjected.set(true); writeInputData(dataToTriggerFirstRebalance); TestUtils.waitForCondition(new TestCondition() { @@ -577,7 +577,7 @@ public boolean conditionMet() { private 
KafkaStreams getKafkaStreams(final boolean withState, final String appDir, final int numberOfStreamsThreads) { commitRequested = new AtomicInteger(0); errorInjected = new AtomicBoolean(false); - injectGC = new AtomicBoolean(false); + gcInjected = new AtomicBoolean(false); final StreamsBuilder builder = new StreamsBuilder(); String[] storeNames = null; @@ -614,7 +614,7 @@ public void init(final ProcessorContext context) { // only tries to fail once on one of the task throw new RuntimeException("Injected test exception."); } - if (injectGC.compareAndSet(true, false)) { + if (gcInjected.compareAndSet(true, false)) { while (doGC) { try { Thread.sleep(100); @@ -779,6 +779,8 @@ private void verifyStateStore(final KafkaStreams streams, final Set<KeyValue<Lon } } + assertNotNull(store); + final KeyValueIterator<Long, Long> it = store.all(); while (it.hasNext()) { assertTrue(expectedStoreContent.remove(it.next()));

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Consumer.poll may not trigger rebalance in time when there is a task migration
> ------------------------------------------------------------------------------
>
>                 Key: KAFKA-6534
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6534
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>            Priority: Major
>
> When Streams detects a task migration event in one of its threads, today it always relies on the next {{consumer.poll}} call, hoping it will trigger the rebalance and hence clean up the records buffered from the partitions that are no longer owned. However, because the rebalance is driven by heartbeat responses, which have a race window, it is not always guaranteed to be triggered when a task migration happens. As a result the records buffered in the consumer may not be cleaned up and may later be processed by Streams, which only then realizes they no longer belong to the thread, causing:
> {code:java}
> java.lang.IllegalStateException: Record's partition does not belong to this partition-group.
> {code}
> Note this issue is only relevant when EOS is turned on, and based on the default heartbeat.interval.ms value (3 sec), the race likelihood should not be high.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
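For context on how the merged change avoids the race described above: instead of waiting for a heartbeat error to force a rejoin, the Streams thread re-subscribes after catching TaskMigratedException, which guarantees a rebalance on the very next poll. The following is a minimal, hypothetical sketch of that re-subscribe pattern using a plain consumer. The topic pattern, group id, bootstrap address, and the simplified TaskMigratedException stand-in are illustrative assumptions, not the actual Streams internals; the real change lives in StreamThread's runLoop as shown in the diff.

{code:java}
import java.time.Duration;
import java.util.Collection;
import java.util.Properties;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ForcedRebalanceSketch {

    // Hypothetical stand-in for the internal TaskMigratedException used by Streams.
    static class TaskMigratedException extends RuntimeException { }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // assumed broker address
        props.put("group.id", "forced-rebalance-example");   // assumed group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        final Pattern sourceTopicPattern = Pattern.compile("input-.*"); // assumed source-topic pattern
        final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { }
            @Override
            public void onPartitionsAssigned(final Collection<TopicPartition> partitions) { }
        };

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(sourceTopicPattern, rebalanceListener);

            while (true) {
                try {
                    final ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
                    // ... hand the records to their owning tasks; a task that was already
                    // closed as a zombie would surface here as a TaskMigratedException ...
                    records.count(); // placeholder so the sketch compiles without processing logic
                } catch (final TaskMigratedException e) {
                    // Re-subscribing drops the current group membership, so the very next
                    // poll() rejoins the group instead of waiting for a heartbeat error.
                    consumer.unsubscribe();
                    consumer.subscribe(sourceTopicPattern, rebalanceListener);
                }
            }
        }
    }
}
{code}

The PR applies the same idea inside StreamThread.runLoop() after catching TaskMigratedException, and additionally marks closed tasks via the new isClosed() flag so that records still buffered for a migrated task are never handed back to it.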