Re: [PR] KAFKA-17327: Add support of group in kafka-configs.sh [kafka]
DL1231 commented on code in PR #16887: URL: https://github.com/apache/kafka/pull/16887#discussion_r1730271755 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -256,6 +278,32 @@ public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception { } } +@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), Review Comment: I tried to do it, but failed.  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16567) Add New Stream Metrics based on KIP-869
[ https://issues.apache.org/jira/browse/KAFKA-16567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876480#comment-17876480 ] Balaji Rao commented on KAFKA-16567: [~cadonna] it seems the remaining metrics * {quote}restore-total {quote} * {quote}restore-rate {quote} * {quote}update-total {quote} * {quote}update-rate {quote} * {quote}restore-remaining-records-total {quote} are implemented in this [PR.|https://github.com/apache/kafka/pull/13300/files#diff-4df67afac7b840ed6080d7fae931148c2f661af25eaafd83e0f8535ac34a0b71] I was planning to try implementing these metrics. I first checked out DefaultStateUpdater because one of the metrics (restore-call-rate) in the KIP was defined there. While figuring out how to add the task ID to the new metrics, I found this PR. Initially, I thought the metrics from the PR wouldn't work for state updater since they are recorded from StoreChangelogReader (which I had presumed was the predecessor to the state updater). But it seems like this shouldn't be an issue because the state updater uses StoreChangelogReader for the actual restoration ? Just want to make sure! cc: [~lucasbru], who has kindly offered me to help me with this as my first contribution to kafka-streams. > Add New Stream Metrics based on KIP-869 > --- > > Key: KAFKA-16567 > URL: https://issues.apache.org/jira/browse/KAFKA-16567 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Walter Hernandez >Priority: Major > Labels: kip > > Add the following metrics to the state updater: > * restore-total > * restore-rate > * update-total > * update-rate > * restore-remaining-records-total > Please see the KIP for more information about the metrics. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17053: [Minor] Restructure build.gradle to configure publishing last [kafka]
chia7712 commented on PR #16950: URL: https://github.com/apache/kafka/pull/16950#issuecomment-2308746393 > By restructuring the build.gradle, I meant that the shadowJar block should be moved above the publishing block. That is not happening in this PR, only changing the fork is. @KTKTK-HZ WDYT? could you fix it in this PR? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17373: Add print.epoch to kafka-console-share-consumer.sh/kafka-console-consumer.sh [kafka]
chia7712 commented on code in PR #16987: URL: https://github.com/apache/kafka/pull/16987#discussion_r1730294085 ## tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java: ## @@ -130,25 +134,31 @@ public void writeTo(ConsumerRecord consumerRecord, PrintStream o } else { output.print("NO_TIMESTAMP"); } -writeSeparator(output, printPartition || printOffset || printDelivery || printHeaders || printKey || printValue); +writeSeparator(output, anyTrue(printPartition, printOffset, printDelivery, printEpoch, printHeaders, printKey, printValue)); Review Comment: How about adding `BooleanExpressionComplexity` to suppression? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17331: Throw UnsupportedVersionException if the data in ListOffsetRequest does NOT fit EarliestLocalSpec and LatestTieredSpec. [kafka]
chia7712 merged PR #16876: URL: https://github.com/apache/kafka/pull/16876 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: add ReconfigurableQuorumIntegrationTest [kafka]
chia7712 commented on PR #16991: URL: https://github.com/apache/kafka/pull/16991#issuecomment-2308754564 @cmccabe could you please fix build error? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17053: [Minor] Restructure build.gradle to configure publishing last [kafka]
KTKTK-HZ commented on PR #16950: URL: https://github.com/apache/kafka/pull/16950#issuecomment-2308758996 Hey @gharris1727 @chia7712 , I will modify this PR and refactor build.gradle to move the shadowJar block before the publishing block. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17399: Apply LambdaValidator to code base [kafka]
xijiu commented on PR #16980: URL: https://github.com/apache/kafka/pull/16980#issuecomment-2308792034 > @xijiu could you please fix the build error? @chia7712 It's my fault, I should add the `@SuppressWarnings("unchecked")` annotation when I perform forced type conversion, like this: ``` @SuppressWarnings("unchecked") final List aliases = (List) value; ``` I have fiexd it, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly
[ https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17360: --- Fix Version/s: 3.7.2 3.8.1 > local log retention ms/bytes "-2" is not treated correctly > -- > > Key: KAFKA-17360 > URL: https://issues.apache.org/jira/browse/KAFKA-17360 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Kuan Po Tseng >Priority: Critical > Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1 > > > # When the local.retention.ms/bytes is set to -2, we didn't replace it with > the server-side retention.ms/bytes config, so the -2 local retention won't > take effect. > # When setting retention.ms/bytes to -2, we can notice this log message: > {code:java} > Deleting segment LogSegment(baseOffset=10045, size=1037087, > lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to > local log retention size -2 breach. Local log size after deletion will be > 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code} > This is not helpful for users. We should replace -2 with real retention value > when logging. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
chia7712 merged PR #16932: URL: https://github.com/apache/kafka/pull/16932 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17373: Add print.epoch to kafka-console-share-consumer.sh/kafka-console-consumer.sh [kafka]
xijiu commented on code in PR #16987: URL: https://github.com/apache/kafka/pull/16987#discussion_r1730319638 ## tools/src/main/java/org/apache/kafka/tools/consumer/DefaultMessageFormatter.java: ## @@ -130,25 +134,31 @@ public void writeTo(ConsumerRecord consumerRecord, PrintStream o } else { output.print("NO_TIMESTAMP"); } -writeSeparator(output, printPartition || printOffset || printDelivery || printHeaders || printKey || printValue); +writeSeparator(output, anyTrue(printPartition, printOffset, printDelivery, printEpoch, printHeaders, printKey, printValue)); Review Comment: > How about adding `BooleanExpressionComplexity` to suppression? Wonderful idea, I have fixed it, PTAL @chia7712 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly
[ https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876497#comment-17876497 ] Chia-Ping Tsai commented on KAFKA-17360: the patch is merged to trunk (https://github.com/apache/kafka/commit/11966a209a8bb5bbf867b5bb5ca1d60b80e26650) and 3.9 (https://github.com/apache/kafka/commit/6d2b81e07f22086e3e752024168772657294c4d9) will open another PR to backport the fix_2 to 3.8 and 3.7 > local log retention ms/bytes "-2" is not treated correctly > -- > > Key: KAFKA-17360 > URL: https://issues.apache.org/jira/browse/KAFKA-17360 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Kuan Po Tseng >Priority: Critical > Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1 > > > # When the local.retention.ms/bytes is set to -2, we didn't replace it with > the server-side retention.ms/bytes config, so the -2 local retention won't > take effect. > # When setting retention.ms/bytes to -2, we can notice this log message: > {code:java} > Deleting segment LogSegment(baseOffset=10045, size=1037087, > lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to > local log retention size -2 breach. Local log size after deletion will be > 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code} > This is not helpful for users. We should replace -2 with real retention value > when logging. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17418) Fix the incorrect markdown of junit.py caused by newline character
Chia-Ping Tsai created KAFKA-17418: -- Summary: Fix the incorrect markdown of junit.py caused by newline character Key: KAFKA-17418 URL: https://issues.apache.org/jira/browse/KAFKA-17418 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see https://github.com/apache/kafka/actions/runs/10534117374/jobs/21477070226/summary_raw -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17418) Fix the incorrect markdown of junit.py caused by newline character
[ https://issues.apache.org/jira/browse/KAFKA-17418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876500#comment-17876500 ] kangning.li commented on KAFKA-17418: - Hi [~chia7712] , I am interested in this issue, cloud you assign it to me ? Thanks > Fix the incorrect markdown of junit.py caused by newline character > -- > > Key: KAFKA-17418 > URL: https://issues.apache.org/jira/browse/KAFKA-17418 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > see > https://github.com/apache/kafka/actions/runs/10534117374/jobs/21477070226/summary_raw -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17418) Fix the incorrect markdown of junit.py caused by newline character
[ https://issues.apache.org/jira/browse/KAFKA-17418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17418: -- Assignee: kangning.li (was: Chia-Ping Tsai) > Fix the incorrect markdown of junit.py caused by newline character > -- > > Key: KAFKA-17418 > URL: https://issues.apache.org/jira/browse/KAFKA-17418 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: kangning.li >Priority: Minor > > see > https://github.com/apache/kafka/actions/runs/10534117374/jobs/21477070226/summary_raw -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17384) Remove deprecated options in tools
[ https://issues.apache.org/jira/browse/KAFKA-17384?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876501#comment-17876501 ] Chia-Ping Tsai commented on KAFKA-17384: [~yangpoan] Could you please add link of jira (and KIP) which deprecates the config to each sub task? I have updated KAFKA-17390 > Remove deprecated options in tools > -- > > Key: KAFKA-17384 > URL: https://issues.apache.org/jira/browse/KAFKA-17384 > Project: Kafka > Issue Type: Improvement >Reporter: PoAn Yang >Assignee: PoAn Yang >Priority: Minor > > There're deprecated options in following tools. We can consider to remove > them in 4.0. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17390:Remove broker-list in GetOffsetShell [kafka]
chia7712 commented on code in PR #16992: URL: https://github.com/apache/kafka/pull/16992#discussion_r1730323750 ## tools/src/main/java/org/apache/kafka/tools/GetOffsetShell.java: ## @@ -107,12 +107,7 @@ private static class GetOffsetShellOptions extends CommandDefaultOptions { public GetOffsetShellOptions(String[] args) throws TerseException { super(args); -OptionSpec brokerListOpt = parser.accepts("broker-list", "DEPRECATED, use --bootstrap-server instead; ignored if --bootstrap-server is specified. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") -.withRequiredArg() -.describedAs("HOST1:PORT1,...,HOST3:PORT3") -.ofType(String.class); OptionSpec bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED. The server(s) to connect to in the form HOST1:PORT1,HOST2:PORT2.") Review Comment: Please rename `effectiveBrokerListOpt` to `bootstrapServerOpt`. Also, please initialize it in line#110 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
brandboat opened a new pull request, #16995: URL: https://github.com/apache/kafka/pull/16995 related to https://issues.apache.org/jira/browse/KAFKA-17360 (3.8 branch) When the local.retention.ms/bytes is set to -2, we didn't replace it with the server-side retention.ms/bytes config, so the -2 local retention won't take effect. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17388: Remove broker-list in VerifiableProducer [kafka]
chia7712 commented on code in PR #16958: URL: https://github.com/apache/kafka/pull/16958#discussion_r1730325275 ## tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java: ## @@ -131,14 +131,6 @@ private static ArgumentParser argParser() { .dest("bootstrapServer") .help("REQUIRED: The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,..."); -connectionGroup.addArgument("--broker-list") Review Comment: @LoganZhuZzz could you please update e2e also? see https://github.com/apache/kafka/blob/11966a209a8bb5bbf867b5bb5ca1d60b80e26650/tests/kafkatest/services/verifiable_producer.py#L227 noted that we should use `--bootstrap-server` only if the version >= 2.5. otherwise, we should keep using `--broker-list` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17335: Lack of default for URL encoding configuration for OAuth causes NPE [kafka]
chia7712 commented on code in PR #16990: URL: https://github.com/apache/kafka/pull/16990#discussion_r1730326689 ## clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/secured/ConfigurationUtils.java: ## @@ -209,6 +209,19 @@ public String validateString(String name, boolean isRequired) throws ValidateExc return value; } +/** + * Validates that a value, if supplied, is a {@link Boolean}. If no value is present in the configuration, a + * default value of {@link Boolean#FALSE} is returned. + */ +public Boolean validateBoolean(String name) { +Boolean value = get(name); + +if (value != null) +return value; + +return Boolean.FALSE; Review Comment: the other methods take argument `boolean isRequired`. maybe this method should follow the pattern? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
brandboat opened a new pull request, #16996: URL: https://github.com/apache/kafka/pull/16996 based on 3.7 branch, related to https://issues.apache.org/jira/browse/KAFKA-17360 When the local.retention.ms/bytes is set to -2, we didn't replace it with the server-side retention.ms/bytes config, so the -2 local retention won't take effect. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: Fix an incorrect message in kafka-consumer-groups.sh when missing necessary options [kafka]
chia7712 merged PR #16961: URL: https://github.com/apache/kafka/pull/16961 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]
chia7712 commented on code in PR #16954: URL: https://github.com/apache/kafka/pull/16954#discussion_r1730329818 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -206,29 +206,29 @@ public synchronized Collection getAllMetadataForTopology(final * if streams is (re-)initializing or {@code null} if the corresponding topic cannot be found, * or null if no matching metadata could be found. */ -public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, - final K key, - final Serializer keySerializer) { +public synchronized KeyQueryMetadata keyQueryMetadataForKey(final String storeName, +final K key, +final Serializer keySerializer) { Objects.requireNonNull(keySerializer, "keySerializer can't be null"); if (topologyMetadata.hasNamedTopologies()) { throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer)" + "method when using named topologies, please use the overload that" + "accepts a topologyName parameter to identify the correct store"); } -return getKeyQueryMetadataForKey(storeName, +return keyQueryMetadataForKey(storeName, key, new DefaultStreamPartitioner<>(keySerializer)); } /** - * See {@link StreamsMetadataState#getKeyQueryMetadataForKey(String, Object, Serializer)} + * See {@link StreamsMetadataState#keyQueryMetadataForKey(String, Object, Serializer)} */ -public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, - final K key, - final Serializer keySerializer, - final String topologyName) { +public synchronized KeyQueryMetadata keyQueryMetadataForKey(final String storeName, +final K key, +final Serializer keySerializer, +final String topologyName) { Objects.requireNonNull(keySerializer, "keySerializer can't be null"); -return getKeyQueryMetadataForKey(storeName, +return keyQueryMetadataForKey(storeName, key, new DefaultStreamPartitioner<>(keySerializer), topologyName); Review Comment: please update error message (line#257) also ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -206,29 +206,29 @@ public synchronized Collection getAllMetadataForTopology(final * if streams is (re-)initializing or {@code null} if the corresponding topic cannot be found, * or null if no matching metadata could be found. */ -public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, - final K key, - final Serializer keySerializer) { +public synchronized KeyQueryMetadata keyQueryMetadataForKey(final String storeName, +final K key, +final Serializer keySerializer) { Objects.requireNonNull(keySerializer, "keySerializer can't be null"); if (topologyMetadata.hasNamedTopologies()) { throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer)" Review Comment: please update the error message also ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -112,7 +112,7 @@ public Collection getAllMetadata() { * @param storeName the storeName to find metadata for * @return A collection of {@link StreamsMetadata} that have the provided storeName */ -public synchronized Collection getAllMetadataForStore(final String storeName) { +public synchronized Collection allMetadataForStore(final String storeName) { Objects.requ
[PR] KAFKA-17390: Add a checkstyle rule to suppress all generated code [kafka]
xijiu opened a new pull request, #16997: URL: https://github.com/apache/kafka/pull/16997 1. remove `src/generated/java` from core module 2. add `` to the `suppressions.xml` to suppress all generated code ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
xijiu closed pull request #16997: KAFKA-17416: Add a checkstyle rule to suppress all generated code URL: https://github.com/apache/kafka/pull/16997 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
xijiu opened a new pull request, #16998: URL: https://github.com/apache/kafka/pull/16998 1. remove `src/generated/java` from core module 2. add `` to the `suppressions.xml` to suppress all generated code ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
chia7712 commented on code in PR #16998: URL: https://github.com/apache/kafka/pull/16998#discussion_r1730338891 ## checkstyle/suppressions.xml: ## @@ -237,6 +237,7 @@ + Review Comment: Please remove other generated-related suppression. they are redundant now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
xijiu commented on code in PR #16998: URL: https://github.com/apache/kafka/pull/16998#discussion_r1730342466 ## checkstyle/suppressions.xml: ## @@ -237,6 +237,7 @@ + Review Comment: > Please remove other generated-related suppression. they are redundant now Agree, I have already removed them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17306; Soften the validation when replaying tombstones [kafka]
chia7712 commented on code in PR #16898: URL: https://github.com/apache/kafka/pull/16898#discussion_r1730343775 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3225,7 +3228,14 @@ public void replay( .updateWith(value) .build()); } else { -ConsumerGroupMember oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false); +ConsumerGroupMember oldMember; +try { +oldMember = consumerGroup.getOrMaybeCreateMember(memberId, false); Review Comment: Could you please add `UnknownMemberIdException` to `getOrMaybeCreateMember` signature? ## core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala: ## @@ -156,17 +164,42 @@ class CoordinatorLoaderImpl[T]( } else { batch.asScala.foreach { record => numRecords = numRecords + 1 - try { -coordinator.replay( - record.offset(), - batch.producerId, - batch.producerEpoch, - deserializer.deserialize(record.key, record.value) -) - } catch { -case ex: UnknownRecordTypeException => - warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " + -s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.") + + val coordinatorRecordOpt = { +try { + Some(deserializer.deserialize(record.key, record.value)) +} catch { + case ex: UnknownRecordTypeException => +warn(s"Unknown record type ${ex.unknownType} while loading offsets and group metadata " + + s"from $tp. Ignoring it. It could be a left over from an aborted upgrade.") +None + case ex: RuntimeException => +val msg = s"Deserializing record $record from $tp failed due to: ${ex.getMessage}" +error(s"$msg.") Review Comment: How about `error(msg)`? ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java: ## @@ -3215,7 +3211,14 @@ public void replay( String groupId = key.groupId(); String memberId = key.memberId(); -ConsumerGroup consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); +ConsumerGroup consumerGroup; +try { +consumerGroup = getOrMaybeCreatePersistedConsumerGroup(groupId, value != null); +} catch (IllegalStateException ex) { +// If the group does not exist and a tombstone is replayed, we can ignore it. Review Comment: > Is there any other reason we could throw this IllegalStateException I have the same question. What if the group type is not `CONSUMER`? ## core/src/test/scala/integration/kafka/api/GroupCoordinatorIntegrationTest.scala: ## @@ -12,53 +12,332 @@ */ package kafka.api -import kafka.integration.KafkaServerTestHarness import kafka.log.UnifiedLog -import kafka.server.KafkaConfig +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, Type} +import kafka.test.junit.ClusterTestExtensions import kafka.utils.TestUtils -import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.clients.admin.{Admin, ConsumerGroupDescription} +import org.apache.kafka.clients.consumer.{Consumer, GroupProtocol, OffsetAndMetadata} +import org.apache.kafka.common.{ConsumerGroupState, GroupType, KafkaFuture, TopicPartition} import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.params.ParameterizedTest -import org.junit.jupiter.params.provider.ValueSource import scala.jdk.CollectionConverters._ -import java.util.Properties import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.record.CompressionType import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.server.config.ServerConfigs +import org.junit.jupiter.api.Timeout +import org.junit.jupiter.api.extension.ExtendWith -class GroupCoordinatorIntegrationTest extends KafkaServerTestHarness { - val offsetsTopicCompressionCodec = CompressionType.GZIP - val overridingProps = new Properties() - overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "1") - overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_COMPRESSION_CODEC_CONFIG, offsetsTopicCompressionCodec.id.toString) +import java.time.Duration +import java.util.Collections +import java.util.concurrent.TimeUnit - override def generateConfigs = TestUt
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
pegasas commented on code in PR #16791: URL: https://github.com/apache/kafka/pull/16791#discussion_r1730390328 ## streams/src/main/java/org/apache/kafka/streams/Topology.java: ## @@ -820,110 +778,6 @@ public synchronized Topology addReadOnlyStateStore(final StoreBuilder ); } -/** - * Adds a global {@link StateStore} to the topology. - * The {@link StateStore} sources its data from all partitions of the provided input topic. - * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. - * - * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions - * of the input topic. - * - * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all - * records forwarded from the {@link SourceNode}. - * The supplier should always generate a new instance each time - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single - * {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern - * and leads to runtime exceptions. - * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. - * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used. - * - * @param storeBuilder user defined state store builder - * @param sourceNamename of the {@link SourceNode} that will be automatically added - * @param keyDeserializer the {@link Deserializer} to deserialize keys with - * @param valueDeserializer the {@link Deserializer} to deserialize values with - * @param topic the topic to source the data from - * @param processorName the name of the {@link org.apache.kafka.streams.processor.ProcessorSupplier} - * @param stateUpdateSupplier the instance of {@link org.apache.kafka.streams.processor.ProcessorSupplier} - * @return itself - * @throws TopologyException if the processor of state is already registered - * @deprecated Since 2.7.0. Use {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead. - */ -@Deprecated -public synchronized Topology addGlobalStore(final StoreBuilder storeBuilder, - final String sourceName, - final Deserializer keyDeserializer, - final Deserializer valueDeserializer, - final String topic, - final String processorName, - final org.apache.kafka.streams.processor.ProcessorSupplier stateUpdateSupplier) { -internalTopologyBuilder.addGlobalStore( -new StoreBuilderWrapper(storeBuilder), -sourceName, -null, -keyDeserializer, -valueDeserializer, -topic, -processorName, -() -> ProcessorAdapter.adapt(stateUpdateSupplier.get()), -true -); -return this; -} - -/** - * Adds a global {@link StateStore} to the topology. - * The {@link StateStore} sources its data from all partitions of the provided input topic. - * There will be exactly one instance of this {@link StateStore} per Kafka Streams instance. - * - * A {@link SourceNode} with the provided sourceName will be added to consume the data arriving from the partitions - * of the input topic. - * - * The provided {@link org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an {@link ProcessorNode} that will receive all - * records forwarded from the {@link SourceNode}. - * The supplier should always generate a new instance each time - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} gets called. Creating a single - * {@link org.apache.kafka.streams.processor.Processor} object and returning the same object reference in - * {@link org.apache.kafka.streams.processor.ProcessorSupplier#get()} would be a violation of the supplier pattern - * and leads to runtime exceptions. - * This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date. - * - * @param storeBuilder user defined key value store builder - * @param sourceNamename of the {@link SourceNode} that will be automatically added - * @param timestampExtractort
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
pegasas commented on PR #16791: URL: https://github.com/apache/kafka/pull/16791#issuecomment-2308913465 > @pegasas -- Any updates on this PR? org.apache.kafka.streams.Topology#addProcessor(java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier, java.lang.String...) It looks like this method still uses by examples. shall we remove it in this loop?  -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
pegasas commented on code in PR #16791: URL: https://github.com/apache/kafka/pull/16791#discussion_r1730392424 ## streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java: ## @@ -292,7 +292,7 @@ Cancellable schedule(final Duration interval, * (including the currently processed record), i.e., it can be considered a high-watermark. * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. * - * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...) Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16331) Remove Deprecated EOSv1
[ https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876530#comment-17876530 ] Matthias J. Sax commented on KAFKA-16331: - This is a very complicated ticket... I would not recommend you to pick it up. I think it's best if a seasoned contributor (or even better a committer) works on this one. > Remove Deprecated EOSv1 > --- > > Key: KAFKA-16331 > URL: https://issues.apache.org/jira/browse/KAFKA-16331 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > EOSv1 was deprecated in AK 3.0 via > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2] > * remove conifg > * remove Producer#sendOffsetsToTransaction > * cleanup code -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]
mjsax commented on code in PR #16954: URL: https://github.com/apache/kafka/pull/16954#discussion_r1730404618 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsMetadataState.java: ## @@ -206,29 +206,29 @@ public synchronized Collection getAllMetadataForTopology(final * if streams is (re-)initializing or {@code null} if the corresponding topic cannot be found, * or null if no matching metadata could be found. */ -public synchronized KeyQueryMetadata getKeyQueryMetadataForKey(final String storeName, - final K key, - final Serializer keySerializer) { +public synchronized KeyQueryMetadata keyQueryMetadataForKey(final String storeName, +final K key, +final Serializer keySerializer) { Objects.requireNonNull(keySerializer, "keySerializer can't be null"); if (topologyMetadata.hasNamedTopologies()) { throw new IllegalArgumentException("Cannot invoke the getKeyQueryMetadataForKey(storeName, key, keySerializer)" Review Comment: Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]
mjsax commented on code in PR #16954: URL: https://github.com/apache/kafka/pull/16954#discussion_r1730404855 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java: ## @@ -246,7 +246,7 @@ default boolean commitRequested() { // IQ related methods -StateStore getStore(final String name); +StateStore store(final String name); Review Comment: Yes, but I will do this in a follow up PR to keep the scope of individual PRs small. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17354) StreamThread::setState race condition causes java.lang.RuntimeException: State mismatch PENDING_SHUTDOWN different from STARTING
[ https://issues.apache.org/jira/browse/KAFKA-17354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876537#comment-17876537 ] Anton Liauchuk commented on KAFKA-17354: [~aoli-al] Didn't reproduce the issue with the commit mentioned in the description. Are there any additional changes required to reproduce? Could you please check? > StreamThread::setState race condition causes java.lang.RuntimeException: > State mismatch PENDING_SHUTDOWN different from STARTING > > > Key: KAFKA-17354 > URL: https://issues.apache.org/jira/browse/KAFKA-17354 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ao Li >Assignee: Anton Liauchuk >Priority: Major > > I saw a test failure in `StreamThreadTest::shouldChangeStateAtStartClose`. A > race condition in `setState` causes an uncaught exception thrown in > `StateListenerStub`. > Basically, the function `setState` allows two threads to call > `stateListener.onChange` concurrently. > This patch will help you to reproduce the failure deterministically. > https://github.com/aoli-al/kafka/commit/033a9a33766740e6843effb9beabfdcb3804846b -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16331) Remove Deprecated EOSv1
[ https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876539#comment-17876539 ] Ismael Juma commented on KAFKA-16331: - Seems reasonable. The main thing to call out is the AK 2.5 or newer requirement, right? > Remove Deprecated EOSv1 > --- > > Key: KAFKA-16331 > URL: https://issues.apache.org/jira/browse/KAFKA-16331 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > EOSv1 was deprecated in AK 3.0 via > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2] > * remove conifg > * remove Producer#sendOffsetsToTransaction > * cleanup code -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16331) Remove Deprecated EOSv1
[ https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876539#comment-17876539 ] Ismael Juma edited comment on KAFKA-16331 at 8/25/24 7:28 PM: -- Seems reasonable. The main thing to call out is the Kafka broker 2.5 or newer requirement, right? was (Author: ijuma): Seems reasonable. The main thing to call out is the AK 2.5 or newer requirement, right? > Remove Deprecated EOSv1 > --- > > Key: KAFKA-16331 > URL: https://issues.apache.org/jira/browse/KAFKA-16331 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > EOSv1 was deprecated in AK 3.0 via > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2] > * remove conifg > * remove Producer#sendOffsetsToTransaction > * cleanup code -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17354) StreamThread::setState race condition causes java.lang.RuntimeException: State mismatch PENDING_SHUTDOWN different from STARTING
[ https://issues.apache.org/jira/browse/KAFKA-17354?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876540#comment-17876540 ] Ao Li commented on KAFKA-17354: --- [~anton.liauchuk] Yes, I'm able to reproduce the failure. Note that you will only see the exception, but the test will still be passed because Junit only marks test failures if there are exceptions from the main thread. I've pushed another commit to the fork https://github.com/aoli-al/kafka/tree/KAFKA-63 to propagate the error to the main thread. If you run `./gradlew streams:test --rerun --tests StreamThreadTest.shouldChangeStateAtStartClos` you will see the test failure. > StreamThread::setState race condition causes java.lang.RuntimeException: > State mismatch PENDING_SHUTDOWN different from STARTING > > > Key: KAFKA-17354 > URL: https://issues.apache.org/jira/browse/KAFKA-17354 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Ao Li >Assignee: Anton Liauchuk >Priority: Major > > I saw a test failure in `StreamThreadTest::shouldChangeStateAtStartClose`. A > race condition in `setState` causes an uncaught exception thrown in > `StateListenerStub`. > Basically, the function `setState` allows two threads to call > `stateListener.onChange` concurrently. > This patch will help you to reproduce the failure deterministically. > https://github.com/aoli-al/kafka/commit/033a9a33766740e6843effb9beabfdcb3804846b -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
mjsax commented on PR #16791: URL: https://github.com/apache/kafka/pull/16791#issuecomment-2308982763 There is two overloads. The old one, using `org.apache.kafka.streams.processor.ProcessorSupplier` ``` org.apache.kafka.streams.Topology#addProcessor(java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier, java.lang.String...) ``` And the new one using `org.apache.kafka.streams.processor.api.ProcessorSupplier` ``` org.apache.kafka.streams.Topology#addProcessor(java.lang.String, org.apache.kafka.streams.processor.ProcessorSupplier, java.lang.String...) ``` Note the different package names. Only the old one is deprecated is should be removed, and as far as I can tell, it's only used in some test (which can also be removed). The screenshot you posted seem to refer to the new one though. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
mjsax commented on code in PR #16791: URL: https://github.com/apache/kafka/pull/16791#discussion_r1730426306 ## streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java: ## @@ -292,8 +292,7 @@ Cancellable schedule(final Duration interval, * (including the currently processed record), i.e., it can be considered a high-watermark. * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. * - * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...) - * and {@link StreamsBuilder#addGlobalStore} (...), + * Note: this method is not supported for global processors {@link StreamsBuilder#addGlobalStore} (...) Review Comment: ```suggestion * Note: this method is not supported for global processors {@link Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)}``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
mjsax commented on code in PR #16791: URL: https://github.com/apache/kafka/pull/16791#discussion_r1730426384 ## streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java: ## @@ -292,8 +292,7 @@ Cancellable schedule(final Duration interval, * (including the currently processed record), i.e., it can be considered a high-watermark. * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. * - * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...) - * and {@link StreamsBuilder#addGlobalStore} (...), + * Note: this method is not supported for global processors {@link StreamsBuilder#addGlobalStore} (...) Review Comment: This not correct. Should be something like this ^ ## streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java: ## @@ -292,8 +292,7 @@ Cancellable schedule(final Duration interval, * (including the currently processed record), i.e., it can be considered a high-watermark. * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. * - * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...) - * and {@link StreamsBuilder#addGlobalStore} (...), + * Note: this method is not supported for global processors {@link StreamsBuilder#addGlobalStore} (...) Review Comment: ```suggestion * Note: this method is not supported for global processors {@link Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
mjsax commented on code in PR #16791: URL: https://github.com/apache/kafka/pull/16791#discussion_r1730426457 ## streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java: ## @@ -201,8 +201,7 @@ Cancellable schedule(final Duration interval, * (including the currently processed record), i.e., it can be considered a high-watermark. * Stream-time is tracked on a per-task basis and is preserved across restarts and during task migration. * - * Note: this method is not supported for global processors (cf. {@link Topology#addGlobalStore} (...) - * and {@link StreamsBuilder#addGlobalStore} (...), + * Note: this method is not supported for global processors {@link StreamsBuilder#addGlobalStore} (...) Review Comment: ```suggestion * Note: this method is not supported for global processors {@link Topology#addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier)} ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12829: Remove deprecated methods and classes of old Processor API [kafka]
mjsax commented on PR #16791: URL: https://github.com/apache/kafka/pull/16791#issuecomment-2308985631 The build has checkstyle errors: ``` > Task :streams:checkstyleMain -- | 2993 | 09:39:12 AM | [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:23:8: Unused import - org.apache.kafka.streams.Topology. [UnusedImports] | 2994 | 09:39:12 AM | [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java:22:8: Unused import - org.apache.kafka.streams.Topology. [UnusedImports] ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]
mjsax commented on code in PR #15482: URL: https://github.com/apache/kafka/pull/15482#discussion_r1730427154 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ## @@ -570,7 +570,7 @@ public Optional> partitions(final String topic, final String key, f public void shouldUseDefaultPartitionerAsPartitionReturnsNull() { final StreamPartitioner streamPartitioner = -(topic, key, value, numPartitions) -> null; +(topic, key, value, numPartitions) -> Optional.empty(); Review Comment: The test name should be updated... we used `null` for the old method, but `empty` for the new one, which we test now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]
mjsax commented on code in PR #15482: URL: https://github.com/apache/kafka/pull/15482#discussion_r1730427271 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -1230,7 +1230,7 @@ private KTable doJoinOnForeignKey(final KTable forei final StreamPartitioner> foreignResponseSinkPartitioner = tableJoinedInternal.partitioner() == null -? (topic, key, subscriptionResponseWrapper, numPartitions) -> Optional.of(Collections.singleton(subscriptionResponseWrapper.getPrimaryPartition())) +? (topic, key, val, numPartitions) -> val.getPrimaryPartition() == null ? Optional.empty() : Optional.of(Collections.singleton(val.getPrimaryPartition())) Review Comment: Why did you rename `subscriptionResponseWrapper` to `val`? The old name seem to be much more descriptive. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16335: Remove deprecated method of StreamPartitioner [kafka]
mjsax commented on code in PR #15482: URL: https://github.com/apache/kafka/pull/15482#discussion_r1730427271 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java: ## @@ -1230,7 +1230,7 @@ private KTable doJoinOnForeignKey(final KTable forei final StreamPartitioner> foreignResponseSinkPartitioner = tableJoinedInternal.partitioner() == null -? (topic, key, subscriptionResponseWrapper, numPartitions) -> Optional.of(Collections.singleton(subscriptionResponseWrapper.getPrimaryPartition())) +? (topic, key, val, numPartitions) -> val.getPrimaryPartition() == null ? Optional.empty() : Optional.of(Collections.singleton(val.getPrimaryPartition())) Review Comment: Why did you rename `subscriptionResponseWrapper` to `val`? The old name seem to be much more descriptive. Nit: should we call `val.getPrimaryPartition()` twice as the code does now, or better introduce a variable which stores the result and only call it once? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14569: Migrate Kafka Streams tests from Zookeeper to KRaft [kafka]
mjsax commented on PR #15341: URL: https://github.com/apache/kafka/pull/15341#issuecomment-2308987847 > I have some capacity to pickup this one. Nice! This is highly appeciated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12829: Remove the deprecated method `init(ProcessorContext, StateStore)` from the `StateStore` interface [kafka]
mjsax commented on code in PR #16906: URL: https://github.com/apache/kafka/pull/16906#discussion_r1730428926 ## streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java: ## @@ -72,14 +71,6 @@ class CachingWindowStore this.maxObservedTimestamp = new AtomicLong(RecordQueue.UNKNOWN); } -@Deprecated -@Override -public void init(final ProcessorContext context, final StateStore root) { -final String changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE); -initInternal(asInternalProcessorContext(context), changelogTopic); -super.init(context, root); -} - @Override public void init(final StateStoreContext context, final StateStore root) { final String changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE); Review Comment: Can we inline `initInternal` here, too? ## streams/src/main/java/org/apache/kafka/streams/state/internals/InMemorySessionStore.java: ## @@ -95,16 +93,17 @@ public String name() { return name; } -@Deprecated @Override -public void init(final ProcessorContext context, final StateStore root) { +public void init(final StateStoreContext stateStoreContext, + final StateStore root) { +this.stateStoreContext = stateStoreContext; final String threadId = Thread.currentThread().getName(); -final String taskName = context.taskId().toString(); +final String taskName = stateStoreContext.taskId().toString(); // The provided context is not required to implement InternalProcessorContext, // If it doesn't, we can't record this metric. -if (context instanceof InternalProcessorContext) { -this.context = (InternalProcessorContext) context; +if (stateStoreContext instanceof InternalProcessorContext) { Review Comment: Should we refactor this method and use `asInternalContext` similar to other methods instead? ## streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueChangeBuffer.java: ## @@ -196,15 +195,6 @@ public void setSerdesIfNull(final SerdeGetter getter) { keySerde = keySerde == null ? (Serde) getter.keySerde() : keySerde; valueSerde = valueSerde == null ? FullChangeSerde.wrap((Serde) getter.valueSerde()) : valueSerde; } - -@Deprecated -@Override -public void init(final ProcessorContext context, final StateStore root) { -this.context = ProcessorContextUtils.asInternalProcessorContext(context); -changelogTopic = ProcessorContextUtils.changelogFor(context, name(), Boolean.TRUE); -init(root); Review Comment: Seem this `init` is the same and `internalInit` in other classes -- can we inline it? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12824: Delete unused doBranch method [kafka]
mjsax commented on PR #16981: URL: https://github.com/apache/kafka/pull/16981#issuecomment-2308994715 Thank you both! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]
mjsax commented on PR #16985: URL: https://github.com/apache/kafka/pull/16985#issuecomment-2308995761 On build failed with compilation error: ``` > Task :streams:compileJava -- more_vert | 2701 | 06:13:33 AM | /home/jenkins/workspace/Kafka_kafka-pr_PR-16985/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:481: error: incompatible types: inference variable K#1 has incompatible equality constraints V#2,K#2,K#3 | 2702 | 06:13:33 AM | addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer, valSerializer, partitioner, predecessorNames); | 2703 | 06:13:33 AM | ^ | 2704 | 06:13:33 AM | where K#1,V#1,V#2,K#2,K#3 are type-variables: | 2705 | 06:13:33 AM | K#1 extends Object declared in method addSink(String,TopicNameExtractor,Serializer,Serializer,StreamPartitioner,String...) | 2706 | 06:13:33 AM | V#1 extends Object declared in method addSink(String,TopicNameExtractor,Serializer,Serializer,StreamPartitioner,String...) | 2707 | 06:13:33 AM | V#2 extends Object declared in method addSink(String,String,Serializer,Serializer,StreamPartitioner,String...) | 2708 | 06:13:33 AM | K#2 extends Object declared in method addSink(String,String,Serializer,Serializer,StreamPartitioner,String...) | 2709 | 06:13:33 AM | K#3 extends Object declared in class StaticTopicNameExtractor | 2720 | 06:13:33 AM | 1 error | 2721 | 06:13:33 AM | ``` Not sure how the others could pass? -- There is also bunch of test failures; are they related? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]
mjsax commented on code in PR #16985: URL: https://github.com/apache/kafka/pull/16985#discussion_r1730431915 ## streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java: ## @@ -41,10 +34,10 @@ public class TaskId implements Comparable { /** The ID of the subtopology, aka topicGroupId. */ @Deprecated Review Comment: We can remove this annotation ## streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java: ## @@ -41,10 +34,10 @@ public class TaskId implements Comparable { /** The ID of the subtopology, aka topicGroupId. */ @Deprecated -public final int topicGroupId; +private final int topicGroupId; /** The ID of the partition. */ @Deprecated Review Comment: As above -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]
mjsax commented on PR #16985: URL: https://github.com/apache/kafka/pull/16985#issuecomment-2308996189 This PR should also remove `TaskMetadata#taskId()`, right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16330 : remove deprecated methods of TaskId, private params [kafka]
mjsax commented on PR #16985: URL: https://github.com/apache/kafka/pull/16985#issuecomment-2308998527 Ah. Never mind. `TaskMetadata` was remove by the other PR you did already... (now I remember why I did leave the commend on the Jira ticket about, both should be worked on together...) -- it's easily confusing. :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17371: Flaky test in DefaultTaskExecutorTest.shouldUnassignTaskWhenRequired [kafka]
mjsax commented on PR #16941: URL: https://github.com/apache/kafka/pull/16941#issuecomment-2309001082 > I'm not entirely sure how this method will be used. From the docs, it seems that client code can invoke it since it is a public method declared in the interface. It's all internal. Note the package name `...internal...`. So if there is any incorrect usage, it's a bug in Kafka Streams itself. A Kafka Streams user, would never call it. Having said this, it could still make sense to refactor to avoid that we introduce bug accidentally. However, I am not super familiar with this part of the code and thus cannot judge... Maybe @cadonna or @lucasbru can comment on this question? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17216) StreamsConfig STATE_DIR_CONFIG
[ https://issues.apache.org/jira/browse/KAFKA-17216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-17216. - Resolution: Invalid Cf the reply on GitHub – caused by version miss-match. > StreamsConfig STATE_DIR_CONFIG > -- > > Key: KAFKA-17216 > URL: https://issues.apache.org/jira/browse/KAFKA-17216 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.8.0 >Reporter: raphaelauv >Priority: Major > > I can't use the class StreamsConfig > it fail with Caused by: java.lang.ExceptionInInitializerError at > StreamsConfig.java:866 > problem is not present in 3.7.0 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17100: GlobalStreamThread#start should not busy-wait [kafka]
mjsax merged PR #16914: URL: https://github.com/apache/kafka/pull/16914 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
mumrah commented on code in PR #16998: URL: https://github.com/apache/kafka/pull/16998#discussion_r1730436780 ## checkstyle/suppressions.xml: ## @@ -221,22 +221,7 @@ files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/> - - - - - - - - - - Review Comment: I don't think you meant to delete these -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17100: GlobalStreamThread#start should not busy-wait [kafka]
mjsax commented on PR #16914: URL: https://github.com/apache/kafka/pull/16914#issuecomment-2309007974 Thanks for the PR @raminqaf -- merged to `trunk`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
chia7712 commented on PR #16998: URL: https://github.com/apache/kafka/pull/16998#issuecomment-2309008426 ``` ``` ``` ``` ``` ``` @xijiu please remove above rules also, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-16331) Remove Deprecated EOSv1
[ https://issues.apache.org/jira/browse/KAFKA-16331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876545#comment-17876545 ] Matthias J. Sax commented on KAFKA-16331: - Yes, that's also important. Thanks for the reminder. My main point was really about internal refactoring to simplify the code base by removing EOSv1 related code. Strictly speaking, it would be enough to just remove the config from `StreamsConfig` so users cannot enable EOSv1 any longer to resolve this ticket; internal refactoring could be done afterwards. However, I would like to use this ticket for internal refactoring, too (at least he lions share – we might do more refactoring down the line...). > Remove Deprecated EOSv1 > --- > > Key: KAFKA-16331 > URL: https://issues.apache.org/jira/browse/KAFKA-16331 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Matthias J. Sax >Priority: Blocker > Fix For: 4.0.0 > > > EOSv1 was deprecated in AK 3.0 via > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-732%3A+Deprecate+eos-alpha+and+replace+eos-beta+with+eos-v2] > * remove conifg > * remove Producer#sendOffsetsToTransaction > * cleanup code -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17419) Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7
Chia-Ping Tsai created KAFKA-17419: -- Summary: Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7 Key: KAFKA-17419 URL: https://issues.apache.org/jira/browse/KAFKA-17419 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai When backport KAFKA-17315, I open KAFKA_17417 as follow-up but I forgot to disable `SslAdminIntegrationTest#testExpireDelegationToken` ... The test currently is failed, so it should be disabled and re-enabled in the future by KAFKA-17417 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
chia7712 commented on PR #16995: URL: https://github.com/apache/kafka/pull/16995#issuecomment-2309011897 The failed test is caused by my previous backport. I forgot to disable it ... open https://issues.apache.org/jira/browse/KAFKA-17419 to disable it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14957: Update-Description-String [kafka]
mjsax commented on code in PR #13909: URL: https://github.com/apache/kafka/pull/13909#discussion_r1730439203 ## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ## @@ -839,7 +839,8 @@ public class StreamsConfig extends AbstractConfig { Type.STRING, System.getProperty("java.io.tmpdir") + File.separator + "kafka-streams", Importance.HIGH, -STATE_DIR_DOC) +STATE_DIR_DOC, +"${java.io.tmpdir}") Review Comment: Thanks for confirming. No problem at all :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17053: [Minor] Restructure build.gradle to configure publishing last [kafka]
chia7712 commented on PR #16950: URL: https://github.com/apache/kafka/pull/16950#issuecomment-2309013403 > I will modify this PR and refactor build.gradle to move the shadowJar block before the publishing block. @KTKTK-HZ Thanks a bunch! BTW, could you please remove the "[Minor]"? The value of patch you will contribute is NOT MINOR I feel :smile: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
chia7712 commented on PR #16995: URL: https://github.com/apache/kafka/pull/16995#issuecomment-2309013599 another failed test pass on my local. will merge it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
chia7712 commented on PR #16996: URL: https://github.com/apache/kafka/pull/16996#issuecomment-2309016578 the failed test is traced by https://issues.apache.org/jira/browse/KAFKA-17419. other failed tests pass on my local -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
chia7712 merged PR #16996: URL: https://github.com/apache/kafka/pull/16996 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17360 local log retention ms/bytes "-2" is not treated correctly [kafka]
chia7712 merged PR #16995: URL: https://github.com/apache/kafka/pull/16995 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly
[ https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876547#comment-17876547 ] Chia-Ping Tsai commented on KAFKA-17360: 3.7: https://github.com/apache/kafka/commit/57b6c2ef98d8177ab0e43f7653c8079d4daa8789 3.8: https://github.com/apache/kafka/commit/d9a26a95a70ac5632fb36d7e1da4728a86e789b5 > local log retention ms/bytes "-2" is not treated correctly > -- > > Key: KAFKA-17360 > URL: https://issues.apache.org/jira/browse/KAFKA-17360 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Kuan Po Tseng >Priority: Critical > Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1 > > > # When the local.retention.ms/bytes is set to -2, we didn't replace it with > the server-side retention.ms/bytes config, so the -2 local retention won't > take effect. > # When setting retention.ms/bytes to -2, we can notice this log message: > {code:java} > Deleting segment LogSegment(baseOffset=10045, size=1037087, > lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to > local log retention size -2 breach. Local log size after deletion will be > 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code} > This is not helpful for users. We should replace -2 with real retention value > when logging. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-17360) local log retention ms/bytes "-2" is not treated correctly
[ https://issues.apache.org/jira/browse/KAFKA-17360?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17360. Resolution: Fixed > local log retention ms/bytes "-2" is not treated correctly > -- > > Key: KAFKA-17360 > URL: https://issues.apache.org/jira/browse/KAFKA-17360 > Project: Kafka > Issue Type: Bug >Reporter: Luke Chen >Assignee: Kuan Po Tseng >Priority: Critical > Fix For: 4.0.0, 3.9.0, 3.7.2, 3.8.1 > > > # When the local.retention.ms/bytes is set to -2, we didn't replace it with > the server-side retention.ms/bytes config, so the -2 local retention won't > take effect. > # When setting retention.ms/bytes to -2, we can notice this log message: > {code:java} > Deleting segment LogSegment(baseOffset=10045, size=1037087, > lastModifiedTime=1724040653922, largestRecordTimestamp=1724040653835) due to > local log retention size -2 breach. Local log size after deletion will be > 13435280. (kafka.log.UnifiedLog) [kafka-scheduler-6]{code} > This is not helpful for users. We should replace -2 with real retention value > when logging. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17327: Add support of group in kafka-configs.sh [kafka]
chia7712 commented on code in PR #16887: URL: https://github.com/apache/kafka/pull/16887#discussion_r1730446002 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -256,6 +278,32 @@ public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception { } } +@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), Review Comment: @DL1231 I feel that is false limit. Could you adjust the checkstyle rule? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17420) Fix flaky StreamThreadTest.tearDown
Chia-Ping Tsai created KAFKA-17420: -- Summary: Fix flaky StreamThreadTest.tearDown Key: KAFKA-17420 URL: https://issues.apache.org/jira/browse/KAFKA-17420 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see https://github.com/apache/kafka/actions/runs/10549075483?pr=16954 {code:java} org.opentest4j.AssertionFailedError: expected: but was: at app//org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at app//org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at app//org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:31) at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:183) at app//org.apache.kafka.streams.processor.internals.StreamThreadTest.tearDown(StreamThreadTest.java:240) at java.base@17.0.12/java.lang.reflect.Method.invoke(Method.java:569) at java.base@17.0.12/java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183) {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-17419) Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7
[ https://issues.apache.org/jira/browse/KAFKA-17419?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876549#comment-17876549 ] kangning.li commented on KAFKA-17419: - hi [~chia7712] , I am interested in this issue, cloud you assign it to me? Thanks > Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7 > - > > Key: KAFKA-17419 > URL: https://issues.apache.org/jira/browse/KAFKA-17419 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Major > > When backport KAFKA-17315, I open KAFKA_17417 as follow-up but I forgot to > disable `SslAdminIntegrationTest#testExpireDelegationToken` ... > The test currently is failed, so it should be disabled and re-enabled in the > future by KAFKA-17417 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17419) Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7
[ https://issues.apache.org/jira/browse/KAFKA-17419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17419: -- Assignee: kangning.li (was: Chia-Ping Tsai) > Disable SslAdminIntegrationTest#testExpireDelegationToken for 3.8 and 3.7 > - > > Key: KAFKA-17419 > URL: https://issues.apache.org/jira/browse/KAFKA-17419 > Project: Kafka > Issue Type: Sub-task >Reporter: Chia-Ping Tsai >Assignee: kangning.li >Priority: Major > > When backport KAFKA-17315, I open KAFKA_17417 as follow-up but I forgot to > disable `SslAdminIntegrationTest#testExpireDelegationToken` ... > The test currently is failed, so it should be disabled and re-enabled in the > future by KAFKA-17417 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]
chia7712 commented on PR #16954: URL: https://github.com/apache/kafka/pull/16954#issuecomment-2309029491 I open https://issues.apache.org/jira/browse/KAFKA-17420 to trace `StreamThreadTest.tearDown`, and others are known flaky. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: remove get prefix for internal IQ methods [kafka]
chia7712 merged PR #16954: URL: https://github.com/apache/kafka/pull/16954 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17411: Create local state Standbys on start [kafka]
mjsax commented on code in PR #16922: URL: https://github.com/apache/kafka/pull/16922#discussion_r1730446985 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java: ## @@ -182,6 +199,120 @@ private boolean lockStateDirectory() { return stateDirLock != null; } +public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) { +final List nonEmptyTaskDirectories = listNonEmptyTaskDirectories(); +if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) { +final LogContext logContext = new LogContext("main-thread "); +final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics); +final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config); +final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals()); + +// discover all non-empty task directories in StateDirectory +for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) { +final String dirName = taskDirectory.file().getName(); +final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology()); +final ProcessorTopology topology = topologyMetadata.buildSubtopology(id); +final Set inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet()); + +// create a StandbyTask for each one +if (topology.hasStateWithChangelogs()) { Review Comment: Why do we need this additional check? -- Given that we have a non-empty task directory, it seems redundant? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java: ## @@ -182,6 +199,120 @@ private boolean lockStateDirectory() { return stateDirLock != null; } +public void initializeTasksForLocalState(final TopologyMetadata topologyMetadata, final StreamsMetricsImpl streamsMetrics) { +final List nonEmptyTaskDirectories = listNonEmptyTaskDirectories(); +if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) { +final LogContext logContext = new LogContext("main-thread "); +final ThreadCache dummyCache = new ThreadCache(logContext, 0, streamsMetrics); +final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config); +final boolean stateUpdaterEnabled = StreamsConfig.InternalConfig.stateUpdaterEnabled(config.originals()); + +// discover all non-empty task directories in StateDirectory +for (final TaskDirectory taskDirectory : nonEmptyTaskDirectories) { +final String dirName = taskDirectory.file().getName(); +final TaskId id = parseTaskDirectoryName(dirName, taskDirectory.namedTopology()); +final ProcessorTopology topology = topologyMetadata.buildSubtopology(id); +final Set inputPartitions = topology.sourceTopics().stream().map(topic -> new TopicPartition(topic, id.partition())).collect(Collectors.toSet()); + +// create a StandbyTask for each one +if (topology.hasStateWithChangelogs()) { +final ProcessorStateManager stateManager = new ProcessorStateManager( +id, +Task.TaskType.STANDBY, +eosEnabled, +logContext, +this, +null, +topology.storeToChangelogTopic(), +inputPartitions, +stateUpdaterEnabled +); + +final InternalProcessorContext context = new ProcessorContextImpl( +id, +config, +stateManager, +streamsMetrics, +dummyCache +); + +final Task task = new StandbyTask( +id, +inputPartitions, +topology, +topologyMetadata.taskConfig(id), +streamsMetrics, +stateManager, +this, +dummyCache, +context +); + +// initialize and suspend new Tasks +try { +task.initializeIfNeeded(); +task.suspend(); + +// add new Tasks to tasksForLocalState +tasksForLocalState.put(id, task); +} catch (final TaskCorruptedException e) { +// Task is corrupt - wipe it
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
xijiu commented on code in PR #16998: URL: https://github.com/apache/kafka/pull/16998#discussion_r1730499267 ## checkstyle/suppressions.xml: ## @@ -221,22 +221,7 @@ files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/> - - - - - - - - - - Review Comment: @mumrah Hi David, thanks very much for code review, my default, I will fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
xijiu commented on PR #16998: URL: https://github.com/apache/kafka/pull/16998#issuecomment-2309123331 @chia7712 @mumrahI have fixed above issues, PTAL -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17416: Add a checkstyle rule to suppress all generated code [kafka]
xijiu commented on code in PR #16998: URL: https://github.com/apache/kafka/pull/16998#discussion_r1730499267 ## checkstyle/suppressions.xml: ## @@ -221,22 +221,7 @@ files="^(?!.*[\\/]org[\\/]apache[\\/]kafka[\\/]streams[\\/].*$)"/> - - - - - - - - - - Review Comment: @mumrah Hi David, thanks very much for code review, my fault, I will fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17399: Apply LambdaValidator to code base [kafka]
chia7712 commented on PR #16980: URL: https://github.com/apache/kafka/pull/16980#issuecomment-2309141760 @xijiu Could you please rebase code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15909: Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer [kafka]
chia7712 commented on PR #16933: URL: https://github.com/apache/kafka/pull/16933#issuecomment-2309143513 @FrankYang0529 could you please rebase code to trigger QA again? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15909: Throw error when consumer configured with empty/whitespace-only group.id for LegacyKafkaConsumer [kafka]
FrankYang0529 commented on PR #16933: URL: https://github.com/apache/kafka/pull/16933#issuecomment-2309151303 > @FrankYang0529 could you please rebase code to trigger QA again? Rebased it. Thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17137[part-5]: Ensure Admin APIs are properly tested [kafka]
chia7712 commented on code in PR #16905: URL: https://github.com/apache/kafka/pull/16905#discussion_r1730549155 ## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ## @@ -331,6 +332,54 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException]) } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = Long.MaxValue + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithNegativeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = -1 + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = { +client = createAdminClient +val timeout = -1 + +val createOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(createOptions).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + +TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 1), + "Timed out waiting for token to propagate to all servers") + +val expiredOptions = new ExpireDelegationTokenOptions().expiryTimePeriodMs(token.tokenInfo.maxTimestamp) Review Comment: All we want to verify is the expired ts is never larger the max timestamp, so we should set `maxTimestamp + 1`, right? ## core/src/test/scala/integration/kafka/api/SaslSslAdminIntegrationTest.scala: ## @@ -331,6 +332,54 @@ class SaslSslAdminIntegrationTest extends BaseAdminIntegrationTest with SaslSetu assertFutureExceptionTypeEquals(results.values.get(emptyResourceNameAcl), classOf[InvalidRequestException]) } + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithLargeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = Long.MaxValue + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testCreateDelegationTokenWithNegativeTimeout(quorum: String): Unit = { +client = createAdminClient +val timeout = -1 + +val options = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(options).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + } + + @ParameterizedTest + @ValueSource(strings = Array("zk", "kraft")) + def testExpiredTimeStampLargerThanMaxLifeStamp(quorum: String): Unit = { +client = createAdminClient +val timeout = -1 + +val createOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(timeout) +val token = client.createDelegationToken(createOptions).delegationToken().get() + + assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, token.tokenInfo.maxTimestamp - token.tokenInfo.issueTimestamp) +assertTrue(token.tokenInfo.maxTimestamp >= token.tokenInfo.expiryTimestamp) + +TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 1), + "Timed out waiting for token to propag
[PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]
gongxuanzhang opened a new pull request, #16999: URL: https://github.com/apache/kafka/pull/16999 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17137: Feat admin client it acl configs [kafka]
chia7712 commented on code in PR #16648: URL: https://github.com/apache/kafka/pull/16648#discussion_r1730562364 ## clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java: ## @@ -25,10 +25,19 @@ * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving -public class ExpireDelegationTokenOptions extends AbstractOptions { +public class ExpireDelegationTokenOptions Review Comment: Could you please revert this unrelated change? ## clients/src/main/java/org/apache/kafka/clients/admin/ExpireDelegationTokenOptions.java: ## @@ -25,10 +25,19 @@ * The API of this class is evolving, see {@link Admin} for details. */ @InterfaceStability.Evolving -public class ExpireDelegationTokenOptions extends AbstractOptions { +public class ExpireDelegationTokenOptions +extends AbstractOptions { + private long expiryTimePeriodMs = -1L; -public ExpireDelegationTokenOptions expiryTimePeriodMs(long expiryTimePeriodMs) { +/** + * @param expiryTimePeriodMs the time period until we should expire this token. + * {@code expiryTimePeriodMs} >= 0: the token will update the `expiration timestamp` if the current expiration timestamp is small than (now + expiryTimePeriodMs). Review Comment: Sorry that could you please revise it to "{@code expiryTimePeriodMs} >= 0: the token will update the `expiration timestamp` to min(now + expiryTimePeriodMs, maxTimestamp)` ## core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala: ## @@ -106,19 +150,22 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest { config.put(AdminClientConfig.CLIENT_ID_CONFIG, clientId) client = Admin.create(config) -val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> clientId).asJava) -val configEntries = Map(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG -> 1.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 3.0) -client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, configEntries.map {case (k, v) => - new ClientQuotaAlteration.Op(k,v)}.asJavaCollection)).asJavaCollection).all.get +try { + val entity = new ClientQuotaEntity(Map(ClientQuotaEntity.CLIENT_ID -> clientId).asJava) + val configEntries = Map(QuotaConfigs.CONTROLLER_MUTATION_RATE_OVERRIDE_CONFIG -> 1.0, QuotaConfigs.CONSUMER_BYTE_RATE_OVERRIDE_CONFIG -> 3.0) + client.alterClientQuotas(Seq(new ClientQuotaAlteration(entity, configEntries.map { case (k, v) => +new ClientQuotaAlteration.Op(k, v) + }.asJavaCollection)).asJavaCollection).all.get -TestUtils.waitUntilTrue(() => { - // wait for our ClientQuotaEntity to be set - client.describeClientQuotas(ClientQuotaFilter.all()).entities().get().size == 1 -}, "Timed out waiting for quota config to be propagated to all servers") + TestUtils.waitUntilTrue(() => { +// wait for our ClientQuotaEntity to be set + client.describeClientQuotas(ClientQuotaFilter.all()).entities().get().size == 1 + }, "Timed out waiting for quota config to be propagated to all servers") -val quotaEntities = client.describeClientQuotas(ClientQuotaFilter.all()).entities().get() + val quotaEntities = client.describeClientQuotas(ClientQuotaFilter.all()).entities().get() -assertEquals(configEntries,quotaEntities.get(entity).asScala) + assertEquals(configEntries, quotaEntities.get(entity).asScala) +} finally client.close(time.Duration.ZERO) Review Comment: this client is created normally (correct port), so it can be closed as usual. that means we don't need to add try-finally, as it will be closed by the `@AfterEach` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]
chia7712 commented on PR #16999: URL: https://github.com/apache/kafka/pull/16999#issuecomment-2309176757 @gongxuanzhang thanks for this patch. the changes are beyond the jira. Could you please focus on "config_property" file? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17327: Add support of group in kafka-configs.sh [kafka]
DL1231 commented on code in PR #16887: URL: https://github.com/apache/kafka/pull/16887#discussion_r1730575833 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -256,6 +278,32 @@ public void testDynamicBrokerConfigUpdateUsingKraft() throws Exception { } } +@ClusterTest(types = {Type.KRAFT, Type.CO_KRAFT}, serverProperties = { +@ClusterConfigProperty(key = "group.coordinator.new.enable", value = "true"), +@ClusterConfigProperty(key = "group.coordinator.rebalance.protocols", value = "classic,consumer"), Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]
gongxuanzhang closed pull request #16999: KAFKA-17382: cleanup out-of-date configs of config_property URL: https://github.com/apache/kafka/pull/16999 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-12829: Remove deprecated Topology#addGlobalStore of old Processor API [kafka]
pegasas commented on PR #16791: URL: https://github.com/apache/kafka/pull/16791#issuecomment-2309191228 > The build has checkstyle errors: > > ``` > > Task :streams:checkstyleMain > -- > | 2993 | 09:39:12 AM | [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java:23:8: Unused import - org.apache.kafka.streams.Topology. [UnusedImports] > | 2994 | 09:39:12 AM | [ant:checkstyle] [ERROR] /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-16791/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java:22:8: Unused import - org.apache.kafka.streams.Topology. [UnusedImports] > ``` fixed these checkstyle errors. Is there any commands with gradle which I can run in local? like mvn spotless:apply or something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17367: Share coordinator impl. Introduce infra classes [1/N] [kafka]
chia7712 commented on code in PR #16921: URL: https://github.com/apache/kafka/pull/16921#discussion_r1730578171 ## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinator.java: ## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.share; + +import org.apache.kafka.common.message.ReadShareGroupStateRequestData; +import org.apache.kafka.common.message.ReadShareGroupStateResponseData; +import org.apache.kafka.common.message.WriteShareGroupStateRequestData; +import org.apache.kafka.common.message.WriteShareGroupStateResponseData; +import org.apache.kafka.common.requests.RequestContext; + +import java.util.OptionalInt; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntSupplier; + +public interface ShareCoordinator { +short SHARE_SNAPSHOT_RECORD_KEY_VERSION = 0; +short SHARE_SNAPSHOT_RECORD_VALUE_VERSION = 0; +short SHARE_UPDATE_RECORD_KEY_VERSION = 1; +short SHARE_UPDATE_RECORD_VALUE_VERSION = 1; Review Comment: Should we change the version defined in json (https://github.com/apache/kafka/blob/trunk/share-coordinator/src/main/resources/common/message/ShareUpdateValue.json#L21) to `1` for consistency? ## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRecordSerde.java: ## @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.coordinator.common.runtime; + +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.ByteBufferAccessor; +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; + +/** + * Serializer/Deserializer for {@link CoordinatorRecord}. The format is defined below: + * + * record_key = [record_type key_message] + * record_value = [value_version value_message] + * + * record_type : The record type is currently define as the version of the key + * {@link ApiMessageAndVersion} object. + * key_message : The serialized message of the key {@link ApiMessageAndVersion} object. + * value_version : The value version is currently define as the version of the value + * {@link ApiMessageAndVersion} object. + * value_message : The serialized message of the value {@link ApiMessageAndVersion} object. + * + */ +public abstract class CoordinatorRecordSerde implements Serializer, Deserializer { +@Override +public byte[] serializeKey(CoordinatorRecord record) { +// Record does not accept a null key. +return MessageUtil.toVersionPrefixedBytes( +record.key().version(), +record.key().message() +); +} + +@Override +public byte[] serializeValue(CoordinatorRecord record) { +// Tombstone is represented with a null value. +if (record.value() == null) { +return null; +} else { +return MessageUtil.toVersionPrefixedBytes( +record.value().version(), +record.value().message() +); +} +} + +@Override +public CoordinatorRecord deserialize( +ByteBuffer keyBuffer, +ByteBuffer valueBuffer +) throws RuntimeE
Re: [PR] KAFKA-14262: Deletion of MirrorMaker v1 deprecated classes & tests [kafka]
chia7712 commented on PR #16879: URL: https://github.com/apache/kafka/pull/16879#issuecomment-2309201764 @abhi-ksolves could you please remove related scripts and e2e? https://github.com/apache/kafka/blob/trunk/tests/kafkatest/services/mirror_maker.py https://github.com/apache/kafka/blob/trunk/tests/kafkatest/tests/core/mirror_maker_test.py https://github.com/apache/kafka/blob/trunk/bin/kafka-mirror-maker.sh https://github.com/apache/kafka/blob/trunk/bin/windows/kafka-mirror-maker.bat -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17399: Apply LambdaValidator to code base [kafka]
xijiu commented on PR #16980: URL: https://github.com/apache/kafka/pull/16980#issuecomment-2309230891 > @xijiu Could you please rebase code? @chia7712 Yeap, I have already rebased the branch. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17062: handle dangling "copy_segment_start" state when deleting remote logs [kafka]
showuon commented on code in PR #16959: URL: https://github.com/apache/kafka/pull/16959#discussion_r1730617565 ## core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java: ## @@ -2530,6 +2635,21 @@ public void testDeleteLogSegmentDueToRetentionTimeBreach(int segmentCount, verifyDeleteLogSegment(segmentMetadataList, deletableSegmentCount, currentLeaderEpoch); } +private void verifyRemoteDeleteMetrics(long remoteDeleteLagBytes, long remoteDeleteLagSegments) { +assertEquals(remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes"), +String.format("Expected to find %d for RemoteDeleteLagBytes metric value, but found %d", +remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes"))); +assertEquals(remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes"), +String.format("Expected to find %d for RemoteDeleteLagBytes metric value, but found %d", +remoteDeleteLagBytes, safeLongYammerMetricValue("RemoteDeleteLagBytes"))); +assertEquals(remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic), +String.format("Expected to find %d for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d", +remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic))); +assertEquals(remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic), +String.format("Expected to find %d for RemoteDeleteLagSegments for 'Leader' topic metric value, but found %d", +remoteDeleteLagSegments, safeLongYammerMetricValue("RemoteDeleteLagSegments,topic=" + leaderTopic))); +} Review Comment: Oh, nice catch! Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 Tests for ConfigCommand of DynamicBrokerReconfigurationTest rewritten in java [kafka]
github-actions[bot] commented on PR #15848: URL: https://github.com/apache/kafka/pull/15848#issuecomment-2309238540 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-14588 ConfigCommand rewritten to java [kafka]
github-actions[bot] commented on PR #15417: URL: https://github.com/apache/kafka/pull/15417#issuecomment-2309238666 This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or appropriate release branch) If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17382: cleanup out-of-date configs of config_property [kafka]
gongxuanzhang opened a new pull request, #17000: URL: https://github.com/apache/kafka/pull/17000 fix https://issues.apache.org/jira/browse/KAFKA-17382 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17397) Ensure ClassicKafkaConsumer sends leave request on close even if interrupted
[ https://issues.apache.org/jira/browse/KAFKA-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876576#comment-17876576 ] Arpit Goyal commented on KAFKA-17397: - [~kirktrue] Can I pick this up ? > Ensure ClassicKafkaConsumer sends leave request on close even if interrupted > > > Key: KAFKA-17397 > URL: https://issues.apache.org/jira/browse/KAFKA-17397 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 3.8.0, 3.9.0 >Reporter: Kirk True >Priority: Major > Labels: integration-tests > > During testing for KAFKA-16985, a new, parameterized integration test was > added to {{PlaintextConsumerTest}} named > {{{}testCloseLeavesGroupOnInterrupt(){}}}. When the test is executed locally, > it passes using both the {{AsyncKafkaConsumer}} and the > {{{}ClassicKafkaConsumer{}}}. However, when the test is run in the Apache CI > environment, it passes for the {{AsyncKafkaConsumer}} but fails for the > {{{}ClassicKafkaConsumer{}}}. Rather than hold up KAFKA-16985, this Jira was > filed to investigate and fix the {{{}ClassicKafkaConsumer{}}}. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17062: handle dangling "copy_segment_start" state when deleting remote logs [kafka]
showuon commented on code in PR #16959: URL: https://github.com/apache/kafka/pull/16959#discussion_r1730655484 ## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ## @@ -1254,6 +1255,20 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE canProcess = false; continue; } + +if (RemoteLogSegmentState.COPY_SEGMENT_STARTED.equals(metadata.state())) { +// get the current segment state here to avoid the race condition that before the loop, it's under copying process, +// but then completed. In this case, segmentIdsBeingCopied will not contain this id, so we might +// delete this segment unexpectedly. +Optional curMetadata = remoteLogMetadataManager.remoteLogSegmentMetadata( Review Comment: > Instead of deleting the dangling segments in the same iteration, Can we note down the dangling segments in the current iteration and delete them in the next iteration of cleanupExpiredRemoteLogSegments? This is a good suggestion! Let me update the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org