Re: [PR] KAFKA-16379: Coordinator event queue, processing, flush, purgatory time histograms [kafka]
dimitarndimitrov commented on code in PR #16949: URL: https://github.com/apache/kafka/pull/16949#discussion_r1753395793 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/HdrHistogram.java: ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; +import org.HdrHistogram.ValueRecorder; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * A wrapper on top of the HdrHistogram API. It handles writing to the histogram by delegating + * to an internal {@link ValueRecorder} implementation, and reading from the histogram by + * efficiently implementing the retrieval of up-to-date histogram data. + * + * Note that all APIs expect a timestamp which is used by the histogram to discard decaying data + * and determine when the snapshot from which the histogram metrics are calculated should be + * refreshed. + */ +public final class HdrHistogram { Review Comment: > Is HdrHistogram better than Yammer histogram? @junrao I think we have good evidence that it is. The included unit tests illustrate a bit how HdrHistogram is more accurate than the Yammer histogram, and JMH benchmarks point to significantly faster writes (I have concrete numbers from internal tests, but I'll try to include a JMH benchmark in the bugfix PR I'm preparing, and we can look at the results it gives us). > If it's better, why not use it more broadly on the server side? What's our recommendation for new developers who want to add histogram metrics on the server side? I personally would vouch for that. Part of the motivation for originally implementing this was seamlessly replacing the Yammer histogram usages in `RequestMetrics`, which can be noticeable in profiles in high-load scenarios, and using a more performant histogram could be a nice little win in such cases. All I'm saying is that we don't claim to have convinced the community already, and don't want to inadvertently undermine the other "new histogram" effort we were made aware of, before there's been a discussion.
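For readers unfamiliar with the library: the wrapper above delegates writes to an HdrHistogram `Recorder` and serves reads from interval histograms swapped out of it. Below is a minimal, self-contained sketch of that read/write split (the value range and percentile are arbitrary choices; the Kafka class additionally layers timestamp-based snapshot refreshing on top, which is not shown here):

```java
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;

public class HdrHistogramSketch {
    public static void main(String[] args) {
        // Track values up to one hour in microseconds, with 3 significant digits.
        Recorder recorder = new Recorder(3_600_000_000L, 3);

        // Write path: recordValue() is cheap and safe for concurrent writers.
        for (long latencyUs = 1; latencyUs <= 10_000; latencyUs++) {
            recorder.recordValue(latencyUs);
        }

        // Read path: swap out an interval histogram and query the snapshot.
        Histogram snapshot = recorder.getIntervalHistogram();
        System.out.printf("p99=%d max=%d mean=%.1f%n",
                snapshot.getValueAtPercentile(99.0),
                snapshot.getMaxValue(),
                snapshot.getMean());
    }
}
```

The cheap write path is the property the benchmark discussion above is about, and because HdrHistogram records every value rather than sampling into a reservoir the way the Yammer histogram does, it is also where the accuracy difference comes from.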
[jira] [Created] (KAFKA-17520) align the low bound of ducktape version
Chia-Ping Tsai created KAFKA-17520: -- Summary: align the low bound of ducktape version Key: KAFKA-17520 URL: https://issues.apache.org/jira/browse/KAFKA-17520 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai There was a discussion about it: https://github.com/apache/kafka/pull/14216#issuecomment-2117519600 and I noticed this issue when trying to upgrade the base image for e2e
Re: [PR] KAFKA-15319: Upgrade rocksdb to fix CVE-2022-37434 [kafka]
chia7712 commented on PR #14216: URL: https://github.com/apache/kafka/pull/14216#issuecomment-2342917872 @lucasbru I opened https://issues.apache.org/jira/browse/KAFKA-17520 for bumping the version. Please let me know if you have any concerns.
[jira] [Commented] (KAFKA-17520) align the low bound of ducktape version
[ https://issues.apache.org/jira/browse/KAFKA-17520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17880888#comment-17880888 ] Chia-Ping Tsai commented on KAFKA-17520: the required version in https://github.com/apache/kafka/blob/trunk/tests/setup.py#L54 is inconsistent with the dockerfile
[jira] [Commented] (KAFKA-17520) align the low bound of ducktape version
[ https://issues.apache.org/jira/browse/KAFKA-17520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17880890#comment-17880890 ] Eric Chang commented on KAFKA-17520: May I take this issue?
Re: [PR] KAFKA-17290: Added Integration tests for ShareFetch and ShareAcknowledge APIs [kafka]
omkreddy merged PR #16916: URL: https://github.com/apache/kafka/pull/16916
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
AndrewJSchofield commented on code in PR #17046: URL: https://github.com/apache/kafka/pull/17046#discussion_r1753746159 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -620,6 +699,17 @@ private void deleteAndVerifyGroupConfigValue(Admin client, verifyGroupConfig(client, groupName, defaultConfigs); } +private void deleteAndVerifyClientMetricsConfigValue(Admin client, + String clientMetricsName, + Map defaultConfigs) throws Exception { Review Comment: Ah, there are multiple points here. Thanks for your patience. If you do `--describe --entity-type topics`, it lists the configs for all topics. If you do `--describe --entity-type client-metrics`, it lists configs for all client metrics resources. If you do `--describe --entity-type groups`, it would ideally list all of the group config resources, but there is no RPC capable of doing that, so instead it lists the consumer groups with configs. The `--entity-default` situation is tricky. None of topics, groups or client-metrics works with `--entity-default`. Making it required to specify an entity name for these types would break the ability to list the configs for all resources. `--entity-default` should only be permitted for clients, users, ips and brokers, but there's no explicit check for this. I will revert my change for group with no entity name, and add a check for `--entity-default` to make sure it's only specified with entity types that genuinely support it. There is also the point about default values for the configs themselves. So, if I have a group G1 with just `consumer.heartbeat.interval.ms` defined, then this is the output:
```
$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name G1
Dynamic configs for group G1 are:
consumer.heartbeat.interval.ms=5000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.heartbeat.interval.ms=5000}

$ bin/kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type groups --entity-name G1 --all
All configs for group G1 are:
consumer.heartbeat.interval.ms=5000 sensitive=false synonyms={DYNAMIC_GROUP_CONFIG:consumer.heartbeat.interval.ms=5000}
consumer.session.timeout.ms=45000 sensitive=false synonyms={}
```
Currently, client-metrics does not behave the same (and it should). If a client-metrics resource defines a subset of the allowed configs, the `--all` does not display the default values for the missing configs. I'll address that in KAFKA-17516.
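The same describe path is reachable programmatically through the Admin API that the tool drives underneath. A rough sketch, assuming a broker on localhost:9092 and a hypothetical client-metrics resource named "my-metrics"; only dynamically set entries come back, mirroring `--describe` without `--all`:

```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DescribeClientMetricsSketch {
    public static void main(String[] args) throws Exception {
        try (Admin admin = Admin.create(Map.<String, Object>of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))) {
            ConfigResource resource =
                    new ConfigResource(ConfigResource.Type.CLIENT_METRICS, "my-metrics");
            Config config = admin.describeConfigs(List.of(resource)).all().get().get(resource);
            // Print each entry with its source, the analogue of the synonyms
            // shown by kafka-configs.sh.
            config.entries().forEach(e ->
                    System.out.println(e.name() + "=" + e.value() + " source=" + e.source()));
        }
    }
}
```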
Re: [PR] KAFKA-17048; Add docs for KIP-853 [kafka]
mimaison commented on code in PR #17076: URL: https://github.com/apache/kafka/pull/17076#discussion_r1753754828 ## docs/quickstart.html: ## @@ -32,8 +32,8 @@ the latest Kafka release and extract it: -$ tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz -$ cd kafka_{{scalaVersion}}-{{fullDotVersion}} +tar -xzf kafka_{{scalaVersion}}-{{fullDotVersion}}.tgz Review Comment: Why are we removing the `$` prompts? We use them everywhere in the documentation. It's useful to distinguish the commands from their outputs.
Re: [PR] KAFKA-17048; Add docs for KIP-853 [kafka]
mimaison commented on code in PR #17076: URL: https://github.com/apache/kafka/pull/17076#discussion_r1753776432 ## docs/ops.html: ## @@ -3776,25 +3776,71 @@ controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3 + controller.quorum.bootstrap.servers=host1:port1,host2:port2,host3:port3 If a Kafka cluster has 3 controllers named controller1, controller2 and controller3, then controller1 may have the following configuration: process.roles=controller node.id=1 listeners=CONTROLLER://controller1.example.com:9093 -controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093 +controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093 - Every broker and controller must set the controller.quorum.voters property. The node ID supplied in the controller.quorum.voters property must match the corresponding id on the controller servers. For example, on controller1, node.id must be set to 1, and so forth. Each node ID must be unique across all the servers in a particular cluster. No two servers can have the same node ID regardless of their process.roles values. + Every broker and controller must set the controller.quorum.bootstrap.servers property. - Storage Tool + Provisioning Nodes The kafka-storage.sh random-uuid command can be used to generate a cluster ID for your new cluster. This cluster ID must be used when formatting each server in the cluster with the kafka-storage.sh format command. This is different from how Kafka has operated in the past. Previously, Kafka would format blank storage directories automatically, and also generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with missing committed data. + Bootstrap a Standalone Controller + The recommended method for creating a new KRaft controller cluster is to bootstrap it with one voter and dynamically add the rest of the controllers. This can be done with the following CLI command: + + kafka-storage format --cluster-id <cluster-id> --standalone --config controller.properties Review Comment: Have you checked `<cluster-id>` renders fine? We may need to use `&lt;`/`&gt;`
Re: [PR] KAFKA-17048; Add docs for KIP-853 [kafka]
mimaison commented on code in PR #17076: URL: https://github.com/apache/kafka/pull/17076#discussion_r1753780968 ## docs/ops.html: ## @@ -3776,25 +3776,77 @@ controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3 + controller.quorum.bootstrap.servers=host1:port1,host2:port2,host3:port3 If a Kafka cluster has 3 controllers named controller1, controller2 and controller3, then controller1 may have the following configuration: process.roles=controller node.id=1 listeners=CONTROLLER://controller1.example.com:9093 -controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093 +controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093 +controller.listener.names=CONTROLLER - Every broker and controller must set the controller.quorum.voters property. The node ID supplied in the controller.quorum.voters property must match the corresponding id on the controller servers. For example, on controller1, node.id must be set to 1, and so forth. Each node ID must be unique across all the servers in a particular cluster. No two servers can have the same node ID regardless of their process.roles values. + Every broker and controller must set the controller.quorum.bootstrap.servers property. - Storage Tool + Provisioning Nodes The kafka-storage.sh random-uuid command can be used to generate a cluster ID for your new cluster. This cluster ID must be used when formatting each server in the cluster with the kafka-storage.sh format command. This is different from how Kafka has operated in the past. Previously, Kafka would format blank storage directories automatically, and also generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with missing committed data. + Bootstrap a Standalone Controller + The recommended method for creating a new KRaft controller cluster is to bootstrap it with one voter and dynamically add the rest of the controllers. Bootstrapping the first controller can be done with the following CLI command: Review Comment: Any particular reason this is the recommended method over the alternative below?
Re: [PR] KAFKA-17048; Add docs for KIP-853 [kafka]
mimaison commented on code in PR #17076: URL: https://github.com/apache/kafka/pull/17076#discussion_r1753837295 ## docs/ops.html: ## @@ -3776,25 +3776,77 @@ controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3 + controller.quorum.bootstrap.servers=host1:port1,host2:port2,host3:port3 If a Kafka cluster has 3 controllers named controller1, controller2 and controller3, then controller1 may have the following configuration: process.roles=controller node.id=1 listeners=CONTROLLER://controller1.example.com:9093 -controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093 +controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093 +controller.listener.names=CONTROLLER - Every broker and controller must set the controller.quorum.voters property. The node ID supplied in the controller.quorum.voters property must match the corresponding id on the controller servers. For example, on controller1, node.id must be set to 1, and so forth. Each node ID must be unique across all the servers in a particular cluster. No two servers can have the same node ID regardless of their process.roles values. + Every broker and controller must set the controller.quorum.bootstrap.servers property. - Storage Tool + Provisioning Nodes The kafka-storage.sh random-uuid command can be used to generate a cluster ID for your new cluster. This cluster ID must be used when formatting each server in the cluster with the kafka-storage.sh format command. This is different from how Kafka has operated in the past. Previously, Kafka would format blank storage directories automatically, and also generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with missing committed data. + Bootstrap a Standalone Controller + The recommended method for creating a new KRaft controller cluster is to bootstrap it with one voter and dynamically add the rest of the controllers. Bootstrapping the first controller can be done with the following CLI command: + + kafka-storage format --cluster-id <cluster-id> --standalone --config controller.properties Review Comment: All other commands have the `$ bin/` prompt, I think we should be consistent.
Re: [PR] KAFKA-17048; Add docs for KIP-853 [kafka]
mimaison commented on code in PR #17076: URL: https://github.com/apache/kafka/pull/17076#discussion_r1753841794 ## docs/ops.html: ## @@ -3776,25 +3776,77 @@ controller.quorum.voters=id1@host1:port1,id2@host2:port2,id3@host3:port3 + controller.quorum.bootstrap.servers=host1:port1,host2:port2,host3:port3 If a Kafka cluster has 3 controllers named controller1, controller2 and controller3, then controller1 may have the following configuration: process.roles=controller node.id=1 listeners=CONTROLLER://controller1.example.com:9093 -controller.quorum.voters=1@controller1.example.com:9093,2@controller2.example.com:9093,3@controller3.example.com:9093 +controller.quorum.bootstrap.servers=controller1.example.com:9093,controller2.example.com:9093,controller3.example.com:9093 +controller.listener.names=CONTROLLER - Every broker and controller must set the controller.quorum.voters property. The node ID supplied in the controller.quorum.voters property must match the corresponding id on the controller servers. For example, on controller1, node.id must be set to 1, and so forth. Each node ID must be unique across all the servers in a particular cluster. No two servers can have the same node ID regardless of their process.roles values. + Every broker and controller must set the controller.quorum.bootstrap.servers property. - Storage Tool + Provisioning Nodes The kafka-storage.sh random-uuid command can be used to generate a cluster ID for your new cluster. This cluster ID must be used when formatting each server in the cluster with the kafka-storage.sh format command. This is different from how Kafka has operated in the past. Previously, Kafka would format blank storage directories automatically, and also generate a new cluster ID automatically. One reason for the change is that auto-formatting can sometimes obscure an error condition. This is particularly important for the metadata log maintained by the controller and broker servers. If a majority of the controllers were able to start with an empty log directory, a leader might be able to be elected with missing committed data. + Bootstrap a Standalone Controller + The recommended method for creating a new KRaft controller cluster is to bootstrap it with one voter and dynamically add the rest of the controllers. Bootstrapping the first controller can be done with the following CLI command: + + kafka-storage format --cluster-id <cluster-id> --standalone --config controller.properties + + This command will 1) create a meta.properties file in metadata.log.dir with a randomly generated directory.id, 2) create a snapshot at 00000000000000000000-0000000000.checkpoint with the necessary control records (KRaftVersionRecord and VotersRecord) to make this Kafka node the only voter for the quorum. + + Bootstrap with Multiple Controllers + The KRaft cluster metadata partition can also be bootstrapped with more than one voter. 
This can be done by using the --initial-controllers flag: + + CLUSTER_ID=$(kafka-storage random-uuid) +CONTROLLER_0_UUID=$(kafka-storage random-uuid) +CONTROLLER_1_UUID=$(kafka-storage random-uuid) +CONTROLLER_2_UUID=$(kafka-storage random-uuid) + +# In each controller execute +kafka-storage format --cluster-id ${CLUSTER_ID} \ + --initial-controllers "0@controller-0:1234:${CONTROLLER_0_UUID},1@controller-1:1234:${CONTROLLER_1_UUID},2@controller-2:1234:${CONTROLLER_2_UUID}" \ + --config controller.properties +This command is similar to the standalone version but the snapshot at 00000000000000000000-0000000000.checkpoint will instead contain a VotersRecord that includes information for all of the controllers specified in --initial-controllers. It is important that the value of this flag is the same in all of the controllers with the same cluster id. +In the replica description 0@controller-0:1234:3Db5QLSqSZieL3rJBUUegA, 0 is the replica id, 3Db5QLSqSZieL3rJBUUegA is the replica directory id, controller-0 is the replica's host and 1234 is the replica's port. + Formatting Brokers and New Controllers + When provisioning new broker and controller nodes that we want to add to an existing Kafka cluster, use the kafka-storage.sh format command without the --standalone or --initial-controllers flags. + + kafka-storage format --cluster-id <cluster-id> --config server.properties + Controller membership changes + + Add New Controller + If the KRaft controller cluster already exists, the cluster can be expanded by first provisioning a new controller using the kafka-storage tool and starting the controller. + + After starting the controller, the replication to the new controller can be monitored using the kafka-metadata-quorum describe --replication command. Once the new controller has caught up to the active controller, it can be added to the cluster using the kafka-metadata-quorum add-controller command. + + When using broker en
Re: [PR] KAFKA-17048; Add docs for KIP-853 [kafka]
mimaison commented on code in PR #17076: URL: https://github.com/apache/kafka/pull/17076#discussion_r1753849319 ## config/kraft/reconfig-server.properties: ## @@ -0,0 +1,132 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more Review Comment: It's unclear why we're adding a new file.
Re: [PR] KAFKA-17448: New consumer seek should update positions in background thread [kafka]
FrankYang0529 commented on code in PR #17075: URL: https://github.com/apache/kafka/pull/17075#discussion_r1753881144 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/SeekUnvalidatedEvent.java: ## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.clients.consumer.internals.events; + +import org.apache.kafka.clients.consumer.internals.SubscriptionState; +import org.apache.kafka.common.TopicPartition; + +import java.util.Optional; + +/** + * Event to perform {@link org.apache.kafka.clients.consumer.internals.SubscriptionState#seekUnvalidated(TopicPartition, SubscriptionState.FetchPosition)} Review Comment: Updated it. Thank you.
[jira] [Created] (KAFKA-17521) bootstrap-controller option buggy behavior
Michal Medvecky created KAFKA-17521: --- Summary: bootstrap-controller option buggy behavior Key: KAFKA-17521 URL: https://issues.apache.org/jira/browse/KAFKA-17521 Project: Kafka Issue Type: Bug Components: admin Affects Versions: 3.7.1 Reporter: Michal Medvecky When running Kafka admin tools with --bootstrap-controller, I am experiencing weird behavior. Let me show examples.
{code:java}
[appuser@e4bbc669d343 ~]$ kafka-configs --describe --bootstrap-controller kafka1:9093 --command-config /tmp/kafka-client.properties --entity-type brokers --entity-name 1
Dynamic configs for broker 1 are:
{code}
That's "sort of" fine, but:
* my setup consists of 3 controller nodes (1,2,3) and 3 broker nodes (4,5,6)
* entity-type must be "brokers", even though I am connecting to a controller (9093/tcp is a controller listener)
* node 1 is not a broker, but a controller instead ("for broker 1 are ...")

When trying to describe the config for node 2:
{code:java}
[appuser@e4bbc669d343 ~]$ kafka-configs --describe --bootstrap-controller kafka1:9093 --command-config /tmp/kafka-client.properties --entity-type brokers --entity-name 2
Dynamic configs for broker 2 are:
Error while executing config command with args '--describe --bootstrap-controller kafka1:9093 --command-config /tmp/kafka-client.properties --entity-type brokers --entity-name 2'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: Unexpected broker id, expected 1 or empty string, but received 2
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
	at kafka.admin.ConfigCommand$.getResourceConfig(ConfigCommand.scala:610)
	at kafka.admin.ConfigCommand$.$anonfun$describeResourceConfig$5(ConfigCommand.scala:568)
	at kafka.admin.ConfigCommand$.$anonfun$describeResourceConfig$5$adapted(ConfigCommand.scala:560)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at kafka.admin.ConfigCommand$.describeResourceConfig(ConfigCommand.scala:560)
	at kafka.admin.ConfigCommand$.describeConfig(ConfigCommand.scala:538)
	at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:343)
	at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:97)
	at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Unexpected broker id, expected 1 or empty string, but received 2
{code}
Ehm, what? Expected 1? I need to describe configs for node 2, not 1.
The same thing happens when connecting to node 2 instead of node 1:
{code:java}
[appuser@e4bbc669d343 ~]$ kafka-configs --describe --bootstrap-controller kafka2:9093 --command-config /tmp/kafka-client.properties --entity-type brokers --entity-name 2
Dynamic configs for broker 2 are:
Error while executing config command with args '--describe --bootstrap-controller kafka2:9093 --command-config /tmp/kafka-client.properties --entity-type brokers --entity-name 2'
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: Unexpected broker id, expected 1 or empty string, but received 2
	at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
	at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
	at kafka.admin.ConfigCommand$.getResourceConfig(ConfigCommand.scala:610)
	at kafka.admin.ConfigCommand$.$anonfun$describeResourceConfig$5(ConfigCommand.scala:568)
	at kafka.admin.ConfigCommand$.$anonfun$describeResourceConfig$5$adapted(ConfigCommand.scala:560)
	at scala.collection.immutable.List.foreach(List.scala:333)
	at kafka.admin.ConfigCommand$.describeResourceConfig(ConfigCommand.scala:560)
	at kafka.admin.ConfigCommand$.describeConfig(ConfigCommand.scala:538)
	at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:343)
	at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:97)
	at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Unexpected broker id, expected 1 or empty string, but received 2
{code}
If I specify {{--all}} instead of {{entity-name}}, what I see is:
{code:java}
[appuser@e4bbc669d343 ~]$ kafka-configs --describe --bootstrap-controller kafka2:9093 --command-config /tmp/kafka-client.properties --entity-type brokers --all
All configs for broker 1 are:
advertised.listeners=null sensitive=false synonyms={}
zookeeper.ssl.truststore.type=null sensitive=false synonyms={}
All configs for broker 2 are:
Error while executi
Re: [PR] KAFKA-12829: Remove deprecated StreamsBuilder#addGlobalStore of old Processor API [kafka]
pegasas commented on PR #17059: URL: https://github.com/apache/kafka/pull/17059#issuecomment-2343272830 fix lint & test.
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
chia7712 commented on code in PR #17046: URL: https://github.com/apache/kafka/pull/17046#discussion_r1754016599 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -620,6 +699,17 @@ private void deleteAndVerifyGroupConfigValue(Admin client, verifyGroupConfig(client, groupName, defaultConfigs); } +private void deleteAndVerifyClientMetricsConfigValue(Admin client, + String clientMetricsName, + Map defaultConfigs) throws Exception { Review Comment: @AndrewJSchofield thanks for all your sharing!
Re: [PR] KAFKA-17439: Make polling for new records an explicit action/event in the new consumer [kafka]
AndrewJSchofield commented on code in PR #17035: URL: https://github.com/apache/kafka/pull/17035#discussion_r1754077572 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchRequestManager.java: ## @@ -65,16 +67,59 @@ protected void maybeThrowAuthFailure(Node node) { networkClientDelegate.maybeThrowAuthFailure(node); } +/** + * Request that a fetch request be issued to the cluster to pull down the next batch of records. + * + * + * + * The returned {@link CompletableFuture} is {@link CompletableFuture#complete(Object) completed} when the + * fetch request(s) have been created and enqueued into the network client's outgoing send buffer. + * It is not completed when the network client has received the data. + * + * @return Future for which the caller can wait to ensure that the requests have been enqueued + */ +public CompletableFuture requestFetch() { +CompletableFuture future = new CompletableFuture<>(); + +if (pendingFetchRequestFuture != null) { +// In this case, we have an outstanding fetch request, so chain the newly created future to be +// invoked when the outstanding fetch request is completed. +pendingFetchRequestFuture.whenComplete((value, exception) -> { +if (exception != null) { Review Comment: Yes.
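The chaining pattern in the diff above, reduced to a self-contained sketch with hypothetical names: a newly created future completes, normally or exceptionally, only when the outstanding one does, so a failure propagates to the later caller rather than being swallowed (which is what the "Yes." above confirms):

```java
import java.util.concurrent.CompletableFuture;

public class ChainSketch {
    // If a fetch is already pending, the new caller's future is tied to it.
    static CompletableFuture<Void> chainAfter(CompletableFuture<Void> pending) {
        CompletableFuture<Void> next = new CompletableFuture<>();
        pending.whenComplete((value, exception) -> {
            if (exception != null) {
                next.completeExceptionally(exception); // surface the failure
            } else {
                next.complete(value);                  // forward the result
            }
        });
        return next;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> pending = new CompletableFuture<>();
        CompletableFuture<Void> chained = chainAfter(pending);
        pending.completeExceptionally(new RuntimeException("fetch failed"));
        System.out.println(chained.isCompletedExceptionally()); // prints: true
    }
}
```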
Re: [PR] KAFKA-17287: Added integration tests for ShareConsumer [kafka]
AndrewJSchofield commented on PR #17009: URL: https://github.com/apache/kafka/pull/17009#issuecomment-234165 While the tests seem generally quite reliable, I see very occasional failures. Please chase them down.
Re: [PR] KAFKA-17109: implement exponential backoff for state directory lock [kafka]
lucasbru merged PR #17116: URL: https://github.com/apache/kafka/pull/17116
Re: [PR] MINOR: Bump zstd-jni from 1.5.6-4 to 1.5.6-5 [kafka]
olegbonar commented on PR #17151: URL: https://github.com/apache/kafka/pull/17151#issuecomment-2343373691 Looks like some flaky tests failed, probably not related to the PR.
Re: [PR] KAFKA-17508: Adding some guard for fallback deletion logic [kafka]
FrankYang0529 commented on code in PR #17154: URL: https://github.com/apache/kafka/pull/17154#discussion_r1754278310 ## storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java: ## @@ -805,6 +805,10 @@ private Void deleteTypeIfExists(StorageAction delete, Stri if (logIfMissing) { LOGGER.info("Failed to delete {} {} because it does not exist.", fileType, file.getAbsolutePath()); } +if (file.getParent() == null) { +LOGGER.warn("Failed to delete {} {} because parent directory is null.", fileType, file.getAbsolutePath()); Review Comment: Hi @showuon, thanks for the suggestion. I removed the log and added a test with a mock for it.
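For context, `File.getParent()` returns null for a bare relative path such as `new File("segment.log")`, which is the case the guard above protects against. A minimal sketch of the guard plus the kind of mock-based check mentioned in the comment (names are hypothetical; this is not the actual test added in the PR):

```java
import java.io.File;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class NullParentGuardSketch {
    // Stand-in for the guarded fallback: act on the parent only if present.
    static void flushParentIfExists(File file) {
        String parent = file.getParent();
        if (parent != null) {
            System.out.println("flushing " + parent); // real code would fsync here
        }
    }

    public static void main(String[] args) {
        File parentless = mock(File.class);
        when(parentless.getParent()).thenReturn(null);      // simulate a parentless path
        flushParentIfExists(parentless);                    // must not throw

        flushParentIfExists(new File("/tmp/segment.log"));  // flushes /tmp
    }
}
```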
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests [kafka]
adixitconfluent commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1754336687 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -995,6 +1004,9 @@ boolean canAcquireRecords() { * @return A boolean which indicates whether the fetch lock is acquired. */ boolean maybeAcquireFetchLock() { +if (partitionState() != SharePartitionState.ACTIVE) { Review Comment: Hi @junrao , I am not sure of that. We acquire a write lock [here](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L309) before we update the partitionState and also before reading the partitionState, we acquire the read lock [here](https://github.com/apache/kafka/blob/trunk/core/src/main/java/kafka/server/share/SharePartition.java#L1453). So, at all times we should be reading the latest state.
Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]
FrankYang0529 commented on code in PR #16982: URL: https://github.com/apache/kafka/pull/16982#discussion_r1754351703 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -2479,23 +2487,29 @@ public void testCurrentLag(GroupProtocol groupProtocol) { consumer.seek(tp0, 50L); consumer.poll(Duration.ofMillis(0)); // requests: list-offset, fetch -assertEquals(2, client.inFlightRequestCount()); Review Comment: Hi @lianetm, thanks for catching this. I tried adding `Thread.sleep(1000)` before `assertEquals(0, client.inFlightRequestCount())`. It looks like both `ListOffsetsRequest` and `OffsetFetchRequest` will be sent after getting `FindCoordinatorResponse`. I have removed the assertion for now. Do you think that we should keep this assertion for the CLASSIC group protocol only? Thank you.
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests [kafka]
apoorvmittal10 commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1754417489 ## core/src/main/java/kafka/server/share/SharePartition.java: ## @@ -995,6 +1004,9 @@ boolean canAcquireRecords() { * @return A boolean which indicates whether the fetch lock is acquired. */ boolean maybeAcquireFetchLock() { +if (partitionState() != SharePartitionState.ACTIVE) { Review Comment: @junrao We always update the partitionState while holding the write lock. Though one thing we can improve is the `completeInitializationWithException` method: the lock is not explicitly acquired while updating `partitionState`, but the method is only called after the lock has been acquired. I'll make the change for the `completeInitializationWithException` method as well. Here, when `partitionState()` is called, it takes the read lock. Am I missing something here?
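The invariant being discussed in these two comments, reduced to a self-contained sketch with hypothetical names: every read of the state field takes the read lock and every write takes the write lock, so a reader can never observe a stale value:

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class GuardedState {
    enum State { INITIALIZING, ACTIVE, FAILED }

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private State state = State.INITIALIZING;

    // Readers take the read lock; many readers may proceed concurrently.
    State state() {
        lock.readLock().lock();
        try {
            return state;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Writers take the write lock, excluding all readers and other writers.
    void transitionTo(State newState) {
        lock.writeLock().lock();
        try {
            state = newState;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```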
Re: [PR] KAFKA-17502: Modified commitSync() and close() handling in clients [kafka]
AndrewJSchofield commented on code in PR #17136: URL: https://github.com/apache/kafka/pull/17136#discussion_r1754436434 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -84,10 +86,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private Uuid memberId; private boolean fetchMoreRecords = false; private final Map fetchAcknowledgementsMap; -private final Map> acknowledgeRequestStates; +private final Map> acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; -private boolean closing = false; private final CompletableFuture closeFuture; Review Comment: It seems to me that the handling of `closeFuture` is flawed. An `AcknowledgeRequestState` is node-specific, but the future is associated with the entire request manager. It's only valid to complete the future once all close acks have been completed, not just the first one. ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -1011,33 +1036,47 @@ public void completeIfEmpty() { } } -static class Pair { +static class Tuple { private V asyncRequest; -private V syncRequest; +private Queue syncRequestQueue; +private V closeRequest; -public Pair(V asyncRequest, V syncRequest) { +public Tuple(V asyncRequest, Queue syncRequestQueue, V closeRequest) { this.asyncRequest = asyncRequest; -this.syncRequest = syncRequest; +this.syncRequestQueue = syncRequestQueue; +this.closeRequest = closeRequest; } public void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; } -public void setSyncRequest(V second) { -this.syncRequest = second; +public void setSyncRequestQueue(Queue syncRequestQueue) { Review Comment: You could encapsulate this queue entirely in this class. For example, `addSyncRequest` could create the queue if it's not yet been created.
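The encapsulation the reviewer suggests, sketched with the names from the diff (illustrative only, not the final code): `addSyncRequest` owns queue creation, so callers never set, or null-check, the queue themselves:

```java
import java.util.ArrayDeque;
import java.util.Queue;

class Tuple<V> {
    private V asyncRequest;
    private Queue<V> syncRequestQueue; // created lazily, never set from outside
    private V closeRequest;

    // The tuple creates its own queue on first use.
    public void addSyncRequest(V request) {
        if (syncRequestQueue == null) {
            syncRequestQueue = new ArrayDeque<>();
        }
        syncRequestQueue.add(request);
    }

    // Read-only access for draining; may be null if nothing was ever added.
    public Queue<V> syncRequestQueue() {
        return syncRequestQueue;
    }
}
```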
[jira] [Created] (KAFKA-17522) Share partition acquire() need not return a future
Abhinav Dixit created KAFKA-17522: - Summary: Share partition acquire() need not return a future Key: KAFKA-17522 URL: https://issues.apache.org/jira/browse/KAFKA-17522 Project: Kafka Issue Type: Sub-task Reporter: Abhinav Dixit Assignee: Apoorv Mittal As per the discussions https://github.com/apache/kafka/pull/16274#discussion_r1700968453 and https://github.com/apache/kafka/pull/16969#discussion_r1752362118, we don't need the acquire method to return a future since we are not persisting acquisitions.
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests [kafka]
adixitconfluent commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1754440143 ## core/src/main/java/kafka/server/share/ShareFetchUtils.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server.share; + +import kafka.server.ReplicaManager; + +import org.apache.kafka.common.TopicIdPartition; +import org.apache.kafka.common.message.ShareFetchResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.record.FileRecords; +import org.apache.kafka.common.requests.ListOffsetsRequest; +import org.apache.kafka.storage.internals.log.FetchPartitionData; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import scala.Option; + +/** + * Utility class for post-processing of share fetch operations. + */ +public class ShareFetchUtils { +private static final Logger log = LoggerFactory.getLogger(ShareFetchUtils.class); + +// Process the replica manager fetch response to update share partitions and futures. +static CompletableFuture> processFetchResponse( +SharePartitionManager.ShareFetchPartitionData shareFetchPartitionData, +Map responseData, +Map partitionCacheMap, +ReplicaManager replicaManager +) { +Map> futures = new HashMap<>(); +responseData.forEach((topicIdPartition, fetchPartitionData) -> { + +SharePartition sharePartition = partitionCacheMap.get(new SharePartitionManager.SharePartitionKey( +shareFetchPartitionData.groupId(), topicIdPartition)); +futures.put(topicIdPartition, sharePartition.acquire(shareFetchPartitionData.memberId(), fetchPartitionData) Review Comment: @junrao , I think we can get rid of the future as you mentioned. I have logged a JIRA, https://issues.apache.org/jira/browse/KAFKA-17522, and assigned it to @apoorvmittal10. He mentioned he'll take a look. Thanks.
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
chia7712 commented on code in PR #17046: URL: https://github.com/apache/kafka/pull/17046#discussion_r1754462594 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -620,6 +699,17 @@ private void deleteAndVerifyGroupConfigValue(Admin client, verifyGroupConfig(client, groupName, defaultConfigs); } +private void deleteAndVerifyClientMetricsConfigValue(Admin client, + String clientMetricsName, + Map defaultConfigs) throws Exception { Review Comment: > consumer.session.timeout.ms=45000 sensitive=false synonyms={} Should the value of `synonyms` be `synonyms={DEFAULT_CONFIG:group.consumer.session.timeout.ms=45000}`?
[jira] [Created] (KAFKA-17523) Connectors should be able to override offset.flush.interval.ms
Viktor Somogyi-Vass created KAFKA-17523: --- Summary: Connectors should be able to override offset.flush.interval.ms Key: KAFKA-17523 URL: https://issues.apache.org/jira/browse/KAFKA-17523 Project: Kafka Issue Type: Improvement Reporter: Viktor Somogyi-Vass The offset.flush.interval.ms config affects how frequently the tasks of a connector are flushed. This is a worker-level config, affecting all connectors. In a prod cluster, it is possible to have multiple connectors with different desired flush rates, which means that users may want to override this config at the connector level too.
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
AndrewJSchofield commented on code in PR #17046: URL: https://github.com/apache/kafka/pull/17046#discussion_r1754598635 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -620,6 +699,17 @@ private void deleteAndVerifyGroupConfigValue(Admin client, verifyGroupConfig(client, groupName, defaultConfigs); } +private void deleteAndVerifyClientMetricsConfigValue(Admin client, + String clientMetricsName, + Map defaultConfigs) throws Exception { Review Comment: That would imply that the broker config `group.consumer.session.timeout.ms` provided the default. KIP-848 does describe such a config, but does not explicitly specify that the default should be taken from the broker config. I think that probably it should, and this is a symptom of that problem. As an aside, we are doing exactly this in https://github.com/apache/kafka/pull/17070 which is creating a group config with a broker default.
Re: [PR] KAFKA-16332 Remove Deprecated builder methods for Time/Session/Join/SlidingWindows [kafka]
brandboat commented on code in PR #17126: URL: https://github.com/apache/kafka/pull/17126#discussion_r1754695410 ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java: ## @@ -193,7 +193,7 @@ public void shouldGroupByKey(final TestInfo testInfo) throws Exception { produceMessages(timestamp); stream.groupByKey(Grouped.with(Serdes.Integer(), Serdes.String())) -.windowedBy(TimeWindows.of(ofMillis(500L))) +.windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(500L))) Review Comment: In this test, we create a topic with 3 partitions, https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L241 but the records all have the same timestamp, so we don't need to worry about any record being dropped here; hence the grace period is not required. ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java: ## @@ -157,7 +157,7 @@ public void shouldReduceWindowed(final TestInfo testInfo) throws Exception { produceMessages(secondBatchTimestamp); groupedStream -.windowedBy(TimeWindows.of(ofMillis(500L))) +.windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMinutes(1L))) Review Comment: In this test, we create a topic with 3 partitions, https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L241 and records with secondBatchTimestamp may come after records with firstBatchTimestamp, so we need a grace period here to avoid records being dropped. https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L154-L157 ## streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java: ## @@ -349,7 +348,7 @@ private Topology setupTopologyWithIntermediateTopic(final boolean useRepartition stream = builder.stream(INTERMEDIATE_USER_TOPIC); } stream.groupByKey() -.windowedBy(TimeWindows.of(ofMillis(35)).advanceBy(ofMillis(10))) + .windowedBy(TimeWindows.ofSizeWithNoGrace(ofMillis(35)).advanceBy(ofMillis(10))) Review Comment: I removed the grace period from the test code wherever possible, as I believe most test cases don't require it. IMO, we should only add a grace period when it's necessary. However, if this complicates the code review, I can add the grace period (1 day) back to these test cases. I'm not entirely sure I know all the details of Streams, so please correct me if I'm wrong: in this test class, we create topics with only one partition, ensuring that all records arrive in sequential order. https://github.com/apache/kafka/blob/f59d829381a7021971b0b2835fbbacb4abd996a5/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java#L205 And in https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java#L177 all record timestamps are in ascending order. As a result, there's no need for a grace period here.
Re: [PR] KAFKA-16332 Remove Deprecated builder methods for Time/Session/Join/SlidingWindows [kafka]
brandboat commented on code in PR #17126: URL: https://github.com/apache/kafka/pull/17126#discussion_r1754695466 ## streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java: ## @@ -157,7 +157,7 @@ public void shouldReduceWindowed(final TestInfo testInfo) throws Exception { produceMessages(secondBatchTimestamp); groupedStream -.windowedBy(TimeWindows.of(ofMillis(500L))) +.windowedBy(TimeWindows.ofSizeAndGrace(ofMillis(500L), ofMinutes(1L))) Review Comment: In this test, we create a topic with 3 partitions, https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L241 and records with firstBatchTimestamp may come after records with secondBatchTimestamp, so we need a grace period here to avoid records being dropped. https://github.com/apache/kafka/blob/9ccc33da8af2c506647620a6a6f1795030328de1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java#L154-L157
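For reference, the two replacement builders used in these diffs differ only in the grace period they attach; a small self-contained sketch of the standard Kafka Streams API (window and grace sizes taken from the diffs above):

```java
import java.time.Duration;

import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowsSketch {
    public static void main(String[] args) {
        // No grace: records arriving after the window closes are dropped.
        // Fine when the test input has a single timestamp or arrives in order.
        TimeWindows strict = TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(500));

        // With grace: the window accepts out-of-order records for up to one
        // minute past window end, as the interleaved-batch test requires.
        TimeWindows lenient =
                TimeWindows.ofSizeAndGrace(Duration.ofMillis(500), Duration.ofMinutes(1));

        System.out.println(strict.gracePeriodMs());  // prints: 0
        System.out.println(lenient.gracePeriodMs()); // prints: 60000
    }
}
```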
[jira] [Created] (KAFKA-17524) StreamThreadTest shouldReturnErrorIfProducerInstanceIdNotInitialized hanging
David Arthur created KAFKA-17524: Summary: StreamThreadTest shouldReturnErrorIfProducerInstanceIdNotInitialized hanging Key: KAFKA-17524 URL: https://issues.apache.org/jira/browse/KAFKA-17524 Project: Kafka Issue Type: Test Reporter: David Arthur A trunk build had a timeout and it appears that this test was the cause. https://github.com/apache/kafka/actions/runs/10798234851/job/29953919232 In the Gradle log, we see {code:java} 2024-09-10T20:31:26.6830206Z Gradle Test Run :streams:test > Gradle Test Executor 47 > StreamThreadTest > shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, boolean) > "shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" STARTED {code} but no "FINISHED" or "FAILED" later in the log. It seems that this test was running for around 50 minutes.
Re: [PR] MINOR Move some things into committer-tools [kafka]
mumrah commented on PR #17162: URL: https://github.com/apache/kafka/pull/17162#issuecomment-2343835120 I've also included a new find-unfinished-test.py which I've used in the past to find hanging tests.
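The script itself is Python and isn't shown in this thread; purely as an illustration of the idea (per KAFKA-17524 above, a hanging test logs STARTED but never FINISHED or FAILED), here is a hypothetical Java equivalent that assumes those exact markers appear in a Gradle log passed as the first argument:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashSet;
import java.util.Set;

public class FindUnfinishedTests {
    public static void main(String[] args) throws IOException {
        Set<String> inFlight = new LinkedHashSet<>();
        for (String line : Files.readAllLines(Path.of(args[0]))) {
            // Key on the "Gradle Test Run ..." suffix so differing timestamp
            // prefixes on STARTED/FINISHED lines don't break the matching.
            int start = line.indexOf("Gradle Test Run");
            if (start < 0) continue;
            String event = line.substring(start);
            if (event.endsWith(" STARTED")) {
                inFlight.add(event.substring(0, event.length() - " STARTED".length()));
            } else if (event.endsWith(" FINISHED") || event.endsWith(" FAILED")) {
                inFlight.remove(event.substring(0, event.lastIndexOf(' ')));
            }
        }
        // Anything left never logged a completion event: a candidate hang.
        inFlight.forEach(System.out::println);
    }
}
```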
Re: [PR] KAFKA-17506 KRaftMigrationDriver initialization race [kafka]
mumrah commented on code in PR #17147: URL: https://github.com/apache/kafka/pull/17147#discussion_r1754786855 ## metadata/src/test/java/org/apache/kafka/metadata/migration/KRaftMigrationDriverTest.java: ## @@ -232,6 +232,32 @@ CompletableFuture enqueueMetadataChangeEventWithFuture( return future; } +@Test +public void testOnControllerChangeWhenUninitialized() throws InterruptedException { +CountingMetadataPropagator metadataPropagator = new CountingMetadataPropagator(); +CapturingMigrationClient.newBuilder().build(); +CapturingMigrationClient migrationClient = CapturingMigrationClient.newBuilder().build(); +MockFaultHandler faultHandler = new MockFaultHandler("testBecomeLeaderUninitialized"); +KRaftMigrationDriver.Builder builder = defaultTestBuilder() +.setZkMigrationClient(migrationClient) +.setPropagator(metadataPropagator) +.setFaultHandler(faultHandler); +try (KRaftMigrationDriver driver = builder.build()) { +// Fake a complete migration with ZK client +migrationClient.setMigrationRecoveryState( + ZkMigrationLeadershipState.EMPTY.withKRaftMetadataOffsetAndEpoch(100, 1)); + +// enqueue a poll event. once run, this will enqueue a recovery event +driver.start(); Review Comment: Good point. Let's consider the possibilities: Case 1: If the PollEvent runs quickly, we'll see * start(), enqueue PollEvent * run PollEvent, enqueue RecoverMigrationStateFromZKEvent (from PollEvent) * onControllerChange(), enqueue RecoverMigrationStateFromZKEvent and KRaftLeaderEvent * run RecoverMigrationStateFromZKEvent, state = INACTIVE * run RecoverMigrationStateFromZKEvent, no-op * run KRaftLeaderEvent, state is INACTIVE ✅ Case 2: If the RecoverMigrationStateFromZKEvent from PollEvent runs before onControllerChange: * start(), enqueue PollEvent * run PollEvent, enqueue RecoverMigrationStateFromZKEvent (from PollEvent) * run RecoverMigrationStateFromZKEvent, state = INACTIVE * onControllerChange(), enqueue KRaftLeaderEvent * run KRaftLeaderEvent, state is INACTIVE ✅ Only Case 1 tests the race condition and the fix. If we reverse the operations, we'll have Case 3: * onControllerChange(), enqueue RecoverMigrationStateFromZKEvent and KRaftLeaderEvent * start(), enqueue PollEvent * run RecoverMigrationStateFromZKEvent, state = INACTIVE * run KRaftLeaderEvent, state is INACTIVE ✅ * run PollEvent, no-op So yea, it makes sense to use Case 3 for the test since it contrives the race condition. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
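As a toy model of the Case 3 ordering above (a single-threaded executor standing in for the driver's event queue; this is not the actual KRaftMigrationDriver code), the recovery event is guaranteed to run before the leader event observes the state:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class MigrationOrderDemo {
    enum State { UNINITIALIZED, INACTIVE }

    public static void main(String[] args) throws InterruptedException {
        AtomicReference<State> state = new AtomicReference<>(State.UNINITIALIZED);
        // FIFO and single-threaded: events run strictly in submission order.
        ExecutorService queue = Executors.newSingleThreadExecutor();

        // onControllerChange(): recovery first, then the leader event.
        queue.submit(() -> state.set(State.INACTIVE)); // RecoverMigrationStateFromZKEvent
        queue.submit(() -> System.out.println("KRaftLeaderEvent sees " + state.get()));

        // start(): the PollEvent is enqueued last and is a no-op by then.
        queue.submit(() -> System.out.println("PollEvent is a no-op, state=" + state.get()));

        queue.shutdown();
        queue.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```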
[jira] [Commented] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881001#comment-17881001 ] Sagar Rao commented on KAFKA-17493: --- [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered, which checks whether the Kafka cluster is ready and whether an Admin client is able to perform admin operations. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L129]. 2) In the failed test, the connector comes up properly up to this point, but it appears that it gets stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L226]. I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:999) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:226) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:173) at java.lang.reflect.Method.invoke(Method.java:569) at java.util.ArrayList.forEach(ArrayList.java:1511) at java.util.ArrayList.forEach(ArrayList.java:1511) ``` We are trying to use the AdminClient to read the sink connector offsets [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252]. There's not much indication in the logs as to why this is happening. 
> Sink connector-related OffsetsApiIntegrationTest suite test cases failing > more frequently with new consumer/group coordinator > - > > Key: KAFKA-17493 > URL: https://issues.apache.org/jira/browse/KAFKA-17493 > Project: Kafka > Issue Type: Test > Components: connect, consumer, group-coordinator >Reporter: Chris Egerton >Priority: Major > > We recently updated trunk to use the new KIP-848 consumer/group coordinator > by default, which appears to have led to an uptick in flakiness for the > OffsetsApiIntegrationTest suite for Connect (specifically, the test cases > that use sink connectors, which makes sense since they're the type of > connector that uses a consumer group under the hood). > Gradle Enterprise shows that in the week before that update was made, the > test suite had a flakiness rate of about 4% > (https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172455840&search.startTimeMin=172395360&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.*&tests.sortField=FLAKY), > and in the week and a half since, the flakiness rate has jumped to 17% > (https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=17247312000
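The waitForCondition frames in that stack trace come from Kafka's polling test helper: the test repeatedly checks a condition and converts a timeout into an AssertionError. A minimal sketch of the pattern (an illustration, not Kafka's actual TestUtils code):

```java
import java.util.function.BooleanSupplier;

public final class WaitFor {
    public static void waitForCondition(BooleanSupplier condition, long maxWaitMs, String details)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return; // condition met, e.g. the expected offsets were committed
            }
            Thread.sleep(100L); // back off between checks
        }
        // This is what surfaces as AssertTrue.failNotTrue in the trace above.
        throw new AssertionError("Condition not met within " + maxWaitMs + " ms: " + details);
    }

    public static void main(String[] args) throws InterruptedException {
        waitForCondition(() -> false, 500L, "sink connector offsets"); // always times out
    }
}
```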
Re: [PR] KAFKA-17506 KRaftMigrationDriver initialization race [kafka]
mumrah merged PR #17147: URL: https://github.com/apache/kafka/pull/17147 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17154: New consumer subscribe may join group without a call to consumer.poll [kafka]
FrankYang0529 opened a new pull request, #17165: URL: https://github.com/apache/kafka/pull/17165 To fulfill "rebalances will only occur during an active call to KafkaConsumer#poll(Duration)", we should not send `JoinRequest` after `AsyncKafkaConsumer#subscribe`. Add a flag `subscriptionUpdated` to `AbstractMembershipManager#onSubscriptionUpdated`. When calling `AsyncKafkaConsumer#subscribe`, set the flag to true. When calling `AsyncKafkaConsumer#poll`, send the `JoinRequest` if the flag is true. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
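A sketch of the flag pattern described in the PR text; the names are illustrative stand-ins, not the PR's actual fields or call paths. subscribe() only records intent, and the join is deferred until the next poll():

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class DeferredJoinDemo {
    private final AtomicBoolean subscriptionUpdated = new AtomicBoolean(false);

    public void subscribe(String topic) {
        // subscribe() only records the intent; no JoinRequest is sent here.
        subscriptionUpdated.set(true);
        System.out.println("subscribed to " + topic + ", join deferred");
    }

    public void poll() {
        // The join happens only during an active poll() call, so rebalances
        // cannot be triggered outside of poll().
        if (subscriptionUpdated.compareAndSet(true, false)) {
            System.out.println("JoinRequest sent from poll()");
        }
    }

    public static void main(String[] args) {
        DeferredJoinDemo consumer = new DeferredJoinDemo();
        consumer.subscribe("my-topic"); // no join yet
        consumer.poll();                // join happens here
        consumer.poll();                // no-op: flag already cleared
    }
}
```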
Re: [PR] KAFKA-16792: Enable consumer unit tests that fail to fetch offsets only for new consumer with poll(0) [kafka]
lianetm commented on code in PR #16982: URL: https://github.com/apache/kafka/pull/16982#discussion_r1754814823 ## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ## @@ -2479,23 +2487,29 @@ public void testCurrentLag(GroupProtocol groupProtocol) { consumer.seek(tp0, 50L); consumer.poll(Duration.ofMillis(0)); // requests: list-offset, fetch -assertEquals(2, client.inFlightRequestCount()); Review Comment: Well, this bit is definitely somewhere the two consumers' internals are different, even though I see them as conceptually aligned: - what's the same in both consumers: a call to currentLag will "signal" that it needs to retrieve the log endOffsets - what's internally different in the two consumers: the classic consumer will only generate the request on the next poll (it has a single thread and doesn't want to block waiting for the offsets), vs the async consumer, where the background thread poll will pick up the intention already expressed in the app thread and generate the request to get end offsets. So I would say we keep the assertion (for the classic consumer, as you suggested); it will actually be helpful to show this difference in the test. I would add an explanation for it too: the classic consumer does not send the LIST_OFFSETS right away (it requires an explicit poll), unlike the new async consumer, which will send the LIST_OFFSETS request from the background thread on its next poll. Makes sense? With all these tests we're enabling, it's worth running them repeatedly locally to try to spot any other flakiness similar to the ones we've been catching. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881010#comment-17881010 ] Chris Egerton commented on KAFKA-17493: --- [~sagarrao] I don't think our attempts to read offsets are timing out (and the stack trace in your comment doesn't demonstrate this). I think the issue is that we're timing out while waiting for offsets to be committed, which in previous cases has been caused by connector tasks not committing offsets at all, possibly because the consumer group is rebalancing. > Sink connector-related OffsetsApiIntegrationTest suite test cases failing > more frequently with new consumer/group coordinator > - > > Key: KAFKA-17493 > URL: https://issues.apache.org/jira/browse/KAFKA-17493 > Project: Kafka > Issue Type: Test > Components: connect, consumer, group-coordinator >Reporter: Chris Egerton >Priority: Major > > We recently updated trunk to use the new KIP-848 consumer/group coordinator > by default, which appears to have led to an uptick in flakiness for the > OffsetsApiIntegrationTest suite for Connect (specifically, the test cases > that use sink connectors, which makes sense since they're the type of > connector that uses a consumer group under the hood). > Gradle Enterprise shows that in the week before that update was made, the > test suite had a flakiness rate of about 4% > (https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172455840&search.startTimeMin=172395360&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.*&tests.sortField=FLAKY), > and in the week and a half since, the flakiness rate has jumped to 17% > (https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.*&tests.sortField=FLAKY). -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17502: Modified commitSync() and close() handling in clients [kafka]
ShivsundarR commented on code in PR #17136: URL: https://github.com/apache/kafka/pull/17136#discussion_r1754897975 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -1011,33 +1036,47 @@ public void completeIfEmpty() { } } -static class Pair<V> { +static class Tuple<V> { private V asyncRequest; -private V syncRequest; +private Queue<V> syncRequestQueue; +private V closeRequest; -public Pair(V asyncRequest, V syncRequest) { +public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) { this.asyncRequest = asyncRequest; -this.syncRequest = syncRequest; +this.syncRequestQueue = syncRequestQueue; +this.closeRequest = closeRequest; } public void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; } -public void setSyncRequest(V second) { -this.syncRequest = second; +public void setSyncRequestQueue(Queue<V> syncRequestQueue) { Review Comment: Yeah, makes sense; I have updated the addSyncRequest() method. The getSyncRequestQueue() method still exists though, as we need the queue contents when we process the acknowledgements. Would renaming the method work here, or is it fine as it is? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
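A self-contained sketch of the Tuple shape in the diff above, with the addSyncRequest() accumulation mentioned in the reply; this is simplified and illustrative, not the actual ShareConsumeRequestManager code:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class TupleDemo {
    static final class Tuple<V> {
        private V asyncRequest;
        private final Queue<V> syncRequestQueue = new ArrayDeque<>();
        private V closeRequest;

        void setAsyncRequest(V asyncRequest) { this.asyncRequest = asyncRequest; }

        // Appends instead of replacing the whole queue, so earlier in-flight
        // sync acknowledgements are preserved until they are processed.
        void addSyncRequest(V request) { syncRequestQueue.add(request); }

        // Still needed: the queue contents are drained when acknowledgements
        // are processed (a possible rename of getSyncRequestQueue()).
        Queue<V> syncRequests() { return syncRequestQueue; }

        void setCloseRequest(V closeRequest) { this.closeRequest = closeRequest; }
    }

    public static void main(String[] args) {
        Tuple<String> tuple = new Tuple<>();
        tuple.addSyncRequest("commitSync#1");
        tuple.addSyncRequest("commitSync#2");
        System.out.println(tuple.syncRequests()); // [commitSync#1, commitSync#2]
    }
}
```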
Re: [PR] KAFKA-14995: Automate asf.yaml collaborators refresh [kafka]
mumrah commented on PR #17124: URL: https://github.com/apache/kafka/pull/17124#issuecomment-2343953520 @fonsdant thanks for your contribution -- hope to see more in the future 😄 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17506) KRaftMigrationDriver initialization race condition
[ https://issues.apache.org/jira/browse/KAFKA-17506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Arthur resolved KAFKA-17506. -- Resolution: Fixed > KRaftMigrationDriver initialization race condition > -- > > Key: KAFKA-17506 > URL: https://issues.apache.org/jira/browse/KAFKA-17506 > Project: Kafka > Issue Type: Bug >Affects Versions: 3.8.0, 3.7.1, 3.9.0 >Reporter: David Arthur >Assignee: David Arthur >Priority: Blocker > Fix For: 3.9.0, 3.7.2, 3.8.1 > > > There is a race condition between KRaftMigrationDriver running its first > poll() and being notified by Raft about a leader change. If > {{onControllerChange}} is called before RecoverMigrationStateFromZKEvent is > run, we will end up getting stuck in the INACTIVE state. > This is likely a very narrow race condition that is not likely to be seen in > practice. It was found when debugging a flaky integration test. > If encountered, this can be worked around by restarting the stuck controller. > Leadership will move to another controller which will allow the migration to > be started. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17502: Modified commitSync() and close() handling in clients [kafka]
ShivsundarR commented on code in PR #17136: URL: https://github.com/apache/kafka/pull/17136#discussion_r1754905117 ## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java: ## @@ -84,10 +86,9 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi private Uuid memberId; private boolean fetchMoreRecords = false; private final Map fetchAcknowledgementsMap; -private final Map> acknowledgeRequestStates; +private final Map> acknowledgeRequestStates; private final long retryBackoffMs; private final long retryBackoffMaxMs; -private boolean closing = false; private final CompletableFuture closeFuture; Review Comment: Yes, thanks, I had missed this. I have now added an extra variable to indicate whether a requestState has completed processing, and then waited until all the nodes are finished before completing the `closeFuture`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17057: Add RETRY option to ProductionExceptionHanlder [kafka]
aliehsaeedii commented on code in PR #17163: URL: https://github.com/apache/kafka/pull/17163#discussion_r1754976225 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java: ## @@ -393,39 +393,41 @@ private void recordSendError(final String topic, "indicating the task may be migrated out"; sendException.set(new TaskMigratedException(errorMessage, productionException)); } else { -if (isRetriable(productionException)) { +final DefaultErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext( +null, // only required to pass for DeserializationExceptionHandler +context.recordContext().topic(), +context.recordContext().partition(), +context.recordContext().offset(), +context.recordContext().headers(), +processorNodeId, +taskId, +context.recordContext().timestamp() +); + +final ProductionExceptionHandlerResponse handlerResponse; +try { +handlerResponse = Objects.requireNonNull( +productionExceptionHandler.handle(errorHandlerContext, serializedRecord, productionException), +"Invalid ProductionExceptionHandler response." +); +} catch (final RuntimeException fatalUserException) { +log.error( +"Production error callback failed after production error for record {}", +serializedRecord, +productionException +); +sendException.set(new FailedProcessingException("Fatal user code error in production error callback", fatalUserException)); +return; +} + +if (productionException instanceof RetriableException && handlerResponse == ProductionExceptionHandlerResponse.RETRY) { Review Comment: Thanks Matthias. Much better/more general than `isRetriable()`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
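A self-contained sketch of the decision in the diff above, with local stand-ins for the Streams types so it compiles on its own (the real code uses ProductionExceptionHandlerResponse and Kafka's RetriableException):

```java
public class RetryDecisionDemo {
    enum Response { FAIL, CONTINUE, RETRY }

    static class RetriableException extends RuntimeException { }

    static boolean shouldRetry(RuntimeException productionException, Response handlerResponse) {
        // RETRY is only honored for exceptions the producer can actually retry;
        // for anything else the existing FAIL/CONTINUE handling applies. This
        // check is what the review calls more general than isRetriable().
        return productionException instanceof RetriableException
            && handlerResponse == Response.RETRY;
    }

    public static void main(String[] args) {
        System.out.println(shouldRetry(new RetriableException(), Response.RETRY)); // true
        System.out.println(shouldRetry(new RuntimeException(), Response.RETRY));   // false
    }
}
```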
[jira] [Comment Edited] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881001#comment-17881001 ] David Jacot edited comment on KAFKA-17493 at 9/11/24 3:35 PM: -- [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L129].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L226].] I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:999) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:226) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:173) at java.lang.reflect.Method.invoke(Method.java:569) at java.util.ArrayList.forEach(ArrayList.java:1511) at java.util.ArrayList.forEach(ArrayList.java:1511) ``` We are trying to use the AdminClient to read the sink connector offsets [here|[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252].] There's not much indication in the logs as to why this is happening. 
was (Author: sagarrao): [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L129].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|[https://github.com/apach
[jira] [Created] (KAFKA-17525) Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime
Chia-Ping Tsai created KAFKA-17525: -- Summary: Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime Key: KAFKA-17525 URL: https://issues.apache.org/jira/browse/KAFKA-17525 Project: Kafka Issue Type: Improvement Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see following test: {code:bash} chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb' java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:449) at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) at kafka.admin.ConfigCommand.main(ConfigCommand.scala) Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. {code} By contrast, `topic` resource can handle the such error and then return `InvalidRequestException` ```bash chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa' java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value aaa for configuration flush.ms: Not a number of type LONG at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:371) at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) at kafka.admin.ConfigCommand.main(ConfigCommand.scala) Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value aaa for configuration flush.ms: Not a number of type LONG ``` -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17525) Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime
[ https://issues.apache.org/jira/browse/KAFKA-17525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-17525: --- Description: see following test: {code:bash} chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb' java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:449) at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) at kafka.admin.ConfigCommand.main(ConfigCommand.scala) Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. {code} By contrast, `topic` resource can handle the such error and then return `InvalidRequestException` {code:bash} chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa' java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value aaa for configuration flush.ms: Not a number of type LONG at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:371) at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) at kafka.admin.ConfigCommand.main(ConfigCommand.scala) Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value aaa for configuration flush.ms: Not a number of type LONG {code} was: see following test: {code:bash} chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb' java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. 
at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:449) at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) at kafka.admin.ConfigCommand.main(ConfigCommand.scala) Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request. {code} By contrast, `topic` resource can handle the such error and then return `InvalidRequestException` ```bash chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa' java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value aaa for configuration flush.ms: Not a number of type LONG at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) at java.bas
[jira] [Commented] (KAFKA-17525) Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime
[ https://issues.apache.org/jira/browse/KAFKA-17525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881030#comment-17881030 ] Chia-Ping Tsai commented on KAFKA-17525: `ClientMetricsConfigs#validateProperties` should leverage `CONFIG` to validate the new configs. [0] https://github.com/apache/kafka/blob/0e30209f01728d176906df23fad3cf8b36abbf38/server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsConfigs.java#L126 > Convert the UnknownServerException to InvalidRequestException when altering > client-metrics config at runtime > > > Key: KAFKA-17525 > URL: https://issues.apache.org/jira/browse/KAFKA-17525 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: Chia-Ping Tsai >Priority: Minor > > see following test: > {code:bash} > chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server > 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test > --add-config interval.ms=bbb > Error while executing config command with args '--bootstrap-server > 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test > --add-config interval.ms=bbb' > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) > at > kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) > at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:449) > at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) > at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) > at kafka.admin.ConfigCommand.main(ConfigCommand.scala) > Caused by: org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request. > {code} > By contrast, `topic` resource can handle the such error and then return > `InvalidRequestException` > {code:bash} > chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server > 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa > Error while executing config command with args '--bootstrap-server > 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa' > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value > aaa for configuration flush.ms: Not a number of type LONG > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) > at > kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) > at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:371) > at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) > at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) > at kafka.admin.ConfigCommand.main(ConfigCommand.scala) > Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: > Invalid value aaa for configuration flush.ms: Not a number of type LONG > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
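A sketch of that suggestion; the ConfigDef below is illustrative, not the actual ClientMetricsConfigs.CONFIG definition. Letting a ConfigDef parse the properties makes a bad value surface as a ConfigException, which the broker can then map to a client-facing validation error instead of an UnknownServerException:

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class ClientMetricsValidationDemo {
    // Hypothetical stand-in for ClientMetricsConfigs.CONFIG.
    private static final ConfigDef CONFIG = new ConfigDef()
        .define("interval.ms", ConfigDef.Type.INT, 5 * 60 * 1000,
                ConfigDef.Importance.MEDIUM, "Metrics push interval.");

    public static void main(String[] args) {
        try {
            CONFIG.parse(Map.of("interval.ms", "bbb")); // non-numeric, as in the report
        } catch (ConfigException e) {
            // Map to an InvalidRequestException-style error rather than letting
            // it propagate as an unexpected server error.
            System.out.println("would become InvalidRequestException: " + e.getMessage());
        }
    }
}
```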
[jira] [Comment Edited] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881001#comment-17881001 ] Sagar Rao edited comment on KAFKA-17493 at 9/11/24 3:47 PM: [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|#L129].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252] I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:999) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:226) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:173) at java.lang.reflect.Method.invoke(Method.java:569) at java.util.ArrayList.forEach(ArrayList.java:1511) at java.util.ArrayList.forEach(ArrayList.java:1511) ``` We are trying to use the AdminClient to read the sink connector offsets [here|#L1234-L1252].] There's not much indication in the logs as to why this is happening. was (Author: sagarrao): [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. 
I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L129].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|[https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L226].] I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.j
[jira] [Comment Edited] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881001#comment-17881001 ] Sagar Rao edited comment on KAFKA-17493 at 9/11/24 3:48 PM: [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L128-L149].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252] I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:999) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:226) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:173) at java.lang.reflect.Method.invoke(Method.java:569) at java.util.ArrayList.forEach(ArrayList.java:1511) at java.util.ArrayList.forEach(ArrayList.java:1511) ``` We are trying to use the AdminClient to read the sink connector offsets [here|#L1234-L1252].] There's not much indication in the logs as to why this is happening. 
was (Author: sagarrao): [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|#L129].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252] I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.ap
[jira] [Comment Edited] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881001#comment-17881001 ] Sagar Rao edited comment on KAFKA-17493 at 9/11/24 3:49 PM: [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L128-L149].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252] I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:999) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:226) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:173) at java.lang.reflect.Method.invoke(Method.java:569) at java.util.ArrayList.forEach(ArrayList.java:1511) at java.util.ArrayList.forEach(ArrayList.java:1511) ``` We are trying to use the AdminClient to read the sink connector offsets [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252].] There's not much indication in the logs as to why this is happening. 
was (Author: sagarrao): [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L128-L149].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trun
[jira] [Comment Edited] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881001#comment-17881001 ] Sagar Rao edited comment on KAFKA-17493 at 9/11/24 3:49 PM: [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which which may contribute to the flakiness (not totally sure at this point): 1) For the passed test case, I see that when the test passes, at that point we are spinning up a new connect cluster. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered which checks whether the kafka cluster is ready or not and also an Admin client is able to do admin stuff. In the failing case, I see we don't have that and instead we reuse an existing connect cluster as per [this|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L128-L149].] 2) In the failed test, the connector comes up properly till this point, but it appears to me that it gets stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252] I see the same line in the stacktrace as well ``` at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151) at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132) at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63) at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36) at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214) at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394) at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:999) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:226) at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:173) at java.lang.reflect.Method.invoke(Method.java:569) at java.util.ArrayList.forEach(ArrayList.java:1511) at java.util.ArrayList.forEach(ArrayList.java:1511) ``` We are trying to use the AdminClient to read the sink connector offsets [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252]. There's not much indication in the logs as to why this is happening. 
[jira] [Comment Edited] (KAFKA-17493) Sink connector-related OffsetsApiIntegrationTest suite test cases failing more frequently with new consumer/group coordinator
[ https://issues.apache.org/jira/browse/KAFKA-17493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881001#comment-17881001 ] Sagar Rao edited comment on KAFKA-17493 at 9/11/24 3:51 PM: [~dajac] , [~ChrisEgerton] I took a look at the logs for [testGetSinkConnectorOffsets|https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.startTimeMax=172568159&search.startTimeMin=172473120&search.tags=trunk&search.timeZoneId=America%2FNew_York&tests.container=org.apache.kafka.connect.integration.OffsetsApiIntegrationTest&tests.sortField=FLAKY&tests.test=testGetSinkConnectorOffsets()]. I noticed a couple of differences which may contribute to the flakiness (not totally sure at this point):

1) When the test passes, we are spinning up a new connect cluster at that point. When that happens, I see [verifyClusterReadiness|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java#L181] getting triggered, which checks whether the Kafka cluster is ready and whether an Admin client is able to perform admin operations. In the failing case, we don't have that; instead we reuse an existing connect cluster as per [this|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L128-L149].

2) In the failed test, the connector comes up properly until this point, but it appears to get stuck when trying to read the offsets using the Admin client [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java#L226-L227]. I see the same line in the stacktrace as well:

```
at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
at org.apache.kafka.test.TestUtils.lambda$waitForCondition$3(TestUtils.java:397)
at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:445)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:394)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.verifyExpectedSinkConnectorOffsets(OffsetsApiIntegrationTest.java:999)
at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.getAndVerifySinkConnectorOffsets(OffsetsApiIntegrationTest.java:226)
at org.apache.kafka.connect.integration.OffsetsApiIntegrationTest.testGetSinkConnectorOffsets(OffsetsApiIntegrationTest.java:173)
at java.lang.reflect.Method.invoke(Method.java:569)
at java.util.ArrayList.forEach(ArrayList.java:1511)
at java.util.ArrayList.forEach(ArrayList.java:1511)
```

We are trying to use the AdminClient to read the sink connector offsets [here|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java#L1234-L1252]. There's not much indication in the logs as to why this is happening.
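The assertion in the stack trace above comes from Kafka's standard test polling utility, `TestUtils.waitForCondition`. A minimal sketch of that pattern follows; `fetchSinkConnectorOffset` and the timeout constant are hypothetical stand-ins for the test's real helpers, not the actual OffsetsApiIntegrationTest code:

```java
import org.apache.kafka.test.TestUtils;

public class OffsetPollingSketch {
    // Hypothetical timeout; the real test derives its own from the test harness.
    static final long OFFSET_READ_TIMEOUT_MS = 30_000L;

    static void awaitSinkConnectorOffset(String connector, long expectedOffset) throws InterruptedException {
        // waitForCondition retries the supplier until it returns true or the timeout
        // expires; on expiry it throws the AssertionFailedError seen in the stack trace.
        TestUtils.waitForCondition(
            () -> fetchSinkConnectorOffset(connector) >= expectedOffset,
            OFFSET_READ_TIMEOUT_MS,
            "Sink connector offsets were not read within the timeout");
    }

    // Hypothetical stand-in for the Admin-client-backed offset read in the Worker.
    static long fetchSinkConnectorOffset(String connector) {
        return 0L;
    }
}
```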
[jira] [Commented] (KAFKA-17410) Flaky test testPollThrowsInterruptExceptionIfInterrupted for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881034#comment-17881034 ] Lianet Magrans commented on KAFKA-17410: [~frankvicky] this test is very noisy/flaky in trunk, and we already have coverage for the new consumer in AsyncKafkaConsumerTest.testPollThrowsInterruptExceptionIfInterrupted (passing consistently), so it would make sense to me to leave it running only for the classic consumer. What do you think? In parallel, given that we may have an idea of what could be behind the flakiness (but the fix is in-flight), we can review it again once that fix goes in (the related Jiras are already linked). Makes sense? If so, let me know if you have bandwidth for the PR just moving this back to the classic consumer, with a reference to the existing test for the new consumer. Happy to help with reviews. Thanks!

> Flaky test testPollThrowsInterruptExceptionIfInterrupted for new consumer
> -
>
> Key: KAFKA-17410
> URL: https://issues.apache.org/jira/browse/KAFKA-17410
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: TengYao Chi
> Priority: Major
> Labels: consumer-threading-refactor, flaky-test
>
> KafkaConsumerTest.testPollThrowsInterruptExceptionIfInterrupted is flaky for the new consumer (passing consistently for the classic consumer).
> Fails with:
> org.opentest4j.AssertionFailedError: Expected org.apache.kafka.common.errors.InterruptException to be thrown, but nothing was thrown.
> It's been flaky since it was recently enabled for the new consumer: [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172438559&search.startTimeMin=172006560&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol)%5B2%5D|https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172438559&search.startTimeMin=172006560&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol)%5B2%5D.]
> Note that a very similar test already exists in AsyncKafkaConsumerTest.testPollThrowsInterruptExceptionIfInterrupted, written specifically for the async consumer, and that passes consistently.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
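The shape of such a test is roughly the sketch below, assuming an already-constructed consumer; this is illustrative, not the actual KafkaConsumerTest code:

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.errors.InterruptException;

import static org.junit.jupiter.api.Assertions.assertThrows;

public class PollInterruptSketch {
    // Interrupt the current thread, then verify poll() surfaces InterruptException.
    static void verifyPollThrowsOnInterrupt(Consumer<String, String> consumer) {
        try {
            Thread.currentThread().interrupt();
            assertThrows(InterruptException.class, () -> consumer.poll(Duration.ZERO));
        } finally {
            // Clear the interrupt flag so it does not leak into other tests.
            Thread.interrupted();
        }
    }
}
```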
Re: [PR] KAFKA-17502: Modified commitSync() and close() handling in clients [kafka]
ShivsundarR commented on code in PR #17136: URL: https://github.com/apache/kafka/pull/17136#discussion_r1754897975

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
## @@ -1011,33 +1036,47 @@ public void completeIfEmpty() {
     }
 }

-    static class Pair<V> {
+    static class Tuple<V> {
         private V asyncRequest;
-        private V syncRequest;
+        private Queue<V> syncRequestQueue;
+        private V closeRequest;

-        public Pair(V asyncRequest, V syncRequest) {
+        public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
             this.asyncRequest = asyncRequest;
-            this.syncRequest = syncRequest;
+            this.syncRequestQueue = syncRequestQueue;
+            this.closeRequest = closeRequest;
         }

         public void setAsyncRequest(V asyncRequest) {
             this.asyncRequest = asyncRequest;
         }

-        public void setSyncRequest(V second) {
-            this.syncRequest = second;
+        public void setSyncRequestQueue(Queue<V> syncRequestQueue) {

Review Comment: Yeah, makes sense. I have updated the addSyncRequest() method. The getSyncRequestQueue() method still exists, though, as we need the queue contents when we process the acknowledgements.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
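For context, an encapsulated `addSyncRequest()` along the lines discussed might look like the sketch below. This is illustrative only, not the PR's actual code:

```java
import java.util.LinkedList;
import java.util.Queue;

// Illustrative Tuple with the queue created on demand, so callers no longer
// construct and set the queue themselves.
class Tuple<V> {
    private V asyncRequest;
    private Queue<V> syncRequestQueue;
    private V closeRequest;

    public void addSyncRequest(V syncRequest) {
        if (syncRequestQueue == null) {
            syncRequestQueue = new LinkedList<>();
        }
        syncRequestQueue.add(syncRequest);
    }

    public Queue<V> getSyncRequestQueue() {
        // Still exposed, because the queue contents are needed when processing acknowledgements.
        return syncRequestQueue;
    }
}
```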
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
chia7712 commented on code in PR #17046: URL: https://github.com/apache/kafka/pull/17046#discussion_r1755095302

## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
## @@ -325,6 +358,35 @@ private void verifyGroupConfigUpdate() throws Exception {
     }
 }

+    @ClusterTest(types = {Type.KRAFT})
+    public void testClientMetricsConfigUpdate() throws Exception {
+        alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "client-metrics", "--alter");
+        verifyClientMetricsConfigUpdate();
+
+        // Test for the --client-metrics alias
+        alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--client-metrics", "--alter");

Review Comment: The arguments generated by this test case are `--client-metrics --alter --entity-name cm --add-config interval.ms=6000,metrics=`, but that is actually illegal, since "--alter" is viewed as the entity name. It happens to work because the argument check is not invoked and `ConfigCommand#alterConfig` processes only the first entity. Luckily, "cm" is the first entity. Maybe we can handle this in a follow-up, as it needs a minor refactor; the tests of other resource types have a similar issue.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
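A sketch of the corrected option list the comment implies, where the entity name is supplied as the alias's value so that `--alter` can no longer be mistaken for it. This is illustrative only; the real fix is deferred to the follow-up:

```java
import java.util.List;

import static java.util.Arrays.asList;

class ConfigArgsSketch {
    // Illustrative only: "--client-metrics cm" binds the entity name to the alias.
    static List<String> correctedAlterOpts(String bootstrapServers) {
        return asList(
            "--bootstrap-server", bootstrapServers,
            "--client-metrics", "cm",
            "--alter",
            "--add-config", "interval.ms=6000");
    }
}
```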
Re: [PR] KAFKA-17502: Modified commitSync() and close() handling in clients [kafka]
AndrewJSchofield commented on code in PR #17136: URL: https://github.com/apache/kafka/pull/17136#discussion_r1755099465

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
## @@ -1011,33 +1036,47 @@ public void completeIfEmpty() {
     }
 }

-    static class Pair<V> {
+    static class Tuple<V> {
         private V asyncRequest;
-        private V syncRequest;
+        private Queue<V> syncRequestQueue;
+        private V closeRequest;

-        public Pair(V asyncRequest, V syncRequest) {
+        public Tuple(V asyncRequest, Queue<V> syncRequestQueue, V closeRequest) {
             this.asyncRequest = asyncRequest;
-            this.syncRequest = syncRequest;
+            this.syncRequestQueue = syncRequestQueue;
+            this.closeRequest = closeRequest;
         }

         public void setAsyncRequest(V asyncRequest) {
             this.asyncRequest = asyncRequest;
         }

-        public void setSyncRequest(V second) {
-            this.syncRequest = second;
+        public void setSyncRequestQueue(Queue<V> syncRequestQueue) {

Review Comment: That's fine. It just seemed weird forcing the caller to make and set the queue.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16727: Added share group record lock duration ms to dynamic configurations [kafka]
apoorvmittal10 commented on code in PR #17070: URL: https://github.com/apache/kafka/pull/17070#discussion_r1754670964

## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/GroupType.java:
## @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;
+
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+public enum GroupType {

Review Comment: Can't we move this class instead of deleting the other one and adding a new one here?

## core/src/main/scala/kafka/server/BrokerServer.scala:
## @@ -613,7 +616,10 @@ class BrokerServer(
     val writer = new CoordinatorPartitionWriter(
       replicaManager
     )
-    val groupConfigManager = new GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap())
+    groupConfigManager = new GroupConfigManager(
+      config.groupCoordinatorConfig.extractConsumerGroupConfigMap(),
+      config.shareGroupConfig.extractShareGroupConfigMap()
+    )

Review Comment: If the config is not used only by the group coordinator, should it be in the `createGroupCoordinator` method? I think it should be moved somewhere more central.

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/DynamicGroupConfig.java:
## @@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.share.ShareGroupConfig;
+import org.apache.kafka.server.share.ShareGroupDynamicConfig;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.Properties;
+
+public class DynamicGroupConfig {
+    public static final ConfigDef CONFIG_DEF = Utils.mergeConfigs(Arrays.asList(
+        ConsumerGroupDynamicConfig.configDef(),
+        ShareGroupDynamicConfig.configDef()
+    ));
+
+    ConsumerGroupDynamicConfig consumerGroupDynamicConfig;
+    ShareGroupDynamicConfig shareGroupDynamicConfig;

Review Comment: Should they be private?

## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/GroupType.java:
## @@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.common.runtime;

Review Comment: Should this file be in `runtime` or in `org.apache.kafka.coordinator.common`?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
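On the visibility question, the suggested change would look roughly like the sketch below. The two config types are those from the PR's diff; the accessor names are hypothetical:

```java
// Sketch: fields made private and final, read through accessors.
public class DynamicGroupConfig {
    private final ConsumerGroupDynamicConfig consumerGroupDynamicConfig;
    private final ShareGroupDynamicConfig shareGroupDynamicConfig;

    public DynamicGroupConfig(ConsumerGroupDynamicConfig consumerGroupDynamicConfig,
                              ShareGroupDynamicConfig shareGroupDynamicConfig) {
        this.consumerGroupDynamicConfig = consumerGroupDynamicConfig;
        this.shareGroupDynamicConfig = shareGroupDynamicConfig;
    }

    public ConsumerGroupDynamicConfig consumerGroupDynamicConfig() {
        return consumerGroupDynamicConfig;
    }

    public ShareGroupDynamicConfig shareGroupDynamicConfig() {
        return shareGroupDynamicConfig;
    }
}
```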
[jira] [Commented] (KAFKA-17410) Flaky test testPollThrowsInterruptExceptionIfInterrupted for new consumer
[ https://issues.apache.org/jira/browse/KAFKA-17410?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881040#comment-17881040 ] TengYao Chi commented on KAFKA-17410: - Hi [~lianetm], apologies for the late response. Sadly, I still haven’t been able to reproduce the bug on my machine, despite trying every trick I know. :P Given the situation, your suggestion makes sense to me. I’ll go ahead and file a PR to disable this test for the AsyncConsumer.

> Flaky test testPollThrowsInterruptExceptionIfInterrupted for new consumer
> -
>
> Key: KAFKA-17410
> URL: https://issues.apache.org/jira/browse/KAFKA-17410
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: TengYao Chi
> Priority: Major
> Labels: consumer-threading-refactor, flaky-test
>
> KafkaConsumerTest.testPollThrowsInterruptExceptionIfInterrupted is flaky for the new consumer (passing consistently for the classic consumer).
> Fails with:
> org.opentest4j.AssertionFailedError: Expected org.apache.kafka.common.errors.InterruptException to be thrown, but nothing was thrown.
> It's been flaky since it was recently enabled for the new consumer: [https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172438559&search.startTimeMin=172006560&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol)%5B2%5D|https://ge.apache.org/scans/tests?search.names=Git%20branch&search.rootProjectNames=kafka&search.startTimeMax=172438559&search.startTimeMin=172006560&search.timeZoneId=America%2FToronto&search.values=trunk&tests.container=org.apache.kafka.clients.consumer.KafkaConsumerTest&tests.test=testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol)%5B2%5D.]
> Note that a very similar test already exists in AsyncKafkaConsumerTest.testPollThrowsInterruptExceptionIfInterrupted, written specifically for the async consumer, and that passes consistently.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
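One way to scope the parameterized test back to the classic consumer is a JUnit 5 enum-source filter along these lines; this is a sketch, and the actual wiring in KafkaConsumerTest may differ:

```java
import org.apache.kafka.clients.consumer.GroupProtocol;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class PollInterruptClassicOnlySketch {
    // Runs only for the classic consumer; the new consumer keeps its dedicated
    // coverage in AsyncKafkaConsumerTest.testPollThrowsInterruptExceptionIfInterrupted.
    @ParameterizedTest
    @EnumSource(value = GroupProtocol.class, names = "CLASSIC")
    void testPollThrowsInterruptExceptionIfInterrupted(GroupProtocol groupProtocol) {
        // Test body omitted; unchanged from the existing test.
    }
}
```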
[jira] [Commented] (KAFKA-17525) Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime
[ https://issues.apache.org/jira/browse/KAFKA-17525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881042#comment-17881042 ] TaiJuWu commented on KAFKA-17525: - Hi [~chia7712], if you are not working on this, may I have it?

> Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime
> 
>
> Key: KAFKA-17525
> URL: https://issues.apache.org/jira/browse/KAFKA-17525
> Project: Kafka
> Issue Type: Improvement
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Minor
>
> see following test:
> {code:bash}
> chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb
> Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test --add-config interval.ms=bbb'
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
> at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
> at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
> at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
> at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581)
> at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:449)
> at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351)
> at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100)
> at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request.
> {code}
> By contrast, the `topic` resource can handle such an error and then return `InvalidConfigurationException`:
> {code:bash}
> chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa
> Error while executing config command with args '--bootstrap-server 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa'
> java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value aaa for configuration flush.ms: Not a number of type LONG
> at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396)
> at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096)
> at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
> at kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581)
> at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:371)
> at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351)
> at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100)
> at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
> Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value aaa for configuration flush.ms: Not a number of type LONG
> {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
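The requested behavior amounts to validating the value before it can surface as a generic server error. A minimal sketch of that idea follows, using a hypothetical helper rather than the actual broker code:

```java
import org.apache.kafka.common.errors.InvalidConfigurationException;

class ClientMetricsConfigSketch {
    // Parse a numeric config value, surfacing a descriptive exception instead of
    // letting a raw NumberFormatException bubble up as UnknownServerException.
    static long parseLongConfig(String name, String value) {
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new InvalidConfigurationException(
                "Invalid value " + value + " for configuration " + name + ": Not a number of type LONG");
        }
    }
}
```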
Re: [PR] KAFKA-17316: Standard authorizer refactor [kafka]
Claudenw commented on PR #16779: URL: https://github.com/apache/kafka/pull/16779#issuecomment-2344112120 @cmccabe You asked why the StandardAuthorizerData was easier to test than the StandardAuthorizer. It is not that the StandardAuthorizerData is easier to test; it is that the interface of StandardAuthorizerData is different from that of the StandardAuthorizer. The StandardAuthorizer does some work and then passes requests on to the StandardAuthorizerData to handle, so the parameters differ for some methods. This gives us the opportunity to test the StandardAuthorizerData interface separately from the StandardAuthorizer. In many cases it is probably easier to implement tests against the StandardAuthorizerData than to start at the StandardAuthorizer level. The Trie implementation is simply a replacement for the StandardAuthorizerData. The tests will now verify edge cases and corner cases for the StandardAuthorizerData that otherwise would not be discovered until integration testing.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
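As a toy illustration of the testing seam being described (this is deliberately not Kafka's actual StandardAuthorizerData API):

```java
import java.util.HashSet;
import java.util.Set;

// Toy data layer: it owns the ACL state and exposes authorize() directly, so edge
// cases can be unit-tested without the outer Authorizer's lifecycle and request plumbing.
class ToyAuthorizerData {
    private final Set<String> allowedPrincipals = new HashSet<>();

    ToyAuthorizerData addAllowed(String principal) {
        allowedPrincipals.add(principal);
        return this;
    }

    boolean authorize(String principal) {
        return allowedPrincipals.contains(principal);
    }
}
```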
Re: [PR] KAFKA-8666: Improve Documentation on usage of Materialized config object [kafka]
fonsdant commented on PR #17145: URL: https://github.com/apache/kafka/pull/17145#issuecomment-2344112932 Thanks, @mjsax! I have committed your suggestions. It looks even better now!

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests [kafka]
junrao commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1755130741

## core/src/main/java/kafka/server/share/SharePartition.java:
## @@ -995,6 +1004,9 @@ boolean canAcquireRecords() {
      * @return A boolean which indicates whether the fetch lock is acquired.
      */
     boolean maybeAcquireFetchLock() {
+        if (partitionState() != SharePartitionState.ACTIVE) {

Review Comment: @apoorvmittal10 : Yes, the only issue is in `completeInitializationWithException`. Do you plan to fix it in this PR or a separate one?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881043#comment-17881043 ] João Pedro Fonseca commented on KAFKA-14995: [~vvcephei], [~jlprat], could I mark this task as closed? The code has been merged to trunk.

> Automate asf.yaml collaborators refresh
> ---
>
> Key: KAFKA-14995
> URL: https://issues.apache.org/jira/browse/KAFKA-14995
> Project: Kafka
> Issue Type: Improvement
> Reporter: John Roesler
> Assignee: João Pedro Fonseca
> Priority: Minor
> Labels: newbie
>
> We have added a policy to use the asf.yaml Github Collaborators: [https://github.com/apache/kafka-site/pull/510]
> The policy states that we set this list to be the top 20 commit authors who are not Kafka committers. Unfortunately, it's not trivial to compute this list.
> Here is the process I followed to generate the list the first time (note that I generated this list on 2023-04-28, so the lookback is one year):
> 1. List authors by commit volume in the last year:
> {code:java}
> $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code}
> 2. manually filter out the authors who are committers, based on [https://kafka.apache.org/committers]
> 3. truncate the list to 20 authors
> 4. for each author
> 4a. Find a commit in the `git log` that they were the author on:
> {code:java}
> commit 440bed2391338dc10fe4d36ab17dc104b61b85e8
> Author: hudeqi <1217150...@qq.com>
> Date: Fri May 12 14:03:17 2023 +0800
> ...{code}
> 4b. Look up that commit in Github: [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8]
> 4c. Copy their Github username into .asf.yaml under both the PR whitelist and the Collaborators lists.
> 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713]
>
> This is pretty time consuming and is very scriptable. Two complications:
> * To do the filtering, we need to map from Git log "Author" to documented Kafka "Committer" that we can use to perform the filter. Suggestion: just update the structure of the "Committers" page to include their Git "Author" name and email ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)]
> * To generate the YAML lists, we need to map from Git log "Author" to Github username. There's presumably some way to do this in the Github REST API (the mapping is based on the email, IIUC), or we could also just update the Committers page to also document each committer's Github username.
>
> Ideally, we would write this script (to be stored in the Apache Kafka repo) and create a Github Action to run it every three months.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17233: MirrorCheckpointConnector should use batched listConsumerGroupOffsets [kafka]
fonsdant commented on PR #17038: URL: https://github.com/apache/kafka/pull/17038#issuecomment-2344125595 Hi, @gharris1727! Could you review it, please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16379: Coordinator event queue, processing, flush, purgatory time histograms [kafka]
junrao commented on code in PR #16949: URL: https://github.com/apache/kafka/pull/16949#discussion_r1755140820 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/metrics/HdrHistogram.java: ## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.metrics; + +import org.HdrHistogram.Histogram; +import org.HdrHistogram.Recorder; +import org.HdrHistogram.ValueRecorder; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * A wrapper on top of the HdrHistogram API. It handles writing to the histogram by delegating + * to an internal {@link ValueRecorder} implementation, and reading from the histogram by + * efficiently implementing the retrieval of up-to-date histogram data. + * + * Note that all APIs expect a timestamp which is used by the histogram to discard decaying data + * and determine when the snapshot from which the histogram metrics are calculated should be + * refreshed. + */ +public final class HdrHistogram { Review Comment: > All I'm saying is that we don't claim to have convinced the community already, and don't want to inadvertently undermine the other "new histogram" effort we were made aware of, before there's been a discussion. Ok. What's our plan moving forward? We probably don't want to be in the interim state for too long. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
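For reference while this discussion continues, the write/read cycle of the underlying org.HdrHistogram library that the wrapper builds on looks like this (plain library usage, not the wrapper itself):

```java
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;

class HdrHistogramUsageSketch {
    public static void main(String[] args) {
        // Recorder gives low-contention writes; two significant value digits.
        Recorder recorder = new Recorder(2);
        recorder.recordValue(42);
        recorder.recordValue(1_000);

        // Reading swaps in a fresh interval histogram, so writers are not blocked.
        Histogram snapshot = recorder.getIntervalHistogram();
        System.out.println("p99 = " + snapshot.getValueAtPercentile(99.0));
        System.out.println("max = " + snapshot.getMaxValue());
    }
}
```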
[jira] [Commented] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881044#comment-17881044 ] Josep Prat commented on KAFKA-14995: Yes sure, go ahead! > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: João Pedro Fonseca >Priority: Minor > Labels: newbie > Fix For: 4.0.0 > > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year: > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time consuming and is very scriptable. Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)] > * To generate the YAML lists, we need to map from Git log "Author" to Github > username. There's presumably some way to do this in the Github REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's Github username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a Github Action to run it every three months. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-14995) Automate asf.yaml collaborators refresh
[ https://issues.apache.org/jira/browse/KAFKA-14995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat updated KAFKA-14995: --- Fix Version/s: 4.0.0 > Automate asf.yaml collaborators refresh > --- > > Key: KAFKA-14995 > URL: https://issues.apache.org/jira/browse/KAFKA-14995 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Assignee: João Pedro Fonseca >Priority: Minor > Labels: newbie > Fix For: 4.0.0 > > > We have added a policy to use the asf.yaml Github Collaborators: > [https://github.com/apache/kafka-site/pull/510] > The policy states that we set this list to be the top 20 commit authors who > are not Kafka committers. Unfortunately, it's not trivial to compute this > list. > Here is the process I followed to generate the list the first time (note that > I generated this list on 2023-04-28, so the lookback is one year: > 1. List authors by commit volume in the last year: > {code:java} > $ git shortlog --email --numbered --summary --since=2022-04-28 | vim {code} > 2. manually filter out the authors who are committers, based on > [https://kafka.apache.org/committers] > 3. truncate the list to 20 authors > 4. for each author > 4a. Find a commit in the `git log` that they were the author on: > {code:java} > commit 440bed2391338dc10fe4d36ab17dc104b61b85e8 > Author: hudeqi <1217150...@qq.com> > Date: Fri May 12 14:03:17 2023 +0800 > ...{code} > 4b. Look up that commit in Github: > [https://github.com/apache/kafka/commit/440bed2391338dc10fe4d36ab17dc104b61b85e8] > 4c. Copy their Github username into .asf.yaml under both the PR whitelist and > the Collaborators lists. > 5. Send a PR to update .asf.yaml: [https://github.com/apache/kafka/pull/13713] > > This is pretty time consuming and is very scriptable. Two complications: > * To do the filtering, we need to map from Git log "Author" to documented > Kafka "Committer" that we can use to perform the filter. Suggestion: just > update the structure of the "Committers" page to include their Git "Author" > name and email > ([https://github.com/apache/kafka-site/blob/asf-site/committers.html)] > * To generate the YAML lists, we need to map from Git log "Author" to Github > username. There's presumably some way to do this in the Github REST API (the > mapping is based on the email, IIUC), or we could also just update the > Committers page to also document each committer's Github username. > > Ideally, we would write this script (to be stored in the Apache Kafka repo) > and create a Github Action to run it every three months. > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17525) Convert the UnknownServerException to InvalidRequestException when altering client-metrics config at runtime
[ https://issues.apache.org/jira/browse/KAFKA-17525?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai reassigned KAFKA-17525: -- Assignee: TaiJuWu (was: Chia-Ping Tsai) > Convert the UnknownServerException to InvalidRequestException when altering > client-metrics config at runtime > > > Key: KAFKA-17525 > URL: https://issues.apache.org/jira/browse/KAFKA-17525 > Project: Kafka > Issue Type: Improvement >Reporter: Chia-Ping Tsai >Assignee: TaiJuWu >Priority: Minor > > see following test: > {code:bash} > chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server > 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test > --add-config interval.ms=bbb > Error while executing config command with args '--bootstrap-server > 192.168.1.149:2 --alter --entity-type client-metrics --entity-name test > --add-config interval.ms=bbb' > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: The server experienced > an unexpected error when processing the request. > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) > at > kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) > at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:449) > at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) > at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) > at kafka.admin.ConfigCommand.main(ConfigCommand.scala) > Caused by: org.apache.kafka.common.errors.UnknownServerException: The server > experienced an unexpected error when processing the request. > {code} > By contrast, `topic` resource can handle the such error and then return > `InvalidRequestException` > {code:bash} > chia7712@fedora:~/project/kafka$ bin/kafka-configs.sh --bootstrap-server > 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa > Error while executing config command with args '--bootstrap-server > 192.168.1.149:2 --alter --topic chia --add-config flush.ms=aaa' > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.InvalidConfigurationException: Invalid value > aaa for configuration flush.ms: Not a number of type LONG > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180) > at > kafka.admin.ConfigCommand$.alterResourceConfig(ConfigCommand.scala:581) > at kafka.admin.ConfigCommand$.alterConfig(ConfigCommand.scala:371) > at kafka.admin.ConfigCommand$.processCommand(ConfigCommand.scala:351) > at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:100) > at kafka.admin.ConfigCommand.main(ConfigCommand.scala) > Caused by: org.apache.kafka.common.errors.InvalidConfigurationException: > Invalid value aaa for configuration flush.ms: Not a number of type LONG > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17526) make ConfigCommandIntegrationTest.java test use correct arguments in testing alias
Chia-Ping Tsai created KAFKA-17526: -- Summary: make ConfigCommandIntegrationTest.java test use correct arguments in testing alias Key: KAFKA-17526 URL: https://issues.apache.org/jira/browse/KAFKA-17526 Project: Kafka Issue Type: Test Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai see https://github.com/apache/kafka/pull/17046#discussion_r1755095302 -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
chia7712 commented on code in PR #17046: URL: https://github.com/apache/kafka/pull/17046#discussion_r1755161862 ## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ## @@ -325,6 +358,35 @@ private void verifyGroupConfigUpdate() throws Exception { } } + +@ClusterTest(types = {Type.KRAFT}) +public void testClientMetricsConfigUpdate() throws Exception { +alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--entity-type", "client-metrics", "--alter"); +verifyClientMetricsConfigUpdate(); + +// Test for the --client-metrics alias +alterOpts = asList("--bootstrap-server", cluster.bootstrapServers(), "--client-metrics", "--alter"); Review Comment: https://issues.apache.org/jira/browse/KAFKA-17526 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-17527) Kafka Streams fails with NPE for missing RecordContext
Matthias J. Sax created KAFKA-17527: --- Summary: Kafka Streams fails with NPE for missing RecordContext Key: KAFKA-17527 URL: https://issues.apache.org/jira/browse/KAFKA-17527 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 3.9.0 Reporter: Matthias J. Sax

A crash of Kafka Streams was observed with the following stack trace:
{code:java}
[2024-09-10 10:59:12,301] ERROR [kafka-producer-network-thread | i-0197827b22f4d4e4c-StreamThread-1-producer] Error executing user-provided callback on message for topic-partition 'stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-47-changelog-1' (org.apache.kafka.clients.producer.internals.ProducerBatch)
java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.processor.internals.InternalProcessorContext.recordContext()" because "context" is null
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:405)
at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285)
at org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1574)
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312)
at org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:200)
at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:1166)
at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:474)
at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:251)
at java.base/java.lang.Thread.run(Thread.java:840)
{code}
It seems to be a bug introduced via KIP-1033, coming from the changelogging layer, which passes a `null` context into `RecordCollector.send(...)`.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
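The failure mode amounts to dereferencing a null context in the producer callback. A defensive guard of the following shape would avoid the NPE; this is illustrative only, and the actual fix may instead ensure the changelogging layer always supplies a context:

```java
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;

class RecordContextGuardSketch {
    // Hypothetical helper: render the record context for error handling, tolerating null.
    static String describeContext(InternalProcessorContext<?, ?> context) {
        return context == null
            ? "<no record context available>"
            : String.valueOf(context.recordContext());
    }
}
```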
Re: [PR] KAFKA-17347: Add missing client-metrics option to kafka-configs.sh [kafka]
chia7712 merged PR #17046: URL: https://github.com/apache/kafka/pull/17046 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17347) Add omitted --client-metrics option to kafka-configs.sh
[ https://issues.apache.org/jira/browse/KAFKA-17347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17347. Resolution: Fixed > Add omitted --client-metrics option to kafka-configs.sh > --- > > Key: KAFKA-17347 > URL: https://issues.apache.org/jira/browse/KAFKA-17347 > Project: Kafka > Issue Type: Bug > Components: tools >Affects Versions: 3.7.0 >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Fix For: 4.0.0 > > > KIP-714 introduced client metrics resources to kafka-configs.sh. The option > --entity-type client-metrics was added, and a shorthand of "client-metrics" > was also included in the comments. However, the "-client-metrics" option > whose syntax matches all of the other entity types was omitted. This corrects > that omission. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Assigned] (KAFKA-17527) Kafka Streams fails with NPE for missing RecordContext
[ https://issues.apache.org/jira/browse/KAFKA-17527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-17527: --- Assignee: Matthias J. Sax > Kafka Streams fails with NPE for missing RecordContext > -- > > Key: KAFKA-17527 > URL: https://issues.apache.org/jira/browse/KAFKA-17527 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.9.0 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Blocker > > He did observe a crash of Kafka Streams with the following stack trace: > {code:java} > 2024-09-10 10:59:12,301] ERROR [kafka-producer-network-thread | > i-0197827b22f4d4e4c-StreamThread-1-producer] Error executing user-provided > callback on message for topic-partition > 'stream-soak-test-KSTREAM-AGGREGATE-STATE-STORE-47-changelog-1' > (org.apache.kafka.clients.producer.internals.ProducerBatch) > java.lang.NullPointerException: Cannot invoke > "org.apache.kafka.streams.processor.internals.InternalProcessorContext.recordContext()" > because "context" is null > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:405) > at > org.apache.kafka.streams.processor.internals.RecordCollectorImpl.lambda$send$1(RecordCollectorImpl.java:285) > at > org.apache.kafka.clients.producer.KafkaProducer$AppendCallbacks.onCompletion(KafkaProducer.java:1574) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:312) > at > org.apache.kafka.clients.producer.internals.ProducerBatch.abort(ProducerBatch.java:200) > at > org.apache.kafka.clients.producer.internals.RecordAccumulator.abortUndrainedBatches(RecordAccumulator.java:1166) > at > org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:474) > at > org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337) > at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:251) > at java.base/java.lang.Thread.run(Thread.java:840) {code} > It seems to be a bug introduced via KIP-1033, coming from the changelogging > layer which does pass a `null` context into `RecordCollector.send(...)`. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-17524) StreamThreadTest shouldReturnErrorIfProducerInstanceIdNotInitialized hanging
[ https://issues.apache.org/jira/browse/KAFKA-17524?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-17524: Component/s: streams unit tests > StreamThreadTest shouldReturnErrorIfProducerInstanceIdNotInitialized hanging > > > Key: KAFKA-17524 > URL: https://issues.apache.org/jira/browse/KAFKA-17524 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: David Arthur >Priority: Major > Labels: flaky-test > > A trunk build had a timeout and it appears that this test was the cause. > [https://github.com/apache/kafka/actions/runs/10798234851/job/29953919232] > In the Gradle log, we see > {code:java} > 2024-09-10T20:31:26.6830206Z Gradle Test Run :streams:test > Gradle Test > Executor 47 > StreamThreadTest > > shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, boolean) > > "shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, > boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" STARTED > {code} > but no "FINISHED" or "FAILED" later in the log. > It seems that this test was running for around 50 minutes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests [kafka]
adixitconfluent commented on PR #16969: URL: https://github.com/apache/kafka/pull/16969#issuecomment-2344243078 The tests failing for JDK 21 and Scala 2.13 are unrelated to the PR. I ran all four of them locally and they all passed.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] MINOR: follow up KAFKA-17477 comments [kafka]
chia7712 merged PR #17161: URL: https://github.com/apache/kafka/pull/17161 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17518) AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
[ https://issues.apache.org/jira/browse/KAFKA-17518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881068#comment-17881068 ] Lianet Magrans commented on KAFKA-17518: I would add to this Jira that it would be good validation to end this with an integration test that calls close(0) and ensures that the group becomes empty. I expect such a test would pass on trunk for the classic consumer now (it does send the leave group, without waiting for a response, because it has a 0 timeout), but the same test requires this Jira to pass for the async consumer. (The discussion on the KAFKA-16985 PR has more context, since that is where this issue was discovered.)

> AsyncKafkaConsumer cannot reliably leave group when closed with small timeout
> -
>
> Key: KAFKA-17518
> URL: https://issues.apache.org/jira/browse/KAFKA-17518
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Affects Versions: 3.9.0
> Reporter: Kirk True
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support
>
> If {{close()}} is called with a short timeout (e.g. 0 ms), the process cannot complete, leading to the consumer remaining in the consumer group.
> On {{close()}}, the consumer attempts to unsubscribe and leave the consumer group. This process requires hops back and forth between the application and background threads to call the {{ConsumerRebalanceListener}}. Those hops add a nonzero amount of time to the close step.
> The events used to communicate between the application and background threads are based on the timeout provided by the user. If the timeout is not sufficient, the events will expire, and the process will be left incomplete.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
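Such a test could be sketched as follows; the helper shape and timeout are assumptions, and the real test would live in the consumer integration suite:

```java
import java.time.Duration;
import java.util.Collection;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.test.TestUtils;

import static java.util.Collections.singleton;

class CloseZeroLeavesGroupSketch {
    // close(0) should still trigger a leave-group request; poll the group
    // description until no members remain.
    static void verifyCloseZeroLeavesGroup(Consumer<?, ?> consumer, Admin admin, String groupId)
            throws InterruptedException {
        consumer.close(Duration.ZERO);
        TestUtils.waitForCondition(() -> {
            Collection<MemberDescription> members = admin
                .describeConsumerGroups(singleton(groupId))
                .describedGroups().get(groupId).get().members();
            return members.isEmpty();
        }, 30_000L, "Consumer did not leave the group after close(0)");
    }
}
```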
Re: [PR] KAFKA-16985: Ensure consumer sends leave request on close even if interrupted [kafka]
lianetm commented on code in PR #16686: URL: https://github.com/apache/kafka/pull/16686#discussion_r1755201808

## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
## @@ -1291,6 +1294,10 @@ private void releaseAssignmentAndLeaveGroup(final Timer timer) {
             log.warn("Consumer triggered an unsubscribe event to leave the group but couldn't " +
                 "complete it within {} ms. It will proceed to close.", timer.timeoutMs());
         } finally {
+            // Regardless of success or failure of the unsubscribe process, it's important to process any background
+            // events in the hope that our ConsumerRebalanceListenerCallbackNeededEvent is present and can be executed.
+            processBackgroundEvents();

Review Comment: Just for the record: my expectations here were not right, because of another gap that was discovered. This `processBackgroundEvents` is not enough to ensure that close(0) sends the leave, and https://issues.apache.org/jira/browse/KAFKA-17518 was created.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17400: Added share fetch purgatory for delaying share fetch requests [kafka]
apoorvmittal10 commented on code in PR #16969: URL: https://github.com/apache/kafka/pull/16969#discussion_r1755202301

## core/src/main/java/kafka/server/share/SharePartition.java:
## @@ -995,6 +1004,9 @@ boolean canAcquireRecords() {
      * @return A boolean which indicates whether the fetch lock is acquired.
      */
     boolean maybeAcquireFetchLock() {
+        if (partitionState() != SharePartitionState.ACTIVE) {

Review Comment: Yes @junrao, I have made the change already locally and shall push it with the other changes I have. This PR has gone a bit big already.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-17524) StreamThreadTest shouldReturnErrorIfProducerInstanceIdNotInitialized hanging
[ https://issues.apache.org/jira/browse/KAFKA-17524?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881072#comment-17881072 ] David Arthur commented on KAFKA-17524: -- Here's another from the same test class, different test method. Raw line: 2024-09-11T13:23:30.9643436Z Gradle Test Run :streams:test > Gradle Test Executor 50 > StreamThreadTest > shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(boolean, boolean) > "shouldNotCloseTaskAndRemoveFromTaskManagerIfPidMappingIsInvalidInCommitTransactionWhenCommitting(boolean, boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" STARTED Workflow run: [https://github.com/apache/kafka/actions/runs/10810671508/job/29990096670] Build Scan: https://ge.apache.org/s/ddsbbgzcop7bo/timeline > StreamThreadTest shouldReturnErrorIfProducerInstanceIdNotInitialized hanging > > > Key: KAFKA-17524 > URL: https://issues.apache.org/jira/browse/KAFKA-17524 > Project: Kafka > Issue Type: Test > Components: streams, unit tests >Reporter: David Arthur >Priority: Major > Labels: flaky-test > > A trunk build had a timeout and it appears that this test was the cause. > [https://github.com/apache/kafka/actions/runs/10798234851/job/29953919232] > In the Gradle log, we see > {code:java} > 2024-09-10T20:31:26.6830206Z Gradle Test Run :streams:test > Gradle Test > Executor 47 > StreamThreadTest > > shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, boolean) > > "shouldReturnErrorIfProducerInstanceIdNotInitialized(boolean, > boolean).stateUpdaterEnabled=true, processingThreadsEnabled=true" STARTED > {code} > but no "FINISHED" or "FAILED" later in the log. > It seems that this test was running for around 50 minutes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17528) Remove whitelist/blacklist from JMXReporter
Chia-Ping Tsai created KAFKA-17528: -- Summary: Remove whitelist/blacklist from JMXReporter Key: KAFKA-17528 URL: https://issues.apache.org/jira/browse/KAFKA-17528 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17529) Remove blacklist from MM2
Chia-Ping Tsai created KAFKA-17529: -- Summary: Remove blacklist from MM2 Key: KAFKA-17529 URL: https://issues.apache.org/jira/browse/KAFKA-17529 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-17530) Remove blacklist/whitelist from ReplaceField
Chia-Ping Tsai created KAFKA-17530: -- Summary: Remove blacklist/whitelist from ReplaceField Key: KAFKA-17530 URL: https://issues.apache.org/jira/browse/KAFKA-17530 Project: Kafka Issue Type: Sub-task Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (KAFKA-12601) Remove deprecated `delegation.token.master.key`
[ https://issues.apache.org/jira/browse/KAFKA-12601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai updated KAFKA-12601: --- Parent: KAFKA-17384 Issue Type: Sub-task (was: Task) > Remove deprecated `delegation.token.master.key` > --- > > Key: KAFKA-12601 > URL: https://issues.apache.org/jira/browse/KAFKA-12601 > Project: Kafka > Issue Type: Sub-task >Reporter: Ismael Juma >Assignee: xuanzhang gong >Priority: Major > Fix For: 4.0.0 > > -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17392: Remove whitelist option in ConsoleConsumerOptions [kafka]
chia7712 merged PR #17138: URL: https://github.com/apache/kafka/pull/17138 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17392) Remove whitelist in ConsoleConsumerOptions
[ https://issues.apache.org/jira/browse/KAFKA-17392?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17392. Fix Version/s: 4.0.0 Resolution: Fixed > Remove whitelist in ConsoleConsumerOptions > -- > > Key: KAFKA-17392 > URL: https://issues.apache.org/jira/browse/KAFKA-17392 > Project: Kafka > Issue Type: Sub-task >Reporter: PoAn Yang >Assignee: Ming-Yen Chung >Priority: Minor > Fix For: 4.0.0 > > > The > [whitelist|https://github.com/apache/kafka/blob/944c1353a925858ea9bd9024a713cd7301f55133/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java#L79-L83] > is a deprecated option in ConsoleConsumerOptions. We can consider removing it > in 4.0. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-629%3A+Use+racially+neutral+terms+in+our+codebase -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] KAFKA-17231: Add missing node latency metrics [kafka]
chia7712 merged PR #17137: URL: https://github.com/apache/kafka/pull/17137 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-17231) Share consumer node latency metrics
[ https://issues.apache.org/jira/browse/KAFKA-17231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chia-Ping Tsai resolved KAFKA-17231. Fix Version/s: 4.0.0 Resolution: Fixed > Share consumer node latency metrics > --- > > Key: KAFKA-17231 > URL: https://issues.apache.org/jira/browse/KAFKA-17231 > Project: Kafka > Issue Type: Sub-task >Reporter: Andrew Schofield >Assignee: Andrew Schofield >Priority: Major > Fix For: 4.0.0 > > > This is the share consumer equivalent of > https://github.com/apache/kafka/pull/16755. -- This message was sent by Atlassian Jira (v8.20.10#820010)
Re: [PR] Testing PR [kafka]
mumrah closed pull request #17141: Testing PR URL: https://github.com/apache/kafka/pull/17141 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-17524: Add timeout to StreamThreadTest.shouldReturnErrorIfProducerInstanceIdNotInitialized [kafka]
mjsax opened a new pull request, #17167: URL: https://github.com/apache/kafka/pull/17167 (no comment) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
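For readers unfamiliar with the mechanism, bounding a JUnit 5 test with a timeout is typically done with the `@Timeout` annotation, as in this sketch; the class, method, and two-minute bound are placeholders, not the actual StreamThreadTest change:

```java
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class TimeoutSketchTest {

    // If the body runs longer than two minutes, JUnit fails the test with a
    // TimeoutException instead of letting it hang the whole Gradle run.
    @Test
    @Timeout(value = 2, unit = TimeUnit.MINUTES)
    void shouldFinishWithinTwoMinutes() throws Exception {
        // ... test body; a hang now surfaces as a bounded timeout failure
        // rather than stalling the build for ~50 minutes.
    }
}
```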
Re: [PR] KAFKA-15073 Close stale PRs [2/n] [kafka]
chia7712 commented on code in PR #17166: URL: https://github.com/apache/kafka/pull/17166#discussion_r1755260902 ## .github/workflows/stale.yml: ## @@ -42,14 +42,17 @@ jobs: with: debug-only: ${{ inputs.dryRun || false }} operations-per-run: ${{ inputs.operationsPerRun || 100 }} + ascending: true days-before-stale: 90 - days-before-close: -1 + days-before-close: 120 stale-pr-label: 'stale' stale-pr-message: > This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has -merge conflicts, please update it with the latest from trunk (or appropriate release branch) +merge conflicts, please update it with the latest from trunk (or appropriate release branch). + +If you are having difficulty finding a reviewer, please reach out on the +[mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. - Review Comment: Should we add [close-pr-message](https://github.com/actions/stale#close-pr-message) to explain our policy? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-17448: New consumer seek should update positions in background thread [kafka]
lianetm merged PR #17075: URL: https://github.com/apache/kafka/pull/17075 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15073 Close stale PRs [2/n] [kafka]
mumrah commented on code in PR #17166: URL: https://github.com/apache/kafka/pull/17166#discussion_r1755279325 ## .github/workflows/stale.yml: ## @@ -42,14 +42,17 @@ jobs: with: debug-only: ${{ inputs.dryRun || false }} operations-per-run: ${{ inputs.operationsPerRun || 100 }} + ascending: true days-before-stale: 90 - days-before-close: -1 + days-before-close: 120 stale-pr-label: 'stale' stale-pr-message: > This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has -merge conflicts, please update it with the latest from trunk (or appropriate release branch) +merge conflicts, please update it with the latest from trunk (or appropriate release branch). + +If you are having difficulty finding a reviewer, please reach out on the +[mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. - Review Comment: Yes, good idea. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-15073 Close stale PRs [2/n] [kafka]
jlprat commented on code in PR #17166: URL: https://github.com/apache/kafka/pull/17166#discussion_r1755293831 ## .github/workflows/stale.yml: ## @@ -42,14 +42,21 @@ jobs: with: debug-only: ${{ inputs.dryRun || false }} operations-per-run: ${{ inputs.operationsPerRun || 100 }} + ascending: true days-before-stale: 90 - days-before-close: -1 + days-before-close: 120 Review Comment: I know it's probably not doable, but it would be great to close only the PRs where the contributor dropped the ball, and leave open the ones that are waiting on us maintainers for review. With the closing message we leave the door open for the contributor to reopen; maybe we can be more explicit and state that it's totally fine to reopen if the PR falls under this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org