[jira] [Commented] (KAFKA-18930) KRaft MigrationEvent won't retry when failing to write data to ZK
[ https://issues.apache.org/jira/browse/KAFKA-18930?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932938#comment-17932938 ] Luke Chen commented on KAFKA-18930: --- [~davidarthur] [~mumrah] , I'd like to hear your thoughts on this issue. Thanks. > KRaft MigrationEvent won't retry when failing to write data to ZK > -- > > Key: KAFKA-18930 > URL: https://issues.apache.org/jira/browse/KAFKA-18930 > Project: Kafka > Issue Type: Bug > Affects Versions: 3.9.0 > Reporter: Luke Chen > Priority: Major > > When running a ZK-to-KRaft migration, there is a dual-write mode. In that > mode, metadata is written to KRaft first and then written to ZK asynchronously. When > an exception occurs, the KRaft MigrationEvent won't retry the failed write to ZK. > That causes metadata inconsistency between KRaft and ZK. > > Note: > 1. When doing a KRaft controller clean shutdown, we should keep > retrying the failing ZK write until a forced shutdown, to make sure the > metadata is consistent. > 2. When shutting down, [the order of > shutdown|https://github.com/apache/kafka/blob/1ec1043d5197c4f807fa5cbc41d875b289443096/core/src/main/scala/kafka/server/ControllerServer.scala#L69-L76] > is to close ZK -> close the RPC client -> close the migration driver. This causes > another issue: even if we retry the ZK write, it will never succeed while > shutdown is in progress, because the ZK connection is closed first. > > The impact is that when rolling back to ZK mode during migration, the metadata in > ZK is out of date. -- This message was sent by Atlassian Jira (v8.20.10#820010)
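To make note 1 concrete, here is a minimal sketch of the retry behaviour the issue asks for, with hypothetical names (this is not the actual KRaftMigrationDriver code): a failed ZK dual-write is retried until it succeeds or a forced shutdown is requested.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch only: DualWriteRetry and ZkWrite are illustrative names.
class DualWriteRetry {

    interface ZkWrite {
        void run() throws Exception;
    }

    private final AtomicBoolean forceShutdown = new AtomicBoolean(false);

    // Note 1: a clean shutdown keeps retrying the failed dual-write; only a
    // forced shutdown breaks the loop. Note 2 then implies the ZK connection
    // must stay open until this loop exits, i.e. the migration driver has to
    // be closed before the ZK client during shutdown.
    void writeWithRetry(final ZkWrite write, final long backoffMs) throws InterruptedException {
        while (!forceShutdown.get()) {
            try {
                write.run();
                return; // success: ZK is consistent with KRaft again
            } catch (final Exception e) {
                Thread.sleep(backoffMs); // back off, then retry
            }
        }
    }

    void requestForceShutdown() {
        forceShutdown.set(true);
    }
}
```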
Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]
mjsax commented on code in PR #19164: URL: https://github.com/apache/kafka/pull/19164#discussion_r1986158617 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1163,10 +1166,12 @@ void handleRevocation(final Collection revokedPartitions) { // as such we just need to skip those dirty tasks in the checkpoint final Set dirtyTasks = new HashSet<>(); try { -// in handleRevocation we must call commitOffsetsOrTransaction() directly rather than -// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the -// offset commit because we are in a rebalance -taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask); +if (revokedTasksNeedCommit) { Review Comment: Not sure I can follow. Your proposal says we stop committing if a TX is in-flight, but we do want to commit in this case, right? Even if the offset map is empty. And moving it to the outermost context does not seem "correct", because checking whether a TX is in flight seems unnecessary for the ALOS case (it would not be wrong, since the call would always return `false` and thus effectively change nothing, but changing the code seems unnecessary). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
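For readers following the thread, the check at the heart of this exchange lives in `TaskExecutor#commitOffsetsOrTransaction()` (quoted verbatim in a later review on this PR). Below is a condensed, hypothetical sketch of that decision, not the actual implementation, showing why an in-flight transaction forces a commit even with an empty offset map.

```java
import java.util.Map;

// Condensed sketch; names mirror snippets quoted in this review thread, but
// this is not the real TaskExecutor code.
class CommitDecisionSketch {

    interface StreamsProducer {
        boolean transactionInFlight();
    }

    private final StreamsProducer streamsProducer;

    CommitDecisionSketch(final StreamsProducer streamsProducer) {
        this.streamsProducer = streamsProducer;
    }

    boolean shouldCommit(final Map<?, ?> offsetsPerTask) {
        // Under EOS an in-flight transaction must be committed even when there
        // are no new offsets to commit; under ALOS transactionInFlight() is
        // always false, so only a non-empty offset map triggers a commit.
        return !offsetsPerTask.isEmpty() || streamsProducer.transactionInFlight();
    }
}
```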
Re: [PR] MINOR Improve PR linter output [kafka]
mumrah commented on PR #19159: URL: https://github.com/apache/kafka/pull/19159#issuecomment-2707179546 Example step summary: The commit will look like: ``` MINOR: Disallow unused local variables Recently, we found a regression that could have been detected by static analysis, since a local variable wasn't being passed to a method during a refactoring, and was left unused. It was fixed in [7a749b5](https://github.com/apache/kafka/commit/7a749b589f8c98bd452b79b49cdfb182894d7f57), but almost slipped into 4.0. Unused variables are typically detected by IDEs, but this is insufficient to prevent these kinds of bugs. This change enables unused local variable detection in checkstyle for Kafka. A few notes on the usage: - There are two situations in which people actually want to have a local variable but not use it. First, there are `for (Type ignored: collection)` loops which have to loop `collection.length` number of times, but that do not use `ignored` in the loop body. These are typically still easier to read than a classical `for` loop. Second, some IDEs detect it if a return value of a function such as `File.delete` is not being used. In this case, people sometimes store the result in an unused local variable to make ignoring the return value explicit and to avoid the squiggly lines. - In Java 22, unused local variables can be declared as a single underscore `_`. This is supported by checkstyle. In pre-22 versions, IntelliJ allows such variables to be named `ignored` to suppress the unused local variable warning. This pattern is often (but not consistently) used in the Kafka codebase. This is, however, not supported by checkstyle. Since we cannot switch to Java 22 yet, and we want to use automated detection using checkstyle, we have to resort to prefixing the unused local variables with `@SuppressWarnings("UnusedLocalVariable")`. We have to apply this in 11 cases across the Kafka codebase. While not pretty, I'd argue it's worth it to prevent bugs like the one fixed in [7a749b5](https://github.com/apache/kafka/commit/7a749b589f8c98bd452b79b49cdfb182894d7f57). ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) ``` Validation results: * ✅ Title is not truncated * ✅ Title is not too short * ✅ Title is not too long * ✅ Title has expected KAFKA/MINOR/HOTFIX * ✅ Body is not empty * ✅ PR template text not present * ❌ Old PR template text should be removed * ❌ Pull Request is approved, but no 'Reviewers' found in commit body -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
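As a quick illustration of the two situations the commit message describes (standalone Java; the class and method names are made up for the example):

```java
import java.io.File;
import java.util.List;

class UnusedLocalExamples {

    // Situation 1: a loop variable that only drives the iteration count and
    // is never read in the body.
    int countItems(final List<String> items) {
        int count = 0;
        for (final String ignored : items) {
            count++;
        }
        return count;
    }

    // Situation 2: storing an otherwise-ignored return value to make the
    // intent explicit, with the suppression the commit message describes.
    void deleteQuietly(final File file) {
        @SuppressWarnings("UnusedLocalVariable")
        final boolean ignored = file.delete();
    }
}
```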
[jira] [Commented] (KAFKA-18942) Add reviewers to PR body with committer-tools
[ https://issues.apache.org/jira/browse/KAFKA-18942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933576#comment-17933576 ] Chia-Ping Tsai commented on KAFKA-18942: +1 to this idea. [~mingyen066] will take over this. > Add reviewers to PR body with committer-tools > - > > Key: KAFKA-18942 > URL: https://issues.apache.org/jira/browse/KAFKA-18942 > Project: Kafka > Issue Type: Sub-task > Components: build > Reporter: David Arthur > Assignee: Ming-Yen Chung > Priority: Major > > When we switch to the merge queue, we cannot alter the commit message > directly and instead must use the PR body for the eventual commit message. > > In order to include our "Reviewers" metadata in the commit, we must edit the > PR body after a review has happened and add the "Reviewers" manually. This is > rather annoying and we can do better. > > The committer-tools script "reviewers.py" can use the GitHub API (via "gh") > to read, modify, and update the PR body with the reviewers selected by this > tool. > > For example, > > {noformat} > $ ./committer-tools/reviewers.py > Utility to help generate 'Reviewers' string for Pull Requests. Use Ctrl+D or > Ctrl+C to exit > Name or email (case insensitive): chia > Possible matches (in order of most recent): > [1] Chia-Ping Tsai chia7...@gmail.com (1908) > [2] Chia-Ping Tsai chia7...@apache.org (13) > [3] Chia-Chuan Yu yujuan...@gmail.com (11) > [4] Chia Chuan Yu yujuan...@gmail.com (10) > Make a selection: 1 > Reviewers so far: [('Chia-Ping Tsai', 'chia7...@gmail.com', 1908)] > Name or email (case insensitive): ^C > Reviewers: Chia-Ping Tsai > Pull Request to update (Ctrl+D or Ctrl+C to skip): 19144 > Adding Reviewers to 19144... > {noformat} > > The script should be able to handle an existing "Reviewers" string in the PR body -- This message was sent by Atlassian Jira (v8.20.10#820010)
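For illustration, a minimal sketch of the described flow driven from Java via the `gh` CLI (hypothetical helper, not the actual reviewers.py implementation; assumes `gh` is installed and authenticated):

```java
import java.io.IOException;

// Hypothetical sketch: read a PR body, append a "Reviewers:" line if one is
// not already present, and write the body back via the gh CLI.
class AddReviewersSketch {

    static String run(final String... cmd) throws IOException, InterruptedException {
        final Process p = new ProcessBuilder(cmd).redirectErrorStream(true).start();
        final String out = new String(p.getInputStream().readAllBytes());
        p.waitFor();
        return out;
    }

    public static void main(final String[] args) throws Exception {
        final String pr = "19144"; // example PR number from the description above
        final String body = run("gh", "pr", "view", pr, "--json", "body", "-q", ".body");
        if (!body.contains("Reviewers:")) { // handle an existing "Reviewers" string
            final String updated = body.stripTrailing() + "\n\nReviewers: Chia-Ping Tsai <chia7...@gmail.com>";
            run("gh", "pr", "edit", pr, "--body", updated);
        }
    }
}
```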
Re: [PR] MINOR KIP link change to use immutable link [kafka]
m1a2st commented on PR #19153: URL: https://github.com/apache/kafka/pull/19153#issuecomment-2708602615 ``` https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-= ``` Keep only this URL, as it will redirect to the specific section. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR KIP link change to use immutable link [kafka]
chia7712 commented on PR #19153: URL: https://github.com/apache/kafka/pull/19153#issuecomment-2708606376 > Keep only this URL, as it will redirect to the specific section. how about https://cwiki.apache.org/confluence/x/2xRRCg#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18700: Migrate SnapshotPath and Entry in LogHistory to record classes [kafka]
chia7712 merged PR #19062: URL: https://github.com/apache/kafka/pull/19062 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18700) Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo to record classes
[ https://issues.apache.org/jira/browse/KAFKA-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-18700: --- Summary: Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo to record classes (was: Migrate suitable classes to records in raft) > Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo > to record classes > -- > > Key: KAFKA-18700 > URL: https://issues.apache.org/jira/browse/KAFKA-18700 > Project: Kafka > Issue Type: Sub-task >Reporter: TengYao Chi >Assignee: Ming-Yen Chung >Priority: Minor > > raft: > Entry > SnapshotPath -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Rewrite unchecked operations in Mock API [kafka]
chia7712 commented on code in PR #19071: URL: https://github.com/apache/kafka/pull/19071#discussion_r1986194037 ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java: ## @@ -391,24 +386,20 @@ public void testStateStore() { public void shouldNotEnableSendingOldValuesIfNotMaterializedAlreadyAndNotForcedToMaterialize() { final StreamsBuilder builder = new StreamsBuilder(); -final KTableImpl table = -(KTableImpl) builder.table("topic1", consumed); +final var kTable = assertInstanceOf(KTableImpl.class, builder.table("topic1", consumed)); +kTable.enableSendingOldValues(false); -table.enableSendingOldValues(false); - -assertThat(table.sendingOldValueEnabled(), is(false)); +assertFalse(kTable.sendingOldValueEnabled()); Review Comment: ditto ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java: ## @@ -587,19 +577,16 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplie assertThrows(NullPointerException.class, () -> table.transformValues(null)); } -@SuppressWarnings("unchecked") @Test public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() { -final ValueTransformerWithKeySupplier valueTransformerSupplier = -mock(ValueTransformerWithKeySupplier.class); +final ValueTransformerWithKeySupplier valueTransformerSupplier = mock(); assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized>) null)); } -@SuppressWarnings("unchecked") + @Test public void shouldThrowNullPointerOnTransformValuesWithKeyWhenStoreNamesNull() { -final ValueTransformerWithKeySupplier valueTransformerSupplier = -mock(ValueTransformerWithKeySupplier.class); +final ValueTransformerWithKeySupplier valueTransformerSupplier = mock(); assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (String[]) null)); Review Comment: maybe we can pass `mock()` to `transformValues` directly? 
```java assertThrows(NullPointerException.class, () -> table.transformValues(mock(), (String[]) null)); ``` ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java: ## @@ -587,19 +577,16 @@ public void shouldThrowNullPointerOnTransformValuesWithKeyWhenTransformerSupplie assertThrows(NullPointerException.class, () -> table.transformValues(null)); } -@SuppressWarnings("unchecked") @Test public void shouldThrowNullPointerOnTransformValuesWithKeyWhenMaterializedIsNull() { -final ValueTransformerWithKeySupplier valueTransformerSupplier = -mock(ValueTransformerWithKeySupplier.class); +final ValueTransformerWithKeySupplier valueTransformerSupplier = mock(); assertThrows(NullPointerException.class, () -> table.transformValues(valueTransformerSupplier, (Materialized>) null)); Review Comment: ditto ## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java: ## @@ -391,24 +386,20 @@ public void testStateStore() { public void shouldNotEnableSendingOldValuesIfNotMaterializedAlreadyAndNotForcedToMaterialize() { final StreamsBuilder builder = new StreamsBuilder(); -final KTableImpl table = -(KTableImpl) builder.table("topic1", consumed); +final var kTable = assertInstanceOf(KTableImpl.class, builder.table("topic1", consumed)); +kTable.enableSendingOldValues(false); -table.enableSendingOldValues(false); - -assertThat(table.sendingOldValueEnabled(), is(false)); +assertFalse(kTable.sendingOldValueEnabled()); } @Test public void shouldEnableSendingOldValuesIfNotMaterializedAlreadyButForcedToMaterialize() { final StreamsBuilder builder = new StreamsBuilder(); -final KTableImpl table = -(KTableImpl) builder.table("topic1", consumed); - -table.enableSendingOldValues(true); +final var kTable = assertInstanceOf(KTableImpl.class, builder.table("topic1", consumed)); +kTable.enableSendingOldValues(true); -assertThat(table.sendingOldValueEnabled(), is(true)); +assertTrue(kTable.sendingOldValueEnabled()); Review Comment: this change seems to be unnecessary? Also, we don't need to rename the `table` to `kTable`, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
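As background for the `mock()` calls suggested in this diff, a minimal sketch of the Mockito type-inference pattern (requires Mockito 4.10+; the class and method names below are made up): the no-argument `mock()` infers its type from the assignment or argument target, which is what removes the need for the `@SuppressWarnings("unchecked")` casts being deleted here.

```java
import static org.mockito.Mockito.mock;

import java.util.List;

class MockInferenceSketch {

    // Hypothetical consumer of a generic type, standing in for transformValues().
    static void takesSupplier(final List<String> supplier) { }

    static void example() {
        // Type inferred from the variable declaration; no raw-type cast needed.
        final List<String> typed = mock();

        // Type inferred from the parameter, so mock() can be passed inline.
        takesSupplier(mock());
    }
}
```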
[jira] [Assigned] (KAFKA-18942) Add reviewers to PR body with committer-tools
[ https://issues.apache.org/jira/browse/KAFKA-18942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-18942: -- Assignee: Ming-Yen Chung > Add reviewers to PR body with committer-tools > - > > Key: KAFKA-18942 > URL: https://issues.apache.org/jira/browse/KAFKA-18942 > Project: Kafka > Issue Type: Sub-task > Components: build > Reporter: David Arthur > Assignee: Ming-Yen Chung > Priority: Major > > When we switch to the merge queue, we cannot alter the commit message > directly and instead must use the PR body for the eventual commit message. > > In order to include our "Reviewers" metadata in the commit, we must edit the > PR body after a review has happened and add the "Reviewers" manually. This is > rather annoying and we can do better. > > The committer-tools script "reviewers.py" can use the GitHub API (via "gh") > to read, modify, and update the PR body with the reviewers selected by this > tool. > > For example, > > {noformat} > $ ./committer-tools/reviewers.py > Utility to help generate 'Reviewers' string for Pull Requests. Use Ctrl+D or > Ctrl+C to exit > Name or email (case insensitive): chia > Possible matches (in order of most recent): > [1] Chia-Ping Tsai chia7...@gmail.com (1908) > [2] Chia-Ping Tsai chia7...@apache.org (13) > [3] Chia-Chuan Yu yujuan...@gmail.com (11) > [4] Chia Chuan Yu yujuan...@gmail.com (10) > Make a selection: 1 > Reviewers so far: [('Chia-Ping Tsai', 'chia7...@gmail.com', 1908)] > Name or email (case insensitive): ^C > Reviewers: Chia-Ping Tsai > Pull Request to update (Ctrl+D or Ctrl+C to skip): 19144 > Adding Reviewers to 19144... > {noformat} > > The script should be able to handle an existing "Reviewers" string in the PR body -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-18700) Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo to record classes
[ https://issues.apache.org/jira/browse/KAFKA-18700?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-18700. Fix Version/s: 4.1.0 Resolution: Fixed > Migrate SnapshotPath, Entry, OffsetAndEpoch, LogFetchInfo, and LogAppendInfo > to record classes > -- > > Key: KAFKA-18700 > URL: https://issues.apache.org/jira/browse/KAFKA-18700 > Project: Kafka > Issue Type: Sub-task >Reporter: TengYao Chi >Assignee: Ming-Yen Chung >Priority: Minor > Fix For: 4.1.0 > > > raft: > Entry > SnapshotPath -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-18944) Remove unused setters from ClusterConfig
Chia-Ping Tsai created KAFKA-18944: -- Summary: Remove unused setters from ClusterConfig Key: KAFKA-18944 URL: https://issues.apache.org/jira/browse/KAFKA-18944 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai saslServerProperties, saslClientProperties, adminClientProperties, producerProperties, consumerProperties: those setters are actually unused, so we should remove them to avoid misleading developers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-10731: add support for SSL hot reload [kafka]
github-actions[bot] commented on PR #17987: URL: https://github.com/apache/kafka/pull/17987#issuecomment-2708651331 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch. If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-18420) Find out the license which is in the license file but is not in distribution
[ https://issues.apache.org/jira/browse/KAFKA-18420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933584#comment-17933584 ] kangning.li commented on KAFKA-18420: - file: [https://github.com/apache/kafka/pull/18299] This PR has already accomplished this task. So I am going to close this issue. > Find out the license which is in the license file but is not in distribution > > > Key: KAFKA-18420 > URL: https://issues.apache.org/jira/browse/KAFKA-18420 > Project: Kafka > Issue Type: Improvement >Reporter: kangning.li >Assignee: kangning.li >Priority: Major > > see discussion: > https://github.com/apache/kafka/pull/18299#discussion_r1904604076 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR KIP link change to use immutable link [kafka]
chia7712 commented on PR #19153: URL: https://github.com/apache/kafka/pull/19153#issuecomment-2708599138 @m1a2st could you please fix the above links as well? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18915: Migrate AdminClientRebootstrapTest to use new test infra [kafka]
chia7712 commented on code in PR #19094: URL: https://github.com/apache/kafka/pull/19094#discussion_r1986207973 ## core/src/test/java/kafka/test/api/AdminClientRebootstrapTest.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.test.api; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfig; +import org.apache.kafka.common.test.api.ClusterTemplate; +import org.apache.kafka.common.test.api.Type; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.test.TestUtils; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +public class AdminClientRebootstrapTest { +private static final int BROKER_COUNT = 2; + +private static List generator() { +// Enable unclean leader election for the test topic +Map serverProperties = Map.of( +TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true", +GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, String.valueOf(BROKER_COUNT) +); + +return Stream.of(false, true) +.map(AdminClientRebootstrapTest::getRebootstrapConfig) +.map(rebootstrapProperties -> AdminClientRebootstrapTest.buildConfig(serverProperties, rebootstrapProperties)) +.toList(); +} + +private static Map getRebootstrapConfig(boolean useRebootstrapTriggerMs) { +Map properties = new HashMap<>(); +if (useRebootstrapTriggerMs) { + properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "5000"); +} else { + properties.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "360"); + properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000"); + properties.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000"); +properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000"); + properties.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000"); +} +properties.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap"); +return properties; +} + +private static ClusterConfig buildConfig(Map serverProperties, Map rebootstrapProperties) { +return ClusterConfig.defaultBuilder() +.setTypes(Set.of(Type.KRAFT)) +.setBrokers(BROKER_COUNT) +.setAdminClientProperties(rebootstrapProperties) Review Comment: open https://issues.apache.org/jira/browse/KAFKA-18944 to remove those unused setters ## core/src/test/java/kafka/test/api/AdminClientRebootstrapTest.java: ## @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.test.api; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.common.config.TopicConfig; +import org.apache.kafka.common.test.ClusterInstance; +import org.apache.kafka.common.test.api.ClusterConfig; +import org.apache.kafka.common.test.api.Cluste
[jira] [Commented] (KAFKA-18944) Remove unused setters from ClusterConfig
[ https://issues.apache.org/jira/browse/KAFKA-18944?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17933582#comment-17933582 ] Chia-Ping Tsai commented on KAFKA-18944: an example of misuse: https://github.com/apache/kafka/pull/19094#discussion_r1986207871 > Remove unused setters from ClusterConfig > > > Key: KAFKA-18944 > URL: https://issues.apache.org/jira/browse/KAFKA-18944 > Project: Kafka > Issue Type: Improvement > Reporter: Chia-Ping Tsai > Assignee: Chia-Ping Tsai > Priority: Major > > saslServerProperties, saslClientProperties, adminClientProperties, > producerProperties, consumerProperties > > those setters are actually unused, so we should remove them to avoid > misleading developers. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18933 Add client integration tests module [kafka]
chia7712 commented on code in PR #19144: URL: https://github.com/apache/kafka/pull/19144#discussion_r1986210326 ## clients/integration-tests/src/test/java/org/apache/kafka/clients/admin/AdminClientRebootstrapTest.java: ## @@ -33,6 +32,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Stream; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; + + public class AdminClientRebootstrapTest { Review Comment: @mumrah could you please consider using a different test? `AdminClientRebootstrapTest` requires some rewriting (see https://github.com/apache/kafka/pull/19094#discussion_r1986208718). Perhaps we could use `AdminFenceProducersTest` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18909: Move DynamicThreadPool to server module [kafka]
chia7712 commented on PR #19081: URL: https://github.com/apache/kafka/pull/19081#issuecomment-2708604833 @clarkwtc could you please merge trunk to run CI again? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Cleanup connect runtime module [kafka]
wernerdv commented on code in PR #18074: URL: https://github.com/apache/kafka/pull/18074#discussion_r1986122644 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ## @@ -466,16 +466,16 @@ protected boolean readPartition(TopicPartition topicPartition) { return true; } -private void poll(long timeoutMs) { +private void poll() { try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(timeoutMs)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); Review Comment: @ashrivastava88 If you look at the trunk before this commit was merged, you can see that the method KafkaBasedLog#poll was used in only 2 places with the argument timeoutMs=Integer.MAX_VALUE. So, in this PR, I did not change the existing behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
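As context for this thread, a short sketch (illustrative class name) of the call semantics in question: a single `poll()` blocks until records arrive, the timeout elapses, or `Consumer#wakeup()` is invoked from another thread, so the effectively infinite timeout bounds how long one call may wait rather than driving an unbounded busy loop; internal fetch retries within that window are paced by `retry.backoff.ms`.

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Illustrative sketch only: poll() returns as soon as records are fetched;
// consumer.wakeup() from another thread interrupts the wait with a
// WakeupException.
class LongPollSketch {
    static ConsumerRecords<byte[], byte[]> pollOnce(final KafkaConsumer<byte[], byte[]> consumer) {
        return consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
    }
}
```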
Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]
mimaison commented on PR #19030: URL: https://github.com/apache/kafka/pull/19030#issuecomment-2708369200 Rebased on trunk and ported https://github.com/apache/kafka/commit/40db001588047201a406ebe969d1d7d2d5eefd57 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Cleanup connect runtime module [kafka]
ashrivastava88 commented on code in PR #18074: URL: https://github.com/apache/kafka/pull/18074#discussion_r1986112509 ## connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java: ## @@ -466,16 +466,16 @@ protected boolean readPartition(TopicPartition topicPartition) { return true; } -private void poll(long timeoutMs) { +private void poll() { try { -ConsumerRecords records = consumer.poll(Duration.ofMillis(timeoutMs)); +ConsumerRecords records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); Review Comment: The timeout here is getting set to Integer.MAX_VALUE, so if there is any failure during the consumer fetch, the consumer will keep retrying until this timeout is reached. We have had a case at Confluent where one of our customers shared logs, and they were filled with the consumer fetch retrying, with every subsequent timeout decreasing from this Integer.MAX_VALUE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Adjust ToC of zk2kraft and fix wrong section number of docker [kafka]
chia7712 merged PR #19146: URL: https://github.com/apache/kafka/pull/19146 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18648: Make `records` in `FetchResponse` nullable again [kafka]
ijuma commented on PR #19131: URL: https://github.com/apache/kafka/pull/19131#issuecomment-2704563232 Cherry-picked to 4.0. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients
[ https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-2939: --- Labels: (was: newbie) > Make AbstractConfig.logUnused() tunable for clients > --- > > Key: KAFKA-2939 > URL: https://issues.apache.org/jira/browse/KAFKA-2939 > Project: Kafka > Issue Type: Improvement > Components: config >Reporter: Guozhang Wang >Priority: Major > > Today we always log unused configs in KafkaProducer / KafkaConsumer in their > constructors, however for some cases like Kafka Streams that make use of > these clients, other configs may be passed in to configure Partitioner / > Serializer classes, etc. So it would be better to make this function call > optional to avoid printing unnecessary and confusing WARN entries. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]
mjsax commented on code in PR #19164: URL: https://github.com/apache/kafka/pull/19164#discussion_r1986165537 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1153,8 +1157,7 @@ void handleRevocation(final Collection revokedPartitions) { prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); Review Comment: Hmmm... Thinking about this again, it seems the `if` below should actually only apply to the EOSv2 case? I believe we did actually include some tasks unnecessarily for the ALOS (and older EOSv1) case? However, changing this code below does break two tests... ``` if (processingMode == EXACTLY_ONCE_V2 && revokedTasksNeedCommit) { prepareCommitAndAddOffsetsToMap(commitNeededActiveTasks, consumedOffsetsPerTask); } ``` Tests: ``` TaskManagerTest#shouldCloseAndReviveUncorruptedTasksWhenTimeoutExceptionThrownFromCommitDuringRevocationWithAlos() TaskManagerTest#shouldCommitAllNeededTasksOnHandleRevocation() ``` At least the second test assumes we commit everything for ALOS, too. It was added when we added EOSv2 and unified commit logic (https://github.com/apache/kafka/pull/8318) -- but I cannot remember why we did it this way... \cc @guozhangwang @ableegoldman do you remember? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-18944: Remove unused setters from ClusterConfig [kafka]
clarkwtc opened a new pull request, #19166: URL: https://github.com/apache/kafka/pull/19166 Remove unused `saslServerProperties`, `saslClientProperties`, `adminClientProperties`, `producerProperties`, and `consumerProperties` in ClusterConfig. First, I quickly fixed the unused adminClientProperties, and then I will move on to https://github.com/apache/kafka/pull/19094 to fix the related issues. Pass AdminClientRebootstrapTest: https://github.com/user-attachments/assets/73c50376-6602-493d-8abd-0eb2bb304114 Pass ClusterConfigTest: https://github.com/user-attachments/assets/b4da59da-dfdf-4698-9077-5086854360ab -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-18944) Remove unused setters from ClusterConfig
[ https://issues.apache.org/jira/browse/KAFKA-18944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wei-Ting Chen reassigned KAFKA-18944: - Assignee: Wei-Ting Chen (was: Chia-Ping Tsai) > Remove unused setters from ClusterConfig > > > Key: KAFKA-18944 > URL: https://issues.apache.org/jira/browse/KAFKA-18944 > Project: Kafka > Issue Type: Improvement > Reporter: Chia-Ping Tsai > Assignee: Wei-Ting Chen > Priority: Major > > saslServerProperties, saslClientProperties, adminClientProperties, > producerProperties, consumerProperties > > those setters are actually unused, so we should remove them to avoid > misleading developers. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18909: Move DynamicThreadPool to server module [kafka]
clarkwtc commented on PR #19081: URL: https://github.com/apache/kafka/pull/19081#issuecomment-2708609581 @chia7712 No problem. I've merged it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18422) add Kafka client upgrade path
[ https://issues.apache.org/jira/browse/KAFKA-18422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18422: Fix Version/s: 4.0.0 > add Kafka client upgrade path > - > > Key: KAFKA-18422 > URL: https://issues.apache.org/jira/browse/KAFKA-18422 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Kuan Po Tseng >Priority: Blocker > Labels: need-kip > Fix For: 4.0.0 > > > https://github.com/apache/kafka/pull/18193#issuecomment-2572283545 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16758: Extend Consumer#close with an option to leave the group or not [kafka]
frankvicky commented on code in PR #17614: URL: https://github.com/apache/kafka/pull/17614#discussion_r1986090241 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java: ## @@ -427,7 +428,7 @@ public void onGroupAssignmentUpdated(Set partitions) { // call close methods if internal objects are already constructed; this is to prevent resource leak. see KAFKA-2121 // we do not need to call `close` at all when `log` is null, which means no internal objects were initialized. if (this.log != null) { -close(Duration.ZERO, true); +close(Duration.ZERO, CloseOptions.GroupMembershipOperation.DEFAULT, true); Review Comment: Given that KIP-848 doesn't define an epoch for a dynamic member remaining in the group, I decided to ignore the heartbeat when closing. In this way, we don't need to change the protocol and the server-side implementation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14484: Move UnifiedLog to storage module [kafka]
mimaison commented on code in PR #19030: URL: https://github.com/apache/kafka/pull/19030#discussion_r1986107143 ## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ## @@ -55,6 +113,2289 @@ public class UnifiedLog { public static final String STRAY_DIR_SUFFIX = LogFileUtils.STRAY_DIR_SUFFIX; public static final long UNKNOWN_OFFSET = LocalLog.UNKNOWN_OFFSET; +// For compatibility, metrics are defined to be under `Log` class +private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup("kafka.log", "Log"); + +/* A lock that guards all modifications to the log */ +private final Object lock = new Object(); +private final Map> metricNames = new HashMap<>(); + +// localLog The LocalLog instance containing non-empty log segments recovered from disk +private final LocalLog localLog; +private final BrokerTopicStats brokerTopicStats; +private final ProducerStateManager producerStateManager; +private final boolean remoteStorageSystemEnable; +private final ScheduledFuture producerExpireCheck; +private final int producerIdExpirationCheckIntervalMs; +private final String logIdent; +private final Logger logger; +private final LogValidator.MetricsRecorder validatorMetricsRecorder; + +/* The earliest offset which is part of an incomplete transaction. This is used to compute the + * last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset + * gets removed from the log (through record or segment deletion). In this case, the first unstable offset + * will point to the log start offset, which may actually be either part of a completed transaction or not + * part of a transaction at all. However, since we only use the LSO for the purpose of restricting the + * read_committed consumer to fetching decided data (i.e. committed, aborted, or non-transactional), this + * temporary abuse seems justifiable and saves us from scanning the log after deletion to find the first offsets + * of each ongoing transaction in order to compute a new first unstable offset. It is possible, however, + * that this could result in disagreement between replicas depending on when they began replicating the log. + * In the worst case, the LSO could be seen by a consumer to go backwards. + */ +private volatile Optional firstUnstableOffsetMetadata = Optional.empty(); +private volatile Optional partitionMetadataFile = Optional.empty(); +// This is the offset(inclusive) until which segments are copied to the remote storage. +private volatile long highestOffsetInRemoteStorage = -1L; + +/* Keep track of the current high watermark in order to ensure that segments containing offsets at or above it are + * not eligible for deletion. This means that the active segment is only eligible for deletion if the high watermark + * equals the log end offset (which may never happen for a partition under consistent load). This is needed to + * prevent the log start offset (which is exposed in fetch responses) from getting ahead of the high watermark. + */ +private volatile LogOffsetMetadata highWatermarkMetadata; +private volatile long localLogStartOffset; +private volatile long logStartOffset; +private volatile LeaderEpochFileCache leaderEpochCache; +private volatile Optional topicId; +private volatile LogOffsetsListener logOffsetsListener; + +/** + * A log which presents a unified view of local and tiered log segments. + * + * The log consists of tiered and local segments with the tiered portion of the log being optional. 
There could be an + * overlap between the tiered and local segments. The active segment is always guaranteed to be local. If tiered segments + * are present, they always appear at the beginning of the log, followed by an optional region of overlap, followed by the local + * segments including the active segment. + * + * NOTE: this class handles state and behavior specific to tiered segments as well as any behavior combining both tiered + * and local segments. The state and behavior specific to local segments are handled by the encapsulated LocalLog instance. + * + * @param logStartOffset The earliest offset allowed to be exposed to kafka client. + * The logStartOffset can be updated by : + * - user's DeleteRecordsRequest + * - broker's log retention + * - broker's log truncation + * - broker's log recovery + * The logStartOffset is used to decide the following: + * - Log deletion. LogSegment whose nextOffset <= log's logStartOffset can be deleted. + * It may trigger log rolling if the active segmen
Re: [PR] KAFKA-18602: Incorrect FinalizedVersionLevel reported for dynamic KRaft quorum [kafka]
chia7712 commented on code in PR #18685: URL: https://github.com/apache/kafka/pull/18685#discussion_r1986118137 ## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala: ## @@ -522,10 +522,14 @@ class KRaftMetadataCache( if (kraftVersionLevel > 0) { finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel) } +var metadataVersion = MetadataVersion.MINIMUM_VERSION Review Comment: That seems like a deadlock to me. If a controller can't receive RPC requests before having a valid metadata version, it can't communicate with other static voters initially. This means it's impossible to obtain a valid metadata version. The approach in #19127 doesn't completely fix the issue, but the issue only occurs at startup with static voters. During that phase, the quorum isn't ready, so exposing the 3.3 MV via the API response should be low risk. To clean up, if the above description is accurate, `SimpleApiVersionManager` can replace `featuresPublisher` with `MetadataCache` and manually add the 3.3 MV to the API response when `MetadataCache` lacks a valid MV. That is basically equivalent to what #19127 does. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Clean up metadata module [kafka]
mumrah commented on PR #19069: URL: https://github.com/apache/kafka/pull/19069#issuecomment-2708307265 @sjhajharia can you merge trunk into this branch? There are some recent CI changes that are needed. I will try to review this over the weekend. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-18931) Debug the case where kafka-share-groups.sh --describe --members does not match with the reality when share consumer is closed during broker outage
Chirag Wadhwa created KAFKA-18931: - Summary: Debug the case where kafka-share-groups.sh --describe --members does not match with the reality when share consumer is closed during broker outage Key: KAFKA-18931 URL: https://issues.apache.org/jira/browse/KAFKA-18931 Project: Kafka Issue Type: Sub-task Reporter: Chirag Wadhwa Assignee: Chirag Wadhwa -- This message was sent by Atlassian Jira (v8.20.10#820010)
[PR] KAFKA-18933 Add client integration tests module [kafka]
mumrah opened a new pull request, #19144: URL: https://github.com/apache/kafka/pull/19144 Adds a new ":clients:integration-test" Gradle module with one example test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18195: Enter incompatible instead of leaving incompatible entires blank in Kafka Streams broker compatibility matrix [kafka]
mjsax commented on code in PR #18258: URL: https://github.com/apache/kafka/pull/18258#discussion_r1984230751 ## docs/streams/upgrade-guide.html: ## @@ -1735,73 +1735,38 @@ Streams API broker compatibility -The following table shows which versions of the Kafka Streams API are compatible with various Kafka broker versions. +The following table shows which versions of the Kafka Streams API are compatible with various Kafka broker versions. For Kafka Streams versions older than 2.3.x, please check the 3.9 upgrade document. -Kafka Broker (columns) +Kafka Broker (columns) Kafka Streams API (rows) -0.10.0.x -0.10.1.x and 0.10.2.x -0.11.0.x and 1.0.x and 1.1.x and 2.0.x 2.1.x and 2.2.x and 2.3.x and 2.4.x and 2.5.x and 2.6.x and 2.7.x and 2.8.x and 3.0.x and 3.1.x and 3.2.x and 3.3.x and 3.4.x and 3.5.x and 3.6.x and 3.7.x and 3.8.x and 3.9.x 4.0.x -0.10.0.x +2.4.x and 2.5.x +compatible with exactly-once v2 turned off Review Comment: 2.4.x and 2.5.x do not have EOSv2 -- so this is confusing... There is nothing to be turned on/off -- EOSv2 was only added in the 2.6.0 release. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18736: Decide when a heartbeat should be sent [kafka]
lucasbru commented on code in PR #19121: URL: https://github.com/apache/kafka/pull/19121#discussion_r1983675477 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ## @@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext, retryBackoffMaxMs, maxPollIntervalMs ); +this.pollTimer = time.timer(maxPollIntervalMs); } +/** + * This will build a heartbeat request if one must be sent, determined based on the member + * state. A heartbeat is sent in the following situations: Review Comment: These are not separate situations in which a heartbeat must be sent. Maybe you mean "when all of the following conditions apply" or something? ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java: ## @@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext, retryBackoffMaxMs, maxPollIntervalMs ); +this.pollTimer = time.timer(maxPollIntervalMs); } +/** + * This will build a heartbeat request if one must be sent, determined based on the member + * state. A heartbeat is sent in the following situations: + * + * Member is part of the consumer group or wants to join it. + * The heartbeat interval has expired, or the member is in a state that indicates + * that it should heartbeat without waiting for the interval. + * + * This will also determine the maximum wait time until the next poll based on the member's + * state. + * + * If the member is without a coordinator or is in a failed state, the timer is set + * to Long.MAX_VALUE, as there's no need to send a heartbeat. + * If the member cannot send a heartbeat due to either exponential backoff, it will + * return the remaining time left on the backoff timer. + * If the member's heartbeat timer has not expired, It will return the remaining time + * left on the heartbeat timer. + * If the member can send a heartbeat, the timer is set to the current heartbeat interval. + * + * + * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} that includes a + * heartbeat request if one must be sent, and the time to wait until the next poll. + */ @Override public NetworkClientDelegate.PollResult poll(long currentTimeMs) { -return new NetworkClientDelegate.PollResult( -heartbeatRequestState.heartbeatIntervalMs(), -Collections.singletonList(makeHeartbeatRequest(currentTimeMs)) -); +if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager.shouldSkipHeartbeat()) { +membershipManager.onHeartbeatRequestSkipped(); +maybePropagateCoordinatorFatalErrorEvent(); +return NetworkClientDelegate.PollResult.EMPTY; +} +pollTimer.update(currentTimeMs); +if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) { +logger.warn("Consumer poll timeout has expired. This means the time between " + +"subsequent calls to poll() was longer than the configured max.poll.interval.ms, " + +"which typically implies that the poll loop is spending too much time processing " + +"messages. You can address this either by increasing max.poll.interval.ms or by " + +"reducing the maximum size of batches returned in poll() with max.poll.records."); + +membershipManager.onPollTimerExpired(); +NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequestOnlyLogResponse(currentTimeMs); + +// We can ignore the leave response because we can join before or after receiving the response. 
+heartbeatRequestState.reset(); +heartbeatState.reset(); +return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat)); +} +if (shouldHeartbeatBeforeIntervalExpires() || heartbeatRequestState.canSendRequest(currentTimeMs)) { +NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs); +return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request)); +} else { +return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs)); +} +} + +/** + * A heartbeat should be sent without waiting for the heartbeat interval to expire if: + * - the member is leaving the group + * or + * - the member is joining the group or acknowledging the assignm
Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]
lucasbru commented on code in PR #19164: URL: https://github.com/apache/kafka/pull/19164#discussion_r1986031099 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1153,8 +1157,7 @@ void handleRevocation(final Collection revokedPartitions) { prepareCommitAndAddOffsetsToMap(revokedActiveTasks, consumedOffsetsPerTask); Review Comment: This could be moved into the `if` below, instead of being called (but executing a no-op) in the "optimized" case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18074) Add kafka client compatibility matrix
[ https://issues.apache.org/jira/browse/KAFKA-18074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18074: Fix Version/s: (was: 4.0.0) > Add kafka client compatibility matrix > - > > Key: KAFKA-18074 > URL: https://issues.apache.org/jira/browse/KAFKA-18074 > Project: Kafka > Issue Type: Task > Reporter: Chia-Ping Tsai > Assignee: 黃竣陽 > Priority: Blocker > > in 4.0 we have many major breaking changes - JDK upgrade and protocol cleanup > - that may confuse users during rolling upgrades and environment setup. Hence, we should > add a matrix for all our client modules - client, streams, and connect. > The matrix consists of the following items: > 1. supported JDKs > 2. supported broker versions -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18943: Kafka Streams incorrectly commits TX during task revokation [kafka]
cadonna commented on code in PR #19164: URL: https://github.com/apache/kafka/pull/19164#discussion_r1986072577 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1163,10 +1166,12 @@ void handleRevocation(final Collection revokedPartitions) { // as such we just need to skip those dirty tasks in the checkpoint final Set dirtyTasks = new HashSet<>(); try { -// in handleRevocation we must call commitOffsetsOrTransaction() directly rather than -// commitAndFillInConsumedOffsetsAndMetadataPerTaskMap() to make sure we don't skip the -// offset commit because we are in a rebalance -taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask); +if (revokedTasksNeedCommit) { Review Comment: If we have this, do we want to also adapt the following condition in `TaskExecutor#commitOffsetsOrTransaction()` ```java if (!offsetsPerTask.isEmpty() || taskManager.streamsProducer().transactionInFlight()) { ``` to ```java if (!offsetsPerTask.isEmpty()) { ``` (and maybe move it to the outermost context as it was before the PR that introduced the bug)? ## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java: ## @@ -922,6 +932,175 @@ public void onRestoreEnd(final TopicPartition topicPartition, ); } + +private final AtomicReference transactionalProducerId = new AtomicReference<>(); + +private class TestClientSupplier extends DefaultKafkaClientSupplier { +@Override +public Producer getProducer(final Map config) { +transactionalProducerId.compareAndSet(null, (String) config.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG)); + +return new KafkaProducer<>(config, new ByteArraySerializer(), new ByteArraySerializer()); +} +} + +final static AtomicReference taskWithData = new AtomicReference<>(); +final static AtomicBoolean didRevokeIdleTask = new AtomicBoolean(false); + +@Test +public void shouldNotCommitActiveTasksWithPendingInputIfRevokedTaskDidNotMakeProgress() throws Exception { +shouldNotProduceDuplicates(false); +} + +@Test +public void shouldCommitAllTasksIfRevokedTaskTriggerPunctuation() throws Exception { +shouldNotProduceDuplicates(true); +} + +private void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception { Review Comment: nit: ```suggestion @ParameterizedTest(name = "{argumentsWithNames}") @FieldSource("namedArguments") @ParameterizedTest(name = "shouldCommitAllTasks with punctuation: {0}") @ValueSource(booleans = {true, false}) public void shouldNotProduceDuplicates(final boolean usePunctuation) throws Exception { ... } private static List namedArguments = Arrays.asList( arguments(named("Should not commit active tasks with pending input if revoked task did not make progress"), false), arguments(named("Should commit all tasks if revoked task triggers punctuation"), true) ); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18074) Add kafka client compatibility matrix
[ https://issues.apache.org/jira/browse/KAFKA-18074?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18074: Fix Version/s: 4.0.0 > Add kafka client compatibility matrix > - > > Key: KAFKA-18074 > URL: https://issues.apache.org/jira/browse/KAFKA-18074 > Project: Kafka > Issue Type: Task >Reporter: Chia-Ping Tsai >Assignee: 黃竣陽 >Priority: Blocker > Fix For: 4.0.0 > > > In 4.0 we have many major breaking changes - a JDK upgrade and protocol cleanup - > that may confuse users during rolling upgrades and environment setup. Hence, we should > add a matrix for all our client modules - client, streams, and connect. > The matrix consists of the following items: > 1. supported JDKs > 2. supported broker versions -- This message was sent by Atlassian Jira (v8.20.10#820010)
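To make the task concrete, a skeleton of what such a matrix might look like - the cell values below are illustrative placeholders, not verified compatibility claims:

```
Module   | Supported JDKs       | Supported broker versions
---------+----------------------+--------------------------
clients  | e.g. 11, 17, 21      | e.g. 2.1 and newer
streams  | e.g. 11, 17, 21      | e.g. 2.1 and newer
connect  | e.g. 17, 21          | e.g. 2.1 and newer
```

The actual ranges would need to be filled in from the 4.0 release notes (JDK requirements per module, and the minimum broker version implied by the protocol cleanup).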
Re: [PR] MINOR: Clean up metadata module [kafka]
sjhajharia commented on PR #19069: URL: https://github.com/apache/kafka/pull/19069#issuecomment-2708215564 cc: @m1a2st / @mumrah for a final review. Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-18422) add Kafka client upgrade path
[ https://issues.apache.org/jira/browse/KAFKA-18422?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-18422: Fix Version/s: (was: 4.0.0) > add Kafka client upgrade path > - > > Key: KAFKA-18422 > URL: https://issues.apache.org/jira/browse/KAFKA-18422 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Kuan Po Tseng >Priority: Blocker > Labels: need-kip > > https://github.com/apache/kafka/pull/18193#issuecomment-2572283545 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-18602: Incorrect FinalizedVersionLevel reported for dynamic KRaft quorum [kafka]
FrankYang0529 commented on code in PR #18685: URL: https://github.com/apache/kafka/pull/18685#discussion_r1986040272

## core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
```
@@ -522,10 +522,14 @@ class KRaftMetadataCache(
     if (kraftVersionLevel > 0) {
       finalizedFeatures.put(KRaftVersion.FEATURE_NAME, kraftVersionLevel)
     }
+    var metadataVersion = MetadataVersion.MINIMUM_VERSION
```

Review Comment: @chia7712 @cmccabe @ijuma @junrao, I found that we cannot add the check of `uninitializedPublishers.isEmpty()` at the end of `ControllerServer#startup`. If users use static voters (`controller.quorum.voters`), the `NetworkClient` tries to send an `ApiVersionsRequest` to the other voters [0]. After it receives an error response, it disconnects the connection [1]. In the meantime, the `KafkaRaftClient` tries to send a `VoteRequest` [2], but the request cannot be sent because the connection has already been closed. As a result, none of the controllers can become ready, and `uninitializedPublishers` never becomes empty. Do we want to use another Jira to track this issue? The original issue is resolved by #19127.

[0] https://github.com/apache/kafka/blob/947c414a8c0d30223c725a91507b2d891d5851e2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1087-L1089
[1] https://github.com/apache/kafka/blob/947c414a8c0d30223c725a91507b2d891d5851e2/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1027-L1031
[2] https://github.com/apache/kafka/blob/947c414a8c0d30223c725a91507b2d891d5851e2/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java#L3161-L3164

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
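For illustration, a minimal sketch of the kind of bounded readiness check being discussed. The name `uninitializedPublishers` is borrowed from the comment above; everything else (class name, timeout behavior) is assumed, and this is plain Java rather than the actual Scala `ControllerServer` code:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

// Sketch: track metadata publishers that have not been installed yet, and let
// startup wait for them with a deadline instead of blocking indefinitely.
final class PublisherReadiness {
    private final Set<String> uninitializedPublishers = ConcurrentHashMap.newKeySet();

    synchronized void register(final String publisherName) {
        uninitializedPublishers.add(publisherName);
    }

    synchronized void markInitialized(final String publisherName) {
        uninitializedPublishers.remove(publisherName);
        notifyAll();
    }

    synchronized void awaitAllInitialized(final long timeoutMs) throws InterruptedException, TimeoutException {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (!uninitializedPublishers.isEmpty()) {
            final long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                // Fail fast with a diagnosable error rather than hanging startup.
                throw new TimeoutException("Publishers still uninitialized: " + uninitializedPublishers);
            }
            wait(remaining);
        }
    }
}
```

With a deadline like this, the scenario described above (voters disconnecting after a failed `ApiVersionsRequest`, so the quorum never elects a leader) would surface as a clear startup timeout instead of an indefinite hang.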
Re: [PR] KAFKA-18873: Fixed incorrect error message when max.in.flight.requests.per.connection exceeds 5 for transactional producers [kafka]
EsMoX commented on PR #19041: URL: https://github.com/apache/kafka/pull/19041#issuecomment-2708346593 > @EsMoX I just noticed there's a superfluous newline in the error message in the PR, can you remove it please? Thanks! @kirktrue Done, thank you! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18927: Remove LATEST_0_11, LATEST_1_0, LATEST_1_1, LATEST_2_0 [kafka]
Parkerhiphop commented on PR #19134: URL: https://github.com/apache/kafka/pull/19134#issuecomment-2708352912 Thanks for the testing and information provided. I will take a look at these failed tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-18736: Decide when a heartbeat should be sent [kafka]
cadonna commented on code in PR #19121: URL: https://github.com/apache/kafka/pull/19121#discussion_r1982894114

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
```
@@ -254,34 +284,131 @@ public StreamsGroupHeartbeatRequestManager(final LogContext logContext,
             retryBackoffMaxMs,
             maxPollIntervalMs
         );
+        this.pollTimer = time.timer(maxPollIntervalMs);
     }

+    /**
+     * This will build a heartbeat request if one must be sent, determined based on the member
+     * state. A heartbeat is sent in the following situations:
+     *   - Member is part of the consumer group or wants to join it.
+     *   - The heartbeat interval has expired, or the member is in a state that indicates
+     *     that it should heartbeat without waiting for the interval.
+     * This will also determine the maximum wait time until the next poll based on the member's
+     * state.
+     *   - If the member is without a coordinator or is in a failed state, the timer is set
+     *     to Long.MAX_VALUE, as there's no need to send a heartbeat.
+     *   - If the member cannot send a heartbeat due to exponential backoff, it will
+     *     return the remaining time left on the backoff timer.
+     *   - If the member's heartbeat timer has not expired, it will return the remaining time
+     *     left on the heartbeat timer.
+     *   - If the member can send a heartbeat, the timer is set to the current heartbeat interval.
+     *
+     * @return {@link org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult} that includes a
+     *         heartbeat request if one must be sent, and the time to wait until the next poll.
+     */
     @Override
     public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
-        return new NetworkClientDelegate.PollResult(
-            heartbeatRequestState.heartbeatIntervalMs(),
-            Collections.singletonList(makeHeartbeatRequest(currentTimeMs))
-        );
+        if (coordinatorRequestManager.coordinator().isEmpty() || membershipManager.shouldSkipHeartbeat()) {
+            membershipManager.onHeartbeatRequestSkipped();
+            maybePropagateCoordinatorFatalErrorEvent();
+            return NetworkClientDelegate.PollResult.EMPTY;
+        }
+        pollTimer.update(currentTimeMs);
+        if (pollTimer.isExpired() && !membershipManager.isLeavingGroup()) {
+            logger.warn("Consumer poll timeout has expired. This means the time between " +
+                "subsequent calls to poll() was longer than the configured max.poll.interval.ms, " +
+                "which typically implies that the poll loop is spending too much time processing " +
+                "messages. You can address this either by increasing max.poll.interval.ms or by " +
+                "reducing the maximum size of batches returned in poll() with max.poll.records.");
+
+            membershipManager.onPollTimerExpired();
+            NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequestOnlyLogResponse(currentTimeMs);
+
+            // We can ignore the leave response because we can join before or after receiving the response.
+            heartbeatRequestState.reset();
+            heartbeatState.reset();
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(leaveHeartbeat));
+        }
+        if (shouldHeartbeatBeforeIntervalExpires() || heartbeatRequestState.canSendRequest(currentTimeMs)) {
+            NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(currentTimeMs);
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.heartbeatIntervalMs(), Collections.singletonList(request));
+        } else {
+            return new NetworkClientDelegate.PollResult(heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
+        }
+    }
+
+    /**
+     * A heartbeat should be sent without waiting for the heartbeat interval to expire if:
+     * - the member is leaving the group
+     * or
+     * - the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request
+     *   in flight.
+     * @return
+     */
+    private boolean shouldHeartbeatBeforeIntervalExpires() {
+        return membershipManager.state() == MemberState.LEAVING
+            ||
+            (membershipManager.state() == MemberState.JOINING || membershipManager.state() == MemberState.ACKNOWLEDGING)
+                && !heartbeatRequestState.requestInFlight();
```

Review Comment: @zheguang Thanks for your comment! The comment says:
```
the member is joining the group or acknowledging the assignment and for both cases there is no heartbeat request in flight.
```
That means the expression should be true for ```
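Worth noting for readers of that return expression: in Java, `&&` binds more tightly than `||`, so it parses as `LEAVING || ((JOINING || ACKNOWLEDGING) && !requestInFlight)`, which matches the javadoc. A tiny self-contained sketch demonstrating this; the enum and method here are stand-ins, not the real client internals:

```java
enum MemberState { JOINING, ACKNOWLEDGING, LEAVING, STABLE }

public class PrecedenceDemo {

    static boolean shouldHeartbeatBeforeIntervalExpires(MemberState state, boolean requestInFlight) {
        // && binds tighter than ||, so this evaluates as:
        // LEAVING || ((JOINING || ACKNOWLEDGING) && !requestInFlight)
        return state == MemberState.LEAVING
            || (state == MemberState.JOINING || state == MemberState.ACKNOWLEDGING)
                && !requestInFlight;
    }

    public static void main(String[] args) {
        // LEAVING heartbeats immediately, even with a request in flight.
        System.out.println(shouldHeartbeatBeforeIntervalExpires(MemberState.LEAVING, true));   // true
        // JOINING heartbeats immediately only when no request is in flight.
        System.out.println(shouldHeartbeatBeforeIntervalExpires(MemberState.JOINING, true));   // false
        System.out.println(shouldHeartbeatBeforeIntervalExpires(MemberState.JOINING, false));  // true
        // STABLE waits for the regular heartbeat interval.
        System.out.println(shouldHeartbeatBeforeIntervalExpires(MemberState.STABLE, false));   // false
    }
}
```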