[jira] [Created] (KAFKA-10609) Mirror Maker 2.0 RemoteClusterUtils do not return offset map for Assign Mode Consumers
Ananya created KAFKA-10609:
--
Summary: Mirror Maker 2.0 RemoteClusterUtils do not return offset map for Assign Mode Consumers
Key: KAFKA-10609
URL: https://issues.apache.org/jira/browse/KAFKA-10609
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.5.0
Environment: Ubuntu 19, 8-core, 16 GB machine
Reporter: Ananya

I was using MirrorMaker 2.0 to test its consumer checkpointing functionality. I found that RemoteClusterUtils.translateOffsets does not return checkpoints for a consumer that runs in assign mode.

I am using MirrorMaker 2.0 from Kafka 2.5.0 (Scala 2.12).
My source Kafka setup is 1 broker and 1 ZooKeeper node running Kafka 2.5.0 (Scala 2.12).
My target Kafka setup is 1 broker and 1 ZooKeeper node running Kafka 2.5.0 (Scala 2.12).
I am only doing one-way replication from the source cluster to the target cluster.

Mirror Maker Config:
{code:java}
clusters = A, B
A.bootstrap.servers = localhost:9082
B.bootstrap.servers = localhost:9092
A->B.enabled = true
A->B.topics = .*
A->B.groups = .*
B->A.enabled = false
B->A.topics = .*
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
emit.heartbeats.interval.seconds = 2
refresh.topics.interval.seconds=1
refresh.groups.interval.seconds=1
emit.checkpoints.interval.seconds=1
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=1
{code}
+*Steps to replicate:*+
* Create a topic on the source cluster.
* Push some data into the topic using the console producer.
* Start a consumer in assign mode that reads from only 1 partition of the above topic.
{code:java}
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "7");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
// Assign mode: the partition is assigned manually, so no group rebalance takes place.
// Offsets are still committed under group.id "testGroup" (enable.auto.commit defaults to true).
TopicPartition tp = new TopicPartition("testTopic", 1);
consumer.assign(Collections.singleton(tp));
while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.println(new String(record.value()) + "__" + record.partition());
    }
}
{code}
* Stop the consumer midway, then describe the consumer group on the source cluster to get the lag information.
{code:java}
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group testGroup

GROUP      TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
testGroup  testTopic  0          5               28              23
{code}
* Run the translateOffsets method to print the downstream offsets.
{code:java}
Map<TopicPartition, OffsetAndMetadata> newOffsets = RemoteClusterUtils.translateOffsets(properties, "A", "testGroup", Duration.ofMillis(5500));
System.out.println(newOffsets.toString());
{code}
* *{color:#ff0000}An empty map is returned.{color}*

*+Expected Outcome:+ {color:#00875a}The translated committed offsets should have been returned.{color}*

+*My Debugging*+
While debugging the issue, I found that the checkpoints topic in the target cluster did not contain this group's committed offsets. I tried multiple times with different commit frequencies and topic/group names, without success. Only a consumer running in subscribe mode, or the console consumer with the --group flag, produces checkpoints.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
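The debugging note in this report says the checkpoints topic never received an entry for the group. A simplified model shows why that alone is enough to produce an empty map. This is a sketch under stated assumptions, not MirrorMaker's implementation: the class and method names are hypothetical, translation is assumed to shift a committed upstream offset by the latest offset sync (`downstream + (upstream - syncUpstream)`), and a lookup can only return partitions for which a checkpoint was emitted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of MirrorMaker 2 checkpointing; not MM2's real classes.
final class CheckpointModel {
    // Checkpoints visible on the target cluster: group -> (topic-partition -> downstream offset).
    private final Map<String, Map<String, Long>> checkpoints = new HashMap<>();

    // Assumed translation rule: shift the committed upstream offset by the latest offset sync.
    // Returns -1 if the committed offset is older than the sync and cannot be translated.
    static long translateDownstream(long upstreamOffset, long syncUpstream, long syncDownstream) {
        if (upstreamOffset < syncUpstream) {
            return -1L;
        }
        return syncDownstream + (upstreamOffset - syncUpstream);
    }

    // Records a checkpoint; in this model it only happens for groups the checkpointing side picks up.
    void emitCheckpoint(String group, String topicPartition, long downstreamOffset) {
        checkpoints.computeIfAbsent(group, g -> new HashMap<>()).put(topicPartition, downstreamOffset);
    }

    // Analogue of translateOffsets: returns only partitions that have a checkpoint for the group,
    // so a group with no checkpoints yields an empty map rather than an error.
    Map<String, Long> translateOffsets(String group) {
        return new HashMap<>(checkpoints.getOrDefault(group, Collections.emptyMap()));
    }
}
```

In this model, `translateOffsets("testGroup")` returns an empty map whenever no checkpoint was emitted for that group, no matter how far the group has consumed, which matches the symptom reported above.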
[jira] [Updated] (KAFKA-10609) Mirror Maker 2.0 RemoteClusterUtils do not return offset map for Assign Mode Consumers
[ https://issues.apache.org/jira/browse/KAFKA-10609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ananya updated KAFKA-10609:
---
Description:
I was using MirrorMaker 2.0 to test its consumer checkpointing functionality. I found that RemoteClusterUtils.translateOffsets does not return checkpoints for a consumer that runs in assign mode.

+*Setup Details*+
I am using MirrorMaker 2.0 from Kafka 2.5.0 (Scala 2.12).
My source Kafka setup is 1 broker and 1 ZooKeeper node running Kafka 2.5.0 (Scala 2.12).
My target Kafka setup is 1 broker and 1 ZooKeeper node running Kafka 2.5.0 (Scala 2.12).
I am only doing one-way replication from the source cluster to the target cluster.

+*Mirror Maker Config:*+
{code:java}
clusters = A, B
A.bootstrap.servers = localhost:9082
B.bootstrap.servers = localhost:9092
A->B.enabled = true
A->B.topics = .*
A->B.groups = .*
B->A.enabled = false
B->A.topics = .*
replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1
emit.heartbeats.interval.seconds = 2
refresh.topics.interval.seconds=1
refresh.groups.interval.seconds=1
emit.checkpoints.interval.seconds=1
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=1
{code}
+*Steps to replicate:*+
* Create a topic on the source cluster.
* Push some data into the topic using the console producer.
* Start a consumer in assign mode that reads from only 1 partition of the above topic.
{code:java}
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "7");

KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
TopicPartition tp = new TopicPartition("testTopic", 1);
consumer.assign(Collections.singleton(tp));
while (true) {
    ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.println(new String(record.value()) + "__" + record.partition());
    }
}
{code}
* Stop the consumer midway, then describe the consumer group on the source cluster to get the lag information.
{code:java}
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group testGroup

GROUP      TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
testGroup  testTopic  0          5               28              23
{code}
* Run the translateOffsets method to print the downstream offsets.
{code:java}
Map<TopicPartition, OffsetAndMetadata> newOffsets = RemoteClusterUtils.translateOffsets(properties, "A", "testGroup", Duration.ofMillis(5500));
System.out.println(newOffsets.toString());
{code}
* *{color:#ff0000}An empty map is returned.{color}*

*+Expected Outcome:+ {color:#00875a}The translated committed offsets should have been returned.{color}*

+*My Debugging*+
While debugging the issue, I found that the checkpoints topic in the target cluster did not contain this group's committed offsets. I tried multiple times with different commit frequencies and topic/group names, without success. Only a consumer running in subscribe mode, or the console consumer with the --group flag, produces checkpoints.
[jira] [Commented] (KAFKA-10313) Out of range offset errors leading to offset reset
[ https://issues.apache.org/jira/browse/KAFKA-10313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213690#comment-17213690 ]

Jikky John commented on KAFKA-10313:

Same issue here.

> Out of range offset errors leading to offset reset
> --
>
> Key: KAFKA-10313
> URL: https://issues.apache.org/jira/browse/KAFKA-10313
> Project: Kafka
> Issue Type: Bug
> Components: consumer
> Affects Versions: 2.2.2
> Reporter: Varsha Abhinandan
> Priority: Major
>
> Hi,
>
> We have occasionally been noticing offset resets on the Kafka consumer caused by offset-out-of-range errors. However, I don't see any errors in the broker logs: nothing related to leader election, replica lag, Kafka broker pod restarts, or anything else (only INFO logs were enabled in the prod environment).
>
> The logs suggest that the out-of-range error occurred because the fetch offset was larger than the offset range on the broker. We noticed this happening multiple times on different consumers and Streams apps in the prod environment, so it doesn't seem like an application bug; it looks more like a bug in the KafkaConsumer. We would like to understand the cause of such errors.
>
> Also, none of the offset reset options are desirable. Choosing "earliest" creates a sudden huge lag (we have a retention of 24 hours), and choosing "latest" leads to data loss (the records produced between the out-of-range error and the offset reset on the consumer). So I wonder whether it would be better for the Kafka client to have a separate 'auto.offset.reset' config that applies only when no offset is found. For an out-of-range error, the Kafka client could automatically reset the offset to latest if the fetch offset is higher than the log end offset, to prevent data loss, and reset it to earliest if the fetch offset is lower than the log start offset.
>
> Following are the logs on the consumer side:
> {noformat}
> [2020-07-17T08:46:00,322Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Fetch offset 476383711 is out of range for partition prd453-19-event-upsert-32, resetting offset
> [2020-07-17T08:46:00,330Z] [INFO ] [pipeline-thread-12 ([prd453-19-event-upsert]-bo-pipeline-12)] [o.a.k.c.consumer.internals.Fetcher] [Consumer clientId=bo-indexer-group-prd453-19-on-c19-bo-indexer-upsert-blue-5d665bcbb7-dnvkh-pid-1-kafka-message-source-id-544, groupId=bo-indexer-group-prd453-19] Resetting offset for partition prd453-19-event-upsert-32 to offset 453223789.
> {noformat}
> Broker logs for the partition:
> {noformat}
> [2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segments with base offsets [452091893] due to retention time 86400000ms breach
> [2020-07-17T07:40:12,082Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Scheduling log segment [baseOffset 452091893, size 1073741693] for deletion.
> [2020-07-17T07:40:12,083Z] [INFO ] [kafka-scheduler-4] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Incrementing log start offset to 453223789
> [2020-07-17T07:41:12,083Z] [INFO ] [kafka-scheduler-7] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Deleting segment 452091893
> [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted log /data/kafka/prd453-19-event-upsert-32/00000000000452091893.log.deleted.
> [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted offset index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.index.deleted.
> [2020-07-17T07:41:12,114Z] [INFO ] [kafka-scheduler-7] [kafka.log.LogSegment] Deleted time index /data/kafka/prd453-19-event-upsert-32/00000000000452091893.timeindex.deleted.
> [2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.ProducerStateManager] [ProducerStateManager partition=prd453-19-event-upsert-32] Writing producer snapshot at offset 475609786
> [2020-07-17T07:52:31,836Z] [INFO ] [data-plane-kafka-request-handler-3] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Rolled new log segment at offset 475609786 in 1 ms.
> {noformat}
>
> {noformat}
> [2020-07-17T09:05:12,075Z] [INFO ] [kafka-scheduler-2] [kafka.log.Log] [Log partition=prd453-19-event-upsert-32, dir=/data/kafka] Found deletable segm
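The reset rule proposed in the issue description can be sketched as follows. This is a hypothetical policy, not an existing consumer option (`auto.offset.reset` only accepts `earliest`, `latest` or `none`), and it assumes the client knows the partition's log start and log end offsets when the out-of-range error occurs. The log start offset 453223789 comes from the broker logs above; the log end offset does not appear in the logs, so the value used with it is made up.

```java
// Hypothetical reset policy following the proposal in the issue; the real consumer's
// auto.offset.reset only accepts "earliest", "latest" or "none".
final class OutOfRangeResetSketch {
    enum Target { EARLIEST, LATEST }

    // Picks a reset target for a fetch offset outside [logStartOffset, logEndOffset].
    static Target resetTargetFor(long fetchOffset, long logStartOffset, long logEndOffset) {
        if (fetchOffset < logStartOffset) {
            // Records were deleted by retention: resume at the oldest record still retained.
            return Target.EARLIEST;
        }
        if (fetchOffset > logEndOffset) {
            // Consumer is ahead of the log: jump to the end instead of re-reading the whole retention window.
            return Target.LATEST;
        }
        throw new IllegalArgumentException("fetch offset " + fetchOffset + " is not out of range");
    }

    // Converts the chosen target into a concrete offset.
    static long resetOffsetFor(long fetchOffset, long logStartOffset, long logEndOffset) {
        return resetTargetFor(fetchOffset, logStartOffset, logEndOffset) == Target.EARLIEST
                ? logStartOffset
                : logEndOffset;
    }
}
```

Under this rule, a reset to the log start offset (as in the consumer log above) would only happen when the fetch offset is below it; a fetch offset beyond the log end would jump forward instead.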
[GitHub] [kafka] tombentley commented on pull request #9422: KAFKA-10602: Make RetryWithToleranceOperator thread safe
tombentley commented on pull request #9422: URL: https://github.com/apache/kafka/pull/9422#issuecomment-708222714 @kkonstantine yeah, I was hoping someone would recommend a testing approach. I'll see if I'm able to reproduce it in a test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] xakassi closed pull request #9211: KAFKA-10426: Deadlock on session key update.
xakassi closed pull request #9211: URL: https://github.com/apache/kafka/pull/9211
[GitHub] [kafka] xakassi opened a new pull request #9431: KAFKA-10426: Deadlock on session key update.
xakassi opened a new pull request #9431: URL: https://github.com/apache/kafka/pull/9431

DistributedHerder enters its synchronized method updateConfigsWithIncrementalCooperative() and calls configBackingStore.snapshot(), which takes a lock on an internal object in the KafkaConfigBackingStore class. Meanwhile, KafkaConfigBackingStore's ConsumeCallback, inside a block synchronized on that same internal object, receives a SESSION_KEY record and calls updateListener.onSessionKeyUpdate(), which takes a lock on the DistributedHerder. Each thread holds one lock while waiting for the other, so they deadlock.

To avoid this, updateListener should be notified of the new session key outside the synchronized block, as is already done, for example, for updateListener.onTaskConfigUpdate(updatedTasks).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
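The fix pattern described in this PR (update shared state under the store's lock, then call the listener after releasing it) can be sketched with stand-in classes. These names are hypothetical and the code is not KafkaConfigBackingStore or DistributedHerder; it only shows that the listener callback never runs while the store's lock is held, which is what breaks the lock cycle.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a config store; illustrates the fix pattern, not Connect's code.
final class ConfigStoreSketch {
    private final Object lock = new Object();
    private final Consumer<String> sessionKeyListener;
    private String sessionKey;

    ConfigStoreSketch(Consumer<String> sessionKeyListener) {
        this.sessionKeyListener = sessionKeyListener;
    }

    // Called on the log-reading thread when a SESSION_KEY record arrives.
    void onSessionKeyRecord(String newKey) {
        String updated;
        synchronized (lock) {
            // Update shared state while holding the store's lock...
            sessionKey = newKey;
            updated = newKey;
        }
        // ...but notify the listener only after releasing it, so a listener that takes its own
        // lock cannot form a cycle with a thread that holds that lock and calls snapshot().
        sessionKeyListener.accept(updated);
    }

    // Analogue of snapshot(): reads shared state under the store's lock.
    String snapshot() {
        synchronized (lock) {
            return sessionKey;
        }
    }

    // Lets a caller check whether the current thread holds the store's lock.
    boolean currentThreadHoldsStoreLock() {
        return Thread.holdsLock(lock);
    }
}
```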
[GitHub] [kafka] xakassi commented on pull request #9211: KAFKA-10426: Deadlock on session key update.
xakassi commented on pull request #9211: URL: https://github.com/apache/kafka/pull/9211#issuecomment-708231665 I have closed this PR and submitted a new one: https://github.com/apache/kafka/pull/9431 @kkonstantine Please take a look at it. Thank you!
[jira] [Commented] (KAFKA-10417) suppress() with cogroup() throws ClassCastException
[ https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213725#comment-17213725 ]

Christian Duvholt commented on KAFKA-10417:
---
Hi [~vvcephei], I suspect that I'm not experienced enough with Kafka Streams to solve this issue, but I can give it a try. Is there any documentation about `enableSendOldValues` that I could read to get a better understanding of the issue?

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.6.0
> Reporter: Wardha Perinkada Kattu
> Priority: Critical
> Labels: kafka-streams
> Fix For: 2.7.0, 2.6.1
>
> The Streams operations `cogroup()` and `aggregate()` followed by `suppress()` throw a `ClassCastException`.
> It works fine without the `suppress()`.
> Code block tested:
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))
> val cogrouped = stream1.cogroup(notificationAggregator).cogroup(streams2, confirmationsAggregator)
>     .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong())))
>     .aggregate({ null }, Materialized.`as` NotificationMetric, WindowStore ByteArray>>("time-windowed-aggregated-stream-store")
>         .withValueSerde(serdesConfig.notificationMetricSerde()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()))
>     .toStream()
> {code}
> The exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier (org.apache.kafka.streams.kstream.internals.PassThrough and org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]
[GitHub] [kafka] chia7712 commented on pull request #9423: KAFKA-9263 The new hw is added to incorrect log when ReplicaAlterLogD…
chia7712 commented on pull request #9423: URL: https://github.com/apache/kafka/pull/9423#issuecomment-708252424
```
kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords
```
passes locally for me.
[GitHub] [kafka] tombentley commented on pull request #9422: KAFKA-10602: Make RetryWithToleranceOperator thread safe
tombentley commented on pull request #9422: URL: https://github.com/apache/kafka/pull/9422#issuecomment-708294583 @kkonstantine I've written a small test that interleaves calls to `RetryWithToleranceOperator.execute` and `RetryWithToleranceOperator.executeFailed` and detects the conflicts that can arise when those methods are not `synchronized`. It fails reliably for me when they're not `synchronized` and passes reliably when they are.
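The kind of interleaving test described in this comment can be sketched with a stand-in class. This is a hypothetical operator, not Connect's `RetryWithToleranceOperator`, and it assumes both methods write a shared context field, which is what makes concurrent calls unsafe without `synchronized`. Each call records its stage, yields, and then checks whether another thread overwrote the stage in the meantime.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical operator with shared per-call context; not Connect's RetryWithToleranceOperator.
final class ToleranceOperatorSketch {
    private String currentStage; // shared context written by both methods
    private final AtomicInteger conflictCount = new AtomicInteger();

    synchronized void execute(String stage, Runnable operation) {
        currentStage = stage;
        operation.run();
        // Another thread overwrote the context mid-call: count it as a conflict.
        if (!stage.equals(currentStage)) conflictCount.incrementAndGet();
    }

    synchronized void executeFailed(String stage) {
        currentStage = stage;
        Thread.yield(); // widens the window in which an unsynchronized caller could interleave
        if (!stage.equals(currentStage)) conflictCount.incrementAndGet();
    }

    // Interleaves execute/executeFailed calls from several threads; returns detected conflicts.
    static int interleave(ToleranceOperatorSketch op, int threads, int callsPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final String stage = "stage-" + t;
            final boolean failurePath = t % 2 == 1;
            pool.execute(() -> {
                for (int i = 0; i < callsPerThread; i++) {
                    if (failurePath) {
                        op.executeFailed(stage);
                    } else {
                        op.execute(stage, Thread::yield);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                throw new IllegalStateException("interleaving test timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return op.conflictCount.get();
    }
}
```

With both methods `synchronized`, the count is always 0. Removing `synchronized` makes nonzero counts likely on a multi-core machine, but, as with any race, not guaranteed on every run.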
[GitHub] [kafka] tombentley commented on pull request #9365: KAFKA-10566: Fix erroneous config usage warnings
tombentley commented on pull request #9365: URL: https://github.com/apache/kafka/pull/9365#issuecomment-708295751 @rajinisivaram @omkreddy @ijuma @mjsax could someone please take a look at this fix?
[GitHub] [kafka] tombentley commented on pull request #9397: KAFKA-10583: Add documentation on the thread-safety of KafkaAdminClient.
tombentley commented on pull request #9397: URL: https://github.com/apache/kafka/pull/9397#issuecomment-708304114 @cmccabe @ijuma @hachikuji any chance we could merge this trivial javadoc change?
[jira] [Assigned] (KAFKA-10607) Ensure the error counts contains the NONE
[ https://issues.apache.org/jira/browse/KAFKA-10607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tom Bentley reassigned KAFKA-10607:
---
Assignee: Tom Bentley

> Ensure the error counts contains the NONE
> -
>
> Key: KAFKA-10607
> URL: https://issues.apache.org/jira/browse/KAFKA-10607
> Project: Kafka
> Issue Type: Improvement
> Components: clients
> Reporter: Boyang Chen
> Assignee: Tom Bentley
> Priority: Major
>
> The RPC errorCounts() implementations behave inconsistently with the default implementation; for example, certain RPCs filter out Errors.NONE during map generation. We should make this consistent by including Errors.NONE in errorCounts() for all RPCs.
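A sketch of the consistent behaviour the issue asks for, assuming a response can be reduced to one error code per partition: NONE is counted like any other code instead of being filtered out. The enum and method here are hypothetical stand-ins, not Kafka's `Errors` class or the real response classes.

```java
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: a tiny error enum and a response given as per-partition error codes.
final class ErrorCountsSketch {
    enum Errors { NONE, UNKNOWN_TOPIC_OR_PARTITION, REQUEST_TIMED_OUT }

    // Counts every partition's error code, NONE included, instead of filtering NONE out.
    static Map<Errors, Integer> errorCounts(List<Errors> partitionErrors) {
        Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
        for (Errors error : partitionErrors) {
            counts.merge(error, 1, Integer::sum);
        }
        return counts;
    }
}
```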
[GitHub] [kafka] vamossagar12 opened a new pull request #9432: KAFKA-10559 | Initial Commit
vamossagar12 opened a new pull request #9432: URL: https://github.com/apache/kafka/pull/9432

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213813#comment-17213813 ]

huxihx commented on KAFKA-10606:

I am wondering if we could change the default value of _AllowAutoTopicCreation_ to _false_ in MetadataRequest.json.

> Auto create non-existent topics when fetching metadata for all topics
> -
>
> Key: KAFKA-10606
> URL: https://issues.apache.org/jira/browse/KAFKA-10606
> Project: Kafka
> Issue Type: Bug
> Reporter: Lincong Li
> Priority: Major
>
> The "allow auto topic creation" flag is hardcoded to true for the fetch-all-topics metadata request:
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37
> In the code below, a comment claims that "*This never causes auto-creation*". This is NOT true: auto topic creation still gets triggered under some circumstances, so this is a bug that needs to be fixed.
> https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68
> For example, the bug could manifest in the following situation:
> A topic T is being deleted, and a request to fetch metadata for all topics is sent to one broker. The broker reads the names of all topics from its metadata cache (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196
> Then the broker authorizes all topics, making sure they are allowed to be described. It then gets metadata for every authorized topic by reading the metadata cache again, once per topic (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240
> However, the metadata cache could have been updated while the broker was authorizing all topics: topic T and its metadata no longer exist in the cache, because the topic was deleted and the metadata update requests eventually propagated from the controller to all brokers. So when the broker tries to get metadata for topic T from its cache, it finds that T does not exist and tries to "auto create" it, since the allow-auto-topic-creation flag is set to true in every fetch-all-topics metadata request.
> I think this bug has existed since "*metadataRequest.allowAutoTopicCreation*" was introduced.
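A simplified model of the race described in this issue, using hypothetical names rather than broker code: topic names are read once, each topic is then looked up again in a cache that may have changed, and whether a topic deleted in between is re-created depends only on the request's allow-auto-topic-creation flag. It illustrates why forcing that flag to false for all-topics requests, as the comment on this issue suggests, would avoid the creation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the race in the issue; names do not match KafkaApis.
final class AllTopicsMetadataSketch {
    enum Outcome { FOUND, AUTO_CREATED, UNKNOWN_TOPIC_OR_PARTITION }

    // listedTopics: names read from the cache when the request started.
    // cacheNow: the cache as seen after authorization; a just-deleted topic is missing.
    static Map<String, Outcome> describe(List<String> listedTopics, Set<String> cacheNow,
                                         boolean allowAutoTopicCreation) {
        Map<String, Outcome> result = new LinkedHashMap<>();
        for (String topic : listedTopics) {
            if (cacheNow.contains(topic)) {
                result.put(topic, Outcome.FOUND);
            } else if (allowAutoTopicCreation) {
                result.put(topic, Outcome.AUTO_CREATED); // the reported bug: a deleted topic comes back
            } else {
                result.put(topic, Outcome.UNKNOWN_TOPIC_OR_PARTITION);
            }
        }
        return result;
    }
}
```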
[jira] [Comment Edited] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213813#comment-17213813 ]

huxihx edited comment on KAFKA-10606 at 10/14/20, 10:45 AM:

I am wondering if we could change the default value of _AllowAutoTopicCreation_ to _false_ in MetadataRequest.json. In doing so, could we have ALL_TOPICS_REQUEST_DATA actually disable auto-creation?
[GitHub] [kafka] tombentley opened a new pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
tombentley opened a new pull request #9433: URL: https://github.com/apache/kafka/pull/9433
[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()
tombentley commented on pull request #9433: URL: https://github.com/apache/kafka/pull/9433#issuecomment-708340163 @abbccdda could you please take a look, since you opened the JIRA? Thanks.
[jira] [Commented] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213887#comment-17213887 ]

Pushkar Deole commented on KAFKA-8803:
--
[~guozhang] I recently faced this issue in our lab. We use the following versions:
Kafka broker 2.5.0
Kafka Streams and client 2.5.0
So do I need to upgrade both to version 2.5.1?

> Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
>
> Key: KAFKA-8803
> URL: https://issues.apache.org/jira/browse/KAFKA-8803
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Raman Gupta
> Assignee: Guozhang Wang
> Priority: Major
> Fix For: 2.5.0, 2.3.2, 2.4.2
>
> Attachments: logs-20200311.txt.gz, logs-client-20200311.txt.gz, logs.txt.gz, screenshot-1.png
>
> One streams app is consistently failing at startup with the following exception:
> {code}
> 2019-08-14 17:02:29,568 ERROR --- [2ce1b-StreamThread-2] org.apa.kaf.str.pro.int.StreamTask: task [0_36] Timeout exception caught when initializing transactions for task 0_36. This might happen if the broker is slow to respond, if the network connection to the broker was interrupted, or if similar circumstances arise. You can increase producer parameter `max.block.ms` to increase this timeout.
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
> {code}
> These same brokers are used by many other streams without any issue, including some in the very same processes as the stream that consistently throws this exception.
> *UPDATE 08/16:*
> The very first instance of this error is August 13th 2019, 17:03:36.754, and it happened for 4 different streams. For 3 of these streams, the error only happened once, and then the stream recovered. For the 4th stream, the error has continued to happen, and continues to happen now.
> I looked up the broker logs for this time, and see that at August 13th 2019, 16:47:43, two of four brokers started reporting messages like this, for multiple partitions:
> [2019-08-13 20:47:43,658] INFO [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition xxx-1 as the leader reported an error: UNKNOWN_LEADER_EPOCH (kafka.server.ReplicaFetcherThread)
> The UNKNOWN_LEADER_EPOCH messages continued for some time, and then stopped; here is a view of the count of these messages over time:
> !screenshot-1.png!
> However, as noted, the stream task timeout error continues to happen.
> I use the static consumer group protocol with Kafka 2.3.0 clients and a 2.3.0 broker. The broker has a patch for KAFKA-8773.
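The exception text in this issue suggests raising the producer's `max.block.ms` (its default is 60000 ms, matching the timeout in the message). In Kafka Streams, producer settings are commonly passed with the `producer.` prefix, which is what `StreamsConfig.producerPrefix(...)` builds. This is a minimal sketch using only `java.util.Properties`; the application id `my-streams-app` and the 120000 ms value are arbitrary examples, and a longer timeout may only delay the error if the broker-side cause persists.

```java
import java.util.Properties;

// Minimal sketch: returns a copy of a Kafka Streams configuration with a producer override added.
final class StreamsProducerTimeoutSketch {
    // Kafka Streams forwards configs prefixed with "producer." to the producers it creates.
    static Properties withProducerMaxBlockMs(Properties base, long maxBlockMs) {
        Properties props = new Properties();
        props.putAll(base); // copy so the caller's configuration is left unchanged
        props.setProperty("producer.max.block.ms", Long.toString(maxBlockMs));
        return props;
    }
}
```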
[jira] [Comment Edited] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213887#comment-17213887 ] Pushkar Deole edited comment on KAFKA-8803 at 10/14/20, 12:59 PM: -- [~guozhang] I recently faced this issue in our lab. We use the following versions: Kafka broker 2.5.0, Kafka Streams and client 2.5.0. So do I need to upgrade both the broker and the clients to 2.5.1? was (Author: pdeole): [~guozhang] I recently faced this issue in our lab. We use the following versions: Kafka broker 2.5.0, Kafka Streams and client 2.5.0. So do I need to upgrade both to 2.5.1?
[jira] [Comment Edited] (KAFKA-8803) Stream will not start due to TimeoutException: Timeout expired after 60000milliseconds while awaiting InitProducerId
[ https://issues.apache.org/jira/browse/KAFKA-8803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17213887#comment-17213887 ] Pushkar Deole edited comment on KAFKA-8803 at 10/14/20, 1:03 PM: - [~guozhang] I recently faced this issue in our lab. The weird thing is that some of the events are processed by the stream while others hit this error (probably because some events are consumed by a different Streams app). We use the following versions: Kafka broker 2.5.0, Kafka Streams and client 2.5.0. So do I need to upgrade both the broker and the clients to 2.5.1? was (Author: pdeole): [~guozhang] I recently faced this issue in our lab. We use the following versions: Kafka broker 2.5.0, Kafka Streams and client 2.5.0. So do I need to upgrade both the broker and the clients to 2.5.1?
[GitHub] [kafka] scanterog commented on a change in pull request #9313: [mm2] Fix consumer/producer properties override
scanterog commented on a change in pull request #9313: URL: https://github.com/apache/kafka/pull/9313#discussion_r503489280 ## File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorMakerConfigTest.java ## @@ -52,10 +52,10 @@ public void testClusterConfigProperties() { "replication.factor", "4")); Map connectorProps = mirrorConfig.connectorBaseConfig(new SourceAndTarget("a", "b"), MirrorSourceConnector.class); -assertEquals("source.cluster.bootstrap.servers is set", -"servers-one", connectorProps.get("source.cluster.bootstrap.servers")); -assertEquals("target.cluster.bootstrap.servers is set", -"servers-two", connectorProps.get("target.cluster.bootstrap.servers")); +assertEquals("source.bootstrap.servers is set", Review comment: I think I have achieved what we want: props starting with consumer|producer|admin now explicitly get the "source." prefix, and all other props get "source.cluster.". All tests pass, and the code runs fine on my infrastructure. @mimaison please take a look whenever you get some time :) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
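A minimal sketch of the prefixing rule as described in the review comment above. The class and method names are hypothetical, and it assumes "starting with consumer|producer|admin" means the key begins with `consumer.`, `producer.`, or `admin.`; it is not the actual `MirrorMakerConfig` code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the rule described in the comment, not MirrorMakerConfig itself:
// client-override keys get "source.", everything else gets "source.cluster.".
class SourcePrefixSketch {
    // Assumption: the client prefixes are matched including the trailing dot.
    private static final List<String> CLIENT_PREFIXES = List.of("consumer.", "producer.", "admin.");

    static String prefixed(String key) {
        for (String clientPrefix : CLIENT_PREFIXES) {
            if (key.startsWith(clientPrefix)) {
                return "source." + key;
            }
        }
        return "source.cluster." + key;
    }

    // Applies the rule to every key, preserving insertion order.
    static Map<String, String> prefixAll(Map<String, String> clusterProps) {
        Map<String, String> out = new LinkedHashMap<>();
        clusterProps.forEach((k, v) -> out.put(prefixed(k), v));
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "servers-one");
        props.put("consumer.max.poll.records", "100");
        System.out.println(prefixAll(props));
        // {source.cluster.bootstrap.servers=servers-one, source.consumer.max.poll.records=100}
    }
}
```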
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r504726150 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -813,8 +852,9 @@ case class OffsetTruncationState(offset: Long, truncationCompleted: Boolean) { override def toString: String = "offset:%d-truncationCompleted:%b".format(offset, truncationCompleted) } -case class OffsetAndEpoch(offset: Long, leaderEpoch: Int) { +case class OffsetAndEpoch(offset: Long, leaderEpoch: Int, lastFetchedEpoch: Option[Int] = None) { Review comment: I had initially added another class because I didn't want to change `OffsetAndEpoch`, but I removed it because it resulted in too many similar classes. Your suggestion to use InitialFetchState sounds much better, so I've updated the PR.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r504727381 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -408,9 +428,12 @@ abstract class AbstractFetcherThread(name: String, def markPartitionsForTruncation(topicPartition: TopicPartition, truncationOffset: Long): Unit = { partitionMapLock.lockInterruptibly() try { + // It is safe to reset `lastFetchedEpoch` here since we don't expect diverging offsets Review comment: Updated. The method is still there for older versions, but it is now disabled with IBP 2.7.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r504729807 ## File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala ## @@ -432,14 +455,22 @@ abstract class AbstractFetcherThread(name: String, failedPartitions.removeAll(initialFetchStates.keySet) initialFetchStates.forKeyValue { (tp, initialFetchState) => -// We can skip the truncation step iff the leader epoch matches the existing epoch +// For IBP 2.7 onwards, we can rely on truncation based on diverging data returned in fetch responses. +// For older versions, we can skip the truncation step iff the leader epoch matches the existing epoch val currentState = partitionStates.stateValue(tp) -val updatedState = if (currentState != null && currentState.currentLeaderEpoch == initialFetchState.leaderEpoch) { +val updatedState = if (initialFetchState.offset >= 0 && isTruncationOnFetchSupported && initialFetchState.lastFetchedEpoch.nonEmpty) { + if (currentState != null) +currentState Review comment: I refactored this code a bit and added a check for the Fetching state, though I'm not sure whether I have missed something. I think we can continue to fetch without truncating if currentState is Fetching and `lastFetchedEpoch` is known. If we need to truncate, we will do that later, when we are told about diverging epochs. Does that make sense?
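The decision discussed in this thread can be modeled in a few lines. This is a simplified, hypothetical sketch: the enum, method, and parameters are illustrative, not Kafka's `AbstractFetcherThread` types, and it assumes that a partition with no existing state starts out fetching when truncation-on-fetch applies, which the diff above does not show.

```java
import java.util.Optional;

// Hypothetical model of the truncation decision; not Kafka's AbstractFetcherThread.
class TruncationDecisionSketch {
    enum State { FETCHING, TRUNCATING }

    // current: existing state, or null if the partition is new.
    // currentLeaderEpoch: epoch recorded in the existing state (ignored if current is null).
    static State nextState(State current, int currentLeaderEpoch, int newLeaderEpoch,
                           long initOffset, Optional<Integer> lastFetchedEpoch,
                           boolean truncationOnFetchSupported) {
        if (initOffset >= 0 && truncationOnFetchSupported && lastFetchedEpoch.isPresent()) {
            // IBP 2.7+ path: keep the current state; any needed truncation is
            // triggered later by a diverging epoch in a fetch response.
            // Assumption: a new partition starts out fetching.
            return current != null ? current : State.FETCHING;
        }
        if (current != null && currentLeaderEpoch == newLeaderEpoch) {
            // Older IBPs: skip truncation iff the leader epoch is unchanged.
            return current;
        }
        return State.TRUNCATING;
    }

    public static void main(String[] args) {
        // Epoch changed, but lastFetchedEpoch is known on IBP 2.7+: no truncation step.
        System.out.println(nextState(State.FETCHING, 5, 6, 100L, Optional.of(5), true));    // FETCHING
        // Same change on an older IBP: truncate first.
        System.out.println(nextState(State.FETCHING, 5, 6, 100L, Optional.empty(), false)); // TRUNCATING
    }
}
```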
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r504731267 ## File path: core/src/main/scala/kafka/server/DelayedFetch.scala ## @@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long, * Case E: This broker is the leader, but the requested epoch is now fenced * Case F: The fetch offset locates not on the last segment of the log * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + * Case H: A diverging epoch was found, return response to trigger truncation Review comment: Makes sense, will submit another PR with just those changes.
[GitHub] [kafka] rajinisivaram commented on pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on pull request #9382: URL: https://github.com/apache/kafka/pull/9382#issuecomment-708446946 @hachikuji Thanks for the review, have addressed the comments so far.
[GitHub] [kafka] vvcephei commented on pull request #9338: Fixed KAFKA-10515: Serdes used within metered state stores will now be initialized with the default serdes if not already set.
vvcephei commented on pull request #9338: URL: https://github.com/apache/kafka/pull/9338#issuecomment-708466216 Oh, sorry, @thake, I didn't see your replies when I submitted my review last night. Thanks for both your initial feedback and your reply. I'd certainly never pass up the opportunity to hear such concerns or ideas, especially before a release, and I really appreciate that you took the time to raise them. Also, thanks for your contribution in this PR!
[GitHub] [kafka] vvcephei commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
vvcephei commented on a change in pull request #9384: URL: https://github.com/apache/kafka/pull/9384#discussion_r504763968 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -900,9 +900,7 @@ // These are not settable in the main Streams config; they are set by the StreamThread to pass internal // state into the assignor. -public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__"; -public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__"; -public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__"; +public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__"; public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__"; public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__"; public static final String TIME = "__time__"; Review comment: Sorry, @mjsax, I was referring to all three of "assignment error code", "next scheduled rebalance ms", and "time".
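A rough sketch of the pattern behind the consolidated key: instead of one internal config entry per object, a single holder is passed under `REFERENCE_CONTAINER_PARTITION_ASSIGNOR` and unpacked when the assignor is configured. Only the key name and its value come from the diff; the `ReferenceContainer` fields, the stand-in `Object` types, and the error handling are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of passing one reference container through assignor configs.
// Only the key string comes from the diff; everything else is illustrative.
class ReferenceContainerSketch {
    static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__";

    // Hypothetical holder; stand-in Object fields replace the real internal types.
    static final class ReferenceContainer {
        Object taskManager;
        Object streamsMetadataState;
        Object adminClient;
    }

    // Assignor side: one lookup and one type check instead of three.
    static ReferenceContainer extract(Map<String, ?> configs) {
        Object o = configs.get(REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
        if (!(o instanceof ReferenceContainer)) {
            throw new IllegalArgumentException(
                REFERENCE_CONTAINER_PARTITION_ASSIGNOR + " is not set or has the wrong type");
        }
        return (ReferenceContainer) o;
    }

    public static void main(String[] args) {
        ReferenceContainer refs = new ReferenceContainer();
        refs.adminClient = "admin-client-stand-in";
        Map<String, Object> configs = new HashMap<>();
        configs.put(REFERENCE_CONTAINER_PARTITION_ASSIGNOR, refs);
        System.out.println(extract(configs).adminClient); // admin-client-stand-in
    }
}
```

Adding another internal reference later then means adding a field to the holder rather than a new internal config key.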
[jira] [Assigned] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley reassigned KAFKA-10606: --- Assignee: Tom Bentley > Auto create non-existent topics when fetching metadata for all topics > - > > Key: KAFKA-10606 > URL: https://issues.apache.org/jira/browse/KAFKA-10606 > Project: Kafka > Issue Type: Bug >Reporter: Lincong Li >Assignee: Tom Bentley >Priority: Major > > The "allow auto topic creation" flag is hardcoded to be true for the > fetch-all-topic metadata request: > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37 > In the code below, a comment claims that "*This never causes > auto-creation*". That is NOT true: auto topic creation still gets triggered > under some circumstances, so this is a bug that needs to be fixed. > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68 > For example, the bug can manifest in the following situation: > A topic T is being deleted and a request to fetch metadata for all topics > gets sent to one broker. The broker reads the names of all topics from its > metadata cache (shown below). > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196 > Then the broker authorizes all topics and makes sure that they are allowed to > be described. Then the broker tries to get metadata for every authorized > topic by reading the metadata cache again, once for every topic (shown below). > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240 > However, the metadata cache could have been updated while the broker was > authorizing all topics: topic T and its metadata no longer exist in the > cache because the topic was deleted and the metadata update requests were eventually > propagated from the controller to all brokers.
So, at this point, when the > broker tries to get metadata for topic T from its cache, it finds that T > does not exist and tries to "auto create" topic T, because the > allow-auto-topic-creation flag is set to true in all fetch-all-topic > metadata requests. > I think this bug has existed since "*metadataRequest.allowAutoTopicCreation*" was > introduced.
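A simplified, single-threaded model of the race described above, with a `Runnable` standing in for the topic deletion that lands while the broker is authorizing topics. It is not broker code; the class, fields, and method are hypothetical, and `autoCreated` only records which topics the broker would try to create.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the list-then-lookup race; not KafkaApis code.
class AllTopicsMetadataRace {
    final Set<String> cache = new HashSet<>();        // stands in for the metadata cache
    final List<String> autoCreated = new ArrayList<>(); // topics the broker would try to create

    List<String> handleAllTopicsRequest(boolean allowAutoTopicCreation, Runnable duringAuthorization) {
        // Step 1: read all topic names from the cache.
        List<String> topics = new ArrayList<>(cache);
        // Step 2: authorization happens here; the cache may change meanwhile.
        duringAuthorization.run();
        // Step 3: look each topic up again.
        List<String> found = new ArrayList<>();
        for (String topic : topics) {
            if (cache.contains(topic)) {
                found.add(topic);
            } else if (allowAutoTopicCreation) {
                autoCreated.add(topic); // the unwanted auto-creation
            }
        }
        return found;
    }

    public static void main(String[] args) {
        AllTopicsMetadataRace broker = new AllTopicsMetadataRace();
        broker.cache.add("T");
        broker.cache.add("U");
        // Topic T is deleted while the request is being authorized.
        broker.handleAllTopicsRequest(true, () -> broker.cache.remove("T"));
        System.out.println(broker.autoCreated); // [T]
    }
}
```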
[GitHub] [kafka] vvcephei commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
vvcephei commented on a change in pull request #9384: URL: https://github.com/apache/kafka/pull/9384#discussion_r504775858 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -192,12 +191,11 @@ public String toString() { */ @Override public void configure(final Map configs) { -final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs); +assignorConfiguration = new AssignorConfiguration(configs); Review comment: Thanks for the update! Are you still planning to drop the `assignorConfiguration` field in favor of a `referenceContainer` field?
[GitHub] [kafka] vvcephei commented on a change in pull request #9368: KAFKA-9274: Add timeout handling for state restore and StandbyTasks
vvcephei commented on a change in pull request #9368: URL: https://github.com/apache/kafka/pull/9368#discussion_r504792364 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -482,8 +486,10 @@ boolean tryToCompleteRestoration() { if (restored.containsAll(task.changelogPartitions())) { try { task.completeRestoration(); -} catch (final TimeoutException e) { -log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), e); +task.clearTaskTimeout(); +} catch (final TimeoutException timeoutException) { +task.maybeInitTaskTimeoutOrThrow(now, timeoutException); +log.debug("Could not complete restoration for {} due to {}; will retry", task.id(), timeoutException); Review comment: ```suggestion log.debug(String.format("Could not complete restoration for %s; will retry", task.id()), timeoutException); ``` It might be a good idea to add tests for the log messages so we can tell if they're actually properly formatted or not. Hopefully, the log4j upgrade makes it easier to detect these logging bugs. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -147,9 +147,12 @@ public void update(final Set topicPartitions, final Map task-timeout-deadline} + */ void maybeInitTaskTimeoutOrThrow(final long currentWallClockMs, - final TimeoutException timeoutException, - final Logger log) throws StreamsException { + final Exception cause, + final Logger log) { Review comment: It seems like we ought to just define `log` at the AbstractTask level and avoid having two almost identical `maybeInitTaskTimeoutOrThrow` method definitions. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ## @@ -148,7 +148,7 @@ public void update(final Set topicPartitions, final Map activeTasks = new LinkedList<>(); for (final Task task : tasks.values()) { try { task.initializeIfNeeded(); -} catch (final LockException | TimeoutException e) { +task.clearTaskTimeout(); +} catch (final LockException retriableException) { // it is possible that if there are multiple threads within the instance that one thread // trying to grab the task from the other, while the other has not released the lock since // it did not participate in the rebalance. In this case we can just retry in the next iteration -log.debug("Could not initialize {} due to the following exception; will retry", task.id(), e); +log.debug("Could not initialize {} due to the following exception; will retry", task.id(), retriableException); Review comment: ```suggestion log.debug(String.format("Could not initialize %s due to the following exception; will retry", task.id()), retriableException); ```
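A simplified sketch of the timeout bookkeeping these diffs appear to add: the first retriable failure starts a deadline, later failures throw once it has passed, and progress clears it. The class, the exception type, and the exact comparison are assumptions, not the real `AbstractTask` implementation.

```java
// Hypothetical sketch of task-timeout bookkeeping; not the real AbstractTask.
class TaskTimeoutSketch {
    static final long NO_DEADLINE = -1L;

    // Stand-in for the exception the real code throws once the deadline passes.
    static final class TaskTimedOutException extends RuntimeException {
        TaskTimedOutException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final String taskId;
    private final long taskTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    TaskTimeoutSketch(String taskId, long taskTimeoutMs) {
        this.taskId = taskId;
        this.taskTimeoutMs = taskTimeoutMs;
    }

    // Called after a retriable failure (e.g. a timeout while completing restoration).
    void maybeInitTaskTimeoutOrThrow(long currentWallClockMs, Exception cause) {
        if (deadlineMs == NO_DEADLINE) {
            // First failure: start the clock and let the caller retry later.
            deadlineMs = currentWallClockMs + taskTimeoutMs;
        } else if (currentWallClockMs > deadlineMs) {
            // Still failing after the budget is used up: give up.
            throw new TaskTimedOutException(
                "Task " + taskId + " did not make progress within " + taskTimeoutMs + " ms", cause);
        }
        // Otherwise still within the budget: keep retrying.
    }

    // Called once the task makes progress, so the next failure starts a fresh budget.
    void clearTaskTimeout() {
        deadlineMs = NO_DEADLINE;
    }

    long deadlineMs() {
        return deadlineMs;
    }

    public static void main(String[] args) {
        TaskTimeoutSketch task = new TaskTimeoutSketch("0_1", 300_000L);
        Exception cause = new RuntimeException("simulated timeout");
        task.maybeInitTaskTimeoutOrThrow(0L, cause);       // starts the deadline
        task.maybeInitTaskTimeoutOrThrow(200_000L, cause); // within budget, no throw
        task.clearTaskTimeout();                           // progress made
        System.out.println(task.deadlineMs());             // -1
    }
}
```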
[GitHub] [kafka] rajinisivaram opened a new pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch
rajinisivaram opened a new pull request #9434: URL: https://github.com/apache/kafka/pull/9434 In 2.7, we added lastFetchedEpoch to fetch requests and divergingEpoch to fetch responses. We are not using these for truncation yet, but in order to use these for truncation with IBP 2.7 onwards in the next release, we should make sure that we handle these in all the supporting classes even in 2.7. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response
rajinisivaram commented on a change in pull request #9382: URL: https://github.com/apache/kafka/pull/9382#discussion_r504803409 ## File path: core/src/main/scala/kafka/server/DelayedFetch.scala ## @@ -77,6 +78,7 @@ class DelayedFetch(delayMs: Long, * Case E: This broker is the leader, but the requested epoch is now fenced * Case F: The fetch offset locates not on the last segment of the log * Case G: The accumulated bytes from all the fetching partitions exceeds the minimum bytes + * Case H: A diverging epoch was found, return response to trigger truncation Review comment: PR with the changes in DelayedFetch and FetchSession: https://github.com/apache/kafka/pull/9434
[jira] [Created] (KAFKA-10610) Integration tests for each CLI command to make sure it continues working with existing minimal authorizations
Chris Johnson created KAFKA-10610: - Summary: Integration tests for each CLI command to make sure it continues working with existing minimal authorizations Key: KAFKA-10610 URL: https://issues.apache.org/jira/browse/KAFKA-10610 Project: Kafka Issue Type: Test Components: tools Reporter: Chris Johnson It would be nice to have test coverage of all CLI commands (kafka-topics, kafka-acls, kafka-configs, kafka-console-consumer, etc) to ensure that they work for a user given the minimal permissions expected for each command. This will help to catch regressions where a change to an existing command's functionality unwittingly requires expanded permissions. An example regression these kinds of tests would have caught: https://issues.apache.org/jira/browse/KAFKA-10212
[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214051#comment-17214051 ] Lincong Li commented on KAFKA-10606: [~huxi_2b] Thanks for your comment. However, clients set this to true explicitly on all paths, and even if they didn't, this would break behavior when getting metadata for particular topics (where auto-creation may be expected because the topics were named). For example: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37 Also, changing default values for RPC objects changes the behavior for older clients, so it is a breaking API change. Even if we want to change the default value of that field (for hypothetical clients that don't set it explicitly), that needs to happen in a new RPC version. A simple way to fix it on the server side is: https://github.com/linkedin/kafka/pull/94/commits/95ad9add181db980914a13a6ffe1a88cd5636a6d I prefer a change that does not require a Kafka client upgrade. In some cases (e.g. LinkedIn), a client upgrade means hundreds or even thousands of Kafka users have to bump the Kafka client version in their project dependencies, which makes a client-side fix less feasible than a server-side one. There are certainly other, more complex (or fancier) ways to fix this issue on the server side as well.
[GitHub] [kafka] hachikuji commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch
hachikuji commented on a change in pull request #9434: URL: https://github.com/apache/kafka/pull/9434#discussion_r504839142 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig, fetchInfos.foreach { case (topicPartition, partitionData) => logReadResultMap.get(topicPartition).foreach(logReadResult => { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) + fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData, logReadResult.divergingEpoch.nonEmpty)) Review comment: Hmm.. If the `LogReadResult` has a diverging epoch, wouldn't we want to respond immediately? ## File path: core/src/main/scala/kafka/server/DelayedFetch.scala ## @@ -88,6 +90,13 @@ class DelayedFetch(delayMs: Long, try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val partition = replicaManager.getPartitionOrException(topicPartition) + +// Case H: Return diverging epoch in response to trigger truncation +if (fetchStatus.hasDivergingEpoch) { Review comment: Here we are using the status from the original fetch. I am wondering if we need to recheck below since it is possible to get a truncation while a fetch is in purgatory.
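The two review points can be illustrated with a hypothetical, much-simplified model: respond right away when a read already reports a diverging epoch, and, while a fetch waits in purgatory, recheck divergence against the current log state instead of the status captured when the fetch arrived. The names below are illustrative, and the real `DelayedFetch` also handles other completion cases such as the minimum-bytes check.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, much-simplified model of the DelayedFetch discussion; not Kafka code.
class DivergingEpochFetchSketch {
    // Per-partition result of the initial log read.
    static final class ReadResult {
        final String partition;
        final Optional<Integer> divergingEpoch; // set if the follower's log diverges
        final int bytes;

        ReadResult(String partition, Optional<Integer> divergingEpoch, int bytes) {
            this.partition = partition;
            this.divergingEpoch = divergingEpoch;
            this.bytes = bytes;
        }
    }

    // At request time: a diverging epoch is returned immediately so the follower
    // can truncate; otherwise fall back to the min-bytes check.
    static boolean respondImmediately(List<ReadResult> results, int minBytes) {
        int accumulated = 0;
        for (ReadResult r : results) {
            if (r.divergingEpoch.isPresent()) {
                return true;
            }
            accumulated += r.bytes;
        }
        return accumulated >= minBytes;
    }

    // While parked in purgatory: ask the current log state again instead of reusing
    // what was seen on arrival, since a truncation can happen in the meantime.
    static boolean anyDivergingEpochNow(List<String> partitions,
                                        Function<String, Optional<Integer>> currentDivergingEpoch) {
        for (String partition : partitions) {
            if (currentDivergingEpoch.apply(partition).isPresent()) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<ReadResult> results = List.of(new ReadResult("t-0", Optional.of(3), 0));
        System.out.println(respondImmediately(results, 1024)); // true
    }
}
```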
[jira] [Comment Edited] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214077#comment-17214077 ] Ismael Juma edited comment on KAFKA-10606 at 10/14/20, 5:19 PM: Seems like the way to fix this is to pass `false` as the `allowAutoTopicCreation` parameter of `getTopicMetadata` when it's an all topics request. was (Author: ijuma): Seems like the way to fix this is to pass `false` as the `allowAutoTopicCreation` parameter of `getTopicMetadata` when it's a all topics request. > Auto create non-existent topics when fetching metadata for all topics > - > > Key: KAFKA-10606 > URL: https://issues.apache.org/jira/browse/KAFKA-10606 > Project: Kafka > Issue Type: Bug > Reporter: Lincong Li > Assignee: Tom Bentley > Priority: Major > > The "allow auto topic creation" flag is hardcoded to be true for the > fetch-all-topic metadata request: > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L37 > In the code below, the annotation claims that "*This never causes > auto-creation*". It is NOT true, and auto topic creation still gets triggered > under some circumstances. So, this is a bug that needs to be fixed. > https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java#L68 > For example, the bug could manifest in the following situation: > A topic T is being deleted and a request to fetch metadata for all topics > gets sent to one broker. The broker reads the names of all topics from its > metadata cache (shown below). > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1196 > Then the broker authorizes all topics and makes sure that they are allowed to > be described. Then the broker tries to get metadata for every authorized > topic by reading the metadata cache again, once for every topic (shown below).
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/KafkaApis.scala#L1240 > However, the metadata cache could have been updated while the broker was > authorizing all topics, so that topic T and its metadata no longer exist in the > cache: the topic got deleted, and the metadata update requests eventually got > propagated from the controller to all brokers. So, at this point, when the > broker tries to get metadata for topic T from its cache, it finds that T > does not exist, and the broker tries to "auto create" topic T because the > allow-auto-topic-creation flag was set to true in all the fetch-all-topic > metadata requests. > I think this bug has existed since "*metadataRequest.allowAutoTopicCreation*" was > introduced. -- This message was sent by Atlassian Jira (v8.3.4#803005)
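The race in the quoted description can be reduced to a few lines of Java. This is a simplified, hypothetical model. `MetadataRaceSketch`, its `Set<String>` cache and `handleAllTopicsRequest` are invented for illustration. They do not mirror `KafkaApis` or the real `getTopicMetadata` signature. The model takes a snapshot of the topic names, lets a deletion reach the cache, and then looks each name up again. With the flag set to `true`, the vanished topic is "auto-created". With `false`, as suggested in the comment above, it is simply omitted.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, heavily simplified model of the race in KAFKA-10606.
// None of these names are Kafka's real classes or methods.
class MetadataRaceSketch {
    // Stand-in for the broker's metadata cache: just the set of live topic names.
    final Set<String> cache = new HashSet<>();
    // Topics this model "auto-created" because they were missing from the cache.
    final List<String> autoCreated = new ArrayList<>();

    // Per-topic lookup; the boolean plays the role of allowAutoTopicCreation.
    List<String> getTopicMetadata(Collection<String> topics, boolean allowAutoTopicCreation) {
        List<String> found = new ArrayList<>();
        for (String topic : topics) {
            if (cache.contains(topic)) {
                found.add(topic);
            } else if (allowAutoTopicCreation) {
                autoCreated.add(topic); // the unwanted side effect
            }
            // Otherwise the vanished topic is simply left out of the response.
        }
        return found;
    }

    // All-topics request: snapshot the names, then look each one up again.
    // Authorization would happen between the two reads; it is omitted here.
    // deletedInBetween simulates a deletion that reaches the cache in that window.
    List<String> handleAllTopicsRequest(boolean allowAutoTopicCreation, String deletedInBetween) {
        List<String> snapshot = new ArrayList<>(cache); // first read (names only)
        cache.remove(deletedInBetween);                 // metadata update propagates
        return getTopicMetadata(snapshot, allowAutoTopicCreation); // second read
    }
}
```

In this model the fix does not remove the race. It only makes the second lookup side-effect free for all-topics requests, which is enough to stop a deleted topic from being recreated.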
[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214077#comment-17214077 ] Ismael Juma commented on KAFKA-10606: - Seems like the way to fix this is to pass `false` as the `allowAutoTopicCreation` parameter of `getTopicMetadata` when it's an all topics request.
[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214080#comment-17214080 ] Tom Bentley commented on KAFKA-10606: - [~andrewlinc...@gmail.com] agreed that a broker-side fix is necessary. Are you going to open a PR for Apache Kafka too? I'd assigned this to myself, since you didn't assign it to yourself, but if you're intending to open a PR then go ahead.
[jira] [Commented] (KAFKA-5235) GetOffsetShell: support for multiple topics and consumer configuration override
[ https://issues.apache.org/jira/browse/KAFKA-5235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214081#comment-17214081 ] Ron Dagostino commented on KAFKA-5235: -- KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-635%3A+GetOffsetShell%3A+support+for+multiple+topics+and+consumer+configuration+override > GetOffsetShell: support for multiple topics and consumer configuration > override > --- > > Key: KAFKA-5235 > URL: https://issues.apache.org/jira/browse/KAFKA-5235 > Project: Kafka > Issue Type: Improvement > Components: tools > Reporter: Arseniy Tashoyan > Assignee: Daniel Urban > Priority: Major > Labels: kip, tool > Original Estimate: 168h > Remaining Estimate: 168h > > Currently, GetOffsetShell only allows fetching the offsets of a single topic > with an optional list of which partitions to describe. Besides that, it does > not allow consumer properties to be overridden. The tool does not have a > dedicated script under bin either.
[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214088#comment-17214088 ] Lincong Li commented on KAFKA-10606: Hi [~tombentley]. Thanks for your comment. I will open a PR with the same change as in this commit: https://github.com/linkedin/kafka/pull/94/commits/95ad9add181db980914a13a6ffe1a88cd5636a6d Thanks for your comment as well, [~ijuma]. I prefer a broker-side fix over a client-side fix, since the client-side fix requires Kafka users to upgrade their clients, which is much more difficult than deploying a new version of the Kafka server. Let me know whether you have any particular concerns regarding the broker-side fix.
[GitHub] [kafka] abbccdda commented on a change in pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system
abbccdda commented on a change in pull request #9409:
URL: https://github.com/apache/kafka/pull/9409#discussion_r504831412

## File path: core/src/main/scala/kafka/admin/FeatureCommand.scala
##
@@ -0,0 +1,359 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.admin
+
+import kafka.server.BrokerFeatures
+import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
+import org.apache.kafka.common.utils.Utils
+
+import java.util.Properties
+import scala.collection.Seq
+import scala.collection.immutable.ListMap
+import scala.jdk.CollectionConverters._
+
+import joptsimple.OptionSpec
+
+object FeatureCommand {
+
+  def main(args: Array[String]): Unit = {
+    val opts = new FeatureCommandOptions(args)
+    val featureApis = new FeatureApis(opts)
+    var exitCode = 0
+    try {
+      featureApis.execute()
+    } catch {
+      case e: IllegalArgumentException =>
+        printException(e)
+        opts.parser.printHelpOn(System.err)
+        exitCode = 1
+      case _: UpdateFeaturesException =>
+        exitCode = 1
+      case e: Throwable =>
+        printException(e)
+        exitCode = 1
+    } finally {
+      featureApis.close()
+      Exit.exit(exitCode)
+    }
+  }
+
+  private def printException(exception: Throwable): Unit = {
+    System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception))
+  }
+}
+
+class UpdateFeaturesException(message: String) extends RuntimeException(message)
+
+/**
+ * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool.
+ *
+ * @param opts the CLI options
+ */
+class FeatureApis(var opts: FeatureCommandOptions) {
+  private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures
+  private val adminClient = createAdminClient()
+
+  private def pad(op: String): String = {
+    f"$op%11s"
+  }
+
+  private val addOp = pad("[Add]")
+  private val upgradeOp = pad("[Upgrade]")
+  private val deleteOp = pad("[Delete]")
+  private val downgradeOp = pad("[Downgrade]")
+
+  // For testing only.
+  def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = {

Review comment:
nit: we could make these functions package private

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley reassigned KAFKA-10606: --- Assignee: Lincong Li (was: Tom Bentley)
[jira] [Resolved] (KAFKA-10044) Deprecate ConsumerConfig#addDeserializerToConfig and ProducerConfig#addSerializerToConfig
[ https://issues.apache.org/jira/browse/KAFKA-10044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-10044. - Resolution: Fixed Resolved via https://github.com/apache/kafka/pull/9013 > Deprecate ConsumerConfig#addDeserializerToConfig and > ProducerConfig#addSerializerToConfig > - > > Key: KAFKA-10044 > URL: https://issues.apache.org/jira/browse/KAFKA-10044 > Project: Kafka > Issue Type: Improvement > Reporter: Chia-Ping Tsai > Assignee: Chia-Ping Tsai > Priority: Minor > Labels: need-kip > > From [~ijuma]'s suggestion > (https://github.com/apache/kafka/pull/8605#discussion_r430431086): > {quote} > I think you could submit a KIP for the deprecation of the two methods in this > class, but we can merge the other changes in the meantime. > {quote}
[jira] [Resolved] (KAFKA-9929) Support reverse iterator on WindowStore
[ https://issues.apache.org/jira/browse/KAFKA-9929?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bill Bejeck resolved KAFKA-9929. Resolution: Fixed Resolved via [GitHub Pull Request #9137|https://github.com/apache/kafka/pull/9137], [GitHub Pull Request #9138|https://github.com/apache/kafka/pull/9138], [GitHub Pull Request #9139|https://github.com/apache/kafka/pull/9139] and [GitHub Pull Request #9321|https://github.com/apache/kafka/pull/9321] > Support reverse iterator on WindowStore > --- > > Key: KAFKA-9929 > URL: https://issues.apache.org/jira/browse/KAFKA-9929 > Project: Kafka > Issue Type: Improvement > Components: streams > Reporter: Jorge Esteban Quilcate Otoya > Assignee: Jorge Esteban Quilcate Otoya > Priority: Major > Labels: needs-kip > > Currently, WindowStore fetch operations return an iterator sorted from > earliest to latest result: > ``` > * For each key, the iterator guarantees ordering of windows, starting from > the oldest/earliest > * available window to the newest/latest window. > ``` > > We have a use case where traces are stored in a WindowStore > and we use Kafka Streams to create a materialized view of traces. A query > request comes with a time range (e.g. now-1h, now) and wants to return the > most recent results, i.e. fetch from this period of time, iterate, and pattern-match > the latest/most recent traces, and, if there are enough results, reply without > moving further along the iterator. > The same store is used to search for previous traces. In this case, it searches a > key over the last day; if traces are found, we would also like to iterate from the > most recent.
> RocksDB seems to support iterating backward and forward: > [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound] > > For reference, this partly extracts some bits from a previous issue: > https://issues.apache.org/jira/browse/KAFKA-4212: > > > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via > > segment dropping, but it stores multiple items per key, based on their > > timestamp. But this store can be repurposed as a cache by fetching the > > items in reverse chronological order and returning the first item found. > > We would like to know whether there is any impediment in RocksDB or WindowStore to > supporting this. > Adding a direction argument to the current fetch methods would be great: > ``` > WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD) > ```
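The requested access pattern is to read newest first and stop early. This minimal Java sketch illustrates it using `java.util.TreeMap`'s `descendingMap()` as a stand-in for a backward store iterator. `BackwardWindowScan` and its `latest` method are hypothetical. They model one key's windows, keyed by window start timestamp in milliseconds. They do not reproduce the Kafka Streams `WindowStore` API or the `Direction` argument proposed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;

// Hypothetical stand-in for one key's windows in a window store,
// keyed by window start timestamp (ms). Not the Kafka Streams API.
class BackwardWindowScan {
    // Returns up to `limit` values with timestamps in [from, to], newest first,
    // stopping as soon as enough results have been collected.
    static List<String> latest(NavigableMap<Long, String> windows, long from, long to, int limit) {
        List<String> out = new ArrayList<>();
        // subMap(from, true, to, true) is the inclusive time range;
        // descendingMap() walks it from the newest timestamp to the oldest.
        for (Map.Entry<Long, String> e : windows.subMap(from, true, to, true).descendingMap().entrySet()) {
            if (out.size() == limit) {
                break; // enough results: older windows are never visited
            }
            out.add(e.getValue());
        }
        return out;
    }
}
```

The loop breaks once `limit` results are collected, so older windows are never touched. A real store would presumably get the same effect from an iterator that walks its RocksDB segments in reverse, as the reporter suggests.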
[jira] [Commented] (KAFKA-9915) Throttle Create Topic, Create Partition and Delete Topic Operations
[ https://issues.apache.org/jira/browse/KAFKA-9915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214116#comment-17214116 ] Bill Bejeck commented on KAFKA-9915: [~dajac] can we resolve this issue? Looks like it's completed. > Throttle Create Topic, Create Partition and Delete Topic Operations > --- > > Key: KAFKA-9915 > URL: https://issues.apache.org/jira/browse/KAFKA-9915 > Project: Kafka > Issue Type: New Feature > Components: core > Reporter: David Jacot > Assignee: David Jacot > Priority: Major > Fix For: 2.7.0 > > > This tracks the completion of KIP-599: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-599%3A+Throttle+Create+Topic%2C+Create+Partition+and+Delete+Topic+Operations]. > If/when the KIP is approved by the community, we will create smaller > sub-tasks to track overall progress.
[jira] [Commented] (KAFKA-9915) Throttle Create Topic, Create Partition and Delete Topic Operations
[ https://issues.apache.org/jira/browse/KAFKA-9915?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214121#comment-17214121 ] Ismael Juma commented on KAFKA-9915: [~dajac] Sounds good, I missed it when I was looking before.
[GitHub] [kafka] Lincong opened a new pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request
Lincong opened a new pull request #9435: URL: https://github.com/apache/kafka/pull/9435 There is a bug that causes fetch-all-topic-metadata requests to trigger auto topic creation. Details are described in KAFKA-10606. This is the simplest way to fix this bug on the broker side.
[jira] [Commented] (KAFKA-10606) Auto create non-existent topics when fetching metadata for all topics
[ https://issues.apache.org/jira/browse/KAFKA-10606?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214130#comment-17214130 ] Lincong Li commented on KAFKA-10606: Hi [~tombentley] [~ijuma] I just created a PR. Let me know if you have any feedback or suggestions. Thanks! https://github.com/apache/kafka/pull/9435
[jira] [Commented] (KAFKA-7061) Enhanced log compaction
[ https://issues.apache.org/jira/browse/KAFKA-7061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214188#comment-17214188 ] Bill Bejeck commented on KAFKA-7061: The PR is still open, and the feature freeze for 2.7 passed on 10/7, so I'm moving this out of the expected KIP list for the 2.7 release. > Enhanced log compaction > --- > > Key: KAFKA-7061 > URL: https://issues.apache.org/jira/browse/KAFKA-7061 > Project: Kafka > Issue Type: Improvement > Components: core > Affects Versions: 2.5.0 > Reporter: Luis Cabral > Assignee: Senthilnathan Muthusamy > Priority: Major > Labels: kip > > Enhance log compaction to support more than just offset comparison, so the > insertion order isn't dictating which records to keep. > The default behavior is kept as it was, with the enhanced approach having to be > purposely activated. > The enhanced compaction is done either via the record timestamp, by setting > the new configuration to "timestamp", or via the record headers, by setting > this configuration to anything other than the default "offset" or the > reserved "timestamp". > See > [KIP-280|https://cwiki.apache.org/confluence/display/KAFKA/KIP-280%3A+Enhanced+log+compaction] > for more details. > +From Guozhang:+ We should emphasize on the wiki that the newly introduced > config yields to the existing "log.cleanup.policy", i.e. if the latter's > value is `delete` rather than `compact`, then the newly introduced config is ignored. > +From Jun Rao:+ With the timestamp/header strategy, the behavior of the > application may need to change. In particular, the application can't just > blindly take the record with a larger offset and assume that it's the value > to keep. It needs to check the timestamp or the header now. So, it would be > useful to at least document this.
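The three strategies in the description differ only in how they pick the surviving record for a key. This Java sketch is a hypothetical model of that choice, not the log cleaner. Several details are assumptions made for illustration:

- the `Rec` type;
- the header value being a `long` version;
- falling back to the offset when timestamps or header values tie.

The sketch also makes Jun Rao's point visible: under the timestamp or header strategy, the surviving record is not necessarily the one with the largest offset.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-key "which record survives compaction" rules, as described
// in the issue. Record shape, the long-valued header and the offset tie-break
// are assumptions of this sketch, not Kafka's log cleaner.
class CompactionSketch {
    enum Strategy { OFFSET, TIMESTAMP, HEADER }

    static final class Rec {
        final String key;
        final long offset;
        final long timestamp;
        final long headerVersion; // hypothetical: a version number carried in a header
        final String value;

        Rec(String key, long offset, long timestamp, long headerVersion, String value) {
            this.key = key;
            this.offset = offset;
            this.timestamp = timestamp;
            this.headerVersion = headerVersion;
            this.value = value;
        }
    }

    // True if `candidate` should replace `current` as the record kept for the key.
    static boolean wins(Rec candidate, Rec current, Strategy strategy) {
        switch (strategy) {
            case TIMESTAMP:
                if (candidate.timestamp != current.timestamp) {
                    return candidate.timestamp > current.timestamp;
                }
                break;
            case HEADER:
                if (candidate.headerVersion != current.headerVersion) {
                    return candidate.headerVersion > current.headerVersion;
                }
                break;
            default:
                break;
        }
        // OFFSET strategy, and the assumed tie-break for the others: later offset wins.
        return candidate.offset > current.offset;
    }

    // Replays an offset-ordered log and returns the surviving value per key.
    static Map<String, String> compact(List<Rec> log, Strategy strategy) {
        Map<String, Rec> kept = new LinkedHashMap<>();
        for (Rec r : log) {
            Rec current = kept.get(r.key);
            if (current == null || wins(r, current, strategy)) {
                kept.put(r.key, r);
            }
        }
        Map<String, String> survivors = new LinkedHashMap<>();
        kept.forEach((k, r) -> survivors.put(k, r.value));
        return survivors;
    }
}
```

Consider a late-arriving record with an older timestamp. Under `OFFSET` it replaces the newer one, but under `TIMESTAMP` it is discarded. This is why consumers would need to check the timestamp or header rather than trusting offset order.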
[GitHub] [kafka] rhauch commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect
rhauch commented on a change in pull request #8918:
URL: https://github.com/apache/kafka/pull/8918#discussion_r504899011

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -492,7 +492,7 @@ public boolean commitOffsets() {
         // to persistent storage
         // Next we need to wait for all outstanding messages to finish sending
-        log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
+        log.debug("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());

Review comment:
I'm not convinced that changing this to DEBUG is worth it. The number of source records output in this log message can be instrumental in some cases. For example, if the producer does not keep up with the source task (for whatever reason), this currently INFO-level message appears shortly before the following ERROR-level message:
```
... ERROR WorkerSourceTask{id=...} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter)
```
The number of outstanding source records reported by this line is an important factor in determining how to tune the producer and `offset.flush.timeout.ms` value.
[GitHub] [kafka] rhauch commented on a change in pull request #9407: MINOR: Merge log error to avoid double error
rhauch commented on a change in pull request #9407:
URL: https://github.com/apache/kafka/pull/9407#discussion_r504901806

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
##
@@ -184,8 +184,7 @@ private void doRun() throws InterruptedException {
             execute();
         } catch (Throwable t) {
-            log.error("{} Task threw an uncaught and unrecoverable exception", this, t);
-            log.error("{} Task is being killed and will not recover until manually restarted", this);
+            log.error("{} Task threw an uncaught and unrecoverable exception, task is being killed and will not recover until manually restarted", this, t);

Review comment:
I also like @kkonstantine's suggested format.
[GitHub] [kafka] kowshik opened a new pull request #9436: MINOR: Check for active controller in UpdateFeatures request processing logic
kowshik opened a new pull request #9436: URL: https://github.com/apache/kafka/pull/9436 Tuned the code to check for an active controller up front in the UpdateFeatures request processing logic, before the event is queued. **Tests:** Relying on existing tests, in particular: `UpdateFeaturesTest.testShouldFailRequestIfNotController`.
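The ordering change described in this PR amounts to validating a request before queueing it. The following Java sketch is hypothetical. `UpdateFeaturesSketch`, `handleUpdateFeatures` and the string result codes are invented, and `NOT_CONTROLLER` only mimics the name of Kafka's error code. The sketch shows a non-controller rejecting the request without putting anything on its event queue.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of "validate before enqueue"; the names do not mirror
// KafkaController or KafkaApis.
class UpdateFeaturesSketch {
    final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
    volatile boolean isActiveController;

    // Returns a result code instead of queueing work this broker cannot complete.
    String handleUpdateFeatures(String request) {
        if (!isActiveController) {
            return "NOT_CONTROLLER"; // fail fast: nothing is queued
        }
        eventQueue.add(request); // only the active controller queues the event
        return "QUEUED";
    }
}
```

A check at enqueue time is presumably only a fast path. Controllership can still move between enqueueing and processing, so the event handler would likely keep its own check as well.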
[GitHub] [kafka] kowshik commented on pull request #9436: MINOR: Check for active controller in UpdateFeatures request processing logic
kowshik commented on pull request #9436: URL: https://github.com/apache/kafka/pull/9436#issuecomment-708598843 cc @junrao @abbccdda
[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax commented on a change in pull request #9384:
URL: https://github.com/apache/kafka/pull/9384#discussion_r504909676

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -192,12 +191,11 @@ public String toString() {
      */
     @Override
     public void configure(final Map<String, ?> configs) {
-        final AssignorConfiguration assignorConfiguration = new AssignorConfiguration(configs);
+        assignorConfiguration = new AssignorConfiguration(configs);

Review comment:
I never intended to drop it. Maybe I misunderstand your comment? We could replace the field `StreamsPartitionAssignor#taskManager` etc. with `StreamsPartitionAssignor#referenceContainer`, but it would just make the code lines longer each time we need to access the TM (etc.). Thus, it seems to make the code more readable if we just "extract" those fields from the reference container once?
[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax commented on a change in pull request #9384: URL: https://github.com/apache/kafka/pull/9384#discussion_r504910261 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -900,9 +900,7 @@ // These are not settable in the main Streams config; they are set by the StreamThread to pass internal // state into the assignor. -public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__"; -public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__"; -public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__"; +public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__"; public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__"; public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__"; public static final String TIME = "__time__"; Review comment: Ah. Ack. Actually, we can also pull `INTERNAL_TASK_ASSIGNOR_CLASS` and `ASSIGNMENT_LISTENER` into the reference container.
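A minimal sketch of the reference-container idea discussed in this thread, assuming one holder object is passed to the assignor under a single internal config key and unpacked once in `configure()`. `ReferenceContainer`, `AssignorSketch`, `TaskManagerStub` and `AdminStub` are hypothetical stand-ins, not the classes in PR #9384; only the key string is taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for the internal objects the assignor needs.
class TaskManagerStub {}
class AdminStub {}

// One holder for all internal references, passed under a single config key
// instead of one key per object.
class ReferenceContainer {
    TaskManagerStub taskManager;
    AdminStub adminClient;
}

class AssignorSketch {
    // Same string as REFERENCE_CONTAINER_PARTITION_ASSIGNOR in the diff.
    static final String REFERENCE_CONTAINER_KEY = "__reference.container.instance__";

    private TaskManagerStub taskManager;
    private AdminStub adminClient;

    // Extract the fields once, so later code can use `taskManager` directly
    // rather than `referenceContainer.taskManager` on every access.
    void configure(Map<String, ?> configs) {
        Object value = configs.get(REFERENCE_CONTAINER_KEY);
        if (!(value instanceof ReferenceContainer)) {
            throw new IllegalArgumentException("Missing or invalid " + REFERENCE_CONTAINER_KEY);
        }
        ReferenceContainer container = (ReferenceContainer) value;
        this.taskManager = container.taskManager;
        this.adminClient = container.adminClient;
    }

    TaskManagerStub taskManager() { return taskManager; }
    AdminStub adminClient() { return adminClient; }
}
```

The trade-off matches the review comment: one config key to maintain, while the assignor keeps short field names after `configure()` runs.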
[jira] [Created] (KAFKA-10611) Merge log error to avoid double error
Benoit MAGGI created KAFKA-10611: Summary: Merge log error to avoid double error Key: KAFKA-10611 URL: https://issues.apache.org/jira/browse/KAFKA-10611 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 2.6.0 Reporter: Benoit MAGGI When using an error tracking system, two error logs mean two different alerts. It is better to group the logs and emit one error with all the information. For example, when using [Sentry|https://sentry.io/welcome/], this [double line|https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L187] of log.error creates two different issues. One can merge the issues, but it would be simpler to have a single error log line.
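A minimal sketch of the change requested in KAFKA-10611, using `java.util.logging` in place of the SLF4J logger Connect uses. The message texts and the `TaskFailureLogging` class are illustrative assumptions, not the exact WorkerTask code; the point is that one error record carrying the throwable yields one event in an error tracker, whereas two calls yield two.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Collects records so we can count error events the way a tracker would see them.
class CollectingHandler extends Handler {
    final List<LogRecord> records = new ArrayList<>();
    @Override public void publish(LogRecord record) { records.add(record); }
    @Override public void flush() {}
    @Override public void close() {}
}

class TaskFailureLogging {
    // Before: one failure produces two separate error events.
    static void logTwice(Logger log, String task, Throwable t) {
        log.log(Level.SEVERE, task + " task failed with an unrecoverable exception", t);
        log.log(Level.SEVERE, task + " task is being killed until manually restarted");
    }

    // After: a single error event with the full message and the cause attached.
    static void logOnce(Logger log, String task, Throwable t) {
        log.log(Level.SEVERE, task + " task failed with an unrecoverable exception"
                + " and is being killed until manually restarted", t);
    }
}
```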
[jira] [Commented] (KAFKA-10472) Consider migrating Topology methods to the Builder pattern
[ https://issues.apache.org/jira/browse/KAFKA-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214210#comment-17214210 ] Huynh Quang Thao commented on KAFKA-10472: -- Hi [~mjsax], I followed the steps on [https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]. However, I don't have permission to create a KIP document. What should I do next to get the permission? Or should I create it somewhere else first? Thanks. > Consider migrating Topology methods to the Builder pattern > -- > > Key: KAFKA-10472 > URL: https://issues.apache.org/jira/browse/KAFKA-10472 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Huynh Quang Thao >Priority: Minor > Labels: need-kip > > During code review for KIP-478, I got this feedback from [~bbejeck]. > In Topology, we have methods like this: > {code:java} > public synchronized Topology addGlobalStore( > final StoreBuilder storeBuilder, > final String sourceName, > final TimestampExtractor timestampExtractor, > final Deserializer keyDeserializer, > final Deserializer valueDeserializer, > final String topic, > final String processorName, > final ProcessorSupplier stateUpdateSupplier){code} > It would probably be better UX to present a builder interface like: > {code:java} > public synchronized Topology addGlobalStore( > AddGlobalStoreParameters.fromStoreBuilder(storeBuilder) > .withSourceName(sourceName) > .withSourceTopic(topic) > .withTimestampExtractor(timestampExtractor) > .withKeyDeserializer(keyDeserializer) > .withValueDeserializer(valueDeserializer) > .withProcessorName(processorName) > .withStateUpdateSupplier(stateUpdateSupplier) > ){code} > > Note: new API design proposals should take into account the proposed grammar: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
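A minimal sketch of the builder-style parameter object proposed in KAFKA-10472. Field types are reduced to `String` so it compiles without Kafka on the classpath, and the `fromStoreName` factory and explicit `build()` step are assumptions for illustration; the proposal itself passes `AddGlobalStoreParameters.fromStoreBuilder(...)` directly.

```java
import java.util.Objects;

// Hypothetical, simplified parameter object for addGlobalStore.
final class AddGlobalStoreParameters {
    final String storeName;
    final String sourceName;
    final String topic;
    final String processorName;

    private AddGlobalStoreParameters(Builder b) {
        this.storeName = b.storeName;
        this.sourceName = b.sourceName;
        // Required settings are validated once, at build time.
        this.topic = Objects.requireNonNull(b.topic, "topic is required");
        this.processorName = b.processorName;
    }

    static Builder fromStoreName(String storeName) {
        return new Builder(storeName);
    }

    static final class Builder {
        private final String storeName;
        private String sourceName;
        private String topic;
        private String processorName;

        private Builder(String storeName) { this.storeName = storeName; }

        // Each named setter replaces one positional argument of the old method.
        Builder withSourceName(String sourceName) { this.sourceName = sourceName; return this; }
        Builder withSourceTopic(String topic) { this.topic = topic; return this; }
        Builder withProcessorName(String processorName) { this.processorName = processorName; return this; }

        AddGlobalStoreParameters build() { return new AddGlobalStoreParameters(this); }
    }
}
```

Named `with...` calls make each argument self-describing at the call site, and new optional settings can be added later without another overload of `addGlobalStore`.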
[GitHub] [kafka] kowshik commented on a change in pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system
kowshik commented on a change in pull request #9409: URL: https://github.com/apache/kafka/pull/9409#discussion_r504915393 ## File path: core/src/main/scala/kafka/admin/FeatureCommand.scala ## @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.admin + +import kafka.server.BrokerFeatures +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils + +import java.util.Properties +import scala.collection.Seq +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters._ + +import joptsimple.OptionSpec + +object FeatureCommand { + + def main(args: Array[String]): Unit = { +val opts = new FeatureCommandOptions(args) +val featureApis = new FeatureApis(opts) +var exitCode = 0 +try { + featureApis.execute() +} catch { + case e: IllegalArgumentException => +printException(e) +opts.parser.printHelpOn(System.err) +exitCode = 1 + case _: UpdateFeaturesException => +exitCode = 1 + case e: Throwable => +printException(e) +exitCode = 1 +} finally { + featureApis.close() + Exit.exit(exitCode) +} + } + + private def printException(exception: Throwable): Unit = { +System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception)) + } +} + +class UpdateFeaturesException(message: String) extends RuntimeException(message) + +/** + * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool. + * + * @param opts the CLI options + */ +class FeatureApis(var opts: FeatureCommandOptions) { + private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures + private val adminClient = createAdminClient() + + private def pad(op: String): String = { +f"$op%11s" + } + + private val addOp = pad("[Add]") + private val upgradeOp = pad("[Upgrade]") + private val deleteOp = pad("[Delete]") + private val downgradeOp = pad("[Downgrade]") + + // For testing only. 
+ def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = { Review comment: Done
[GitHub] [kafka] kowshik commented on pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system
kowshik commented on pull request #9409: URL: https://github.com/apache/kafka/pull/9409#issuecomment-708609368 @abbccdda Thanks for the review! I've addressed the most recent comment.
[GitHub] [kafka] cyrusv commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect
cyrusv commented on a change in pull request #8918: URL: https://github.com/apache/kafka/pull/8918#discussion_r504916758 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -492,7 +492,7 @@ public boolean commitOffsets() { // to persistent storage // Next we need to wait for all outstanding messages to finish sending -log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); +log.debug("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); Review comment: I do not feel strongly about this particular message: it represents about 2% of the Connect log volume in my deployment, so not too bad. Are you still on board with the other log levels I've updated in the PR? If so, shall I revert this one?
[GitHub] [kafka] bmaggi commented on pull request #9407: MINOR: Merge log error to avoid double error
bmaggi commented on pull request #9407: URL: https://github.com/apache/kafka/pull/9407#issuecomment-708611073 Thanks for the comments. Following your suggestions: * I created a JIRA ticket [KAFKA-10611](https://issues.apache.org/jira/browse/KAFKA-10611) on the subject * I also added a new commit using the suggested text (closer to the old version)
[jira] [Updated] (KAFKA-10611) Merge log error to avoid double error
[ https://issues.apache.org/jira/browse/KAFKA-10611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Benoit MAGGI updated KAFKA-10611: - External issue URL: https://github.com/apache/kafka/pull/9407 > Merge log error to avoid double error > - > > Key: KAFKA-10611 > URL: https://issues.apache.org/jira/browse/KAFKA-10611 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.6.0 >Reporter: Benoit MAGGI >Priority: Trivial > > When using an error tracking system, two error logs mean two different alerts. > It is better to group the logs and emit one error with all the information. > For example, when using [Sentry|https://sentry.io/welcome/], this [double > line|https://github.com/apache/kafka/blob/775f0d484b6fccc3d985a9d53d86d7a3710c0b22/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L187] > of log.error creates two different issues. > One can merge the issues, but it would be simpler to have a single error log > line.
[jira] [Commented] (KAFKA-10472) Consider migrating Topology methods to the Builder pattern
[ https://issues.apache.org/jira/browse/KAFKA-10472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214221#comment-17214221 ] Matthias J. Sax commented on KAFKA-10472: - Just create an account and share your account name, so I can grant permissions. > Consider migrating Topology methods to the Builder pattern > -- > > Key: KAFKA-10472 > URL: https://issues.apache.org/jira/browse/KAFKA-10472 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Assignee: Huynh Quang Thao >Priority: Minor > Labels: need-kip > > During code review for KIP-478, I got this feedback from [~bbejeck]. > In Topology, we have methods like this: > {code:java} > public synchronized Topology addGlobalStore( > final StoreBuilder storeBuilder, > final String sourceName, > final TimestampExtractor timestampExtractor, > final Deserializer keyDeserializer, > final Deserializer valueDeserializer, > final String topic, > final String processorName, > final ProcessorSupplier stateUpdateSupplier){code} > It would probably be better UX to present a builder interface like: > {code:java} > public synchronized Topology addGlobalStore( > AddGlobalStoreParameters.fromStoreBuilder(storeBuilder) > .withSourceName(sourceName) > .withSourceTopic(topic) > .withTimestampExtractor(timestampExtractor) > .withKeyDeserializer(keyDeserializer) > .withValueDeserializer(valueDeserializer) > .withProcessorName(processorName) > .withStateUpdateSupplier(stateUpdateSupplier) > ){code} > > Note: new API design proposals should take into account the proposed grammar: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
[GitHub] [kafka] rhauch commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect
rhauch commented on a change in pull request #8918: URL: https://github.com/apache/kafka/pull/8918#discussion_r504920005 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -492,7 +492,7 @@ public boolean commitOffsets() { // to persistent storage // Next we need to wait for all outstanding messages to finish sending -log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); +log.debug("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); Review comment: Yeah, I think the log messages other than this one are fine as DEBUG.
[GitHub] [kafka] thake commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values
thake commented on a change in pull request #9338: URL: https://github.com/apache/kafka/pull/9338#discussion_r504924920 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +public interface WrappingNullableSerde extends Serde { +@SuppressWarnings({"unchecked", "rawtypes"}) +default void setIfUnset(final Serde defaultKeySerde, final Serde defaultValueSerde) { +final Serializer serializer = this.serializer(); Review comment: Ah, my bad. I didn't get the API right. I have probably worked too much on Kotlin code with a lot of immutables :) The abstract class is a good idea to circumvent this problem. I'll check the PR and comment on it directly.
[GitHub] [kafka] ijuma commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request
ijuma commented on pull request #9435: URL: https://github.com/apache/kafka/pull/9435#issuecomment-708617750 Thanks for the PR. Thoughts on how to test this?
[GitHub] [kafka] vvcephei commented on a change in pull request #9338: KAFKA-10515: Properly initialize nullable Serdes with default values
vvcephei commented on a change in pull request #9338: URL: https://github.com/apache/kafka/pull/9338#discussion_r504936524 ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/WrappingNullableSerde.java ## @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +public interface WrappingNullableSerde extends Serde { +@SuppressWarnings({"unchecked", "rawtypes"}) +default void setIfUnset(final Serde defaultKeySerde, final Serde defaultValueSerde) { +final Serializer serializer = this.serializer(); Review comment: No worries, I think this is actually always ok in practice, since I think that all the Wrapping implementations keep their de/serializers in fields anyway. But then again, we've had _so_ many bugs with serdes that I feel a bit twitchy about just ignoring the potential for a future bug that I happened to notice here.
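A minimal sketch of the hazard discussed in this review, using simplified stand-ins rather than Kafka's `Serde`/`Serializer` interfaces. If `serializer()` returns a fresh object on each call, a default-setting method that mutates the returned object has no lasting effect; an abstract base class that keeps the serializer in a field, as suggested in the thread, avoids this. All class names here are hypothetical.

```java
// Simplified stand-ins; not Kafka's serialization interfaces.
interface SimpleSerializer {
    String name();
}

// A wrapper whose inner serializer may be filled in later with a default.
class WrappingSerializer implements SimpleSerializer {
    SimpleSerializer inner; // null means "not set yet"
    public String name() { return inner == null ? "unset" : inner.name(); }
}

interface SimpleSerde {
    WrappingSerializer serializer();
}

// Risky: each call builds a new wrapper, so mutations to a returned wrapper are lost.
class FreshInstanceSerde implements SimpleSerde {
    public WrappingSerializer serializer() { return new WrappingSerializer(); }
}

// Safer: the base class owns the wrapper in a field, so every call returns
// the same instance that setIfUnset() mutated.
abstract class FieldBackedSerde implements SimpleSerde {
    private final WrappingSerializer serializer = new WrappingSerializer();
    public final WrappingSerializer serializer() { return serializer; }
}

class ConcreteSerde extends FieldBackedSerde {}

class SerdeDefaults {
    // Fill in the default only if nothing was configured explicitly.
    static void setIfUnset(SimpleSerde serde, SimpleSerializer defaultSerializer) {
        WrappingSerializer s = serde.serializer();
        if (s.inner == null) {
            s.inner = defaultSerializer;
        }
    }
}
```

Making `serializer()` `final` in the base class is what turns the "always ok in practice" assumption into something the compiler enforces for subclasses.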
[jira] [Created] (KAFKA-10612) Log When SSL Authentication is in Unexpected State
David Mollitor created KAFKA-10612: -- Summary: Log When SSL Authentication is in Unexpected State Key: KAFKA-10612 URL: https://issues.apache.org/jira/browse/KAFKA-10612 Project: Kafka Issue Type: Improvement Reporter: David Mollitor I recently did some deep troubleshooting of Kafka SSL client authentication. I was looking at a lot of SSL debug logging and saw that the client was correctly passing its client credentials, but the client would not authorize correctly with Apache Sentry. I finally discovered that the issue was simply that {{ssl.client.auth}} was set to {{none}}. D'oh. It would have been helpful to get some broker logging indicating that the client is doing SSL authentication but that none is required by the server. I doubt many environments would bother setting it up if it wasn't going to be used. https://kafka.apache.org/documentation/#ssl.client.auth
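A minimal sketch, assuming the three documented `ssl.client.auth` values (`required`, `requested`, `none`) map onto the standard JSSE `SSLEngine.setNeedClientAuth`/`setWantClientAuth` flags, with a log line of the kind KAFKA-10612 asks for when client authentication is disabled. `ClientAuthSketch` and its log message are hypothetical; this is not the code in PR #9437.

```java
import java.security.NoSuchAlgorithmException;
import java.util.logging.Logger;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

class ClientAuthSketch {
    private static final Logger LOG = Logger.getLogger(ClientAuthSketch.class.getName());

    // Builds a server-side engine and applies an ssl.client.auth-style value.
    static SSLEngine configure(String clientAuth) {
        SSLEngine engine;
        try {
            engine = SSLContext.getDefault().createSSLEngine();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("No default SSLContext available", e);
        }
        engine.setUseClientMode(false);
        switch (clientAuth) {
            case "required":   // handshake fails without a client certificate
                engine.setNeedClientAuth(true);
                break;
            case "requested":  // certificate is asked for but optional
                engine.setWantClientAuth(true);
                break;
            case "none":       // certificate is never requested
                engine.setNeedClientAuth(false);
                engine.setWantClientAuth(false);
                // Hypothetical hint that would have shortened the debugging above.
                LOG.info("ssl.client.auth=none: client certificates will not be requested or used");
                break;
            default:
                throw new IllegalArgumentException("Unknown ssl.client.auth value: " + clientAuth);
        }
        return engine;
    }
}
```

With `none`, a TLS server never sends a certificate request, so the client's keystore plays no part in the handshake; that is why the mismatch is hard to spot from the client side.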
[GitHub] [kafka] belugabehr opened a new pull request #9437: KAFKA-10612: Log When SSL Authentication is in Unexpected State
belugabehr opened a new pull request #9437: URL: https://github.com/apache/kafka/pull/9437 Additional logging, no functional changes. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] bbejeck commented on pull request #9237: KAFKA-10454 / Update copartitionSourceGroups when optimization algorithm is triggered
bbejeck commented on pull request #9237: URL: https://github.com/apache/kafka/pull/9237#issuecomment-708635800 Java 8 failed [StreamTableJoinTopologyOptimizationIntegrationTest/](https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-9237/17/testReport/junit/org.apache.kafka.streams.integration/StreamTableJoinTopologyOptimizationIntegrationTest/Build___JDK_8___shouldDoStreamTableJoinWithDifferentNumberOfPartitions_Optimization___none_/?cloudbees-analytics-link=scm-reporting%2Ftests%2Ffailed), which seems like a flaky test failure, as it was due to directory clean-up. Java 11 and Java 15 passed.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch
rajinisivaram commented on a change in pull request #9434: URL: https://github.com/apache/kafka/pull/9434#discussion_r504952903 ## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ## @@ -1084,7 +1084,7 @@ class ReplicaManager(val config: KafkaConfig, fetchInfos.foreach { case (topicPartition, partitionData) => logReadResultMap.get(topicPartition).foreach(logReadResult => { val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata - fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData)) + fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData, logReadResult.divergingEpoch.nonEmpty)) Review comment: Ah, yes, so we don't need to check the original result in DelayedFetch, since we return immediately here. Updated.
[GitHub] [kafka] rajinisivaram commented on a change in pull request #9434: MINOR: Handle lastFetchedEpoch/divergingEpoch in FetchSession and DelayedFetch
rajinisivaram commented on a change in pull request #9434: URL: https://github.com/apache/kafka/pull/9434#discussion_r504954049 ## File path: core/src/main/scala/kafka/server/DelayedFetch.scala ## @@ -88,6 +90,13 @@ class DelayedFetch(delayMs: Long, try { if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) { val partition = replicaManager.getPartitionOrException(topicPartition) + +// Case H: Return diverging epoch in response to trigger truncation +if (fetchStatus.hasDivergingEpoch) { Review comment: @hachikuji Thanks for the review. Makes sense. I have added a new check at the end instead of this one; I'm not sure if there is a better way to check.
[GitHub] [kafka] Lincong commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request
Lincong commented on pull request #9435: URL: https://github.com/apache/kafka/pull/9435#issuecomment-708650004 @ijuma One way to test it could be to mock the objects passed to the `KafkaApis` class. The `MetadataCache` object should be mocked so that its contents change after the first invocation of `getAllTopics`, simulating the scenario in which the metadata cache is updated "concurrently" with `handleTopicMetadataRequest`. However, IMO, this change is pretty minor, so I am wondering how necessary it is to add tests for it.
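A minimal sketch of the test idea above, using a hand-written fake instead of a mocking library and simplified types instead of `KafkaApis`/`MetadataCache`. It assumes the race is that a topic listed by the first `getAllTopics()` call is deleted before its metadata is looked up; with auto-creation disabled for fetch-all-topics requests, nothing is re-created. `RacyMetadataCache` and `MetadataHandlerSketch` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Fake cache: "deleted-topic" disappears right after the first getAllTopics()
// call, simulating an update that races with request handling.
class RacyMetadataCache {
    private final Set<String> topics = new HashSet<>(Arrays.asList("live-topic", "deleted-topic"));
    private int getAllTopicsCalls = 0;

    Set<String> getAllTopics() {
        Set<String> snapshot = new HashSet<>(topics);
        if (++getAllTopicsCalls == 1) {
            topics.remove("deleted-topic"); // concurrent deletion after the snapshot
        }
        return snapshot;
    }

    boolean contains(String topic) {
        return topics.contains(topic);
    }
}

class MetadataHandlerSketch {
    final List<String> autoCreated = new ArrayList<>();

    // In this sketch, a null topic list means "fetch all topics".
    void handle(RacyMetadataCache cache, List<String> requestedTopics, boolean allowAutoCreate) {
        boolean allTopics = requestedTopics == null;
        Set<String> topics = allTopics ? cache.getAllTopics() : new HashSet<>(requestedTopics);
        // The behaviour under test: never auto-create for a fetch-all-topics request.
        boolean autoCreate = allowAutoCreate && !allTopics;
        for (String topic : topics) {
            if (!cache.contains(topic) && autoCreate) {
                autoCreated.add(topic);
            }
        }
    }
}
```

A regression test then asserts that `autoCreated` stays empty for an all-topics request, while an explicit request for an unknown topic still triggers creation.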
[GitHub] [kafka] cyrusv commented on a change in pull request #8918: Use debug level logging for noisy log messages in Connect
cyrusv commented on a change in pull request #8918: URL: https://github.com/apache/kafka/pull/8918#discussion_r504963920 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java ## @@ -492,7 +492,7 @@ public boolean commitOffsets() { // to persistent storage // Next we need to wait for all outstanding messages to finish sending -log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); +log.debug("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size()); Review comment: Sounds good, updated
[GitHub] [kafka] mjsax commented on a change in pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax commented on a change in pull request #9384: URL: https://github.com/apache/kafka/pull/9384#discussion_r504910261 ## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ## @@ -900,9 +900,7 @@ // These are not settable in the main Streams config; they are set by the StreamThread to pass internal // state into the assignor. -public static final String TASK_MANAGER_FOR_PARTITION_ASSIGNOR = "__task.manager.instance__"; -public static final String STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR = "__streams.metadata.state.instance__"; -public static final String STREAMS_ADMIN_CLIENT = "__streams.admin.client.instance__"; +public static final String REFERENCE_CONTAINER_PARTITION_ASSIGNOR = "__reference.container.instance__"; public static final String ASSIGNMENT_ERROR_CODE = "__assignment.error.code__"; public static final String NEXT_SCHEDULED_REBALANCE_MS = "__next.probing.rebalance.ms__"; public static final String TIME = "__time__"; Review comment: Ah. Ack.
[GitHub] [kafka] ijuma commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request
ijuma commented on pull request #9435: URL: https://github.com/apache/kafka/pull/9435#issuecomment-708671677 `KafkaApisTest` has a few tests like that. Could we add one there? The main concern is that we will regress here if we don't have tests.
[jira] [Created] (KAFKA-10613) Broker should not set leader epoch if the list-offset request version < 4
Guozhang Wang created KAFKA-10613: - Summary: Broker should not set leader epoch if the list-offset request version < 4 Key: KAFKA-10613 URL: https://issues.apache.org/jira/browse/KAFKA-10613 Project: Kafka Issue Type: Improvement Components: streams Reporter: Guozhang Wang Assignee: Guozhang Wang The list-offset response added a new field in version 4: {code} { "name": "LeaderEpoch", "type": "int32", "versions": "4+" } {code} The compiled code throws UnsupportedVersionException if that field is set to a non-default value (i.e. not -1) with version < 4. However, on the broker side we forgot to add the logic that skips setting this field based on the request version. As a result, list-offset calls from older clients always get UnsupportedVersionException, and an empty result is returned.
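A minimal sketch of the version gating described in KAFKA-10613, with simplified stand-ins for the generated message class: the handler only sets `LeaderEpoch` when the request version is 4 or higher, so serialization at older versions sees the default of -1. `ListOffsetsPartitionResponse`, `validateForVersion`, `ListOffsetsHandlerSketch` and the local `UnsupportedVersionException` are hypothetical, not Kafka's generated code.

```java
// Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException.
class UnsupportedVersionException extends RuntimeException {
    UnsupportedVersionException(String message) { super(message); }
}

// Simplified response partition; -1 is the field's default value.
class ListOffsetsPartitionResponse {
    static final int NO_LEADER_EPOCH = -1;
    long offset;
    int leaderEpoch = NO_LEADER_EPOCH;

    // Mimics the rule from the issue: a non-default LeaderEpoch cannot be
    // written at a version that does not have the field (< 4).
    void validateForVersion(short version) {
        if (version < 4 && leaderEpoch != NO_LEADER_EPOCH) {
            throw new UnsupportedVersionException("LeaderEpoch cannot be set at version " + version);
        }
    }
}

class ListOffsetsHandlerSketch {
    // Only populate LeaderEpoch when the request version can carry it (4+).
    static ListOffsetsPartitionResponse buildResponse(short requestVersion, long offset, int leaderEpoch) {
        ListOffsetsPartitionResponse response = new ListOffsetsPartitionResponse();
        response.offset = offset;
        if (requestVersion >= 4) {
            response.leaderEpoch = leaderEpoch;
        }
        return response;
    }
}
```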
[GitHub] [kafka] guozhangwang opened a new pull request #9438: KAFKA-10613: Only set leader epoch when list-offset version >= 4
guozhangwang opened a new pull request #9438: URL: https://github.com/apache/kafka/pull/9438 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-10613) Broker should not set leader epoch if the list-offset request version < 4
[ https://issues.apache.org/jira/browse/KAFKA-10613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17214294#comment-17214294 ] Ismael Juma commented on KAFKA-10613: - When did we regress here? > Broker should not set leader epoch if the list-offset request version < 4 > - > > Key: KAFKA-10613 > URL: https://issues.apache.org/jira/browse/KAFKA-10613 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Guozhang Wang >Assignee: Guozhang Wang >Priority: Major > > The list-offset response added a new field in version 4: > {code} > { "name": "LeaderEpoch", "type": "int32", "versions": "4+" } > {code} > The compiled code throws UnsupportedVersionException if that field > is set to a non-default value (i.e. not -1) with version < 4. However, on the broker side we forgot > to add the logic that skips setting this field based on the request version. As a result, > list-offset calls from older clients always get > UnsupportedVersionException, and an empty result is returned.
[GitHub] [kafka] ijuma commented on pull request #9438: KAFKA-10613: Only set leader epoch when list-offset version >= 4
ijuma commented on pull request #9438: URL: https://github.com/apache/kafka/pull/9438#issuecomment-708677767 Can we add a test please?
[GitHub] [kafka] mjsax commented on pull request #9384: MINOR: remove explicit passing of AdminClient into StreamsPartitionAssignor
mjsax commented on pull request #9384: URL: https://github.com/apache/kafka/pull/9384#issuecomment-708678450 Addressed comments. @vvcephei
[GitHub] [kafka] guozhangwang commented on pull request #9438: KAFKA-10613: Only set leader epoch when list-offset version >= 4
guozhangwang commented on pull request #9438: URL: https://github.com/apache/kafka/pull/9438#issuecomment-708678537 System tests on `StreamsUpgradeTest.test_metadata_upgrade`: trunk failed: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4220/; this branch succeeded: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4222/
[GitHub] [kafka] cmccabe merged pull request #9428: MINOR: fix a bug in removing elements from an ImplicitLinkedHashColle…
cmccabe merged pull request #9428: URL: https://github.com/apache/kafka/pull/9428
[GitHub] [kafka] cmccabe commented on pull request #9428: MINOR: fix a bug in removing elements from an ImplicitLinkedHashColle…
cmccabe commented on pull request #9428: URL: https://github.com/apache/kafka/pull/9428#issuecomment-708681157 Backported to 2.7 as well.
[GitHub] [kafka] Lincong commented on pull request #9435: MINOR KAFKA-10606 Disable auto topic creation for fetch-all-topic-metadata request
Lincong commented on pull request #9435: URL: https://github.com/apache/kafka/pull/9435#issuecomment-708681626 @ijuma Will do! Thanks
[GitHub] [kafka] jsancio commented on a change in pull request #9418: KAFKA-10601; Add support for append linger to Raft implementation
jsancio commented on a change in pull request #9418: URL: https://github.com/apache/kafka/pull/9418#discussion_r505036063 ## File path: raft/src/main/java/org/apache/kafka/raft/RaftClient.java ## @@ -19,33 +19,51 @@ import org.apache.kafka.common.record.Records; import java.io.IOException; +import java.util.List; import java.util.concurrent.CompletableFuture; -public interface RaftClient { +public interface RaftClient { + +interface Listener { +/** + * Callback which is invoked when records written through {@link #scheduleAppend(int, List)} + * become committed. + * + * Note that there is not a one-to-one correspondence between writes through + * {@link #scheduleAppend(int, List)} and this callback. The Raft implementation + * is free to batch together the records from multiple append calls provided + * that batch boundaries are respected. This means that each batch specified + * through {@link #scheduleAppend(int, List)} is guaranteed to be a subset of + * a batch passed to {@link #handleCommit(int, long, List)}. + * + * @param epoch the epoch in which the write was accepted + * @param lastOffset the offset of the last record in the record list + * @param records the set of records that were committed + */ +void handleCommit(int epoch, long lastOffset, List records); +} /** * Initialize the client. This should only be called once and it must be * called before any of the other APIs can be invoked. * * @throws IOException For any IO errors during initialization */ -void initialize() throws IOException; +void initialize(Listener listener) throws IOException; /** - * Append a new entry to the log. The client must be in the leader state to - * accept an append: it is up to the state machine implementation - * to ensure this using {@link #currentLeaderAndEpoch()}. - * - * TODO: One improvement we can make here is to allow the caller to specify - * the current leader epoch in the record set. 
That would ensure that each - * leader change must be "observed" by the state machine before new appends - * are accepted. + * Append a list of records to the log. The write will be scheduled for some time + * in the future. There is no guarantee that appended records will be written to + * the log and eventually committed. However, it is guaranteed that if any of the + * records become committed, then all of them will be. * - * @param records The records to append to the log - * @param timeoutMs Maximum time to wait for the append to complete - * @return A future containing the last offset and epoch of the appended records (if successful) + * @param epoch the current leader epoch + * @param records the list of records to append + * @return the offset within the current epoch that the log entries will be appended, + * or null if the leader was unable to accept the write (e.g. due to memory + * being reached). */ -CompletableFuture append(Records records, AckMode ackMode, long timeoutMs); +Long scheduleAppend(int epoch, List records); Review comment: How about either ``` OptionalLong scheduleAppend(int epoch, List records); ``` or ``` void scheduleAppend(int epoch, List records) throws BusyException; ``` I am okay with either solution but I am wondering why did you decide to return a `null` for this case instead of throwing some exception? ## File path: raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java ## @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.RecordSerde; + +im
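The review question above contrasts signalling a rejected write with `null` versus `OptionalLong` (or an exception). A minimal Java sketch of the `OptionalLong` variant, combined with the batching guarantee described in the `Listener` javadoc, assuming a hypothetical in-memory `SketchAccumulator` with a fixed record capacity standing in for the memory limit; this is not the `BatchAccumulator` from the PR. Each `scheduleAppend` call is accepted whole or rejected whole, so its records are never split across `handleCommit` callbacks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;

public class SketchAccumulator<T> {
    // Mirrors the shape of the Listener callback shown in the diff.
    public interface Listener<T> {
        void handleCommit(int epoch, long lastOffset, List<T> records);
    }

    private final int epoch;
    private final int maxPendingRecords; // hypothetical stand-in for a memory limit
    private long nextOffset;
    private final List<T> pending = new ArrayList<>();

    public SketchAccumulator(int epoch, long baseOffset, int maxPendingRecords) {
        this.epoch = epoch;
        this.nextOffset = baseOffset;
        this.maxPendingRecords = maxPendingRecords;
    }

    // Returns the offset of the last appended record, or OptionalLong.empty()
    // if the write is rejected (stale epoch, empty input, or not enough room).
    // All records of one call are accepted together or not at all.
    public OptionalLong scheduleAppend(int epoch, List<T> records) {
        if (epoch != this.epoch || records.isEmpty()
                || pending.size() + records.size() > maxPendingRecords) {
            return OptionalLong.empty();
        }
        pending.addAll(records);
        nextOffset += records.size();
        return OptionalLong.of(nextOffset - 1);
    }

    // Hands everything pending to the listener as one batch, so each appended
    // list is a subset of exactly one committed batch.
    public void flush(Listener<T> listener) {
        if (pending.isEmpty()) {
            return;
        }
        listener.handleCommit(epoch, nextOffset - 1, new ArrayList<>(pending));
        pending.clear();
    }
}
```

With `OptionalLong`, a caller cannot forget the rejected case the way it can with an unboxed `null`; the exception variant from the review would instead force handling at the call site.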
[GitHub] [kafka] abbccdda closed pull request #9419: KAFKA-10343: Add IBP based ApiVersion tests
abbccdda closed pull request #9419: URL: https://github.com/apache/kafka/pull/9419
[GitHub] [kafka] abbccdda commented on pull request #9419: KAFKA-10343: Add IBP based ApiVersion tests
abbccdda commented on pull request #9419: URL: https://github.com/apache/kafka/pull/9419#issuecomment-708706701 Merged with https://github.com/apache/kafka/pull/9103
[GitHub] [kafka] lct45 commented on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
lct45 commented on pull request #9383: URL: https://github.com/apache/kafka/pull/9383#issuecomment-708708449 https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4226/ system tests part 2
[GitHub] [kafka] lct45 edited a comment on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
lct45 edited a comment on pull request #9383: URL: https://github.com/apache/kafka/pull/9383#issuecomment-708708449 https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4227/ system tests part 2
[GitHub] [kafka] lct45 edited a comment on pull request #9383: KAFKA-10455: Ensure that probing rebalances always occur
lct45 edited a comment on pull request #9383: URL: https://github.com/apache/kafka/pull/9383#issuecomment-708708449 https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4228/ system tests part 2
[GitHub] [kafka] xvrl commented on pull request #9427: backport KAFKA-10571
xvrl commented on pull request #9427: URL: https://github.com/apache/kafka/pull/9427#issuecomment-708718493 merged to 2.7 as part of https://github.com/apache/kafka/commit/ff1e2271f9cf148c513207a0a87c22e647940d0b
[GitHub] [kafka] xvrl closed pull request #9427: backport KAFKA-10571
xvrl closed pull request #9427: URL: https://github.com/apache/kafka/pull/9427
[GitHub] [kafka] xvrl opened a new pull request #9439: KAFKA-10587 MirrorMaker CLI change for KIP-629
xvrl opened a new pull request #9439: URL: https://github.com/apache/kafka/pull/9439
[GitHub] [kafka] junrao commented on a change in pull request #9409: KAFKA-10599: Implement basic CLI tool for feature versioning system
junrao commented on a change in pull request #9409: URL: https://github.com/apache/kafka/pull/9409#discussion_r505019265 ## File path: core/src/main/scala/kafka/admin/FeatureCommand.scala ## @@ -0,0 +1,359 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.admin + +import kafka.server.BrokerFeatures +import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit} +import org.apache.kafka.clients.CommonClientConfigs +import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions} +import org.apache.kafka.common.feature.{Features, SupportedVersionRange} +import org.apache.kafka.common.utils.Utils + +import java.util.Properties +import scala.collection.Seq +import scala.collection.immutable.ListMap +import scala.jdk.CollectionConverters._ + +import joptsimple.OptionSpec + +object FeatureCommand { + + def main(args: Array[String]): Unit = { +val opts = new FeatureCommandOptions(args) +val featureApis = new FeatureApis(opts) +var exitCode = 0 +try { + featureApis.execute() +} catch { + case e: IllegalArgumentException => +printException(e) +opts.parser.printHelpOn(System.err) +exitCode = 1 + case _: UpdateFeaturesException => +exitCode = 1 + case e: Throwable => +printException(e) +exitCode = 1 +} finally { + featureApis.close() + Exit.exit(exitCode) +} + } + + private def printException(exception: Throwable): Unit = { +System.err.println("\nError encountered when executing command: " + Utils.stackTrace(exception)) + } +} + +class UpdateFeaturesException(message: String) extends RuntimeException(message) + +/** + * A class that provides necessary APIs to bridge the Admin client feature APIs with the CLI tool. + * + * @param opts the CLI options + */ +class FeatureApis(var opts: FeatureCommandOptions) { + private var supportedFeatures = BrokerFeatures.createDefault().supportedFeatures + private val adminClient = createAdminClient() + + private def pad(op: String): String = { +f"$op%11s" + } + + private val addOp = pad("[Add]") + private val upgradeOp = pad("[Upgrade]") + private val deleteOp = pad("[Delete]") + private val downgradeOp = pad("[Downgrade]") + + // For testing only. 
+ private[admin] def setSupportedFeatures(newFeatures: Features[SupportedVersionRange]): Unit = { +supportedFeatures = newFeatures + } + + // For testing only. + private[admin] def setOptions(newOpts: FeatureCommandOptions): Unit = { +opts = newOpts + } + + private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = { +val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController) +adminClient.describeFeatures(options).featureMetadata().get() + } + + /** + * Describes the supported and finalized features. If the --from-controller CLI option + * is provided, then the request is issued only to the controller, otherwise the request is issued + * to any of the provided bootstrap servers. + */ + def describeFeatures(): Unit = { +val result = describeFeatures(opts.hasFromControllerOption) +val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet + +features.toList.sorted.foreach { + feature => +val output = new StringBuilder() +output.append(s"Feature: $feature") + +val (supportedMinVersion, supportedMaxVersion) = { + val supportedVersionRange = result.supportedFeatures.get(feature) + if (supportedVersionRange == null) { +("-", "-") + } else { +(supportedVersionRange.minVersion, supportedVersionRange.maxVersion) + } +} +output.append(s"\tSupportedMinVersion: $supportedMinVersion") +output.append(s"\tSupportedMaxVersion: $supportedMaxVersion") + +val (finalizedMinVersionLevel, finalizedMaxVersionLevel) = { + val finalizedVersionRange = result.finalizedFeatures.get(feature) + if (finalizedVersionRange == null) { +("-", "-") + } else { +(fin
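The `describeFeatures()` method in the diff above takes the union of supported and finalized feature names, sorts it, and prints `-` when a feature has no supported version range. A minimal Java sketch of that merge-and-format step, assuming version ranges are modeled as hypothetical `short[]{min, max}` pairs instead of `FeatureMetadata`; the finalized columns are left out because the Scala source is truncated at that point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

public class DescribeFeaturesSketch {
    // Hypothetical simplification: a feature's version range as {min, max}.
    public static List<String> describe(Map<String, short[]> supported,
                                        Map<String, short[]> finalized) {
        // Union of both key sets; TreeSet yields the same String order as Scala's .sorted.
        TreeSet<String> names = new TreeSet<>(supported.keySet());
        names.addAll(finalized.keySet());

        List<String> lines = new ArrayList<>();
        for (String feature : names) {
            short[] range = supported.get(feature);
            // "-" marks a feature that appears only among the finalized features.
            String min = range == null ? "-" : String.valueOf(range[0]);
            String max = range == null ? "-" : String.valueOf(range[1]);
            lines.add("Feature: " + feature
                + "\tSupportedMinVersion: " + min
                + "\tSupportedMaxVersion: " + max);
            // Finalized columns omitted: the original source is truncated there.
        }
        return lines;
    }
}
```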
[GitHub] [kafka] feyman2016 commented on pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
feyman2016 commented on pull request #9270: URL: https://github.com/apache/kafka/pull/9270#issuecomment-708859895 Hi @vvcephei and @abbccdda, sorry this PR has taken so long. I just updated it; could you help review? Thanks!
[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
feyman2016 commented on a change in pull request #9270: URL: https://github.com/apache/kafka/pull/9270#discussion_r505133488 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## @@ -1040,21 +1040,36 @@ class GroupCoordinator(val brokerId: Int, group.currentState match { case Stable => -info(s"Static member joins during Stable stage will not trigger rebalance.") -group.maybeInvokeJoinCallback(member, JoinGroupResult( - members = List.empty, - memberId = newMemberId, - generationId = group.generationId, - protocolType = group.protocolType, - protocolName = group.protocolName, - // We want to avoid current leader performing trivial assignment while the group - // is in stable stage, because the new assignment in leader's next sync call - // won't be broadcast by a stable group. This could be guaranteed by - // always returning the old leader id so that the current leader won't assume itself - // as a leader based on the returned message, since the new member.id won't match - // returned leader id, therefore no assignment will be performed. 
- leaderId = currentLeader, - error = Errors.NONE)) +// check if group's selectedProtocol of next generation will change, if not, simply store group to persist the +// updated static member, if yes, rebalance should be triggered to let the group's assignment and selectProtocol consistent +val selectedProtocolOfNextGeneration = group.selectProtocol +if (group.protocolName.contains(selectedProtocolOfNextGeneration)) { + info(s"Static member which joins during Stable stage and doesn't affect selectProtocol will not trigger rebalance.") + val groupAssignment: Map[String, Array[Byte]] = group.allMemberMetadata.map(member => member.memberId -> member.assignment).toMap + groupManager.storeGroup(group, groupAssignment, error => { +group.inLock { + if (error != Errors.NONE) { Review comment: Revised the persistence failure handling logic: it now reverts the member id update in the group metadata if any persistence error is encountered, and calls the responseCallback with the returned error.
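The review comment describes a revert-on-failure pattern: the coordinator updates the static member's id in memory, persists the group, and if the write fails, restores the previous id before answering the caller with the error. A minimal Java sketch of that control flow, assuming a hypothetical `staticMembers` map and `Store` callback interface that stand in for `GroupMetadata` and `groupManager.storeGroup`; error codes are plain strings here rather than Kafka's `Errors` enum.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class StaticMemberRejoinSketch {
    // Hypothetical: group.instance.id -> member.id
    final Map<String, String> staticMembers = new HashMap<>();

    // Hypothetical persistence hook; invokes the callback with "NONE" or an error code.
    interface Store {
        void storeGroup(Map<String, String> snapshot, Consumer<String> callback);
    }

    synchronized void rejoinDuringStable(String groupInstanceId, String newMemberId,
                                         Store store, Consumer<String> responseCallback) {
        // Update in memory first, remembering the old id so it can be restored.
        String oldMemberId = staticMembers.put(groupInstanceId, newMemberId);
        store.storeGroup(new HashMap<>(staticMembers), error -> {
            synchronized (this) { // stands in for group.inLock; reentrant if the store is synchronous
                if (!"NONE".equals(error)) {
                    // Persistence failed: revert the member id update.
                    if (oldMemberId == null) {
                        staticMembers.remove(groupInstanceId);
                    } else {
                        staticMembers.put(groupInstanceId, oldMemberId);
                    }
                }
                // Either way, answer the joining member with the store's result.
                responseCallback.accept(error);
            }
        });
    }
}
```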
[GitHub] [kafka] feyman2016 commented on a change in pull request #9270: KAFKA-10284: Group membership update due to static member rejoin should be persisted
feyman2016 commented on a change in pull request #9270: URL: https://github.com/apache/kafka/pull/9270#discussion_r505133488 ## File path: core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala ## Review comment: @vvcephei @abbccdda Revised the persistence failure handling logic: it now reverts the member id update in the group metadata if any persistence error is encountered, and calls the responseCallback with the returned error.