[jira] [Created] (KAFKA-16430) The group-metadata-manager thread is always in a loading state and occupies one CPU, unable to end.

2024-03-27 Thread Gao Fei (Jira)
Gao Fei created KAFKA-16430:
---

 Summary: The group-metadata-manager thread is always in a loading 
state and occupies one CPU, unable to end.
 Key: KAFKA-16430
 URL: https://issues.apache.org/jira/browse/KAFKA-16430
 Project: Kafka
  Issue Type: Bug
  Components: group-coordinator
Affects Versions: 2.4.0
Reporter: Gao Fei


I deployed three broker instances and suddenly found that the client was unable 
to consume data from certain topic partitions. I first tried to log in to the 
broker corresponding to the group and used the following command to view the 
consumer group:
{code:java}
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9093 --describe 
--group mygroup{code}
and found the following error:
{code:java}
Error: Executing consumer group command failed due to 
org.apache.kafka.common.errors.CoordinatorLoadInProgressException: The 
coordinator is loading and hence can't process requests.{code}

I then discovered that the broker appeared to be stuck in a loop, constantly 
in a loading state. At the same time, the top command showed that the 
"group-metadata-manager-0" thread was consuming 100% of one CPU. The loop never 
ended, so topic partition data on that node could not be consumed. At this 
point I suspected that the issue was related to the __consumer_offsets 
partition data files loaded by this thread.
Finally, after restarting the broker instance, everything was back to normal. 
This is very strange: if there really were an issue with the __consumer_offsets 
partition data files, the broker should have failed in the same way after the 
restart. Why was it able to recover automatically after a restart? And why did 
this continuous loop loading the __consumer_offsets partition data occur in the 
first place?

We encountered this issue in our production environment using Kafka versions 
2.2.1 and 2.4.0, and I believe it may also affect other versions.
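
For completeness, the same check can be done programmatically. The sketch below 
is not part of the original report (class and variable names are made up; the 
group name and bootstrap address are simply the values used above): it 
describes the group with the AdminClient and reports when the coordinator is 
still loading:
{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;

public class DescribeGroupCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
        try (AdminClient admin = AdminClient.create(props)) {
            try {
                ConsumerGroupDescription group = admin
                        .describeConsumerGroups(Collections.singletonList("mygroup"))
                        .describedGroups()
                        .get("mygroup")
                        .get();
                System.out.println("Group state: " + group.state());
            } catch (ExecutionException e) {
                if (e.getCause() instanceof CoordinatorLoadInProgressException) {
                    // Same condition the CLI reported above: the group coordinator is
                    // still loading the __consumer_offsets partition for this group.
                    System.out.println("Coordinator load still in progress");
                } else {
                    throw e;
                }
            }
        }
    }
}
{code}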



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15715) KRaft support in UpdateFeaturesTest

2024-03-27 Thread johndoe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15715?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

johndoe reassigned KAFKA-15715:
---

Assignee: (was: johndoe)

> KRaft support in UpdateFeaturesTest
> ---
>
> Key: KAFKA-15715
> URL: https://issues.apache.org/jira/browse/KAFKA-15715
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Reporter: Sameer Tejani
>Priority: Minor
>  Labels: kraft, kraft-test, newbie
>
> The following tests in UpdateFeaturesTest in 
> core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala need to be 
> updated to support KRaft
> 176 : def testShouldFailRequestIfNotController(): Unit = {
> 210 : def testShouldFailRequestWhenDowngradeFlagIsNotSetDuringDowngrade(): 
> Unit = {
> 223 : def 
> testShouldFailRequestWhenDowngradeToHigherVersionLevelIsAttempted(): Unit = {
> 236 : def 
> testShouldFailRequestInServerWhenDowngradeFlagIsNotSetDuringDeletion(): Unit 
> = {
> 280 : def testShouldFailRequestDuringDeletionOfNonExistingFeature(): Unit = {
> 292 : def testShouldFailRequestWhenUpgradingToSameVersionLevel(): Unit = {
> 354 : def 
> testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityForExistingFinalizedFeature():
>  Unit = {
> 368 : def 
> testShouldFailRequestDuringBrokerMaxVersionLevelIncompatibilityWithNoExistingFinalizedFeature():
>  Unit = {
> 381 : def testSuccessfulFeatureUpgradeAndWithNoExistingFinalizedFeatures(): 
> Unit = {
> 417 : def testSuccessfulFeatureUpgradeAndDowngrade(): Unit = {
> 459 : def testPartialSuccessDuringValidFeatureUpgradeAndInvalidDowngrade(): 
> Unit = {
> 509 : def testPartialSuccessDuringInvalidFeatureUpgradeAndValidDowngrade(): 
> Unit = {
> Scanned 567 lines. Found 0 KRaft tests out of 12 tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Improve logging in AssignmentsManager [kafka]

2024-03-27 Thread via GitHub


showuon commented on PR #15522:
URL: https://github.com/apache/kafka/pull/15522#issuecomment-2022115723

   Will check it this week or next. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16365: AssignmentsManager callback handling issues [kafka]

2024-03-27 Thread via GitHub


showuon commented on PR #15521:
URL: https://github.com/apache/kafka/pull/15521#issuecomment-2022115867

   Will try to review it this week or next week.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15558) Determine if Timer should be used elsewhere in PrototypeAsyncConsumer.updateFetchPositions()

2024-03-27 Thread Andrew Schofield (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831197#comment-17831197
 ] 

Andrew Schofield commented on KAFKA-15558:
--

I was referring to the inconsistent time-out handling for some of the events 
that are sent from the application thread to the background thread in the new 
consumer. I know this is being fixed now, so I think there's nothing more to do 
on this ticket.
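
For illustration only (this is not the consumer code; the helper and names 
below are made up), the pattern being discussed is to thread a Timer through 
the retry loop so that waiting for valid fetch positions is bounded by the 
caller's timeout instead of busy-looping:
{code:java}
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;

public class TimerBoundedRetryExample {

    interface PositionCheck {
        boolean allPositionsValid();
    }

    // Returns true if positions became valid before the timer expired.
    static boolean awaitValidPositions(PositionCheck check, Timer timer) throws InterruptedException {
        while (!check.allPositionsValid()) {
            if (timer.isExpired()) {
                return false;   // caller's timeout reached: give up instead of spinning
            }
            Thread.sleep(Math.min(100L, timer.remainingMs()));
            timer.update();     // refresh the timer's view of the current time
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        Timer timer = Time.SYSTEM.timer(1_000L);   // hypothetical 1 second budget
        boolean ok = awaitValidPositions(() -> false, timer);
        System.out.println("positions valid before timeout: " + ok);
    }
}
{code}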

> Determine if Timer should be used elsewhere in 
> PrototypeAsyncConsumer.updateFetchPositions()
> 
>
> Key: KAFKA-15558
> URL: https://issues.apache.org/jira/browse/KAFKA-15558
> Project: Kafka
>  Issue Type: Task
>  Components: clients, consumer
>Reporter: Kirk True
>Assignee: Phuc Hong Tran
>Priority: Major
>  Labels: consumer-threading-refactor, fetcher, timeout
> Fix For: 3.8.0
>
>
> This is a followup ticket based on a question from [~junrao] when reviewing 
> the [fetch request manager pull 
> request|https://github.com/apache/kafka/pull/14406]:
> {quote}It still seems weird that we only use the timer for 
> {{{}refreshCommittedOffsetsIfNeeded{}}}, but not for other cases where we 
> don't have valid fetch positions. For example, if all partitions are in 
> {{AWAIT_VALIDATION}} state, it seems that {{PrototypeAsyncConsumer.poll()}} 
> will just go in a busy loop, which is not efficient.
> {quote}
> The goal here is to determine if we should also be propagating the Timer to 
> the validate positions and reset positions operations.
> Note: we should also investigate if the existing {{KafkaConsumer}} 
> implementation should be fixed, too.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]

2024-03-27 Thread via GitHub


sjhajharia opened a new pull request, #15609:
URL: https://github.com/apache/kafka/pull/15609

   This is a follow-up to [this](https://github.com/apache/kafka/pull/15486) PR, 
which introduced the new `ABORTABLE_TRANSACTION` error as part of the KIP-890 
effort. On further discussion, the consensus was that the error should instead 
be named `TRANSACTION_ABORTABLE`.
   This PR addresses that rename; there are no other code changes.
   
   ### References
   JIRA: https://issues.apache.org/jira/browse/KAFKA-16314
   Original PR: https://github.com/apache/kafka/pull/15486
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831201#comment-17831201
 ] 

Matthias J. Sax commented on KAFKA-16403:
-

Sounds like an env issue rather than a bug to me? I would tend to close this as 
"not a bug"?

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
> -
>
> Key: KAFKA-16403
> URL: https://issues.apache.org/jira/browse/KAFKA-16403
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > 
> WordCountDemoTest > testCountListOfWords() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276)
>         at 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: Corruption: IO error: No such file or 
> directory: While open a file for random read: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb:
>  No such file or directory in file 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05
>             at org.rocksdb.RocksDB.open(Native Method)
>             at org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16404) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831202#comment-17831202
 ] 

Matthias J. Sax commented on KAFKA-16404:
-

Sounds like an env issue to me. I would propose to close this as "not a bug".
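
Agreed that this looks environmental: the stack trace shows the test writing 
its RocksDB state under the shared default state directory (/tmp/kafka-streams), 
so two concurrent test executors, or leftover state from an earlier run, can 
collide on the store's LOCK file. As a minimal sketch of how a test can isolate 
itself (made-up names; assuming the usual TopologyTestDriver and StreamsConfig 
APIs), a unique per-test state directory avoids the collision:
{code:java}
import java.nio.file.Files;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;

public class IsolatedStateDirExample {

    // Builds a TopologyTestDriver whose RocksDB state lives in a unique temp directory,
    // so concurrent executors cannot race on the shared /tmp/kafka-streams location.
    static TopologyTestDriver createDriver(Topology topology) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-test");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The important part: a per-test state directory instead of the shared default.
        props.put(StreamsConfig.STATE_DIR_CONFIG,
                Files.createTempDirectory("streams-test-state").toString());
        return new TopologyTestDriver(topology, props);
    }
}
{code}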

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
> -
>
> Key: KAFKA-16404
> URL: https://issues.apache.org/jira/browse/KAFKA-16404
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
>  
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903@2/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 87 > 
> WordCountDemoTest > testGetStreamsConfig() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> app//org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> app//org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> app//org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> app//org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> app//org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276)
>         at 
> app//org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: While lock file: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/LOCK:
>  Resource temporarily unavailable
>             at app//org.rocksdb.RocksDB.open(Native Method)
>             at app//org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]

2024-03-27 Thread via GitHub


sjhajharia commented on PR #15609:
URL: https://github.com/apache/kafka/pull/15609#issuecomment-2022175562

   Requesting review from @jolshan (reviewer of the parent PR)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on PR #15075:
URL: https://github.com/apache/kafka/pull/15075#issuecomment-2022177651

   @chia7712 Conflicts resolved.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831208#comment-17831208
 ] 

Matthias J. Sax commented on KAFKA-16382:
-

An application reset does not touch the output topic, so it's expected that 
output from the first run of the app is still in the output topic after the 
reset.
{quote}The issue is NOT reproduced if internal cache is disabled.
{quote}
That also sounds like expected behavior. Caching implies that the output might 
not contain all intermediate results.

I think we can close this ticket as "not a bug"?
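
For reference, the caching behaviour quoted above is controlled by the state 
store cache size. A minimal sketch of disabling it (config keys as in recent 
Streams releases; application id and bootstrap server taken from the 
reproduction steps), which is the workaround the reporter already confirmed:
{code:java}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class NoCacheStreamsConfig {

    static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "nullproblem-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // With a zero-byte cache every update, including tombstones (null values),
        // is forwarded downstream instead of being folded away in the cache.
        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0L);
        return props;
    }
}
{code}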

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15075:
URL: https://github.com/apache/kafka/pull/15075#discussion_r1540643481


##
server/src/main/java/org/apache/kafka/server/config/ZkConfig.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.zookeeper.client.ZKClientConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ZkConfig {
+/** * Zookeeper Configuration ***/
+public static final String ZK_CONNECT_PROP = "zookeeper.connect";
+public static final String ZK_SESSION_TIMEOUT_MS_PROP = 
"zookeeper.session.timeout.ms";
+public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = 
"zookeeper.connection.timeout.ms";
+public static final String ZK_ENABLE_SECURE_ACLS_PROP = 
"zookeeper.set.acl";
+public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = 
"zookeeper.max.in.flight.requests";
+public static final String ZK_SSL_CLIENT_ENABLE_PROP = 
"zookeeper.ssl.client.enable";
+public static final String ZK_CLIENT_CNXN_SOCKET_PROP = 
"zookeeper.clientCnxnSocket";
+public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = 
"zookeeper.ssl.keystore.location";
+public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = 
"zookeeper.ssl.keystore.password";
+public static final String ZK_SSL_KEY_STORE_TYPE_PROP = 
"zookeeper.ssl.keystore.type";
+public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = 
"zookeeper.ssl.truststore.location";
+public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = 
"zookeeper.ssl.truststore.password";
+public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = 
"zookeeper.ssl.truststore.type";
+public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol";
+public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = 
"zookeeper.ssl.enabled.protocols";
+public static final String ZK_SSL_CIPHER_SUITES_PROP = 
"zookeeper.ssl.cipher.suites";
+public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = 
"zookeeper.ssl.endpoint.identification.algorithm";
+public static final String ZK_SSL_CRL_ENABLE_PROP = 
"zookeeper.ssl.crl.enable";
+public static final String ZK_SSL_OCSP_ENABLE_PROP = 
"zookeeper.ssl.ocsp.enable";
+
+// a map from the Kafka config to the corresponding ZooKeeper Java system 
property
+public static final Map<String, String> 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
+
+public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper 
connection string in the form hostname:port where host and port 
are the " +
+"host and port of a ZooKeeper server. To allow connecting through 
other ZooKeeper nodes when that ZooKeeper machine is " +
+"down you can also specify multiple hosts in the form 
hostname1:port1,hostname2:port2,hostname3:port3.\n" +
+"The server can also have a ZooKeeper chroot path as part of its 
ZooKeeper connection string which puts its data under some path in the global 
ZooKeeper namespace. " +
+"For example to give a chroot path of /chroot/path 
you would give the connection string as 
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
+public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session 
timeout";
+public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time 
that the client waits to establish a connection to ZooKeeper. If not set, the 
value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used";
+public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use 
secure ACLs";
+public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum 
number of unacknowledged requests the client will send to ZooKeeper before 
blocking.";
+public static final String ZK_SSL_CLIENT_ENABLE_DOC;
+public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
+public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
+public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC;
+public static final String ZK_SSL_KEY_STORE_TYPE_DOC;
+public static final String ZK_SSL_TRUST_STORE_LOCATION_D

Re: [PR] MINOR: Add retry mechanism to EOS example [kafka]

2024-03-27 Thread via GitHub


showuon merged PR #15561:
URL: https://github.com/apache/kafka/pull/15561


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]

2024-03-27 Thread via GitHub


chia7712 commented on code in PR #15075:
URL: https://github.com/apache/kafka/pull/15075#discussion_r1540685634


##
server/src/main/java/org/apache/kafka/server/config/ZkConfig.java:
##
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.zookeeper.client.ZKClientConfig;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ZkConfig {
+/** * Zookeeper Configuration ***/
+public static final String ZK_CONNECT_PROP = "zookeeper.connect";
+public static final String ZK_SESSION_TIMEOUT_MS_PROP = 
"zookeeper.session.timeout.ms";
+public static final String ZK_CONNECTION_TIMEOUT_MS_PROP = 
"zookeeper.connection.timeout.ms";
+public static final String ZK_ENABLE_SECURE_ACLS_PROP = 
"zookeeper.set.acl";
+public static final String ZK_MAX_IN_FLIGHT_REQUESTS_PROP = 
"zookeeper.max.in.flight.requests";
+public static final String ZK_SSL_CLIENT_ENABLE_PROP = 
"zookeeper.ssl.client.enable";
+public static final String ZK_CLIENT_CNXN_SOCKET_PROP = 
"zookeeper.clientCnxnSocket";
+public static final String ZK_SSL_KEY_STORE_LOCATION_PROP = 
"zookeeper.ssl.keystore.location";
+public static final String ZK_SSL_KEY_STORE_PASSWORD_PROP = 
"zookeeper.ssl.keystore.password";
+public static final String ZK_SSL_KEY_STORE_TYPE_PROP = 
"zookeeper.ssl.keystore.type";
+public static final String ZK_SSL_TRUST_STORE_LOCATION_PROP = 
"zookeeper.ssl.truststore.location";
+public static final String ZK_SSL_TRUST_STORE_PASSWORD_PROP = 
"zookeeper.ssl.truststore.password";
+public static final String ZK_SSL_TRUST_STORE_TYPE_PROP = 
"zookeeper.ssl.truststore.type";
+public static final String ZK_SSL_PROTOCOL_PROP = "zookeeper.ssl.protocol";
+public static final String ZK_SSL_ENABLED_PROTOCOLS_PROP = 
"zookeeper.ssl.enabled.protocols";
+public static final String ZK_SSL_CIPHER_SUITES_PROP = 
"zookeeper.ssl.cipher.suites";
+public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_PROP = 
"zookeeper.ssl.endpoint.identification.algorithm";
+public static final String ZK_SSL_CRL_ENABLE_PROP = 
"zookeeper.ssl.crl.enable";
+public static final String ZK_SSL_OCSP_ENABLE_PROP = 
"zookeeper.ssl.ocsp.enable";
+
+// a map from the Kafka config to the corresponding ZooKeeper Java system 
property
+public static final Map<String, String> 
ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP;
+
+public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper 
connection string in the form hostname:port where host and port 
are the " +
+"host and port of a ZooKeeper server. To allow connecting through 
other ZooKeeper nodes when that ZooKeeper machine is " +
+"down you can also specify multiple hosts in the form 
hostname1:port1,hostname2:port2,hostname3:port3.\n" +
+"The server can also have a ZooKeeper chroot path as part of its 
ZooKeeper connection string which puts its data under some path in the global 
ZooKeeper namespace. " +
+"For example to give a chroot path of /chroot/path 
you would give the connection string as 
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path.";
+public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session 
timeout";
+public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time 
that the client waits to establish a connection to ZooKeeper. If not set, the 
value in " + ZK_SESSION_TIMEOUT_MS_PROP + " is used";
+public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use 
secure ACLs";
+public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum 
number of unacknowledged requests the client will send to ZooKeeper before 
blocking.";
+public static final String ZK_SSL_CLIENT_ENABLE_DOC;
+public static final String ZK_CLIENT_CNXN_SOCKET_DOC;
+public static final String ZK_SSL_KEY_STORE_LOCATION_DOC;
+public static final String ZK_SSL_KEY_STORE_PASSWORD_DOC;
+public static final String ZK_SSL_KEY_STORE_TYPE_DOC;
+public static final String ZK_SSL_TRUST_STORE_LOCATION_D

[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831228#comment-17831228
 ] 

Stanislav Spiridonov commented on KAFKA-16382:
--

Wait, why is it not a bug? I have a real scenario.

Good scenario:
 # Some events happen. The output topic contains the mapped result.
 # An event ends ({_}*null*{_} body). The output topic contains a delete 
message for the event.

Wrong scenario:
 # Some events happen. The output topic contains the mapped result.
 # We stop Kafka and make some updates + a {*}full reset{*}.
 # Meanwhile an event ends ({*}_null_{*} body).
 # We start Kafka. It processes the input topic from scratch but internally 
"optimises" away the nulls. The output topic *still* contains the mapped 
result. The delete message *never* reaches the output topic.

As a workaround we can clear the output topic, but in our system the output 
topic is the input of another Kafka Streams application, so we would need to 
reset all subsequent Kafka Streams applications to correct this behaviour.

Another workaround is, for each null body, to generate an additional synthetic 
message with another key and some value in the body (so it is not optimised 
away), check for such messages when writing to the output topic, and generate 
the delete message back, but that also looks like a hack (a rough sketch of 
this idea follows).
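
To make that second workaround concrete, here is a rough, hypothetical sketch 
(simplified topology; topic names from the ticket, marker prefix and class 
names made up) of emitting a non-null marker record for every delete and 
turning it back into a tombstone just before the output topic:
{code:java}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class SyntheticDeleteMarkerSketch {

    private static final String MARKER_PREFIX = "__delete__:";

    public static void build(StreamsBuilder builder) {
        KStream<String, String> input = builder.stream("NULL-IN");

        // For every delete (null body) also emit a marker record with a synthetic key
        // and a non-null body, so the delete cannot be swallowed by downstream caching.
        KStream<String, String> withMarkers = input.flatMap((key, value) -> {
            List<KeyValue<String, String>> out = new ArrayList<>();
            if (value == null) {
                out.add(KeyValue.pair(key, null));
                out.add(KeyValue.pair(MARKER_PREFIX + key, "deleted"));
            } else {
                out.add(KeyValue.pair(key, value));
            }
            return out;
        });

        // ... the real topology (joins / aggregations) would sit here ...

        // Just before the output topic, turn marker records back into real tombstones.
        withMarkers
                .map((key, value) -> {
                    if (key.startsWith(MARKER_PREFIX)) {
                        return KeyValue.pair(key.substring(MARKER_PREFIX.length()), (String) null);
                    }
                    return KeyValue.pair(key, value);
                })
                .to("NULL-OUT");
    }
}
{code}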

 

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


urbandan commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1540691777


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+val INDEFINITE: Int = -1
+
+def indefinitely(error: Errors): ErrorCount = {
+  ErrorCount(error, INDEFINITE)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  while (queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error
+}
+
+def peekError(): Errors = queue.synchronized {
+  queue.head.error
+}
+
+def clearProcessedError(): Unit = {
+  TestUtils.waitUntilTrue(() =>
+queue.synchronized {
+  queue.head.repeat == 0
+}, "error wasn't processed")
+  queue.synchronized {
+queue.dequeue()

Review Comment:
   shouldn't we clear the whole queue here?



##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+val INDEFINITE: Int = -1
+
+def indefinitely(error: Errors): ErrorCount = {
+  ErrorCount(error, INDEFINITE)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  while (queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error
+}
+
+def peekError(): Errors = queue.synchronized {
+  queue.head.error
+}
+
+def clearProcessedError(): Unit = {
+  TestUtils.waitUntilTrue(() =>
+queue.synchronized {
+  queue.head.repeat == 0

Review Comment:
   how is this going to behave if the background thread keeps calling 
takeError, potentially removing all errors?
   shouldn't the condition check for an empty queue?
   also, what if the only element in the queue has INDEFINITE?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16148: Implement GroupMetadataManager#onUnloaded [kafka]

2024-03-27 Thread via GitHub


dajac commented on PR #15446:
URL: https://github.com/apache/kafka/pull/15446#issuecomment-2022239076

   @jeffkbkim There are failed tests that look related. Could you check them 
please?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831230#comment-17831230
 ] 

Stanislav Spiridonov commented on KAFKA-16382:
--

BTW, if you take the wrong scenario from the previous comment but the input 
topic contains an *update* message for the event instead of a *delete*, 
everything works correctly - the output topic will contain the updated version 
of the event.

So, as you can see, {*}update{*} as well as *add* work correctly after a full 
reset; only *delete* is broken.

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831228#comment-17831228
 ] 

Stanislav Spiridonov edited comment on KAFKA-16382 at 3/27/24 8:55 AM:
---

Wait, why is it not a bug? I have the real scenario. 

Good scenario:
 # Some events happened. Output topic contains mapped result.
 # Event ends ({_}*null*{_} body).  Output topic contains delete message for 
the event. 

Wrong scenario:
 # Some events happened. Output topic contains mapped result.
 # We stopped Kafka, Make some updates + {*}full rest{*}. 
 # Meanwhile event ends ({*}_null_{*} body).  
 # We start Kafka, it process the input topic from scratch but "optimise" 
internally nulls. The output topic *still* contains mapped result. The delete 
message *never* reach the output topic.

As work around we can clear the output topic before Kafka start. But in out 
system the output topic is input for another Kafka Stream application so we 
need to reset all subsequent Kafka Stream applications to correct this 
behaviour.

Another workaround is on each null body generate another synthetic message with 
another key and some value in the body (it will not optimised) and check for 
such messages on write to output topic and generate back the delete message, 
but it is also looks as a hack.

 


was (Author: foal):
Wait, why is it not a bug? I have the real scenario. 

Good scenario:
 # Some events happened. Output topic contains mapped result.
 # Event ends ({_}*null*{_} body).  Output topic contains delete message for 
the event. 

Wrong scenario:
 # Some events happened. Output topic contains mapped result.
 # We stopped Kafka, Make some updates + {*}full rest{*}. 
 # Meanwhile event ends ({*}_null_{*} body).  
 # We start Kafka, it process the input topic from scratch but "optimise" 
internally nulls. The output topic *still* contains mapped result. The delete 
message *never* reach the output topic.

As work around we can clear the output topic, But in out system the output 
topic is input for another Kafka Stream application so we need to reset all 
subsequent Kafka Stream applications to correct this behaviour.

Another workaround is on each null body generate another synthetic message with 
another key and some value in the body (it will not optimised) and check for 
such messages on write to output topic and generate back the delete message, 
but it is also looks as a hack.

 

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16382) Kafka Streams drop NULL values after reset

2024-03-27 Thread Stanislav Spiridonov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831228#comment-17831228
 ] 

Stanislav Spiridonov edited comment on KAFKA-16382 at 3/27/24 8:57 AM:
---

Wait, why is it not a bug? I have the real scenario. 

Good scenario:
 # Some events happened. Output topic contains mapped result.
 # Event ends ({_}*null*{_} body).  Output topic contains delete message for 
the event. 

Wrong scenario:
 # Some events happened. Output topic contains mapped result.
 # We stopped Kafka, Make some updates + {*}full rest{*}. 
 # Meanwhile event ends ({*}_null_{*} body).  
 # We start Kafka, it process the input topic from scratch but "optimise" 
internally nulls. The output topic *still* contains mapped result. The delete 
message *never* reach the output topic.

As work around we can clear the output topic before Kafka start. But in out 
system the output topic is input for another Kafka Stream application so we 
need to reset all subsequent Kafka Stream applications to correct this 
behaviour.

Another workaround is on each delete message (null body) generate synthetic 
message with synthetic key and some value in the body (it will not optimised) 
and check for such messages on write to output topic and generate back the 
delete message to the output, but it is also looks as a hack.

 


was (Author: foal):
Wait, why is it not a bug? I have the real scenario. 

Good scenario:
 # Some events happened. Output topic contains mapped result.
 # Event ends ({_}*null*{_} body).  Output topic contains delete message for 
the event. 

Wrong scenario:
 # Some events happened. Output topic contains mapped result.
 # We stopped Kafka, Make some updates + {*}full rest{*}. 
 # Meanwhile event ends ({*}_null_{*} body).  
 # We start Kafka, it process the input topic from scratch but "optimise" 
internally nulls. The output topic *still* contains mapped result. The delete 
message *never* reach the output topic.

As work around we can clear the output topic before Kafka start. But in out 
system the output topic is input for another Kafka Stream application so we 
need to reset all subsequent Kafka Stream applications to correct this 
behaviour.

Another workaround is on each null body generate another synthetic message with 
another key and some value in the body (it will not optimised) and check for 
such messages on write to output topic and generate back the delete message, 
but it is also looks as a hack.

 

> Kafka Streams drop NULL values after reset
> --
>
> Key: KAFKA-16382
> URL: https://issues.apache.org/jira/browse/KAFKA-16382
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.6.1
>Reporter: Stanislav Spiridonov
>Priority: Major
>
> Kafka Streams (KTable) drops null values after full reset.
> See 
> [https://github.com/foal/Null-Issue/blob/main/src/main/java/NullProblemExample.java]
>  for sample topology
> Step to reproduce (req NULL-IN, NULL-IN-AUX, NULL-OUT topics)
>  # Start example - 1st round
>  # Send to NULL-IN "A1:a" -> NULL-OUT "A1:anull"
>  # Send to NULL-IN-AUX "A1:b" -> NULL-OUT "A1:anull, A1:ab"
>  # Stop application 
>  # Run kafka-streams-application-reset 
> {code:java}
> call bin/windows/kafka-streams-application-reset --application-id 
> nullproblem-example^
>  --input-topics "NULL-IN,NULL-IN-AUX"^
>  --bootstrap-server "localhost:9092"
> {code}
>  # Send to NULL-IN-AUX "A1:" -> NULL-OUT "A1:anull, A1:ab" - it is Ok (no app 
> running yet)
>  # Start example - 2nd round
>  # After initialization -> NULL-OUT *still contains* "A1:anull, A1:ab"
>  # Expected output "A1:anull, A1:ab, A1:"
> The issue is NOT reproduced if application just restarted (skip step 5). 
> The issue is NOT reproduced if internal cache is disabled.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16353: Offline protocol migration integration tests [kafka]

2024-03-27 Thread via GitHub


dajac merged PR #15492:
URL: https://github.com/apache/kafka/pull/15492


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-16353) Offline protocol migration integration tests

2024-03-27 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16353?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-16353.
-
Resolution: Fixed

> Offline protocol migration integration tests
> 
>
> Key: KAFKA-16353
> URL: https://issues.apache.org/jira/browse/KAFKA-16353
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Dongnuo Lyu
>Assignee: Dongnuo Lyu
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords

2024-03-27 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez resolved KAFKA-16403.
-
Resolution: Not A Bug

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
> -
>
> Key: KAFKA-16403
> URL: https://issues.apache.org/jira/browse/KAFKA-16403
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > 
> WordCountDemoTest > testCountListOfWords() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276)
>         at 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: Corruption: IO error: No such file or 
> directory: While open a file for random read: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb:
>  No such file or directory in file 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05
>             at org.rocksdb.RocksDB.open(Native Method)
>             at org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16403) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords

2024-03-27 Thread Igor Soarez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831244#comment-17831244
 ] 

Igor Soarez commented on KAFKA-16403:
-

The test only failed the one time. I agree it was likely a problem in the 
execution environment. Closing.

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords
> -
>
> Key: KAFKA-16403
> URL: https://issues.apache.org/jira/browse/KAFKA-16403
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testCountListOfWords().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 82 > 
> WordCountDemoTest > testCountListOfWords() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:373)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:300)
>         at 
> org.apache.kafka.streams.TopologyTestDriver.(TopologyTestDriver.java:276)
>         at 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: Corruption: IO error: No such file or 
> directory: While open a file for random read: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/10.ldb:
>  No such file or directory in file 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/MANIFEST-05
>             at org.rocksdb.RocksDB.open(Native Method)
>             at org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
>  {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16404) Flaky test org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig

2024-03-27 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16404?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez resolved KAFKA-16404.
-
Resolution: Not A Bug

Same as KAFKA-16403, this only failed once. It was likely the result of a 
testing infrastructure problem. We can always re-open if we see this again and 
suspect otherwise.

> Flaky test 
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig
> -
>
> Key: KAFKA-16404
> URL: https://issues.apache.org/jira/browse/KAFKA-16404
> Project: Kafka
>  Issue Type: Bug
>Reporter: Igor Soarez
>Priority: Major
>
>  
> {code:java}
> org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig()
>  failed, log available in 
> /home/jenkins/workspace/Kafka_kafka-pr_PR-14903@2/streams/examples/build/reports/testOutput/org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.testGetStreamsConfig().test.stdout
> Gradle Test Run :streams:examples:test > Gradle Test Executor 87 > 
> WordCountDemoTest > testGetStreamsConfig() FAILED
>     org.apache.kafka.streams.errors.ProcessorStateException: Error opening 
> store KSTREAM-AGGREGATE-STATE-STORE-03 at location 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:329)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBTimestampedStore.openRocksDB(RocksDBTimestampedStore.java:69)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:254)
>         at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:175)
>         at 
> app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> app//org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:56)
>         at 
> app//org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:71)
>         at 
> app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$3(MeteredKeyValueStore.java:151)
>         at 
> app//org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
>         at 
> app//org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:151)
>         at 
> app//org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
>         at 
> app//org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
>         at 
> app//org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:530)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:373)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:300)
>         at 
> app//org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:276)
>         at 
> app//org.apache.kafka.streams.examples.wordcount.WordCountDemoTest.setup(WordCountDemoTest.java:60)
>         Caused by:
>         org.rocksdb.RocksDBException: While lock file: 
> /tmp/kafka-streams/streams-wordcount/0_0/rocksdb/KSTREAM-AGGREGATE-STATE-STORE-03/LOCK:
>  Resource temporarily unavailable
>             at app//org.rocksdb.RocksDB.open(Native Method)
>             at app//org.rocksdb.RocksDB.open(RocksDB.java:307)
>             at 
> app//org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:323)
>             ... 17 more
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.

2024-03-27 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-16297:

Fix Version/s: 3.7.1

> Race condition while promoting future replica can lead to partition 
> unavailability.
> ---
>
> Key: KAFKA-16297
> URL: https://issues.apache.org/jira/browse/KAFKA-16297
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.7.1
>
>
> KIP-858 proposed that when a directory failure occurs after changing the 
> assignment of a replica that's moved between two directories in the same 
> broker, but before the future replica promotion completes, the broker should 
> reassign the replica to inform the controller of its correct status. But this 
> hasn't yet been implemented, and without it this failure may lead to 
> indefinite partition unavailability.
> Example scenario:
>  # A broker which leads partition P receives a request to alter the replica 
> from directory A to directory B.
>  # The broker creates a future replica in directory B and starts a replica 
> fetcher.
>  # Once the future replica first catches up, the broker queues a reassignment 
> to inform the controller of the directory change.
>  # The next time the replica catches up, the broker briefly blocks appends 
> and promotes the replica. However, before the promotion is attempted, 
> directory A fails.
>  # The controller was informed that P is now in directory B before it 
> received the notification that directory A has failed, so it does not elect a 
> new leader, and as long as the broker is online, partition P remains 
> unavailable.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.

2024-03-27 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-16297:

Affects Version/s: 3.7.0

> Race condition while promoting future replica can lead to partition 
> unavailability.
> ---
>
> Key: KAFKA-16297
> URL: https://issues.apache.org/jira/browse/KAFKA-16297
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.7.1
>
>
> KIP-858 proposed that when a directory failure occurs after changing the 
> assignment of a replica that's moved between two directories in the same 
> broker, but before the future replica promotion completes, the broker should 
> reassign the replica to inform the controller of its correct status. But this 
> hasn't yet been implemented, and without it this failure may lead to 
> indefinite partition unavailability.
> Example scenario:
>  # A broker which leads partition P receives a request to alter the replica 
> from directory A to directory B.
>  # The broker creates a future replica in directory B and starts a replica 
> fetcher.
>  # Once the future replica first catches up, the broker queues a reassignment 
> to inform the controller of the directory change.
>  # The next time the replica catches up, the broker briefly blocks appends 
> and promotes the replica. However, before the promotion is attempted, 
> directory A fails.
>  # The controller was informed that P is now in directory B before it 
> received the notification that directory A has failed, so it does not elect a 
> new leader, and as long as the broker is online, partition P remains 
> unavailable.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16297) Race condition while promoting future replica can lead to partition unavailability.

2024-03-27 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16297?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-16297:

Component/s: jbod

> Race condition while promoting future replica can lead to partition 
> unavailability.
> ---
>
> Key: KAFKA-16297
> URL: https://issues.apache.org/jira/browse/KAFKA-16297
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Major
> Fix For: 3.7.1
>
>
> KIP-858 proposed that when a directory failure occurs after changing the 
> assignment of a replica that's moved between two directories in the same 
> broker, but before the future replica promotion completes, the broker should 
> reassign the replica to inform the controller of its correct status. But this 
> hasn't yet been implemented, and without it this failure may lead to 
> indefinite partition unavailability.
> Example scenario:
>  # A broker which leads partition P receives a request to alter the replica 
> from directory A to directory B.
>  # The broker creates a future replica in directory B and starts a replica 
> fetcher.
>  # Once the future replica first catches up, the broker queues a reassignment 
> to inform the controller of the directory change.
>  # The next time the replica catches up, the broker briefly blocks appends 
> and promotes the replica. However, before the promotion is attempted, 
> directory A fails.
>  # The controller was informed that P is now in directory B before it 
> received the notification that directory A has failed, so it does not elect a 
> new leader, and as long as the broker is online, partition P remains 
> unavailable.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16365) AssignmentsManager mismanages completion notifications

2024-03-27 Thread Igor Soarez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16365?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Igor Soarez updated KAFKA-16365:

Priority: Critical  (was: Major)

> AssignmentsManager mismanages completion notifications
> --
>
> Key: KAFKA-16365
> URL: https://issues.apache.org/jira/browse/KAFKA-16365
> Project: Kafka
>  Issue Type: Sub-task
>  Components: jbod
>Affects Versions: 3.7.0
>Reporter: Igor Soarez
>Assignee: Igor Soarez
>Priority: Critical
> Fix For: 3.7.1
>
>
> When moving replicas between directories in the same broker, future replica 
> promotion hinges on acknowledgment from the controller of a change in the 
> directory assignment.
>  
> ReplicaAlterLogDirsThread relies on AssignmentsManager for a completion 
> notification of the directory assignment change.
>  
> In its current form, under certain assignment scheduling, AssignmentsManager 
> can both miss completion notifications and prematurely trigger them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1540748533


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+val INDEFINITE: Int = -1
+
+def indefinitely(error: Errors): ErrorCount = {
+  ErrorCount(error, INDEFINITE)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  while (queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error
+}
+
+def peekError(): Errors = queue.synchronized {
+  queue.head.error
+}
+
+def clearProcessedError(): Unit = {
+  TestUtils.waitUntilTrue(() =>
+queue.synchronized {
+  queue.head.repeat == 0

Review Comment:
   I'm removing INDEFINITE; an empty queue will mean there's no error 
(Errors.NONE is returned).
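
   In code terms, roughly the following (a sketch written in Java purely for illustration; the real test is Scala and the names here are approximate):
   
   ```java
   import java.util.ArrayDeque;
   import java.util.Deque;
   
   import org.apache.kafka.common.protocol.Errors;
   
   class ErrorQueueSketch {
       static final class ErrorCount {
           final Errors error;
           int repeat;
           ErrorCount(Errors error, int repeat) { this.error = error; this.repeat = repeat; }
       }
   
       private final Deque<ErrorCount> queue = new ArrayDeque<>();
   
       synchronized Errors takeError() {
           // Drop entries whose repeat budget is exhausted.
           while (!queue.isEmpty() && queue.peek().repeat == 0) {
               queue.poll();
           }
           // An empty queue now simply means "no error to inject".
           if (queue.isEmpty()) {
               return Errors.NONE;
           }
           queue.peek().repeat -= 1;
           return queue.peek().error;
       }
   }
   ```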



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Preventing running the :core tests twice when testing with coverage [kafka]

2024-03-27 Thread via GitHub


viktorsomogyi merged PR #15580:
URL: https://github.com/apache/kafka/pull/15580


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


viktorsomogyi commented on PR #15605:
URL: https://github.com/apache/kafka/pull/15605#issuecomment-2022430965

   @akatona84 I think this one exceeds the scope of a "minor" commit, so I think you 
should create a jira ticket for it. Also, please add a description of your change 
that explains the thinking behind your modifications.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Assigned] (KAFKA-16103) Review client logic for triggering offset commit callbacks

2024-03-27 Thread Lucas Brutschy (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lucas Brutschy reassigned KAFKA-16103:
--

Assignee: Lucas Brutschy  (was: Lianet Magrans)

> Review client logic for triggering offset commit callbacks
> --
>
> Key: KAFKA-16103
> URL: https://issues.apache.org/jira/browse/KAFKA-16103
> Project: Kafka
>  Issue Type: Sub-task
>  Components: clients, consumer
>Reporter: Lianet Magrans
>Assignee: Lucas Brutschy
>Priority: Critical
>  Labels: kip-848-client-support, offset
> Fix For: 3.8.0
>
>
> Review logic for triggering commit callbacks, ensuring that all callbacks are 
> triggered before returning from commitSync



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


soarez commented on PR #15605:
URL: https://github.com/apache/kafka/pull/15605#issuecomment-2022452207

   I think there is a JIRA already for this flaky test: 
[KAFKA-15915](https://issues.apache.org/jira/browse/KAFKA-15915)
   If this is correct, then you should update the PR title to refer to the JIRA.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16323) Failing test: fix testRemoteFetchExpiresPerSecMetric

2024-03-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831291#comment-17831291
 ] 

Chia-Ping Tsai commented on KAFKA-16323:


It seems to me the root cause could be that the `spy` does not work well. We have met a 
similar issue before - a spied method can result in unexpected behavior when it is 
exercised from multiple threads; see https://github.com/apache/kafka/pull/10006

Hence, we can try to fix it by using `override` to replace the spied method, for 
example: 

{code}
val latch = new CountDownLatch(1)
val remoteLogManager = new RemoteLogManager(
  remoteLogManagerConfig,
  0,
  TestUtils.tempRelativeDir("data").getAbsolutePath,
  "clusterId",
  time,
  _ => Optional.of(dummyLog),
  (TopicPartition, Long) => {},
  brokerTopicStats) {
  override def read(remoteStorageFetchInfo: RemoteStorageFetchInfo): 
FetchDataInfo = {
// wait until verification completes
latch.await(5000, TimeUnit.MILLISECONDS)
mock(classOf[FetchDataInfo])
  }
}
{code}


> Failing test: fix testRemoteFetchExpiresPerSecMetric 
> -
>
> Key: KAFKA-16323
> URL: https://issues.apache.org/jira/browse/KAFKA-16323
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Johnny Hsu
>Assignee: Johnny Hsu
>Priority: Major
>  Labels: test-failure
>
> Refer to 
> [https://ci-builds.apache.org/job/Kafka/job/kafka/job/trunk/2685/testReport/junit/kafka.server/ReplicaManagerTest/Build___JDK_21_and_Scala_2_13___testRemoteFetchExpiresPerSecMetric__/]
> This test is failing, and this ticket aims to address this 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15517: Improve MirrorMaker logging in case of authorization errors [kafka]

2024-03-27 Thread via GitHub


mimaison commented on code in PR #15558:
URL: https://github.com/apache/kafka/pull/15558#discussion_r1540866456


##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##
@@ -320,4 +322,19 @@ static void createCompactedTopic(String topicName, short 
partitions, short repli
 static void createSinglePartitionCompactedTopic(String topicName, short 
replicationFactor, Admin admin) {
 createCompactedTopic(topicName, (short) 1, replicationFactor, admin);
 }
+
+static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
+throws ExecutionException, InterruptedException {
+try {
+return callable.call();
+} catch (ExecutionException | InterruptedException e) {
+if (e.getCause() instanceof TopicAuthorizationException ||

Review Comment:
   I think we also need to handle `GroupAuthorizationException` as this can be 
thrown by `listConsumerGroupOffsets()`
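
   To make that concrete, a rough sketch of the idea (not the actual patch; the log wording is illustrative, and the final catch clause only exists so this standalone example compiles against `Callable`):
   
   ```java
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.function.Supplier;
   
   import org.apache.kafka.common.errors.GroupAuthorizationException;
   import org.apache.kafka.common.errors.TopicAuthorizationException;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   final class AdminCallSketch {
       private static final Logger log = LoggerFactory.getLogger(AdminCallSketch.class);
   
       static <T> T adminCall(Callable<T> callable, Supplier<String> errMsg)
               throws ExecutionException, InterruptedException {
           try {
               return callable.call();
           } catch (ExecutionException | InterruptedException e) {
               // Surface authorization failures explicitly rather than burying
               // them in a generic stack trace.
               if (e.getCause() instanceof TopicAuthorizationException
                       || e.getCause() instanceof GroupAuthorizationException) {
                   log.error("Missing authorization to {}", errMsg.get());
               }
               throw e;
           } catch (Exception e) {
               // Callable.call() declares a checked Exception; rethrow unchecked
               // here only so this isolated sketch compiles.
               throw new RuntimeException(e);
           }
       }
   }
   ```
   
   Call sites such as `listConsumerGroups()` would then pass a short description of the operation, as the diff above already does.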



##
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java:
##
@@ -216,7 +217,10 @@ Set findConsumerGroups()
 
 Collection listConsumerGroups()
 throws InterruptedException, ExecutionException {
-return sourceAdminClient.listConsumerGroups().valid().get();
+return adminCall(
+() -> sourceAdminClient.listConsumerGroups().valid().get(),
+() -> "list consumer groups on cluster " + 
config.sourceClusterAlias()

Review Comment:
   Should we put the alias before `cluster`? so we get a message like `list 
consumer groups on source cluster` for example. I think it reads better than 
`list consumer groups on cluster source`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-27 Thread via GitHub


lucasbru commented on code in PR #15525:
URL: https://github.com/apache/kafka/pull/15525#discussion_r1540879732


##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ListOffsetsEvent.java:
##
@@ -25,22 +25,15 @@
 import java.util.Map;
 
 /**
- * Event for retrieving partition offsets by performing a
+ * Application Event for retrieving partition offsets by performing a
  * {@link org.apache.kafka.common.requests.ListOffsetsRequest 
ListOffsetsRequest}.
- * This event is created with a map of {@link TopicPartition} and target 
timestamps to search
- * offsets for. It is completed with the map of {@link TopicPartition} and
- * {@link OffsetAndTimestamp} found (offset of the first message whose 
timestamp is greater than
- * or equals to the target timestamp)
  */
-public class ListOffsetsEvent extends 
CompletableApplicationEvent> {
-
+public class ListOffsetsEvent extends 
CompletableApplicationEvent> {

Review Comment:
   I'm personally not concerned about having two events, because they are very 
simple. The alternative is to have a common code-path that carries a 
`requiresTimestamp` boolean to differentiate behavior again, which isn't really 
any simpler. But I agree there is a certain amount of code duplication here 
that we could eliminate using your approach @lianetm , so I'm not against it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-27 Thread via GitHub


lucasbru commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2022475971

   > @lucasbru - Thanks again for reviewing the PR. Sorry about the 
misinterpretation on short circuting logic so here I updated the 
beginningOrEndOffsets API. It seems like the right thing to do here is to still 
send out the request but return it immediately for zero timeout (a bit strange 
because it does throw timeout when time runs out which seems inconsistent).
   
   Yes, the behavior of the existing consumer is a bit curious, but it's not 
the only place where a zero duration is treated differently from 0.01s. 
Either way, we probably have to do it this way for compatibility. This part 
looks good to me now.
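
   To illustrate the semantics being discussed (a hedged snippet; the topic name is made up and `consumer` is assumed to be an already-configured consumer instance):
   
   ```java
   import java.time.Duration;
   import java.util.List;
   import java.util.Map;
   
   import org.apache.kafka.clients.consumer.Consumer;
   import org.apache.kafka.common.TopicPartition;
   
   final class BeginningOffsetsExample {
       static Map<TopicPartition, Long> earliest(Consumer<String, String> consumer) {
           // With Duration.ZERO the request is still sent, but the call returns
           // immediately with whatever is already available instead of waiting
           // and eventually throwing a TimeoutException, which is what a short
           // non-zero timeout would do (per the discussion above).
           return consumer.beginningOffsets(
               List.of(new TopicPartition("my-topic", 0)), Duration.ZERO);
       }
   }
   ```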


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16272: Update connect_distributed_test.py to support KIP-848’s group protocol config [kafka]

2024-03-27 Thread via GitHub


lucasbru merged PR #15576:
URL: https://github.com/apache/kafka/pull/15576


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1540885105


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+public class KafkaConfig {
+
+/** * Log Configuration ***/
+public final static String NUM_PARTITIONS_PROP = "num.partitions";

Review Comment:
   Would it also make sense to colocate the `CONFIG` and `DOC` like we do in 
the other Config classes?



##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
   I wonder if we should just import `KafkaConfig` and use `KAFKA.SOME_CONFIG` 
instead of importing all/statically KafkaConfig. WDYT?



##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+public class KafkaConfig {
+
+/** * Log Configuration ***/
+public final static String NUM_PARTITIONS_PROP = "num.partitions";

Review Comment:
   For consistency with the other *Configs classes, should we used the `CONFIG` 
suffix instead of `PROP`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [MINOR] Renaming the `Abortable_Transaction` error to `Transaction_Abortable` [kafka]

2024-03-27 Thread via GitHub


sjhajharia commented on PR #15609:
URL: https://github.com/apache/kafka/pull/15609#issuecomment-2022517844

   Thanks for the review @soarez 
   I have updated the variable names in the test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1540968019


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+public class KafkaConfig {
+
+/** * Log Configuration ***/
+public final static String NUM_PARTITIONS_PROP = "num.partitions";

Review Comment:
   > For consistency with the other *Configs classes, should we used the CONFIG 
suffix instead of PROP?
   I thought about it but I wasn't sure which would make it easier for people to 
review. 
   We also might need to do the same for the LogCleaner configs. 
   
   > Would it also make sense to colocate the CONFIG and DOC like we do in the 
other Config classes?
   I considered this but found it tricky for docs that refer to more than one 
config in the string interpolation. For example, `LOG_DIR_DOC` and 
`LOG_DIRS_DOC` refer to both `LOG_DIR_PROP` and `LOG_DIRS_PROP`; the same goes for 
`LOG_ROLL_TIME_HOURS_DOC` and `LOG_ROLL_TIME_MILLIS_DOC`. I think this is why 
they are grouped at the end, after defining the property names first. 
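
   To make the interpolation issue concrete, here is a hedged illustration (the doc strings below are rough placeholders, not the real Kafka documentation):
   
   ```java
   public final class KafkaConfigDocLayoutSketch {
       // Property names are declared first...
       public static final String LOG_DIR_PROP = "log.dir";
       public static final String LOG_DIRS_PROP = "log.dirs";
   
       // ...and the docs come afterwards, because each doc refers to the *other*
       // property. If LOG_DIR_DOC were colocated with LOG_DIR_PROP, its reference
       // to LOG_DIRS_PROP would be an illegal forward reference (unless qualified
       // with the class name).
       public static final String LOG_DIRS_DOC =
           "A comma-separated list of the directories where the log data is stored. "
               + "If not set, the value in " + LOG_DIR_PROP + " is used.";
       public static final String LOG_DIR_DOC =
           "The directory in which the log data is kept (supplemental for "
               + LOG_DIRS_PROP + ").";
   }
   ```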



##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+public class KafkaConfig {
+
+/** * Log Configuration ***/
+public final static String NUM_PARTITIONS_PROP = "num.partitions";

Review Comment:
   > For consistency with the other *Configs classes, should we used the CONFIG 
suffix instead of PROP?
   
   I thought about it but I wasn't sure which would make it easier for people to 
review. 
   We also might need to do the same for the LogCleaner configs. 
   
   > Would it also make sense to colocate the CONFIG and DOC like we do in the 
other Config classes?
   
   I considered this but found it tricky for docs that refer to more than one 
config in the string interpolation. For example, `LOG_DIR_DOC` and 
`LOG_DIRS_DOC` refer to both `LOG_DIR_PROP` and `LOG_DIRS_PROP`; the same goes for 
`LOG_ROLL_TIME_HOURS_DOC` and `LOG_ROLL_TIME_MILLIS_DOC`. I think this is why 
they are grouped at the end, after defining the property names first. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1540992038


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+public class KafkaConfig {
+
+/** * Log Configuration ***/
+public final static String NUM_PARTITIONS_PROP = "num.partitions";

Review Comment:
   I just pushed a commit to replace `PROP` with `CONFIG` suffix. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]

2024-03-27 Thread via GitHub


jeqo commented on PR #15588:
URL: https://github.com/apache/kafka/pull/15588#issuecomment-2022623732

   Sorry for also being late with the review. 
   Changes look good to me overall, though I'd also add a mention on these 
configs that the `log.roll` configs are ignored (as well as the already mentioned 
`segment.bytes|ms`). 
   
   We could address this as part of KAFKA-16429 if you agree.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1540968019


##
server/src/main/java/org/apache/kafka/server/config/KafkaConfig.java:
##
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.TopicConfig;
+import static 
org.apache.kafka.server.config.ServerTopicConfigSynonyms.LOG_PREFIX;
+
+public class KafkaConfig {
+
+/** * Log Configuration ***/
+public final static String NUM_PARTITIONS_PROP = "num.partitions";

Review Comment:
   > For consistency with the other *Configs classes, should we used the CONFIG 
suffix instead of PROP?
   
   I thought about it but I wasn't sure which would make it easier for people to 
review. 
   We also might need to do the same for the LogCleaner configs. 
   
   > Would it also make sense to colocate the CONFIG and DOC like we do in the 
other Config classes?
   
   I considered this but found it tricky for docs that refer to each other's 
properties in the string interpolation. For example, `LOG_DIRS_DOC` refers to 
`LOG_DIR_PROP` and `LOG_DIR_DOC` refers to `LOG_DIRS_PROP`; the same goes for 
`LOG_ROLL_TIME_HOURS_DOC` and `LOG_ROLL_TIME_MILLIS_DOC`. I think this is why 
they are grouped at the end, after defining the property names first. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig Replication properties and docs out of … [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on PR #15575:
URL: https://github.com/apache/kafka/pull/15575#issuecomment-2022624611

   @OmniaGM Can you please merge with the latest trunk?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]

2024-03-27 Thread via GitHub


brandboat commented on PR #15588:
URL: https://github.com/apache/kafka/pull/15588#issuecomment-2022630053

   Thank you @showuon , @jeqo . I will address the comments in the new JIRA 
https://issues.apache.org/jira/browse/KAFKA-16429


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-27 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831324#comment-17831324
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-16414:
--

As well, I'm +1 on including the active segment on size-based retention.

I guess this change of behavior does not require a KIP (?), but let's make sure 
it's mentioned/acknowledged in the release notes, so that any user relying 
on how the mix of size-based retention and active segment rotation currently works 
will be aware when upgrading.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expires the active segment when the max segment timestamp 
> matches the condition.
> - retention.bytes only expires the active segment when retention.bytes is 
> configured to zero.
> The behavior should be either to rotate active segments for both retention 
> configurations or for neither.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16414) Inconsistent active segment expiration behavior between retention.ms and retention.bytes

2024-03-27 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831327#comment-17831327
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-16414:
--

Just got the new messages while posting my last comment.

As Kamal mentioned, Tiered Storage has been a feature where there were 
assumptions about how active segment rotation works, and aligning these 
behaviors will potentially break some integration tests. This is why I was 
hesitant to suggest a KIP for this change.

> Inconsistent active segment expiration behavior between retention.ms and 
> retention.bytes
> 
>
> Key: KAFKA-16414
> URL: https://issues.apache.org/jira/browse/KAFKA-16414
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 3.6.1
>Reporter: Kuan Po Tseng
>Assignee: Kuan Po Tseng
>Priority: Major
>
> This is a follow up issue on KAFKA-16385.
> Currently, there's a difference between how retention.ms and retention.bytes 
> handle active segment expiration:
> - retention.ms always expires the active segment when the max segment timestamp 
> matches the condition.
> - retention.bytes only expires the active segment when retention.bytes is 
> configured to zero.
> The behavior should be either to rotate active segments for both retention 
> configurations or for neither.
> For more details, see
> https://issues.apache.org/jira/browse/KAFKA-16385?focusedCommentId=17829682&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17829682



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on PR #15075:
URL: https://github.com/apache/kafka/pull/15075#issuecomment-2022645115

   @chia7712 CI is ready and seems OK to me.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1541026665


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
   For this one, and maybe others that import only a few configs from KafkaConfig, this 
would be a nice improvement, but some classes, like DynamicBrokerConfig for example, 
include a large set of configs. I can update the few that need a small set of configs 
to use `KAFKA.SOME_CONFIG` and leave the others, especially since we will do this 
anyway when we move them to Java. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1541032203


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
   Also at the moment we can't use `KafkaConfig` everywhere as there's another 
`KafkaConfig` in scala 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1541026665


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
   ~For this one and maybe other that import few config from KafkaConfig this 
would be nice improvement but some classes like DynamicBrokerConfig for example 
include a large set of config. I can update few to use  `KAFKA.SOME_CONFIG` if 
they need small set of config and leave the others specially that we when we 
move them to java we will do this anyway. ~ 
   Ignore this I got the comment wrong. At the moment we can't use KafkaConfig 
everywhere as there's another KafkaConfig in scala



##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
   Also at the moment we can't use `KafkaConfig` everywhere as there's another 
`KafkaConfig` in scala 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1541026665


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
   ~ For this one and maybe other that import few config from KafkaConfig this 
would be nice improvement but some classes like DynamicBrokerConfig for example 
include a large set of config. I can update few to use  `KAFKA.SOME_CONFIG` if 
they need small set of config and leave the others specially that we when we 
move them to java we will do this anyway. ~ 
   
   Ignore my previous comment I got the comment wrong. At the moment we can't 
use KafkaConfig everywhere as there's another KafkaConfig in scala



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16385: Enhance documentation for retention.ms and retention.bytes configurations [kafka]

2024-03-27 Thread via GitHub


chia7712 commented on PR #15588:
URL: https://github.com/apache/kafka/pull/15588#issuecomment-2022675724

   @jeqo I should merge code after getting your reviews :(


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1541026665


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
For this one and maybe other that import few config from KafkaConfig 
this would be nice improvement but some classes like DynamicBrokerConfig for 
example include a large set of config. I can update few to use  
`KAFKA.SOME_CONFIG` if they need small set of config and leave the others 
specially that we when we move them to java we will do this anyway.
   
   Ignore my previous comment I got the comment wrong. At the moment we can't 
use KafkaConfig everywhere as there's another KafkaConfig in scala



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] refactor: improve the output information during partition directory m… [kafka]

2024-03-27 Thread via GitHub


KevinZTW opened a new pull request, #15610:
URL: https://github.com/apache/kafka/pull/15610

   Currently, when we use `kafka-reassign-partitions` to move log 
directories, the output only indicates which replica's movement has successfully 
started. 
   
   This PR proposes to show more detailed information, helping end users 
understand that the operation is proceeding as expected.
   
   For example, suppose I have a mv.json as below, which moves `t3-0` and `t3-1` to 
different log directories:
   
   ```json
   {
   "version":1,
   "partitions":[
   { 
"topic":"t3","partition":0,"replicas":[1],"log_dirs":["/var/lib/kafka/data/logs"]
 },
   { "topic":"t3","partition":1,"replicas":[1, 2, 
3],"log_dirs":["/var/lib/kafka/data/logs", "/var/lib/kafka/data/logs2", 
"/var/lib/kafka/data/logs2"] }
   ]
   }
   
   ```
   
   - original output
   ```
   Successfully started partition reassignments for t3-0,t3-1
   Successfully started log directory moves for: t3-0-1,t3-1-1,t3-1-2,t3-1-3
   ```
   - proposed output
   ```
   Successfully started partition reassignments for t3-0,t3-1
   Successfully started moving log directory to /var/lib/kafka/data/logs for 
replica t3-0 with broker id: 1
   Successfully started moving log directory to /var/lib/kafka/data/logs for 
replica t3-1 with broker id: 1
   Successfully started moving log directory to /var/lib/kafka/data/logs2 for 
replica t3-1 with broker id: 2
   Successfully started moving log directory to /var/lib/kafka/data/logs2 for 
replica t3-1 with broker id: 3
   ```
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


mimaison commented on code in PR #15569:
URL: https://github.com/apache/kafka/pull/15569#discussion_r1541117312


##
core/src/main/scala/kafka/server/ZkAdminManager.scala:
##
@@ -49,6 +49,7 @@ import 
org.apache.kafka.common.security.scram.internals.{ScramCredentialUtils, S
 import org.apache.kafka.common.utils.Sanitizer
 import org.apache.kafka.server.common.AdminOperationException
 import org.apache.kafka.server.config.{ConfigEntityName, ConfigType}
+import org.apache.kafka.server.config.KafkaConfig._

Review Comment:
   Ah right, that's a good point. Thanks



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.

2024-03-27 Thread Satish Duggana (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831383#comment-17831383
 ] 

Satish Duggana commented on KAFKA-15265:


KIP-956 is approved. [~abhijeetkumar] is working on pushing the implementation 
to the trunk. 

> Remote copy/fetch quotas for tiered storage.
> 
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>
> Related KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]

2024-03-27 Thread via GitHub


chia7712 commented on PR #15075:
URL: https://github.com/apache/kafka/pull/15075#issuecomment-2022937094

   ```
   ./gradlew cleanTest :tools:test --tests 
GetOffsetShellTest.testTopicPatternArgWithPartitionsArg :trogdor:test --tests 
CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated :connect:mirror:test 
--tests MirrorConnectorsIntegrationExactlyOnceTest.testReplicateSourceDefault 
--tests 
MirrorConnectorsWithCustomForwardingAdminIntegrationTest.testReplicateSourceDefault
 :core:test --tests ReplicaManagerTest.testRemoteFetchExpiresPerSecMetric 
--tests LogDirFailureTest.testIOExceptionDuringCheckpoint :clients:test --tests 
KafkaAdminClientTest.testClientSideTimeoutAfterFailureToReceiveResponse
   ```
   The failed tests pass on my local machine, so I'm about to merge it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14588 ZK configuration moved to ZkConfig [kafka]

2024-03-27 Thread via GitHub


chia7712 merged PR #15075:
URL: https://github.com/apache/kafka/pull/15075


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store

2024-03-27 Thread Matej Sprysl (Jira)
Matej Sprysl created KAFKA-16432:


 Summary: KStreams: Joining KStreams and GlobalKTable requires a 
state store
 Key: KAFKA-16432
 URL: https://issues.apache.org/jira/browse/KAFKA-16432
 Project: Kafka
  Issue Type: Bug
Reporter: Matej Sprysl


h2.  Code:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();

final GlobalKTable table = builder.globalTable("tableTopic", 
Consumed.with(Serdes.Long(), Serdes.Long()));

final KStream stream =
builder.stream(
Pattern.compile("streamTopic"),
Consumed.with(Serdes.ByteArray(), Message.SERDE));

(...some processing, no state store added...)

final var joiner = new MyJoiner();
final var keyMapper = new MyKeyMapper();

final KStream enriched =
messages
.join(table, keyMapper, joiner, Named.as("innerJoin"));{code}
h2. Error:
{code:java}
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as the 
store is not connected to the processor. If you add stores manually via 
'.addStateStore()' make sure to connect the added store to the processor by 
providing the processor name to '.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. DSL users need to provide the store name 
to '.process()', '.transform()', or '.transformValues()' to connect the store 
to the corresponding operator, or they can provide a StoreBuilder by 
implementing the stores() method on the Supplier itself. If you do not add 
stores manually, please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.
{code}
h2. Additional notes:

This error happens when I try to join a KStreams instance with a GlobalKTable 
instance.

It is important to emphasize that I am not connecting any state store manually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-27 Thread via GitHub


mjsax commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541286490


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -58,6 +61,8 @@ public class AbstractConfig {
 
 private final ConfigDef definition;
 
+public static final String AUTOMATIC_CONFIG_PROVIDERS_PROPERTY = 
"org.apache.kafka.automatic.config.providers";

Review Comment:
   Looks like a public API change to me. Don't we need a KIP?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] MINOR: Remove redundant ApiVersionsResponse#filterApis [kafka]

2024-03-27 Thread via GitHub


brandboat opened a new pull request, #15611:
URL: https://github.com/apache/kafka/pull/15611

   The method `ApiVersionsResponse#filterApis(RecordVersion, 
ApiMessageType.ListenerType)` is only used in tests, so we can remove it and invoke 
the other one.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store

2024-03-27 Thread Matej Sprysl (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matej Sprysl updated KAFKA-16432:
-
Description: 
h2.  Code:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();

final GlobalKTable table = builder.globalTable("tableTopic", 
Consumed.with(Serdes.Long(), Serdes.Long()));

final StreamsBuilder builder2 = new StreamsBuilder();

final KStream stream =     
builder2.stream(
Pattern.compile("streamTopic"),
Consumed.with(Serdes.ByteArray(), Message.SERDE));

(...some processing, no state store added...)

final var joiner = new MyJoiner();
final var keyMapper = new MyKeyMapper();

final KStream enriched =
messages
.join(table, keyMapper, joiner, Named.as("innerJoin"));{code}
h2. Error:
{code:java}
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as the 
store is not connected to the processor. If you add stores manually via 
'.addStateStore()' make sure to connect the added store to the processor by 
providing the processor name to '.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. DSL users need to provide the store name 
to '.process()', '.transform()', or '.transformValues()' to connect the store 
to the corresponding operator, or they can provide a StoreBuilder by 
implementing the stores() method on the Supplier itself. If you do not add 
stores manually, please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.
{code}
h2. Additional notes:

This error happens when I try to join a KStreams instance with a GlobalKTable 
instance.

It is important to emphasize that I am not connecting any state store manually.

  was:
h2.  Code:
{code:java}
final StreamsBuilder builder = new StreamsBuilder();

final GlobalKTable table = builder.globalTable("tableTopic", 
Consumed.with(Serdes.Long(), Serdes.Long()));

final KStream stream =
builder.stream(
Pattern.compile("streamTopic"),
Consumed.with(Serdes.ByteArray(), Message.SERDE));

(...some processing, no state store added...)

final var joiner = new MyJoiner();
final var keyMapper = new MyKeyMapper();

final KStream enriched =
messages
.join(table, keyMapper, joiner, Named.as("innerJoin"));{code}
h2. Error:
{code:java}
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as the 
store is not connected to the processor. If you add stores manually via 
'.addStateStore()' make sure to connect the added store to the processor by 
providing the processor name to '.addStateStore()' or connect them via 
'.connectProcessorAndStateStores()'. DSL users need to provide the store name 
to '.process()', '.transform()', or '.transformValues()' to connect the store 
to the corresponding operator, or they can provide a StoreBuilder by 
implementing the stores() method on the Supplier itself. If you do not add 
stores manually, please file a bug report at 
https://issues.apache.org/jira/projects/KAFKA.
{code}
h2. Additional notes:

This error happens when I try to join a KStreams instance with a GlobalKTable 
instance.

It is important to emphasize that I am not connecting any state store manually.


> KStreams: Joining KStreams and GlobalKTable requires a state store
> --
>
> Key: KAFKA-16432
> URL: https://issues.apache.org/jira/browse/KAFKA-16432
> Project: Kafka
>  Issue Type: Bug
>Reporter: Matej Sprysl
>Priority: Major
>
> h2.  Code:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final GlobalKTable table = builder.globalTable("tableTopic", 
> Consumed.with(Serdes.Long(), Serdes.Long()));
> final StreamsBuilder builder2 = new StreamsBuilder();
> final KStream stream =     
> builder2.stream(
> Pattern.compile("streamTopic"),
> Consumed.with(Serdes.ByteArray(), Message.SERDE));
> (...some processing, no state store added...)
> final var joiner = new MyJoiner();
> final var keyMapper = new MyKeyMapper();
> final KStream enriched =
> messages
> .join(table, keyMapper, joiner, Named.as("innerJoin"));{code}
> h2. Error:
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as 
> the store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the c

[jira] [Commented] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store

2024-03-27 Thread Matej Sprysl (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831415#comment-17831415
 ] 

Matej Sprysl commented on KAFKA-16432:
--

Update:

The issue was probably caused by creating the table stream with a different 
builder. Closing.

> KStreams: Joining KStreams and GlobalKTable requires a state store
> --
>
> Key: KAFKA-16432
> URL: https://issues.apache.org/jira/browse/KAFKA-16432
> Project: Kafka
>  Issue Type: Bug
>Reporter: Matej Sprysl
>Priority: Major
>
> h2.  Code:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final GlobalKTable table = builder.globalTable("tableTopic", 
> Consumed.with(Serdes.Long(), Serdes.Long()));
> final KStream stream =
> builder.stream(
> Pattern.compile("streamTopic"),
> Consumed.with(Serdes.ByteArray(), Message.SERDE));
> (...some processing, no state store added...)
> final var joiner = new MyJoiner();
> final var keyMapper = new MyKeyMapper();
> final KStream enriched =
> messages
> .join(table, keyMapper, joiner, Named.as("innerJoin"));{code}
> h2. Error:
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as 
> the store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.
> {code}
> h2. Additional notes:
> This error happens when I try to join a KStreams instance with a GlobalKTable 
> instance.
> It is important to emphasize that I am not connecting any state store 
> manually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Assigned] (KAFKA-15915) Flaky test - testUnrecoverableError - ProducerIdManagerTest

2024-03-27 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-15915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona reassigned KAFKA-15915:
-

Assignee: Andras Katona

> Flaky test - testUnrecoverableError - ProducerIdManagerTest
> ---
>
> Key: KAFKA-15915
> URL: https://issues.apache.org/jira/browse/KAFKA-15915
> Project: Kafka
>  Issue Type: Bug
>Reporter: Lucas Brutschy
>Assignee: Andras Katona
>Priority: Major
>  Labels: flaky-test
>
> Test intermittently gives the following result:
> {code}
> java.lang.UnsupportedOperationException: Success.failed
>   at scala.util.Success.failed(Try.scala:277)
>   at 
> kafka.coordinator.transaction.ProducerIdManagerTest.verifyFailure(ProducerIdManagerTest.scala:234)
>   at 
> kafka.coordinator.transaction.ProducerIdManagerTest.testUnrecoverableErrors(ProducerIdManagerTest.scala:199)
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Resolved] (KAFKA-16432) KStreams: Joining KStreams and GlobalKTable requires a state store

2024-03-27 Thread Matej Sprysl (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matej Sprysl resolved KAFKA-16432.
--
Resolution: Fixed

Fixed by using the same StreamsBuilder for both streams.
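
For anyone hitting the same error, a minimal sketch of the working shape: both the GlobalKTable and the KStream are created from the same StreamsBuilder, so the DSL connects the global state store to the join processor automatically. The topic names, the Long serdes, and the trivial key mapper/joiner below are illustrative assumptions, not taken from the original report.
{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Produced;

public class GlobalTableJoinSketch {

    public static Topology buildTopology() {
        // one builder for everything, so the global store is known to the topology
        final StreamsBuilder builder = new StreamsBuilder();

        final GlobalKTable<Long, Long> table =
            builder.globalTable("tableTopic", Consumed.with(Serdes.Long(), Serdes.Long()));

        final KStream<Long, Long> stream =
            builder.stream("streamTopic", Consumed.with(Serdes.Long(), Serdes.Long()));

        final KStream<Long, Long> enriched = stream.join(
            table,
            (streamKey, streamValue) -> streamKey,                  // map each stream record to the table key
            (streamValue, tableValue) -> streamValue + tableValue,  // combine stream and table values
            Named.as("innerJoin"));

        enriched.to("enrichedTopic", Produced.with(Serdes.Long(), Serdes.Long()));
        return builder.build();
    }

    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-table-join-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        new KafkaStreams(buildTopology(), props).start();
    }
}
{code}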

> KStreams: Joining KStreams and GlobalKTable requires a state store
> --
>
> Key: KAFKA-16432
> URL: https://issues.apache.org/jira/browse/KAFKA-16432
> Project: Kafka
>  Issue Type: Bug
>Reporter: Matej Sprysl
>Priority: Major
>
> h2.  Code:
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final GlobalKTable table = builder.globalTable("tableTopic", 
> Consumed.with(Serdes.Long(), Serdes.Long()));
> final KStream stream =
> builder.stream(
> Pattern.compile("streamTopic"),
> Consumed.with(Serdes.ByteArray(), Message.SERDE));
> (...some processing, no state store added...)
> final var joiner = new MyJoiner();
> final var keyMapper = new MyKeyMapper();
> final KStream enriched =
> messages
> .join(table, keyMapper, joiner, Named.as("innerJoin"));{code}
> h2. Error:
> {code:java}
> Caused by: org.apache.kafka.streams.errors.StreamsException: Processor 
> innerJoin has no access to StateStore tableTopic-STATE-STORE-00 as 
> the store is not connected to the processor. If you add stores manually via 
> '.addStateStore()' make sure to connect the added store to the processor by 
> providing the processor name to '.addStateStore()' or connect them via 
> '.connectProcessorAndStateStores()'. DSL users need to provide the store name 
> to '.process()', '.transform()', or '.transformValues()' to connect the store 
> to the corresponding operator, or they can provide a StoreBuilder by 
> implementing the stores() method on the Supplier itself. If you do not add 
> stores manually, please file a bug report at 
> https://issues.apache.org/jira/projects/KAFKA.
> {code}
> h2. Additional notes:
> This error happens when I try to join a KStreams instance with a GlobalKTable 
> instance.
> It is important to emphasize that I am not connecting any state store 
> manually.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541313630


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -1502,13 +1502,23 @@ public static  Map filterMap(final Map map, final Predicate propsToMap(Properties properties) {
-Map map = new HashMap<>(properties.size());
-for (Map.Entry entry : properties.entrySet()) {
+return castToStringObjectMap(properties);
+}
+
+/**
+ * Cast a map with arbitrary type keys to be keyed on String.
+ * @param inputMap A map with unknown type keys
+ * @return A map with the same contents as the input map, but with String 
keys
+ * @throws ConfigException if any key is not a String
+ */
+public static Map castToStringObjectMap(Map 
inputMap) {

Review Comment:
   There is a similar function like this one in 
`AbstractConfigTest.convertPropertiesToMap` maybe we can drop the one in 
`AbstractConfigTest`



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -925,10 +926,13 @@ static Map adminConfigs(String connName,
 // Ignore configs that begin with "admin." since those will be added 
next (with the prefix stripped)
 // and those that begin with "producer." and "consumer.", since we 
know they aren't intended for
 // the admin client
+// Also ignore the config.providers configurations because the 
worker-configured ConfigProviders should
+// already have been evaluated via the trusted WorkerConfig constructor
 Map nonPrefixedWorkerConfigs = 
config.originals().entrySet().stream()
 .filter(e -> !e.getKey().startsWith("admin.")
 && !e.getKey().startsWith("producer.")
-&& !e.getKey().startsWith("consumer."))
+&& !e.getKey().startsWith("consumer.")
+&& 
!e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG))

Review Comment:
   Small suggestion: as the list of configs isn't that huge, I would suggest 
refactoring this to something similar to the following (using a `List` rather than a 
`Stream`, since a `Stream` cannot be re-consumed for every entry in the filter)
   ```
   List<String> prefixes = Arrays.asList("admin.", "producer.", "consumer.", AbstractConfig.CONFIG_PROVIDERS_CONFIG);
   Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
           .filter(e -> prefixes.stream().noneMatch(s -> e.getKey().startsWith(s)))
           .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   ```
   



##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -580,8 +601,15 @@ private Map 
instantiateConfigProviders(Map 
instantiateConfigProviders(Map providerClassName = 
Optional.ofNullable(indirectConfigs.get(providerClass));
   Boolean isAllowed = providerClassName.map(name -> 
classNameFilter.test(name)).orElse(false);
   if (isAllowed) {
   providerMap.put(provider, providerClassName.get());
   } else {
   throw new ConfigException(providerClassName + " is not 
allowed. Update System property '"
   + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow 
" + providerClassName);
   }
 ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541293009


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -580,8 +601,15 @@ private Map 
instantiateConfigProviders(Map providerClassName = 
Optional.ofNullable(indirectConfigs.get(providerClass));
   Boolean isAllowed = providerClassName.map(name -> 
classNameFilter.test(name)).orElse(false);
   
   
   
   
   
   if (isAllowed) {
   providerMap.put(provider, providerClassName.get());
   } else {
   throw new ConfigException(providerClassName + " is not allowed. Update 
System property '"
   + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + 
providerClassName);
 }
 ```



##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -580,8 +601,15 @@ private Map 
instantiateConfigProviders(Map providerClassName = 
Optional.ofNullable(indirectConfigs.get(providerClass));
   Boolean isAllowed = providerClassName.map(name -> 
classNameFilter.test(name)).orElse(false);
   if (isAllowed) {
   providerMap.put(provider, providerClassName.get());
   } else {
   throw new ConfigException(providerClassName + " is not allowed. Update 
System property '"
   + AUTOMATIC_CONFIG_PROVIDERS_PROPERTY + "' to allow " + 
providerClassName);
 }
 ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541328444


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -925,10 +926,13 @@ static Map adminConfigs(String connName,
 // Ignore configs that begin with "admin." since those will be added 
next (with the prefix stripped)
 // and those that begin with "producer." and "consumer.", since we 
know they aren't intended for
 // the admin client
+// Also ignore the config.providers configurations because the 
worker-configured ConfigProviders should
+// already have been evaluated via the trusted WorkerConfig constructor
 Map nonPrefixedWorkerConfigs = 
config.originals().entrySet().stream()
 .filter(e -> !e.getKey().startsWith("admin.")
 && !e.getKey().startsWith("producer.")
-&& !e.getKey().startsWith("consumer."))
+&& !e.getKey().startsWith("consumer.")
+&& 
!e.getKey().startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG))

Review Comment:
   Small suggestion: as the list of configs isn't that huge, I would suggest 
refactoring this to something similar to the following (using a `List` rather than a 
`Stream`, since a `Stream` cannot be re-consumed for every entry in the filter)
   ```
   List<String> prefixes = Arrays.asList("admin.", "producer.", "consumer.", AbstractConfig.CONFIG_PROVIDERS_CONFIG);
   Map<String, Object> nonPrefixedWorkerConfigs = config.originals().entrySet().stream()
           .filter(e -> prefixes.stream().noneMatch(s -> e.getKey().startsWith(s)))
           .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541357369


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,19 +38,57 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+val INDEFINITE: Int = -1
+
+def indefinitely(error: Errors): ErrorCount = {
+  ErrorCount(error, INDEFINITE)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  while (queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error
+}
+
+def peekError(): Errors = queue.synchronized {
+  queue.head.error
+}
+
+def clearProcessedError(): Unit = {
+  TestUtils.waitUntilTrue(() =>
+queue.synchronized {
+  queue.head.repeat == 0
+}, "error wasn't processed")
+  queue.synchronized {
+queue.dequeue()

Review Comment:
   No, but it doesn't matter any more. I was able to get rid of the 
maybeRequestNextBlock override completely and the whole "mocking" became much 
easier.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16406 [2] : Split consumer commit tests [kafka]

2024-03-27 Thread via GitHub


lianetm commented on PR #15612:
URL: https://github.com/apache/kafka/pull/15612#issuecomment-2023088189

   @lucasbru this is one more split that I think would make sense. With 
this, all the tests left in the `PlainTextConsumer` do not seem to belong to 
any sensible group other than the generic/misc one that the plaintext suite 
itself represents. Could you take a look if you have a chance? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


soarez commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541375369


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  while (queue.nonEmpty && queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error

Review Comment:
   Should the error be immediately dequeued once it's taken?



##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }

Review Comment:
   Do we need an error count / `repeat`? It seems the tests don't make use of 
it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15729: Add KRaft support in GetOffsetShellTest [kafka]

2024-03-27 Thread via GitHub


chia7712 commented on PR #15489:
URL: https://github.com/apache/kafka/pull/15489#issuecomment-2023109755

   @Owen-CH-Leung `printUsageAndExit` calls `exit(1)`, so `KRAFT` and 
`CO_KRAFT` will stop the JVM. `ZK` can capture the exit code and throw an exception, 
so it does not terminate the JVM.
   
   Hence, a simple solution is to set a dummy exit procedure for `testPrintHelp`. 
For example:
   ```java
   @ClusterTest
   public void testPrintHelp() {
       Exit.setExitProcedure((statusCode, message) -> { });
       try {
           String out = ToolsTestUtils.captureStandardErr(() -> GetOffsetShell.mainNoExit("--help"));
           assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT));
       } finally {
           Exit.resetExitProcedure();
       }
   }
   ``` 
   WDYT?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541378249


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }

Review Comment:
   as I'm simplifying the whole thing, this can be simplified as well :D thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[PR] KAFKA-16103: commitSync should await pending async commits [kafka]

2024-03-27 Thread via GitHub


lucasbru opened a new pull request, #15613:
URL: https://github.com/apache/kafka/pull/15613

   The javadoc for `KafkaConsumer.commitSync` says:
   
   > Note that asynchronous offset commits sent previously with the {@link 
#commitAsync(OffsetCommitCallback)}
   > (or similar) are guaranteed to have their callbacks invoked prior to 
completion of this method.
   
   This is not always true for either the legacy consumer or the async
   consumer. This change proposes a number of fixes related to this
   guarantee:
   
 - In the legacy consumer, we're also awaiting async commits that are
   "pending" instead of "in-flight", because we do not know the
   coordinator yet.
  - In the new consumer, we keep track of the incomplete async commit
    futures and wait for them to complete before returning from
    `commitSync` (see the sketch after this list).
 - Since we need to block to make sure that our previous commits are 
   completed, we allow the consumer to wake up. This is implemented
   but we are leaving it unresolved in the legacy consumer.
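
   A rough, hypothetical sketch of that bookkeeping (class, field and method names
   below are made up for illustration and this is not the actual consumer code):
   `commitAsync` registers its future, and `commitSync` awaits the registered
   futures, bounded by its own timeout, before performing the blocking commit.

   ```java
   import java.time.Duration;
   import java.util.Map;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import org.apache.kafka.clients.consumer.OffsetAndMetadata;
   import org.apache.kafka.common.TopicPartition;

   // Sketch only: tracks async commit futures so a later commitSync can await them.
   class CommitBookkeepingSketch {

       private final Queue<CompletableFuture<Void>> pendingAsyncCommits = new ConcurrentLinkedQueue<>();

       CompletableFuture<Void> commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets) {
           CompletableFuture<Void> future = sendCommitRequest(offsets);  // assumed asynchronous send
           pendingAsyncCommits.add(future);
           // forget the future once its callback has run
           future.whenComplete((ignored, error) -> pendingAsyncCommits.remove(future));
           return future;
       }

       void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout)
               throws InterruptedException, TimeoutException {
           long deadlineNs = System.nanoTime() + timeout.toNanos();
           // await callbacks of all previously sent async commits before committing synchronously
           for (CompletableFuture<Void> pending : pendingAsyncCommits) {
               long remainingNs = deadlineNs - System.nanoTime();
               if (remainingNs <= 0)
                   throw new TimeoutException("Timed out waiting for pending async commits");
               try {
                   pending.get(remainingNs, TimeUnit.NANOSECONDS);
               } catch (ExecutionException e) {
                   // a failed async commit is reported through its own callback, not through commitSync
               }
           }
           blockingCommit(offsets, Duration.ofNanos(Math.max(0, deadlineNs - System.nanoTime())));
       }

       private CompletableFuture<Void> sendCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
           return CompletableFuture.completedFuture(null);  // placeholder for the real request
       }

       private void blockingCommit(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
           // placeholder for the actual synchronous commit path
       }
   }
   ```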
   
   ## Testing
   
   A new integration test
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541384909


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  while (queue.nonEmpty && queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error

Review Comment:
   I just removed the maybeRequestNextBlock override; it was what required peeking into 
the queue, so now that it's gone, there is no need to have the counters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15915: Flaky ProducerIdManagerTest error injection fix [kafka]

2024-03-27 Thread via GitHub


akatona84 commented on code in PR #15605:
URL: https://github.com/apache/kafka/pull/15605#discussion_r1541384909


##
core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala:
##
@@ -38,22 +38,49 @@ import org.mockito.Mockito.{mock, when}
 import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.mutable
-import scala.util.{Failure, Success}
+import scala.util.{Failure, Success, Try}
 
 class ProducerIdManagerTest {
 
   var brokerToController: NodeToControllerChannelManager = 
mock(classOf[NodeToControllerChannelManager])
   val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient])
 
+  case class ErrorCount(error: Errors, var repeat: Int)
+
+  object ErrorCount {
+def once(error: Errors): ErrorCount = {
+  ErrorCount(error, 1)
+}
+  }
+
+  class ErrorQueue(initialErrorCounts: ErrorCount*) {
+private val queue: mutable.Queue[ErrorCount] = mutable.Queue.empty ++ 
initialErrorCounts
+
+def takeError(): Errors = queue.synchronized {
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  while (queue.nonEmpty && queue.head.repeat == 0) {
+queue.dequeue()
+  }
+  if (queue.isEmpty) {
+return Errors.NONE
+  }
+  if (queue.head.repeat > 0) {
+queue.head.repeat -= 1
+  }
+  queue.head.error

Review Comment:
   I just removed the maybeRequestNextBlock override; it was what required peeking into 
the queue, so now that it's gone, there is no need to have the counters either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


OmniaGM commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541382798


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = 
WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = 
ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set RESOURCE_TYPES = 
Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set ACL_OPERATIONS = 
Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), 
ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+public static AclEntry apply(KafkaPrincipal principal,
+ AclPermissionType permissionType,
+ String host,
+ AclOperation operation) {
+return new AclEntry(new AccessControlEntry(principal == null ? null :

Re: [PR] KAFKA-16323: fix testRemoteFetchExpiresPerSecMetric [kafka]

2024-03-27 Thread via GitHub


johnnychhsu commented on code in PR #15463:
URL: https://github.com/apache/kafka/pull/15463#discussion_r1541419703


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4164,16 +4164,13 @@ class ReplicaManagerTest {
 mock(classOf[FetchDataInfo])
   }).when(spyRLM).read(any())
 
-  // Get the current type=DelayedRemoteFetchMetrics,name=ExpiresPerSec 
metric value before fetching
-  val curExpiresPerSec = 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long]
+  val curExpiresPerSec = 
DelayedRemoteFetchMetrics.expiredRequestMeter.count()
   replicaManager.fetchMessages(params, Seq(tidp0 -> new 
PartitionData(topicId, fetchOffset, 0, 10, 
Optional.of[Integer](leaderEpoch), Optional.of[Integer](leaderEpoch))), 
UnboundedQuota, fetchCallback)
   // advancing the clock to expire the delayed remote fetch
   timer.advanceClock(2000L)
 
-  // verify the metric value is incremented since the delayed remote fetch 
is expired
-  TestUtils.waitUntilTrue(() => curExpiresPerSec + 1 == 
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long],
-"The ExpiresPerSec value is not incremented. Current value is: " +
-  
safeYammerMetricValue("type=DelayedRemoteFetchMetrics,name=ExpiresPerSec").asInstanceOf[Long])
+  // verify the DelayedRemoteFetchMetrics.expiredRequestMeter.mark is 
called since the delayed remote fetch is expired
+  assertEquals(curExpiresPerSec + 1, 
DelayedRemoteFetchMetrics.expiredRequestMeter.count())

Review Comment:
   ah i see. thanks for the comment @showuon ! 
   let me address that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] MINOR: AbstractConfig cleanup [kafka]

2024-03-27 Thread via GitHub


gharris1727 commented on code in PR #15597:
URL: https://github.com/apache/kafka/pull/15597#discussion_r1541420383


##
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java:
##
@@ -580,8 +601,15 @@ private Map 
instantiateConfigProviders(Map

[jira] [Commented] (KAFKA-15265) Remote copy/fetch quotas for tiered storage.

2024-03-27 Thread Henry Cai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-15265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831438#comment-17831438
 ] 

Henry Cai commented on KAFKA-15265:
---

Thanks. [~abhijeetkumar] Do you have a rough timeline for when the PR will be 
ready? Do you need other people to contribute to the work?

> Remote copy/fetch quotas for tiered storage.
> 
>
> Key: KAFKA-15265
> URL: https://issues.apache.org/jira/browse/KAFKA-15265
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Reporter: Satish Duggana
>Assignee: Abhijeet Kumar
>Priority: Major
>
> Related KIP: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-956+Tiered+Storage+Quotas



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-16217: Stop the abort transaction try loop when closing producers [kafka]

2024-03-27 Thread via GitHub


CalvinConfluent commented on PR #15541:
URL: https://github.com/apache/kafka/pull/15541#issuecomment-2023173614

   @kirktrue @jolshan Anything else we need to address for this ticket?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-14133: Move StreamTaskTest to Mockito [kafka]

2024-03-27 Thread via GitHub


clolov commented on PR #14716:
URL: https://github.com/apache/kafka/pull/14716#issuecomment-2023192543

   Hello @cadonna! This ought to be one of the last PRs for the migration. I 
will circle back tomorrow morning to rebase it, since it has been out for a long 
time.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15853: Move KafkaConfig log properties and docs out of core [kafka]

2024-03-27 Thread via GitHub


mimaison commented on PR #15569:
URL: https://github.com/apache/kafka/pull/15569#issuecomment-2023212508

   Now that https://github.com/apache/kafka/pull/15075 got merged, this needs 
rebasing.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16406 [2] : Split consumer commit tests [kafka]

2024-03-27 Thread via GitHub


lianetm commented on code in PR #15612:
URL: https://github.com/apache/kafka/pull/15612#discussion_r1541452355


##
core/src/test/scala/integration/kafka/api/PlaintextConsumerCommitTest.scala:
##
@@ -0,0 +1,320 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+  * License. You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+  * specific language governing permissions and limitations under the License.
+  */
+package kafka.api
+
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.common.serialization.{StringDeserializer, 
StringSerializer}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.test.MockConsumerInterceptor
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Timeout
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+
+import java.time.Duration
+import java.util
+import java.util.Optional
+import java.util.stream.Stream
+import scala.jdk.CollectionConverters._
+
+/**
+ * Integration tests for the consumer that covers the logic related to 
committing offsets.
+ */
+@Timeout(600)
+class PlaintextConsumerCommitTest extends AbstractConsumerTest {
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitOnClose(quorum: String, groupProtocol: String): Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+val consumer = createConsumer()
+
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords, tp)
+
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+// should auto-commit sought positions before closing
+consumer.seek(tp, 300)
+consumer.seek(tp2, 500)
+consumer.close()
+
+// now we should see the committed positions from another consumer
+val anotherConsumer = createConsumer()
+assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCommitOnCloseAfterWakeup(quorum: String, groupProtocol: String): 
Unit = {
+this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
"true")
+val consumer = createConsumer()
+
+val numRecords = 1
+val producer = createProducer()
+sendRecords(producer, numRecords, tp)
+
+consumer.subscribe(List(topic).asJava)
+awaitAssignment(consumer, Set(tp, tp2))
+
+// should auto-commit sought positions before closing
+consumer.seek(tp, 300)
+consumer.seek(tp2, 500)
+
+// wakeup the consumer before closing to simulate trying to break a poll
+// loop from another thread
+consumer.wakeup()
+consumer.close()
+
+// now we should see the committed positions from another consumer
+val anotherConsumer = createConsumer()
+assertEquals(300, anotherConsumer.committed(Set(tp).asJava).get(tp).offset)
+assertEquals(500, 
anotherConsumer.committed(Set(tp2).asJava).get(tp2).offset)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitMetadata(quorum: String, groupProtocol: String): Unit = {
+val consumer = createConsumer()
+consumer.assign(List(tp).asJava)
+
+// sync commit
+val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
+consumer.commitSync(Map((tp, syncMetadata)).asJava)
+assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+// async commit
+val asyncMetadata = new OffsetAndMetadata(10, "bar")
+sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
+assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+// handle null metadata
+val nullMetadata = new OffsetAndMetadata(5, null)
+consumer.commitSync(Map(tp -> nullMetadata).asJava)
+

Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541471761


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = 
WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = 
ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set RESOURCE_TYPES = 
Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set ACL_OPERATIONS = 
Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), 
ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+public static AclEntry apply(KafkaPrincipal principal,
+ AclPermissionType permissionType,
+ String host,
+ AclOperation operation) {
+return new AclEntry(new AccessControlEntry(principal == null ? null 

Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541472627


##
server/src/main/java/org/apache/kafka/security/CredentialProvider.java:
##
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security;
+
+import org.apache.kafka.common.security.authenticator.CredentialCache;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+
+import java.util.Collection;
+import java.util.Properties;
+
+public class CredentialProvider {
+private final Collection scramMechanisms;

Review Comment:
   Nice catch. Thanks. Field removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541475816


##
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
 //with includeAuthorizedOperations flag
 topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
-expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
 assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-AclEntry.supportedOperations(ResourceType.CLUSTER)
+
JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
   AFAIK `asScala` does not exist in Scala 2.12, which we use to build Kafka.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831460#comment-17831460
 ] 

Johnny Hsu commented on KAFKA-16310:


The update is in the section below:

 

## When the TimestampType is LOG_APPEND_TIME

When the TimestampType is LOG_APPEND_TIME, the timestamps of the records are the 
same. In this case, we should choose the offset of the first record. [This 
path|https://github.com/apache/kafka/blob/6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L294] 
in LogValidator was added to handle this case for the non-compressed type, while 
[this 
path|https://github.com/apache/kafka/blob/6f38fe5e0a6e2fe85fec7cb9adc379061d35ce45/storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java#L421] 
in LogValidator was added to handle this case for the compressed type.

I don't have a Confluence account yet. [~chia7712], would you please help 
update the KIP in the wiki? I will send this update to the dev thread for 
visibility. Thanks!
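
For reference, here is a minimal sketch (not part of the original report; broker 
address and topic name are placeholders) of how the scenario quoted below can be 
checked from the Admin client. With the three records t0 + 100, t0 + 400, t0 + 250 
in one batch, the expected answer for OffsetSpec.maxTimestamp() is offset 1, i.e. 
the record with the highest timestamp rather than the last produced record:
{code:scala}
import java.util.{Collections, Properties}

import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, OffsetSpec}
import org.apache.kafka.common.TopicPartition

object MaxTimestampCheck extends App {
  val props = new Properties()
  // assumption: a locally reachable broker and a pre-existing topic "mytopic"
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
  val admin = Admin.create(props)
  try {
    val tp = new TopicPartition("mytopic", 0)
    // OffsetSpec.maxTimestamp() asks for the offset of the record with the
    // largest timestamp, not the offset of the last record in the partition.
    val result = admin.listOffsets(Collections.singletonMap(tp, OffsetSpec.maxTimestamp()))
    val info = result.partitionResult(tp).get()
    println(s"offset=${info.offset()} timestamp=${info.timestamp()}")
  } finally {
    admin.close()
  }
}
{code}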

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned, etc.). This causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing and it's checking 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> it should return offset 1, but in 3.7.0 and trunk it returns offset 2
> Even after 5 seconds from producing it's still returning 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


ijuma commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541483681


##
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
 //with includeAuthorizedOperations flag
 topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
-expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
 assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-AclEntry.supportedOperations(ResourceType.CLUSTER)
+
JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
   You can use CollectionConverters.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16156: beginningOrEndOffsets does not need to build an OffsetAndTimestamps object upon completion [kafka]

2024-03-27 Thread via GitHub


philipnee commented on PR #15525:
URL: https://github.com/apache/kafka/pull/15525#issuecomment-2023293507

   @lucasbru - If I'm not mistaken, the current implementations of both 
beginningOrEndOffsets and offsetsForTimes need to send out a request upon 
getting a ZERO duration.  Seems like both code paths are invoking this logic:
   ```
   // if timeout is set to zero, do not try to poll the network client at all
   // and return empty immediately; otherwise try to get the results synchronously
   // and throw timeout exception if it cannot complete in time
   if (timer.timeoutMs() == 0L)
       return result;
   ```
   
   But offsetsForTimes seems to short-circuit it here:
   ```
   // If timeout is set to zero return empty immediately; otherwise try to get the results
   // and throw timeout exception if it cannot complete in time.
   if (timeout.toMillis() == 0L)
       return listOffsetsEvent.emptyResult();
   
   return applicationEventHandler.addAndGet(listOffsetsEvent, timer);
   ```
   
   I'll create a ticket to align the behavior of these two APIs in the new 
consumers.
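   
   For illustration, a minimal usage sketch (assumptions: an already configured 
`KafkaConsumer` and a placeholder partition), showing the two lookups whose 
zero-timeout behavior is discussed above; the intent is that both should at least 
get the ListOffsets request on the wire before returning:
   ```scala
   import java.time.Duration
   import java.util.Collections

   import org.apache.kafka.clients.consumer.KafkaConsumer
   import org.apache.kafka.common.TopicPartition

   object ZeroTimeoutLookup {
     // Sketch only: both lookups pass Duration.ZERO. Per the snippets above,
     // beginningOffsets still issues the request, while offsetsForTimes in the
     // new consumer currently short-circuits and returns an empty result.
     def lookup(consumer: KafkaConsumer[String, String], tp: TopicPartition): Unit = {
       val beginning = consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO)
       val byTime = consumer.offsetsForTimes(
         Collections.singletonMap(tp, java.lang.Long.valueOf(0L)), Duration.ZERO)
       println(s"beginningOffsets=$beginning offsetsForTimes=$byTime")
     }
   }
   ```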


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541492785


##
server/src/main/java/org/apache/kafka/security/authorizer/AclEntry.java:
##
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.security.authorizer;
+
+import org.apache.kafka.common.acl.AccessControlEntry;
+import org.apache.kafka.common.acl.AclOperation;
+import org.apache.kafka.common.acl.AclPermissionType;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.resource.ResourcePattern;
+import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.server.util.Json;
+import org.apache.kafka.server.util.json.DecodeJson;
+import org.apache.kafka.server.util.json.JsonObject;
+import org.apache.kafka.server.util.json.JsonValue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.acl.AclOperation.ALTER;
+import static org.apache.kafka.common.acl.AclOperation.ALTER_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.CLUSTER_ACTION;
+import static org.apache.kafka.common.acl.AclOperation.CREATE;
+import static org.apache.kafka.common.acl.AclOperation.CREATE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.DELETE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_CONFIGS;
+import static org.apache.kafka.common.acl.AclOperation.DESCRIBE_TOKENS;
+import static org.apache.kafka.common.acl.AclOperation.IDEMPOTENT_WRITE;
+import static org.apache.kafka.common.acl.AclOperation.READ;
+import static org.apache.kafka.common.acl.AclOperation.WRITE;
+
+public class AclEntry extends AccessControlEntry {
+private static final DecodeJson.DecodeInteger INT = new 
DecodeJson.DecodeInteger();
+private static final DecodeJson.DecodeString STRING = new 
DecodeJson.DecodeString();
+
+public static final KafkaPrincipal WILDCARD_PRINCIPAL = new 
KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*");
+public static final String WILDCARD_PRINCIPAL_STRING = 
WILDCARD_PRINCIPAL.toString();
+public static final String WILDCARD_HOST = "*";
+public static final String WILDCARD_RESOURCE = 
ResourcePattern.WILDCARD_RESOURCE;
+public static final String RESOURCE_SEPARATOR = ":";
+public static final Set RESOURCE_TYPES = 
Arrays.stream(ResourceType.values())
+.filter(t -> !(t == ResourceType.UNKNOWN || t == ResourceType.ANY))
+.collect(Collectors.toSet());
+public static final Set ACL_OPERATIONS = 
Arrays.stream(AclOperation.values())
+.filter(t -> !(t == AclOperation.UNKNOWN || t == AclOperation.ANY))
+.collect(Collectors.toSet());
+
+private static final String PRINCIPAL_KEY = "principal";
+private static final String PERMISSION_TYPE_KEY = "permissionType";
+private static final String OPERATION_KEY = "operation";
+private static final String HOSTS_KEY = "host";
+public static final String VERSION_KEY = "version";
+public static final int CURRENT_VERSION = 1;
+private static final String ACLS_KEY = "acls";
+
+public final AccessControlEntry ace;
+public final KafkaPrincipal kafkaPrincipal;
+
+public AclEntry(AccessControlEntry ace) {
+super(ace.principal(), ace.host(), ace.operation(), 
ace.permissionType());
+this.ace = ace;
+
+kafkaPrincipal = ace.principal() == null
+? null
+: SecurityUtils.parseKafkaPrincipal(ace.principal());
+}
+
+public static AclEntry apply(KafkaPrincipal principal,
+ AclPermissionType permissionType,
+ String host,
+ AclOperation operation) {
+return new AclEntry(new AccessControlEntry(principal == null ? null 

[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831463#comment-17831463
 ] 

Chia-Ping Tsai commented on KAFKA-16310:


@johnnyhsu thanks for offering the description. I added the following statement to 
KIP-734 according to your comments.

{quote}

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

{quote}

WDYT?

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned, etc.). This causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing and it's checking 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> it should return offset 1, but in 3.7.0 and trunk it returns offset 2
> Even after 5 seconds from producing it's still returning 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Philip Nee (Jira)
Philip Nee created KAFKA-16433:
--

 Summary: beginningOffsets and offsetsForTimes don't behave 
consistently when providing a zero timeout
 Key: KAFKA-16433
 URL: https://issues.apache.org/jira/browse/KAFKA-16433
 Project: Kafka
  Issue Type: Task
  Components: consumer
Reporter: Philip Nee
Assignee: Philip Nee


As documented here:[https://github.com/apache/kafka/pull/15525]

 

Both APIs should at least send out a request when a zero timeout is provided.

 

This is corrected in the PR above.  We however still need to fix the implementation 
of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (KAFKA-16433) beginningOffsets and offsetsForTimes don't behave consistently when providing a zero timeout

2024-03-27 Thread Philip Nee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Philip Nee updated KAFKA-16433:
---
Labels: consumer-threading-refactor  (was: )

> beginningOffsets and offsetsForTimes don't behave consistently when providing 
> a zero timeout
> 
>
> Key: KAFKA-16433
> URL: https://issues.apache.org/jira/browse/KAFKA-16433
> Project: Kafka
>  Issue Type: Task
>  Components: consumer
>Reporter: Philip Nee
>Assignee: Philip Nee
>Priority: Blocker
>  Labels: consumer-threading-refactor
>
> As documented here:[https://github.com/apache/kafka/pull/15525]
>  
> Both APIs should at least send out a request when a zero timeout is provided.
>  
> This is corrected in the PR above.  We however still need to fix the 
> implementation of the offsetsForTimes API.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831464#comment-17831464
 ] 

Johnny Hsu edited comment on KAFKA-16310 at 3/27/24 5:01 PM:
-

{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

This sounds good to me, thanks! {quote}


was (Author: JIRAUSER304478):
{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

This sounds good to me, thanks! 


{quote}

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned, etc.). This causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing and it's checking 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> it should return offset 1, but in 3.7.0 and trunk it returns offset 2
> Even after 5 seconds from producing it's still returning 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-16310) ListOffsets doesn't report the offset with maxTimestamp anymore

2024-03-27 Thread Johnny Hsu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17831464#comment-17831464
 ] 

Johnny Hsu commented on KAFKA-16310:


{quote}[~chia7712] thanks for the help!

This returns the offset and timestamp corresponding to the record with the 
highest timestamp on the partition. Note that we should choose the offset of 
the earliest record if the timestamps of the records are the same.

This sounds good to me, thanks! 


{quote}

> ListOffsets doesn't report the offset with maxTimestamp anymore
> ---
>
> Key: KAFKA-16310
> URL: https://issues.apache.org/jira/browse/KAFKA-16310
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 3.7.0
>Reporter: Emanuele Sabellico
>Assignee: Chia-Ping Tsai
>Priority: Blocker
> Fix For: 3.6.2, 3.8.0, 3.7.1
>
>
> Updated: This is confirmed to be a regression issue in v3.7.0. 
> The impact of this issue is that when there is a batch containing records 
> with timestamps not in order, the offset reported for a timestamp will be 
> wrong (e.g. the timestamp for t0 should map to offset 10, but offset 12 is 
> returned, etc.). This causes the time index to record the wrong offset, so the 
> result will be unexpected. 
> ===
> The last offset is reported instead.
> A test in librdkafka (0081/do_test_ListOffsets) is failing and it's checking 
> that the offset with the max timestamp is the middle one and not the last 
> one. The test passes with 3.6.0 and previous versions.
> This is the test:
> [https://github.com/confluentinc/librdkafka/blob/a6d85bdbc1023b1a5477b8befe516242c3e182f6/tests/0081-admin.c#L4989]
>  
> there are three messages, with timestamps:
> {noformat}
> t0 + 100
> t0 + 400
> t0 + 250{noformat}
> and indices 0,1,2. 
> then a ListOffsets with RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP is done.
> it should return offset 1, but in 3.7.0 and trunk it returns offset 2
> Even after 5 seconds from producing it's still returning 2 as the offset with 
> max timestamp.
> ProduceRequest and ListOffsets were sent to the same broker (2), the leader 
> didn't change.
> {code:java}
> %7|1709134230.019|SEND|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ProduceRequest (v7, 
> 206 bytes @ 0, CorrId 2) %7|1709134230.020|RECV|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received ProduceResponse 
> (v7, 95 bytes, CorrId 2, rtt 1.18ms) 
> %7|1709134230.020|MSGSET|0081_admin#producer-3| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: 
> rdkafkatest_rnd22e8d8ec45b53f98_do_test_ListOffsets [0]: MessageSet with 3 
> message(s) (MsgId 0, BaseSeq -1) delivered {code}
> {code:java}
> %7|1709134235.021|SEND|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Sent ListOffsetsRequest 
> (v7, 103 bytes @ 0, CorrId 7) %7|1709134235.022|RECV|0081_admin#producer-2| 
> [thrd:localhost:39951/bootstrap]: localhost:39951/2: Received 
> ListOffsetsResponse (v7, 88 bytes, CorrId 7, rtt 0.54ms){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


Re: [PR] KAFKA-15899 Move kafka.security package from core to server module [kafka]

2024-03-27 Thread via GitHub


nizhikov commented on code in PR #15572:
URL: https://github.com/apache/kafka/pull/15572#discussion_r1541508048


##
core/src/test/scala/integration/kafka/api/BaseAdminIntegrationTest.scala:
##
@@ -183,12 +184,13 @@ abstract class BaseAdminIntegrationTest extends 
IntegrationTestHarness with Logg
 
 //with includeAuthorizedOperations flag
 topicResult = getTopicMetadata(client, topic, new 
DescribeTopicsOptions().includeAuthorizedOperations(true))
-expectedOperations = 
AclEntry.supportedOperations(ResourceType.TOPIC).asJava
+expectedOperations = AclEntry.supportedOperations(ResourceType.TOPIC)
 assertEquals(expectedOperations, topicResult.authorizedOperations)
   }
 
+  @nowarn("cat=deprecation")
   def configuredClusterPermissions: Set[AclOperation] =
-AclEntry.supportedOperations(ResourceType.CLUSTER)
+
JavaConverters.asScalaSet(AclEntry.supportedOperations(ResourceType.CLUSTER)).toSet

Review Comment:
   Hello @ijuma 
   
   Sorry, I didn't find a way to use `CollectionConverters` to convert a Java set 
to an immutable Scala set.
   Can you give me an example? 
   
   `AclEntry.supportedOperations(ResourceType.CLUSTER).asScala` which uses 
`CollectionConverters` returns `scala.collection.mutable.Set` while here we use 
`scala.collection.immutable.Set`.
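   
   For what it's worth, a small sketch of the conversion in question (an 
illustration only, not necessarily the PR's final code): in a 2.12/2.13 
cross-build the `asScala` decorator can come from `scala.collection.JavaConverters` 
(deprecated in 2.13, hence the `@nowarn` above), and since it yields a mutable 
view, an explicit `.toSet` is needed to obtain a `scala.collection.immutable.Set`:
   ```scala
   import scala.collection.JavaConverters._

   import org.apache.kafka.common.acl.AclOperation
   import org.apache.kafka.common.resource.ResourceType
   import org.apache.kafka.security.authorizer.AclEntry

   object SupportedOperationsConversion {
     // asScala wraps the Java set in a scala.collection.mutable.Set view;
     // .toSet copies it into an immutable Set, matching the declared return type.
     def clusterPermissions: Set[AclOperation] =
       AclEntry.supportedOperations(ResourceType.CLUSTER).asScala.toSet
   }
   ```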



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


