[jira] [Created] (KAFKA-16430) The group-metadata-manager thread is always in a loading state and occupies one CPU, unable to end.
Gao Fei created KAFKA-16430:
---

Summary: The group-metadata-manager thread is always in a loading state and occupies one CPU, unable to end.
Key: KAFKA-16430
URL: https://issues.apache.org/jira/browse/KAFKA-16430
Project: Kafka
Issue Type: Bug
Components: group-coordinator
Affects Versions: 2.4.0
Reporter: Gao Fei

I deployed three broker instances and suddenly found that the client was unable to consume data from certain topic partitions. I first logged in to the broker corresponding to the group and used the following command to view the consumer group:

{code:java}
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe --group mygroup{code}

and found the following error:

{code:java}
Error: Executing consumer group command failed due to org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The coordinator is loading and hence can't process requests.{code}

I then discovered that the broker appeared to be stuck in a loop, permanently in a loading state. At the same time, the top command showed that the "group-metadata-manager-0" thread was constantly consuming 100% of one CPU. The loop never ended, so topic partition data could not be consumed on that node. At this point, I suspected that the issue was related to the __consumer_offsets partition data file loaded by this thread. Finally, after restarting the broker instance, everything was back to normal.

This is very strange: if there were a problem with the __consumer_offsets partition data file, the broker should have failed to start. Why was it able to recover automatically after a restart? And why did this continuous loading loop over the __consumer_offsets partition data occur?

We encountered this issue in our production environment using Kafka versions 2.2.1 and 2.4.0, and I believe it may also affect other versions.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15715) KRaft support in UpdateFeaturesTest
[ https://issues.apache.org/jira/browse/KAFKA-15715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

johndoe reassigned KAFKA-15715:
---

Assignee: (was: johndoe)

> KRaft support in UpdateFeaturesTest
> ---
>
> Key: KAFKA-15715
> URL: https://issues.apache.org/jira/browse/KAFKA-15715
> Project: Kafka
> Issue Type: Task
> Components: core
> Reporter: Sameer Tejani
> Priority: Minor
> Labels: kraft, kraft-test, newbie
>
> The following tests in UpdateFeaturesTest in core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala need to be updated to support KRaft
>
> 176 : def testShouldFailRequestIfNotController(): Unit = {
> 210 : def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): Unit = {
> 223 : def testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
> 236 : def testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion(): Unit = {
> 280 : def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
> 292 : def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
> 354 : def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityForExistingFinalizedFeature(): Unit = {
> 368 : def testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityWithNoExistingFinalizedFeature(): Unit = {
> 381 : def testSuccessfulFeatureUpgradeAndWithNoExistingFinalizedFeatures(): Unit = {
> 417 : def testSuccessfulFeatureUpgradeAndDowngrade(): Unit = {
> 459 : def testPartialSuccessDuringValidFeatureUpgradeAndInvalidDowngrade(): Unit = {
> 509 : def testPartialSuccessDuringInvalidFeatureUpgradeAndValidDowngrade(): Unit = {
>
> Scanned 567 lines. Found 0 KRaft tests out of 12 tests
Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]
showuon commented on PR #15522: URL: https://github.com/apache/kafka/pull/15522#issuecomment-2022115723 Will check it this week or next. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]
showuon commented on PR #15521: URL: https://github.com/apache/kafka/pull/15521#issuecomment-2022115867 Will try to review it this week or next week.
[jira] [Commented] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()
[ https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831197#comment-17831197 ]

Andrew Schofield commented on KAFKA-15558:
--

I was referring to the inconsistent timeout handling for some of the events that are sent from the application thread to the background thread in the new consumer. I know this is being fixed now, so I think there's nothing more to do on this ticket.

> Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Reporter: Kirk True
> Assignee: Phuc Hong Tran
> Priority: Major
> Labels: consumer-threading-refactor, fetcher, timeout
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing the [fetch request manager pull request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for {{refreshCommittedOffsetsIfNeeded}}, but not for other cases where we don't have valid fetch positions. For example, if all partitions are in {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} implementation should be fixed, too.
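The busy-loop concern quoted in the ticket above can be illustrated with a minimal sketch. It is not the consumer's code and does not use Kafka's own `Timer` class; `DeadlineWaitSketch`, `awaitUntil`, and the backoff parameter are hypothetical names chosen for illustration. The sketch only assumes that the caller has a deadline and a condition to re-check, and shows how carrying the deadline into the wait turns an unbounded spin into a bounded wait with sleeps in between.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of Kafka: waits for a condition with a deadline
// and a backoff instead of re-checking it in a tight loop.
final class DeadlineWaitSketch {

    /**
     * Re-checks {@code ready} until it returns true or {@code timeoutMs} has elapsed,
     * sleeping at most {@code backoffMs} between checks.
     * Returns true if the condition was met, false on timeout or interruption.
     */
    static boolean awaitUntil(BooleanSupplier ready, long timeoutMs, long backoffMs) {
        final long deadlineNs = System.nanoTime() + timeoutMs * 1_000_000L;
        while (!ready.getAsBoolean()) {
            long remainingMs = (deadlineNs - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return false; // deadline reached: give control back to the caller
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        int[] checks = {0};
        // Stand-in for "positions are valid now": becomes true on the third check.
        boolean ok = awaitUntil(() -> ++checks[0] >= 3, 1000, 5);
        System.out.println("ready=" + ok + " after " + checks[0] + " checks");
    }
}
```

Whether the real fix should sleep, block on a future, or do something else is exactly what the ticket leaves open; the sketch only shows the general shape of a deadline-bounded wait.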
[PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]
sjhajharia opened a new pull request, #15609: URL: https://github.com/apache/kafka/pull/15609

This is a follow-up to [this](https://github.com/apache/kafka/pull/15486) PR, which introduced the new `ABORTABLE_TRANSACTION` error as part of the KIP-890 efforts. After further discussion, we seem to have reached consensus that the error should instead be named `TRANSACTION_ABORTABLE`. This PR makes that rename. There are no other changes in the code.

### References
JIRA: https://issues.apache.org/jira/browse/KAFKA-16314
Original PR: https://github.com/apache/kafka/pull/15486

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
[ https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831201#comment-17831201 ]

Matthias J. Sax commented on KAFKA-16403:
-

Sounds like an env issue rather than a bug to me. I would tend to close this as "not a bug".

> Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
> -
>
> Key: KAFKA-16403
> URL: https://issues.apache.org/jira/browse/KAFKA-16403
> Project: Kafka
> Issue Type: Bug
> Reporter: Igor Soarez
> Priority: Major
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords() failed, log available in /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > WordCountDemoTest > testCountListOfWords() FAILED
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-AGGREGATE-STATE-STORE-03 at location /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>     at org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>     at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>     at org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>     at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>     at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>     at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373)
>     at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300)
>     at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276)
>     at org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
> Caused by:
> org.rocksdb.RocksDBException: Corruption: IO error: No such file or directory: While open a file for random read: /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb: No such file or directory in file /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05
>     at org.rocksdb.RocksDB.open(Native Method)
>     at org.rocksdb.RocksDB.open(RocksDB.java:307)
>     at org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>     ... 17 more
> {code}
[jira] [Commented] (KAFKA-16404) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831202#comment-17831202 ]

Matthias J. Sax commented on KAFKA-16404:
-

Sounds like an env issue to me. I would propose to close this as "not a bug".

> Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
> -
>
> Key: KAFKA-16404
> URL: https://issues.apache.org/jira/browse/KAFKA-16404
> Project: Kafka
> Issue Type: Bug
> Reporter: Igor Soarez
> Priority: Major
>
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig() failed, log available in /home/jenkins/workspace/Kafka_kafka-pr_PR-14903@2/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 87 > WordCountDemoTest > testGetStreamsConfig() FAILED
> org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-AGGREGATE-STATE-STORE-03 at location /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>     at app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>     at app//org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>     at app//org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>     at app//org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>     at app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>     at app//org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>     at app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>     at app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>     at app//org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>     at app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>     at app//org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>     at app//org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>     at app//org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>     at app//org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>     at app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373)
>     at app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300)
>     at app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276)
>     at app//org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
> Caused by:
> org.rocksdb.RocksDBException: While lock file: /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/LOCK: Resource temporarily unavailable
>     at app//org.rocksdb.RocksDB.open(Native Method)
>     at app//org.rocksdb.RocksDB.open(RocksDB.java:307)
>     at app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>     ... 17 more
> {code}
Re: [PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]
sjhajharia commented on PR #15609: URL: https://github.com/apache/kafka/pull/15609#issuecomment-2022175562 Requesting review from @jolshan (reviewer of the parent PR)
Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]
nizhikov commented on PR #15075: URL: https://github.com/apache/kafka/pull/15075#issuecomment-2022177651 @chia7712 Conflicts resolved.
[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset
[ https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831208#comment-17831208 ]

Matthias J. Sax commented on KAFKA-16382:
-

An application reset does not touch the output topic, so it's expected that output from the first run of the app is still in the output topic after the reset.

{quote}The issue is NOT reproduced if internal cache is disabled.
{quote}
That also sounds like expected behavior. Caching implies that the output might not contain all intermediate results.

I think we can close this ticket as "not a bug"?

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.6.1
> Reporter: Stanislav Spiridonov
> Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
> # Start example - 1st round
> # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
> # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
> # Stop application
> # Run kafka-streams-application-reset
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
> # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app running yet)
> # Start example - 2nd round
> # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
> # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5).
> The issue is NOT reproduced if internal cache is disabled.
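The statement that caching implies the output might not contain all intermediate results can be illustrated with a toy model. This is a sketch under loud assumptions: `DedupCacheSketch` is a hypothetical class, not the Kafka Streams record cache, and it only models the one property under discussion, namely that updates to the same key are overwritten in the buffer and only the latest one is emitted on flush. It takes no position on whether the behavior reported in the ticket is a bug.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only (hypothetical, not Kafka Streams code): a buffer that keeps the
// latest value per key and emits it downstream when flushed.
final class DedupCacheSketch {
    private final Map<String, String> pending = new LinkedHashMap<>();
    private final List<String> output = new ArrayList<>();

    /** Buffers an update; an earlier buffered update for the same key is overwritten. */
    void put(String key, String value) {
        pending.put(key, value); // a null value stands in for a tombstone
    }

    /** Emits one record per buffered key, carrying only its latest value. */
    void flush() {
        for (Map.Entry<String, String> e : pending.entrySet()) {
            output.add(e.getKey() + "=" + e.getValue());
        }
        pending.clear();
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        // With buffering: three updates to A1, a single flush at the end.
        DedupCacheSketch cached = new DedupCacheSketch();
        cached.put("A1", "a");
        cached.put("A1", "ab");
        cached.put("A1", null);
        cached.flush();
        System.out.println("cached:   " + cached.output());

        // Without buffering: flush after every update, so every intermediate result is emitted.
        DedupCacheSketch uncached = new DedupCacheSketch();
        uncached.put("A1", "a");
        uncached.flush();
        uncached.put("A1", "ab");
        uncached.flush();
        uncached.put("A1", null);
        uncached.flush();
        System.out.println("uncached: " + uncached.output());
    }
}
```

In this model the buffered run emits a single record for A1 while the unbuffered run emits three, which is the sense in which intermediate results can be missing from the output when a cache sits in front of it.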
Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]
nizhikov commented on code in PR #15075:
URL: https://github.com/apache/kafka/pull/15075#discussion_r1540643481

## server/src/main/java/org/apache/kafka/server/config/ZkConfig.java: ##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.zookeeper.client.ZKClientConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ZkConfig {
+    /** * Zookeeper Configuration ***/
+    public static final String ZK_CONNECT_PROP = "zookeeper.connect";
+    public static final String ZK_SESSION_TIMEOUT_MS_PROP = "zookeeper.session.timeout.ms";
+    public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = "zookeeper.connection.timeout.ms";
+    public static final String ZK_ENABLE_SECURE_ACLS_PROP = "zookeeper.set.acl";
+    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = "zookeeper.max.in.flight.requests";
+    public static final String ZK_SSL_CLIENT_ENABLE_PROP = "zookeeper.ssl.client.enable";
+    public static final String ZK_CLIENT_CNXN_SOCKET_PROP = "zookeeper.clientCnxnSocket";
+    public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = "zookeeper.ssl.keystore.location";
+    public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = "zookeeper.ssl.keystore.password";
+    public static final String ZK_SSL_KEY_STORE_TYPE_PROP = "zookeeper.ssl.keystore.type";
+    public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = "zookeeper.ssl.truststore.location";
+    public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = "zookeeper.ssl.truststore.password";
+    public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = "zookeeper.ssl.truststore.type";
+    public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol";
+    public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = "zookeeper.ssl.enabled.protocols";
+    public static final String ZK_SSL_CIPHER_SUITES_PROP = "zookeeper.ssl.cipher.suites";
+    public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = "zookeeper.ssl.endpoint.identification.algorithm";
+    public static final String ZK_SSL_CRL_ENABLE_PROP = "zookeeper.ssl.crl.enable";
+    public static final String ZK_SSL_OCSP_ENABLE_PROP = "zookeeper.ssl.ocsp.enable";
+
+    // a map from the Kafka config to the corresponding ZooKeeper Java system property
+    public static final Map ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
+
+    public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " +
+        "host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " +
+        "down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.\n" +
+        "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " +
+        "For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
+    public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout";
+    public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used";
+    public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs";
+    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking.";
+    public static final String ZK_SSL_CLIENT_ENABLE_DOC;
+    public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
+    public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
+    public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC;
+    public static final String ZK_SSL_KEY_STORE_TYPE_DOC;
+    public static final String ZK_SSL_TRUST_STORE_LOCATION_D
Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]
showuon merged PR #15561: URL: https://github.com/apache/kafka/pull/15561
Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]
chia7712 commented on code in PR #15075:
URL: https://github.com/apache/kafka/pull/15075#discussion_r1540685634

## server/src/main/java/org/apache/kafka/server/config/ZkConfig.java: ##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.zookeeper.client.ZKClientConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ZkConfig {
+    /** * Zookeeper Configuration ***/
+    public static final String ZK_CONNECT_PROP = "zookeeper.connect";
+    public static final String ZK_SESSION_TIMEOUT_MS_PROP = "zookeeper.session.timeout.ms";
+    public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = "zookeeper.connection.timeout.ms";
+    public static final String ZK_ENABLE_SECURE_ACLS_PROP = "zookeeper.set.acl";
+    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = "zookeeper.max.in.flight.requests";
+    public static final String ZK_SSL_CLIENT_ENABLE_PROP = "zookeeper.ssl.client.enable";
+    public static final String ZK_CLIENT_CNXN_SOCKET_PROP = "zookeeper.clientCnxnSocket";
+    public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = "zookeeper.ssl.keystore.location";
+    public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = "zookeeper.ssl.keystore.password";
+    public static final String ZK_SSL_KEY_STORE_TYPE_PROP = "zookeeper.ssl.keystore.type";
+    public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = "zookeeper.ssl.truststore.location";
+    public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = "zookeeper.ssl.truststore.password";
+    public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = "zookeeper.ssl.truststore.type";
+    public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol";
+    public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = "zookeeper.ssl.enabled.protocols";
+    public static final String ZK_SSL_CIPHER_SUITES_PROP = "zookeeper.ssl.cipher.suites";
+    public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = "zookeeper.ssl.endpoint.identification.algorithm";
+    public static final String ZK_SSL_CRL_ENABLE_PROP = "zookeeper.ssl.crl.enable";
+    public static final String ZK_SSL_OCSP_ENABLE_PROP = "zookeeper.ssl.ocsp.enable";
+
+    // a map from the Kafka config to the corresponding ZooKeeper Java system property
+    public static final Map ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
+
+    public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " +
+        "host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " +
+        "down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.\n" +
+        "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " +
+        "For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
+    public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout";
+    public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used";
+    public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs";
+    public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking.";
+    public static final String ZK_SSL_CLIENT_ENABLE_DOC;
+    public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
+    public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
+    public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC;
+    public static final String ZK_SSL_KEY_STORE_TYPE_DOC;
+    public static final String ZK_SSL_TRUST_STORE_LOCATION_D
[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset
[ https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831228#comment-17831228 ]

Stanislav Spiridonov commented on KAFKA-16382:
--

Wait, why is it not a bug? I have a real scenario.

Good scenario:
# Some events happened. The output topic contains the mapped result.
# The event ends (_*null*_ body). The output topic contains a delete message for the event.

Wrong scenario:
# Some events happened. The output topic contains the mapped result.
# We stopped Kafka and made some updates plus a *full reset*.
# Meanwhile, the event ends (*_null_* body).
# We start Kafka. It processes the input topic from scratch but internally "optimises" away the nulls. The output topic *still* contains the mapped result. The delete message *never* reaches the output topic.

As a workaround we can clear the output topic, but in our system the output topic is the input for another Kafka Streams application, so we would need to reset all subsequent Kafka Streams applications to correct this behaviour. Another workaround is to generate, for each null body, a synthetic message with a different key and some value in the body (so that it is not optimised away), then check for such messages when writing to the output topic and generate the delete message from them. But that also looks like a hack.

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.6.1
> Reporter: Stanislav Spiridonov
> Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
> # Start example - 1st round
> # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
> # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
> # Stop application
> # Run kafka-streams-application-reset
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
> # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app running yet)
> # Start example - 2nd round
> # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
> # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5).
> The issue is NOT reproduced if internal cache is disabled.
Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]
urbandan commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1540691777

## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ##
@@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

 class ProducerIdManagerTest {

   var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])

+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+    val INDEFINITE: Int = -1
+
+    def indefinitely(error: Errors): ErrorCount = {
+      ErrorCount(error, INDEFINITE)
+    }
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+    private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ initialErrorCounts
+
+    def takeError(): Errors = queue.synchronized {
+      while (queue.head.repeat == 0) {
+        queue.dequeue()
+      }
+      if (queue.head.repeat > 0) {
+        queue.head.repeat -= 1
+      }
+      queue.head.error
+    }
+
+    def peekError(): Errors = queue.synchronized {
+      queue.head.error
+    }
+
+    def clearProcessedError(): Unit = {
+      TestUtils.waitUntilTrue(() =>
+        queue.synchronized {
+          queue.head.repeat == 0
+        }, "error wasn't processed")
+      queue.synchronized {
+        queue.dequeue()

Review Comment:
shouldn't we clear the whole queue here?

## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ##
@@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

 class ProducerIdManagerTest {

   var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])

+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+    val INDEFINITE: Int = -1
+
+    def indefinitely(error: Errors): ErrorCount = {
+      ErrorCount(error, INDEFINITE)
+    }
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+    private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ initialErrorCounts
+
+    def takeError(): Errors = queue.synchronized {
+      while (queue.head.repeat == 0) {
+        queue.dequeue()
+      }
+      if (queue.head.repeat > 0) {
+        queue.head.repeat -= 1
+      }
+      queue.head.error
+    }
+
+    def peekError(): Errors = queue.synchronized {
+      queue.head.error
+    }
+
+    def clearProcessedError(): Unit = {
+      TestUtils.waitUntilTrue(() =>
+        queue.synchronized {
+          queue.head.repeat == 0

Review Comment:
how is this going to behave if the background thread keeps calling takeError, potentially removing all errors? shouldn't the condition check for an empty queue? also, what if the only element in the queue has INDEFINITE?
Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]
dajac commented on PR #15446: URL: https://github.com/apache/kafka/pull/15446#issuecomment-2022239076 @jeffkbkim There are failed tests that look related. Could you check them please?
[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset
[ https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831230#comment-17831230 ] Stanislav Spiridonov commented on KAFKA-16382: -- BTW, if you take the wrong scenario from the previous comment, but the input topic contains an *update* message for the event instead of a *delete* message, everything works correctly: the output topic will contain the updated version of the event. So, as you can see, {*}update{*} as well as *add* work correctly after a full reset, and only *delete* is broken. > Kafka Streams drop NULL values after reset > -- > > Key: KAFKA-16382 > URL: https://issues.apache.org/jira/browse/KAFKA-16382 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Stanislav Spiridonov >Priority: Major > > Kafka Streams (KTable) drops null values after full reset. > See > [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] > for sample topology > Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics) > # Start example - 1st round > # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull" > # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab" > # Stop application > # Run kafka-streams-application-reset > {code:java} > call bin/windows/kafka-streams-application-reset --application-id > nullproblem-example^ > --input-topics "NULL-IN,NULL-IN-AUX"^ > --bootstrap-server "localhost:9092" > {code} > # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app > running yet) > # Start example - 2nd round > # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab" > # Expected output "A1:anull, A1:ab, A1:" > The issue is NOT reproduced if application just restarted (skip step 5). > The issue is NOT reproduced if internal cache is disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16382) Kafka Streams drop NULL values after reset
[ https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831228#comment-17831228 ] Stanislav Spiridonov edited comment on KAFKA-16382 at 3/27/24 8:55 AM: --- Wait, why is it not a bug? I have the real scenario. Good scenario: # Some events happened. Output topic contains mapped result. # Event ends ({_}*null*{_} body). Output topic contains delete message for the event. Wrong scenario: # Some events happened. Output topic contains mapped result. # We stopped Kafka, Make some updates + {*}full rest{*}. # Meanwhile event ends ({*}_null_{*} body). # We start Kafka, it process the input topic from scratch but "optimise" internally nulls. The output topic *still* contains mapped result. The delete message *never* reach the output topic. As work around we can clear the output topic before Kafka start. But in out system the output topic is input for another Kafka Stream application so we need to reset all subsequent Kafka Stream applications to correct this behaviour. Another workaround is on each null body generate another synthetic message with another key and some value in the body (it will not optimised) and check for such messages on write to output topic and generate back the delete message, but it is also looks as a hack. was (Author: foal): Wait, why is it not a bug? I have the real scenario. Good scenario: # Some events happened. Output topic contains mapped result. # Event ends ({_}*null*{_} body). Output topic contains delete message for the event. Wrong scenario: # Some events happened. Output topic contains mapped result. # We stopped Kafka, Make some updates + {*}full rest{*}. # Meanwhile event ends ({*}_null_{*} body). # We start Kafka, it process the input topic from scratch but "optimise" internally nulls. The output topic *still* contains mapped result. The delete message *never* reach the output topic. 
As work around we can clear the output topic, But in out system the output topic is input for another Kafka Stream application so we need to reset all subsequent Kafka Stream applications to correct this behaviour. Another workaround is on each null body generate another synthetic message with another key and some value in the body (it will not optimised) and check for such messages on write to output topic and generate back the delete message, but it is also looks as a hack. > Kafka Streams drop NULL values after reset > -- > > Key: KAFKA-16382 > URL: https://issues.apache.org/jira/browse/KAFKA-16382 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Stanislav Spiridonov >Priority: Major > > Kafka Streams (KTable) drops null values after full reset. > See > [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] > for sample topology > Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics) > # Start example - 1st round > # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull" > # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab" > # Stop application > # Run kafka-streams-application-reset > {code:java} > call bin/windows/kafka-streams-application-reset --application-id > nullproblem-example^ > --input-topics "NULL-IN,NULL-IN-AUX"^ > --bootstrap-server "localhost:9092" > {code} > # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app > running yet) > # Start example - 2nd round > # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab" > # Expected output "A1:anull, A1:ab, A1:" > The issue is NOT reproduced if application just restarted (skip step 5). > The issue is NOT reproduced if internal cache is disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (KAFKA-16382) Kafka Streams drop NULL values after reset
[ https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831228#comment-17831228 ] Stanislav Spiridonov edited comment on KAFKA-16382 at 3/27/24 8:57 AM: --- Wait, why is it not a bug? I have the real scenario. Good scenario: # Some events happened. Output topic contains mapped result. # Event ends ({_}*null*{_} body). Output topic contains delete message for the event. Wrong scenario: # Some events happened. Output topic contains mapped result. # We stopped Kafka, Make some updates + {*}full rest{*}. # Meanwhile event ends ({*}_null_{*} body). # We start Kafka, it process the input topic from scratch but "optimise" internally nulls. The output topic *still* contains mapped result. The delete message *never* reach the output topic. As work around we can clear the output topic before Kafka start. But in out system the output topic is input for another Kafka Stream application so we need to reset all subsequent Kafka Stream applications to correct this behaviour. Another workaround is on each delete message (null body) generate synthetic message with synthetic key and some value in the body (it will not optimised) and check for such messages on write to output topic and generate back the delete message to the output, but it is also looks as a hack. was (Author: foal): Wait, why is it not a bug? I have the real scenario. Good scenario: # Some events happened. Output topic contains mapped result. # Event ends ({_}*null*{_} body). Output topic contains delete message for the event. Wrong scenario: # Some events happened. Output topic contains mapped result. # We stopped Kafka, Make some updates + {*}full rest{*}. # Meanwhile event ends ({*}_null_{*} body). # We start Kafka, it process the input topic from scratch but "optimise" internally nulls. The output topic *still* contains mapped result. The delete message *never* reach the output topic. 
As work around we can clear the output topic before Kafka start. But in out system the output topic is input for another Kafka Stream application so we need to reset all subsequent Kafka Stream applications to correct this behaviour. Another workaround is on each null body generate another synthetic message with another key and some value in the body (it will not optimised) and check for such messages on write to output topic and generate back the delete message, but it is also looks as a hack. > Kafka Streams drop NULL values after reset > -- > > Key: KAFKA-16382 > URL: https://issues.apache.org/jira/browse/KAFKA-16382 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.6.1 >Reporter: Stanislav Spiridonov >Priority: Major > > Kafka Streams (KTable) drops null values after full reset. > See > [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java] > for sample topology > Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics) > # Start example - 1st round > # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull" > # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab" > # Stop application > # Run kafka-streams-application-reset > {code:java} > call bin/windows/kafka-streams-application-reset --application-id > nullproblem-example^ > --input-topics "NULL-IN,NULL-IN-AUX"^ > --bootstrap-server "localhost:9092" > {code} > # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app > running yet) > # Start example - 2nd round > # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab" > # Expected output "A1:anull, A1:ab, A1:" > The issue is NOT reproduced if application just restarted (skip step 5). > The issue is NOT reproduced if internal cache is disabled. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-16353: Offline protocol migration integration tests [kafka]
dajac merged PR #15492: URL: https://github.com/apache/kafka/pull/15492
[jira] [Resolved] (KAFKA-16353) Offline protocol migration integration tests
[ https://issues.apache.org/jira/browse/KAFKA-16353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-16353. - Resolution: Fixed > Offline protocol migration integration tests > > > Key: KAFKA-16353 > URL: https://issues.apache.org/jira/browse/KAFKA-16353 > Project: Kafka > Issue Type: Sub-task >Reporter: Dongnuo Lyu >Assignee: Dongnuo Lyu >Priority: Major > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
[ https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16403. - Resolution: Not A Bug > Flaky test > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords > - > > Key: KAFKA-16403 > URL: https://issues.apache.org/jira/browse/KAFKA-16403 > Project: Kafka > Issue Type: Bug >Reporter: Igor Soarez >Priority: Major > > {code:java} > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords() > failed, log available in > /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout > Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > > WordCountDemoTest > testCountListOfWords() FAILED > org.apache.kafka.streams.errors.ProcessorStateException: Error opening > store KSTREAM-AGGREGATE-STATE-STORE-03 at location > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03 > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) > at > org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151) > at > 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) > at > org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530) > at > org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373) > at > org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300) > at > org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276) > at > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60) > Caused by: > org.rocksdb.RocksDBException: Corruption: IO error: No such file or > directory: While open a file for random read: > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb: > No such file or directory in file > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05 > at org.rocksdb.RocksDB.open(Native Method) > at org.rocksdb.RocksDB.open(RocksDB.java:307) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) > ... 17 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
[ https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831244#comment-17831244 ] Igor Soarez commented on KAFKA-16403: - The test only failed the one time. I agree it was likely a problem in the execution environment. Closing. > Flaky test > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords > - > > Key: KAFKA-16403 > URL: https://issues.apache.org/jira/browse/KAFKA-16403 > Project: Kafka > Issue Type: Bug >Reporter: Igor Soarez >Priority: Major > > {code:java} > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords() > failed, log available in > /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout > Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > > WordCountDemoTest > testCountListOfWords() FAILED > org.apache.kafka.streams.errors.ProcessorStateException: Error opening > store KSTREAM-AGGREGATE-STATE-STORE-03 at location > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03 > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) > at > org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) > at > org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) > at > org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > 
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151) > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151) > at > org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) > at > org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530) > at > org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373) > at > org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300) > at > org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276) > at > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60) > Caused by: > org.rocksdb.RocksDBException: Corruption: IO error: No such file or > directory: While open a file for random read: > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb: > No such file or directory in file > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05 > at org.rocksdb.RocksDB.open(Native Method) > at org.rocksdb.RocksDB.open(RocksDB.java:307) > at > org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) > ... 17 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16404) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
[ https://issues.apache.org/jira/browse/KAFKA-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez resolved KAFKA-16404. - Resolution: Not A Bug Same as KAFKA-16403, this only failed once. It was likely the result of a testing infrastructure problem. We can always re-open if we see this again and suspect otherwise. > Flaky test > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig > - > > Key: KAFKA-16404 > URL: https://issues.apache.org/jira/browse/KAFKA-16404 > Project: Kafka > Issue Type: Bug >Reporter: Igor Soarez >Priority: Major > > > {code:java} > org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig() > failed, log available in > /home/jenkins/workspace/Kafka_kafka-pr_PR-14903@2/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig().test.stdout > Gradle Test Run :streams:examples:test > Gradle Test Executor 87 > > WordCountDemoTest > testGetStreamsConfig() FAILED > org.apache.kafka.streams.errors.ProcessorStateException: Error opening > store KSTREAM-AGGREGATE-STATE-STORE-03 at location > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03 > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329) > at > app//org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69) > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254) > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175) > at > app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > app//org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56) > at > 
app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71) > at > app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151) > at > app//org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872) > at > app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151) > at > app//org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232) > at > app//org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102) > at > app//org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258) > at > app//org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530) > at > app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373) > at > app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300) > at > app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276) > at > app//org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60) > Caused by: > org.rocksdb.RocksDBException: While lock file: > /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/LOCK: > Resource temporarily unavailable > at app//org.rocksdb.RocksDB.open(Native Method) > at app//org.rocksdb.RocksDB.open(RocksDB.java:307) > at > app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323) > ... 17 more > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.
[ https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16297: Fix Version/s: 3.7.1 > Race condition while promoting future replica can lead to partition > unavailability. > --- > > Key: KAFKA-16297 > URL: https://issues.apache.org/jira/browse/KAFKA-16297 > Project: Kafka > Issue Type: Sub-task >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > Fix For: 3.7.1 > > > KIP-858 proposed that when a directory failure occurs after changing the > assignment of a replica that's moved between two directories in the same > broker, but before the future replica promotion completes, the broker should > reassign the replica to inform the controller of its correct status. But this > hasn't yet been implemented, and without it this failure may lead to > indefinite partition unavailability. > Example scenario: > # A broker which leads partition P receives a request to alter the replica > from directory A to directory B. > # The broker creates a future replica in directory B and starts a replica > fetcher. > # Once the future replica first catches up, the broker queues a reassignment > to inform the controller of the directory change. > # The next time the replica catches up, the broker briefly blocks appends > and promotes the replica. However, before the promotion is attempted, > directory A fails. > # The controller was informed that P is now in directory B before it > received the notification that directory A has failed, so it does not elect a > new leader, and as long as the broker is online, partition P remains > unavailable. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.
[ https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16297: Affects Version/s: 3.7.0 > Race condition while promoting future replica can lead to partition > unavailability. > --- > > Key: KAFKA-16297 > URL: https://issues.apache.org/jira/browse/KAFKA-16297 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > Fix For: 3.7.1 > > > KIP-858 proposed that when a directory failure occurs after changing the > assignment of a replica that's moved between two directories in the same > broker, but before the future replica promotion completes, the broker should > reassign the replica to inform the controller of its correct status. But this > hasn't yet been implemented, and without it this failure may lead to > indefinite partition unavailability. > Example scenario: > # A broker which leads partition P receives a request to alter the replica > from directory A to directory B. > # The broker creates a future replica in directory B and starts a replica > fetcher. > # Once the future replica first catches up, the broker queues a reassignment > to inform the controller of the directory change. > # The next time the replica catches up, the broker briefly blocks appends > and promotes the replica. However, before the promotion is attempted, > directory A fails. > # The controller was informed that P is now in directory B before it > received the notification that directory A has failed, so it does not elect a > new leader, and as long as the broker is online, partition P remains > unavailable. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.
[ https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16297: Component/s: jbod > Race condition while promoting future replica can lead to partition > unavailability. > --- > > Key: KAFKA-16297 > URL: https://issues.apache.org/jira/browse/KAFKA-16297 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Major > Fix For: 3.7.1 > > > KIP-858 proposed that when a directory failure occurs after changing the > assignment of a replica that's moved between two directories in the same > broker, but before the future replica promotion completes, the broker should > reassign the replica to inform the controller of its correct status. But this > hasn't yet been implemented, and without it this failure may lead to > indefinite partition unavailability. > Example scenario: > # A broker which leads partition P receives a request to alter the replica > from directory A to directory B. > # The broker creates a future replica in directory B and starts a replica > fetcher. > # Once the future replica first catches up, the broker queues a reassignment > to inform the controller of the directory change. > # The next time the replica catches up, the broker briefly blocks appends > and promotes the replica. However, before the promotion is attempted, > directory A fails. > # The controller was informed that P is now in directory B before it > received the notification that directory A has failed, so it does not elect a > new leader, and as long as the broker is online, partition P remains > unavailable. > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications
[ https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Igor Soarez updated KAFKA-16365: Priority: Critical (was: Major) > AssignmentsManager mismanages completion notifications > -- > > Key: KAFKA-16365 > URL: https://issues.apache.org/jira/browse/KAFKA-16365 > Project: Kafka > Issue Type: Sub-task > Components: jbod >Affects Versions: 3.7.0 >Reporter: Igor Soarez >Assignee: Igor Soarez >Priority: Critical > Fix For: 3.7.1 > > > When moving replicas between directories in the same broker, future replica > promotion hinges on acknowledgment from the controller of a change in the > directory assignment. > > ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion > notification of the directory assignment change. > > In its current form, under certain assignment scheduling, AssignmentsManager > can either miss completion notifications or trigger them prematurely. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]
akatona84 commented on code in PR #15605: URL: https://github.com/apache/kafka/pull/15605#discussion_r1540748533 ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class ProducerIdManagerTest { var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager]) val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) + case class ErrorCount(error: Errors, var repeat: Int) + + object ErrorCount { +val INDEFINITE: Int = -1 + +def indefinitely(error: Errors): ErrorCount = { + ErrorCount(error, INDEFINITE) +} + } + + class ErrorQueue(initialErrorCounts: ErrorCount*) { +private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ initialErrorCounts + +def takeError(): Errors = queue.synchronized { + while (queue.head.repeat == 0) { +queue.dequeue() + } + if (queue.head.repeat > 0) { +queue.head.repeat -= 1 + } + queue.head.error +} + +def peekError(): Errors = queue.synchronized { + queue.head.error +} + +def clearProcessedError(): Unit = { + TestUtils.waitUntilTrue(() => +queue.synchronized { + queue.head.repeat == 0 Review Comment: I'm removing INDEFINITE; an empty queue will mean there's no error (Errors.NONE is returned)
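For readers following this review thread, here is a minimal sketch of the behaviour described in the reply above: no INDEFINITE marker, and an empty queue means "no error". It is written in Java rather than the Scala of the test under review, and it is not the code from the PR. The `Err` enum is a hypothetical stand-in for Kafka's `Errors`, and the `isDrained` helper is an assumption added for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for Kafka's Errors enum so the sketch compiles on its own.
enum Err { NONE, REQUEST_TIMED_OUT, COORDINATOR_LOAD_IN_PROGRESS }

// One injected error and how many more times it should be returned.
final class ErrorCount {
    final Err error;
    int repeat;

    ErrorCount(Err error, int repeat) {
        this.error = error;
        this.repeat = repeat;
    }
}

// Thread-safe queue of injected errors; an empty queue means "no error".
final class ErrorQueue {
    private final Deque<ErrorCount> queue = new ArrayDeque<>();

    ErrorQueue(ErrorCount... initial) {
        for (ErrorCount ec : initial) {
            queue.add(ec);
        }
    }

    // Drops entries whose repeat budget is used up, stopping safely on an empty queue.
    private void dropExhausted() {
        while (!queue.isEmpty() && queue.peek().repeat <= 0) {
            queue.poll();
        }
    }

    // Returns the next injected error and consumes one repetition, or NONE if nothing is left.
    synchronized Err takeError() {
        dropExhausted();
        if (queue.isEmpty()) {
            return Err.NONE;
        }
        ErrorCount head = queue.peek();
        head.repeat--;
        return head.error;
    }

    // Returns the error the next takeError() call would return, without consuming it.
    synchronized Err peekError() {
        dropExhausted();
        return queue.isEmpty() ? Err.NONE : queue.peek().error;
    }

    // True once every injected error has been fully consumed (assumed helper, not from the PR).
    synchronized boolean isDrained() {
        dropExhausted();
        return queue.isEmpty();
    }
}
```

With this shape, a test could wait on `isDrained()` instead of inspecting `queue.head`, which sidesteps both questions raised in the review: a background thread that consumes every error leaves the queue empty rather than throwing, and there is no entry that can never reach zero.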
Re: [PR] MINOR: Preventing running the :core tests twice when testing with coverage [kafka]
viktorsomogyi merged PR #15580: URL: https://github.com/apache/kafka/pull/15580
Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]
viktorsomogyi commented on PR #15605: URL: https://github.com/apache/kafka/pull/15605#issuecomment-2022430965 @akatona84 I think this change goes beyond a "minor" commit, so you should create a Jira ticket for it. Also, please add a description of your change that explains the reasoning behind your modifications.
[jira] [Assigned] (KAFKA-16103) Review client logic for triggering offset commit callbacks
[ https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lucas Brutschy reassigned KAFKA-16103: -- Assignee: Lucas Brutschy (was: Lianet Magrans) > Review client logic for triggering offset commit callbacks > -- > > Key: KAFKA-16103 > URL: https://issues.apache.org/jira/browse/KAFKA-16103 > Project: Kafka > Issue Type: Sub-task > Components: clients, consumer >Reporter: Lianet Magrans >Assignee: Lucas Brutschy >Priority: Critical > Labels: kip-848-client-support, offset > Fix For: 3.8.0 > > > Review logic for triggering commit callbacks, ensuring that all callbacks are > triggered before returning from commitSync -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]
soarez commented on PR #15605: URL: https://github.com/apache/kafka/pull/15605#issuecomment-2022452207 I think there is a JIRA already for this flaky test: [KAFKA-15915](https://issues.apache.org/jira/browse/KAFKA-15915) If this is correct, then you should update the PR title to refer to the JIRA.
[jira] [Commented] (KAFKA-16323) Failing test: fix testRemoteFetchExpiresPerSecMetric
[ https://issues.apache.org/jira/browse/KAFKA-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831291#comment-17831291 ] Chia-Ping Tsai commented on KAFKA-16323: It seems to me the root cause could be that the `spy` does not work well. We have met a similar issue before: the spied method results in unexpected behavior when it is tested across multiple threads; see https://github.com/apache/kafka/pull/10006. Hence, we can try to fix it by using `override` to replace the spied method, for example:
{code}
val latch = new CountDownLatch(1)
val remoteLogManager = new RemoteLogManager(
  remoteLogManagerConfig, 0, TestUtils.tempRelativeDir("data").getAbsolutePath, "clusterId", time,
  _ => Optional.of(dummyLog), (TopicPartition, Long) => {}, brokerTopicStats) {
  override def read(remoteStorageFetchInfo: RemoteStorageFetchInfo): FetchDataInfo = {
    // wait until verification completes
    latch.await(5000, TimeUnit.MILLISECONDS)
    mock(classOf[FetchDataInfo])
  }
}
{code}
> Failing test: fix testRemoteFetchExpiresPerSecMetric > - > > Key: KAFKA-16323 > URL: https://issues.apache.org/jira/browse/KAFKA-16323 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Johnny Hsu >Assignee: Johnny Hsu >Priority: Major > Labels: test-failure > > Refer to > [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2685/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_21_and_Scala_2_13___testRemoteFetchExpiresPerSecMetric__/] > This test is failing, and this ticket aims to address this
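The suggestion above (replace a spied method with a plain override that blocks on a latch) can be illustrated without Kafka or Mockito. The following is a minimal Java sketch under stated assumptions: `Reader` is a hypothetical stand-in for the class under test, not `RemoteLogManager`, and the method names are invented for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class OverrideInsteadOfSpy {
    // Hypothetical stand-in for the class whose method the test wants to control.
    static class Reader {
        String read(String request) {
            return "real:" + request;
        }
    }

    // Returns true if the overridden read stayed pending until the latch was released.
    static boolean readStaysPendingUntilReleased() {
        CountDownLatch latch = new CountDownLatch(1);

        // A plain anonymous subclass: no proxying, so the override behaves the
        // same way regardless of which thread calls it.
        Reader reader = new Reader() {
            @Override
            String read(String request) {
                try {
                    // Wait until the test has finished its verification.
                    latch.await(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "stub:" + request;
            }
        };

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> reader.read("fetch");
            Future<String> result = pool.submit(task);
            // The read cannot finish before the latch is released.
            boolean pendingBeforeRelease = !result.isDone();
            latch.countDown();
            String value = result.get(10, TimeUnit.SECONDS);
            return pendingBeforeRelease && "stub:fetch".equals(value);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(readStaysPendingUntilReleased());
    }
}
```

The test thread decides when the blocked call may continue, which is the role the latch plays in the Scala snippet above.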
Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]
mimaison commented on code in PR #15558: URL: https://github.com/apache/kafka/pull/15558#discussion_r1540866456 ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ## @@ -320,4 +322,19 @@ static void createCompactedTopic(String topicName, short partitions, short repli static void createSinglePartitionCompactedTopic(String topicName, short replicationFactor, Admin admin) { createCompactedTopic(topicName, (short) 1, replicationFactor, admin); } + +static T adminCall(Callable callable, Supplier errMsg) +throws ExecutionException, InterruptedException { +try { +return callable.call(); +} catch (ExecutionException | InterruptedException e) { +if (e.getCause() instanceof TopicAuthorizationException || Review Comment: I think we also need to handle `GroupAuthorizationException` as this can be thrown by `listConsumerGroupOffsets()` ## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java: ## @@ -216,7 +217,10 @@ Set findConsumerGroups() Collection listConsumerGroups() throws InterruptedException, ExecutionException { -return sourceAdminClient.listConsumerGroups().valid().get(); +return adminCall( +() -> sourceAdminClient.listConsumerGroups().valid().get(), +() -> "list consumer groups on cluster " + config.sourceClusterAlias() Review Comment: Should we put the alias before `cluster`? so we get a message like `list consumer groups on source cluster` for example. I think it reads better than `list consumer groups on cluster source` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
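Both review points above (also handling `GroupAuthorizationException`, and putting the cluster alias before the word "cluster") can be sketched together. This is a minimal, self-contained Java sketch, not the code from the PR: the two exception classes are local stand-ins for the Kafka exceptions of the same name, the `LOGGED` list stands in for a logger, and the message wording and the `runDeniedCall` helper are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

public class AdminCallSketch {
    // Local stand-ins for the Kafka authorization exceptions.
    static class TopicAuthorizationException extends RuntimeException {
        TopicAuthorizationException(String message) { super(message); }
    }

    static class GroupAuthorizationException extends RuntimeException {
        GroupAuthorizationException(String message) { super(message); }
    }

    // Stands in for a logger so the sketch has something observable.
    static final List<String> LOGGED = new ArrayList<>();

    // Runs an admin call and records authorization failures before rethrowing them.
    static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
            throws ExecutionException, InterruptedException {
        try {
            return callable.call();
        } catch (ExecutionException | InterruptedException e) {
            Throwable cause = e.getCause();
            if (cause instanceof TopicAuthorizationException
                    || cause instanceof GroupAuthorizationException) {
                LOGGED.add(cause.getClass().getSimpleName()
                        + " occurred while trying to " + errMsg.get());
            }
            throw e;
        } catch (Exception e) {
            // Callable.call() may declare any exception; wrap the unexpected ones.
            throw new RuntimeException(e);
        }
    }

    // Simulates a denied call and returns what was logged for it.
    static String runDeniedCall() {
        LOGGED.clear();
        String alias = "source"; // assumed cluster alias
        Callable<String> denied = () -> {
            throw new ExecutionException(new GroupAuthorizationException("not allowed"));
        };
        try {
            adminCall(denied, () -> "list consumer groups on " + alias + " cluster");
            return "no exception";
        } catch (ExecutionException | InterruptedException e) {
            return LOGGED.isEmpty() ? "nothing logged" : LOGGED.get(0);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDeniedCall());
    }
}
```

With the alias placed first, the supplied description reads "list consumer groups on source cluster", which is the wording the second review comment asks for.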
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on code in PR #15525: URL: https://github.com/apache/kafka/pull/15525#discussion_r1540879732 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java: ## @@ -25,22 +25,15 @@ import java.util.Map; /** - * Event for retrieving partition offsets by performing a + * Application Event for retrieving partition offsets by performing a * {@link org.apache.kafka.common.requests.ListOffsetsRequest ListOffsetsRequest}. - * This event is created with a map of {@link TopicPartition} and target timestamps to search - * offsets for. It is completed with the map of {@link TopicPartition} and - * {@link OffsetAndTimestamp} found (offset of the first message whose timestamp is greater than - * or equals to the target timestamp) */ -public class ListOffsetsEvent extends CompletableApplicationEvent> { - +public class ListOffsetsEvent extends CompletableApplicationEvent> { Review Comment: I'm personally not concerned about having two events, because they are very simple. The alternative is to have a common code-path that carries a `requiresTimestamp` boolean to differentiate behavior again, which isn't really any simpler. But I agree there is a certain amount of code duplication here that we could eliminate using your approach @lianetm , so I'm not against it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
lucasbru commented on PR #15525: URL: https://github.com/apache/kafka/pull/15525#issuecomment-2022475971 > @lucasbru - Thanks again for reviewing the PR. Sorry about the misinterpretation of the short-circuiting logic; I have updated the beginningOrEndOffsets API here. It seems like the right thing to do is to still send out the request but return immediately for a zero timeout (a bit strange, because it does throw a timeout when time runs out, which seems inconsistent). Yes, the behavior of the existing consumer is a bit curious, but it's not the only place where a zero duration is treated differently from 0.01s. Either way, we probably have to do it this way for compatibility. This part looks good to me now.
Re: [PR] KAFKA-16272: Update connect_distributed_test.py to support KIP-848’s group protocol config [kafka]
lucasbru merged PR #15576: URL: https://github.com/apache/kafka/pull/15576
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1540885105 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +public class KafkaConfig { + +/** * Log Configuration ***/ +public final static String NUM_PARTITIONS_PROP = "num.partitions"; Review Comment: Would it also make sense to colocate the `CONFIG` and `DOC` like we do in the other Config classes? ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -49,6 +49,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.server.config.{ConfigEntityName, ConfigType} +import org.apache.kafka.server.config.KafkaConfig._ Review Comment: I wonder if we should just import `KafkaConfig` and use `KAFKA.SOME_CONFIG` instead of importing all/statically KafkaConfig. WDYT? 
## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +public class KafkaConfig { + +/** * Log Configuration ***/ +public final static String NUM_PARTITIONS_PROP = "num.partitions"; Review Comment: For consistency with the other *Configs classes, should we used the `CONFIG` suffix instead of `PROP`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]
sjhajharia commented on PR #15609: URL: https://github.com/apache/kafka/pull/15609#issuecomment-2022517844 Thanks for the review @soarez I have updated the variable names in the test.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1540968019 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +public class KafkaConfig { + +/** * Log Configuration ***/ +public final static String NUM_PARTITIONS_PROP = "num.partitions"; Review Comment: > For consistency with the other *Configs classes, should we use the CONFIG suffix instead of PROP? I thought about it, but I wasn't sure which would make it easier for people to review. We also might need to do the same for the LogCleaner configs. > Would it also make sense to colocate the CONFIG and DOC like we do in the other Config classes? I considered this, but I found it tricky for some docs that refer to more than one config in the string interpolation. For example, `LOG_DIR_DOC` and `LOG_DIRS_DOC` refer to both `LOG_DIR_PROP` and `LOG_DIRS_PROP`; the same goes for `LOG_ROLL_TIME_HOURS_DOC` and `LOG_ROLL_TIME_MILLIS_DOC`. I think this is why they are grouped at the end, after the property names are defined first.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1540992038 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +public class KafkaConfig { + +/** * Log Configuration ***/ +public final static String NUM_PARTITIONS_PROP = "num.partitions"; Review Comment: I just pushed a commit to replace `PROP` with `CONFIG` suffix. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]
jeqo commented on PR #15588: URL: https://github.com/apache/kafka/pull/15588#issuecomment-2022623732 Sorry for also being late with the review. Changes look good to me overall, though I'd also add a mention to these configs that the `log.roll` configs are ignored (as well as the already mentioned `segment.bytes|ms`). We could address this as part of KAFKA-16429 if you agree.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1540968019 ## server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java: ## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.server.config; + +import org.apache.kafka.common.config.TopicConfig; +import static org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX; + +public class KafkaConfig { + +/** * Log Configuration ***/ +public final static String NUM_PARTITIONS_PROP = "num.partitions"; Review Comment: > For consistency with the other *Configs classes, should we used the CONFIG suffix instead of PROP? I thought about it but I wan't sure which would make it easier for people to review. We also might need to do the same for LogCleaner configs as well. > Would it also make sense to colocate the CONFIG and DOC like we do in the other Config classes? I considered this but I found it tricky for some docs that refer to more than 1 config in the string interpolation. 
For example, `LOG_DIR_DOC` and `LOG_DIRS_DOC` refer to each other's property: `LOG_DIRS_DOC` refers to `LOG_DIR_PROP` and `LOG_DIR_DOC` refers to `LOG_DIRS_PROP`. The same goes for `LOG_ROLL_TIME_HOURS_DOC` and `LOG_ROLL_TIME_MILLIS_DOC`. I think this is why they are grouped at the end, after the property names are defined first.
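The ordering constraint described above can be made concrete with a small Java sketch. It is illustrative only: the class name is hypothetical, the constants use the `CONFIG` suffix discussed in this thread, and the doc strings are paraphrased rather than copied from Kafka. The point is that Java rejects a static field initializer that uses the simple name of a constant declared further down (an illegal forward reference), so docs that mention each other's config names are easiest to declare after all the names.

```java
public class LogConfigNamesSketch {
    // Config names first, so every doc string below can refer to any of them.
    public static final String LOG_DIR_CONFIG = "log.dir";
    public static final String LOG_DIRS_CONFIG = "log.dirs";

    // Docs afterwards. Each one mentions the other config's name, which is why
    // placing LOG_DIR_DOC directly under LOG_DIR_CONFIG (before LOG_DIRS_CONFIG
    // is declared) would not compile when using the simple name.
    public static final String LOG_DIR_DOC =
            "The directory in which the log data is kept (supplemental to the "
                    + LOG_DIRS_CONFIG + " property).";
    public static final String LOG_DIRS_DOC =
            "A comma-separated list of directories where the log data is stored. "
                    + "If not set, the value in " + LOG_DIR_CONFIG + " is used.";

    public static void main(String[] args) {
        System.out.println(LOG_DIR_CONFIG + ": " + LOG_DIR_DOC);
        System.out.println(LOG_DIRS_CONFIG + ": " + LOG_DIRS_DOC);
    }
}
```

Colocating each name with its doc remains possible for configs whose docs only mention their own name; the grouping matters only for the cross-referencing pairs.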
Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]
nizhikov commented on PR #15575: URL: https://github.com/apache/kafka/pull/15575#issuecomment-2022624611 @OmniaGM Can you, please, merge with latest trunk.
Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]
brandboat commented on PR #15588: URL: https://github.com/apache/kafka/pull/15588#issuecomment-2022630053 Thank you @showuon , @jeqo . I will address the comments in the new JIRA https://issues.apache.org/jira/browse/KAFKA-16429
[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831324#comment-17831324 ] Jorge Esteban Quilcate Otoya commented on KAFKA-16414: -- As well, I'm +1 on including the active segment on size-based retention. I guess this change of behavior does not require a KIP (?), but let's make sure it's mentioned/acknowledged on the Release notes so if there's any user relying on how the mix of retention by size and active segment rotation currently work, they will be aware when upgrading. > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes
[ https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831327#comment-17831327 ] Jorge Esteban Quilcate Otoya commented on KAFKA-16414: -- Just got the new messages while posting my last comment. As Kamal mentioned, Tiered Storage has been a feature where there were assumptions about how active segment rotation works and aligning these behaviors will potentially break some integration tests. This is why I was hesitating to suggest a KIP for this change. > Inconsistent active segment expiration behavior between retention.ms and > retention.bytes > > > Key: KAFKA-16414 > URL: https://issues.apache.org/jira/browse/KAFKA-16414 > Project: Kafka > Issue Type: Improvement >Affects Versions: 3.6.1 >Reporter: Kuan Po Tseng >Assignee: Kuan Po Tseng >Priority: Major > > This is a follow up issue on KAFKA-16385. > Currently, there's a difference between how retention.ms and retention.bytes > handle active segment expiration: > - retention.ms always expire active segment when max segment timestamp > matches the condition. > - retention.bytes only expire active segment when retention.bytes is > configured to zero. > The behavior should be either rotate active segments for both retention > configurations or none at all. > For more details, see > https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]
nizhikov commented on PR #15075: URL: https://github.com/apache/kafka/pull/15075#issuecomment-2022645115 @chia7712 CI has finished and the results look OK to me.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1541026665 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -49,6 +49,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.server.config.{ConfigEntityName, ConfigType} +import org.apache.kafka.server.config.KafkaConfig._ Review Comment: For this file, and maybe others that import only a few configs from KafkaConfig, this would be a nice improvement, but some classes, such as DynamicBrokerConfig, use a large set of configs. I can update the ones that need only a small set of configs to use `KAFKA.SOME_CONFIG` and leave the others as they are, especially since we will do this anyway when we move them to Java.
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1541032203 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -49,6 +49,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.server.config.{ConfigEntityName, ConfigType} +import org.apache.kafka.server.config.KafkaConfig._ Review Comment: Also at the moment we can't use `KafkaConfig` everywhere as there's another `KafkaConfig` in scala -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
OmniaGM commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1541026665 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -49,6 +49,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.server.config.{ConfigEntityName, ConfigType} +import org.apache.kafka.server.config.KafkaConfig._ Review Comment: ~For this one and maybe other that import few config from KafkaConfig this would be nice improvement but some classes like DynamicBrokerConfig for example include a large set of config. I can update few to use `KAFKA.SOME_CONFIG` if they need small set of config and leave the others specially that we when we move them to java we will do this anyway. ~ Ignore this I got the comment wrong. At the moment we can't use KafkaConfig everywhere as there's another KafkaConfig in scala ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -49,6 +49,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.server.config.{ConfigEntityName, ConfigType} +import org.apache.kafka.server.config.KafkaConfig._ Review Comment: Also at the moment we can't use `KafkaConfig` everywhere as there's another `KafkaConfig` in scala -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]
chia7712 commented on PR #15588: URL: https://github.com/apache/kafka/pull/15588#issuecomment-2022675724 @jeqo I should merge code after getting your reviews :( -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] refactor: improve the output information during partition directory m… [kafka]
KevinZTW opened a new pull request, #15610: URL: https://github.com/apache/kafka/pull/15610

Currently, when we use `kafka-reassign-partitions` to move a log directory, the output only indicates which replicas' movements have started successfully. This PR proposes showing more detailed information, helping end users understand that the operation is proceeding as expected.

For example, suppose I have the mv.json below, which moves `t3-0` and `t3-1` to different log directories:

```json
{
  "version":1,
  "partitions":[
    { "topic":"t3","partition":0,"replicas":[1],"log_dirs":["/var/lib/kafka/data/logs"] },
    { "topic":"t3","partition":1,"replicas":[1, 2, 3],"log_dirs":["/var/lib/kafka/data/logs", "/var/lib/kafka/data/logs2", "/var/lib/kafka/data/logs2"] }
  ]
}
```

- original output

```
Successfully started partition reassignments for t3-0,t3-1
Successfully started log directory moves for: t3-0-1,t3-1-1,t3-1-2,t3-1-3
```

- proposed output

```
Successfully started partition reassignments for t3-0,t3-1
Successfully started moving log directory to /var/lib/kafka/data/logs for replica t3-0 with broker id: 1
Successfully started moving log directory to /var/lib/kafka/data/logs for replica t3-1 with broker id: 1
Successfully started moving log directory to /var/lib/kafka/data/logs2 for replica t3-1 with broker id: 2
Successfully started moving log directory to /var/lib/kafka/data/logs2 for replica t3-1 with broker id: 3
```

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on code in PR #15569: URL: https://github.com/apache/kafka/pull/15569#discussion_r1541117312 ## core/src/main/scala/kafka/server/ZkAdminManager.scala: ## @@ -49,6 +49,7 @@ import org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S import org.apache.kafka.common.utils.Sanitizer import org.apache.kafka.server.common.AdminOperationException import org.apache.kafka.server.config.{ConfigEntityName, ConfigType} +import org.apache.kafka.server.config.KafkaConfig._ Review Comment: Ah right, that's a good point. Thanks -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831383#comment-17831383 ] Satish Duggana commented on KAFKA-15265: KIP-956 is approved. [~abhijeetkumar] is working on pushing the implementation to the trunk. > Remote copy/fetch quotas for tiered storage. > > > Key: KAFKA-15265 > URL: https://issues.apache.org/jira/browse/KAFKA-15265 > Project: Kafka > Issue Type: Improvement > Components: core >Reporter: Satish Duggana >Assignee: Abhijeet Kumar >Priority: Major > > Related KIP: > https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]
chia7712 commented on PR #15075: URL: https://github.com/apache/kafka/pull/15075#issuecomment-2022937094

```
./gradlew cleanTest :tools:test --tests GetOffsetShellTest.testTopicPatternArgWithPartitionsArg :trogdor:test --tests CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test --tests MirrorConnectorsIntegrationExactlyOnceTest.testReplicateSourceDefault --tests MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault :core:test --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric --tests LogDirFailureTest.testIOExceptionDuringCheckpoint :clients:test --tests KafkaAdminClientTest.testClientSideTimeoutAfterFailureToReceiveResponse
```

The failed tests pass locally, so I'm about to merge it.
Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]
chia7712 merged PR #15075: URL: https://github.com/apache/kafka/pull/15075 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store
Matej Sprysl created KAFKA-16432: Summary: KStreams: Joining KStreams and GlobalKTable requires a state store Key: KAFKA-16432 URL: https://issues.apache.org/jira/browse/KAFKA-16432 Project: Kafka Issue Type: Bug Reporter: Matej Sprysl h2. Code: {code:java} final StreamsBuilder builder = new StreamsBuilder(); final GlobalKTable table = builder.globalTable("tableTopic", Consumed.with(Serdes.Long(), Serdes.Long())); final KStream stream = builder.stream( Pattern.compile("streamTopic"), Consumed.with(Serdes.ByteArray(), Message.SERDE)); (...some processing, no state store added...) final var joiner = new MyJoiner(); final var keyMapper = new MyKeyMapper(); final KStream enriched = messages .join(table, keyMapper, joiner, Named.as("innerJoin"));{code} h2. Error: {code:java} Caused by: org.apache.kafka.streams.errors.StreamsException: Processor innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA. {code} h2. Additional notes: This error happens when I try to join a KStreams instance with a GlobalKTable instance. It is important to emphasize that I am not connecting any state store manually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
mjsax commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541286490 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -58,6 +61,8 @@ public class AbstractConfig { private final ConfigDef definition; +public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = "org.apache.kafka.automatic.config.providers"; Review Comment: Looks like a public API change to me. Don't we need a KIP? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] MINOR: Remove redundant ApiVersionsResponse#filterApis [kafka]
brandboat opened a new pull request, #15611: URL: https://github.com/apache/kafka/pull/15611 The method `ApiVersionsResponse#filterApis(RecordVersion, ApiMessageType.ListenerType)` is only used in tests, so we can remove it and have the tests invoke the other `filterApis` method instead. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store
[ https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matej Sprysl updated KAFKA-16432: - Description: h2. Code: {code:java} final StreamsBuilder builder = new StreamsBuilder(); final GlobalKTable table = builder.globalTable("tableTopic", Consumed.with(Serdes.Long(), Serdes.Long())); final StreamsBuilder builder2 = new StreamsBuilder(); final KStream stream = builder2.stream( Pattern.compile("streamTopic"), Consumed.with(Serdes.ByteArray(), Message.SERDE)); (...some processing, no state store added...) final var joiner = new MyJoiner(); final var keyMapper = new MyKeyMapper(); final KStream enriched = messages .join(table, keyMapper, joiner, Named.as("innerJoin"));{code} h2. Error: {code:java} Caused by: org.apache.kafka.streams.errors.StreamsException: Processor innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA. {code} h2. Additional notes: This error happens when I try to join a KStreams instance with a GlobalKTable instance. It is important to emphasize that I am not connecting any state store manually. was: h2. 
Code: {code:java} final StreamsBuilder builder = new StreamsBuilder(); final GlobalKTable table = builder.globalTable("tableTopic", Consumed.with(Serdes.Long(), Serdes.Long())); final KStream stream = builder.stream( Pattern.compile("streamTopic"), Consumed.with(Serdes.ByteArray(), Message.SERDE)); (...some processing, no state store added...) final var joiner = new MyJoiner(); final var keyMapper = new MyKeyMapper(); final KStream enriched = messages .join(table, keyMapper, joiner, Named.as("innerJoin"));{code} h2. Error: {code:java} Caused by: org.apache.kafka.streams.errors.StreamsException: Processor innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA. {code} h2. Additional notes: This error happens when I try to join a KStreams instance with a GlobalKTable instance. It is important to emphasize that I am not connecting any state store manually. > KStreams: Joining KStreams and GlobalKTable requires a state store > -- > > Key: KAFKA-16432 > URL: https://issues.apache.org/jira/browse/KAFKA-16432 > Project: Kafka > Issue Type: Bug >Reporter: Matej Sprysl >Priority: Major > > h2. 
Code: > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final GlobalKTable table = builder.globalTable("tableTopic", > Consumed.with(Serdes.Long(), Serdes.Long())); > final StreamsBuilder builder2 = new StreamsBuilder(); > final KStream stream = > builder2.stream( > Pattern.compile("streamTopic"), > Consumed.with(Serdes.ByteArray(), Message.SERDE)); > (...some processing, no state store added...) > final var joiner = new MyJoiner(); > final var keyMapper = new MyKeyMapper(); > final KStream enriched = > messages > .join(table, keyMapper, joiner, Named.as("innerJoin"));{code} > h2. Error: > {code:java} > Caused by: org.apache.kafka.streams.errors.StreamsException: Processor > innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as > the store is not connected to the processor. If you add stores manually via > '.addStateStore()' make sure to connect the added store to the processor by > providing the processor name to '.addStateStore()' or connect them via > '.connectProcessorAndStateStores()'. DSL users need to provide the store name > to '.process()', '.transform()', or '.transformValues()' to connect the store > to the c
[jira] [Commented] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store
[ https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831415#comment-17831415 ] Matej Sprysl commented on KAFKA-16432: -- Update: The issue was probably caused by creating the table stream with a different builder. Closing. > KStreams: Joining KStreams and GlobalKTable requires a state store > -- > > Key: KAFKA-16432 > URL: https://issues.apache.org/jira/browse/KAFKA-16432 > Project: Kafka > Issue Type: Bug >Reporter: Matej Sprysl >Priority: Major > > h2. Code: > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final GlobalKTable table = builder.globalTable("tableTopic", > Consumed.with(Serdes.Long(), Serdes.Long())); > final KStream stream = > builder.stream( > Pattern.compile("streamTopic"), > Consumed.with(Serdes.ByteArray(), Message.SERDE)); > (...some processing, no state store added...) > final var joiner = new MyJoiner(); > final var keyMapper = new MyKeyMapper(); > final KStream enriched = > messages > .join(table, keyMapper, joiner, Named.as("innerJoin"));{code} > h2. Error: > {code:java} > Caused by: org.apache.kafka.streams.errors.StreamsException: Processor > innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as > the store is not connected to the processor. If you add stores manually via > '.addStateStore()' make sure to connect the added store to the processor by > providing the processor name to '.addStateStore()' or connect them via > '.connectProcessorAndStateStores()'. DSL users need to provide the store name > to '.process()', '.transform()', or '.transformValues()' to connect the store > to the corresponding operator, or they can provide a StoreBuilder by > implementing the stores() method on the Supplier itself. If you do not add > stores manually, please file a bug report at > https://issues.apache.org/jira/projects/KAFKA. > {code} > h2. 
Additional notes: > This error happens when I try to join a KStreams instance with a GlobalKTable > instance. > It is important to emphasize that I am not connecting any state store > manually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-15915) Flaky test - testUnrecoverableError - ProducerIdManagerTest
[ https://issues.apache.org/jira/browse/KAFKA-15915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Andras Katona reassigned KAFKA-15915: - Assignee: Andras Katona > Flaky test - testUnrecoverableError - ProducerIdManagerTest > --- > > Key: KAFKA-15915 > URL: https://issues.apache.org/jira/browse/KAFKA-15915 > Project: Kafka > Issue Type: Bug >Reporter: Lucas Brutschy >Assignee: Andras Katona >Priority: Major > Labels: flaky-test > > Test intermittently gives the following result: > {code} > java.lang.UnsupportedOperationException: Success.failed > at scala.util.Success.failed(Try.scala:277) > at > kafka.coordinator.transaction.ProducerIdManagerTest.verifyFailure(ProducerIdManagerTest.scala:234) > at > kafka.coordinator.transaction.ProducerIdManagerTest.testUnrecoverableErrors(ProducerIdManagerTest.scala:199) > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Resolved] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store
[ https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matej Sprysl resolved KAFKA-16432. -- Resolution: Fixed Fixed by using the same StreamsBuilder for both streams. > KStreams: Joining KStreams and GlobalKTable requires a state store > -- > > Key: KAFKA-16432 > URL: https://issues.apache.org/jira/browse/KAFKA-16432 > Project: Kafka > Issue Type: Bug >Reporter: Matej Sprysl >Priority: Major > > h2. Code: > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final GlobalKTable table = builder.globalTable("tableTopic", > Consumed.with(Serdes.Long(), Serdes.Long())); > final KStream stream = > builder.stream( > Pattern.compile("streamTopic"), > Consumed.with(Serdes.ByteArray(), Message.SERDE)); > (...some processing, no state store added...) > final var joiner = new MyJoiner(); > final var keyMapper = new MyKeyMapper(); > final KStream enriched = > messages > .join(table, keyMapper, joiner, Named.as("innerJoin"));{code} > h2. Error: > {code:java} > Caused by: org.apache.kafka.streams.errors.StreamsException: Processor > innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as > the store is not connected to the processor. If you add stores manually via > '.addStateStore()' make sure to connect the added store to the processor by > providing the processor name to '.addStateStore()' or connect them via > '.connectProcessorAndStateStores()'. DSL users need to provide the store name > to '.process()', '.transform()', or '.transformValues()' to connect the store > to the corresponding operator, or they can provide a StoreBuilder by > implementing the stores() method on the Supplier itself. If you do not add > stores manually, please file a bug report at > https://issues.apache.org/jira/projects/KAFKA. > {code} > h2. Additional notes: > This error happens when I try to join a KStreams instance with a GlobalKTable > instance. 
> It is important to emphasize that I am not connecting any state store > manually. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
OmniaGM commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541313630 ## clients/src/main/java/org/apache/kafka/common/utils/Utils.java: ## @@ -1502,13 +1502,23 @@ public static Map filterMap(final Map map, final Predicate propsToMap(Properties properties) { -Map map = new HashMap<>(properties.size()); -for (Map.Entry entry : properties.entrySet()) { +return castToStringObjectMap(properties); +} + +/** + * Cast a map with arbitrary type keys to be keyed on String. + * @param inputMap A map with unknown type keys + * @return A map with the same contents as the input map, but with String keys + * @throws ConfigException if any key is not a String + */ +public static Map castToStringObjectMap(Map inputMap) { Review Comment: There is a similar function like this one in `AbstractConfigTest.convertPropertiesToMap` maybe we can drop the one in `AbstractConfigTest` ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -925,10 +926,13 @@ static Map adminConfigs(String connName, // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped) // and those that begin with "producer." 
and "consumer.", since we know they aren't intended for // the admin client +// Also ignore the config.providers configurations because the worker-configured ConfigProviders should +// already have been evaluated via the trusted WorkerConfig constructor Map nonPrefixedWorkerConfigs = config.originals().entrySet().stream() .filter(e -> !e.getKey().startsWith("admin.") && !e.getKey().startsWith("producer.") -&& !e.getKey().startsWith("consumer.")) +&& !e.getKey().startsWith("consumer.") +&& !e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) Review Comment: Small suggestion, as the list of config isn't that huge then I would suggest refactoring this to something similar to the following ``` Stream prefixes = Stream.of("admin.", "producer.", "consumer.", AbstractConfig.CONFIG_PROVIDERS_CONFIG); Map nonPrefixedWorkerConfigs = config.originals().entrySet().stream() .filter(e -> prefixes.allMatch(s -> !e.getKey().startsWith(s))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); ``` ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -580,8 +601,15 @@ private Map instantiateConfigProviders(Map instantiateConfigProviders(Map providerClassName = Optional.ofNullable(indirectConfigs.get(providerClass)); Boolean isAllowed = providerClassName.map(name -> classNameFilter.test(name)).orElse(false); if (isAllowed) { providerMap.put(provider, providerClassName.get()); } else { throw new ConfigException(providerClassName + " is not allowed. Update System property '" + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
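The `castToStringObjectMap` helper discussed in this review could look roughly like the following. This is a minimal standalone sketch, not the code from the PR: the class name `StringKeyMaps` is hypothetical, and `IllegalArgumentException` stands in for Kafka's `ConfigException` so the example needs only the JDK.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical holder class; the PR places the method in Kafka's Utils class.
final class StringKeyMaps {
    private StringKeyMaps() { }

    // Copies inputMap into a Map<String, Object>, rejecting any non-String key.
    // IllegalArgumentException is an assumption standing in for ConfigException.
    static Map<String, Object> castToStringObjectMap(Map<?, ?> inputMap) {
        Map<String, Object> map = new HashMap<>(inputMap.size());
        for (Map.Entry<?, ?> entry : inputMap.entrySet()) {
            if (!(entry.getKey() instanceof String)) {
                throw new IllegalArgumentException(
                    "Key must be a string, but was: " + entry.getKey());
            }
            map.put((String) entry.getKey(), entry.getValue());
        }
        return map;
    }

    public static void main(String[] args) {
        // Properties is a Map<Object, Object>, so it can be passed directly.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        System.out.println(castToStringObjectMap(props));
    }
}
```

Because `Properties` is itself a `Map<Object, Object>`, a `propsToMap`-style method can delegate to this helper, which is what the diff above appears to do.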
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
OmniaGM commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541293009 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -580,8 +601,15 @@ private Map instantiateConfigProviders(Map providerClassName = Optional.ofNullable(indirectConfigs.get(providerClass)); Boolean isAllowed = providerClassName.map(name -> classNameFilter.test(name)).orElse(false); if (isAllowed) { providerMap.put(provider, providerClassName.get()); } else { throw new ConfigException(providerClassName + " is not allowed. Update System property '" + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + providerClassName); } ```
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
OmniaGM commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541328444 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -925,10 +926,13 @@ static Map adminConfigs(String connName, // Ignore configs that begin with "admin." since those will be added next (with the prefix stripped) // and those that begin with "producer." and "consumer.", since we know they aren't intended for // the admin client +// Also ignore the config.providers configurations because the worker-configured ConfigProviders should +// already have been evaluated via the trusted WorkerConfig constructor Map nonPrefixedWorkerConfigs = config.originals().entrySet().stream() .filter(e -> !e.getKey().startsWith("admin.") && !e.getKey().startsWith("producer.") -&& !e.getKey().startsWith("consumer.")) +&& !e.getKey().startsWith("consumer.") +&& !e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) Review Comment: Small suggestion, as the list of config isn't that huge then I would suggest refactoring this to something similar to the following ``` Stream prefixes = Stream.of("admin.", "producer.", "consumer.", AbstractConfig.CONFIG_PROVIDERS_CONFIG); Map nonPrefixedWorkerConfigs = config.originals().entrySet().stream() .filter(e -> prefixes.allMatch(s -> !e.getKey().startsWith(s))) .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
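One caveat on the suggested refactor: a `java.util.stream.Stream` can be consumed only once, so calling `prefixes.allMatch(...)` inside the filter lambda would throw `IllegalStateException` as soon as a second map entry is tested. A sketch of the same idea that keeps the prefixes in a `List` and opens a fresh stream per entry is shown below. The class and method names are hypothetical, and the literal `"config.providers"` is assumed to match `AbstractConfig.CONFIG_PROVIDERS_CONFIG`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper illustrating the prefix-based filter from the review.
final class PrefixFilter {
    // Assumption: "config.providers" mirrors AbstractConfig.CONFIG_PROVIDERS_CONFIG.
    private static final List<String> EXCLUDED_PREFIXES =
        Arrays.asList("admin.", "producer.", "consumer.", "config.providers");

    private PrefixFilter() { }

    // Keeps only the entries whose key starts with none of the excluded prefixes.
    // A new stream over the list is created for every entry, so nothing is reused.
    static Map<String, Object> withoutExcludedPrefixes(Map<String, Object> originals) {
        return originals.entrySet().stream()
            .filter(e -> EXCLUDED_PREFIXES.stream().noneMatch(p -> e.getKey().startsWith(p)))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
```

Note that `Collectors.toMap` rejects null values, which is also true of the snippet in the review comment; the sketch does not try to handle that case.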
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
akatona84 commented on code in PR #15605: URL: https://github.com/apache/kafka/pull/15605#discussion_r1541357369 ## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ## @@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when} import java.util.concurrent.{CountDownLatch, Executors, TimeUnit} import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} class ProducerIdManagerTest { var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager]) val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) + case class ErrorCount(error: Errors, var repeat: Int) + + object ErrorCount { +val INDEFINITE: Int = -1 + +def indefinitely(error: Errors): ErrorCount = { + ErrorCount(error, INDEFINITE) +} + } + + class ErrorQueue(initialErrorCounts: ErrorCount*) { +private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ initialErrorCounts + +def takeError(): Errors = queue.synchronized { + while (queue.head.repeat == 0) { +queue.dequeue() + } + if (queue.head.repeat > 0) { +queue.head.repeat -= 1 + } + queue.head.error +} + +def peekError(): Errors = queue.synchronized { + queue.head.error +} + +def clearProcessedError(): Unit = { + TestUtils.waitUntilTrue(() => +queue.synchronized { + queue.head.repeat == 0 +}, "error wasn't processed") + queue.synchronized { +queue.dequeue() Review Comment: No, but it doesn't matter any more. I was able to get rid of the `maybeRequestNextBlock` override completely, and the whole "mocking" became much easier.
Re: [PR] KAFKA-16406 [2] : Split consumer commit tests [kafka]
lianetm commented on PR #15612: URL: https://github.com/apache/kafka/pull/15612#issuecomment-2023088189 @lucasbru this is one last split that I think makes sense. With it, the tests left in `PlainTextConsumer` do not seem to belong to any sensible group other than the generic/misc one that PlainText itself represents. Could you take a look when you have a chance? Thanks!
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
soarez commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541375369

## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

 class ProducerIdManagerTest {

   var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])

+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+    def once(error: Errors): ErrorCount = {
+      ErrorCount(error, 1)
+    }
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+    private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ initialErrorCounts
+
+    def takeError(): Errors = queue.synchronized {
+      if (queue.isEmpty) {
+        return Errors.NONE
+      }
+      while (queue.nonEmpty && queue.head.repeat == 0) {
+        queue.dequeue()
+      }
+      if (queue.isEmpty) {
+        return Errors.NONE
+      }
+      if (queue.head.repeat > 0) {
+        queue.head.repeat -= 1
+      }
+      queue.head.error

Review Comment:
Should the error be immediately dequeued once it's taken?

## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

 class ProducerIdManagerTest {

   var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])

+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+    def once(error: Errors): ErrorCount = {
+      ErrorCount(error, 1)
+    }
+  }

Review Comment:
Do we need an error count / `repeat`? It seems the tests don't make use of it.
Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]
chia7712 commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2023109755

@Owen-CH-Leung `printUsageAndExit` calls `exit(1)`, so `KRAFT` and `CO_KRAFT` will stop the JVM. `ZK` can capture the exit code and throw an exception, so it does not terminate the JVM. Hence, a simple solution is to set a dummy exit procedure for `testPrintHelp`. For example:

```java
@ClusterTest
public void testPrintHelp() {
    // No-op exit procedure so that exit(1) does not stop the JVM.
    Exit.setExitProcedure((statusCode, message) -> { });
    try {
        String out = ToolsTestUtils.captureStandardErr(() -> GetOffsetShell.mainNoExit("--help"));
        assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT));
    } finally {
        Exit.resetExitProcedure();
    }
}
```

WDYT?
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541378249

## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

 class ProducerIdManagerTest {

   var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])

+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+    def once(error: Errors): ErrorCount = {
+      ErrorCount(error, 1)
+    }
+  }

Review Comment:
As I'm simplifying, the whole thing can be simplified as well :D thx
[PR] KAFKA-16103: commitSync should await pending async commits [kafka]
lucasbru opened a new pull request, #15613:
URL: https://github.com/apache/kafka/pull/15613

The javadoc for `KafkaConsumer.commitSync` says:

> Note that asynchronous offset commits sent previously with the {@link #commitAsync(OffsetCommitCallback)}
> (or similar) are guaranteed to have their callbacks invoked prior to completion of this method.

This is not always true, neither for the legacy consumer nor for the async consumer. This change proposes a number of fixes related to this guarantee:

- In the legacy consumer, we're also awaiting async commits that are "pending" instead of "in-flight", because we do not know the coordinator yet.
- In the new consumer, we keep track of the incomplete async commit futures and wait for them to complete before returning from `commitSync`.
- Since we need to block to make sure that our previous commits are completed, we allow the consumer to wake up. This is implemented but we are leaving it unresolved in the legacy consumer.

## Testing

A new integration test

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
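The second bullet of the PR description (tracking incomplete async commit futures and waiting on them before a sync commit returns) can be illustrated with a small standalone sketch. This is not the code from the PR: `PendingCommitTracker`, `registerAsyncCommit`, and `awaitPendingAsyncCommits` are hypothetical names, and plain `CompletableFuture`s stand in for the consumer's internal commit requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka client API.
final class PendingCommitTracker {
    private final List<CompletableFuture<Void>> incompleteAsyncCommits = new ArrayList<>();

    // Called when an async commit is sent; the future is completed when its callback has run.
    synchronized CompletableFuture<Void> registerAsyncCommit() {
        CompletableFuture<Void> commit = new CompletableFuture<>();
        incompleteAsyncCommits.add(commit);
        // Forget the commit as soon as it finishes, whether it succeeded or failed.
        commit.whenComplete((result, error) -> forget(commit));
        return commit;
    }

    private synchronized void forget(CompletableFuture<Void> commit) {
        incompleteAsyncCommits.remove(commit);
    }

    synchronized int pendingCount() {
        return incompleteAsyncCommits.size();
    }

    // Would be called at the start of a sync commit. Returns true if every earlier
    // async commit finished within the timeout, false on timeout or interruption.
    boolean awaitPendingAsyncCommits(long timeoutMs) {
        CompletableFuture<?>[] snapshot;
        synchronized (this) {
            snapshot = incompleteAsyncCommits.toArray(new CompletableFuture<?>[0]);
        }
        try {
            // handle() swallows individual commit failures: we only wait for completion here.
            CompletableFuture.allOf(snapshot)
                .handle((result, error) -> null)
                .get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            // Not expected, because handle() maps failures to a normal result.
            return false;
        }
    }
}
```

In this sketch a failed async commit still counts as completed, since the guarantee under discussion concerns callbacks having been invoked, not commits having succeeded.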
Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]
akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541384909

## core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala: ##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}

 class ProducerIdManagerTest {

   var brokerToController: NodeToControllerChannelManager = mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])

+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+    def once(error: Errors): ErrorCount = {
+      ErrorCount(error, 1)
+    }
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+    private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ initialErrorCounts
+
+    def takeError(): Errors = queue.synchronized {
+      if (queue.isEmpty) {
+        return Errors.NONE
+      }
+      while (queue.nonEmpty && queue.head.repeat == 0) {
+        queue.dequeue()
+      }
+      if (queue.isEmpty) {
+        return Errors.NONE
+      }
+      if (queue.head.repeat > 0) {
+        queue.head.repeat -= 1
+      }
+      queue.head.error

Review Comment:
i just removed the maybe request next block and it required peeking into it, but since it's gone, no need to have the counters either.
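The outcome of this review thread (no repeat counters, each injected error dequeued as soon as it is taken) might look roughly like the following. It is a Java sketch for illustration only: the actual test is written in Scala, `ErrorQueueSketch` is a hypothetical name, and the `InjectedError` enum is a stand-in for `org.apache.kafka.common.protocol.Errors`.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

// Hypothetical stand-in for org.apache.kafka.common.protocol.Errors.
enum InjectedError { NONE, UNKNOWN_SERVER_ERROR, INVALID_REQUEST }

// Hypothetical simplified error queue: one entry per injected error, no repeat counter.
final class ErrorQueueSketch {
    private final Queue<InjectedError> queue = new ArrayDeque<>();

    ErrorQueueSketch(InjectedError... initialErrors) {
        queue.addAll(Arrays.asList(initialErrors));
    }

    // Removes and returns the next injected error, or NONE once the queue is drained.
    synchronized InjectedError takeError() {
        InjectedError next = queue.poll();
        return next == null ? InjectedError.NONE : next;
    }
}
```

Because every call consumes exactly one entry, an error that should fire several times is simply listed several times in the constructor arguments.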
Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]
OmniaGM commented on code in PR #15572: URL: https://github.com/apache/kafka/pull/15572#discussion_r1541382798 ## server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.security.authorizer; + +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.acl.AclOperation.ALTER; +import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION; +import static org.apache.kafka.common.acl.AclOperation.CREATE; +import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.DELETE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE; +import static org.apache.kafka.common.acl.AclOperation.READ; +import static org.apache.kafka.common.acl.AclOperation.WRITE; + +public class AclEntry extends AccessControlEntry { +private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); +private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString(); + +public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"); +public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString(); +public static final String WILDCARD_HOST = "*"; +public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE; +public static final String RESOURCE_SEPARATOR = ":"; +public static final Set RESOURCE_TYPES = Arrays.stream(ResourceType.values()) +.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY)) +.collect(Collectors.toSet()); +public static final Set ACL_OPERATIONS = Arrays.stream(AclOperation.values()) +.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY)) +.collect(Collectors.toSet()); + +private static final String PRINCIPAL_KEY = "principal"; +private static final String PERMISSION_TYPE_KEY = "permissionType"; +private static final String OPERATION_KEY = "operation"; +private static final String HOSTS_KEY = "host"; +public static final String VERSION_KEY = "version"; +public static final int CURRENT_VERSION = 1; +private static final String ACLS_KEY = "acls"; + +public final AccessControlEntry ace; +public final KafkaPrincipal kafkaPrincipal; + +public AclEntry(AccessControlEntry ace) { +super(ace.principal(), ace.host(), ace.operation(), ace.permissionType()); +this.ace = ace; + +kafkaPrincipal = ace.principal() == null +? null +: SecurityUtils.parseKafkaPrincipal(ace.principal()); +} + +public static AclEntry apply(KafkaPrincipal principal, + AclPermissionType permissionType, + String host, + AclOperation operation) { +return new AclEntry(new AccessControlEntry(principal == null ? null :
Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]
johnnychhsu commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1541419703

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala: ##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
         mock(classOf[FetchDataInfo])
       }).when(spyRLM).read(any())

-      // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec metric value before fetching
-      val curExpiresPerSec = safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+      val curExpiresPerSec = DelayedRemoteFetchMetrics.expiredRequestMeter.count()
       replicaManager.fetchMessages(params, Seq(tidp0 -> new PartitionData(topicId, fetchOffset, 0, 10, Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), UnboundedQuota, fetchCallback)
       // advancing the clock to expire the delayed remote fetch
       timer.advanceClock(2000L)

-      // verify the metric value is incremented since the delayed remote fetch is expired
-      TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-        "The ExpiresPerSec value is not incremented. Current value is: " +
-          safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+      // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is called since the delayed remote fetch is expired
+      assertEquals(curExpiresPerSec + 1, DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
ah i see. thanks for the comment @showuon ! let me address that
Re: [PR] MINOR: AbstractConfig cleanup [kafka]
gharris1727 commented on code in PR #15597: URL: https://github.com/apache/kafka/pull/15597#discussion_r1541420383 ## clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java: ## @@ -580,8 +601,15 @@ private Map instantiateConfigProviders(Map
[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.
[ https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831438#comment-17831438 ]

Henry Cai commented on KAFKA-15265:
---
Thanks. [~abhijeetkumar] Do you have a rough timeline when the PR will be ready? Do you need other people to contribute to the work?

> Remote copy/fetch quotas for tiered storage.
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Reporter: Satish Duggana
> Assignee: Abhijeet Kumar
> Priority: Major
>
> Related KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas
Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]
CalvinConfluent commented on PR #15541: URL: https://github.com/apache/kafka/pull/15541#issuecomment-2023173614 @kirktrue @jolshan Anything else we need to address for this ticket?
Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]
clolov commented on PR #14716: URL: https://github.com/apache/kafka/pull/14716#issuecomment-2023192543 Hello @cadonna! This ought to be one of the last PRs for the migration. I will circle back tomorrow morning to rebase it since it has been out for a long time
Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]
mimaison commented on PR #15569: URL: https://github.com/apache/kafka/pull/15569#issuecomment-2023212508 Now that https://github.com/apache/kafka/pull/15075 got merged, this needs rebasing.
Re: [PR] KAFKA-16406 [2] : Split consumer commit tests [kafka]
lianetm commented on code in PR #15612: URL: https://github.com/apache/kafka/pull/15612#discussion_r1541452355 ## core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala: ## @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package kafka.api + +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.consumer._ +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.test.MockConsumerInterceptor +import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Timeout +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.{Arguments, MethodSource} + +import java.time.Duration +import java.util +import java.util.Optional +import java.util.stream.Stream +import scala.jdk.CollectionConverters._ + +/** + * Integration tests for the consumer that covers the logic related to committing offsets. 
+ */ +@Timeout(600) +class PlaintextConsumerCommitTest extends AbstractConsumerTest { + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") +val consumer = createConsumer() + +val numRecords = 1 +val producer = createProducer() +sendRecords(producer, numRecords, tp) + +consumer.subscribe(List(topic).asJava) +awaitAssignment(consumer, Set(tp, tp2)) + +// should auto-commit sought positions before closing +consumer.seek(tp, 300) +consumer.seek(tp2, 500) +consumer.close() + +// now we should see the committed positions from another consumer +val anotherConsumer = createConsumer() +assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) +assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testAutoCommitOnCloseAfterWakeup(quorum: String, groupProtocol: String): Unit = { +this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") +val consumer = createConsumer() + +val numRecords = 1 +val producer = createProducer() +sendRecords(producer, numRecords, tp) + +consumer.subscribe(List(topic).asJava) +awaitAssignment(consumer, Set(tp, tp2)) + +// should auto-commit sought positions before closing +consumer.seek(tp, 300) +consumer.seek(tp2, 500) + +// wakeup the consumer before closing to simulate trying to break a poll +// loop from another thread +consumer.wakeup() +consumer.close() + +// now we should see the committed positions from another consumer +val anotherConsumer = createConsumer() +assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset) 
+assertEquals(500, anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset) + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testCommitMetadata(quorum: String, groupProtocol: String): Unit = { +val consumer = createConsumer() +consumer.assign(List(tp).asJava) + +// sync commit +val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo") +consumer.commitSync(Map((tp, syncMetadata)).asJava) +assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp)) + +// async commit +val asyncMetadata = new OffsetAndMetadata(10, "bar") +sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata))) +assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp)) + +// handle null metadata +val nullMetadata = new OffsetAndMetadata(5, null) +consumer.commitSync(Map(tp -> nullMetadata).asJava) +
Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]
nizhikov commented on code in PR #15572: URL: https://github.com/apache/kafka/pull/15572#discussion_r1541471761 ## server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java: ## @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.security.authorizer; + +import org.apache.kafka.common.acl.AccessControlEntry; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.acl.AclPermissionType; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.resource.ResourcePattern; +import org.apache.kafka.common.resource.ResourceType; +import org.apache.kafka.common.security.auth.KafkaPrincipal; +import org.apache.kafka.common.utils.SecurityUtils; +import org.apache.kafka.server.util.Json; +import org.apache.kafka.server.util.json.DecodeJson; +import org.apache.kafka.server.util.json.JsonObject; +import org.apache.kafka.server.util.json.JsonValue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.kafka.common.acl.AclOperation.ALTER; +import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION; +import static org.apache.kafka.common.acl.AclOperation.CREATE; +import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.DELETE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS; +import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS; +import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE; +import static org.apache.kafka.common.acl.AclOperation.READ; +import static org.apache.kafka.common.acl.AclOperation.WRITE; + +public class AclEntry extends AccessControlEntry { +private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger(); +private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString(); + +public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*"); +public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString(); +public static final String WILDCARD_HOST = "*"; +public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE; +public static final String RESOURCE_SEPARATOR = ":"; +public static final Set RESOURCE_TYPES = Arrays.stream(ResourceType.values()) +.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY)) +.collect(Collectors.toSet()); +public static final Set ACL_OPERATIONS = Arrays.stream(AclOperation.values()) +.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY)) +.collect(Collectors.toSet()); + +private static final String PRINCIPAL_KEY = "principal"; +private static final String PERMISSION_TYPE_KEY = "permissionType"; +private static final String OPERATION_KEY = "operation"; +private static final String HOSTS_KEY = "host"; +public static final String VERSION_KEY = "version"; +public static final int CURRENT_VERSION = 1; +private static final String ACLS_KEY = "acls"; + +public final AccessControlEntry ace; +public final KafkaPrincipal kafkaPrincipal; + +public AclEntry(AccessControlEntry ace) { +super(ace.principal(), ace.host(), ace.operation(), ace.permissionType()); +this.ace = ace; + +kafkaPrincipal = ace.principal() == null +? null +: SecurityUtils.parseKafkaPrincipal(ace.principal()); +} + +public static AclEntry apply(KafkaPrincipal principal, + AclPermissionType permissionType, + String host, + AclOperation operation) { +return new AclEntry(new AccessControlEntry(principal == null ? null
Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]
nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541472627

## server/src/main/java/org/apache/kafka/security/CredentialProvider.java: ##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+
+import java.util.Collection;
+import java.util.Properties;
+
+public class CredentialProvider {
+    private final Collection scramMechanisms;

Review Comment:
Nice catch. Thanks. Field removed.
Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]
nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541475816

## core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala: ##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
     //with includeAuthorizedOperations flag
     topicResult = getTopicMetadata(client, topic, new DescribeTopicsOptions().includeAuthorizedOperations(true))
-    expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+    expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
     assertEquals(expectedOperations, topicResult.authorizedOperations)
   }

+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-    AclEntry.supportedOperations(ResourceType.CLUSTER)
+    JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
AFAIK `asScala` does not exist in Scala 2.12, which we use to build Kafka.
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831460#comment-17831460 ]

Johnny Hsu commented on KAFKA-16310:

The update is in the section below:

## When the TimestampType is LOG_APPEND_TIME

When the TimestampType is LOG_APPEND_TIME, the timestamps of the records are the same. In this case, we should choose the offset of the first record. [This path|https://github.com/apache/kafka/blob/6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L294] in LogValidator was added to handle this case for the non-compressed type, while [this path|https://github.com/apache/kafka/blob/6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L421] in LogValidator was added to handle this case for the compressed type.

I don't have a Confluence account yet. [~chia7712], would you please help update the KIP in the wiki? I will send this update to the dev thread for visibility. Thanks!

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 3.7.0
> Reporter: Emanuele Sabellico
> Assignee: Chia-Ping Tsai
> Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
> Updated: This is confirmed to be a regression in v3.7.0.
> The impact of this issue is that when a batch contains records whose
> timestamps are not in order, the offset reported for a timestamp will be wrong
> (e.g. the timestamp for t0 should map to offset 10, but offset 12 is returned).
> It also causes the time index to store the wrong offset, so the result
> will be unexpected.
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing. It checks
> that the offset with the max timestamp is the middle one and not the last
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>
> There are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0, 1, 2.
> Then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> It should return offset 1, but in 3.7.0 and trunk it returns offset 2.
> Even 5 seconds after producing, it still returns 2 as the offset with the
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2); the leader
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 206 bytes @ 0, CorrId 2)
> %7|1709134230.020|RECV|0081_admin#producer-3| [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse (v7, 95 bytes, CorrId 2, rtt 1.18ms)
> %7|1709134230.020|MSGSET|0081_admin#producer-3| [thrd:localhost:39951/bootstrap]: localhost:39951/2: rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest (v7, 103 bytes @ 0, CorrId 7)
> %7|1709134235.022|RECV|0081_admin#producer-2| [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}
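To make the expected behavior in the report concrete, here is a minimal Java sketch of the selection rule being discussed: pick the offset of the record with the highest timestamp, and keep the earliest offset when timestamps tie. This is not the LogValidator code. The class `MaxTimestampOffset`, the method `offsetOfMaxTimestamp`, and the representation of a batch as a base offset plus an array of timestamps are all assumptions made for illustration.

```java
public class MaxTimestampOffset {

    /**
     * Hypothetical helper: returns the offset of the record with the highest
     * timestamp in a batch whose first record sits at baseOffset.
     * Returns -1 for an empty batch.
     */
    public static long offsetOfMaxTimestamp(long baseOffset, long[] timestamps) {
        if (timestamps.length == 0) {
            return -1L;
        }
        int indexOfMax = 0;
        for (int i = 1; i < timestamps.length; i++) {
            // Strict '>' means a later record only wins with a larger timestamp,
            // so the earliest record is kept when timestamps are equal.
            if (timestamps[i] > timestamps[indexOfMax]) {
                indexOfMax = i;
            }
        }
        return baseOffset + indexOfMax;
    }

    public static void main(String[] args) {
        long t0 = 1_000L;

        // The three messages from the librdkafka test: the middle one has the max timestamp.
        long[] outOfOrder = {t0 + 100, t0 + 400, t0 + 250};
        System.out.println(offsetOfMaxTimestamp(0L, outOfOrder)); // prints 1

        // All timestamps equal, as with LOG_APPEND_TIME: the first record is chosen.
        long[] allEqual = {t0, t0, t0};
        System.out.println(offsetOfMaxTimestamp(0L, allEqual)); // prints 0
    }
}
```

Under these assumptions, returning the last offset (2) for the first input is the regression described in the issue, and the strict comparison is what yields the "first record" choice for equal timestamps.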
Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]
ijuma commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541483681

## core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala: ##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
     //with includeAuthorizedOperations flag
     topicResult = getTopicMetadata(client, topic, new DescribeTopicsOptions().includeAuthorizedOperations(true))
-    expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+    expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
     assertEquals(expectedOperations, topicResult.authorizedOperations)
   }

+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-    AclEntry.supportedOperations(ResourceType.CLUSTER)
+    JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
You can use CollectionConverters.
Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]
philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2023293507

@lucasbru - If I'm not mistaken, the current implementations of both beginningOrEndOffsets and OffsetsForTimes need to send out a request when given a ZERO duration. It seems like both code paths invoke this logic:
```
// if timeout is set to zero, do not try to poll the network client at all
// and return empty immediately; otherwise try to get the results synchronously
// and throw timeout exception if it cannot complete in time
if (timer.timeoutMs() == 0L)
    return result;
```
But the offsets-for-times path seems to short-circuit it here:
```
// If timeout is set to zero return empty immediately; otherwise try to get the results
// and throw timeout exception if it cannot complete in time.
if (timeout.toMillis() == 0L)
    return listOffsetsEvent.emptyResult();

return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
```
I'll create a ticket to align the behavior of these two APIs in the new consumers.
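The difference described in this comment can be sketched in plain Java, independent of the Kafka consumer internals. The sketch below is only an illustration under stated assumptions: `ZeroTimeoutSketch`, `sendThenMaybeWait`, `shortCircuit`, and the `Supplier` that stands in for "enqueue the request" are hypothetical names, and `IllegalStateException` is a placeholder for whatever timeout exception the real client throws.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ZeroTimeoutSketch {

    /** Variant A: always send the request, then return empty at once on a zero timeout. */
    public static <T> Map<String, T> sendThenMaybeWait(
            Supplier<CompletableFuture<Map<String, T>>> send, Duration timeout) {
        CompletableFuture<Map<String, T>> inFlight = send.get(); // request goes out first
        if (timeout.toMillis() == 0L) {
            return Collections.emptyMap();
        }
        return waitFor(inFlight, timeout);
    }

    /** Variant B: a zero timeout returns empty before any request is sent. */
    public static <T> Map<String, T> shortCircuit(
            Supplier<CompletableFuture<Map<String, T>>> send, Duration timeout) {
        if (timeout.toMillis() == 0L) {
            return Collections.emptyMap();
        }
        return waitFor(send.get(), timeout);
    }

    // Blocks up to the timeout and converts checked failures to an unchecked placeholder.
    private static <T> Map<String, T> waitFor(
            CompletableFuture<Map<String, T>> future, Duration timeout) {
        try {
            return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            throw new IllegalStateException("timed out waiting for the result", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("request failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        AtomicInteger sent = new AtomicInteger();
        // Stand-in for sending a request: counts calls and never completes.
        Supplier<CompletableFuture<Map<String, Long>>> send = () -> {
            sent.incrementAndGet();
            return new CompletableFuture<>();
        };

        sendThenMaybeWait(send, Duration.ZERO);
        System.out.println("requests sent by variant A: " + sent.get());

        sent.set(0);
        shortCircuit(send, Duration.ZERO);
        System.out.println("requests sent by variant B: " + sent.get());
    }
}
```

Both variants return an empty result immediately for a zero timeout. The only observable difference is whether a request was issued, which is the inconsistency the comment proposes to track in a follow-up ticket.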
Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]
nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541492785

## server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java: ##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+    private static final DecodeJson.DecodeInteger INT = new DecodeJson.DecodeInteger();
+    private static final DecodeJson.DecodeString STRING = new DecodeJson.DecodeString();
+
+    public static final KafkaPrincipal WILDCARD_PRINCIPAL = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+    public static final String WILDCARD_PRINCIPAL_STRING = WILDCARD_PRINCIPAL.toString();
+    public static final String WILDCARD_HOST = "*";
+    public static final String WILDCARD_RESOURCE = ResourcePattern.WILDCARD_RESOURCE;
+    public static final String RESOURCE_SEPARATOR = ":";
+    public static final Set RESOURCE_TYPES = Arrays.stream(ResourceType.values())
+        .filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+        .collect(Collectors.toSet());
+    public static final Set ACL_OPERATIONS = Arrays.stream(AclOperation.values())
+        .filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+        .collect(Collectors.toSet());
+
+    private static final String PRINCIPAL_KEY = "principal";
+    private static final String PERMISSION_TYPE_KEY = "permissionType";
+    private static final String OPERATION_KEY = "operation";
+    private static final String HOSTS_KEY = "host";
+    public static final String VERSION_KEY = "version";
+    public static final int CURRENT_VERSION = 1;
+    private static final String ACLS_KEY = "acls";
+
+    public final AccessControlEntry ace;
+    public final KafkaPrincipal kafkaPrincipal;
+
+    public AclEntry(AccessControlEntry ace) {
+        super(ace.principal(), ace.host(), ace.operation(), ace.permissionType());
+        this.ace = ace;
+
+        kafkaPrincipal = ace.principal() == null
+            ? null
+            : SecurityUtils.parseKafkaPrincipal(ace.principal());
+    }
+
+    public static AclEntry apply(KafkaPrincipal principal,
+                                 AclPermissionType permissionType,
+                                 String host,
+                                 AclOperation operation) {
+        return new AclEntry(new AccessControlEntry(principal == null ? null
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831463#comment-17831463 ]

Chia-Ping Tsai commented on KAFKA-16310:

@johnnyhsu thanks for offering the description. I added the following statement to KIP-734 according to your comments.

{quote}
This returns the offset and timestamp corresponding to the record with the highest timestamp on the partition. Noted that we should choose the offset of the earliest record if the timestamp of the records are the same.
{quote}

WDYT?
[jira] [Created] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout
Philip Nee created KAFKA-16433:
--

Summary: beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout
Key: KAFKA-16433
URL: https://issues.apache.org/jira/browse/KAFKA-16433
Project: Kafka
Issue Type: Task
Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee

As documented here: [https://github.com/apache/kafka/pull/15525]

Both APIs should at least send out a request when a zero timeout is provided.

This is corrected in the PR above. However, we still need to fix the implementation of the offsetsForTimes API.
[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout
[ https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Philip Nee updated KAFKA-16433:
---
Labels: consumer-threading-refactor (was: )

> beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout
>
> Key: KAFKA-16433
> URL: https://issues.apache.org/jira/browse/KAFKA-16433
> Project: Kafka
> Issue Type: Task
> Components: consumer
> Reporter: Philip Nee
> Assignee: Philip Nee
> Priority: Blocker
> Labels: consumer-threading-refactor
>
> As documented here: [https://github.com/apache/kafka/pull/15525]
>
> Both APIs should at least send out a request when a zero timeout is provided.
>
> This is corrected in the PR above. However, we still need to fix the
> implementation of the offsetsForTimes API.
[jira] [Comment Edited] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831464#comment-17831464 ]

Johnny Hsu edited comment on KAFKA-16310 at 3/27/24 5:01 PM:
-

{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the highest timestamp on the partition. Noted that we should choose the offset of the earliest record if the timestamp of the records are the same.

This sounds good to me, thanks! {quote}

was (Author: JIRAUSER304478):
{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the highest timestamp on the partition. Noted that we should choose the offset of the earliest record if the timestamp of the records are the same.

This sounds good to me, thanks! {quote}
[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore
[ https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831464#comment-17831464 ]

Johnny Hsu commented on KAFKA-16310:

{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the highest timestamp on the partition. Noted that we should choose the offset of the earliest record if the timestamp of the records are the same.

This sounds good to me, thanks! {quote}
Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]
nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541508048

## core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala: ##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends IntegrationTestHarness with Logg
     //with includeAuthorizedOperations flag
     topicResult = getTopicMetadata(client, topic, new DescribeTopicsOptions().includeAuthorizedOperations(true))
-    expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+    expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
     assertEquals(expectedOperations, topicResult.authorizedOperations)
   }

+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-    AclEntry.supportedOperations(ResourceType.CLUSTER)
+    JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
Hello @ijuma

Sorry, I couldn't find a way to use `CollectionConverters` to convert a Java set to an immutable Scala set. Can you give me an example?

`AclEntry.supportedOperations(ResourceType.CLUSTER).asScala`, which uses `CollectionConverters`, returns a `scala.collection.mutable.Set`, while here we use `scala.collection.immutable.Set`.