[GitHub] [kafka] showuon commented on a diff in pull request #11438: KAFKA-13403 Fix KafkaServer crashes when deleting topics due to the race in log deletion
showuon commented on code in PR #11438:
URL: https://github.com/apache/kafka/pull/11438#discussion_r858349721

## clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -853,15 +853,21 @@ public static void delete(final File rootFile) throws IOException {
         Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor<Path>() {
             @Override
             public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException {
-                // If the root path did not exist, ignore the error; otherwise throw it.
-                if (exc instanceof NoSuchFileException && path.toFile().equals(rootFile))
-                    return FileVisitResult.TERMINATE;
+                if (exc instanceof NoSuchFileException) {
+                    if (path.toFile().equals(rootFile)) {
+                        // If the root path did not exist, ignore the error and terminate;
+                        return FileVisitResult.TERMINATE;
+                    } else {
+                        // Otherwise, just continue walking because we don't have to delete this file.

Review Comment:
nit: additional space after //
Also, could we mention that the file might already be deleted by other threads?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
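As a minimal sketch of the behavior this hunk moves toward, the visitor below skips a `NoSuchFileException` for any entry other than the root, on the assumption (raised in the review comment) that another thread may already have deleted it. It works on a `Path` rather than Kafka's `File rootFile`, and the use of `Files.deleteIfExists` in `visitFile` and `postVisitDirectory` is an extra hedge of this sketch, not something the diff shows. `TolerantDelete` is a hypothetical name.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

class TolerantDelete {

    // Recursively deletes rootPath, skipping entries that disappear mid-walk
    // (for example, because another thread already deleted them).
    static void delete(final Path rootPath) throws IOException {
        Files.walkFileTree(rootPath, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException {
                if (exc instanceof NoSuchFileException) {
                    // Missing root: nothing to delete. Missing child: already gone, keep walking.
                    return path.equals(rootPath) ? FileVisitResult.TERMINATE : FileVisitResult.CONTINUE;
                }
                throw exc;
            }

            @Override
            public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException {
                // deleteIfExists also tolerates a file removed between visiting and deleting.
                Files.deleteIfExists(path);
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
                if (exc != null) {
                    throw exc;
                }
                Files.deleteIfExists(dir);
                return FileVisitResult.CONTINUE;
            }
        });
    }
}
```

Returning `TERMINATE` for a missing root keeps the old contract (deleting a non-existent path is a no-op), while `CONTINUE` for a missing child is what keeps a concurrent deletion from surfacing as an `IOException`, which in the KAFKA-13403 report ended with the log dir being marked offline.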
[jira] [Commented] (KAFKA-13855) FileNotFoundException: Error while rolling log segment for topic partition in dir
[ https://issues.apache.org/jira/browse/KAFKA-13855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17527961#comment-17527961 ]

Haruki Okada commented on KAFKA-13855:
--

Hmm, sorry, it sounds like I overstepped. Yes, it seems we need to dig into this further. Please never mind for now.

> FileNotFoundException: Error while rolling log segment for topic partition in dir
> -
>
> Key: KAFKA-13855
> URL: https://issues.apache.org/jira/browse/KAFKA-13855
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 2.6.1
> Reporter: Sergey Ivanov
> Priority: Major
>
> Hello,
> We faced an issue where one of the Kafka brokers in the cluster failed with an exception and restarted:
>
> {code:java}
> [2022-04-13T09:51:44,563][ERROR][category=kafka.server.LogDirFailureChannel] Error while rolling log segment for prod_data_topic-7 in dir /var/opt/kafka/data/1
> java.io.FileNotFoundException: /var/opt/kafka/data/1/prod_data_topic-7/26872377.index (No such file or directory)
>     at java.base/java.io.RandomAccessFile.open0(Native Method)
>     at java.base/java.io.RandomAccessFile.open(Unknown Source)
>     at java.base/java.io.RandomAccessFile.<init>(Unknown Source)
>     at java.base/java.io.RandomAccessFile.<init>(Unknown Source)
>     at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:183)
>     at kafka.log.AbstractIndex.resize(AbstractIndex.scala:176)
>     at kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:242)
>     at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:242)
>     at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:508)
>     at kafka.log.Log.$anonfun$roll$8(Log.scala:1916)
>     at kafka.log.Log.$anonfun$roll$2(Log.scala:1916)
>     at kafka.log.Log.roll(Log.scala:2349)
>     at kafka.log.Log.maybeRoll(Log.scala:1865)
>     at kafka.log.Log.$anonfun$append$2(Log.scala:1169)
>     at kafka.log.Log.append(Log.scala:2349)
>     at kafka.log.Log.appendAsLeader(Log.scala:1019)
>     at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:984)
>     at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:972)
>     at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:883)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>     at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>     at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>     at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>     at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>     at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:871)
>     at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:571)
>     at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:605)
>     at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>     at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
>     at java.base/java.lang.Thread.run(Unknown Source)
> [2022-04-13T09:51:44,812][ERROR][category=kafka.log.LogManager] Shutdown broker because all log dirs in /var/opt/kafka/data/1 have failed {code}
> There is no additional useful information in the logs, just one warning before this error:
> {code:java}
> [2022-04-13T09:51:44,720][WARN][category=kafka.server.ReplicaManager] [ReplicaManager broker=1] Broker 1 stopped fetcher for partitions __consumer_offsets-22,prod_data_topic-5,__consumer_offsets-30,
>
> prod_data_topic-0 and stopped moving logs for partitions because they are in the failed log directory /var/opt/kafka/data/1.
> [2022-04-13T09:51:44,720][WARN][category=kafka.log.LogManager] Stopping serving logs in dir /var/opt/kafka/data/1{code}
> The topic configuration is:
> {code:java}
> /opt/kafka $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic prod_data_topic
> Topic: prod_data_topic PartitionCount: 12 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=15728640,retention.bytes=4294967296
> Topic: prod_data_topic Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,2,1
> Topic: prod_data_topic Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 3,2,1
> Topic:
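The top frames show the exception coming out of a `RandomAccessFile` constructor called from `AbstractIndex.resize`. No root cause has been established on this ticket, so the sketch below is not a diagnosis; it only illustrates one general way that constructor raises `FileNotFoundException`: in "rw" mode it creates a missing file but fails if the parent directory no longer exists. `IndexOpenProbe` is a hypothetical helper, not Kafka code.

```java
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;

class IndexOpenProbe {

    // Opens indexFile in "rw" mode (which creates a missing file but never a missing
    // parent directory) and reports whether the open failed with FileNotFoundException.
    static boolean failsWithFileNotFound(File indexFile) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(indexFile, "rw")) {
            return false;
        } catch (FileNotFoundException e) {
            return true;
        }
    }
}
```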
[jira] [Comment Edited] (KAFKA-13403) KafkaServer crashes when deleting topics due to the race in log deletion
[ https://issues.apache.org/jira/browse/KAFKA-13403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17527800#comment-17527800 ]

Haruki Okada edited comment on KAFKA-13403 at 4/26/22 7:35 AM:
---

[~showuon] Hi, could you help review the PR [https://github.com/apache/kafka/pull/11438]?

-There seems to be another ticket likely due to the same cause: https://issues.apache.org/jira/browse/KAFKA-13855-
After taking another look at KAFKA-13855, there currently seems to be no evidence that it has the same cause.

was (Author: ocadaruma):
[~showuon] Hi, could you help reviewing the PR [https://github.com/apache/kafka/pull/11438] ?
There seems to be another ticket likely due to the same cause: https://issues.apache.org/jira/browse/KAFKA-13855

> KafkaServer crashes when deleting topics due to the race in log deletion
>
> Key: KAFKA-13403
> URL: https://issues.apache.org/jira/browse/KAFKA-13403
> Project: Kafka
> Issue Type: Bug
> Components: core
> Affects Versions: 2.4.1
> Reporter: Haruki Okada
> Assignee: Haruki Okada
> Priority: Major
>
> h2. Environment
> * OS: CentOS Linux release 7.6
> * Kafka version: 2.4.1
> ** But as far as I checked the code, I think the same phenomenon could happen even on trunk
> * Kafka log directory: RAID1+0 (i.e. not using JBOD, so only a single log.dirs is set)
> * Java version: AdoptOpenJDK 1.8.0_282
> h2. Phenomenon
> When we were in the middle of deleting several topics with `kafka-topics.sh --delete --topic blah-blah`, one broker in our cluster crashed due to the following exception:
>
> {code:java}
> [2021-10-21 18:19:19,122] ERROR Shutdown broker because all log dirs in /data/kafka have failed (kafka.log.LogManager)
> {code}
>
> We also found that a NoSuchFileException was thrown right before the crash, when LogManager tried to delete logs for some partitions.
>
> {code:java}
> [2021-10-21 18:19:18,849] ERROR Error while deleting log for foo-bar-topic-5 in dir /data/kafka (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: /data/kafka/foo-bar-topic-5.df3626d2d9eb41a2aeb0b8d55d7942bd-delete/03877066.timeindex.deleted
>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>     at sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
>     at sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
>     at sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
>     at java.nio.file.Files.readAttributes(Files.java:1737)
>     at java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
>     at java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
>     at java.nio.file.FileTreeWalker.next(FileTreeWalker.java:372)
>     at java.nio.file.Files.walkFileTree(Files.java:2706)
>     at java.nio.file.Files.walkFileTree(Files.java:2742)
>     at org.apache.kafka.common.utils.Utils.delete(Utils.java:732)
>     at kafka.log.Log.$anonfun$delete$2(Log.scala:2036)
>     at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
>     at kafka.log.Log.maybeHandleIOException(Log.scala:2343)
>     at kafka.log.Log.delete(Log.scala:2030)
>     at kafka.log.LogManager.deleteLogs(LogManager.scala:826)
>     at kafka.log.LogManager.$anonfun$deleteLogs$6(LogManager.scala:840)
>     at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
>     at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> So the log dir was marked offline, and the KafkaServer crashed because the broker has only a single log dir.
> h2. Cause
> We also found the logs below right before the NoSuchFileException.
>
> {code:java}
> [2021-10-21 18:18:17,829] INFO Log for partition foo-bar-5 is r
[jira] [Assigned] (KAFKA-13459) MM2 should be able to add the source offset to the record header
[ https://issues.apache.org/jira/browse/KAFKA-13459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass reassigned KAFKA-13459: --- Assignee: Viktor Somogyi-Vass > MM2 should be able to add the source offset to the record header > > > Key: KAFKA-13459 > URL: https://issues.apache.org/jira/browse/KAFKA-13459 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Viktor Somogyi-Vass >Priority: Major > > MM2 could add the source offset to the record header to help with diagnostics > in some use-cases. -- This message was sent by Atlassian Jira (v8.20.7#820007)
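KAFKA-13459 does not say how the source offset would be encoded. Assuming a hypothetical header key `mm2.source.offset` and an 8-byte big-endian value, encoding and decoding could look like this minimal sketch (`SourceOffsetHeader` is an illustrative name, not part of MM2):

```java
import java.nio.ByteBuffer;

class SourceOffsetHeader {
    // Hypothetical header key; KAFKA-13459 does not specify one.
    static final String KEY = "mm2.source.offset";

    // Encodes the source offset as 8 big-endian bytes (ByteBuffer's default byte order).
    static byte[] encode(long sourceOffset) {
        return ByteBuffer.allocate(Long.BYTES).putLong(sourceOffset).array();
    }

    // Decodes a header value produced by encode().
    static long decode(byte[] value) {
        if (value == null || value.length != Long.BYTES) {
            throw new IllegalArgumentException("expected an " + Long.BYTES + "-byte header value");
        }
        return ByteBuffer.wrap(value).getLong();
    }
}
```

With kafka-clients on the classpath, such a value could be attached to a plain `ProducerRecord` via `record.headers().add(SourceOffsetHeader.KEY, SourceOffsetHeader.encode(offset))`; how MM2 itself would wire this in is left open by the ticket.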
[jira] [Assigned] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
[ https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass reassigned KAFKA-10586: --- Assignee: Viktor Somogyi-Vass (was: Daniel Urban) > Full support for distributed mode in dedicated MirrorMaker 2.0 clusters > --- > > Key: KAFKA-10586 > URL: https://issues.apache.org/jira/browse/KAFKA-10586 > Project: Kafka > Issue Type: Improvement > Components: mirrormaker >Reporter: Daniel Urban >Assignee: Viktor Somogyi-Vass >Priority: Major > > KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated > MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means > that with specific workloads, the dedicated MM2 cluster can become unable to > react to dynamic topic and group filter changes. > (This occurs when, after a rebalance operation, the leader node has no > MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is > stopped on the leader, meaning it cannot detect config changes by itself. > Followers still running the connector can detect config changes, but they > cannot query the leader for config updates.) > Besides the REST support, config provider references should be evaluated > lazily in connector configurations.
[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…
vamossagar12 commented on PR #11211: URL: https://github.com/apache/kafka/pull/11211#issuecomment-1109503519 Thanks @showuon !
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858432879

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING tate
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review Comment:
@showuon, can you please review this as well whenever you get the chance? It has been open for a while. Thanks!
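The `catch (final RuntimeException e)` branch in this hunk records the failure with `activeTasksCommitException.compareAndSet(null, e)`, which keeps only the first exception even if several commits fail. A standalone sketch of that pattern, with a hypothetical `FirstExceptionCollector` in place of `TaskManager` and plain `Runnable`s in place of task commits:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

class FirstExceptionCollector {

    // Runs every action, remembers only the first RuntimeException thrown,
    // and keeps going so that later actions still run.
    static RuntimeException runAll(List<Runnable> actions) {
        AtomicReference<RuntimeException> firstException = new AtomicReference<>();
        for (Runnable action : actions) {
            try {
                action.run();
            } catch (RuntimeException e) {
                // compareAndSet(null, e) only succeeds while nothing has been recorded yet.
                firstException.compareAndSet(null, e);
            }
        }
        return firstException.get();
    }
}
```

Using `compareAndSet` rather than a plain `set` means a later failure can never overwrite the first one, which is usually the most useful exception to rethrow.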
[GitHub] [kafka] showuon commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
showuon commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858437939

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING tate
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review Comment:
This is the only question I have for this PR, and it needs other experts to provide comments. cc @guozhangwang @ableegoldman
[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…
vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858441929

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection<Task> taskWithChangelogs, fina
         }
     }
+    private void commitActiveTasks(final Set<Task> activeTasksNeedCommit, final AtomicReference<RuntimeException> activeTasksCommitException) {
+
+        final Map<Task, Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask = new HashMap<>();
+        prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, consumedOffsetsPerTask);
+
+        final Set<Task> dirtyTasks = new HashSet<>();
+        try {
+            taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+        } catch (final TaskCorruptedException e) {
+            log.warn("Some tasks were corrupted when trying to commit offsets, these will be cleaned and revived: {}",
+                e.corruptedTasks());
+
+            // If we hit a TaskCorruptedException it must be EOS, just handle the cleanup for those corrupted tasks right here
+            dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+            closeDirtyAndRevive(dirtyTasks, true);
+        } catch (final RuntimeException e) {
+            log.error("Exception caught while committing active tasks: " + consumedOffsetsPerTask.keySet(), e);
+            activeTasksCommitException.compareAndSet(null, e);
+            dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+        }
+
+        // for non-revoking active tasks, we should not enforce checkpoint
+        // as it's EOS enabled in which case no checkpoint should be written while
+        // the task is in RUNNING tate
+        for (final Task task : activeTasksNeedCommit) {
+            if (!dirtyTasks.contains(task)) {
+                try {
+                    task.postCommit(false);

Review Comment:
Got it.
[GitHub] [kafka] cadonna commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement
cadonna commented on PR #12064: URL: https://github.com/apache/kafka/pull/12064#issuecomment-1109538143 @junrao @philipnee Thank you for pinging me! Since I have already created RC0, which is currently being voted on, I only accept regressions and exceptionally severe bugs that justify creating a new release candidate. This bug seems to have been around for quite some time, and it does not seem to be a regression or exceptionally severe. So I would prefer not to cherry-pick this PR to 3.2 before 3.2.0 is released. After the release, you should cherry-pick this PR to 3.2 so that it is included in the bugfix release 3.2.1. If you still think this PR should be included in 3.2.0, please raise it on the voting thread for RC0 of 3.2.0 on the dev mailing list.
[GitHub] [kafka] dengziming commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
dengziming commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r858479425

## core/src/main/scala/kafka/api/ApiVersion.scala:
##
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import org.apache.kafka.clients.NodeApiVersions
-import org.apache.kafka.common.config.ConfigDef.Validator
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, SupportedVersionRange}
-import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.record.RecordVersion
-import org.apache.kafka.common.requests.ApiVersionsResponse
-
-/**
- * This class contains the different Kafka versions.
- * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves.
- * This is only for inter-broker communications - when communicating with clients, the client decides on the API version.
- *
- * Note that the ID we initialize for each version is important.
- * We consider a version newer than another, if it has a higher ID (to avoid depending on lexicographic order)
- *
- * Since the api protocol may change more than once within the same release and to facilitate people deploying code from
- * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example,
- * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a
- * corresponding case object KAFKA_0_10_0-IV0. We will also add a config value "0.10.0" that will be mapped to the
- * latest internal version object, which is KAFKA_0_10_0-IV0. When we change the protocol a second time while developing
- * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding case object KAFKA_0_10_0-IV1. We will change
- * the config value "0.10.0" to map to the latest internal version object KAFKA_0_10_0-IV1. The config value of
- * "0.10.0-IV0" is still mapped to KAFKA_0_10_0-IV0. This way, if people are deploying from trunk, they can use
- * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. For most people who just want to use
- * released version, they can use "0.10.0" when upgrading to the 0.10.0 release.
- */

Review Comment:
Should we also add these doc comments to MetadataVersion?
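The deleted doc comment describes two rules: versions are ordered by ID rather than by their string form, and a release name such as "0.10.0" is an alias for that release's latest internal version. The enum below is a hypothetical, trimmed-down Java model of those two rules only; `ExampleVersion` and its constants are illustrative and are neither the deleted Scala `ApiVersion` nor the new `MetadataVersion`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the versioning scheme described in the doc comment.
enum ExampleVersion {
    // Declaration order defines the ID order; newer versions are declared later.
    KAFKA_0_9_0("0.9.0"),
    KAFKA_0_10_0_IV0("0.10.0-IV0"),
    KAFKA_0_10_0_IV1("0.10.0-IV1"),
    KAFKA_0_10_1_IV0("0.10.1-IV0");

    private static final Map<String, ExampleVersion> BY_CONFIG_VALUE = new HashMap<>();

    static {
        for (ExampleVersion v : values()) {
            BY_CONFIG_VALUE.put(v.configValue, v);
            int iv = v.configValue.indexOf("-IV");
            if (iv >= 0) {
                // The release alias ("0.10.0") is overwritten by each later internal
                // version of that release, so it ends up pointing at the latest one.
                BY_CONFIG_VALUE.put(v.configValue.substring(0, iv), v);
            }
        }
    }

    private final String configValue;

    ExampleVersion(String configValue) {
        this.configValue = configValue;
    }

    static ExampleVersion fromConfigValue(String value) {
        ExampleVersion v = BY_CONFIG_VALUE.get(value);
        if (v == null) {
            throw new IllegalArgumentException("Unknown version: " + value);
        }
        return v;
    }

    // Compare by ID (declaration order), never by the version string.
    boolean isAtLeast(ExampleVersion other) {
        return compareTo(other) >= 0;
    }
}
```

Comparing strings would get this wrong: `"0.10.0".compareTo("0.9.0")` is negative because `'1'` sorts before `'9'`, whereas `KAFKA_0_10_0_IV0.isAtLeast(KAFKA_0_9_0)` is true.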
[jira] [Commented] (KAFKA-13716) add tests for `DeleteRecordsCommand` class
[ https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528047#comment-17528047 ] Divij Vaidya commented on KAFKA-13716: -- Hey folks, [https://github.com/apache/kafka/pull/12087] adds tests that cover the request/response for the DeleteRecords API. However, my PR does not validate the thin layer of command-line parsing provided by DeleteRecordsCommand. I think it is worthwhile to add tests for the `DeleteRecordsCommand` class at [https://github.com/apache/kafka/tree/trunk/core/src/test/scala/unit/kafka/admin] > add tests for `DeleteRecordsCommand` class > -- > > Key: KAFKA-13716 > URL: https://issues.apache.org/jira/browse/KAFKA-13716 > Project: Kafka > Issue Type: Test > Components: tools >Reporter: Luke Chen >Assignee: Shivanjal Arora >Priority: Major > Labels: Newbie, newbie > > There are no tests for the `DeleteRecordsCommand` class, which is used in > `kafka-delete-records.sh`. We should add them.
[GitHub] [kafka] LatentLag opened a new pull request, #12095: MINOR: Adding client Base64 serializer and deserializer
LatentLag opened a new pull request, #12095:
URL: https://github.com/apache/kafka/pull/12095

Adding client serialization for Base64, which enables two use cases. First, it lets messages encoded in base64 avoid the larger size that the encoding incurs. Second, binary-encoded messages can be used more simply on the command line.

This is a minor change. For testing, the class files were added to an existing `kafka-client.jar` and exercised with `kafka-console-producer.sh` and `kafka-console-consumer.sh` on Windows and Ubuntu 20.04.

This is my own work and I license it to this project.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
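The PR description does not include the code, so the following is only a minimal sketch of how the two use cases could work, assuming the serializer decodes base64 text into raw bytes and the deserializer re-encodes them. The method shapes mirror `serialize(String topic, T data)` and `deserialize(String topic, byte[] data)` from Kafka's `Serializer`/`Deserializer` interfaces, but the hypothetical `Base64StringSerde` does not implement them, so it compiles without kafka-clients.

```java
import java.util.Base64;

// Hypothetical class mirroring the shape of Serializer<String> and Deserializer<String>.
class Base64StringSerde {

    // Producer side: takes base64 text and returns the decoded raw bytes to send.
    byte[] serialize(String topic, String base64Text) {
        return base64Text == null ? null : Base64.getDecoder().decode(base64Text);
    }

    // Consumer side: takes raw record bytes and returns them as base64 text.
    String deserialize(String topic, byte[] data) {
        return data == null ? null : Base64.getEncoder().encodeToString(data);
    }
}
```

For the input `"AAEC/w=="` the record would carry the 4 raw bytes `00 01 02 FF` instead of 8 characters of text, and the consumer side prints the same base64 string back.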
[GitHub] [kafka] aybefox commented on pull request #9947: KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems
aybefox commented on PR #9947: URL: https://github.com/apache/kafka/pull/9947#issuecomment-1109626111 @notme159 Same here :( Did you find anything out?
[GitHub] [kafka] dajac commented on pull request #12095: MINOR: Adding client Base64 serializer and deserializer
dajac commented on PR #12095: URL: https://github.com/apache/kafka/pull/12095#issuecomment-1109648747 @LatentLag Thanks for the PR. I think we would need a KIP to add these, because I suppose we consider them public interfaces. The process is described here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.
[GitHub] [kafka] viktorsomogyi commented on pull request #11492: KAFKA-13452: MM2 shouldn't checkpoint when offset mapping is unavailable
viktorsomogyi commented on PR #11492: URL: https://github.com/apache/kafka/pull/11492#issuecomment-1109725104 @mimaison, would you please review this?
[jira] [Created] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
RivenSun created KAFKA-13857:

Summary: The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
Key: KAFKA-13857
URL: https://issues.apache.org/jira/browse/KAFKA-13857
Project: Kafka
Issue Type: Bug
Components: admin
Reporter: RivenSun

The server side currently handles the LIST_OFFSETS request as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->
KafkaApis.handleListOffsetRequestV1AndAbove() ->
ReplicaManager.fetchOffsetForTimestamp() ->
Partition.fetchOffsetForTimestamp(){code}
In the last method above, when no isolationLevel value is passed, the server side can return localLog.logEndOffset.
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
}
{code}
KafkaAdminClient is an operations and maintenance tool, so it *should behave differently from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be limited by the value of {color:#FF}isolationLevel {color}in the ListOffsetsOptions parameter.*

In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server treat isolationLevel as a required parameter:

1) If the AdminClient uses new ListOffsetsOptions(null), a NullPointerException is thrown when the AdminClient executes the listOffsets() method.
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
2) The current logic for converting isolationLevel on the server side does not yet handle the case where the user passes in a value that is neither READ_UNCOMMITTED nor READ_COMMITTED:
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None
{code}
{code:java}
public IsolationLevel isolationLevel() {
    return IsolationLevel.forId(data.isolationLevel());
}
{code}
h1. Solution:

Add a new enum value `NONE` to IsolationLevel, dedicated to the AdminClient.listOffsets() method.
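The Scala `match` quoted in the issue decides which offset bounds the lookup. As a standalone Java sketch (with a local `IsolationLevel` enum standing in for `org.apache.kafka.common.IsolationLevel`, and a hypothetical `LastFetchableOffsetSketch` class and parameter names), an empty `Optional` plays the role of `None`, which the quoted server code only produces for non-client requests:

```java
import java.util.Optional;

class LastFetchableOffsetSketch {

    // Local stand-in for org.apache.kafka.common.IsolationLevel.
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // Java rendering of the Scala match quoted in the issue.
    static long lastFetchableOffset(Optional<IsolationLevel> isolationLevel,
                                    long lastStableOffset, long highWatermark, long logEndOffset) {
        if (!isolationLevel.isPresent()) {
            // Corresponds to `case None`, used by the server for non-client requests.
            return logEndOffset;
        }
        switch (isolationLevel.get()) {
            case READ_COMMITTED:
                return lastStableOffset;
            case READ_UNCOMMITTED:
                return highWatermark;
            default:
                throw new IllegalStateException("Unexpected isolation level: " + isolationLevel.get());
        }
    }
}
```

Under the issue's proposal, a new `NONE` value sent by the AdminClient would presumably have to reach the `logEndOffset` branch; that value is only proposed here and is not part of the current API.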
[jira] [Updated] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RivenSun updated KAFKA-13857: - Description: The server side currently handles the LIST_OFFSETS request process as follows: {code:java} KafkaApis.handleListOffsetRequest() -> KafkaApis.handleListOffsetRequestV1AndAbove() -> ReplicaManager.fetchOffsetForTimestamp() -> Partition.fetchOffsetForTimestamp(){code} In the last method above, it is obvious that when the client side does not pass the isolationLevel value, the server side supports returning localLog.logEndOffset. {code:java} val lastFetchableOffset = isolationLevel match { case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark case None => localLog.logEndOffset } {code} KafkaAdminClient is an operation and maintenance management tool, which *should be different from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be limited by the value of {color:#ff}isolationLevel {color}in the ListOffsetsOptions parameter.* In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server consider isolationLevel as a required parameter: 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException will be thrown when AdminClient executes listOffsets() method. {code:java} ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code} 2) The current logic for converting isolationLevel on the server side has not yet handled the case where the user passes in a value that is neither READ_UNCOMMITTED nor READ_COMMITTED : {code:java} val isolationLevelOpt = if (isClientRequest) Some(offsetRequest.isolationLevel) else None {code} {code:java} public IsolationLevel isolationLevel() { return IsolationLevel.forId(data.isolationLevel()); } {code} h1. h2. 
Solution: Added a new enum `NONE` in IsolationLevel, dedicated to AdminClient.listOffsets() method. was: The server side currently handles the LIST_OFFSETS request process as follows: {code:java} KafkaApis.handleListOffsetRequest() -> KafkaApis.handleListOffsetRequestV1AndAbove() -> ReplicaManager.fetchOffsetForTimestamp() -> Partition.fetchOffsetForTimestamp(){code} In the last method above, it is obvious that when the client side does not pass the isolationLevel value, the server side supports returning localLog.logEndOffset. {code:java} val lastFetchableOffset = isolationLevel match { case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark case None => localLog.logEndOffset } {code} KafkaAdminClient is an operation and maintenance management tool, which *should be different from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be limited by the value of {color:#FF}isolationLevel {color}in the ListOffsetsOptions parameter.* In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server consider isolationLevel as a required parameter: 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException will be thrown when AdminClient executes listOffsets() method. {code:java} ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code} 2) The current logic for converting isolationLevel on the server side has not yet handled the case where the user passes in a value that is neither READ_UNCOMMITTED nor READ_COMMITTED : {code:java} val isolationLevelOpt = if (isClientRequest) Some(offsetRequest.isolationLevel) else None {code} {code:java} public IsolationLevel isolationLevel() { return IsolationLevel.forId(data.isolationLevel()); } {code} h1. Solution: Added a new enum `NONE` in IsolationLevel, dedicated to AdminClient.listOffsets() method. 
Issue Type: Improvement (was: Bug) > The listOffsets method of KafkaAdminClient should support returning > logEndOffset of topicPartition > -- > > Key: KAFKA-13857 > URL: https://issues.apache.org/jira/browse/KAFKA-13857 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: RivenSun >Priority: Major > > The server side currently handles the LIST_OFFSETS request process as follows: > {code:java} > KafkaApis.handleListOffsetRequest() -> > KafkaApis.handleListOffsetRequestV1AndAbove() -> > ReplicaManager.fetchOffsetForTimestamp() -> > Partition.fetchOffsetForTimestamp(){code} > > In the last method above, it is obvious that when the client side does not > pass the isolationLevel value, the server side supports returning > localL
[jira] [Commented] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528170#comment-17528170 ] RivenSun commented on KAFKA-13857: -- Hi [~guozhang] [~showuon] Could you give some suggestions for this issue? Thanks. > The listOffsets method of KafkaAdminClient should support returning > logEndOffset of topicPartition > -- > > Key: KAFKA-13857 > URL: https://issues.apache.org/jira/browse/KAFKA-13857 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: RivenSun >Priority: Major > > The server side currently handles the LIST_OFFSETS request process as follows: > {code:java} > KafkaApis.handleListOffsetRequest() -> > KafkaApis.handleListOffsetRequestV1AndAbove() -> > ReplicaManager.fetchOffsetForTimestamp() -> > Partition.fetchOffsetForTimestamp(){code} > > In the last method above, it is obvious that when the client side does not > pass the isolationLevel value, the server side supports returning > localLog.logEndOffset. > {code:java} > val lastFetchableOffset = isolationLevel match { > case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset > case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark > case None => localLog.logEndOffset > } > {code} > > > KafkaAdminClient is an operation and maintenance management tool, which > *should be different from the listOffsets-related methods (offsetsForTimes, > beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not > be limited by the value of {color:#ff}isolationLevel {color}in the > ListOffsetsOptions parameter.* > In the current KafkaAdminClient.listOffsets() method, both the AdminClient > and the server consider isolationLevel as a required parameter: > 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException > will be thrown when AdminClient executes listOffsets() method. > {code:java} > ListOffsetsRequest.Builder(...) 
-> isolationLevel.id(){code} > 2) The current logic for converting isolationLevel on the server side has not > yet handled the case where the user passes in a value that is neither > READ_UNCOMMITTED nor READ_COMMITTED : > {code:java} > val isolationLevelOpt = if (isClientRequest) > Some(offsetRequest.isolationLevel) > else > None {code} > {code:java} > public IsolationLevel isolationLevel() { > return IsolationLevel.forId(data.isolationLevel()); > } {code} > h1. > h2. Solution: > Added a new enum `NONE` in IsolationLevel, dedicated to > AdminClient.listOffsets() method. > > -- This message was sent by Atlassian Jira (v8.20.7#820007)
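A minimal Java sketch of the server-side selection quoted above, combined with the proposed `NONE` value. `Level`, `LocalLogOffsets`, `ListOffsetsSketch` and `toServerOption` are hypothetical stand-ins, not Kafka classes, and `NONE` exists only as the ticket's proposal. The sketch assumes the request path turns `NONE` into "no isolation level", which is what lets an admin caller reach the `logEndOffset` branch without passing `null`.

```java
import java.util.Optional;

// Hypothetical stand-in for org.apache.kafka.common.IsolationLevel, extended
// with the NONE value proposed in KAFKA-13857 (not an existing Kafka value).
enum Level { READ_UNCOMMITTED, READ_COMMITTED, NONE }

// Hypothetical snapshot of the three offsets tracked by a partition's local log.
final class LocalLogOffsets {
    final long lastStableOffset;
    final long highWatermark;
    final long logEndOffset;

    LocalLogOffsets(long lastStableOffset, long highWatermark, long logEndOffset) {
        this.lastStableOffset = lastStableOffset;
        this.highWatermark = highWatermark;
        this.logEndOffset = logEndOffset;
    }
}

final class ListOffsetsSketch {
    // Mirrors the Scala match quoted from Partition.fetchOffsetForTimestamp():
    // an absent isolation level selects the log end offset.
    static long lastFetchableOffset(Optional<Level> isolationLevel, LocalLogOffsets log) {
        if (!isolationLevel.isPresent()) {
            return log.logEndOffset;
        }
        switch (isolationLevel.get()) {
            case READ_COMMITTED:
                return log.lastStableOffset;
            case READ_UNCOMMITTED:
                return log.highWatermark;
            default:
                throw new IllegalArgumentException("Unexpected isolation level: " + isolationLevel.get());
        }
    }

    // Hypothetical request-side conversion: NONE is forwarded as "no isolation level".
    static Optional<Level> toServerOption(Level requested) {
        return requested == Level.NONE ? Optional.empty() : Optional.of(requested);
    }
}
```

Keeping the "absent" case as an empty `Optional` on the server side matches the existing `None` branch, so only the request encoding would need to change.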
[jira] [Commented] (KAFKA-13716) add tests for `DeleteRecordsCommand` class
[ https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528174#comment-17528174 ] Richard Joerger commented on KAFKA-13716: - [~divijvaidya], that clears it up for me. It's the actual CLI tool, not the DeleteRecords API. Thanks for chiming in, [~showuon]. [~Shivanjal], I see that this is assigned to you. Are you still working on the tests? If you are, no worries, but if you are not, please let me know. Thanks. > add tests for `DeleteRecordsCommand` class > -- > > Key: KAFKA-13716 > URL: https://issues.apache.org/jira/browse/KAFKA-13716 > Project: Kafka > Issue Type: Test > Components: tools >Reporter: Luke Chen >Assignee: Shivanjal Arora >Priority: Major > Labels: Newbie, newbie > > Found there are no tests for the `DeleteRecordsCommand` class, which is used in > `kafka-delete-records.sh`. We should add them.
[GitHub] [kafka] ijuma opened a new pull request, #12096: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3)
ijuma opened a new pull request, #12096: URL: https://github.com/apache/kafka/pull/12096 Conceptually, the ordering is defined by the producer id, producer epoch and the sequence number. A given `TopicPartitionEntry` has a single producer id and hence we only need to compare the other two. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
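Because every batch tracked by one `TopicPartitionEntry` shares a single producer id, the ordering described above reduces to "epoch first, then sequence". A minimal sketch using a hypothetical `InFlightBatch` type (not the producer's real `ProducerBatch`) and assuming a `TreeSet` as the sorted container:

```java
import java.util.Comparator;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for an in-flight producer batch; all batches placed in
// one set are assumed to share the same producer id.
final class InFlightBatch {
    final short producerEpoch;
    final int baseSequence;

    InFlightBatch(short producerEpoch, int baseSequence) {
        this.producerEpoch = producerEpoch;
        this.baseSequence = baseSequence;
    }

    @Override
    public String toString() {
        return "epoch=" + producerEpoch + ",seq=" + baseSequence;
    }
}

final class BatchOrdering {
    // Compare by producer epoch first, then by sequence number within an epoch.
    static final Comparator<InFlightBatch> BY_EPOCH_THEN_SEQUENCE =
        Comparator.<InFlightBatch>comparingInt(b -> b.producerEpoch)
                  .thenComparingInt(b -> b.baseSequence);

    // Note: a TreeSet treats comparator-equal batches (same epoch and sequence)
    // as duplicates and keeps only one of them.
    static List<InFlightBatch> sorted(List<InFlightBatch> batches) {
        TreeSet<InFlightBatch> set = new TreeSet<>(BY_EPOCH_THEN_SEQUENCE);
        set.addAll(batches);
        return List.copyOf(set);
    }
}
```

A batch from a newer epoch sorts after every batch from an older epoch, regardless of its sequence number, which is why the epoch has to be compared before the sequence.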
[jira] [Updated] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition
[ https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] RivenSun updated KAFKA-13857: - Description: The server side currently handles the LIST_OFFSETS request process as follows: {code:java} KafkaApis.handleListOffsetRequest() -> KafkaApis.handleListOffsetRequestV1AndAbove() -> ReplicaManager.fetchOffsetForTimestamp() -> Partition.fetchOffsetForTimestamp(){code} In the last method above, it is obvious that when the client side does not pass the isolationLevel value, the server side supports returning localLog.logEndOffset. {code:java} val lastFetchableOffset = isolationLevel match { case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark case None => localLog.logEndOffset } {code} KafkaAdminClient is an operation and maintenance management tool, which *should be different from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be limited by the value of {color:#ff}isolationLevel {color}in the ListOffsetsOptions parameter.* In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server consider isolationLevel as a required parameter: 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException will be thrown when AdminClient executes listOffsets() method. {code:java} ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code} 2) The current logic for converting isolationLevel on the server side has not yet handled the case where the user passes in a value that is neither READ_UNCOMMITTED nor READ_COMMITTED : {code:java} val isolationLevelOpt = if (isClientRequest) Some(offsetRequest.isolationLevel) else None {code} {code:java} public IsolationLevel isolationLevel() { return IsolationLevel.forId(data.isolationLevel()); } {code} h1. h2. 
Suggestion: Added a new enum `NONE` in IsolationLevel, only dedicated to AdminClient.listOffsets() method. This change may cause the highestSupportedVersion of ApiMessageType.LIST_OFFSETS to increase by one. was: The server side currently handles the LIST_OFFSETS request process as follows: {code:java} KafkaApis.handleListOffsetRequest() -> KafkaApis.handleListOffsetRequestV1AndAbove() -> ReplicaManager.fetchOffsetForTimestamp() -> Partition.fetchOffsetForTimestamp(){code} In the last method above, it is obvious that when the client side does not pass the isolationLevel value, the server side supports returning localLog.logEndOffset. {code:java} val lastFetchableOffset = isolationLevel match { case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark case None => localLog.logEndOffset } {code} KafkaAdminClient is an operation and maintenance management tool, which *should be different from the listOffsets-related methods (offsetsForTimes, beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be limited by the value of {color:#ff}isolationLevel {color}in the ListOffsetsOptions parameter.* In the current KafkaAdminClient.listOffsets() method, both the AdminClient and the server consider isolationLevel as a required parameter: 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException will be thrown when AdminClient executes listOffsets() method. {code:java} ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code} 2) The current logic for converting isolationLevel on the server side has not yet handled the case where the user passes in a value that is neither READ_UNCOMMITTED nor READ_COMMITTED : {code:java} val isolationLevelOpt = if (isClientRequest) Some(offsetRequest.isolationLevel) else None {code} {code:java} public IsolationLevel isolationLevel() { return IsolationLevel.forId(data.isolationLevel()); } {code} h1. h2. 
Solution: Added a new enum `NONE` in IsolationLevel, dedicated to AdminClient.listOffsets() method. > The listOffsets method of KafkaAdminClient should support returning > logEndOffset of topicPartition > -- > > Key: KAFKA-13857 > URL: https://issues.apache.org/jira/browse/KAFKA-13857 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: RivenSun >Priority: Major > > The server side currently handles the LIST_OFFSETS request process as follows: > {code:java} > KafkaApis.handleListOffsetRequest() -> > KafkaApis.handleListOffsetRequestV1AndAbove() -> > ReplicaManager.fetchOffsetForTimestamp() -> > Partition.fetchOffsetForTimestamp(){code} > > In the last method above, it is obvious that when the client side does not > pass t
[GitHub] [kafka] dengziming commented on a diff in pull request #11951: KAFKA-13836: Improve KRaft broker heartbeat logic
dengziming commented on code in PR #11951: URL: https://github.com/apache/kafka/pull/11951#discussion_r858760908 ## core/src/main/scala/kafka/server/BrokerLifecycleManager.scala: ## @@ -333,7 +353,11 @@ class BrokerLifecycleManager(val config: KafkaConfig, } private def sendBrokerHeartbeat(): Unit = { -val metadataOffset = _highestMetadataOffsetProvider() +val metadataOffset = if (readyToUnfence) { Review Comment: Currently, this offset is only used while the broker is starting up: first to ensure the broker has caught up so it can start initialization, and second to ensure it has caught up so it can be unfenced. If we use the same metadata offset here, checking twice seems unnecessary. But it is indeed peculiar to have an offset go backwards. Maybe we could add a new field to the `HeartbeatRequest`, or just leave it as it is now?
[GitHub] [kafka] mumrah commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
mumrah commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r858801361 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.ConfigDef.Validator; +import org.apache.kafka.common.config.ConfigException; + +public class MetadataVersionValidator implements Validator { + +@Override +public void ensureValid(String name, Object value) { +try { +MetadataVersion.fromVersionString(value.toString()); +} catch (IllegalArgumentException e) { +throw new ConfigException(name, value.toString(), e.getMessage()); +} +} + +@Override +public String toString() { Review Comment: Yea, for reference https://kafka.apache.org/27/documentation.html#brokerconfigs_inter.broker.protocol.version
[GitHub] [kafka] mumrah commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
mumrah commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r858801361 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java: ## @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.stream.Collectors; +import org.apache.kafka.common.config.ConfigDef.Validator; +import org.apache.kafka.common.config.ConfigException; + +public class MetadataVersionValidator implements Validator { + +@Override +public void ensureValid(String name, Object value) { +try { +MetadataVersion.fromVersionString(value.toString()); +} catch (IllegalArgumentException e) { +throw new ConfigException(name, value.toString(), e.getMessage()); +} +} + +@Override +public String toString() { Review Comment: Yea, for reference https://kafka.apache.org/27/documentation.html#brokerconfigs_inter.broker.protocol.version. Do we need the `distinct` call in the streaming expression? The enum values are already unique
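Whether `distinct()` is redundant depends on what the stream maps each constant to, not on the enum itself: enum constants are always unique, but a many-to-one projection of them is not. A small sketch with a hypothetical `DemoVersion` enum (not Kafka's `MetadataVersion`; the `toString()` body under review is not shown in the excerpt, and the version strings here are illustrative):

```java
import java.util.Arrays;
import java.util.stream.Collectors;

// Hypothetical enum: each constant has a unique full version string,
// but several constants share the same short version.
enum DemoVersion {
    V2_8_IV0("2.8", "IV0"),
    V2_8_IV1("2.8", "IV1"),
    V3_0_IV0("3.0", "IV0");

    private final String shortVersion;
    private final String subVersion;

    DemoVersion(String shortVersion, String subVersion) {
        this.shortVersion = shortVersion;
        this.subVersion = subVersion;
    }

    String shortVersion() { return shortVersion; }

    String version() { return shortVersion + "-" + subVersion; }
}

final class DistinctDemo {
    // One-to-one mapping: distinct() could not remove anything here.
    static String fullVersions() {
        return Arrays.stream(DemoVersion.values())
            .map(DemoVersion::version)
            .collect(Collectors.joining(", ", "[", "]"));
    }

    // Many-to-one mapping: without distinct(), "2.8" would be listed twice.
    static String shortVersions() {
        return Arrays.stream(DemoVersion.values())
            .map(DemoVersion::shortVersion)
            .distinct()
            .collect(Collectors.joining(", ", "[", "]"));
    }
}
```

So if the validator's string lists only one string per constant, `distinct()` is a no-op; it earns its place only when the listing can produce the same string for different constants.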
[GitHub] [kafka] joel-hamill commented on pull request #12094: MINOR: fix html generation syntax errors
joel-hamill commented on PR #12094: URL: https://github.com/apache/kafka/pull/12094#issuecomment-1109970872 cc @ijuma @guozhangwang
[GitHub] [kafka] joel-hamill commented on pull request #11874: Fix typos in configuration docs
joel-hamill commented on PR #11874: URL: https://github.com/apache/kafka/pull/11874#issuecomment-1109972244 cc @ijuma
[GitHub] [kafka] ijuma opened a new pull request, #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level
ijuma opened a new pull request, #12097: URL: https://github.com/apache/kafka/pull/12097 This is the first step towards refactoring the `TransactionManager` so that it's easier to understand and test. The high-level idea is to push down behavior to `TopicPartitionEntry` and `TopicPartitionBookkeeper` and to encapsulate the state so that the mutations can only be done via the appropriate methods. Inner classes have no mechanism to limit access from the outer class, which presents a challenge when mutability is widespread, as is the case here. As a first step, we make `TopicPartitionBookkeeper` and `TopicPartitionEntry` top level and rename them to make their intended usage clear. To make the review easier, we don't change anything else except access changes required for the code to compile. The next PR will contain the rest of the refactoring. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
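The access problem described above is a Java language rule: an enclosing class may read and write the private members of its nested classes. A minimal sketch with hypothetical class names (`OuterManager`, `Entry`, `TopLevelEntry`), not the actual `TransactionManager` code:

```java
// Hypothetical classes: a nested class cannot protect its state from the
// enclosing class, because Java lets the outer class touch its private fields.
final class OuterManager {
    static final class Entry {
        private int nextSequence;   // "private", yet still writable by OuterManager

        void incrementSequence() { nextSequence++; }

        int nextSequence() { return nextSequence; }
    }

    final Entry entry = new Entry();

    // Compiles: the outer class bypasses Entry's mutator entirely.
    void resetBehindEntrysBack() { entry.nextSequence = 0; }
}

// Once the class is top level, its private field is reachable only through its
// methods; `entry.nextSequence = 0` from another top-level class would not compile.
final class TopLevelEntry {
    private int nextSequence;

    void incrementSequence() { nextSequence++; }

    int nextSequence() { return nextSequence; }
}
```

Moving the class to the top level turns "please only mutate through these methods" from a convention into something the compiler enforces.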
[GitHub] [kafka] junrao commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement
junrao commented on PR #12064: URL: https://github.com/apache/kafka/pull/12064#issuecomment-1109979635 @cadonna : Thanks for the response. This bug was actually introduced in 3.2.0. I will ping the mailing list.
[GitHub] [kafka] hachikuji merged pull request #12089: MINOR: Rename `AlterIsrManager` to `AlterPartitionManager`
hachikuji merged PR #12089: URL: https://github.com/apache/kafka/pull/12089
[jira] [Commented] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct
[ https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528288#comment-17528288 ] Matthias J. Sax commented on KAFKA-13647: - Should we close this ticket? cc [~guozhang] [~cadonna] > RocksDb metrics 'number-open-files' is not correct > -- > > Key: KAFKA-13647 > URL: https://issues.apache.org/jira/browse/KAFKA-13647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Sylvain Le Gouellec >Priority: Major > Attachments: image-2022-02-07-16-06-25-304.png, > image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png > > > We were looking at RocksDB metrics and noticed that the {{number-open-files}} > metric behaves like a counter, rather than a gauge. > Looking at the code, we think there is a small error in the type of metric > for that specific mbean (should be a value metric rather than a sum metric). > See [ > https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]
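The reporter's point about sum versus value metrics can be illustrated with plain Java. This sketch only mirrors that reasoning under one stated assumption: that each recorded sample is an absolute count of currently open files. It does not reproduce how Kafka Streams actually samples RocksDB, and `OpenFilesMetricSketch` is a hypothetical class, not Kafka's metrics API.

```java
// Hypothetical illustration: if each sample is an absolute snapshot of
// "files currently open", a sum-type stat grows without bound (counter-like),
// while a value-type stat reports the latest reading (gauge-like).
final class OpenFilesMetricSketch {
    private double cumulativeSum = 0.0;  // what a sum-type stat would report
    private double lastValue = 0.0;      // what a value-type stat would report

    void record(double openFilesSnapshot) {
        cumulativeSum += openFilesSnapshot;
        lastValue = openFilesSnapshot;
    }

    double asSum() { return cumulativeSum; }

    double asValue() { return lastValue; }
}
```

With three snapshots of 3, 3 and 4 open files, the sum reports 10 while the value reports 4, which matches the "counter rather than gauge" symptom described in the ticket.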
[jira] [Commented] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception
[ https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528300#comment-17528300 ] Kyle R Stehbens commented on KAFKA-13840: - Gotcha, I'll give these new versions a try on one of our less important jobs as soon as they are released and report back here with the results. > KafkaConsumer is unable to recover connection to group coordinator after > commitOffsetsAsync exception > - > > Key: KAFKA-13840 > URL: https://issues.apache.org/jira/browse/KAFKA-13840 > Project: Kafka > Issue Type: Bug > Components: clients, consumer >Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0 >Reporter: Kyle R Stehbens >Assignee: Luke Chen >Priority: Major > > Hi, I've discovered an issue with the Java Kafka client (consumer) whereby a > timeout or any other retriable exception triggered during an async offset > commit renders the client unable to recover its group coordinator and > leaves the client in a broken state. > > I first encountered this using v2.8.1 of the Java client, and after going > through the code base for all versions of the client, have found it affects > all versions of the client from 2.6.1 onward. > I also confirmed that after rolling back to 2.5.1, the issue is not present.
> > The issue stems from a change in how the FindCoordinatorResponseHandler works: in > 2.5.1 it called clearFindCoordinatorFuture() on both success and failure > here: > [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783] > > In all later versions of the client, this call is not made: > [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838] > > As a result, when the KafkaConsumer calls > coordinator.commitOffsetsAsync(...) and an error occurs such that the > coordinator is unavailable here: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007] > > then the client will try to call: > [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017] > However, this can never succeed, because it perpetually returns a > reference to a failed future, findCoordinatorFuture, which is never cleared out. > > This manifests as every subsequent call to commitOffsetsAsync() throwing a > "coordinator unavailable" exception forever after any > retriable exception causes the coordinator to close. > Note that we discovered this when we upgraded the Kafka client in our Flink > consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the > client. We also noticed this occurring in our non-Flink Java consumers running > 3.x client versions. >
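The failure mode described above is a general caching pitfall: a lookup future is cached so concurrent callers share one in-flight request, but if the cache is only cleared on success, every later caller receives the same failed future. A minimal sketch of that pattern, using `CompletableFuture` in place of Kafka's internal `RequestFuture`; `CoordinatorLookup` and its `clearOnCompletion` flag are hypothetical and exist only to contrast the two behaviours.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch of a cached coordinator lookup. With clearOnCompletion=false
// a failed lookup stays cached forever; with true, the cache is cleared on both
// success and failure, as the ticket says the 2.5.1 handler did.
final class CoordinatorLookup {
    private final Supplier<CompletableFuture<String>> sendFindCoordinator;
    private final boolean clearOnCompletion;
    private CompletableFuture<String> inFlight;  // analogous to findCoordinatorFuture

    CoordinatorLookup(Supplier<CompletableFuture<String>> sendFindCoordinator,
                      boolean clearOnCompletion) {
        this.sendFindCoordinator = sendFindCoordinator;
        this.clearOnCompletion = clearOnCompletion;
    }

    synchronized CompletableFuture<String> lookupCoordinator() {
        if (inFlight != null) {
            return inFlight;  // reuse the cached lookup, even if it already failed
        }
        CompletableFuture<String> f = sendFindCoordinator.get();
        inFlight = f;
        if (clearOnCompletion) {
            // Runs on success and on failure; runs immediately if f is already done.
            f.whenComplete((result, error) -> clear(f));
        }
        return f;
    }

    private synchronized void clear(CompletableFuture<String> f) {
        if (inFlight == f) {
            inFlight = null;  // next caller sends a fresh FindCoordinator request
        }
    }
}
```

Clearing only when the cached reference is still the same future avoids wiping out a newer lookup that another caller may have started in the meantime.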
[GitHub] [kafka] cadonna commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement
cadonna commented on PR #12064: URL: https://github.com/apache/kafka/pull/12064#issuecomment-1110039917 @junrao Now I see how this issue was introduced in 3.2.0. The fix for the bug described in KAFKA-12841 introduced it, right? I initially understood this PR to be the fix for the bug described in KAFKA-12841, which dates back to 2.6. I think that qualifies as a regression. I will cherry-pick this PR to 3.2.0 and create a new release candidate.
[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
ijuma commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r858974266 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). 
+IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAndIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207) +IBP_2_2_IV1(-1), + +// Introduced static membership.
+IBP_2_3_IV0(-1), + +// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest +IBP_2_3_IV1(-1), + +// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest +IBP_2_4_IV0(-1), + +// Flexible version support in inter-broker APIs +IBP_2_4_IV1(-1), + +// No new APIs, equivalent to 2.4-IV1 +IBP_2_5_IV0(-1), + +// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570) +IBP_2_6_IV0(-1), + +// Introduced feature versioning support (KIP-584) +IBP_2_7_IV0(-1), + +// Bump Fetch protocol for Raft protocol (KIP-595) +IBP_2_7_IV1(-1), + +// Introduced AlterPartition (KIP-497) +IBP_2_7_IV2(-1), + +// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516) +IBP_2_8_IV0(-1), + +// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) +IBP_2_8_IV1(-1), + +// Introduce AllocateProducerIds (KIP-730) +IBP_3_0_IV0(1), + +// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) +// Assume message format version is 3.0 (KIP-724) +IBP_3_0_IV1(2), + +// Adds topic IDs to Fetch requests/responses (KIP-516) +IBP_3_1_IV0(3), + +// Support for leader recovery for unclean leader election (KIP-704) +IBP_3_2_IV0(4), + +// KRaft GA Review Comment: We shouldn't have this until we h
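The enum in the diff above relies on declaration order as version order and passes an integer to each constant: `-1` for every IBP up to 2.8, then 1, 2, 3, 4 from `IBP_3_0_IV0` onwards. The constructor is not shown in the excerpt, so this sketch assumes that integer is a KRaft metadata feature level where `-1` means "not usable as a metadata version". `DemoMetadataVersion`, `featureLevel()`, `isKRaftSupported()` and `isAtLeast()` are hypothetical names for illustration only.

```java
// Hypothetical, trimmed-down enum in the style of the MetadataVersion diff.
// Assumption: the constructor argument is a metadata feature level, with -1
// meaning the version predates KRaft metadata versioning.
enum DemoMetadataVersion {
    IBP_2_8_IV1(-1),
    IBP_3_0_IV0(1),
    IBP_3_0_IV1(2),
    IBP_3_1_IV0(3),
    IBP_3_2_IV0(4);

    private final short featureLevel;

    DemoMetadataVersion(int featureLevel) {
        this.featureLevel = (short) featureLevel;
    }

    short featureLevel() { return featureLevel; }

    // Under the assumption above, only positive levels can be used by KRaft.
    boolean isKRaftSupported() { return featureLevel > 0; }

    // Declaration order doubles as version order, so gating checks can rely on
    // Enum.compareTo, which compares ordinals.
    boolean isAtLeast(DemoMetadataVersion other) { return compareTo(other) >= 0; }
}
```

Because ordering comes from declaration order, a new constant must always be appended after the newest existing one, which is one reason a placeholder entry such as the "KRaft GA" comment deserves scrutiny in review.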
[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
ijuma commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r858975278 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). 
+IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207) +IBP_2_2_IV1(-1), + +// Introduced static membership. 
+IBP_2_3_IV0(-1), + +// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest +IBP_2_3_IV1(-1), + +// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest +IBP_2_4_IV0(-1), + +// Flexible version support in inter-broker APIs +IBP_2_4_IV1(-1), + +// No new APIs, equivalent to 2.4-IV1 +IBP_2_5_IV0(-1), + +// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570) +IBP_2_6_IV0(-1), + +// Introduced feature versioning support (KIP-584) +IBP_2_7_IV0(-1), + +// Bup Fetch protocol for Raft protocol (KIP-595) +IBP_2_7_IV1(-1), + +// Introduced AlterPartition (KIP-497) +IBP_2_7_IV2(-1), + +// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516) +IBP_2_8_IV0(-1), + +// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) +IBP_2_8_IV1(-1), + +// Introduce AllocateProducerIds (KIP-730) +IBP_3_0_IV0(1), + +// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) +// Assume message format version is 3.0 (KIP-724) +IBP_3_0_IV1(2), + +// Adds topic IDs to Fetch requests/responses (KIP-516) +IBP_3_1_IV0(3), + +// Support for leader recovery for unclean leader election (KIP-704) +IBP_3_2_IV0(4), + +// KRaft GA +IBP_3_3_IV0(5); + +private final Optional me
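The enum in this diff depends on declaration order. Constants are listed oldest first, and the constructor argument appears to be -1 for versions that have no KRaft metadata level (the field declaration itself is cut off in this notification). Below is a minimal Java sketch of how an ordered enum like this can be compared and parsed. The names `Ibp`, `featureLevel`, `isAtLeast` and `fromVersionString` are hypothetical and used only for illustration; they are not taken from the PR.

```java
import java.util.Optional;

// Simplified, hypothetical model of an ordered inter-broker protocol version enum.
final class IbpSketch {
    enum Ibp {
        IBP_2_8_IV1(-1),
        IBP_3_0_IV0(1),
        IBP_3_0_IV1(2),
        IBP_3_1_IV0(3),
        IBP_3_2_IV0(4);

        // Empty for versions written as -1 in the diff (no metadata level).
        private final Optional<Short> featureLevel;

        Ibp(int level) {
            this.featureLevel = level < 0 ? Optional.<Short>empty() : Optional.of((short) level);
        }

        Optional<Short> featureLevel() {
            return featureLevel;
        }

        // Constants are declared oldest first, so ordinal order is version order.
        boolean isAtLeast(Ibp other) {
            return compareTo(other) >= 0;
        }

        // Maps "3.0-IV1" to IBP_3_0_IV1; valueOf throws IllegalArgumentException if unknown.
        static Ibp fromVersionString(String version) {
            return valueOf("IBP_" + version.replace('.', '_').replace('-', '_'));
        }
    }

    public static void main(String[] args) {
        Ibp v = Ibp.fromVersionString("3.1-IV0");
        System.out.println(v + " at least 3.0-IV1? " + v.isAtLeast(Ibp.IBP_3_0_IV1));
        System.out.println("feature level: " + v.featureLevel());
    }
}
```

Because comparisons use ordinal order, a scheme like this only works if new versions are always appended at the end of the enum.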
[GitHub] [kafka] ijuma commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy
ijuma commented on code in PR #12052: URL: https://github.com/apache/kafka/pull/12052#discussion_r858994743 ## docs/design.html: ## @@ -125,6 +125,9 @@ This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache. +TLS/SSL libraries operate at the user space (in-kernel SSL_sendfile is currently not supported by Kafka). Due to this restriction, sendfile could not be used when transport layer enables SSL protocol. For enabling Review Comment: Nit: "could not be used when transport layer enables SSL protocol" -> "is not used when SSL is enabled"
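The sentence under review comes down to where encryption happens. Here is a rough Java illustration that uses only JDK APIs. `FileChannel.transferTo` can let the OS copy file bytes directly to a socket (sendfile on Linux), whereas TLS needs the bytes in a user-space buffer so they can be encrypted first. The `encrypt` parameter is a hypothetical placeholder for something like `SSLEngine.wrap`. This is a sketch, not Kafka's transport-layer code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.function.UnaryOperator;

final class ZeroCopySketch {
    // Plaintext path: transferTo hands the copy to the JDK/OS; when the target is a
    // socket on Linux this can use sendfile, so bytes need not enter user space.
    static long sendPlaintext(FileChannel file, long position, long count,
                              WritableByteChannel target) throws IOException {
        long sent = 0;
        while (sent < count) {
            long n = file.transferTo(position + sent, count - sent, target);
            if (n <= 0) break; // end of file, or the target accepted nothing
            sent += n;
        }
        return sent;
    }

    // TLS path: bytes are read into a user-space buffer so they can be encrypted
    // before being written. 'encrypt' is a hypothetical stand-in for SSLEngine.wrap.
    static long sendWithTls(FileChannel file, long position, long count,
                            WritableByteChannel target,
                            UnaryOperator<ByteBuffer> encrypt) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8192);
        long sent = 0;
        while (sent < count) {
            buf.clear();
            buf.limit((int) Math.min(buf.capacity(), count - sent));
            int n = file.read(buf, position + sent);
            if (n <= 0) break; // -1 means end of file
            buf.flip();
            ByteBuffer out = encrypt.apply(buf);
            while (out.hasRemaining()) target.write(out);
            sent += n;
        }
        return sent;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("segment", ".log");
        Files.write(tmp, "hello kafka".getBytes(StandardCharsets.UTF_8));
        try (FileChannel file = FileChannel.open(tmp, StandardOpenOption.READ)) {
            ByteArrayOutputStream plain = new ByteArrayOutputStream();
            ByteArrayOutputStream tls = new ByteArrayOutputStream();
            sendPlaintext(file, 0, file.size(), Channels.newChannel(plain));
            sendWithTls(file, 0, file.size(), Channels.newChannel(tls), UnaryOperator.identity());
            System.out.println(new String(plain.toByteArray(), StandardCharsets.UTF_8)
                    + " / " + new String(tls.toByteArray(), StandardCharsets.UTF_8));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

With an identity `encrypt`, both paths deliver the same bytes. The difference is that the TLS path always passes through the user-space `buf`.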
[GitHub] [kafka] junrao commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement
junrao commented on PR #12064: URL: https://github.com/apache/kafka/pull/12064#issuecomment-1110094353 Thanks, @cadonna. I will mark the jira as fixed in 3.2.0 then.
[jira] [Resolved] (KAFKA-13448) Kafka Producer Client Callback behaviour does not align with Javadoc
[ https://issues.apache.org/jira/browse/KAFKA-13448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao resolved KAFKA-13448. - Fix Version/s: 3.2.0 Assignee: Philip Nee Resolution: Fixed Merged [https://github.com/apache/kafka/pull/11689] and a follow-up fix [https://github.com/apache/kafka/pull/12064] to trunk and the 3.2 branch. > Kafka Producer Client Callback behaviour does not align with Javadoc > > > Key: KAFKA-13448 > URL: https://issues.apache.org/jira/browse/KAFKA-13448 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.1.0 >Reporter: Seamus O Ceanainn >Assignee: Philip Nee >Priority: Minor > Fix For: 3.2.0 > > > In PR [#4188|https://github.com/apache/kafka/pull/4188] as part of > KAFKA-6180, a breaking change was accidentally introduced in the behaviour of > Callbacks for the producer client. > Previously, whenever an exception was thrown when producing an event, the > value for 'metadata' passed to the Callback.onCompletion method was always > null. In PR [#4188|https://github.com/apache/kafka/pull/4188], in one part of > the code where Callback.onCompletion is called, the behaviour was changed so > that instead of passing a null value for metadata, a 'placeholder' value was > provided (see > [here|https://github.com/apache/kafka/pull/4188/files#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR1196] > and > [here|https://github.com/apache/kafka/pull/4188/files#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR1199]). > This placeholder contained only topic and partition information, with > all other fields set to '-1'.
> This change only impacted a subset of exceptions, so that in the case of > ApiExceptions metadata would still be null (see > [here|https://github.com/apache/kafka/commit/aa42a11dfd99ee9ab24d2e9a7521ef1c97ae1ff4#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR843]), > but for all other exceptions the placeholder value would be used. The > behaviour at the time of writing remains the same. > This issue was first reported in KAFKA-7412 when a user noticed the > difference between the documented behaviour of Callback.onCompletion and the > implemented behaviour. > At the time it was assumed that the behaviour when errors occur was to always > provide a placeholder metadata value to Callback.onCompletion, and the > documentation was updated to reflect this assumption in [PR > #5798|https://github.com/apache/kafka/pull/5798]. The documentation now > states that when an exception occurs, the method will be called with an > empty metadata value (see > [here|https://github.com/apache/kafka/blob/3.1/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java#L30-L31]). > However, there is still one case where Callback.onCompletion is called with > a null value for metadata (see > [here|https://github.com/apache/kafka/blob/3.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1002]), > so there is still a discrepancy between the documented behaviour and the > implementation of Callback.onCompletion. -- This message was sent by Atlassian Jira (v8.20.7#820007)
[jira] [Assigned] (KAFKA-12841) NPE from the provided metadata in client callback in case of ApiException
[ https://issues.apache.org/jira/browse/KAFKA-12841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jun Rao reassigned KAFKA-12841: --- Fix Version/s: 3.2.0 Assignee: Philip Nee (was: Kirk True) Resolution: Fixed Merged [https://github.com/apache/kafka/pull/11689] and a follow-up fix [https://github.com/apache/kafka/pull/12064] to trunk and the 3.2 branch. > NPE from the provided metadata in client callback in case of ApiException > - > > Key: KAFKA-12841 > URL: https://issues.apache.org/jira/browse/KAFKA-12841 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 2.6.0 > Environment: Prod >Reporter: Avi Youkhananov >Assignee: Philip Nee >Priority: Major > Fix For: 3.2.0 > > Attachments: NPE.production > > > 1. > The org.apache.kafka.clients.producer.Callback interface has the method > onCompletion(...), > whose documentation says: > *The metadata for the record that was sent (i.e. the partition and offset). > *An empty metadata with -1 value for all fields* except for topicPartition > will be returned if an error occurred. > We got an NPE from the doSend(...) method in > org.apache.kafka.clients.producer.KafkaProducer, > which can occur when an ApiException is thrown: > in case of an ApiException it uses the regular callback instead of the > InterceptorCallback, which might also cover the NPE. > 2. Moreover, RecordMetadata has a method partition() which returns int but can > also throw an NPE because TopicPartition might be null. > Stack trace attached. >
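Both tickets come down to the same contract question: on failure, is `metadata` null or a placeholder? The real signature is `Callback.onCompletion(RecordMetadata metadata, Exception exception)` in `org.apache.kafka.clients.producer`. The sketch below uses hypothetical stand-in types (`Metadata`, `Callback`) so it compiles without kafka-clients. It shows a callback that trusts `metadata` only when `exception` is null, so it behaves the same whether a failed send reports null or a placeholder.

```java
// Hypothetical stand-ins for the producer's RecordMetadata and Callback types,
// so this sketch compiles without the kafka-clients dependency.
final class CallbackSketch {
    static final class Metadata {
        final String topic;
        final int partition;
        final long offset;

        Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    interface Callback {
        void onCompletion(Metadata metadata, Exception exception);
    }

    // On failure, metadata may be null or a placeholder with -1 fields, so the
    // exception decides the outcome; metadata is only used as best-effort context.
    static String describe(Metadata metadata, Exception exception) {
        if (exception != null) {
            String where = (metadata == null) ? "<unknown>" : metadata.topic + "-" + metadata.partition;
            return "send to " + where + " failed: " + exception.getMessage();
        }
        return "acked " + metadata.topic + "-" + metadata.partition + " at offset " + metadata.offset;
    }

    public static void main(String[] args) {
        Callback callback = (metadata, exception) -> System.out.println(describe(metadata, exception));
        callback.onCompletion(new Metadata("orders", 0, 42L), null);
        callback.onCompletion(new Metadata("orders", 0, -1L), new Exception("timeout"));
        callback.onCompletion(null, new Exception("record too large"));
    }
}
```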
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
ahuang98 commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r859016703 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). 
+IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207) +IBP_2_2_IV1(-1), + +// Introduced static membership. 
+IBP_2_3_IV0(-1), + +// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest +IBP_2_3_IV1(-1), + +// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest +IBP_2_4_IV0(-1), + +// Flexible version support in inter-broker APIs +IBP_2_4_IV1(-1), + +// No new APIs, equivalent to 2.4-IV1 +IBP_2_5_IV0(-1), + +// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570) +IBP_2_6_IV0(-1), + +// Introduced feature versioning support (KIP-584) +IBP_2_7_IV0(-1), + +// Bup Fetch protocol for Raft protocol (KIP-595) +IBP_2_7_IV1(-1), + +// Introduced AlterPartition (KIP-497) +IBP_2_7_IV2(-1), + +// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516) +IBP_2_8_IV0(-1), + +// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) +IBP_2_8_IV1(-1), + +// Introduce AllocateProducerIds (KIP-730) +IBP_3_0_IV0(1), + +// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) +// Assume message format version is 3.0 (KIP-724) +IBP_3_0_IV1(2), + +// Adds topic IDs to Fetch requests/responses (KIP-516) +IBP_3_1_IV0(3), + +// Support for leader recovery for unclean leader election (KIP-704) +IBP_3_2_IV0(4), + +// KRaft GA +IBP_3_3_IV0(5); + +private final Optional
[jira] [Resolved] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct
[ https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-13647. --- Resolution: Incomplete I resolved the ticket as incomplete for now, since the Streams code alone cannot fix the issue; the fix has to happen on the RocksDB side. > RocksDb metrics 'number-open-files' is not correct > -- > > Key: KAFKA-13647 > URL: https://issues.apache.org/jira/browse/KAFKA-13647 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 3.0.0 >Reporter: Sylvain Le Gouellec >Priority: Major > Attachments: image-2022-02-07-16-06-25-304.png, > image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png > > > We were looking at RocksDB metrics and noticed that the {{number-open-files}} > metric behaves like a counter, rather than a gauge. > Looking at the code, we think there is a small error in the type of metric > for that specific mbean (it should be a value metric rather than a sum metric). > See [ > https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]
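The reporter suspected a mix-up between sum and value semantics. The ticket was closed as Incomplete because the real fix lies on the RocksDB side, so this is only an illustration of why a sum-style stat over a gauge-like quantity looks like an ever-growing counter. The classes are hypothetical stand-ins, not Kafka's own metrics stats.

```java
// Hypothetical stand-ins contrasting two ways of turning recorded samples into a metric value.
final class MetricStatSketch {
    interface Stat {
        void record(double value);
        double measure();
    }

    // Sum semantics: every recorded sample is added to a running total.
    static final class Sum implements Stat {
        private double total;
        public void record(double value) { total += value; }
        public double measure() { return total; }
    }

    // Value (gauge-like) semantics: the metric reports the latest recorded sample.
    static final class LastValue implements Stat {
        private double last;
        public void record(double value) { last = value; }
        public double measure() { return last; }
    }

    public static void main(String[] args) {
        Stat sum = new Sum();
        Stat value = new LastValue();
        // Periodic readings of a quantity that rises and falls, e.g. currently open files.
        for (double openFiles : new double[] {10, 12, 11}) {
            sum.record(openFiles);
            value.record(openFiles);
        }
        System.out.println("sum=" + sum.measure() + " value=" + value.measure());
    }
}
```

With readings 10, 12 and 11, the sum-style stat reports 33 even though only 11 files are open at the end. The value-style stat reports 11.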
[GitHub] [kafka] mumrah commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
mumrah commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r859055329 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). 
+IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207) +IBP_2_2_IV1(-1), + +// Introduced static membership. 
+IBP_2_3_IV0(-1), + +// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest +IBP_2_3_IV1(-1), + +// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest +IBP_2_4_IV0(-1), + +// Flexible version support in inter-broker APIs +IBP_2_4_IV1(-1), + +// No new APIs, equivalent to 2.4-IV1 +IBP_2_5_IV0(-1), + +// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570) +IBP_2_6_IV0(-1), + +// Introduced feature versioning support (KIP-584) +IBP_2_7_IV0(-1), + +// Bup Fetch protocol for Raft protocol (KIP-595) +IBP_2_7_IV1(-1), + +// Introduced AlterPartition (KIP-497) +IBP_2_7_IV2(-1), + +// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516) +IBP_2_8_IV0(-1), + +// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) +IBP_2_8_IV1(-1), + +// Introduce AllocateProducerIds (KIP-730) +IBP_3_0_IV0(1), + +// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) +// Assume message format version is 3.0 (KIP-724) +IBP_3_0_IV1(2), + +// Adds topic IDs to Fetch requests/responses (KIP-516) +IBP_3_1_IV0(3), + +// Support for leader recovery for unclean leader election (KIP-704) +IBP_3_2_IV0(4), + +// KRaft GA Review Comment: @ijuma are you referring to the
[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
ijuma commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r859091472 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). 
+IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207) +IBP_2_2_IV1(-1), + +// Introduced static membership. 
+IBP_2_3_IV0(-1), + +// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest +IBP_2_3_IV1(-1), + +// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest +IBP_2_4_IV0(-1), + +// Flexible version support in inter-broker APIs +IBP_2_4_IV1(-1), + +// No new APIs, equivalent to 2.4-IV1 +IBP_2_5_IV0(-1), + +// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570) +IBP_2_6_IV0(-1), + +// Introduced feature versioning support (KIP-584) +IBP_2_7_IV0(-1), + +// Bup Fetch protocol for Raft protocol (KIP-595) +IBP_2_7_IV1(-1), + +// Introduced AlterPartition (KIP-497) +IBP_2_7_IV2(-1), + +// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516) +IBP_2_8_IV0(-1), + +// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) +IBP_2_8_IV1(-1), + +// Introduce AllocateProducerIds (KIP-730) +IBP_3_0_IV0(1), + +// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) +// Assume message format version is 3.0 (KIP-724) +IBP_3_0_IV1(2), + +// Adds topic IDs to Fetch requests/responses (KIP-516) +IBP_3_1_IV0(3), + +// Support for leader recovery for unclean leader election (KIP-704) +IBP_3_2_IV0(4), + +// KRaft GA +IBP_3_3_IV0(5); + +private final Optional me
[GitHub] [kafka] junrao commented on a diff in pull request #12029: KAFKA-13815: Avoid reinitialization for a replica that is being deleted
junrao commented on code in PR #12029: URL: https://github.com/apache/kafka/pull/12029#discussion_r859114446 ## core/src/main/scala/kafka/log/UnifiedLog.scala: ## @@ -674,21 +682,29 @@ class UnifiedLog(@volatile var logStartOffset: Long, } /** - * Rename the directory of the local log + * Rename the directory of the local log. If the log's directory is being renamed for async deletion due to a + * StopReplica request, then the shouldReinitialize parameter should be set to false, otherwise it should be set to true. * + * @param name The new name that this log's directory is being renamed to + * @param shouldReinitialize Whether the log's metadata should be reinitialized after renaming * @throws KafkaStorageException if rename fails */ - def renameDir(name: String): Unit = { + def renameDir(name: String, shouldReinitialize: Boolean): Unit = { lock synchronized { maybeHandleIOException(s"Error while renaming dir for $topicPartition in log dir ${dir.getParent}") { // Flush partitionMetadata file before initializing again maybeFlushMetadataFile() Review Comment: Yes, it's probably useful to cover that. What do you think, @gitlw?
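The new `shouldReinitialize` flag distinguishes two kinds of rename. Below is a simplified Java model of that control flow. It assumes a hypothetical `RenameDirSketch` class and a placeholder metadata file name. The real method is Scala in `UnifiedLog` and does more, such as flushing the partition metadata file before renaming.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical, simplified model of a log whose directory can be renamed with or without reinitialization.
final class RenameDirSketch {
    // Placeholder name; stands in for whatever per-partition metadata a real log re-creates.
    static final String METADATA_FILE = "metadata.placeholder";

    private final Object lock = new Object();
    private Path dir;

    RenameDirSketch(Path dir) {
        this.dir = dir;
    }

    Path dir() {
        synchronized (lock) {
            return dir;
        }
    }

    // shouldReinitialize=false models the async-deletion (StopReplica) case: the directory
    // is only moved out of the way, and nothing is re-created inside it.
    void renameDir(String name, boolean shouldReinitialize) throws IOException {
        synchronized (lock) {
            Path target = dir.resolveSibling(name);
            Files.move(dir, target, StandardCopyOption.ATOMIC_MOVE);
            dir = target;
            if (shouldReinitialize) {
                initializeMetadata();
            }
        }
    }

    // Stand-in for re-creating per-partition state after a rename.
    private void initializeMetadata() throws IOException {
        Files.write(dir.resolve(METADATA_FILE), new byte[0]);
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("logs");
        RenameDirSketch log = new RenameDirSketch(Files.createDirectory(root.resolve("topic-0")));
        // Async-deletion style rename: move only, no reinitialization.
        log.renameDir("topic-0-pending-delete", false);
        System.out.println(log.dir() + " has metadata? " + Files.exists(log.dir().resolve(METADATA_FILE)));
    }
}
```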
[jira] [Created] (KAFKA-13858) Kraft should not shutdown metadata listener until controller shutdown is finished
Jason Gustafson created KAFKA-13858: --- Summary: Kraft should not shutdown metadata listener until controller shutdown is finished Key: KAFKA-13858 URL: https://issues.apache.org/jira/browse/KAFKA-13858 Project: Kafka Issue Type: Bug Reporter: Jason Gustafson Assignee: Jason Gustafson When the kraft broker begins controlled shutdown, it immediately disables the metadata listener. This means that metadata changes made as part of the controlled shutdown do not get sent to the respective components. For partitions that the broker is a follower of, that is what we want. It prevents the follower from being able to rejoin the ISR while still shutting down. But for partitions that the broker is leading, it means the leader will remain active until controlled shutdown is complete. In the zk world, we have an explicit request `StopReplica` which serves the purpose of shutting down both follower and leader, but we don't have something similar in kraft. For KRaft, we may not necessarily need an explicit signal like this. We know that the broker is shutting down, so we can treat partition changes as implicit `StopReplica` requests rather than going through the normal `LeaderAndIsr` flow.
[jira] [Updated] (KAFKA-13858) Kraft should not shutdown metadata listener until controller shutdown is finished
[ https://issues.apache.org/jira/browse/KAFKA-13858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13858: Description: When the kraft broker begins controlled shutdown, it immediately disables the metadata listener. This means that metadata changes as part of the controlled shutdown do not get sent to the respective components. For partitions that the broker is follower of, that is what we want. It prevents the follower from being able to rejoin the ISR while still shutting down. But for partitions that the broker is leading, it means the leader will remain active until controlled shutdown finishes and the socket server is stopped. That delay can be as much as 5 seconds and probably even worse. In the zk world, we have an explicit request `StopReplica` which serves the purpose of shutting down both follower and leader, but we don't have something similar in kraft. For KRaft, we may not necessarily need an explicit signal like this. We know that the broker is shutting down, so we can treat partition changes as implicit `StopReplica` requests rather than going through the normal `LeaderAndIsr` flow. was: When the kraft broker begins controlled shutdown, it immediately disables the metadata listener. This means that metadata changes as part of the controlled shutdown do not get sent to the respective components. For partitions that the broker is follower of, that is what we want. It prevents the follower from being able to rejoin the ISR while still shutting down. But for partitions that the broker is leading, it means the leader will remain active until controlled shutdown is complete. In the zk world, we have an explicit request `StopReplica` which serves the purpose of shutting down both follower and leader, but we don't have something similar in kraft. For KRaft, we may not necessarily need an explicit signal like this. 
We know that the broker is shutting down, so we can treat partition changes as implicit `StopReplica` requests rather than going through the normal `LeaderAndIsr` flow. > Kraft should not shutdown metadata listener until controller shutdown is > finished > - > > Key: KAFKA-13858 > URL: https://issues.apache.org/jira/browse/KAFKA-13858 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > > When the kraft broker begins controlled shutdown, it immediately disables the > metadata listener. This means that metadata changes as part of the controlled > shutdown do not get sent to the respective components. For partitions that > the broker is follower of, that is what we want. It prevents the follower > from being able to rejoin the ISR while still shutting down. But for > partitions that the broker is leading, it means the leader will remain active > until controlled shutdown finishes and the socket server is stopped. That > delay can be as much as 5 seconds and probably even worse. > In the zk world, we have an explicit request `StopReplica` which serves the > purpose of shutting down both follower and leader, but we don't have > something similar in kraft. For KRaft, we may not necessarily need an > explicit signal like this. We know that the broker is shutting down, so we > can treat partition changes as implicit `StopReplica` requests rather than > going through the normal `LeaderAndIsr` flow. -- This message was sent by Atlassian Jira (v8.20.7#820007)
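The last paragraph of KAFKA-13858 proposes treating partition changes as implicit `StopReplica` requests while the broker is shutting down. Below is a minimal sketch of that decision, under stated assumptions:

- The metadata listener stays active during controlled shutdown.
- A hypothetical `onPartitionChange(newLeader)` hook runs for each changed partition.
- While shutting down, a broker that is still the leader (for example, with no other replica to move to) keeps leading.

`ShutdownAwarePartitionHandler`, `Action` and `onPartitionChange` are illustrative names, not Kafka APIs.

```java
// Hypothetical illustration of the KAFKA-13858 idea; not Kafka's broker code.
class ShutdownAwarePartitionHandler {
    enum Action { BECOME_LEADER, BECOME_FOLLOWER, STOP_REPLICA }

    private final int brokerId;
    // Assumption: set when controlled shutdown starts; the metadata listener is NOT disabled.
    private volatile boolean controlledShutdownInProgress = false;

    ShutdownAwarePartitionHandler(int brokerId) {
        this.brokerId = brokerId;
    }

    void beginControlledShutdown() {
        controlledShutdownInProgress = true;
    }

    /** Decides how to react to a leadership change for one partition. */
    Action onPartitionChange(int newLeader) {
        if (newLeader == brokerId) {
            // Still the leader: keep serving until leadership actually moves away.
            return Action.BECOME_LEADER;
        }
        if (controlledShutdownInProgress) {
            // Implicit StopReplica: stop leading immediately and do not start fetching,
            // so the broker neither stays an active leader nor rejoins the ISR.
            return Action.STOP_REPLICA;
        }
        // Normal path outside shutdown: follow the new leader.
        return Action.BECOME_FOLLOWER;
    }
}
```

The point of keeping the listener running is that the leader learns about the leadership move as soon as the controller records it. Under the current behaviour, it would learn only when the socket server stops.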
[GitHub] [kafka] ahuang98 commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
ahuang98 commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r859155538 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit). 
+IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAndIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207) +IBP_2_2_IV1(-1), + +// Introduced static membership.
+IBP_2_3_IV0(-1), + +// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest +IBP_2_3_IV1(-1), + +// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest +IBP_2_4_IV0(-1), + +// Flexible version support in inter-broker APIs +IBP_2_4_IV1(-1), + +// No new APIs, equivalent to 2.4-IV1 +IBP_2_5_IV0(-1), + +// Introduced StopReplicaRequest V3 containing the leader epoch for each partition (KIP-570) +IBP_2_6_IV0(-1), + +// Introduced feature versioning support (KIP-584) +IBP_2_7_IV0(-1), + +// Bump Fetch protocol for Raft protocol (KIP-595) +IBP_2_7_IV1(-1), + +// Introduced AlterPartition (KIP-497) +IBP_2_7_IV2(-1), + +// Flexible versioning on ListOffsets, WriteTxnMarkers and OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516) +IBP_2_8_IV0(-1), + +// Introduced topic IDs to LeaderAndIsr and UpdateMetadata requests/responses (KIP-516) +IBP_2_8_IV1(-1), + +// Introduce AllocateProducerIds (KIP-730) +IBP_3_0_IV0(1), + +// Introduce ListOffsets V7 which supports listing offsets by max timestamp (KIP-734) +// Assume message format version is 3.0 (KIP-724) +IBP_3_0_IV1(2), + +// Adds topic IDs to Fetch requests/responses (KIP-516) +IBP_3_1_IV0(3), + +// Support for leader recovery for unclean leader election (KIP-704) +IBP_3_2_IV0(4), + +// KRaft GA +IBP_3_3_IV0(5); + +private final Optional
[GitHub] [kafka] hachikuji opened a new pull request, #12098: MINOR: Fix event output inconsistencies in TransactionalMessageCopier
hachikuji opened a new pull request, #12098: URL: https://github.com/apache/kafka/pull/12098 There are some oddities and inconsistencies in the messages that `TransactionalMessageCopier` writes to stdout. Here is a sample of two messages. Progress message: ``` {"consumed":33000,"stage":"ProcessLoop","totalProcessed":33000,"progress":"copier-0","time":"2022/04/24 05:40:31:649","remaining":333} ``` Here the `transactionalId` appears as the value of the `progress` key. And a shutdown message: ``` {"consumed":3,"shutdown_complete":"copier-0","totalProcessed":3,"time":"2022/04/24 05:40:31:937","remaining":0} ``` This time the `transactionalId` appears as the value of the `shutdown_complete` key. This patch fixes these issues as follows: 1. Use a separate key for the `transactionalId`. 2. Drop the `progress` and `shutdown_complete` fields. 3. Use `stage=ShutdownComplete` in the shutdown message. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
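The three numbered changes describe a fixed message shape: every status line carries its own `transactionalId` key, and the shutdown line uses `stage=ShutdownComplete`. The sketch below shows what such a line could look like. It is a minimal example under assumptions:

- The helper names and key order are illustrative; this is not the PR's code.
- The hand-rolled serializer only handles flat maps of strings and numbers, and it does not escape special characters.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of the post-fix message shape; not TransactionalMessageCopier's code.
class CopierStatusSketch {
    static String statusJson(String transactionalId, String stage, long consumed,
                             long remaining, long totalProcessed, String time) {
        Map<String, Object> fields = new LinkedHashMap<>(); // preserves insertion order
        fields.put("transactionalId", transactionalId);     // dedicated key (change 1)
        fields.put("stage", stage);                         // e.g. "ProcessLoop" or "ShutdownComplete" (change 3)
        fields.put("consumed", consumed);
        fields.put("remaining", remaining);
        fields.put("totalProcessed", totalProcessed);
        fields.put("time", time);
        return toJson(fields);                              // no "progress"/"shutdown_complete" keys (change 2)
    }

    // Minimal serializer for flat maps of strings and numbers (no escaping).
    static String toJson(Map<String, Object> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append('"').append(e.getKey()).append("\":");
            Object v = e.getValue();
            if (v instanceof Number) {
                sb.append(v);
            } else {
                sb.append('"').append(v).append('"');
            }
        }
        return sb.append('}').toString();
    }
}
```

For example, `statusJson("copier-0", "ShutdownComplete", 3, 0, 3, "2022/04/24 05:40:31:937")` yields `{"transactionalId":"copier-0","stage":"ShutdownComplete","consumed":3,"remaining":0,"totalProcessed":3,"time":"2022/04/24 05:40:31:937"}`.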
[jira] [Updated] (KAFKA-13858) Kraft should not shutdown metadata listener until controller shutdown is finished
[ https://issues.apache.org/jira/browse/KAFKA-13858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson updated KAFKA-13858: Labels: kip-500 (was: ) > Kraft should not shutdown metadata listener until controller shutdown is > finished > - > > Key: KAFKA-13858 > URL: https://issues.apache.org/jira/browse/KAFKA-13858 > Project: Kafka > Issue Type: Bug >Reporter: Jason Gustafson >Assignee: Jason Gustafson >Priority: Major > Labels: kip-500 > > When the kraft broker begins controlled shutdown, it immediately disables the > metadata listener. This means that metadata changes as part of the controlled > shutdown do not get sent to the respective components. For partitions that > the broker is follower of, that is what we want. It prevents the follower > from being able to rejoin the ISR while still shutting down. But for > partitions that the broker is leading, it means the leader will remain active > until controlled shutdown finishes and the socket server is stopped. That > delay can be as much as 5 seconds and probably even worse. > In the zk world, we have an explicit request `StopReplica` which serves the > purpose of shutting down both follower and leader, but we don't have > something similar in kraft. For KRaft, we may not necessarily need an > explicit signal like this. We know that the broker is shutting down, so we > can treat partition changes as implicit `StopReplica` requests rather than > going through the normal `LeaderAndIsr` flow.
[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion
ijuma commented on code in PR #12072: URL: https://github.com/apache/kafka/pull/12072#discussion_r859163758 ## server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java: ## @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.common; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.kafka.common.record.RecordVersion; + +/** + * This class contains the different Kafka versions. + * Right now, we use them for upgrades - users can configure the version of the API brokers will use to communicate between themselves. + * This is only for inter-broker communications - when communicating with clients, the client decides on the API version. + * + * Note that the ID we initialize for each version is important. 
+ We consider a version newer than another if it is declared later in the enum list (to avoid depending on lexicographic order) + * + * Since the API protocol may change more than once within the same release and to facilitate people deploying code from + * trunk, we have the concept of internal versions (first introduced during the 0.10.0 development cycle). For example, + * the first time we introduce a version change in a release, say 0.10.0, we will add a config value "0.10.0-IV0" and a + * corresponding enum constant IBP_0_10_0_IV0. We will also add a config value "0.10.0" that will be mapped to the + * latest internal version object, which is IBP_0_10_0_IV0. When we change the protocol a second time while developing + * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding enum constant IBP_0_10_0_IV1. We will change + * the config value "0.10.0" to map to the latest internal version IBP_0_10_0_IV1. The config value of + * "0.10.0-IV0" is still mapped to IBP_0_10_0_IV0. This way, if people are deploying from trunk, they can use + * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. Most people who just want to use + * released versions can use "0.10.0" when upgrading to the 0.10.0 release. + */ +public enum MetadataVersion { +IBP_0_8_0(-1), +IBP_0_8_1(-1), +IBP_0_8_2(-1), +IBP_0_9_0(-1), + +// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format. +IBP_0_10_0_IV0(-1), + +// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL handshake). +IBP_0_10_0_IV1(-1), + +// introduced for JoinGroup protocol change in KIP-62 +IBP_0_10_1_IV0(-1), + +// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1), + +// introduced ListOffsetRequest v1 in KIP-79 +IBP_0_10_1_IV2(-1), + +// introduced UpdateMetadataRequest v3 in KIP-103 +IBP_0_10_2_IV0(-1), + +// KIP-98 (idempotent and transactional producer support) +IBP_0_11_0_IV0(-1), + +// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107 +IBP_0_11_0_IV1(-1), + +// Introduced leader epoch fetches to the replica fetcher via KIP-101 +IBP_0_11_0_IV2(-1), + +// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and FetchRequest V6 via KIP-112 +IBP_1_0_IV0(-1), + +// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental fetch requests, +// and KafkaStorageException for fetch requests. +IBP_1_1_IV0(-1), + +// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log divergence between leader and follower after fast leader fail over) +IBP_2_0_IV0(-1), + +// Several request versions were bumped due to KIP-219 (Improve quota communication) +IBP_2_0_IV1(-1), + +// Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211) +IBP_2_1_IV0(-1), + +// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320) +IBP_2_1_IV1(-1), + +// Support ZStandard Compression Codec (KIP-110) +IBP_2_1_IV2(-1), + +// Introduced broker generation (KIP-380), and +// LeaderAndIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1 +IBP_2_2_IV0(-1), + +// New error
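The Javadoc in ijuma's diff states two rules:

- A constant declared later in the enum is a newer version.
- A release string such as "0.10.0" resolves to that release's latest internal version, while "0.10.0-IV0" stays pinned to its own constant.

Below is a minimal sketch of both rules, using a hypothetical three-constant `MiniIbp` enum. It is not the real `MetadataVersion`, whose lookup and comparison methods are not shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, trimmed-down enum illustrating the ordering and mapping rules; not MetadataVersion.
enum MiniIbp {
    IBP_0_10_0_IV0("0.10.0-IV0"),
    IBP_0_10_0_IV1("0.10.0-IV1"),
    IBP_0_10_1_IV0("0.10.1-IV0");

    final String versionString;

    private static final Map<String, MiniIbp> BY_STRING = new HashMap<>();

    static {
        // values() is in declaration order, so a later internal version of the same
        // release overwrites the earlier one for the short release key ("0.10.0").
        for (MiniIbp v : values()) {
            BY_STRING.put(v.versionString, v);
            String release = v.versionString.substring(0, v.versionString.indexOf("-IV"));
            BY_STRING.put(release, v);
        }
    }

    MiniIbp(String versionString) {
        this.versionString = versionString;
    }

    static MiniIbp fromVersionString(String s) {
        MiniIbp v = BY_STRING.get(s);
        if (v == null) {
            throw new IllegalArgumentException("Unknown version: " + s);
        }
        return v;
    }

    // Declaration order, not string order, defines "newer".
    boolean isAtLeast(MiniIbp other) {
        return compareTo(other) >= 0;
    }
}
```

Relying on declaration order (`compareTo` on enum ordinals) is what lets "0.10.0-IV1" sort after "0.10.0-IV0" without any string parsing.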
[GitHub] [kafka] guozhangwang merged pull request #12094: MINOR: fix html generation syntax errors
guozhangwang merged PR #12094: URL: https://github.com/apache/kafka/pull/12094
[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy
RivenSun2 commented on code in PR #12052: URL: https://github.com/apache/kafka/pull/12052#discussion_r859296011 ## docs/design.html: ## @@ -125,6 +125,9 @@ This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache. +TLS/SSL libraries operate at the user space (in-kernel SSL_sendfile is currently not supported by Kafka). Due to this restriction, sendfile could not be used when transport layer enables SSL protocol. For enabling Review Comment: @ijuma Thanks for your review. Should I open a new PR to improve the wording of this documentation?
[GitHub] [kafka] ijuma commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy
ijuma commented on code in PR #12052: URL: https://github.com/apache/kafka/pull/12052#discussion_r859326941 ## docs/design.html: ## @@ -125,6 +125,9 @@ This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache. +TLS/SSL libraries operate at the user space (in-kernel SSL_sendfile is currently not supported by Kafka). Due to this restriction, sendfile could not be used when transport layer enables SSL protocol. For enabling Review Comment: You can submit a new one with just this change if you agree it reads better. ## docs/design.html: ## @@ -125,6 +125,9 @@ This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache. +TLS/SSL libraries operate at the user space (in-kernel SSL_sendfile is currently not supported by Kafka). Due to this restriction, sendfile could not be used when transport layer enables SSL protocol. For enabling Review Comment: Nit: "could not be used when transport layer enables SSL protocol" -> "is not used when SSL is enabled"
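The doc change under review says zero-copy is unavailable once SSL is enabled, because encryption happens in user space. A minimal Java sketch can contrast the two data paths, with these assumptions:

- It is not Kafka's transport-layer code, and the class and method names are hypothetical.
- `encrypt` is a stand-in for a TLS wrap step, such as what an `SSLEngine` performs.

The plaintext path calls `FileChannel.transferTo`, which the JDK may implement with `sendfile(2)` when the target is a socket. The SSL path must first read the bytes into a JVM buffer so they can be encrypted, and that copy is what removes the zero-copy benefit.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.util.function.UnaryOperator;

// Hypothetical sketch contrasting the two data paths; not Kafka's TransportLayer code.
class ZeroCopySketch {
    // Plaintext: hand the copy to the kernel; bytes never enter the JVM heap when sendfile is used.
    static long sendPlaintext(FileChannel file, long position, long count,
                              WritableByteChannel out) throws IOException {
        long sent = 0;
        while (sent < count) {
            long n = file.transferTo(position + sent, count - sent, out);
            if (n <= 0) {
                break; // a real implementation would wait for the socket to become writable
            }
            sent += n;
        }
        return sent;
    }

    // SSL: bytes must be read into user space so they can be encrypted before writing.
    static long sendThroughUserSpace(FileChannel file, long position, long count,
                                     WritableByteChannel out,
                                     UnaryOperator<ByteBuffer> encrypt) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(8192);
        long sent = 0;
        while (sent < count) {
            buf.clear();
            buf.limit((int) Math.min(buf.capacity(), count - sent));
            int n = file.read(buf, position + sent); // copy from page cache into the JVM
            if (n <= 0) {
                break; // end of file
            }
            buf.flip();
            ByteBuffer encrypted = encrypt.apply(buf); // stand-in for the TLS wrap step
            while (encrypted.hasRemaining()) {
                out.write(encrypted);
            }
            sent += n;
        }
        return sent;
    }
}
```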
[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed
[ https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-13824: -- Description: (was: In [https://github.com/apache/kafka/pull/11896,] for streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java) > Pass time object from constructor so that we can mock it if needed > -- > > Key: KAFKA-13824 > URL: https://issues.apache.org/jira/browse/KAFKA-13824 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Priority: Minor >
[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed
[ https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-13824: -- Labels: test (was: ) > Pass time object from constructor so that we can mock it if needed > -- > > Key: KAFKA-13824 > URL: https://issues.apache.org/jira/browse/KAFKA-13824 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Priority: Minor > Labels: test >
[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed
[ https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang updated KAFKA-13824: -- Description: We pass in the {{Time}} object in the StreamThread, which is used to measure various metrics so that we can mock the time in tests. We should extend this to lower-level metrics, more specifically: * KafkaStreams (instance-level): already done. * StreamThread (thread-level): already done, in runtime passed through from instance. * ProcessingContext (task-level). * MeteredStore (store-level). * StreamTask (task-level). * ProcessorNode (processor-node-level): only a few processors that need to expose this level latency would need to have the {{Time}} object passed through. > Pass time object from constructor so that we can mock it if needed > -- > > Key: KAFKA-13824 > URL: https://issues.apache.org/jira/browse/KAFKA-13824 > Project: Kafka > Issue Type: Improvement >Reporter: Hao Li >Priority: Minor > Labels: test > > We pass in the {{Time}} object in the StreamThread, which is used to measure > various metrics so that we can mock the time in tests. We should extend this > to lower-level metrics, more specifically: > * KafkaStreams (instance-level): already done. > * StreamThread (thread-level): already done, in runtime passed through from > instance. > * ProcessingContext (task-level). > * MeteredStore (store-level). > * StreamTask (task-level). > * ProcessorNode (processor-node-level): only a few processors that need to > expose this level latency would need to have the {{Time}} object passed > through.
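KAFKA-13824 asks for the time source to be passed in through constructors so that tests can control it. Below is a minimal sketch of that pattern. It uses a hypothetical one-method `NanoClock` interface to stay self-contained. Kafka's real abstraction is `org.apache.kafka.common.utils.Time`, and `TimedProcessor` is an illustrative stand-in for a component such as a processor node.

```java
// Hypothetical time source; Kafka itself uses org.apache.kafka.common.utils.Time.
interface NanoClock {
    long nanoseconds();

    NanoClock SYSTEM = System::nanoTime; // production implementation
}

// Test double: time only moves when the test says so.
class FakeNanoClock implements NanoClock {
    private long now = 0L;

    @Override
    public long nanoseconds() {
        return now;
    }

    void advanceNanos(long delta) {
        now += delta;
    }
}

// Hypothetical component that measures latency with whatever clock it was constructed with.
class TimedProcessor {
    private final NanoClock clock;
    private long lastLatencyNs = -1L;

    TimedProcessor(NanoClock clock) { // injected, not read from a static
        this.clock = clock;
    }

    void process(Runnable work) {
        long start = clock.nanoseconds();
        work.run();
        lastLatencyNs = clock.nanoseconds() - start;
    }

    long lastLatencyNs() {
        return lastLatencyNs;
    }
}
```

In a test, `new TimedProcessor(fake)` combined with `fake.advanceNanos(1_500)` inside the work makes the recorded latency exactly 1500 ns. Production code passes `NanoClock.SYSTEM` instead.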
[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy
RivenSun2 commented on code in PR #12052: URL: https://github.com/apache/kafka/pull/12052#discussion_r859353961 ## docs/design.html: ## @@ -125,6 +125,9 @@ This combination of pagecache and sendfile means that on a Kafka cluster where the consumers are mostly caught up you will see no read activity on the disks whatsoever as they will be serving data entirely from cache. +TLS/SSL libraries operate at the user space (in-kernel SSL_sendfile is currently not supported by Kafka). Due to this restriction, sendfile could not be used when transport layer enables SSL protocol. For enabling Review Comment: Sure, thanks.
[GitHub] [kafka] RivenSun2 opened a new pull request, #12099: MINOR: Improve ssl description in zero-copy docs
RivenSun2 opened a new pull request, #12099: URL: https://github.com/apache/kafka/pull/12099 Follow-up to PR #12052: improves the SSL description in the zero-copy docs.
[GitHub] [kafka] guozhangwang commented on a diff in pull request #11896: KAFKA-13785: [6/N][Emit final] emit final for TimeWindowedKStreamImpl
guozhangwang commented on code in PR #11896: URL: https://github.com/apache/kafka/pull/11896#discussion_r859364452 ## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java: ## @@ -184,6 +247,78 @@ public void process(final Record record) { droppedRecordsSensor.record(); } } + +tryEmitFinalResult(record, closeTime); +} + +private void tryEmitFinalResult(final Record record, final long closeTime) { +if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) { +return; +} + +final long now = internalProcessorContext.currentSystemTimeMs(); +// Throttle emit frequency +if (now < timeTracker.nextTimeToEmit) { +return; +} + +// Schedule next emit time based on now to avoid the case that if system time jumps a lot, +// this can be triggered every time +timeTracker.nextTimeToEmit = now; +timeTracker.advanceNextTimeToEmit(); + +// Close time does not progress +if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && lastEmitCloseTime >= closeTime) { +return; +} + +final long emitRangeUpperBoundInclusive = closeTime - windows.size(); +if (emitRangeUpperBoundInclusive < 0) { +// If emitRangeUpperBoundInclusive is 0, it means first window closes since windowEndTime +// is exclusive +return; +} + +// Because we only get here when emitRangeUpperBoundInclusive >= 0 which means closeTime >= windows.size() +// Since we set lastEmitCloseTime to closeTime before storing to processor metadata +// lastEmitCloseTime - windows.size() is always >= 0 +// Set emitRangeLowerBoundInclusive to -1L if not set so that when we fetchAll, we fetch from 0L +final long emitRangeLowerBoundInclusive = lastEmitCloseTime == ConsumerRecord.NO_TIMESTAMP ?
+-1L : lastEmitCloseTime - windows.size(); + +if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) { +final Map matchedCloseWindows = windows.windowsFor(emitRangeUpperBoundInclusive); +final Map matchedEmitWindows = windows.windowsFor(emitRangeLowerBoundInclusive); + +// Don't fetch store if the new emit window close time doesn't progress enough to cover next +// window +if (matchedCloseWindows.equals(matchedEmitWindows)) { +log.trace("no new windows to emit. LastEmitCloseTime={}, newCloseTime={}", +lastEmitCloseTime, closeTime); +return; +} +} + +final long startNs = time.nanoseconds(); Review Comment: If we do intend to use nanoseconds instead of milliseconds, then we should name the metric "...-latency-ns" and also say so in the description to emphasize that it is measured in nanos, since by default all latencies across the AK packages are measured in millis unless explicitly named or described otherwise. Personally I think it's sufficient to measure in millis. WDYT @lihaosky @mjsax? ## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java: ## @@ -62,6 +67,17 @@ private ProcessorNodeMetrics() {} private static final String FORWARD_RATE_DESCRIPTION = RATE_DESCRIPTION_PREFIX + FORWARD_DESCRIPTION + RATE_DESCRIPTION_SUFFIX; +private static final String EMITTED_RECORDS = "window-aggregate-final-emit"; +private static final String EMITTED_RECORDS_DESCRIPTION = "emit final records"; +private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION; +private static final String EMITTED_RECORDS_RATE_DESCRIPTION = +RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + RATE_DESCRIPTION_SUFFIX; + +private static final String EMIT_FINAL_LATENCY = "window-aggregate-final-emit" + LATENCY_SUFFIX; +private static final String EMIT_FINAL_DESCRIPTION = "calls to emit final"; Review Comment: We can replace with `EMITTED_RECORDS + LATENCY_SUFFIX`.
## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java: ## @@ -62,6 +67,17 @@ private ProcessorNodeMetrics() {} private static final String FORWARD_RATE_DESCRIPTION = RATE_DESCRIPTION_PREFIX + FORWARD_DESCRIPTION + RATE_DESCRIPTION_SUFFIX; +private static final String EMITTED_RECORDS = "window-aggregate-final-emit"; +private static final String EMITTED_RECORDS_DESCRIPTION = "emit final records"; +private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION;
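The `tryEmitFinalResult` diff combines three checks before it touches the store:

- a wall-clock throttle that is rescheduled from "now";
- a guard that returns early when the stream-time close time has not advanced;
- a computation of lower and upper emit bounds from the window size.

Below is a minimal, self-contained model of just that bookkeeping, with these assumptions:

- `advanceNextTimeToEmit()` is taken to add a fixed interval.
- `lastEmitCloseTime` is taken to be updated once an emit goes ahead.
- The hopping-window `windowsFor` comparison is omitted.

All names are hypothetical; this is not Kafka Streams code.

```java
import java.util.Optional;

// Hypothetical model of the emit-final bookkeeping in the diff; not Kafka Streams code.
class EmitFinalSketch {
    static final long NO_TIMESTAMP = -1L; // stand-in for ConsumerRecord.NO_TIMESTAMP

    private final long windowSizeMs;
    private final long emitIntervalMs; // assumed fixed throttle interval
    private long nextTimeToEmitMs = 0L;
    private long lastEmitCloseTime = NO_TIMESTAMP;

    EmitFinalSketch(long windowSizeMs, long emitIntervalMs) {
        this.windowSizeMs = windowSizeMs;
        this.emitIntervalMs = emitIntervalMs;
    }

    /** Returns {lowerBound, upperBound} to emit, or empty when nothing should be emitted now. */
    Optional<long[]> tryEmit(long nowMs, long closeTime) {
        if (nowMs < nextTimeToEmitMs) {
            return Optional.empty(); // throttled by wall-clock time
        }
        // Reschedule from "now" rather than from the old deadline, so a large clock jump
        // does not make every following call eligible to emit.
        nextTimeToEmitMs = nowMs + emitIntervalMs;

        if (lastEmitCloseTime != NO_TIMESTAMP && lastEmitCloseTime >= closeTime) {
            return Optional.empty(); // close time has not advanced
        }
        long upper = closeTime - windowSizeMs;
        if (upper < 0) {
            return Optional.empty(); // no window has closed yet
        }
        long lower = lastEmitCloseTime == NO_TIMESTAMP ? -1L : lastEmitCloseTime - windowSizeMs;
        lastEmitCloseTime = closeTime; // assumption: recorded once the emit goes ahead
        return Optional.of(new long[] {lower, upper});
    }
}
```

With a 10 ms window and a 5 ms interval, the calls behave as follows:

| Call | Result | Reason |
| --- | --- | --- |
| `tryEmit(0, 5)` | empty | no window has closed yet |
| `tryEmit(3, 20)` | empty | throttled |
| `tryEmit(5, 20)` | bounds `-1` and `10` | first emit |
| `tryEmit(10, 20)` | empty | close time did not advance |
| `tryEmit(15, 35)` | bounds `10` and `25` | close time advanced |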
[GitHub] [kafka] guozhangwang opened a new pull request, #12100: KAFKA-13785: [6/N][Emit final] Copy: Emit final for TimeWindowedKStreamImpl
guozhangwang opened a new pull request, #12100: URL: https://github.com/apache/kafka/pull/12100 This is a copy PR of https://github.com/apache/kafka/pull/11896.
[GitHub] [kafka] guozhangwang commented on pull request #12100: KAFKA-13785: [6/N][Emit final] Copy: Emit final for TimeWindowedKStreamImpl
guozhangwang commented on PR #12100: URL: https://github.com/apache/kafka/pull/12100#issuecomment-1110551346 The only differences between this copy PR and the original are: 1) resolved conflicts, and 2) incorporated the latest review comments. cc @mjsax
[GitHub] [kafka] showuon commented on a diff in pull request #11955: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is
showuon commented on code in PR #11955: URL: https://github.com/apache/kafka/pull/11955#discussion_r859425060 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java: ## @@ -222,6 +223,14 @@ public void stop() { connectorStatusMetricsGroup.close(); workerConfigTransformer.close(); +executor.shutdown(); +try { +if (!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { +executor.shutdownNow(); +} +} catch (InterruptedException e) { +executor.shutdownNow(); +} Review Comment: Should we follow the two-phase shutdown from the usage example in the `ExecutorService` Javadoc? https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ExecutorService.html
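The two-phase pattern from the `ExecutorService` Javadoc adds two things that the quoted `Worker.stop()` diff does not do:

- After `shutdownNow()`, it waits a second time for tasks to respond to interruption.
- On `InterruptedException`, it restores the caller's interrupt status.

Below is a minimal sketch modelled on that usage example. The class name and the boolean return value are my own choices. `timeoutMs` plays the role of `EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS` in the diff.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper modelled on the ExecutorService Javadoc usage example.
class ExecutorShutdownSketch {
    /** Returns true if the pool terminated within the allotted time. */
    static boolean shutdownAndAwaitTermination(ExecutorService pool, long timeoutMs) {
        pool.shutdown(); // phase 1: reject new tasks, let running and queued tasks finish
        try {
            if (pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                return true;
            }
            pool.shutdownNow(); // phase 2: interrupt running tasks, drop queued ones
            // Wait again for tasks to respond to the interruption.
            return pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            pool.shutdownNow();                 // re-cancel if the caller was interrupted
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Returning a boolean rather than only logging lets the caller decide whether a pool that failed to terminate is worth a warning. Restoring the interrupt flag keeps an interrupted `stop()` from silently swallowing the interruption.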