[GitHub] [kafka] srishti-saraswat commented on a diff in pull request #12567: Migrate connect system tests to KRaft
srishti-saraswat commented on code in PR #12567:
URL: https://github.com/apache/kafka/pull/12567#discussion_r965588572

## tests/kafkatest/tests/connect/connect_rest_test.py:
@@ -34,9 +35,7 @@ class ConnectRestApiTest(KafkaTest):
     FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'header.converter',
                            'batch.size', 'topic', 'file', 'transforms', 'config.action.reload', 'errors.retry.timeout', 'errors.retry.delay.max.ms',
-                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups',
-                           'offsets.storage.topic', 'transaction.boundary', 'transaction.boundary.interval.ms', 'config.action.reload',
-                           'exactly.once.support'}
+                           'errors.tolerance', 'errors.log.enable', 'errors.log.include.messages', 'predicates', 'topic.creation.groups'}

Review Comment:
   Reverted to original since it is taken care of in a separate PR: https://github.com/apache/kafka/pull/12575
[jira] [Updated] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
[ https://issues.apache.org/jira/browse/KAFKA-13766?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

David Jacot updated KAFKA-13766:
--------------------------------
    Component/s: group-coordinator

> Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-13766
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13766
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, group-coordinator
>            Reporter: Guozhang Wang
>            Assignee: David Jacot
>            Priority: Major
>              Labels: new-rebalance-should-fix
>
> The lifetime of a consumer can be categorized in three phases:
> 1) During normal processing, the broker expects a heartbeat request periodically from the consumer, and that is timed by the `session.timeout.ms`.
> 2) During the prepare_rebalance, the broker would expect a join-group request to be received within the rebalance timeout, which is piggy-backed as the `max.poll.interval.ms`.
> 3) During the complete_rebalance, the broker would expect a sync-group request to be received again within the `session.timeout.ms`.
> So during different phases of the life of the consumer, different timeouts are used to bound the timer.
> Nowadays with the cooperative rebalance protocol, we can still return records and process them in the middle of a rebalance from {{consumer.poll}}. In that case, for phase 3) we should also use the `max.poll.interval.ms` to bound the timer, which is in practice larger than `session.timeout.ms`.
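As background for the three phases above, here is a minimal sketch of how the two timeouts involved are configured on a consumer. The topic, group, and timeout values are illustrative only, not recommendations from this issue:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class RebalanceTimeouts {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Phase 1 (normal processing): the broker expects a heartbeat within this window.
        props.put("session.timeout.ms", "45000");
        // Phase 2 (prepare_rebalance): the broker waits this long for the join-group
        // request; this issue proposes using the same bound in phase 3
        // (complete_rebalance) instead of session.timeout.ms.
        props.put("max.poll.interval.ms", "300000");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
            consumer.poll(Duration.ofMillis(100));
        }
    }
}
```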
[GitHub] [kafka] dajac commented on pull request #12275: MINOR: Change groupInstanceId type from Optional to String in ConsumerGroupMetadata and GroupRebalanceConfig
dajac commented on PR #12275:
URL: https://github.com/apache/kafka/pull/12275#issuecomment-1240403160

Thanks @il-kyun for the patch. I looked into it and I don't think that the change is worth it. Using Optional here seems fine to me. I am going to close it for now. Feel free to reopen if needed.
[GitHub] [kafka] dajac closed pull request #12275: MINOR: Change groupInstanceId type from Optional to String in ConsumerGroupMetadata and GroupRebalanceConfig
dajac closed pull request #12275: MINOR: Change groupInstanceId type from Optional to String in ConsumerGroupMetadata and GroupRebalanceConfig
URL: https://github.com/apache/kafka/pull/12275
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christo Lolov updated KAFKA-14133:
----------------------------------
Description:

There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.

Unless stated in brackets the tests are in the streams module. A list of tests which still need to be moved from EasyMock to Mockito as of the 2nd of August 2022, which do not have a Jira issue and do not have open pull requests that I am aware of (status key: [In Review], [Merged]):

# WorkerConnectorTest (connect) (owner: [~yash.mayya]) [Merged]
# WorkerCoordinatorTest (connect) (owner: [~yash.mayya]) [Merged]
# RootResourceTest (connect) (owner: [~yash.mayya]) [Merged]
# ByteArrayProducerRecordEquals (connect) (owner: [~yash.mayya]) [Merged]
# KStreamFlatTransformTest (owner: Christo) [Merged]
# KStreamFlatTransformValuesTest (owner: Christo) [Merged]
# KStreamPrintTest (owner: Christo) [Merged]
# KStreamRepartitionTest (owner: Christo) [Merged]
# MaterializedInternalTest (owner: Christo) [Merged]
# TransformerSupplierAdapterTest (owner: Christo) [Merged]
# KTableSuppressProcessorMetricsTest (owner: Christo) [Merged]
# ClientUtilsTest (owner: Christo) [Merged]
# HighAvailabilityStreamsPartitionAssignorTest (owner: Christo) [Merged]
# StreamsRebalanceListenerTest (owner: Christo) [In Review]
# TimestampedKeyValueStoreMaterializerTest (owner: Christo) [In Review]
# CachingInMemoryKeyValueStoreTest (owner: Christo) [In Review]
# CachingInMemorySessionStoreTest (owner: Christo) [In Review]
# CachingPersistentSessionStoreTest (owner: Christo) [In Review]
# CachingPersistentWindowStoreTest (owner: Christo) [In Review]
# ChangeLoggingKeyValueBytesStoreTest (owner: Christo) [In Review]
# ChangeLoggingTimestampedKeyValueBytesStoreTest (owner: Christo) [In Review]
# CompositeReadOnlyWindowStoreTest (owner: Christo) [In Review]
# KeyValueStoreBuilderTest (owner: Christo) [In Review]
# RocksDBStoreTest (owner: Christo) [In Review]
# StreamThreadStateStoreProviderTest (owner: Christo) [In Review]
# TopologyTest (owner: Christo) [In Review]
# KTableSuppressProcessorTest (owner: Christo) [In Review]
# ChangeLoggingSessionBytesStoreTest (owner: Christo) [In Review]
# ChangeLoggingTimestampedWindowBytesStoreTest (owner: Christo) [In Review]
# ChangeLoggingWindowBytesStoreTest (owner: Christo) [In Review]
# MeteredTimestampedWindowStoreTest (owner: Christo) [In Review]
# InternalTopicManagerTest (owner: Christo)
# ProcessorContextImplTest (owner: Christo)
# WriteConsistencyVectorTest (owner: Christo)
# StreamsAssignmentScaleTest (owner: Christo)
# StreamsPartitionAssignorTest (owner: Christo)
# TaskManagerTest (owner: Christo)
# AssignmentTestUtils (owner: Christo)
# ProcessorStateManagerTest (WIP, owner: Matthew)
# StandbyTaskTest (WIP, owner: Matthew)
# StoreChangelogReaderTest (WIP, owner: Matthew)
# StreamTaskTest (WIP, owner: Matthew)
# StreamThreadTest (WIP, owner: Matthew)
# StreamsMetricsImplTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= what the coverage is now.*

was:

There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.

Unless stated in brackets the tests are in the streams module. A list of tests which still need to be moved from EasyMock to Mockito as of the 2nd of August 2022, which do not have a Jira issue and do not have open pull requests that I am aware of (status key: [In Review], [Merged]):

# WorkerConnectorTest (connect) (owner: [~yash.mayya]) [Merged]
# WorkerCoordinatorTest (connect) (owner: [~yash.mayya]) [Merged]
# RootResourceTest (connect) (owner: [~yash.mayya]) [Merged]
# ByteArrayProducerRecordEquals (connect) (owner: [~yash.mayya]) [Merged]
# KStreamFlatTransformTest (owner: Christo) [In Review]
# KStreamFlatTransformValuesTest (owner: Christo) [In Review]
# KStreamPrintTest (owner: Christo) [In Review]
# KStreamRepartitionTest (owner: Christo) [In Review]
# {colo
[GitHub] [kafka] mimaison merged pull request #12606: MINOR: Fix usage of @see in IncrementalCooperativeAssignor doc comments
mimaison merged PR #12606:
URL: https://github.com/apache/kafka/pull/12606
[jira] [Created] (KAFKA-14208) KafkaConsumer#commitAsync throws unexpected WakeupException
Qingsheng Ren created KAFKA-14208:
-------------------------------------
             Summary: KafkaConsumer#commitAsync throws unexpected WakeupException
                 Key: KAFKA-14208
                 URL: https://issues.apache.org/jira/browse/KAFKA-14208
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 3.2.1
            Reporter: Qingsheng Ren

We recently encountered a bug after upgrading the Kafka client to 3.2.1 in the Flink Kafka connector (FLINK-29153). Here's the exception:

{code:java}
org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226)
{code}

As {{WakeupException}} is not listed in the JavaDoc of {{KafkaConsumer#commitAsync}}, the Flink Kafka connector doesn't catch the exception thrown directly from KafkaConsumer#commitAsync but handles all exceptions in the callback. I checked the source code and suspect this is caused by KAFKA-13563. Also, we never had this exception in commitAsync when we used Kafka client 2.4.1 and 2.8.1.

I'm wondering if this is kind of breaking the public API, as the WakeupException is not listed in the JavaDoc, and maybe it's better to invoke the callback to handle the {{WakeupException}} instead of throwing it directly from the method itself.
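A minimal sketch of the defensive pattern this report implies for callers: catching WakeupException at the commitAsync call site in addition to handling failures in the callback. The topic, group, and surrounding loop here are illustrative, not taken from the Flink connector:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class CommitAsyncWorkaround {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "example-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("example-topic"));
            try {
                consumer.poll(Duration.ofMillis(100));
                // Per this report, since 3.2.1 commitAsync may throw
                // WakeupException directly (suspected KAFKA-13563) rather than
                // delivering it to the callback, so it is caught below as well.
                consumer.commitAsync((offsets, exception) -> {
                    if (exception != null) {
                        // handle commit failures reported via the callback
                    }
                });
            } catch (WakeupException e) {
                // wakeup() was called concurrently; handle like a callback failure
            }
        }
    }
}
```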
[jira] [Updated] (KAFKA-14132) Remaining PowerMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yash Mayya updated KAFKA-14132:
-------------------------------
Description:

Some of the tests below use EasyMock as well. For those, migrate both PowerMock and EasyMock to Mockito.

Unless stated in brackets the tests are in the connect module. A list of tests which still need to be moved from PowerMock to Mockito as of the 2nd of August 2022, which do not have a Jira issue and do not have open pull requests that I am aware of:

# ErrorHandlingTaskTest (owner: Divij)
# SourceTaskOffsetCommiterTest (owner: Divij)
# WorkerMetricsGroupTest (owner: Divij)
# WorkerSinkTaskTest (owner: Divij)
# WorkerSinkTaskThreadedTest (owner: Divij)
# WorkerTaskTest (owner: [~yash.mayya])
# ErrorReporterTest (owner: [~yash.mayya])
# RetryWithToleranceOperatorTest (owner: [~yash.mayya])
# WorkerErrantRecordReporterTest (owner: [~yash.mayya])
# ConnectorsResourceTest
# StandaloneHerderTest
# KafkaConfigBackingStoreTest
# KafkaOffsetBackingStoreTest (owner: Christo) (https://github.com/apache/kafka/pull/12418)
# KafkaBasedLogTest
# RetryUtilTest
# RepartitionTopicTest (streams) (owner: Christo)
# StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= what the coverage is now.*

was:

Some of the tests below use EasyMock as well. For those, migrate both PowerMock and EasyMock to Mockito.

Unless stated in brackets the tests are in the connect module. A list of tests which still need to be moved from PowerMock to Mockito as of the 2nd of August 2022, which do not have a Jira issue and do not have open pull requests that I am aware of:

# ErrorHandlingTaskTest (owner: Divij)
# SourceTaskOffsetCommiterTest (owner: Divij)
# WorkerMetricsGroupTest (owner: Divij)
# WorkerSinkTaskTest (owner: Divij)
# WorkerSinkTaskThreadedTest (owner: Divij)
# WorkerTaskTest
# ErrorReporterTest
# RetryWithToleranceOperatorTest
# WorkerErrantRecordReporterTest
# ConnectorsResourceTest
# StandaloneHerderTest
# KafkaConfigBackingStoreTest
# KafkaOffsetBackingStoreTest (owner: Christo) (https://github.com/apache/kafka/pull/12418)
# KafkaBasedLogTest
# RetryUtilTest
# RepartitionTopicTest (streams) (owner: Christo)
# StateManagerUtilTest (streams) (owner: Christo)

*The coverage report for the above tests after the change should be >= what the coverage is now.*

> Remaining PowerMock to Mockito tests
> ------------------------------------
>
>                 Key: KAFKA-14132
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14132
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Christo Lolov
>            Assignee: Christo Lolov
>            Priority: Major
>
> Some of the tests below use EasyMock as well. For those, migrate both PowerMock and EasyMock to Mockito.
> Unless stated in brackets the tests are in the connect module.
> A list of tests which still need to be moved from PowerMock to Mockito as of the 2nd of August 2022, which do not have a Jira issue and do not have open pull requests that I am aware of:
> # ErrorHandlingTaskTest (owner: Divij)
> # SourceTaskOffsetCommiterTest (owner: Divij)
> # WorkerMetricsGroupTest (owner: Divij)
> # WorkerSinkTaskTest (owner: Divij)
> # WorkerSinkTaskThreadedTest (owner: Divij)
> # WorkerTaskTest (owner: [~yash.mayya])
> # ErrorReporterTest (owner: [~yash.mayya])
> # RetryWithToleranceOperatorTest (owner: [~yash.mayya])
> # WorkerErrantRecordReporterTest (owner: [~yash.mayya])
> # ConnectorsResourceTest
> # StandaloneHerderTest
> # KafkaConfigBackingStoreTest
> # KafkaOffsetBackingStoreTest (owner: Christo) ([https://github.com/apache/kafka/pull/12418])
> # KafkaBasedLogTest
> # RetryUtilTest
> # RepartitionTopicTest (streams) (owner: Christo)
> # StateManagerUtilTest (streams) (owner: Christo)
> *The coverage report for the above tests after the change should be >= what the coverage is now.*
[jira] [Commented] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE
[ https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601779#comment-17601779 ]

Ian Corne commented on KAFKA-12887:
-----------------------------------

Why wouldn't you allow the user to handle this?

> Do not trigger user-customized ExceptionalHandler for RTE
> ----------------------------------------------------------
>
>                 Key: KAFKA-12887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12887
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Josep Prat
>            Priority: Major
>             Fix For: 3.1.0
>
> Today in StreamThread we have a try-catch block that captures all {{Throwable e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. However, there are possible RTEs such as IllegalState/IllegalArgument exceptions which are usually caused by bugs, etc. In such cases we should not let users decide what to do with these exceptions, but should let Streams itself enforce the decision, e.g. in the IllegalState/IllegalArgument case we should fail fast to surface the potential error.
[GitHub] [kafka] cadonna commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation
cadonna commented on code in PR #12600:
URL: https://github.com/apache/kafka/pull/12600#discussion_r965876179

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
@@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
-                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
-                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
-                                                  final Set<Task> tasksToCloseClean) {
+    private void classifyRunningAndSuspendedTasks(final Map<TaskId, Set<TopicPartition>> activeTasksToCreate,
+                                                  final Map<TaskId, Set<TopicPartition>> standbyTasksToCreate,
+                                                  final Map<Task, Set<TopicPartition>> tasksToRecycle,
+                                                  final Set<Task> tasksToCloseClean) {
         for (final Task task : tasks.allTasks()) {
+            if (!task.isActive()) {
+                throw new IllegalStateException("Standby tasks should only be managed by the state updater");
+            }
             final TaskId taskId = task.id();
             if (activeTasksToCreate.containsKey(taskId)) {
-                if (task.isActive()) {
-                    final Set<TopicPartition> topicPartitions = activeTasksToCreate.get(taskId);
-                    if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) {
-                        task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
-                    }
-                    task.resume();
-                } else {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                }
+                handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId));
                 activeTasksToCreate.remove(taskId);
             } else if (standbyTasksToCreate.containsKey(taskId)) {
-                if (!task.isActive()) {
-                    throw new IllegalStateException("Standby tasks should only be managed by the state updater");
-                } else {
-                    tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
-                }
+                tasksToRecycle.put(task, standbyTasksToCreate.get(taskId));
                 standbyTasksToCreate.remove(taskId);
             } else {
                 tasksToCloseClean.add(task);
             }
         }
     }
+
+    private void handleReAssignedActiveTask(final Task task,
+                                            final Set<TopicPartition> inputPartitions) {
+        if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) {
+            task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id()));
+        }
+        task.resume();

Review Comment:
   Although I like the more explicit nature of your proposal, I do not think that it is correct. If a task is in `SUSPENDED`, the task transitions to `RESTORING` only after the call to `task.resume()`. Reassigned revoked active tasks should be in `SUSPENDED` and not in `RESTORING`.
[jira] [Comment Edited] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE
[ https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601779#comment-17601779 ]

Ian Corne edited comment on KAFKA-12887 at 9/8/22 12:13 PM:
------------------------------------------------------------

Why wouldn't you allow the user to handle this? This behavior is currently in 3.1.1, which is the version shipped with the latest Spring Boot 2.7, and it disables handling errors in business logic. IllegalArgumentException is not only used for fatal errors.

was (Author: icorne): Why wouldn't you allow the user to handle this?

> Do not trigger user-customized ExceptionalHandler for RTE
> ----------------------------------------------------------
>
>                 Key: KAFKA-12887
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12887
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Josep Prat
>            Priority: Major
>             Fix For: 3.1.0
>
> Today in StreamThread we have a try-catch block that captures all {{Throwable e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. However, there are possible RTEs such as IllegalState/IllegalArgument exceptions which are usually caused by bugs, etc. In such cases we should not let users decide what to do with these exceptions, but should let Streams itself enforce the decision, e.g. in the IllegalState/IllegalArgument case we should fail fast to surface the potential error.
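For reference, a minimal sketch of the user-customized handler under discussion (KIP-671); the application id, topics, and recovery policy are illustrative. Per this issue's resolution, exceptions Streams classifies as bugs (e.g. IllegalStateException/IllegalArgumentException) fail fast and bypass this handler, which is what the commenter is pushing back on:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

public class HandlerExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        Properties props = new Properties();
        props.put("application.id", "handler-example");
        props.put("bootstrap.servers", "localhost:9092");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // User-customized handler: chooses how the application reacts to an
        // uncaught exception from a stream thread.
        streams.setUncaughtExceptionHandler(exception ->
                StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
        streams.start();
    }
}
```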
[GitHub] [kafka] ashmeet13 commented on pull request #12414: KAFKA-14073 Logging the reason for Snapshot
ashmeet13 commented on PR #12414:
URL: https://github.com/apache/kafka/pull/12414#issuecomment-1240694324

Thank you @jsancio. I see that the build for JDK 17 and Scala 2.13 has timed out again. What can be done from my end to fix this?
[GitHub] [kafka] divijvaidya commented on pull request #12590: KAFKA-7109: Close fetch sessions on close of consumer
divijvaidya commented on PR #12590:
URL: https://github.com/apache/kafka/pull/12590#issuecomment-1240714341

@dajac since you are working on the consumer client protocol, perhaps you may also be interested in taking a look at this PR?
[GitHub] [kafka] clolov opened a new pull request, #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov opened a new pull request, #12607:
URL: https://github.com/apache/kafka/pull/12607

Batch 5 of the tests detailed in https://issues.apache.org/jira/browse/KAFKA-14133 which use EasyMock and need to be moved to Mockito.
[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240754283

There were stubbings which were no longer on the call path, so I have removed them. The way I checked that I wasn't changing the test behaviour was to use EasyMock.verify on the mocks and confirm that the stubbings were indeed unused prior to my change. There are multiple possibilities for refactoring, but I chose to keep the changes as close to the EasyMock implementation as possible, since the PR is already big.
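As an illustration of that verification technique, here is a hedged sketch with a hypothetical mock (the PR itself uses Streams-internal collaborators): EasyMock.verify fails when a recorded expectation was never exercised, which is how an unused stubbing surfaces before migrating to Mockito:

```java
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

import java.util.List;

public class UnusedStubbingCheck {
    public static void main(String[] args) {
        // Hypothetical collaborator standing in for a Streams-internal mock.
        @SuppressWarnings("unchecked")
        List<String> mock = createMock(List.class);
        expect(mock.size()).andReturn(42); // the stubbing under suspicion
        replay(mock);

        // ... exercise the code under test without it ever calling mock.size() ...

        // verify() throws an AssertionError because the expected call never
        // happened, proving the stubbing is unused and safe to drop.
        verify(mock);
    }
}
```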
[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240756214

I am aware that there are merge conflicts and I will aim to address them over the coming days.
[GitHub] [kafka] clolov commented on pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
clolov commented on PR #12607:
URL: https://github.com/apache/kafka/pull/12607#issuecomment-1240757288

@guozhangwang and @cadonna for visibility
[jira] [Updated] (KAFKA-14133) Remaining EasyMock to Mockito tests
[ https://issues.apache.org/jira/browse/KAFKA-14133?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Christo Lolov updated KAFKA-14133:
----------------------------------
Description:

There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.

Unless stated in brackets the tests are in the streams module. A list of tests which still need to be moved from EasyMock to Mockito as of the 2nd of August 2022, which do not have a Jira issue and do not have open pull requests that I am aware of (status key: [In Review], [Merged]):

# WorkerConnectorTest (connect) (owner: [~yash.mayya]) [Merged]
# WorkerCoordinatorTest (connect) (owner: [~yash.mayya]) [Merged]
# RootResourceTest (connect) (owner: [~yash.mayya]) [Merged]
# ByteArrayProducerRecordEquals (connect) (owner: [~yash.mayya]) [Merged]
# KStreamFlatTransformTest (owner: Christo) [Merged]
# KStreamFlatTransformValuesTest (owner: Christo) [Merged]
# KStreamPrintTest (owner: Christo) [Merged]
# KStreamRepartitionTest (owner: Christo) [Merged]
# MaterializedInternalTest (owner: Christo) [Merged]
# TransformerSupplierAdapterTest (owner: Christo) [Merged]
# KTableSuppressProcessorMetricsTest (owner: Christo) [Merged]
# ClientUtilsTest (owner: Christo) [Merged]
# HighAvailabilityStreamsPartitionAssignorTest (owner: Christo) [Merged]
# StreamsRebalanceListenerTest (owner: Christo) [In Review]
# TimestampedKeyValueStoreMaterializerTest (owner: Christo) [In Review]
# CachingInMemoryKeyValueStoreTest (owner: Christo) [In Review]
# CachingInMemorySessionStoreTest (owner: Christo) [In Review]
# CachingPersistentSessionStoreTest (owner: Christo) [In Review]
# CachingPersistentWindowStoreTest (owner: Christo) [In Review]
# ChangeLoggingKeyValueBytesStoreTest (owner: Christo) [In Review]
# ChangeLoggingTimestampedKeyValueBytesStoreTest (owner: Christo) [In Review]
# CompositeReadOnlyWindowStoreTest (owner: Christo) [In Review]
# KeyValueStoreBuilderTest (owner: Christo) [In Review]
# RocksDBStoreTest (owner: Christo) [In Review]
# StreamThreadStateStoreProviderTest (owner: Christo) [In Review]
# TopologyTest (owner: Christo) [In Review]
# KTableSuppressProcessorTest (owner: Christo) [In Review]
# ChangeLoggingSessionBytesStoreTest (owner: Christo) [In Review]
# ChangeLoggingTimestampedWindowBytesStoreTest (owner: Christo) [In Review]
# ChangeLoggingWindowBytesStoreTest (owner: Christo) [In Review]
# MeteredTimestampedWindowStoreTest (owner: Christo) [In Review]
# TaskManagerTest (owner: Christo) [In Review]
# InternalTopicManagerTest (owner: Christo)
# ProcessorContextImplTest (owner: Christo)
# WriteConsistencyVectorTest (owner: Christo)
# StreamsAssignmentScaleTest (owner: Christo)
# StreamsPartitionAssignorTest (owner: Christo)
# AssignmentTestUtils (owner: Christo)
# ProcessorStateManagerTest (WIP, owner: Matthew)
# StandbyTaskTest (WIP, owner: Matthew)
# StoreChangelogReaderTest (WIP, owner: Matthew)
# StreamTaskTest (WIP, owner: Matthew)
# StreamThreadTest (WIP, owner: Matthew)
# StreamsMetricsImplTest
# TimeOrderedCachingPersistentWindowStoreTest
# TimeOrderedWindowStoreTest

*The coverage report for the above tests after the change should be >= what the coverage is now.*

was:

There are tests which use both PowerMock and EasyMock. I have put those in https://issues.apache.org/jira/browse/KAFKA-14132. Tests here rely solely on EasyMock.

Unless stated in brackets the tests are in the streams module. A list of tests which still need to be moved from EasyMock to Mockito as of the 2nd of August 2022, which do not have a Jira issue and do not have open pull requests that I am aware of (status key: [In Review], [Merged]):

# WorkerConnectorTest (connect) (owner: [~yash.mayya]) [Merged]
# WorkerCoordinatorTest (connect) (owner: [~yash.mayya]) [Merged]
# RootResourceTest (connect) (owner: [~yash.mayya]) [Merged]
# ByteArrayProducerRecordEquals (connect) (owner: [~yash.mayya]) [Merged]
# KStreamFlatTransformTest (owner: Christo) [Merged]
# KStreamFlatTransformValuesTest (owner: Christo) [Merged]
# KStreamPrintTest (owner: Christo) [Merged]
# KStreamRepartitionTest (owner: Christo) [Merged]
# MaterializedInternalTest [Merged] (ow
[GitHub] [kafka] C0urante merged pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration
C0urante merged PR #12478:
URL: https://github.com/apache/kafka/pull/12478
[jira] [Resolved] (KAFKA-13952) Infinite retry timeout is not working
[ https://issues.apache.org/jira/browse/KAFKA-13952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Egerton resolved KAFKA-13952.
-----------------------------------
    Fix Version/s: 3.4.0
       Resolution: Fixed

> Infinite retry timeout is not working
> --------------------------------------
>
>                 Key: KAFKA-13952
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13952
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>            Reporter: Jakub Malek
>            Assignee: Yash Mayya
>            Priority: Minor
>             Fix For: 3.4.0
>
> The [documentation|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L129] for the {{errors.retry.timeout}} property says:
> {noformat}
> The maximum duration in milliseconds that a failed operation will be reattempted. The default is 0, which means no retries will be attempted. Use -1 for infinite retries.{noformat}
> But it seems that the value {{-1}} is not respected by the [RetryWithToleranceOperator|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/RetryWithToleranceOperator.java], which simply compares elapsed time until {{startTime + errorRetryTimeout}} is exceeded.
> I was also not able to find any conversion of the raw config value before {{RetryWithToleranceOperator}} is initialized.
> I ran a simple test with a connector using a mocked transformation plugin that throws {{RetriableException}}, and it seems to prove my claim.
> I'm not sure if it's a documentation or implementation error, or maybe I've missed something.
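A simplified sketch of the deadline logic at issue; the names and constants here are illustrative, not the actual RetryWithToleranceOperator code. Comparing elapsed time against startTime + errorRetryTimeout treats -1 as an already-expired deadline, so infinite retries must be special-cased:

```java
public class RetryDeadline {
    static final long RETRIES_DELAY_MIN_MS = 300;

    // Illustrative version of the check: with errorRetryTimeout == -1 the
    // naive comparison below puts the deadline in the past, so no retry is
    // ever attempted unless -1 is explicitly treated as "infinite".
    static boolean withinToleranceLimits(long startTime, long now, long errorRetryTimeout) {
        if (errorRetryTimeout == -1) {
            return true; // infinite retries, per the documented contract
        }
        return now + RETRIES_DELAY_MIN_MS < startTime + errorRetryTimeout;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println(withinToleranceLimits(start, start + 1_000, -1));     // true: infinite
        System.out.println(withinToleranceLimits(start, start + 1_000, 5_000));  // true: within window
        System.out.println(withinToleranceLimits(start, start + 10_000, 5_000)); // false: window elapsed
    }
}
```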
[GitHub] [kafka] yashmayya commented on pull request #12478: KAFKA-13952 fix RetryWithToleranceOperator to respect infinite retries configuration
yashmayya commented on PR #12478:
URL: https://github.com/apache/kafka/pull/12478#issuecomment-1240782194

Thanks for the detailed reviews and for bearing with me through multiple rounds of review on this one, Chris!
[GitHub] [kafka] mimaison commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
mimaison commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966049063

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
@@ -1138,5 +1142,13 @@ else if (value instanceof Long)
     private String className(Object o) {
         return o != null ? o.getClass().getName() : "null";
     }
-}
+    private static void maybeAddClientId(Map<String, Object> clientProps, String groupId) {
+        String clientId = "connect-cluster";
+        if (groupId != null) {

Review Comment:
   Can `groupId` actually be null here? This should only run in distributed mode.

## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
@@ -106,6 +108,7 @@ public Connect startConnect(Map<String, String> workerProps) {
         // Create the admin client to be shared by all backing stores.
         Map<String, Object> adminProps = new HashMap<>(config.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+        adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + config.groupId());

Review Comment:
   With the default config, you get `connect-cluster-connect-cluster` for the client-id. I wonder if we need to add the `connect-cluster-` prefix at all?
[jira] [Commented] (KAFKA-14198) Release package contains snakeyaml 1.30
[ https://issues.apache.org/jira/browse/KAFKA-14198?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601858#comment-17601858 ]

Mickael Maison commented on KAFKA-14198:
----------------------------------------

[~jagsancio] I'm on PTO starting tomorrow for 2 weeks, so unfortunately I'm not sure if I'll have the time to look into this.

> Release package contains snakeyaml 1.30
> ----------------------------------------
>
>                 Key: KAFKA-14198
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14198
>             Project: Kafka
>          Issue Type: Improvement
>    Affects Versions: 3.3.0
>            Reporter: Mickael Maison
>            Priority: Major
>             Fix For: 3.3.0
>
> snakeyaml 1.30 is vulnerable to CVE-2022-25857: https://security.snyk.io/vuln/SNYK-JAVA-ORGYAML-2806360
> It looks like we pull in this dependency because of swagger. It's unclear how, or even if, this can be exploited in Kafka, but it's flagged by scanning tools.
> I wonder if we could make the swagger dependencies compile-time only and avoid shipping them.
[GitHub] [kafka] C0urante commented on pull request #12602: KAFKA-13985: Skip committing MirrorSourceTask records without metadata
C0urante commented on PR #12602:
URL: https://github.com/apache/kafka/pull/12602#issuecomment-1240821236

Thanks @rgroothuijsen! I agree with the `DEBUG` level for logging; it's tempting to make this silent, but I'd prefer to err on the side of giving users more information since we can't add logging to releases once they've gone out.

RE testing, there are a few approaches I can think of:

1. Simply remove the try/catch block in `MirrorSourceTask::commitRecord`, since it essentially duplicates [logic in the Connect framework](https://github.com/apache/kafka/blob/0c97be53fa7e1e0720f2086b5d9d80ffcc1db470/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTask.java#L544-L548) with the small change of using the `WARN` level for logging instead of `ERROR`.
2. If we'd prefer to keep the logging as-is, we can isolate all of the logic for the method except exception handling into a package-private `doCommitRecord` method, which is then wrapped by the public `commitRecord` method. I.e.:

```java
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
    try {
        doCommitRecord(record, metadata);
    } catch (Throwable e) {
        log.warn("Failure committing record.", e);
    }
}

// Visible for testing
void doCommitRecord(SourceRecord record, RecordMetadata metadata) {
    if (stopping) {
        return;
    }
    if (!metadata.hasOffset()) {
        log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
        return;
    }
    TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
    long latency = System.currentTimeMillis() - record.timestamp();
    metrics.countRecord(topicPartition);
    metrics.replicationLatency(topicPartition, latency);
    TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
    long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
    long downstreamOffset = metadata.offset();
    maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
}
```
[GitHub] [kafka] cadonna commented on pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation
cadonna commented on PR #12600:
URL: https://github.com/apache/kafka/pull/12600#issuecomment-1240827367

> we reduce removeRevokedTasksFromStateUpdater to only record in the pending tasks to suspend, but not try to remove from state updaters. And in handleAssignment we just update the tasks bookkeeping from suspended to closed in addition calling remove from the state updater.

I am not sure I can follow. Are you proposing to not recycle or resume revoked active tasks?
[GitHub] [kafka] jsancio commented on pull request #12601: MINOR: Retry on test failure for branch builds and increase max test retry to 10
jsancio commented on PR #12601:
URL: https://github.com/apache/kafka/pull/12601#issuecomment-1240836278

Merging changes and cherry-picking to 3.3. Errors seem unrelated.
[GitHub] [kafka] jsancio merged pull request #12601: MINOR: Retry on test failure for branch builds and increase max test retry to 10
jsancio merged PR #12601:
URL: https://github.com/apache/kafka/pull/12601
[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966081693

## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
@@ -106,6 +108,7 @@ public Connect startConnect(Map<String, String> workerProps) {
         // Create the admin client to be shared by all backing stores.
         Map<String, Object> adminProps = new HashMap<>(config.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+        adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + config.groupId());

Review Comment:
   Yeah, that's fair. I was thinking it might be useful broker-side to call this out as a Connect application, but it's not a must-have and the stuttering that you've noted with `connect-cluster-connect-cluster` is not great. Will remove.
[GitHub] [kafka] Danny02 opened a new pull request, #12608: Enable KStream to be merged with itself
Danny02 opened a new pull request, #12608:
URL: https://github.com/apache/kafka/pull/12608

Why: It is an interesting question what the result should be when merging a KStream with itself. Should the merge duplicate the messages, or should it be a no-op? I think the only reasonable solution is to duplicate the messages, because there are many different ways to disguise a KStream (e.g. adding a peek operation on it). It is therefore impossible to implement the solution where it is a no-op.

How does it help with resolving the issue: This change makes the behavior of the merge operation consistent.

Side effects: Using a list instead of a set for the parent nodes could have side effects. The test suite did not detect any.
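A sketch of the behavior in question (topic names are illustrative): with this change, merging a stream with itself emits each record twice, matching the result of merging a disguised copy of the same stream:

```java
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class SelfMergeExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input-topic");

        // Merging a stream with itself: each input record appears twice downstream.
        stream.merge(stream).to("merged-topic");
        // An equivalent "disguised" form -- peek() returns a new KStream over the
        // same records, so a no-op merge could not detect the self-merge anyway:
        // stream.merge(stream.peek((k, v) -> { })).to("merged-topic");

        Properties props = new Properties();
        props.put("application.id", "self-merge-example");
        props.put("bootstrap.servers", "localhost:9092");
        new KafkaStreams(builder.build(), props).start();
    }
}
```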
[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966084441

## connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
@@ -1138,5 +1142,13 @@ else if (value instanceof Long)
     private String className(Object o) {
         return o != null ? o.getClass().getName() : "null";
     }
-}
+    private static void maybeAddClientId(Map<String, Object> clientProps, String groupId) {
+        String clientId = "connect-cluster";
+        if (groupId != null) {

Review Comment:
   Right now we have that guarantee, but since the `*BackingStore` classes accept a `WorkerConfig` instead of a `DistributedConfig`, it's difficult to enforce that guarantee. Plus, if we ever want to use any of these backing stores with standalone mode (or a new mode that doesn't use Kafka's group membership API), it'll make that migration easier.
[GitHub] [kafka] C0urante commented on a diff in pull request #12544: KAFKA-14098: Add meaningful default client IDs for Connect workers
C0urante commented on code in PR #12544:
URL: https://github.com/apache/kafka/pull/12544#discussion_r966088764

## connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java:
@@ -106,6 +108,7 @@ public Connect startConnect(Map<String, String> workerProps) {
         // Create the admin client to be shared by all backing stores.
         Map<String, Object> adminProps = new HashMap<>(config.originals());
         ConnectUtils.addMetricsContextProperties(adminProps, config, kafkaClusterId);
+        adminProps.putIfAbsent(CLIENT_ID_CONFIG, "connect-cluster-" + config.groupId());

Review Comment:
   One thing that I'm wondering a bit more about is whether we should add some kind of unique identifier for each worker within the Connect cluster, since without one it becomes harder to use broker logs to debug issues. It's tempting to use the worker's advertised URL to identify it since that comes from the config and is likely to be human-readable. It may be a little ugly in some edge cases, though. Other possibilities are to use a hash of the advertised URL, or a UUID. Thoughts?
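To make the trade-offs concrete, here is a hedged sketch of the three options floated in that comment. All helper names and ID formats are hypothetical, not what the PR adopted:

```java
import java.util.UUID;

public class ClientIdSuffix {
    // Option 1: human-readable, but can be ugly for long or odd advertised URLs.
    static String fromAdvertisedUrl(String groupId, String advertisedUrl) {
        return "connect-" + groupId + "-" + advertisedUrl;
    }

    // Option 2: stable across restarts and compact, but opaque in broker logs.
    static String fromUrlHash(String groupId, String advertisedUrl) {
        return "connect-" + groupId + "-" + Integer.toHexString(advertisedUrl.hashCode());
    }

    // Option 3: always unique, but changes on every worker restart.
    static String fromUuid(String groupId) {
        return "connect-" + groupId + "-" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(fromAdvertisedUrl("my-cluster", "http://worker1:8083"));
        System.out.println(fromUrlHash("my-cluster", "http://worker1:8083"));
        System.out.println(fromUuid("my-cluster"));
    }
}
```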
[GitHub] [kafka] jsancio commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)
jsancio commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1240857861

@showuon @C0urante @mimaison these are the test failures: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/system-test-kafka-branch-builder--1662600230--C0urante--kafka-1-system-tests--2f62b7469/2022-09-07--001./2022-09-07--001./report.html
[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)
C0urante commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1240862547

@jsancio looks like all the Connect tests were green, and the failures were unrelated. LGTY?
[GitHub] [kafka] jsancio commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)
jsancio commented on PR #11783:
URL: https://github.com/apache/kafka/pull/11783#issuecomment-1240920282

@C0urante sounds good. Do you want to merge it and cherry-pick it to 3.3? I can also merge it if you update the description with what you want me to write in the commit message.
[jira] [Assigned] (KAFKA-14203) KRaft broker should disable snapshot generation after error replaying the metadata log
[ https://issues.apache.org/jira/browse/KAFKA-14203?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jose Armando Garcia Sancio reassigned KAFKA-14203:
--------------------------------------------------
    Assignee: David Arthur

> KRaft broker should disable snapshot generation after error replaying the metadata log
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-14203
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14203
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 3.3.0
>            Reporter: Jose Armando Garcia Sancio
>            Assignee: David Arthur
>            Priority: Major
>             Fix For: 3.3.0
>
> The broker skips records for which there was an error when replaying the log. This means that the MetadataImage has diverged from the state persisted in the log. The broker should disable snapshot generation, or else the next time a snapshot gets generated it will result in inconsistent data getting persisted.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)
hachikuji commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966196258

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   I am not sure I follow the reason for the transactional check. Can you clarify?
[GitHub] [kafka] dajac commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)
dajac commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966198873

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   I thought that transactional commits should never come with a generation < 0, but I suppose that this is wrong. I suppose that a producer could commit transactional offsets without using a consumer group. Is this right?
[GitHub] [kafka] jsancio commented on a diff in pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft
jsancio commented on code in PR #12604:
URL: https://github.com/apache/kafka/pull/12604#discussion_r966204598

## docs/quickstart.html:
@@ -46,6 +46,14 @@
 NOTE: Your local environment must have Java 8+ installed.

+Apache Kafka can be started using ZooKeeper or KRaft. To get started with either configuration follow one of the sections below but not both.
+
+Kafka with ZooKeeper
+
 Run the following commands in order to start all services in the correct order:

Review Comment:
   Done.
[GitHub] [kafka] jsancio commented on a diff in pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft
jsancio commented on code in PR #12604:
URL: https://github.com/apache/kafka/pull/12604#discussion_r966205030

## docs/quickstart.html:
@@ -64,6 +72,28 @@
 Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.

+Kafka with KRaft
+
+Generate a Cluster UUID
+
+$ KAFKA_CLUSTER_ID = $(bin/kafka-storege.sh random-uuid)

Review Comment:
   Done.
[GitHub] [kafka] jsancio commented on pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft
jsancio commented on PR #12604:
URL: https://github.com/apache/kafka/pull/12604#issuecomment-1240981827

Thanks for the review @showuon. I addressed your comments. I also fixed the `rm` command at the end of the quickstart.
[GitHub] [kafka] hachikuji commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)
hachikuji commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966208033

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   It seems to be allowed at the moment, though that is definitely not a common case.
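To illustrate the case being discussed, here is a hedged sketch of a transactional producer committing offsets for a group that has no active consumer member; the topic, group, and transactional id values are illustrative:

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.TopicPartition;

public class StandaloneTxnCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "example-txn-id");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            // No consumer participates in the group here, so the metadata
            // carries no member id and a generation of -1 -- the transactional
            // commit with generation < 0 that the check above has to tolerate.
            producer.sendOffsetsToTransaction(
                    Map.of(new TopicPartition("example-topic", 0), new OffsetAndMetadata(42L)),
                    new ConsumerGroupMetadata("example-group"));
            producer.commitTransaction();
        }
    }
}
```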
[GitHub] [kafka] dajac commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)
dajac commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966208886

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   I can remove that check to stay consistent.
[GitHub] [kafka] dajac commented on a diff in pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)
dajac commented on code in PR #12598:
URL: https://github.com/apache/kafka/pull/12598#discussion_r966215071

## core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala:
@@ -982,6 +982,12 @@ class GroupCoordinator(val brokerId: Int,
   ): Option[Errors] = {
     if (group.is(Dead)) {
       Some(Errors.COORDINATOR_NOT_AVAILABLE)
+    } else if (!isTransactional && generationId < 0 && group.is(Empty)) {

Review Comment:
   Removed it.
[GitHub] [kafka] ijuma opened a new pull request, #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
ijuma opened a new pull request, #12609:
URL: https://github.com/apache/kafka/pull/12609

Verified that the artifact generated by `releaseTarGz` no longer includes swagger-jaxrs2 or its dependencies (like snakeyaml).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jsancio commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
jsancio commented on PR #12609: URL: https://github.com/apache/kafka/pull/12609#issuecomment-1240998511 Thanks for the PR @ijuma. What do you think @showuon @C0urante @mimaison? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio opened a new pull request, #12610: MINOR; Update documentation for printing dependencies
jsancio opened a new pull request, #12610: URL: https://github.com/apache/kafka/pull/12610 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12610: MINOR; Update documentation for printing dependencies
ijuma commented on code in PR #12610: URL: https://github.com/apache/kafka/pull/12610#discussion_r966242324 ## README.md: ## @@ -202,7 +202,7 @@ If needed, you can specify the Scala version with `-PscalaVersion=2.13`. ./gradlew testJar ### Determining how transitive dependencies are added ### Review Comment: Maybe we should delete this and move "Dependency Analysis" above "common build options". -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #12610: MINOR; Update documentation for printing dependencies
jsancio commented on code in PR #12610: URL: https://github.com/apache/kafka/pull/12610#discussion_r966244134 ## README.md: ## @@ -202,7 +202,7 @@ If needed, you can specify the Scala version with `-PscalaVersion=2.13`. ./gradlew testJar ### Determining how transitive dependencies are added ### Review Comment: I agree. I missed that section. Let me update it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14198) Release package contains snakeyaml 1.30
[ https://issues.apache.org/jira/browse/KAFKA-14198?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio reassigned KAFKA-14198: -- Assignee: Ismael Juma > Release package contains snakeyaml 1.30 > --- > > Key: KAFKA-14198 > URL: https://issues.apache.org/jira/browse/KAFKA-14198 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.3.0 >Reporter: Mickael Maison >Assignee: Ismael Juma >Priority: Major > Fix For: 3.3.0 > > > snakeyaml 1.30 is vulnerable to CVE-2022-25857: > https://security.snyk.io/vuln/SNYK-JAVA-ORGYAML-2806360 > It looks like we pull this dependency because of swagger. It's unclear how or > even if this can be exploited in Kafka but it's flagged by scanning tools. > I wonder if we could make the swagger dependencies compile time only and > avoid shipping them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] philipnee commented on a diff in pull request #12603: Pausing partition to prevent duplication
philipnee commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r966254369 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { joinPrepareTimer.update(); } +final SortedSet partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId); Review Comment: Do we need to recompute the partitions right before the revocation? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13766) Use `max.poll.interval.ms` as the timeout during complete-rebalance phase
[ https://issues.apache.org/jira/browse/KAFKA-13766?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17601949#comment-17601949 ] Guozhang Wang commented on KAFKA-13766: --- Inside onCompleteJoin, the block starting with {{// trigger the awaiting join group response callback for all the members after rebalancing}} indicates that once we are in the completing rebalance phase, we've re-enabled the heartbeat with the session timeout. I.e. in that phase we effectively have two timers: {{completeAndScheduleNextHeartbeatExpiration(group, member)}} and {{schedulePendingSync(group)}}; whichever triggers first, we would fail the member and re-trigger the rebalance. And since in general session.timeout is smaller than the rebalance timeout, we would hit the former if there's a delay on assignment. -- This message was sent by Atlassian Jira (v8.20.10#820010)
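For readers following the timer discussion, here is a small, self-contained Java sketch of the race Guozhang describes. It uses a plain ScheduledExecutorService rather than Kafka's actual delayed-operation machinery, and all names besides the two quoted coordinator methods are illustrative:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CompletingRebalanceTimersSketch {
    public static void main(String[] args) {
        ScheduledExecutorService timers = Executors.newScheduledThreadPool(2);
        long sessionTimeoutMs = 10_000;    // session.timeout.ms, usually the smaller of the two
        long rebalanceTimeoutMs = 300_000; // piggy-backed max.poll.interval.ms
        Runnable failMemberAndRebalance =
            () -> System.out.println("member failed; rebalance re-triggered");

        // stands in for completeAndScheduleNextHeartbeatExpiration(group, member)
        timers.schedule(failMemberAndRebalance, sessionTimeoutMs, TimeUnit.MILLISECONDS);
        // stands in for schedulePendingSync(group)
        timers.schedule(failMemberAndRebalance, rebalanceTimeoutMs, TimeUnit.MILLISECONDS);

        // With both timers armed, a delayed assignment hits the session timer
        // first, which is exactly the behavior the ticket proposes to change.
        timers.shutdown(); // in this sketch, shutdown cancels the pending timers
    }
}
```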
[GitHub] [kafka] guozhangwang commented on pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation
guozhangwang commented on PR #12600: URL: https://github.com/apache/kafka/pull/12600#issuecomment-1241058075 > I am not sure I can follow. Are you proposing to not recycle or resume revoked active tasks? What I'm proposing is that, for restoring active tasks, we can actually ignore them at the `handleRevocation` phase, but only handle them at the `handleAssignment` phase, where we would then know if the task is still owned, or should be closed, or should be recycled. At that time we add them to the corresponding pending tasks, and then call `stateUpdater.remove`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation
guozhangwang commented on code in PR #12600: URL: https://github.com/apache/kafka/pull/12600#discussion_r966274214 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map> activeTasksToCreate, - final Map> standbyTasksToCreate, - final Map> tasksToRecycle, - final Set tasksToCloseClean) { +private void classifyRunningAndSuspendedTasks(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final Map> tasksToRecycle, + final Set tasksToCloseClean) { for (final Task task : tasks.allTasks()) { +if (!task.isActive()) { +throw new IllegalStateException("Standby tasks should only be managed by the state updater"); +} final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { -if (task.isActive()) { -final Set topicPartitions = activeTasksToCreate.get(taskId); -if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) { -task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id())); -} -task.resume(); -} else { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} +handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId)); activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { -if (!task.isActive()) { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} else { -tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); -} +tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); standbyTasksToCreate.remove(taskId); } else { tasksToCloseClean.add(task); } } } +private void handleReAssignedActiveTask(final Task task, +final Set inputPartitions) { +if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) { +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); +} +task.resume(); Review Comment: I'm not sure I follow, but in anyways after thinking about it again I think we do not need the pending-tasks-suspended as we could ignore those restoring active tasks at the `handleRevocation` phase. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
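A hedged sketch of the classification flow proposed in this thread, with all names hypothetical (this is not the real Streams TaskManager API, only the decision logic): restoring active tasks are ignored during handleRevocation and classified once handleAssignment runs, when the new assignment says whether each is still owned, recycled, or closed:

```java
import java.util.HashSet;
import java.util.Set;

public class RestoringActiveClassificationSketch {
    final Set<String> pendingToRecycle = new HashSet<>();
    final Set<String> pendingToClose = new HashSet<>();
    final Set<String> removedFromStateUpdater = new HashSet<>();

    // handleRevocation deliberately skips restoring active tasks; only here,
    // once the new assignment is known, is each one classified.
    void handleAssignment(Set<String> restoringActives,
                          Set<String> newActiveIds,
                          Set<String> newStandbyIds) {
        for (String taskId : restoringActives) {
            if (newActiveIds.contains(taskId)) {
                continue; // still owned: leave it restoring in the state updater
            }
            if (newStandbyIds.contains(taskId)) {
                pendingToRecycle.add(taskId); // active -> standby after removal completes
            } else {
                pendingToClose.add(taskId);
            }
            removedFromStateUpdater.add(taskId); // stands in for stateUpdater.remove(taskId)
        }
    }

    public static void main(String[] args) {
        RestoringActiveClassificationSketch tm = new RestoringActiveClassificationSketch();
        tm.handleAssignment(Set.of("0_0", "0_1", "0_2"), Set.of("0_0"), Set.of("0_1"));
        System.out.println("recycle=" + tm.pendingToRecycle + " close=" + tm.pendingToClose);
    }
}
```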
[jira] [Updated] (KAFKA-14201) Consumer should not send group instance ID if committing with empty member ID
[ https://issues.apache.org/jira/browse/KAFKA-14201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio updated KAFKA-14201: --- Fix Version/s: 3.3.0 > Consumer should not send group instance ID if committing with empty member ID > - > > Key: KAFKA-14201 > URL: https://issues.apache.org/jira/browse/KAFKA-14201 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: David Jacot >Priority: Major > Fix For: 3.3.0 > > > The consumer group instance ID is used to support a notion of "static" > consumer groups. The idea is to be able to identify the same group instance > across restarts so that a rebalance is not needed. However, if the user sets > `group.instance.id` in the consumer configuration, but uses "simple" > assignment with `assign()`, then the instance ID nevertheless is sent in the > OffsetCommit request to the coordinator. This may result in a surprising > UNKNOWN_MEMBER_ID error. The consumer should probably be smart enough to only > send the instance ID when committing as part of a consumer group. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] guozhangwang commented on a diff in pull request #12607: KAFKA-14133: Replace EasyMock with Mockito in streams tests
guozhangwang commented on code in PR #12607: URL: https://github.com/apache/kafka/pull/12607#discussion_r966289656 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -1300,28 +1229,16 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { // `handleAssignment` expectRestoreToBeCompleted(consumer, changeLogReader); -expect(activeTaskCreator.createTasks(anyObject(), eq(taskId00Assignment))).andStubReturn(singletonList(task00)); - expect(standbyTaskCreator.createTasks(eq(taskId01Assignment))).andStubReturn(singletonList(task01)); -topologyBuilder.addSubscribedTopicsFromAssignment(anyObject(), anyString()); -expectLastCall().anyTimes(); +when(activeTaskCreator.createTasks(any(), eq(taskId00Assignment))).thenReturn(singletonList(task00)); + when(standbyTaskCreator.createTasks(eq(taskId01Assignment))).thenReturn(singletonList(task01)); +// The second attempt will return empty tasks. makeTaskFolders(taskId00.toString(), taskId01.toString()); expectLockObtainedFor(taskId00, taskId01); -// The second attempt will return empty tasks. -makeTaskFolders(); Review Comment: Why we can remove those calls? ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -325,25 +318,21 @@ public void shouldHandleRemovedTasksToRecycleFromStateUpdater() { when(stateUpdater.drainRemovedTasks()).thenReturn(mkSet(task00, task01)); taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); -expect(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) -.andStubReturn(task01Converted); -activeTaskCreator.closeAndRemoveTaskProducerIfNeeded(anyObject()); -expectLastCall().once(); -expect(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions))) -.andStubReturn(task00Converted); -expect(consumer.assignment()).andReturn(emptySet()).anyTimes(); -consumer.resume(anyObject()); -expectLastCall().anyTimes(); -replay(activeTaskCreator, standbyTaskCreator, topologyBuilder, consumer); +when(activeTaskCreator.createActiveTaskFromStandby(eq(task01), eq(taskId01Partitions), eq(consumer))) +.thenReturn(task01Converted); +when(standbyTaskCreator.createStandbyTaskFromActive(eq(task00), eq(taskId00Partitions))) +.thenReturn(task00Converted); +when(consumer.assignment()).thenReturn(emptySet()); taskManager.tasks().addPendingTaskToRecycle(taskId00, taskId00Partitions); taskManager.tasks().addPendingTaskToRecycle(taskId01, taskId01Partitions); taskManager.tryToCompleteRestoration(time.milliseconds(), noOpResetter -> { }); -Mockito.verify(task00Converted).initializeIfNeeded(); -Mockito.verify(task01Converted).initializeIfNeeded(); -Mockito.verify(stateUpdater).add(task00Converted); -Mockito.verify(stateUpdater).add(task01Converted); +verify(task00Converted).initializeIfNeeded(); +verify(task01Converted).initializeIfNeeded(); +verify(stateUpdater).add(task00Converted); +verify(stateUpdater).add(task01Converted); +verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(any()); Review Comment: How to specify that we expect this function call only once? Should we use `verify(activeTaskCreator, times(1)).func();` instead? 
## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -4091,21 +3815,20 @@ public void shouldListNotPausedTasks() { topologyMetadata.pauseTopology(UNNAMED_TOPOLOGY); assertEquals(taskManager.notPausedTasks().size(), 0); + +verifyConsumerResumedWithAssignment(consumer); Review Comment: Why add this additional verification? ## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ## @@ -271,11 +265,13 @@ public void shouldClassifyExistingTasksWithStateUpdater() { taskManager.handleAssignment(standbyTasks, restoringActiveTasks); -Mockito.verify(stateUpdater).getTasks(); -Mockito.verify(stateUpdater).remove(standbyTaskToRecycle.id()); -Mockito.verify(stateUpdater).remove(standbyTaskToClose.id()); -Mockito.verify(stateUpdater).remove(restoringActiveTaskToRecycle.id()); -Mockito.verify(stateUpdater).remove(restoringActiveTaskToClose.id()); +verify(stateUpdater).getTasks(); +verify(stateUpdater).remove(standbyTaskToRecycle.id()); +verify(stateUpdater).remove(standbyTaskToClose.id()); +verify(stateUpdater).remove(restoringActiveTaskToRecycle.id()); +verify(stateUpdater).remove(re
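On the times(1) question in the review above: in Mockito, verify(mock) with no VerificationMode defaults to times(1), so the migrated code already asserts exactly one call; times(1) only makes that explicit. A minimal sketch, assuming mockito-core on the classpath:

```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.util.List;

public class MockitoTimesSketch {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        List<String> tasks = mock(List.class);
        tasks.add("task00");

        verify(tasks).add("task00");           // implicit times(1): exactly one call
        verify(tasks, times(1)).add("task00"); // equivalent, just explicit
        verify(tasks, never()).clear();        // fails if clear() had ever run
        System.out.println("all verifications passed");
    }
}
```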
[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14196: Affects Version/s: (was: 3.2.1) > Duplicated consumption during rebalance, causing OffsetValidationTest to act > flaky > -- > > Key: KAFKA-14196 > URL: https://issues.apache.org/jira/browse/KAFKA-14196 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.3.0 >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Major > Labels: new-consumer-threading-should-fix > > Several flaky tests under OffsetValidationTest are indicating potential > consumer duplication issue, when autocommit is enabled. I believe this is > affecting *3.2* and onward. Below shows the failure message: > > {code:java} > Total consumed records 3366 did not match consumed position 3331 {code} > > After investigating the log, I discovered that the data consumed between the > start of a rebalance event and the async commit was lost for those failing > tests. In the example below, the rebalance event kicks in at around > 1662054846995 (first record), and the async commit of the offset 3739 is > completed at around 1662054847015 (right before partitions_revoked). > > {code:java} > {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]} > {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]} > {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]} > {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]} > {code} > A few things to note here: > # Manually calling commitSync in the onPartitionsRevoke cb seems to > alleviate the issue > # Setting includeMetadataInTimeout to false also seems to alleviate the > issue. > The above tries seems to suggest that contract between poll() and > asyncCommit() is broken. AFAIK, we implicitly uses poll() to ack the > previously fetched data, and the consumer would (try to) commit these offsets > in the current poll() loop. However, it seems like as the poll continues to > loop, the "acked" data isn't being committed. > > I believe this could be introduced in KAFKA-14024, which originated from > KAFKA-13310. > More specifically, (see the comments below), the ConsumerCoordinator will > alway return before async commit, due to the previous incomplete commit. > However, this is a bit contradictory here because: > # I think we want to commit asynchronously while the poll continues, and if > we do that, we are back to KAFKA-14024, that the consumer will get rebalance > timeout and get kicked out of the group. > # But we also need to commit all the "acked" offsets before revoking the > partition, and this has to be blocked. 
> *Steps to Reproduce the Issue:* > # Check out AK 3.2 > # Run this several times: (Recommend to only run runs with autocommit > enabled in consumer_test.py to save time) > {code:java} > _DUCKTAPE_OPTIONS="--debug" > TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure" > bash tests/docker/run_tests.sh {code} > > *Steps to Diagnose the Issue:* > # Open the test results in *results/* > # Go to the consumer log. It might look like this > > {code:java} > results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY > {code} > 3. Find the docker instance that has partition getting revoked and rejoined. > Observed the offset before and after. > *Propose Fixes:* > TBD -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14196: Fix Version/s: 3.3.0, 3.2.2 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14196: Priority: Blocker (was: Major) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-14196: Affects Version/s: 3.2.1 (was: 3.3.0) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14209) Optimize stream stream self join to use single state store
Vicky Papavasileiou created KAFKA-14209: --- Summary: Optimize stream stream self join to use single state store Key: KAFKA-14209 URL: https://issues.apache.org/jira/browse/KAFKA-14209 Project: Kafka Issue Type: Improvement Reporter: Vicky Papavasileiou For stream-stream joins that join the same source, we can omit one state store since they contain the same data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] hachikuji commented on a diff in pull request #12593: KAFKA-14196: Prevent fetching during the rebalancing
hachikuji commented on code in PR #12593: URL: https://github.com/apache/kafka/pull/12593#discussion_r966321913 ## clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java: ## @@ -1282,6 +1282,11 @@ private Fetch pollForFetches(Timer timer) { long pollTimeout = coordinator == null ? timer.remainingMs() : Math.min(coordinator.timeToNextPoll(timer.currentTimeMs()), timer.remainingMs()); +if(coordinator.isCommittingOffsetAsync()) { Review Comment: Hmm, do we want to do this in the general case? I think my expectation is that we would not continue fetching for partitions only when we have sent the offset commit and we are awaiting revocation as part of a rebalance. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
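A small sketch of the distinction being drawn above, with hypothetical flag names rather than the real KafkaConsumer/ConsumerCoordinator fields: the fetch pause would apply only while an async commit is in flight ahead of revocation during a rebalance, not to async commits in general:

```java
public class FetchGateSketch {
    boolean committingOffsetsAsync; // an async offset commit is in flight
    boolean awaitingRevocation;     // between onJoinPrepare and partition revocation

    // General-case async commits keep fetching; only the commit that precedes
    // revocation pauses fetches, since records fetched past the committed
    // offset would be re-fetched by the next owner (the KAFKA-14196 duplication).
    boolean shouldHoldBackFetches() {
        return committingOffsetsAsync && awaitingRevocation;
    }

    public static void main(String[] args) {
        FetchGateSketch consumer = new FetchGateSketch();
        consumer.committingOffsetsAsync = true;               // plain async commit
        System.out.println(consumer.shouldHoldBackFetches()); // false
        consumer.awaitingRevocation = true;                   // commit ahead of revocation
        System.out.println(consumer.shouldHoldBackFetches()); // true
    }
}
```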
[GitHub] [kafka] philipnee closed pull request #12593: KAFKA-14196: Prevent fetching during the rebalancing
philipnee closed pull request #12593: KAFKA-14196: Prevent fetching during the rebalancing URL: https://github.com/apache/kafka/pull/12593 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Philip Nee updated KAFKA-14196: --- Description: Several flaky tests under OffsetValidationTest are indicating potential consumer duplication issue, when autocommit is enabled. I believe this is affecting *3.2* and onward. Below shows the failure message: {code:java} Total consumed records 3366 did not match consumed position 3331 {code} After investigating the log, I discovered that the data consumed between the start of a rebalance event and the async commit was lost for those failing tests. In the example below, the rebalance event kicks in at around 1662054846995 (first record), and the async commit of the offset 3739 is completed at around 1662054847015 (right before partitions_revoked). {code:java} {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]} {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]} {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]} {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]} {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]} {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]} {code} A few things to note here: # Manually calling commitSync in the onPartitionsRevoke cb seems to alleviate the issue # Setting includeMetadataInTimeout to false also seems to alleviate the issue. The above tries seems to suggest that contract between poll() and asyncCommit() is broken. AFAIK, we implicitly uses poll() to ack the previously fetched data, and the consumer would (try to) commit these offsets in the current poll() loop. However, it seems like as the poll continues to loop, the "acked" data isn't being committed. I believe this could be introduced in KAFKA-14024, which originated from KAFKA-13310. More specifically, (see the comments below), the ConsumerCoordinator will alway return before async commit, due to the previous incomplete commit. However, this is a bit contradictory here because: # I think we want to commit asynchronously while the poll continues, and if we do that, we are back to KAFKA-14024, that the consumer will get rebalance timeout and get kicked out of the group. # But we also need to commit all the "acked" offsets before revoking the partition, and this has to be blocked. *Steps to Reproduce the Issue:* # Check out AK 3.2 # Run this several times: (Recommend to only run runs with autocommit enabled in consumer_test.py to save time) {code:java} _DUCKTAPE_OPTIONS="--debug" TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure" bash tests/docker/run_tests.sh {code} *Steps to Diagnose the Issue:* # Open the test results in *results/* # Go to the consumer log. It might look like this {code:java} results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY {code} 3. Find the docker instance that has partition getting revoked and rejoined. 
Observed the offset before and after. *Propose Fixes:* TBD https://github.com/apache/kafka/pull/12603 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)
C0urante commented on PR #11783: URL: https://github.com/apache/kafka/pull/11783#issuecomment-1241120740 I can handle the merge. Thanks @jsancio, @showuon, @mimaison, and @tombentley for the review! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante merged pull request #11783: KAFKA-14143: Exactly-once source connector system tests (KIP-618)
C0urante merged PR #11783: URL: https://github.com/apache/kafka/pull/11783 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14143) Exactly-once source system tests
[ https://issues.apache.org/jira/browse/KAFKA-14143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton resolved KAFKA-14143. --- Resolution: Fixed > Exactly-once source system tests > > > Key: KAFKA-14143 > URL: https://issues.apache.org/jira/browse/KAFKA-14143 > Project: Kafka > Issue Type: Task > Components: KafkaConnect >Affects Versions: 3.3.0 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 3.3.0 > > > System tests for the exactly-once source connector support introduced in > [KIP-618|https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors] > / KAFKA-1. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] C0urante commented on pull request #12575: KAFKA-14193: Add EOS related source connector configs to expected config defs in ConnectRestApiTest
C0urante commented on PR #12575: URL: https://github.com/apache/kafka/pull/12575#issuecomment-1241138425 Now that we've merged and backported https://github.com/apache/kafka/pull/11783, this should hopefully no longer be necessary. @yashmayya let me know if anything still fails; otherwise, we may want to close this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
C0urante commented on PR #12609: URL: https://github.com/apache/kafka/pull/12609#issuecomment-1241219303 This appears to break the docs generation added in https://github.com/apache/kafka/pull/12067; running `./gradlew siteDocsTar` with this change causes the generated `docs/generated/connect_rest.yaml` file to lose all information on the REST API save for general metadata. Are we okay with spending some time on trying to fix the docs generation, or does this need to be merged ASAP to unblock the 3.3 release? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
jsancio commented on PR #12609: URL: https://github.com/apache/kafka/pull/12609#issuecomment-1241226713 > Are we okay with spending some time on trying to fix the docs generation, or does this need to be merged ASAP to unblock the 3.3 release? @C0urante I am open to suggestions on how to fix this. We have a couple of days to fix the issue. Do you have time to look into this? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
C0urante commented on PR #12609: URL: https://github.com/apache/kafka/pull/12609#issuecomment-1241229865 Yeah, I can try to find something. Thanks José! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches
cmccabe commented on code in PR #12595: URL: https://github.com/apache/kafka/pull/12595#discussion_r966431199 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -844,6 +855,72 @@ public String toString() { } } +/** + * Append records to the Raft log. They will be written out asynchronously. + * + * @param log The log4j logger. + * @param resultThe controller result we are writing out. + * @param maxRecordsPerBatchThe maximum number of records to allow in a batch. + * @param appender The callback to invoke for each batch. The arguments are last + * write offset, record list, and the return result is the new + * last write offset. + * @return The final offset that was returned from the Raft layer. + */ +static long appendRecords( +Logger log, +ControllerResult result, +int maxRecordsPerBatch, +Function, Long> appender +) { +try { +List records = result.records(); +if (result.isAtomic()) { +// If the result must be written out atomically, check that it is not too large. +// In general, we create atomic batches when it is important to commit "all, or +// nothing". They are limited in size and must only be used when the batch size +// is bounded. +if (records.size() > maxRecordsPerBatch) { +throw new IllegalStateException("Attempted to atomically commit " + +records.size() + " records, but maxRecordsPerBatch is " + +maxRecordsPerBatch); +} +long offset = appender.apply(records); +if (log.isTraceEnabled()) { +log.trace("Atomically appended {} record(s) ending with offset {}.", +records.size(), offset); +} +return offset; +} else { +// If the result is non-atomic, then split it into as many batches as needed. +// The appender callback will create an in-memory snapshot for each batch, +// since we might need to revert to any of them. We will only return the final +// offset of the last batch, however. +int i = 0, numBatches = 0; +while (true) { +numBatches++; +int j = i + maxRecordsPerBatch; +if (j > records.size()) { +long offset = appender.apply(records.subList(i, records.size())); Review Comment: Yeah... In general LinkedList turns a lot of stuff into O(N) and that's why we mostly don't use it. It's only really useful if you want to delete things from the middle of a list in O(1), but you also don't need fast access to the middle of the list, which is a pretty rare situation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches
cmccabe commented on code in PR #12595: URL: https://github.com/apache/kafka/pull/12595#discussion_r966431807 ## metadata/src/main/java/org/apache/kafka/controller/QuorumController.java: ## @@ -844,6 +855,72 @@ public String toString() { } } +/** + * Append records to the Raft log. They will be written out asynchronously. + * + * @param log The log4j logger. + * @param resultThe controller result we are writing out. + * @param maxRecordsPerBatchThe maximum number of records to allow in a batch. + * @param appender The callback to invoke for each batch. The arguments are last + * write offset, record list, and the return result is the new + * last write offset. + * @return The final offset that was returned from the Raft layer. + */ +static long appendRecords( +Logger log, +ControllerResult result, +int maxRecordsPerBatch, +Function, Long> appender +) { +try { +List records = result.records(); +if (result.isAtomic()) { +// If the result must be written out atomically, check that it is not too large. +// In general, we create atomic batches when it is important to commit "all, or +// nothing". They are limited in size and must only be used when the batch size +// is bounded. +if (records.size() > maxRecordsPerBatch) { +throw new IllegalStateException("Attempted to atomically commit " + +records.size() + " records, but maxRecordsPerBatch is " + +maxRecordsPerBatch); +} +long offset = appender.apply(records); +if (log.isTraceEnabled()) { +log.trace("Atomically appended {} record(s) ending with offset {}.", +records.size(), offset); +} +return offset; +} else { +// If the result is non-atomic, then split it into as many batches as needed. +// The appender callback will create an in-memory snapshot for each batch, +// since we might need to revert to any of them. We will only return the final +// offset of the last batch, however. +int i = 0, numBatches = 0; +while (true) { +numBatches++; +int j = i + maxRecordsPerBatch; +if (j > records.size()) { +long offset = appender.apply(records.subList(i, records.size())); Review Comment: btw thanks for thinking about the big-O here, even if it didn't end up being an issue in this particular case. We should definitely think about big-O -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
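For context on the big-O exchange above, here is a compact sketch of the batch-splitting loop from appendRecords (names simplified; the appender callback is elided): with an ArrayList, subList is an O(1) view with O(1) random access, whereas the same index arithmetic over a LinkedList would pay O(n) per access:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSplitSketch {
    static <T> int appendInBatches(List<T> records, int maxRecordsPerBatch) {
        int numBatches = 0;
        for (int i = 0; i < records.size(); i += maxRecordsPerBatch) {
            int j = Math.min(i + maxRecordsPerBatch, records.size());
            List<T> batch = records.subList(i, j); // O(1) view on ArrayList, no copy
            System.out.println("appending batch of " + batch.size());
            numBatches++; // the real code invokes the appender callback here
        }
        return numBatches;
    }

    public static void main(String[] args) {
        List<Integer> records = new ArrayList<>();
        for (int k = 0; k < 25; k++) records.add(k);
        System.out.println(appendInBatches(records, 10) + " batches"); // 3 batches: 10, 10, 5
    }
}
```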
[GitHub] [kafka] cmccabe merged pull request #12595: KAFKA-14204: QuorumController must correctly handle overly large batches
cmccabe merged PR #12595: URL: https://github.com/apache/kafka/pull/12595 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a diff in pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
C0urante commented on code in PR #12609: URL: https://github.com/apache/kafka/pull/12609#discussion_r966438130 ## build.gradle: ## @@ -2652,7 +2653,7 @@ project(':connect:runtime') { } task genConnectOpenAPIDocs(type: io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: setVersionInOpenAPISpec) { -classpath = sourceSets.main.runtimeClasspath +classpath = sourceSets.main.compileClasspath Review Comment: This feels like a terrible hack but by combining the runtime and compile classpaths, we can still generate the correct docs with `siteDocsTar` and omit the unnecessary Swagger dependencies from the release artifact generated with `releaseTarGz`: ```suggestion classpath = sourceSets.main.compileClasspath + sourceSets.main.runtimeClasspath ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji merged pull request #12598: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (server side)
hachikuji merged PR #12598: URL: https://github.com/apache/kafka/pull/12598 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-14204) QuorumController must correctly handle overly large batches
[ https://issues.apache.org/jira/browse/KAFKA-14204?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jose Armando Garcia Sancio resolved KAFKA-14204. Resolution: Fixed > QuorumController must correctly handle overly large batches > --- > > Key: KAFKA-14204 > URL: https://issues.apache.org/jira/browse/KAFKA-14204 > Project: Kafka > Issue Type: Bug > Components: controller, kraft >Reporter: Colin McCabe >Assignee: Colin McCabe >Priority: Blocker > Fix For: 3.3.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] jsancio commented on pull request #12575: KAFKA-14193: Add EOS related source connector configs to expected config defs in ConnectRestApiTest
jsancio commented on PR #12575: URL: https://github.com/apache/kafka/pull/12575#issuecomment-1241267992 I am going to close the PR. @yashmayya feel free to reopen it if you disagree. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio closed pull request #12575: KAFKA-14193: Add EOS related source connector configs to expected config defs in ConnectRestApiTest
jsancio closed pull request #12575: KAFKA-14193: Add EOS related source connector configs to expected config defs in ConnectRestApiTest URL: https://github.com/apache/kafka/pull/12575 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
jsancio commented on code in PR #12609: URL: https://github.com/apache/kafka/pull/12609#discussion_r966451504 ## build.gradle: ## @@ -2652,7 +2653,7 @@ project(':connect:runtime') { } task genConnectOpenAPIDocs(type: io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: setVersionInOpenAPISpec) { -classpath = sourceSets.main.runtimeClasspath +classpath = sourceSets.main.compileClasspath Review Comment: I am okay with this fix. Feel free to create a Jira to have a better fix beyond 3.3.0 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a diff in pull request #12609: KAFKA-14198: swagger-jaxrs2 dependency should be compileOnly
ijuma commented on code in PR #12609: URL: https://github.com/apache/kafka/pull/12609#discussion_r966455049 ## build.gradle: ## @@ -2652,7 +2653,7 @@ project(':connect:runtime') { } task genConnectOpenAPIDocs(type: io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: setVersionInOpenAPISpec) { -classpath = sourceSets.main.runtimeClasspath +classpath = sourceSets.main.compileClasspath Review Comment: The right fix may be to have a separate scope for swagger and that's basically runtimeClasspath + swagger deps. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on pull request #12599: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (client side)
hachikuji commented on PR #12599: URL: https://github.com/apache/kafka/pull/12599#issuecomment-1241281654 Thanks, I was a little hesitant about the client-side fix since I thought there might be a debugging benefit of having the group instance ID in request logs. But I guess this speculative benefit is small compared to the cost of the behavioral regression caused by the additional validation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
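For context, a minimal sketch of the shape of the client-side guard discussed above (not the actual patch; `buildCommitData` and its parameters are illustrative names): the commit request carries `group.instance.id` only when the member id is known, so broker-side validation of the pair cannot reject a commit from a consumer that never joined.

```java
import java.util.Optional;
import org.apache.kafka.common.message.OffsetCommitRequestData;

public final class CommitRequestSketch {
    // Illustrative helper: attach the static member id only alongside a real member id.
    static OffsetCommitRequestData buildCommitData(String groupId,
                                                   String memberId,
                                                   Optional<String> groupInstanceId) {
        OffsetCommitRequestData data = new OffsetCommitRequestData()
                .setGroupId(groupId)
                .setMemberId(memberId);
        if (!memberId.isEmpty()) {
            // An empty member id means we never joined; sending group.instance.id
            // alone is what trips the broker-side fencing validation.
            data.setGroupInstanceId(groupInstanceId.orElse(null));
        }
        return data;
    }

    private CommitRequestSketch() { }
}
```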
[GitHub] [kafka] hachikuji merged pull request #12599: KAFKA-14201; Consumer should not send group instance ID if committing with empty member ID (client side)
hachikuji merged PR #12599: URL: https://github.com/apache/kafka/pull/12599 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
jsancio commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r966459040 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -307,11 +322,18 @@ class BrokerMetadataListener( private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta -_image = _delta.apply() +try { + _image = _delta.apply() +} catch { + case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) +} + _delta = new MetadataDelta(_image) if (isDebugEnabled) { debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.") } + +// This publish call is done with its own try-catch and fault handler publisher.publish(delta, _image) Review Comment: Is this correct? If there was an error in `_image = _delta.apply()`, `_image` will be the previous image that was published while `delta` is the new `_delta` that was not applied. Also, note that I am pretty sure that the code `publisher.publish` assumes that this layer doesn't send duplicate deltas and images. Is there a way we can write tests for this code and scenarios so that we can increase our confidence that this code behaves as we expect? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
jsancio commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r964300580 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -41,8 +43,19 @@ class BrokerMetadataListener( val maxBytesBetweenSnapshots: Long, val snapshotter: Option[MetadataSnapshotter], brokerMetrics: BrokerServerMetrics, - metadataLoadingFaultHandler: FaultHandler + _metadataLoadingFaultHandler: FaultHandler ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup { + + private val metadataFaultOccurred = new AtomicBoolean(false) + private val metadataLoadingFaultHandler: FaultHandler = new FaultHandler() { +override def handleFault(failureMessage: String, cause: Throwable): RuntimeException = { + if (metadataFaultOccurred.compareAndSet(false, true)) { +error("Disabling metadata snapshots until this broker is restarted.") + } + _metadataLoadingFaultHandler.handleFault(failureMessage, cause) +} Review Comment: This abstraction feels strange. For example, how does the operator monitor that Kafka has an issue and is not generating snapshots? I assume that they need to monitor the metric for `BrokerServerMetrics.metadataLoadErrorCount`, which is updated from `KafkaRaftServer`. The disabling of snapshotting happens in `BrokerMetadataListener`, which doesn't know about this metric. I think the solution should make this relation explicit and not have it hidden or implemented across multiple layers of abstraction. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jsancio commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
jsancio commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r966471788 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -28,6 +28,8 @@ import org.apache.kafka.server.common.ApiMessageAndVersion import org.apache.kafka.server.fault.FaultHandler import org.apache.kafka.snapshot.SnapshotReader +import java.util.concurrent.atomic.AtomicBoolean + Review Comment: This comment applies to this code: ```scala try { delta.replay(highestMetadataOffset, epoch, messageAndVersion.message()) } catch { case e: Throwable => snapshotName match { case None => metadataLoadingFaultHandler.handleFault( s"Error replaying metadata log record at offset ${_highestOffset}", e) case Some(name) => metadataLoadingFaultHandler.handleFault( s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e) } } ``` I think this code attempts to read and replay the entire committed log. I wonder if this code should be more conservative when it encounters an error replaying a record, and only read the current batch before updating the image. Note that this code is used for both snapshots and log segments. For snapshots, the entire snapshot needs to be in one `delta` update. ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -307,11 +322,18 @@ class BrokerMetadataListener( private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta -_image = _delta.apply() +try { + _image = _delta.apply() Review Comment: Note that it is possible for `_delta` to include a lot of batches, maybe even the entire log. I wonder whether, if the broker encounters an error applying a delta, we should instead rewind and generate and apply a delta per record batch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a diff in pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft
cmccabe commented on code in PR #12604: URL: https://github.com/apache/kafka/pull/12604#discussion_r966476362 ## docs/streams/quickstart.html: ## @@ -98,19 +97,46 @@ Step Step 2: Start the Kafka server -Kafka uses https://zookeeper.apache.org/";>ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance. + Apache Kafka can be started using ZooKeeper or KRaft. To get started with either configuration follow one the section below but not both. Review Comment: should be "one of the sections below" not "one the section" -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
mumrah commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r966480469 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -307,11 +322,18 @@ class BrokerMetadataListener( private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta -_image = _delta.apply() +try { + _image = _delta.apply() +} catch { + case t: Throwable => metadataLoadingFaultHandler.handleFault(s"Error applying metadata delta $delta", t) +} + _delta = new MetadataDelta(_image) if (isDebugEnabled) { debug(s"Publishing new metadata delta $delta at offset ${_image.highestOffsetAndEpoch().offset}.") } + +// This publish call is done with its own try-catch and fault handler publisher.publish(delta, _image) Review Comment: Thanks, good catch. I missed a `throw` in the catch block above. If we can't apply the delta we should not publish the image. I agree that more tests would be very useful as we harden this code path. I'll see what I can come up with for this PR and we can continue adding more tests after 3.3 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
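A minimal Java rendering of the shape of that fix (the real code is Scala in `BrokerMetadataListener`; this sketch assumes only the `FaultHandler` interface quoted above, and `applyOrFail` is an illustrative name): record the fault, then rethrow, so `publish` never runs with a mismatched delta/image pair.

```java
import java.util.function.Supplier;
import org.apache.kafka.server.fault.FaultHandler;

public final class ApplyDeltaSketch {
    // Apply the delta; on failure, report the fault and rethrow instead of
    // falling through and publishing the previous image with the new delta.
    static <T> T applyOrFail(FaultHandler faultHandler, Supplier<T> applyDelta) {
        try {
            return applyDelta.get();
        } catch (RuntimeException e) {
            faultHandler.handleFault("Error applying metadata delta", e);
            throw e; // the `throw` that was missing from the original patch
        }
    }

    private ApplyDeltaSketch() { }
}
```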
[GitHub] [kafka] mumrah commented on a diff in pull request #12596: KAFKA-14203 Don't make snapshots on broker after metadata errors
mumrah commented on code in PR #12596: URL: https://github.com/apache/kafka/pull/12596#discussion_r966484578 ## core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala: ## @@ -307,11 +322,18 @@ class BrokerMetadataListener( private def publish(publisher: MetadataPublisher): Unit = { val delta = _delta -_image = _delta.apply() +try { + _image = _delta.apply() Review Comment: Rewinding and re-applying does sound useful for some kind of automatic error mitigation, but I think it will be quite a bit of work. As it stands, I believe the broker can only process metadata going forward. I can think of a degenerate case we have today where `loadBatches` is able to process all but one record, but `delta.apply` cannot complete and so we can't publish any new metadata. Like you mention, I think the only way to mitigate a situation like this would be to produce smaller deltas to reduce the blast radius of a bad record. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12887) Do not trigger user-customized ExceptionalHandler for RTE
[ https://issues.apache.org/jira/browse/KAFKA-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602033#comment-17602033 ] Guozhang Wang commented on KAFKA-12887: --- Hello [~icorne], we have reverted this change due to reasons similar to what you described here. Do you still see it in 3.1.1? Could you try 3.2.1 instead? > Do not trigger user-customized ExceptionalHandler for RTE > - > > Key: KAFKA-12887 > URL: https://issues.apache.org/jira/browse/KAFKA-12887 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Josep Prat >Priority: Major > Fix For: 3.1.0 > > > Today in StreamThread we have a try-catch block that captures all {{Throwable > e}} and then triggers {{this.streamsUncaughtExceptionHandler.accept(e)}}. > However, there are possible RTEs such as IllegalState/IllegalArgument > exceptions which are usually caused by bugs, etc. In such cases we should not > let users decide what to do with these exceptions, but should let Streams > itself enforce the decision, e.g. for IllegalState/IllegalArgument we > should fail fast to surface the potential error. -- This message was sent by Atlassian Jira (v8.20.10#820010)
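For readers landing here from the revert: the handler in question is the one installed via `KafkaStreams#setUncaughtExceptionHandler` (KIP-671). A minimal usage sketch (`HandlerSketch` and `install` are illustrative names); with the reverted behavior, runtime exceptions such as IllegalStateException reach this user handler again rather than being short-circuited by Streams.

```java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

public final class HandlerSketch {
    // Replace the dead stream thread for any uncaught exception; under the
    // reverted behavior this also covers IllegalState/IllegalArgument exceptions.
    static void install(KafkaStreams streams) {
        streams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
    }

    private HandlerSketch() { }
}
```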
[GitHub] [kafka] jsancio commented on pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft
jsancio commented on PR #12604: URL: https://github.com/apache/kafka/pull/12604#issuecomment-1241324010 > We should move “Once all services have successfully launched, you will have a basic Kafka environment running and ready to use.” under both ZK and KRaft sections. @forlack Thanks for the feedback. Kept the sentence in the "Kafka with ZooKeeper" section and added a similar sentence to the "Kafka with KRaft" section. It is awkward formatting to have a paragraph for a parent section after starting a subsection. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14189) Improve connection limit and reuse of coordinator and leader in KafkaConsumer
[ https://issues.apache.org/jira/browse/KAFKA-14189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602037#comment-17602037 ] Guozhang Wang commented on KAFKA-14189: --- Hi [~aglicacha] [~vongosling] The main motivation for using two connection sockets for the coordinator and partition leader is to not block coordination-related requests such as join/sync behind fetch requests (which could be long polling, and during that time we cannot send other requests using the same socket). Reusing the connection may cause issues, e.g. a heartbeat request not being processed in time if there's already a fetch request parked at the broker side. > Improve connection limit and reuse of coordinator and leader in KafkaConsumer > - > > Key: KAFKA-14189 > URL: https://issues.apache.org/jira/browse/KAFKA-14189 > Project: Kafka > Issue Type: Improvement > Components: clients >Affects Versions: 0.9.0.0 >Reporter: Junyang Liu >Priority: Major > > The connection id of the connection with the coordinator in KafkaConsumer is > Integer.MAX_VALUE - coordinator id, which is different from the connection id of the > partition leader. So the connection cannot be reused when the coordinator and > leader are on the same broker, which means we need two separate connections > to the same broker. Consider this case: a consumer has connected to the > coordinator and finished Join and Sync, and wants to send FETCH to the leader on > the same broker. But the connection count has reached the limit, so the consumer > will be in the group but cannot consume messages. > partial logs: > {code:java} > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Added > READ_UNCOMMITTED fetch request for partition topic-test-4 at offset 9 to node > :9092 (id: 2 rack: 2) > (org.apache.kafka.clients.consumer.internals.Fetcher) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Built full fetch > (sessionId=INVALID, epoch=INITIAL) for node 2 with 1 partition(s). > (org.apache.kafka.clients.FetchSessionHandler) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Sending > READ_UNCOMMITTED FullFetchRequest(topic-test-4) to broker :9092 (id: 2 > rack: 2) (org.apache.kafka.clients.consumer.internals.Fetcher) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating > connection to node :9092 (id: 2 rack: 2) using address / > (org.apache.kafka.clients.NetworkClient) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Using older server > API v3 to send OFFSET_COMMIT > {group_id=group-test,generation_id=134,member_id=consumer-11-2e2b16eb-516c-496c-8aa4-c6e990b43598,retention_time=-1,topics=[{topic=topic-test,partitions=[{partition=3,offset=0,metadata=},{partition=4,offset=9,metadata=},{partition=5,offset=13,metadata=}]}]} > with correlation id 242 to node 2147483645 > (org.apache.kafka.clients.NetworkClient) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Created socket with > SO_RCVBUF = 65536, SO_SNDBUF = 131072, SO_TIMEOUT = 0 to node 2 > (org.apache.kafka.common.network.Selector) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Completed > connection to node 2. Fetching API versions. > (org.apache.kafka.clients.NetworkClient) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Initiating API > versions fetch from node 2.
(org.apache.kafka.clients.NetworkClient) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Subscribed to > topic(s): topic-test (org.apache.kafka.clients.consumer.KafkaConsumer) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Connection with > / disconnected (org.apache.kafka.common.network.Selector) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Node 2 > disconnected. (org.apache.kafka.clients.NetworkClient) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Cancelled request > with header RequestHeader(apiKey=FETCH, apiVersion=10, clientId=consumer-11, > correlationId=241) due to node 2 being disconnected > (org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient) > DEBUG [Consumer clientId=consumer-11, groupId=group-test] Error sending fetch > request (sessionId=INVALID, epoch=INITIAL) to node 2: > org.apache.kafka.common.errors.DisconnectException. > (org.apache.kafka.clients.FetchSessionHandler){code} > Connection to the coordinator, rebalance, and fetching offsets have finished. When > preparing the connection to the leader for fetching, the connection limit has been > reached, so after the TCP connection is established, the broker disconnects the client. > > The root cause of this issue is that the process of consuming is a > combination of multiple connections (connections with the coordinator and leader > on the same broker), not atomic, which may lead to a "half connected" state. I think we
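The synthetic-connection-id scheme described above is visible in the log excerpt itself: the OFFSET_COMMIT goes to "node 2147483645", which is Integer.MAX_VALUE - 2 for coordinator broker 2. A small sketch of the derivation (class and method names illustrative; the scheme matches what the description attributes to KafkaConsumer):

```java
public final class CoordinatorIdSketch {
    // The coordinator gets a synthetic node id far away from real broker ids,
    // so the client keeps a second socket to the same broker for group requests.
    static int coordinatorConnectionId(int coordinatorBrokerId) {
        return Integer.MAX_VALUE - coordinatorBrokerId;
    }

    public static void main(String[] args) {
        System.out.println(coordinatorConnectionId(2)); // 2147483645, as in the log above
    }

    private CoordinatorIdSketch() { }
}
```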
[GitHub] [kafka] jsancio merged pull request #12604: KAFKA-14188; Getting started for Kafka with KRaft
jsancio merged PR #12604: URL: https://github.com/apache/kafka/pull/12604 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14208) KafkaConsumer#commitAsync throws unexpected WakeupException
[ https://issues.apache.org/jira/browse/KAFKA-14208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602046#comment-17602046 ] Guozhang Wang commented on KAFKA-14208: --- Hello Qingsheng, thanks for reporting this issue, and I looked at the source code and agree with you that it was introduced as part of KAFKA-13563. I will try to fix this with a follow-up PR. > KafkaConsumer#commitAsync throws unexpected WakeupException > --- > > Key: KAFKA-14208 > URL: https://issues.apache.org/jira/browse/KAFKA-14208 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.2.1 >Reporter: Qingsheng Ren >Priority: Major > > We recently encountered a bug after upgrading Kafka client to 3.2.1 in Flink > Kafka connector (FLINK-29153). Here's the exception: > {code:java} > org.apache.kafka.common.errors.WakeupException > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226) > {code} > As {{WakeupException}} is not listed in the JavaDoc of > {{{}KafkaConsumer#commitAsync{}}}, Flink Kafka connector doesn't catch the > exception thrown directly from KafkaConsumer#commitAsync but handles all > exceptions in the callback. > I checked the source code and suspect this is caused by KAFKA-13563. Also we > never had this exception in commitAsync when we used Kafka client 2.4.1 & > 2.8.1. > I'm wondering if this is kind of breaking the public API as the > WakeupException is not listed in JavaDoc, and maybe it's better to invoke the > callback to handle the {{WakeupException}} instead of throwing it directly > from the method itself. -- This message was sent by Atlassian Jira (v8.20.10#820010)
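Until a fix lands, a caller-side guard along the lines the reporter suggests (routing the stray `WakeupException` into the callback) might look like the sketch below; `safeCommitAsync` is an illustrative name, not a Kafka API.

```java
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.errors.WakeupException;

public final class CommitGuardSketch {
    // commitAsync is documented as non-blocking, so a WakeupException escaping it
    // is unexpected; funnel it into the callback like any other commit failure.
    static void safeCommitAsync(KafkaConsumer<?, ?> consumer, OffsetCommitCallback callback) {
        try {
            consumer.commitAsync(callback);
        } catch (WakeupException e) {
            callback.onComplete(null, e);
        }
    }

    private CommitGuardSketch() { }
}
```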
[jira] [Commented] (KAFKA-14196) Duplicated consumption during rebalance, causing OffsetValidationTest to act flaky
[ https://issues.apache.org/jira/browse/KAFKA-14196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17602049#comment-17602049 ] Guozhang Wang commented on KAFKA-14196: --- Thanks Philip, and regarding your two questions above I agree with [~showuon]'s thoughts as well. Especially for 1), even if subscriptions changed in between consecutive onJoinPrepare calls, as long as they do not change the assigned partitions (i.e. as long as `assignFromSubscribed()` has not been called) I think we are fine, since the returned records depend on the assigned partitions. > Duplicated consumption during rebalance, causing OffsetValidationTest to act > flaky > -- > > Key: KAFKA-14196 > URL: https://issues.apache.org/jira/browse/KAFKA-14196 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.2.1 >Reporter: Philip Nee >Assignee: Philip Nee >Priority: Blocker > Labels: new-consumer-threading-should-fix > Fix For: 3.3.0, 3.2.2 > > > Several flaky tests under OffsetValidationTest are indicating a potential > consumer duplication issue when autocommit is enabled. I believe this is > affecting *3.2* and onward. Below is the failure message: > > {code:java} > Total consumed records 3366 did not match consumed position 3331 {code} > > After investigating the log, I discovered that the data consumed between the > start of a rebalance event and the async commit was lost for those failing > tests. In the example below, the rebalance event kicks in at around > 1662054846995 (first record), and the async commit of the offset 3739 is > completed at around 1662054847015 (right before partitions_revoked). > > {code:java} > {"timestamp":1662054846995,"name":"records_consumed","count":3,"partitions":[{"topic":"test_topic","partition":0,"count":3,"minOffset":3739,"maxOffset":3741}]} > {"timestamp":1662054846998,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3742,"maxOffset":3743}]} > {"timestamp":1662054847008,"name":"records_consumed","count":2,"partitions":[{"topic":"test_topic","partition":0,"count":2,"minOffset":3744,"maxOffset":3745}]} > {"timestamp":1662054847016,"name":"partitions_revoked","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847031,"name":"partitions_assigned","partitions":[{"topic":"test_topic","partition":0}]} > {"timestamp":1662054847038,"name":"records_consumed","count":23,"partitions":[{"topic":"test_topic","partition":0,"count":23,"minOffset":3739,"maxOffset":3761}]} > {code} > A few things to note here: > # Manually calling commitSync in the onPartitionsRevoke cb seems to > alleviate the issue > # Setting includeMetadataInTimeout to false also seems to alleviate the > issue. > The above attempts seem to suggest that the contract between poll() and > asyncCommit() is broken. AFAIK, we implicitly use poll() to ack the > previously fetched data, and the consumer would (try to) commit these offsets > in the current poll() loop. However, it seems like as the poll continues to > loop, the "acked" data isn't being committed. > > I believe this could have been introduced in KAFKA-14024, which originated from > KAFKA-13310. > More specifically (see the comments below), the ConsumerCoordinator will > always return before the async commit, due to the previous incomplete commit.
> However, this is a bit contradictory here because: > # I think we want to commit asynchronously while the poll continues, and if > we do that, we are back to KAFKA-14024, where the consumer will hit the rebalance > timeout and get kicked out of the group. > # But we also need to commit all the "acked" offsets before revoking the > partition, and this has to be a blocking call. > *Steps to Reproduce the Issue:* > # Check out AK 3.2 > # Run this several times: (Recommended: only run the autocommit-enabled runs > in consumer_test.py to save time) > {code:java} > _DUCKTAPE_OPTIONS="--debug" > TC_PATHS="tests/kafkatest/tests/client/consumer_test.py::OffsetValidationTest.test_consumer_failure" > bash tests/docker/run_tests.sh {code} > > *Steps to Diagnose the Issue:* > # Open the test results in *results/* > # Go to the consumer log. It might look like this > > {code:java} > results/2022-09-03--005/OffsetValidationTest/test_consumer_failure/clean_shutdown=True.enable_autocommit=True.metadata_quorum=ZK/2/VerifiableConsumer-0-xx/dockerYY > {code} > 3. Find the docker instance that has a partition getting revoked and rejoined. > Observe the offsets before and after. > *Proposed Fixes:* > TBD > > https://github.com/apache/kafka/pull/12603 -- This message was sent by Atlassian Jira (v8.20.10#820010)
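Workaround (1) from the description, spelled out as a sketch against the public consumer API (class and method names illustrative): blocking inside the callback makes the rebalance wait for the commit, closing the duplication window. It would be registered via `consumer.subscribe(topics, commitOnRevoke(consumer))`.

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public final class CommitOnRevokeSketch {
    // Synchronously commit the offsets of already-returned ("acked") records
    // before the partitions are handed off to another consumer.
    static <K, V> ConsumerRebalanceListener commitOnRevoke(KafkaConsumer<K, V> consumer) {
        return new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                consumer.commitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // nothing to do on assignment
            }
        };
    }

    private CommitOnRevokeSketch() { }
}
```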
[GitHub] [kafka] hachikuji commented on a diff in pull request #12603: KAFKA-14196: Pausing partition to prevent duplication when autocommit is enabled
hachikuji commented on code in PR #12603: URL: https://github.com/apache/kafka/pull/12603#discussion_r966512403 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -799,64 +804,95 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { } } +Optional exception = revokePartitions(partitionsToRevoke, generation, memberId); + +isLeader = false; +subscriptions.resetGroupSubscription(); +joinPrepareTimer = null; +autoCommitOffsetRequestFuture = null; +timer.update(); + +if (exception.isPresent()) { +throw new KafkaException("User rebalance callback throws an error", exception.get()); +} +return true; +} + +private SortedSet getPartitionsToRevoke(RebalanceProtocol protocol, int generation, String memberId) { +SortedSet partitions = new TreeSet<>(COMPARATOR); +if (generation == Generation.NO_GENERATION.generationId || +memberId.equals(Generation.NO_GENERATION.memberId)) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// Revoke all partitions +if (protocol == RebalanceProtocol.EAGER) { +partitions.addAll(subscriptions.assignedPartitions()); +return partitions; +} + +// only revoke those partitions that are not in the subscription any more. +if (protocol == RebalanceProtocol.COOPERATIVE) { +Set ownedPartitions = new HashSet<>(subscriptions.assignedPartitions()); +partitions.addAll(ownedPartitions.stream() +.filter(tp -> !subscriptions.subscription().contains(tp.topic())) +.collect(Collectors.toSet())); +return partitions; +} + +log.debug("Invalid protocol: {}. No partition will be revoked.", protocol); +return partitions; +} + +private void pausePartitions(Set partitions) { +// See KAFKA-14196 for more detail: we pause the partition from consumption to prevent duplicated +// data returned by the consumer poll loop. Without pausing the partitions, the consumer will move forward +// returning the data w/o committing them. And the progress will be lost once the partition is revoked. +// This only applies to autocommits, as we expect the user to handle the offsets manually during the partition +// revocation. + +log.debug("Pausing partitions {} before onJoinPrepare", partitions); +partitions.forEach(tp -> subscriptionState().pause(tp)); Review Comment: I have mixed feelings about reusing the pause mechanism here. On the one hand, it does what we want. On the other hand, the pause state can be mutated by the user. What if the user calls `resume()` on a partition that we paused internally? Sounds crazy perhaps, but I think I'd rather have a mechanism that can only be accessed internally for stuff like this. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { joinPrepareTimer.update(); } +final SortedSet partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId); + // async commit offsets prior to rebalance if auto-commit enabled // and there is no in-flight offset commit request -if (autoCommitEnabled && autoCommitOffsetRequestFuture == null) { -autoCommitOffsetRequestFuture = maybeAutoCommitOffsetsAsync(); +if (autoCommitEnabled) { +pausePartitions(partitionsToRevoke); Review Comment: It seems like this bug (and some of the complexity in this patch) is due to the fact that we do the auto-commit prior to revoking partitions. I wonder if that is really necessary.
If we revoke first, then the partitions would be removed from `SubscriptionState` and we wouldn't have to worry about fetches for these partitions returning. Could that work as well? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java: ## @@ -756,10 +756,15 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) { joinPrepareTimer.update(); } +final SortedSet partitionsToRevoke = getPartitionsToRevoke(protocol, generation, memberId); Review Comment: Is your concern that the subscription could change in between the time that we pause the partitions and the time that the revocation callback is triggered? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang opened a new pull request, #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery
guozhangwang opened a new pull request, #12611: URL: https://github.com/apache/kafka/pull/12611 Today we may try to discover the coordinator in both a blocking (e.g. in `poll`) and a non-blocking (e.g. in `commitAsync`) way. For the latter we would poll the underlying network client with timeout 0, and in this case we should not trigger wakeup, since these are non-blocking calls and hence should not throw WakeupException. In this PR I'm trying to fix it in the least intrusive way (a more general fix could, potentially, be to have two versions of `ensureCoordinatorReady`), since after our threading refactoring the `ensureCoordinatorReady` function will no longer be called by the calling thread and will only be triggered by the background thread, giving us a much simpler way to ensure that non-blocking functions never throw wakeups. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] guozhangwang commented on pull request #12611: KAFKA-14208: Should not wake-up with non-blocking coordinator discovery
guozhangwang commented on PR #12611: URL: https://github.com/apache/kafka/pull/12611#issuecomment-1241358026 Calling @philipnee @showuon for reviews. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-14208) KafkaConsumer#commitAsync throws unexpected WakeupException
[ https://issues.apache.org/jira/browse/KAFKA-14208?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang reassigned KAFKA-14208: - Assignee: Guozhang Wang > KafkaConsumer#commitAsync throws unexpected WakeupException > --- > > Key: KAFKA-14208 > URL: https://issues.apache.org/jira/browse/KAFKA-14208 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 3.2.1 >Reporter: Qingsheng Ren >Assignee: Guozhang Wang >Priority: Major > > We recently encountered a bug after upgrading Kafka client to 3.2.1 in Flink > Kafka connector (FLINK-29153). Here's the exception: > {code:java} > org.apache.kafka.common.errors.WakeupException > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:514) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:278) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:252) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.coordinatorUnknownAndUnready(ConsumerCoordinator.java:493) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsAsync(ConsumerCoordinator.java:1055) > at > org.apache.kafka.clients.consumer.KafkaConsumer.commitAsync(KafkaConsumer.java:1573) > at > org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:226) > {code} > As {{WakeupException}} is not listed in the JavaDoc of > {{{}KafkaConsumer#commitAsync{}}}, Flink Kafka connector doesn't catch the > exception thrown directly from KafkaConsumer#commitAsync but handles all > exceptions in the callback. > I checked the source code and suspect this is caused by KAFKA-13563. Also we > never had this exception in commitAsync when we used Kafka client 2.4.1 & > 2.8.1. > I'm wondering if this is kind of breaking the public API as the > WakeupException is not listed in JavaDoc, and maybe it's better to invoke the > callback to handle the {{WakeupException}} instead of throwing it directly > from the method itself. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[GitHub] [kafka] lihaosky commented on a diff in pull request #12600: KAFKA-10199: Remove and suspend tasks in the state updater on revocation
lihaosky commented on code in PR #12600: URL: https://github.com/apache/kafka/pull/12600#discussion_r966520188 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set remain for (final Task restoringTask : stateUpdater.getTasks()) { if (restoringTask.isActive()) { if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) { -tasks.addPendingTaskToCloseClean(restoringTask.id()); + tasks.addPendingActiveTaskToSuspend(restoringTask.id()); Review Comment: noob question: Why do we suspend instead of close? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -421,73 +421,120 @@ private void classifyTasksWithoutStateUpdater(final Map> activeTasksToCreate, - final Map> standbyTasksToCreate, - final Map> tasksToRecycle, - final Set tasksToCloseClean) { +private void classifyRunningAndSuspendedTasks(final Map> activeTasksToCreate, + final Map> standbyTasksToCreate, + final Map> tasksToRecycle, + final Set tasksToCloseClean) { for (final Task task : tasks.allTasks()) { +if (!task.isActive()) { +throw new IllegalStateException("Standby tasks should only be managed by the state updater"); +} final TaskId taskId = task.id(); if (activeTasksToCreate.containsKey(taskId)) { -if (task.isActive()) { -final Set topicPartitions = activeTasksToCreate.get(taskId); -if (tasks.updateActiveTaskInputPartitions(task, topicPartitions)) { -task.updateInputPartitions(topicPartitions, topologyMetadata.nodeToSourceTopics(task.id())); -} -task.resume(); -} else { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} +handleReAssignedActiveTask(task, activeTasksToCreate.get(taskId)); activeTasksToCreate.remove(taskId); } else if (standbyTasksToCreate.containsKey(taskId)) { -if (!task.isActive()) { -throw new IllegalStateException("Standby tasks should only be managed by the state updater"); -} else { -tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); -} +tasksToRecycle.put(task, standbyTasksToCreate.get(taskId)); standbyTasksToCreate.remove(taskId); } else { tasksToCloseClean.add(task); } } } +private void handleReAssignedActiveTask(final Task task, +final Set inputPartitions) { +if (tasks.updateActiveTaskInputPartitions(task, inputPartitions)) { +task.updateInputPartitions(inputPartitions, topologyMetadata.nodeToSourceTopics(task.id())); +} +task.resume(); +if (task.state() == State.RESTORING) { +handleReAssignedRevokedActiveTask(task); +} +} + +private void handleReAssignedRevokedActiveTask(final Task task) { +tasks.removeTask(task); +stateUpdater.add(task); +} + private void classifyTasksWithStateUpdater(final Map> activeTasksToCreate, final Map> standbyTasksToCreate, final Map> tasksToRecycle, final Set tasksToCloseClean) { -classifyRunningTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); +classifyRunningAndSuspendedTasks(activeTasksToCreate, standbyTasksToCreate, tasksToRecycle, tasksToCloseClean); Review Comment: noob question: are taskIds in `activeTasksToCreate` and `standbyTasksToCreate` always mutually exclusive? I guess standby is always disabled if there's only 1 host/node?
## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -960,7 +1011,7 @@ private void removeRevokedTasksFromStateUpdater(final Set remain for (final Task restoringTask : stateUpdater.getTasks()) { if (restoringTask.isActive()) { if (remainingRevokedPartitions.containsAll(restoringTask.inputPartitions())) { -tasks.addPendingTaskToCloseClean(restoringTask.id()); +