[GitHub] [kafka] showuon commented on a diff in pull request #11438: KAFKA-13403 Fix KafkaServer crashes when deleting topics due to the race in log deletion

2022-04-26 Thread GitBox


showuon commented on code in PR #11438:
URL: https://github.com/apache/kafka/pull/11438#discussion_r858349721


##
clients/src/main/java/org/apache/kafka/common/utils/Utils.java:
##
@@ -853,15 +853,21 @@ public static void delete(final File rootFile) throws 
IOException {
 Files.walkFileTree(rootFile.toPath(), new SimpleFileVisitor() {
 @Override
 public FileVisitResult visitFileFailed(Path path, IOException exc) 
throws IOException {
-// If the root path did not exist, ignore the error; otherwise 
throw it.
-if (exc instanceof NoSuchFileException && 
path.toFile().equals(rootFile))
-return FileVisitResult.TERMINATE;
+if (exc instanceof NoSuchFileException) {
+if (path.toFile().equals(rootFile)) {
+// If the root path did not exist, ignore the error 
and terminate;
+return FileVisitResult.TERMINATE;
+} else {
+//  Otherwise, just continue walking because we don't 
have to delete this file.

Review Comment:
   nit: additional space after //
   Also, could we mention that the file might already be deleted by other 
threads?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13855) FileNotFoundException: Error while rolling log segment for topic partition in dir

2022-04-26 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17527961#comment-17527961
 ] 

Haruki Okada commented on KAFKA-13855:
--

H-mm sorry, sounds like I just overstepped.

Yeah, seems we need to dig into this further. Please nevermind for now.

> FileNotFoundException: Error while rolling log segment for topic partition in 
> dir
> -
>
> Key: KAFKA-13855
> URL: https://issues.apache.org/jira/browse/KAFKA-13855
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.6.1
>Reporter: Sergey Ivanov
>Priority: Major
>
> Hello,
> We faced an issue when one of Kafka broker in cluster has failed with an 
> exception and restarted:
>  
> {code:java}
> [2022-04-13T09:51:44,563][ERROR][category=kafka.server.LogDirFailureChannel] 
> Error while rolling log segment for prod_data_topic-7 in dir 
> /var/opt/kafka/data/1
> java.io.FileNotFoundException: 
> /var/opt/kafka/data/1/prod_data_topic-7/26872377.index (No such 
> file or directory)
>   at java.base/java.io.RandomAccessFile.open0(Native Method)
>   at java.base/java.io.RandomAccessFile.open(Unknown Source)
>   at java.base/java.io.RandomAccessFile.(Unknown Source)
>   at java.base/java.io.RandomAccessFile.(Unknown Source)
>   at kafka.log.AbstractIndex.$anonfun$resize$1(AbstractIndex.scala:183)
>   at kafka.log.AbstractIndex.resize(AbstractIndex.scala:176)
>   at 
> kafka.log.AbstractIndex.$anonfun$trimToValidSize$1(AbstractIndex.scala:242)
>   at kafka.log.AbstractIndex.trimToValidSize(AbstractIndex.scala:242)
>   at kafka.log.LogSegment.onBecomeInactiveSegment(LogSegment.scala:508)
>   at kafka.log.Log.$anonfun$roll$8(Log.scala:1916)
>   at kafka.log.Log.$anonfun$roll$2(Log.scala:1916)
>   at kafka.log.Log.roll(Log.scala:2349)
>   at kafka.log.Log.maybeRoll(Log.scala:1865)
>   at kafka.log.Log.$anonfun$append$2(Log.scala:1169)
>   at kafka.log.Log.append(Log.scala:2349)
>   at kafka.log.Log.appendAsLeader(Log.scala:1019)
>   at 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:984)
>   at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:972)
>   at 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:883)
>   at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
>   at 
> scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
>   at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
>   at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
>   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
>   at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:273)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>   at 
> kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:871)
>   at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:571)
>   at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:605)
>   at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>   at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:70)
>   at java.base/java.lang.Thread.run(Unknown Source)
> [2022-04-13T09:51:44,812][ERROR][category=kafka.log.LogManager] Shutdown 
> broker because all log dirs in /var/opt/kafka/data/1 have failed {code}
> There are no any additional useful information in logs, just one warn before 
> this error:
> {code:java}
> [2022-04-13T09:51:44,720][WARN][category=kafka.server.ReplicaManager] 
> [ReplicaManager broker=1] Broker 1 stopped fetcher for partitions 
> __consumer_offsets-22,prod_data_topic-5,__consumer_offsets-30,
> 
> prod_data_topic-0 and stopped moving logs for partitions  because they are in 
> the failed log directory /var/opt/kafka/data/1.
> [2022-04-13T09:51:44,720][WARN][category=kafka.log.LogManager] Stopping 
> serving logs in dir /var/opt/kafka/data/1{code}
> The topic configuration is:
> {code:java}
> /opt/kafka $ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 
> --describe --topic prod_data_topic
> Topic: prod_data_topic        PartitionCount: 12      ReplicationFactor: 3    
> Configs: 
> min.insync.replicas=2,segment.bytes=1073741824,max.message.bytes=15728640,retention.bytes=4294967296
>         Topic: prod_data_topic        Partition: 0    Leader: 3       
> Replicas: 3,1,2 Isr: 3,2,1
>         Topic: prod_data_topic        Partition: 1    Leader: 1       
> Replicas: 1,2,3 Isr: 3,2,1
>         Topic: 

[jira] [Comment Edited] (KAFKA-13403) KafkaServer crashes when deleting topics due to the race in log deletion

2022-04-26 Thread Haruki Okada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17527800#comment-17527800
 ] 

Haruki Okada edited comment on KAFKA-13403 at 4/26/22 7:35 AM:
---

[~showuon] Hi, could you help reviewing the PR 
[https://github.com/apache/kafka/pull/11438] ?

 

-There seems to be another ticket likely due to the same cause: 
https://issues.apache.org/jira/browse/KAFKA-13855-


After took another look at 13855, seems currently there's no clue to conclude 
it is the same cause.


was (Author: ocadaruma):
[~showuon] Hi, could you help reviewing the PR 
[https://github.com/apache/kafka/pull/11438] ?

 

There seems to be another ticket likely due to the same cause: 
https://issues.apache.org/jira/browse/KAFKA-13855

> KafkaServer crashes when deleting topics due to the race in log deletion
> 
>
> Key: KAFKA-13403
> URL: https://issues.apache.org/jira/browse/KAFKA-13403
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.4.1
>Reporter: Haruki Okada
>Assignee: Haruki Okada
>Priority: Major
>
> h2. Environment
>  * OS: CentOS Linux release 7.6
>  * Kafka version: 2.4.1
>  * 
>  ** But as far as I checked the code, I think same phenomenon could happen 
> even on trunk
>  * Kafka log directory: RAID1+0 (i.e. not using JBOD so only single log.dirs 
> is set)
>  * Java version: AdoptOpenJDK 1.8.0_282
> h2. Phenomenon
> When we were in the middle of deleting several topics by `kafka-topics.sh 
> --delete --topic blah-blah`, one broker in our cluster crashed due to 
> following exception:
>  
> {code:java}
> [2021-10-21 18:19:19,122] ERROR Shutdown broker because all log dirs in 
> /data/kafka have failed (kafka.log.LogManager)
> {code}
>  
>  
> We also found NoSuchFileException was thrown right before the crash when 
> LogManager tried to delete logs for some partitions.
>  
> {code:java}
> [2021-10-21 18:19:18,849] ERROR Error while deleting log for foo-bar-topic-5 
> in dir /data/kafka (kafka.server.LogDirFailureChannel)
> java.nio.file.NoSuchFileException: 
> /data/kafka/foo-bar-topic-5.df3626d2d9eb41a2aeb0b8d55d7942bd-delete/03877066.timeindex.deleted
> at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
> at 
> sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
> at 
> sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:144)
> at 
> sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
> at java.nio.file.Files.readAttributes(Files.java:1737)
> at java.nio.file.FileTreeWalker.getAttributes(FileTreeWalker.java:219)
> at java.nio.file.FileTreeWalker.visit(FileTreeWalker.java:276)
> at java.nio.file.FileTreeWalker.next(FileTreeWalker.java:372)
> at java.nio.file.Files.walkFileTree(Files.java:2706)
> at java.nio.file.Files.walkFileTree(Files.java:2742)
> at org.apache.kafka.common.utils.Utils.delete(Utils.java:732)
> at kafka.log.Log.$anonfun$delete$2(Log.scala:2036)
> at 
> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
> at kafka.log.Log.maybeHandleIOException(Log.scala:2343)
> at kafka.log.Log.delete(Log.scala:2030)
> at kafka.log.LogManager.deleteLogs(LogManager.scala:826)
> at kafka.log.LogManager.$anonfun$deleteLogs$6(LogManager.scala:840)
> at 
> kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:116)
> at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:65)
> at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> {code}
> So, the log-dir was marked as offline and ended up with KafkaServer crash 
> because the broker has only single log-dir.
> h2. Cause
> We also found below logs right before the NoSuchFileException.
>  
> {code:java}
> [2021-10-21 18:18:17,829] INFO Log for partition foo-bar-5 is r

[jira] [Assigned] (KAFKA-13459) MM2 should be able to add the source offset to the record header

2022-04-26 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13459?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass reassigned KAFKA-13459:
---

Assignee: Viktor Somogyi-Vass

> MM2 should be able to add the source offset to the record header
> 
>
> Key: KAFKA-13459
> URL: https://issues.apache.org/jira/browse/KAFKA-13459
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> MM2 could add the source offset to the record header to help with diagnostics 
> in some use-cases.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-10586) Full support for distributed mode in dedicated MirrorMaker 2.0 clusters

2022-04-26 Thread Viktor Somogyi-Vass (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10586?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass reassigned KAFKA-10586:
---

Assignee: Viktor Somogyi-Vass  (was: Daniel Urban)

> Full support for distributed mode in dedicated MirrorMaker 2.0 clusters
> ---
>
> Key: KAFKA-10586
> URL: https://issues.apache.org/jira/browse/KAFKA-10586
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker
>Reporter: Daniel Urban
>Assignee: Viktor Somogyi-Vass
>Priority: Major
>
> KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
> MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means 
> that with specific workloads, the dedicated MM2 cluster can become unable to 
> react to dynamic topic and group filter changes.
> (This occurs when after a rebalance operation, the leader node has no 
> MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is 
> stopped on the leader, meaning it cannot detect config changes by itself. 
> Followers still running the connector can detect config changes, but they 
> cannot query the leader for config updates.)
> Besides the REST support, config provider references should be evaluated 
> lazily in connector configurations.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] vamossagar12 commented on pull request #11211: KAFKA-12960: Enforcing strict retention time for WindowStore and Sess…

2022-04-26 Thread GitBox


vamossagar12 commented on PR #11211:
URL: https://github.com/apache/kafka/pull/11211#issuecomment-1109503519

   Thanks @showuon !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-04-26 Thread GitBox


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858432879


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set activeTasksNeedCommit, 
final AtomicReference activeTasksCommitException) {
+
+final Map> 
consumedOffsetsPerTask = new HashMap<>();
+prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, 
consumedOffsetsPerTask);
+
+final Set dirtyTasks = new HashSet<>();
+try {
+taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+} catch (final TaskCorruptedException e) {
+log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
+e.corruptedTasks());
+
+// If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
+dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+closeDirtyAndRevive(dirtyTasks, true);
+} catch (final RuntimeException e) {
+log.error("Exception caught while committing active tasks: " + 
consumedOffsetsPerTask.keySet(), e);
+activeTasksCommitException.compareAndSet(null, e);
+dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+}
+
+// for non-revoking active tasks, we should not enforce checkpoint
+// as it's EOS enabled in which case no checkpoint should be written 
while
+// the task is in RUNNING tate
+for (final Task task : activeTasksNeedCommit) {
+if (!dirtyTasks.contains(task)) {
+try {
+task.postCommit(false);

Review Comment:
   @showuon , can you plz review this as well whenever you get the chance? It's 
been open for a while.. Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-04-26 Thread GitBox


showuon commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858437939


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set activeTasksNeedCommit, 
final AtomicReference activeTasksCommitException) {
+
+final Map> 
consumedOffsetsPerTask = new HashMap<>();
+prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, 
consumedOffsetsPerTask);
+
+final Set dirtyTasks = new HashSet<>();
+try {
+taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+} catch (final TaskCorruptedException e) {
+log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
+e.corruptedTasks());
+
+// If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
+dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+closeDirtyAndRevive(dirtyTasks, true);
+} catch (final RuntimeException e) {
+log.error("Exception caught while committing active tasks: " + 
consumedOffsetsPerTask.keySet(), e);
+activeTasksCommitException.compareAndSet(null, e);
+dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+}
+
+// for non-revoking active tasks, we should not enforce checkpoint
+// as it's EOS enabled in which case no checkpoint should be written 
while
+// the task is in RUNNING tate
+for (final Task task : activeTasksNeedCommit) {
+if (!dirtyTasks.contains(task)) {
+try {
+task.postCommit(false);

Review Comment:
   This is the only question I have for this PR, and it needs other experts to 
provide comments.  cc @guozhangwang @ableegoldman



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] vamossagar12 commented on a diff in pull request #11433: KAFKA-13295: Avoiding Transation timeouts arising due to long restora…

2022-04-26 Thread GitBox


vamossagar12 commented on code in PR #11433:
URL: https://github.com/apache/kafka/pull/11433#discussion_r858441929


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -262,6 +262,42 @@ private void closeDirtyAndRevive(final Collection 
taskWithChangelogs, fina
 }
 }
 
+private void commitActiveTasks(final Set activeTasksNeedCommit, 
final AtomicReference activeTasksCommitException) {
+
+final Map> 
consumedOffsetsPerTask = new HashMap<>();
+prepareCommitAndAddOffsetsToMap(activeTasksNeedCommit, 
consumedOffsetsPerTask);
+
+final Set dirtyTasks = new HashSet<>();
+try {
+taskExecutor.commitOffsetsOrTransaction(consumedOffsetsPerTask);
+} catch (final TaskCorruptedException e) {
+log.warn("Some tasks were corrupted when trying to commit offsets, 
these will be cleaned and revived: {}",
+e.corruptedTasks());
+
+// If we hit a TaskCorruptedException it must be EOS, just handle 
the cleanup for those corrupted tasks right here
+dirtyTasks.addAll(tasks.tasks(e.corruptedTasks()));
+closeDirtyAndRevive(dirtyTasks, true);
+} catch (final RuntimeException e) {
+log.error("Exception caught while committing active tasks: " + 
consumedOffsetsPerTask.keySet(), e);
+activeTasksCommitException.compareAndSet(null, e);
+dirtyTasks.addAll(consumedOffsetsPerTask.keySet());
+}
+
+// for non-revoking active tasks, we should not enforce checkpoint
+// as it's EOS enabled in which case no checkpoint should be written 
while
+// the task is in RUNNING tate
+for (final Task task : activeTasksNeedCommit) {
+if (!dirtyTasks.contains(task)) {
+try {
+task.postCommit(false);

Review Comment:
   got it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] cadonna commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement

2022-04-26 Thread GitBox


cadonna commented on PR #12064:
URL: https://github.com/apache/kafka/pull/12064#issuecomment-1109538143

   @junrao @philipnee Thank you for pinging me! 
   Since I have already created RC0 which is under voting I actually only 
accept regressions and exceptionally severe bugs that justify the creation of a 
new release candidate. This bug seems to have been around for quite some time 
and it does not seem to be a regression or exceptionally severe. So I would 
prefer not to cherry pick this PR to 3.2 before 3.2.0 is released. After the 
release, you should cherry-pick this PR to 3.2 in order to include it in the 
bugfix release 3.2.1.
   If you still think that this PR should be included in 3.2.0 please raise it 
on the voting thread for RC0 of 3.2.0 on the dev mailing list.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dengziming commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


dengziming commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r858479425


##
core/src/main/scala/kafka/api/ApiVersion.scala:
##
@@ -1,500 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.api
-
-import org.apache.kafka.clients.NodeApiVersions
-import org.apache.kafka.common.config.ConfigDef.Validator
-import org.apache.kafka.common.config.ConfigException
-import org.apache.kafka.common.feature.{Features, FinalizedVersionRange, 
SupportedVersionRange}
-import org.apache.kafka.common.message.ApiMessageType.ListenerType
-import org.apache.kafka.common.record.RecordVersion
-import org.apache.kafka.common.requests.ApiVersionsResponse
-
-/**
- * This class contains the different Kafka versions.
- * Right now, we use them for upgrades - users can configure the version of 
the API brokers will use to communicate between themselves.
- * This is only for inter-broker communications - when communicating with 
clients, the client decides on the API version.
- *
- * Note that the ID we initialize for each version is important.
- * We consider a version newer than another, if it has a higher ID (to avoid 
depending on lexicographic order)
- *
- * Since the api protocol may change more than once within the same release 
and to facilitate people deploying code from
- * trunk, we have the concept of internal versions (first introduced during 
the 0.10.0 development cycle). For example,
- * the first time we introduce a version change in a release, say 0.10.0, we 
will add a config value "0.10.0-IV0" and a
- * corresponding case object KAFKA_0_10_0-IV0. We will also add a config value 
"0.10.0" that will be mapped to the
- * latest internal version object, which is KAFKA_0_10_0-IV0. When we change 
the protocol a second time while developing
- * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding 
case object KAFKA_0_10_0-IV1. We will change
- * the config value "0.10.0" to map to the latest internal version object 
KAFKA_0_10_0-IV1. The config value of
- * "0.10.0-IV0" is still mapped to KAFKA_0_10_0-IV0. This way, if people are 
deploying from trunk, they can use
- * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. 
For most people who just want to use
- * released version, they can use "0.10.0" when upgrading to the 0.10.0 
release.
- */

Review Comment:
   Should we also add these doc comments to MetadataVersion



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13716) add tests for `DeleteRecordsCommand` class

2022-04-26 Thread Divij Vaidya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528047#comment-17528047
 ] 

Divij Vaidya commented on KAFKA-13716:
--

Hey folks

[https://github.com/apache/kafka/pull/12087] adds tests that cover the 
request/response for DeleteRecords API. However, my PR does not validate the 
thin layer of additional command line parsing functionality provided in 
DeleteRecordsCommand. I think it is worthwhile to add additional tests for 
`DeleteRecordsCommand` class at 
[https://github.com/apache/kafka/tree/trunk/core/src/test/scala/unit/kafka/admin]
 

> add tests for `DeleteRecordsCommand` class
> --
>
> Key: KAFKA-13716
> URL: https://issues.apache.org/jira/browse/KAFKA-13716
> Project: Kafka
>  Issue Type: Test
>  Components: tools
>Reporter: Luke Chen
>Assignee: Shivanjal Arora
>Priority: Major
>  Labels: Newbie, newbie
>
> Found there's no tests for `DeleteRecordsCommand` class, which is used in 
> `kafka-delete-records.sh`. We should add it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] LatentLag opened a new pull request, #12095: MINOR: Adding client Base64 serializer and deserializer

2022-04-26 Thread GitBox


LatentLag opened a new pull request, #12095:
URL: https://github.com/apache/kafka/pull/12095

   Adding client serialization for Base64 which enables two use cases. First, 
these allow messages encoded in base64 to not incur the encoding penalty of a 
larger file size. Second, binary encoded messages may be used more simply on 
the command line.
   
   This is a minor change. For testing the class files were added to an 
existing `kafka-client.jar` and exercised with `kafka-console-producer.sh` and 
`kafka-console-consumer.sh` in Windows and Ubuntu 20.04.
   
   This is my own work and I license it to this project.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] aybefox commented on pull request #9947: KAFKA-12190: Fix setting of file permissions on non-POSIX filesystems

2022-04-26 Thread GitBox


aybefox commented on PR #9947:
URL: https://github.com/apache/kafka/pull/9947#issuecomment-1109626111

   @notme159 
   Same here :( Did you find out something?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] dajac commented on pull request #12095: MINOR: Adding client Base64 serializer and deserializer

2022-04-26 Thread GitBox


dajac commented on PR #12095:
URL: https://github.com/apache/kafka/pull/12095#issuecomment-1109648747

   @LatentLag Thanks for the PR. I think that we would need a KIP for adding 
those because I suppose that we consider them as public interfaces. The process 
is described here:  
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] viktorsomogyi commented on pull request #11492: KAFKA-13452: MM2 shouldn't checkpoint when offset mapping is unavailable

2022-04-26 Thread GitBox


viktorsomogyi commented on PR #11492:
URL: https://github.com/apache/kafka/pull/11492#issuecomment-1109725104

   @mimaison would you please review this?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition

2022-04-26 Thread RivenSun (Jira)
RivenSun created KAFKA-13857:


 Summary: The listOffsets method of KafkaAdminClient should support 
returning logEndOffset of topicPartition
 Key: KAFKA-13857
 URL: https://issues.apache.org/jira/browse/KAFKA-13857
 Project: Kafka
  Issue Type: Bug
  Components: admin
Reporter: RivenSun


The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

In the last method above, it is obvious that when the client side does not pass 
the isolationLevel value, the server side supports returning 
localLog.logEndOffset.
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
 

 

KafkaAdminClient is an operation and maintenance management tool, which *should 
be different from the listOffsets-related methods (offsetsForTimes, 
beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be 
limited by the value of {color:#FF}isolationLevel {color}in the 
ListOffsetsOptions parameter.*



In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server consider isolationLevel as a required parameter:
1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
will be thrown when AdminClient executes listOffsets() method.
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}

2) The current logic for converting isolationLevel on the server side has not 
yet handled the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED :
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
return IsolationLevel.forId(data.isolationLevel());
} {code}
h1. 

Solution:

Added a new enum `NONE` in IsolationLevel, dedicated to 
AdminClient.listOffsets() method.

 

 



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition

2022-04-26 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13857:
-
Description: 
The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

In the last method above, it is obvious that when the client side does not pass 
the isolationLevel value, the server side supports returning 
localLog.logEndOffset.
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
 

 

KafkaAdminClient is an operation and maintenance management tool, which *should 
be different from the listOffsets-related methods (offsetsForTimes, 
beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be 
limited by the value of {color:#ff}isolationLevel {color}in the 
ListOffsetsOptions parameter.*

In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server consider isolationLevel as a required parameter:
1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
will be thrown when AdminClient executes listOffsets() method.
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
2) The current logic for converting isolationLevel on the server side has not 
yet handled the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED :
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
return IsolationLevel.forId(data.isolationLevel());
} {code}
h1.  
h2. Solution:

Added a new enum `NONE` in IsolationLevel, dedicated to 
AdminClient.listOffsets() method.

 

 

  was:
The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

In the last method above, it is obvious that when the client side does not pass 
the isolationLevel value, the server side supports returning 
localLog.logEndOffset.
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
 

 

KafkaAdminClient is an operation and maintenance management tool, which *should 
be different from the listOffsets-related methods (offsetsForTimes, 
beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be 
limited by the value of {color:#FF}isolationLevel {color}in the 
ListOffsetsOptions parameter.*



In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server consider isolationLevel as a required parameter:
1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
will be thrown when AdminClient executes listOffsets() method.
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}

2) The current logic for converting isolationLevel on the server side has not 
yet handled the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED :
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
return IsolationLevel.forId(data.isolationLevel());
} {code}
h1. 

Solution:

Added a new enum `NONE` in IsolationLevel, dedicated to 
AdminClient.listOffsets() method.

 

 

 Issue Type: Improvement  (was: Bug)

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --
>
> Key: KAFKA-13857
> URL: https://issues.apache.org/jira/browse/KAFKA-13857
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localL

[jira] [Commented] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition

2022-04-26 Thread RivenSun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528170#comment-17528170
 ] 

RivenSun commented on KAFKA-13857:
--

Hi [~guozhang]  [~showuon]
Could you give some suggestions for this issue?
Thanks.

> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --
>
> Key: KAFKA-13857
> URL: https://issues.apache.org/jira/browse/KAFKA-13857
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass the isolationLevel value, the server side supports returning 
> localLog.logEndOffset.
> {code:java}
> val lastFetchableOffset = isolationLevel match {
>   case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
>   case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
>   case None => localLog.logEndOffset
> } 
> {code}
>  
>  
> KafkaAdminClient is an operation and maintenance management tool, which 
> *should be different from the listOffsets-related methods (offsetsForTimes, 
> beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not 
> be limited by the value of {color:#ff}isolationLevel {color}in the 
> ListOffsetsOptions parameter.*
> In the current KafkaAdminClient.listOffsets() method, both the AdminClient 
> and the server consider isolationLevel as a required parameter:
> 1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
> will be thrown when AdminClient executes listOffsets() method.
> {code:java}
> ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
> 2) The current logic for converting isolationLevel on the server side has not 
> yet handled the case where the user passes in a value that is neither 
> READ_UNCOMMITTED nor READ_COMMITTED :
> {code:java}
> val isolationLevelOpt = if (isClientRequest)
>   Some(offsetRequest.isolationLevel)
> else
>   None {code}
> {code:java}
> public IsolationLevel isolationLevel() {
> return IsolationLevel.forId(data.isolationLevel());
> } {code}
> h1.  
> h2. Solution:
> Added a new enum `NONE` in IsolationLevel, dedicated to 
> AdminClient.listOffsets() method.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13716) add tests for `DeleteRecordsCommand` class

2022-04-26 Thread Richard Joerger (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13716?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528174#comment-17528174
 ] 

Richard Joerger commented on KAFKA-13716:
-

[~divijvaidya] , that clears that up for me. It's the actual CLI interface and 
not the DeleteRecords API. Thanks for chiming in [~showuon] . 

[~Shivanjal] , I see that this is assigned to you, are you still working on the 
tests? If you are, no worries but in the event you are not, please let me know. 
Thanks. 

> add tests for `DeleteRecordsCommand` class
> --
>
> Key: KAFKA-13716
> URL: https://issues.apache.org/jira/browse/KAFKA-13716
> Project: Kafka
>  Issue Type: Test
>  Components: tools
>Reporter: Luke Chen
>Assignee: Shivanjal Arora
>Priority: Major
>  Labels: Newbie, newbie
>
> Found there's no tests for `DeleteRecordsCommand` class, which is used in 
> `kafka-delete-records.sh`. We should add it.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] ijuma opened a new pull request, #12096: KAFKA-13794: Fix comparator of inflightBatchesBySequence in TransactionsManager (round 3)

2022-04-26 Thread GitBox


ijuma opened a new pull request, #12096:
URL: https://github.com/apache/kafka/pull/12096

   Conceptually, the ordering is defined by the producer id, producer epoch
   and the sequence number. A given `TopicPartitionEntry` has a single
   producer id and hence we only need to compare the other two.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13857) The listOffsets method of KafkaAdminClient should support returning logEndOffset of topicPartition

2022-04-26 Thread RivenSun (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

RivenSun updated KAFKA-13857:
-
Description: 
The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

In the last method above, it is obvious that when the client side does not pass 
the isolationLevel value, the server side supports returning 
localLog.logEndOffset.
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
 

 

KafkaAdminClient is an operation and maintenance management tool, which *should 
be different from the listOffsets-related methods (offsetsForTimes, 
beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be 
limited by the value of {color:#ff}isolationLevel {color}in the 
ListOffsetsOptions parameter.*

In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server consider isolationLevel as a required parameter:
1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
will be thrown when AdminClient executes listOffsets() method.
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
2) The current logic for converting isolationLevel on the server side has not 
yet handled the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED :
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
return IsolationLevel.forId(data.isolationLevel());
} {code}
h1.  
h2. Suggestion:

Added a new enum `NONE` in IsolationLevel, only dedicated to 
AdminClient.listOffsets() method.
This change may cause the highestSupportedVersion of 
ApiMessageType.LIST_OFFSETS to increase by one.

 

 

  was:
The server side currently handles the LIST_OFFSETS request process as follows:
{code:java}
KafkaApis.handleListOffsetRequest() ->

KafkaApis.handleListOffsetRequestV1AndAbove() ->

ReplicaManager.fetchOffsetForTimestamp() ->

Partition.fetchOffsetForTimestamp(){code}
 

In the last method above, it is obvious that when the client side does not pass 
the isolationLevel value, the server side supports returning 
localLog.logEndOffset.
{code:java}
val lastFetchableOffset = isolationLevel match {
  case Some(IsolationLevel.READ_COMMITTED) => localLog.lastStableOffset
  case Some(IsolationLevel.READ_UNCOMMITTED) => localLog.highWatermark
  case None => localLog.logEndOffset
} 

{code}
 

 

KafkaAdminClient is an operation and maintenance management tool, which *should 
be different from the listOffsets-related methods (offsetsForTimes, 
beginningOffsets, endOffsets) provided by KafkaConsumer,* *and it should not be 
limited by the value of {color:#ff}isolationLevel {color}in the 
ListOffsetsOptions parameter.*

In the current KafkaAdminClient.listOffsets() method, both the AdminClient and 
the server consider isolationLevel as a required parameter:
1) If AdminClient uses new ListOffsetsOptions(null), a NullPointerException 
will be thrown when AdminClient executes listOffsets() method.
{code:java}
ListOffsetsRequest.Builder(...) -> isolationLevel.id(){code}
2) The current logic for converting isolationLevel on the server side has not 
yet handled the case where the user passes in a value that is neither 
READ_UNCOMMITTED nor READ_COMMITTED :
{code:java}
val isolationLevelOpt = if (isClientRequest)
  Some(offsetRequest.isolationLevel)
else
  None {code}
{code:java}
public IsolationLevel isolationLevel() {
return IsolationLevel.forId(data.isolationLevel());
} {code}
h1.  
h2. Solution:

Added a new enum `NONE` in IsolationLevel, dedicated to 
AdminClient.listOffsets() method.

 

 


> The listOffsets method of KafkaAdminClient should support returning 
> logEndOffset of topicPartition
> --
>
> Key: KAFKA-13857
> URL: https://issues.apache.org/jira/browse/KAFKA-13857
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: RivenSun
>Priority: Major
>
> The server side currently handles the LIST_OFFSETS request process as follows:
> {code:java}
> KafkaApis.handleListOffsetRequest() ->
> KafkaApis.handleListOffsetRequestV1AndAbove() ->
> ReplicaManager.fetchOffsetForTimestamp() ->
> Partition.fetchOffsetForTimestamp(){code}
>  
> In the last method above, it is obvious that when the client side does not 
> pass t

[GitHub] [kafka] dengziming commented on a diff in pull request #11951: KAFKA-13836: Improve KRaft broker heartbeat logic

2022-04-26 Thread GitBox


dengziming commented on code in PR #11951:
URL: https://github.com/apache/kafka/pull/11951#discussion_r858760908


##
core/src/main/scala/kafka/server/BrokerLifecycleManager.scala:
##
@@ -333,7 +353,11 @@ class BrokerLifecycleManager(val config: KafkaConfig,
   }
 
   private def sendBrokerHeartbeat(): Unit = {
-val metadataOffset = _highestMetadataOffsetProvider()
+val metadataOffset = if (readyToUnfence) {

Review Comment:
   Currently, this offset is only used when starting the broker, first to 
ensure the broker has catchup so we can start initialization, second to ensure 
the broker has catchup so we can unfenced, if we use the same metadata offset 
here, it seems unnecessary to ensure twice, but It's indeed peculiar to have an 
offset going back, maybe adding a new field to the `HeartbeatRequest`, or just 
leave it as it is now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


mumrah commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r858801361


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigException;
+
+public class MetadataVersionValidator implements Validator {
+
+@Override
+public void ensureValid(String name, Object value) {
+try {
+MetadataVersion.fromVersionString(value.toString());
+} catch (IllegalArgumentException e) {
+throw new ConfigException(name, value.toString(), e.getMessage());
+}
+}
+
+@Override
+public String toString() {

Review Comment:
   Yea, for reference 
https://kafka.apache.org/27/documentation.html#brokerconfigs_inter.broker.protocol.version



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


mumrah commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r858801361


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersionValidator.java:
##
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.ConfigDef.Validator;
+import org.apache.kafka.common.config.ConfigException;
+
+public class MetadataVersionValidator implements Validator {
+
+@Override
+public void ensureValid(String name, Object value) {
+try {
+MetadataVersion.fromVersionString(value.toString());
+} catch (IllegalArgumentException e) {
+throw new ConfigException(name, value.toString(), e.getMessage());
+}
+}
+
+@Override
+public String toString() {

Review Comment:
   Yea, for reference 
https://kafka.apache.org/27/documentation.html#brokerconfigs_inter.broker.protocol.version.
   
   Do we need the `distinct` call in the streaming expression? The enum values 
are already unique



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joel-hamill commented on pull request #12094: MINOR: fix html generation syntax errors

2022-04-26 Thread GitBox


joel-hamill commented on PR #12094:
URL: https://github.com/apache/kafka/pull/12094#issuecomment-1109970872

   cc @ijuma @guozhangwang 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] joel-hamill commented on pull request #11874: Fix typos in configuration docs

2022-04-26 Thread GitBox


joel-hamill commented on PR #11874:
URL: https://github.com/apache/kafka/pull/11874#issuecomment-1109972244

   cc @ijuma 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma opened a new pull request, #12097: MINOR: Make TopicPartitionBookkeeper and TopicPartitionEntry top level

2022-04-26 Thread GitBox


ijuma opened a new pull request, #12097:
URL: https://github.com/apache/kafka/pull/12097

   This is the first step towards refactoring the `TransactionManager` so
   that it's easier to understand and test. The high level idea is to push
   down behavior to `TopicPartitionEntry` and `TopicPartitionBookkeeper`
   and to encapsulate the state so that the mutations can only be done via
   the appropriate methods.
   
   Inner classes have no mechanism to limit access from the outer class,
   which presents a challenge when mutability is widespread, like it is the
   case here.
   
   As a first step, we make `TopicPartitionBookkeeper` and
   `TopicPartitionEntry` top level and rename them to make their intended
   usage clear. To make the review easier, we don't change anything else
   except access changes required for the code to compile.
   
   The next PR will contain the rest of the refactoring.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement

2022-04-26 Thread GitBox


junrao commented on PR #12064:
URL: https://github.com/apache/kafka/pull/12064#issuecomment-1109979635

   @cadonna : Thanks for the response. This bug was actually introduced in 
3.2.0. I will ping the mailing list.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #12089: MINOR: Rename `AlterIsrManager` to `AlterPartitionManager`

2022-04-26 Thread GitBox


hachikuji merged PR #12089:
URL: https://github.com/apache/kafka/pull/12089


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Commented] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct

2022-04-26 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528288#comment-17528288
 ] 

Matthias J. Sax commented on KAFKA-13647:
-

Should we close this ticket? \cc [~guozhang] [~cadonna] 

> RocksDb metrics 'number-open-files' is not correct
> --
>
> Key: KAFKA-13647
> URL: https://issues.apache.org/jira/browse/KAFKA-13647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Sylvain Le Gouellec
>Priority: Major
> Attachments: image-2022-02-07-16-06-25-304.png, 
> image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png
>
>
> We were looking at RocksDB metrics and noticed that the {{number-open-files}} 
> metric behaves like a counter, rather than a gauge. 
> Looking at the code, we think there is a small error in the type of metric 
> for that specific mbean (should be a value metric rather than a sum metric).
> See [ 
> https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Commented] (KAFKA-13840) KafkaConsumer is unable to recover connection to group coordinator after commitOffsetsAsync exception

2022-04-26 Thread Kyle R Stehbens (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528300#comment-17528300
 ] 

Kyle R Stehbens commented on KAFKA-13840:
-

Gotcha, i'll give these new versions a try on one of our less important jobs as 
soon as they are released and report back here with the results.

> KafkaConsumer is unable to recover connection to group coordinator after 
> commitOffsetsAsync exception
> -
>
> Key: KAFKA-13840
> URL: https://issues.apache.org/jira/browse/KAFKA-13840
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, consumer
>Affects Versions: 2.6.1, 3.1.0, 2.7.2, 2.8.1, 3.0.0
>Reporter: Kyle R Stehbens
>Assignee: Luke Chen
>Priority: Major
>
> Hi, I've discovered an issue with the java Kafka client (consumer) whereby a 
> timeout or any other retry-able exception triggered during an async offset 
> commit, renders the client unable to recover its group co-coordinator and 
> leaves the client in a broken state.
>  
> I first encountered this using v2.8.1 of the java client, and after going 
> through the code base for all versions of the client, have found it affects 
> all versions of the client from 2.6.1 onward.
> I also confirmed that by rolling back to 2.5.1, the issue is not present.
>  
> The issue stems from changes to how the FindCoordinatorResponseHandler in 
> 2.5.1 used to call clearFindCoordinatorFuture(); on both success and failure 
> here:
> [https://github.com/apache/kafka/blob/0efa8fb0f4c73d92b6e55a112fa45417a67a7dc2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L783]
>  
> In all future version of the client this call is not made:
> [https://github.com/apache/kafka/blob/839b886f9b732b151e1faeace7303c80641c08c4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java#L838]
>  
> What this results in, is when the KafkaConsumer makes a call to 
> coordinator.commitOffsetsAsync(...), if an error occurs such that the 
> coordinator is unavailable here:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1007]
>  
> then the client will try call:
> [https://github.com/apache/kafka/blob/c5077c679c372589215a1b58ca84360c683aa6e8/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L1017]
> However this will never be able to succeed as it perpetually returns a 
> reference to a failed future: findCoordinatorFuture that is never cleared out.
>  
> This manifests in all future calls to commitOffsetsAsync() throwing a 
> "coordinator unavailable" exception forever going forward after any 
> retry-able exception causes the coordinator to close. 
> Note we discovered this when we upgraded the kafka client in our Flink 
> consumers from 2.4.1 to 2.8.1 and subsequently needed to downgrade the 
> client. We noticed this occurring in our non-flink java consumers too running 
> 3.x client versions.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] cadonna commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement

2022-04-26 Thread GitBox


cadonna commented on PR #12064:
URL: https://github.com/apache/kafka/pull/12064#issuecomment-1110039917

   @junrao Now I see how this issue was introduced in 3.2.0. The fix for the 
bug described in KAFKA-12841 introduced it, right? I initially understood that 
this PR is the fix for the bug described in KAFKA-12841 which dates back to 2.6.
   
   I think that classifies as a regression. I will cherry-pick this PR to 3.2.0 
and create a new release candidate.  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


ijuma commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r858974266


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+IBP_2_2_IV1(-1),
+
+// Introduced static membership.
+IBP_2_3_IV0(-1),
+
+// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+IBP_2_3_IV1(-1),
+
+// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+IBP_2_4_IV0(-1),
+
+// Flexible version support in inter-broker APIs
+IBP_2_4_IV1(-1),
+
+// No new APIs, equivalent to 2.4-IV1
+IBP_2_5_IV0(-1),
+
+// Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+IBP_2_6_IV0(-1),
+
+// Introduced feature versioning support (KIP-584)
+IBP_2_7_IV0(-1),
+
+// Bup Fetch protocol for Raft protocol (KIP-595)
+IBP_2_7_IV1(-1),
+
+// Introduced AlterPartition (KIP-497)
+IBP_2_7_IV2(-1),
+
+// Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+IBP_2_8_IV0(-1),
+
+// Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+IBP_2_8_IV1(-1),
+
+// Introduce AllocateProducerIds (KIP-730)
+IBP_3_0_IV0(1),
+
+// Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+// Assume message format version is 3.0 (KIP-724)
+IBP_3_0_IV1(2),
+
+// Adds topic IDs to Fetch requests/responses (KIP-516)
+IBP_3_1_IV0(3),
+
+// Support for leader recovery for unclean leader election (KIP-704)
+IBP_3_2_IV0(4),
+
+// KRaft GA

Review Comment:
   We shouldn't have this until we h

[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


ijuma commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r858975278


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+IBP_2_2_IV1(-1),
+
+// Introduced static membership.
+IBP_2_3_IV0(-1),
+
+// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+IBP_2_3_IV1(-1),
+
+// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+IBP_2_4_IV0(-1),
+
+// Flexible version support in inter-broker APIs
+IBP_2_4_IV1(-1),
+
+// No new APIs, equivalent to 2.4-IV1
+IBP_2_5_IV0(-1),
+
+// Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+IBP_2_6_IV0(-1),
+
+// Introduced feature versioning support (KIP-584)
+IBP_2_7_IV0(-1),
+
+// Bup Fetch protocol for Raft protocol (KIP-595)
+IBP_2_7_IV1(-1),
+
+// Introduced AlterPartition (KIP-497)
+IBP_2_7_IV2(-1),
+
+// Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+IBP_2_8_IV0(-1),
+
+// Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+IBP_2_8_IV1(-1),
+
+// Introduce AllocateProducerIds (KIP-730)
+IBP_3_0_IV0(1),
+
+// Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+// Assume message format version is 3.0 (KIP-724)
+IBP_3_0_IV1(2),
+
+// Adds topic IDs to Fetch requests/responses (KIP-516)
+IBP_3_1_IV0(3),
+
+// Support for leader recovery for unclean leader election (KIP-704)
+IBP_3_2_IV0(4),
+
+// KRaft GA
+IBP_3_3_IV0(5);
+
+private final Optional me

[GitHub] [kafka] ijuma commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy

2022-04-26 Thread GitBox


ijuma commented on code in PR #12052:
URL: https://github.com/apache/kafka/pull/12052#discussion_r858994743


##
docs/design.html:
##
@@ -125,6 +125,9 @@ 
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
 
+TLS/SSL libraries operate at the user space (in-kernel 
SSL_sendfile is currently not supported by Kafka). Due to this 
restriction, sendfile could not be used when transport layer 
enables SSL protocol. For enabling

Review Comment:
   Nit: "could not be used when transport layer enables SSL protocol" -> "is 
not be used when SSL is enabledl"



##
docs/design.html:
##
@@ -125,6 +125,9 @@ 
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
 
+TLS/SSL libraries operate at the user space (in-kernel 
SSL_sendfile is currently not supported by Kafka). Due to this 
restriction, sendfile could not be used when transport layer 
enables SSL protocol. For enabling

Review Comment:
   Nit: "could not be used when transport layer enables SSL protocol" -> "is 
not be used when SSL is enabled"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] junrao commented on pull request #12064: KAFKA-12841: Remove an additional call of onAcknowledgement

2022-04-26 Thread GitBox


junrao commented on PR #12064:
URL: https://github.com/apache/kafka/pull/12064#issuecomment-1110094353

   Thanks, @cadonna. I will mark the jira as fixed in 3.2.0 then.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Resolved] (KAFKA-13448) Kafka Producer Client Callback behaviour does not align with Javadoc

2022-04-26 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-13448.
-
Fix Version/s: 3.2.0
 Assignee: Philip Nee
   Resolution: Fixed

merged [https://github.com/apache/kafka/pull/11689] and a followup fix 
[https://github.com/apache/kafka/pull/12064] to trunk and 3.2 branch.

> Kafka Producer Client Callback behaviour does not align with Javadoc
> 
>
> Key: KAFKA-13448
> URL: https://issues.apache.org/jira/browse/KAFKA-13448
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.1.0
>Reporter: Seamus O Ceanainn
>Assignee: Philip Nee
>Priority: Minor
> Fix For: 3.2.0
>
>
> In PR [#4188|https://github.com/apache/kafka/pull/4188] as part of 
> KAFKA-6180, a breaking change was accidentally introduced in the behaviour of 
> Callbacks for the producer client.
> Previously, whenever an exception was thrown when producing an event, the 
> value for 'metadata' passed to the Callback.onCompletion method was always 
> null. In PR [#4188|https://github.com/apache/kafka/pull/4188], in one part of 
> the code where Callback.onCompletion is called, the behaviour was changed so 
> that instead of passing a null value for metadata, a 'placeholder' value was 
> provided instead (see 
> [here|https://github.com/apache/kafka/pull/4188/files#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR1196]
>  and 
> [here|https://github.com/apache/kafka/pull/4188/files#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR1199]).
>   This placeholder contained only topic and partition information, and with 
> all other fields set to '-1'.
> This change only impacted a subset of exceptions, so that in the case of 
> ApiExceptions metadata would still be null (see 
> [here|https://github.com/apache/kafka/commit/aa42a11dfd99ee9ab24d2e9a7521ef1c97ae1ff4#diff-42d8f5166459ee28f201ff9cec0080fc7845544a0089ac9e8f3e16864cc1193eR843]),
>  but for all other exceptions the placeholder value would be used. The 
> behaviour at the time of writing remains the same.
> This issue was first reported in KAFKA-7412 when a user first noticed the 
> difference between the documented behaviour of Callback.onCompletion and the 
> implemented behaviour.
> At the time it was assumed that the behaviour when errors occur was to always 
> provide a placeholder metadata value to Callback.onCompletion, and the 
> documentation was updated at that time to reflect this assumption in [PR 
> #5798|https://github.com/apache/kafka/pull/5798]. The documentation now 
> states that when an exception occurs that the method will be called with an 
> empty metadata value (see 
> [here|https://github.com/apache/kafka/blob/3.1/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java#L30-L31]).
>  However, there is still one case where Callback.onCompletion is called with 
> a null value for metadata (see 
> [here|https://github.com/apache/kafka/blob/3.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L1002]),
>  so there is still a discrepancy between the documented behaviour and the 
> implementation of Callback.onCompletion.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Assigned] (KAFKA-12841) NPE from the provided metadata in client callback in case of ApiException

2022-04-26 Thread Jun Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao reassigned KAFKA-12841:
---

Fix Version/s: 3.2.0
 Assignee: Philip Nee  (was: Kirk True)
   Resolution: Fixed

merged [https://github.com/apache/kafka/pull/11689] and a followup fix 
[https://github.com/apache/kafka/pull/12064] to trunk and 3.2 branch.

> NPE from the provided metadata in client callback in case of ApiException
> -
>
> Key: KAFKA-12841
> URL: https://issues.apache.org/jira/browse/KAFKA-12841
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.6.0
> Environment: Prod
>Reporter: Avi Youkhananov
>Assignee: Philip Nee
>Priority: Major
> Fix For: 3.2.0
>
> Attachments: NPE.production
>
>
> 1.
> org.apache.kafka.clients.producer.Callback interface has method 
> onCompletion(...)
> Which says as part of the documentation :
> *The metadata for the record that was sent (i.e. the partition and offset). 
> *An empty metadata with -1 value for all fields* except for topicPartition 
> will be returned if an error occurred.
> We got an NPE from doSend(...) method in 
> org.apache.kafka.clients.producer.KafkaProducer 
> Which can occur in case ApiException was thrown ...
> In case of ApiException it uses the regular callback instead of 
> InterceptorCallback which also may cover the NPE.
> 2. More over RecordMetadata has method partition() which return int but can 
> also throw NPE because TopicPartition might be null.
> Stack trace attached.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] ahuang98 commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


ahuang98 commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r859016703


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+IBP_2_2_IV1(-1),
+
+// Introduced static membership.
+IBP_2_3_IV0(-1),
+
+// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+IBP_2_3_IV1(-1),
+
+// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+IBP_2_4_IV0(-1),
+
+// Flexible version support in inter-broker APIs
+IBP_2_4_IV1(-1),
+
+// No new APIs, equivalent to 2.4-IV1
+IBP_2_5_IV0(-1),
+
+// Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+IBP_2_6_IV0(-1),
+
+// Introduced feature versioning support (KIP-584)
+IBP_2_7_IV0(-1),
+
+// Bup Fetch protocol for Raft protocol (KIP-595)
+IBP_2_7_IV1(-1),
+
+// Introduced AlterPartition (KIP-497)
+IBP_2_7_IV2(-1),
+
+// Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+IBP_2_8_IV0(-1),
+
+// Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+IBP_2_8_IV1(-1),
+
+// Introduce AllocateProducerIds (KIP-730)
+IBP_3_0_IV0(1),
+
+// Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+// Assume message format version is 3.0 (KIP-724)
+IBP_3_0_IV1(2),
+
+// Adds topic IDs to Fetch requests/responses (KIP-516)
+IBP_3_1_IV0(3),
+
+// Support for leader recovery for unclean leader election (KIP-704)
+IBP_3_2_IV0(4),
+
+// KRaft GA
+IBP_3_3_IV0(5);
+
+private final Optional

[jira] [Resolved] (KAFKA-13647) RocksDb metrics 'number-open-files' is not correct

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-13647.
---
Resolution: Incomplete

I resolved the ticket for now as incomplete, since the streams code cannot 
alone fix the issue, since it's on the rocksDB side to fix.

> RocksDb metrics 'number-open-files' is not correct
> --
>
> Key: KAFKA-13647
> URL: https://issues.apache.org/jira/browse/KAFKA-13647
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 3.0.0
>Reporter: Sylvain Le Gouellec
>Priority: Major
> Attachments: image-2022-02-07-16-06-25-304.png, 
> image-2022-02-07-16-06-39-821.png, image-2022-02-07-16-06-53-164.png
>
>
> We were looking at RocksDB metrics and noticed that the {{number-open-files}} 
> metric behaves like a counter, rather than a gauge. 
> Looking at the code, we think there is a small error in the type of metric 
> for that specific mbean (should be a value metric rather than a sum metric).
> See [ 
> https://github.com/apache/kafka/blob/ca5d6f9229c170beb23809159113037f05a1120f/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482|https://github.com/apache/kafka/blob/99b9b3e84f4e98c3f07714e1de6a139a004cbc5b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetrics.java#L482]



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] mumrah commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


mumrah commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r859055329


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+IBP_2_2_IV1(-1),
+
+// Introduced static membership.
+IBP_2_3_IV0(-1),
+
+// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+IBP_2_3_IV1(-1),
+
+// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+IBP_2_4_IV0(-1),
+
+// Flexible version support in inter-broker APIs
+IBP_2_4_IV1(-1),
+
+// No new APIs, equivalent to 2.4-IV1
+IBP_2_5_IV0(-1),
+
+// Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+IBP_2_6_IV0(-1),
+
+// Introduced feature versioning support (KIP-584)
+IBP_2_7_IV0(-1),
+
+// Bup Fetch protocol for Raft protocol (KIP-595)
+IBP_2_7_IV1(-1),
+
+// Introduced AlterPartition (KIP-497)
+IBP_2_7_IV2(-1),
+
+// Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+IBP_2_8_IV0(-1),
+
+// Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+IBP_2_8_IV1(-1),
+
+// Introduce AllocateProducerIds (KIP-730)
+IBP_3_0_IV0(1),
+
+// Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+// Assume message format version is 3.0 (KIP-724)
+IBP_3_0_IV1(2),
+
+// Adds topic IDs to Fetch requests/responses (KIP-516)
+IBP_3_1_IV0(3),
+
+// Support for leader recovery for unclean leader election (KIP-704)
+IBP_3_2_IV0(4),
+
+// KRaft GA

Review Comment:
   @ijuma are you referring to the 

[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


ijuma commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r859091472


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+IBP_2_2_IV1(-1),
+
+// Introduced static membership.
+IBP_2_3_IV0(-1),
+
+// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+IBP_2_3_IV1(-1),
+
+// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+IBP_2_4_IV0(-1),
+
+// Flexible version support in inter-broker APIs
+IBP_2_4_IV1(-1),
+
+// No new APIs, equivalent to 2.4-IV1
+IBP_2_5_IV0(-1),
+
+// Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+IBP_2_6_IV0(-1),
+
+// Introduced feature versioning support (KIP-584)
+IBP_2_7_IV0(-1),
+
+// Bup Fetch protocol for Raft protocol (KIP-595)
+IBP_2_7_IV1(-1),
+
+// Introduced AlterPartition (KIP-497)
+IBP_2_7_IV2(-1),
+
+// Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+IBP_2_8_IV0(-1),
+
+// Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+IBP_2_8_IV1(-1),
+
+// Introduce AllocateProducerIds (KIP-730)
+IBP_3_0_IV0(1),
+
+// Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+// Assume message format version is 3.0 (KIP-724)
+IBP_3_0_IV1(2),
+
+// Adds topic IDs to Fetch requests/responses (KIP-516)
+IBP_3_1_IV0(3),
+
+// Support for leader recovery for unclean leader election (KIP-704)
+IBP_3_2_IV0(4),
+
+// KRaft GA
+IBP_3_3_IV0(5);
+
+private final Optional me

[GitHub] [kafka] junrao commented on a diff in pull request #12029: KAFKA-13815: Avoid reinitialization for a replica that is being deleted

2022-04-26 Thread GitBox


junrao commented on code in PR #12029:
URL: https://github.com/apache/kafka/pull/12029#discussion_r859114446


##
core/src/main/scala/kafka/log/UnifiedLog.scala:
##
@@ -674,21 +682,29 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   /**
-   * Rename the directory of the local log
+   * Rename the directory of the local log. If the log's directory is being 
renamed for async deletion due to a
+   * StopReplica request, then the shouldReinitialize parameter should be set 
to false, otherwise it should be set to true.
*
+   * @param name The new name that this log's directory is being renamed to
+   * @param shouldReinitialize Whether the log's metadata should be 
reinitialized after renaming
* @throws KafkaStorageException if rename fails
*/
-  def renameDir(name: String): Unit = {
+  def renameDir(name: String, shouldReinitialize: Boolean): Unit = {
 lock synchronized {
   maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
log dir ${dir.getParent}") {
 // Flush partitionMetadata file before initializing again
 maybeFlushMetadataFile()

Review Comment:
   Yes, it's probably useful to cover that. What do you think @gitlw ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Created] (KAFKA-13858) Kraft should not shutdown metadata listener until controller shutdown is finished

2022-04-26 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-13858:
---

 Summary: Kraft should not shutdown metadata listener until 
controller shutdown is finished
 Key: KAFKA-13858
 URL: https://issues.apache.org/jira/browse/KAFKA-13858
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


When the kraft broker begins controlled shutdown, it immediately disables the 
metadata listener. This means that metadata changes as part of the controlled 
shutdown do not get sent to the respective components. For partitions that the 
broker is follower of, that is what we want. It prevents the follower from 
being able to rejoin the ISR while still shutting down. But for partitions that 
the broker is leading, it means the leader will remain active until controlled 
shutdown is complete.

In the zk world, we have an explicit request `StopReplica` which serves the 
purpose of shutting down both follower and leader, but we don't have something 
similar in kraft. For KRaft, we may not necessarily need an explicit signal 
like this. We know that the broker is shutting down, so we can treat partition 
changes as implicit `StopReplica` requests rather than going through the normal 
`LeaderAndIsr` flow.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13858) Kraft should not shutdown metadata listener until controller shutdown is finished

2022-04-26 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13858:

Description: 
When the kraft broker begins controlled shutdown, it immediately disables the 
metadata listener. This means that metadata changes as part of the controlled 
shutdown do not get sent to the respective components. For partitions that the 
broker is follower of, that is what we want. It prevents the follower from 
being able to rejoin the ISR while still shutting down. But for partitions that 
the broker is leading, it means the leader will remain active until controlled 
shutdown finishes and the socket server is stopped. That delay can be as much 
as 5 seconds and probably even worse.

In the zk world, we have an explicit request `StopReplica` which serves the 
purpose of shutting down both follower and leader, but we don't have something 
similar in kraft. For KRaft, we may not necessarily need an explicit signal 
like this. We know that the broker is shutting down, so we can treat partition 
changes as implicit `StopReplica` requests rather than going through the normal 
`LeaderAndIsr` flow.

  was:
When the kraft broker begins controlled shutdown, it immediately disables the 
metadata listener. This means that metadata changes as part of the controlled 
shutdown do not get sent to the respective components. For partitions that the 
broker is follower of, that is what we want. It prevents the follower from 
being able to rejoin the ISR while still shutting down. But for partitions that 
the broker is leading, it means the leader will remain active until controlled 
shutdown is complete.

In the zk world, we have an explicit request `StopReplica` which serves the 
purpose of shutting down both follower and leader, but we don't have something 
similar in kraft. For KRaft, we may not necessarily need an explicit signal 
like this. We know that the broker is shutting down, so we can treat partition 
changes as implicit `StopReplica` requests rather than going through the normal 
`LeaderAndIsr` flow.


> Kraft should not shutdown metadata listener until controller shutdown is 
> finished
> -
>
> Key: KAFKA-13858
> URL: https://issues.apache.org/jira/browse/KAFKA-13858
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> When the kraft broker begins controlled shutdown, it immediately disables the 
> metadata listener. This means that metadata changes as part of the controlled 
> shutdown do not get sent to the respective components. For partitions that 
> the broker is follower of, that is what we want. It prevents the follower 
> from being able to rejoin the ISR while still shutting down. But for 
> partitions that the broker is leading, it means the leader will remain active 
> until controlled shutdown finishes and the socket server is stopped. That 
> delay can be as much as 5 seconds and probably even worse.
> In the zk world, we have an explicit request `StopReplica` which serves the 
> purpose of shutting down both follower and leader, but we don't have 
> something similar in kraft. For KRaft, we may not necessarily need an 
> explicit signal like this. We know that the broker is shutting down, so we 
> can treat partition changes as implicit `StopReplica` requests rather than 
> going through the normal `LeaderAndIsr` flow.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] ahuang98 commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


ahuang98 commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r859155538


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+IBP_2_2_IV1(-1),
+
+// Introduced static membership.
+IBP_2_3_IV0(-1),
+
+// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+IBP_2_3_IV1(-1),
+
+// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+IBP_2_4_IV0(-1),
+
+// Flexible version support in inter-broker APIs
+IBP_2_4_IV1(-1),
+
+// No new APIs, equivalent to 2.4-IV1
+IBP_2_5_IV0(-1),
+
+// Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+IBP_2_6_IV0(-1),
+
+// Introduced feature versioning support (KIP-584)
+IBP_2_7_IV0(-1),
+
+// Bup Fetch protocol for Raft protocol (KIP-595)
+IBP_2_7_IV1(-1),
+
+// Introduced AlterPartition (KIP-497)
+IBP_2_7_IV2(-1),
+
+// Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+IBP_2_8_IV0(-1),
+
+// Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+IBP_2_8_IV1(-1),
+
+// Introduce AllocateProducerIds (KIP-730)
+IBP_3_0_IV0(1),
+
+// Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+// Assume message format version is 3.0 (KIP-724)
+IBP_3_0_IV1(2),
+
+// Adds topic IDs to Fetch requests/responses (KIP-516)
+IBP_3_1_IV0(3),
+
+// Support for leader recovery for unclean leader election (KIP-704)
+IBP_3_2_IV0(4),
+
+// KRaft GA
+IBP_3_3_IV0(5);
+
+private final Optional

[GitHub] [kafka] hachikuji opened a new pull request, #12098: MINOR: Fix event output inconsistencies in TransactionalMessageCopier

2022-04-26 Thread GitBox


hachikuji opened a new pull request, #12098:
URL: https://github.com/apache/kafka/pull/12098

   There is some strangeness and inconsistencies in the messages written by 
`TransactionalMessageCopier` to stdout. Here is a sample of two messages.
   
   Progress message:
   ```
   
{"consumed":33000,"stage":"ProcessLoop","totalProcessed":33000,"progress":"copier-0","time":"2022/04/24
 05:40:31:649","remaining":333}
   ```
   The `transactionalId` is set to the value of the `progress` key.
   
   And a shutdown message:
   ```
   
{"consumed":3,"shutdown_complete":"copier-0","totalProcessed":3,"time":"2022/04/24
 05:40:31:937","remaining":0}
   ```
   The `transactionalId` this time is set to the `shutdown_complete` key.
   
   This patch fixes these issues with the following:
   
   1. Use a separate key for the `transactionalId`.
   2. Drop the `progress` and `shutdown_complete` fields.
   3. Use `stage=ShutdownComplete` in the shutdown message.
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ahuang98 commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


ahuang98 commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r859155538


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error code for ListOffsets when a new leader is lagging behind 
former HW (KIP-207)
+IBP_2_2_IV1(-1),
+
+// Introduced static membership.
+IBP_2_3_IV0(-1),
+
+// Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, 
and replica_id to OffsetsForLeaderRequest
+IBP_2_3_IV1(-1),
+
+// Add adding_replicas and removing_replicas fields to LeaderAndIsrRequest
+IBP_2_4_IV0(-1),
+
+// Flexible version support in inter-broker APIs
+IBP_2_4_IV1(-1),
+
+// No new APIs, equivalent to 2.4-IV1
+IBP_2_5_IV0(-1),
+
+// Introduced StopReplicaRequest V3 containing the leader epoch for each 
partition (KIP-570)
+IBP_2_6_IV0(-1),
+
+// Introduced feature versioning support (KIP-584)
+IBP_2_7_IV0(-1),
+
+// Bup Fetch protocol for Raft protocol (KIP-595)
+IBP_2_7_IV1(-1),
+
+// Introduced AlterPartition (KIP-497)
+IBP_2_7_IV2(-1),
+
+// Flexible versioning on ListOffsets, WriteTxnMarkers and 
OffsetsForLeaderEpoch. Also adds topic IDs (KIP-516)
+IBP_2_8_IV0(-1),
+
+// Introduced topic IDs to LeaderAndIsr and UpdateMetadata 
requests/responses (KIP-516)
+IBP_2_8_IV1(-1),
+
+// Introduce AllocateProducerIds (KIP-730)
+IBP_3_0_IV0(1),
+
+// Introduce ListOffsets V7 which supports listing offsets by max 
timestamp (KIP-734)
+// Assume message format version is 3.0 (KIP-724)
+IBP_3_0_IV1(2),
+
+// Adds topic IDs to Fetch requests/responses (KIP-516)
+IBP_3_1_IV0(3),
+
+// Support for leader recovery for unclean leader election (KIP-704)
+IBP_3_2_IV0(4),
+
+// KRaft GA
+IBP_3_3_IV0(5);
+
+private final Optional

[jira] [Updated] (KAFKA-13858) Kraft should not shutdown metadata listener until controller shutdown is finished

2022-04-26 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13858?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-13858:

Labels: kip-500  (was: )

> Kraft should not shutdown metadata listener until controller shutdown is 
> finished
> -
>
> Key: KAFKA-13858
> URL: https://issues.apache.org/jira/browse/KAFKA-13858
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>  Labels: kip-500
>
> When the kraft broker begins controlled shutdown, it immediately disables the 
> metadata listener. This means that metadata changes as part of the controlled 
> shutdown do not get sent to the respective components. For partitions that 
> the broker is follower of, that is what we want. It prevents the follower 
> from being able to rejoin the ISR while still shutting down. But for 
> partitions that the broker is leading, it means the leader will remain active 
> until controlled shutdown finishes and the socket server is stopped. That 
> delay can be as much as 5 seconds and probably even worse.
> In the zk world, we have an explicit request `StopReplica` which serves the 
> purpose of shutting down both follower and leader, but we don't have 
> something similar in kraft. For KRaft, we may not necessarily need an 
> explicit signal like this. We know that the broker is shutting down, so we 
> can treat partition changes as implicit `StopReplica` requests rather than 
> going through the normal `LeaderAndIsr` flow.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] ijuma commented on a diff in pull request #12072: KAFKA-13854 Refactor ApiVersion to MetadataVersion

2022-04-26 Thread GitBox


ijuma commented on code in PR #12072:
URL: https://github.com/apache/kafka/pull/12072#discussion_r859163758


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.common;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.kafka.common.record.RecordVersion;
+
+/**
+ * This class contains the different Kafka versions.
+ * Right now, we use them for upgrades - users can configure the version of 
the API brokers will use to communicate between themselves.
+ * This is only for inter-broker communications - when communicating with 
clients, the client decides on the API version.
+ *
+ * Note that the ID we initialize for each version is important.
+ * We consider a version newer than another if it is lower in the enum list 
(to avoid depending on lexicographic order)
+ *
+ * Since the api protocol may change more than once within the same release 
and to facilitate people deploying code from
+ * trunk, we have the concept of internal versions (first introduced during 
the 0.10.0 development cycle). For example,
+ * the first time we introduce a version change in a release, say 0.10.0, we 
will add a config value "0.10.0-IV0" and a
+ * corresponding enum constant IBP_0_10_0-IV0. We will also add a config value 
"0.10.0" that will be mapped to the
+ * latest internal version object, which is IBP_0_10_0-IV0. When we change the 
protocol a second time while developing
+ * 0.10.0, we will add a new config value "0.10.0-IV1" and a corresponding 
enum constant IBP_0_10_0-IV1. We will change
+ * the config value "0.10.0" to map to the latest internal version 
IBP_0_10_0-IV1. The config value of
+ * "0.10.0-IV0" is still mapped to IBP_0_10_0-IV0. This way, if people are 
deploying from trunk, they can use
+ * "0.10.0-IV0" and "0.10.0-IV1" to upgrade one internal version at a time. 
For most people who just want to use
+ * released version, they can use "0.10.0" when upgrading to the 0.10.0 
release.
+ */
+public enum MetadataVersion {
+IBP_0_8_0(-1),
+IBP_0_8_1(-1),
+IBP_0_8_2(-1),
+IBP_0_9_0(-1),
+
+// 0.10.0-IV0 is introduced for KIP-31/32 which changes the message format.
+IBP_0_10_0_IV0(-1),
+
+// 0.10.0-IV1 is introduced for KIP-36(rack awareness) and KIP-43(SASL 
handshake).
+IBP_0_10_0_IV1(-1),
+
+// introduced for JoinGroup protocol change in KIP-62
+IBP_0_10_1_IV0(-1),
+
+// 0.10.1-IV1 is introduced for KIP-74(fetch response size limit).
+IBP_0_10_1_IV1(-1),
+
+// introduced ListOffsetRequest v1 in KIP-79
+IBP_0_10_1_IV2(-1),
+
+// introduced UpdateMetadataRequest v3 in KIP-103
+IBP_0_10_2_IV0(-1),
+
+// KIP-98 (idempotent and transactional producer support)
+IBP_0_11_0_IV0(-1),
+
+// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
+IBP_0_11_0_IV1(-1),
+
+// Introduced leader epoch fetches to the replica fetcher via KIP-101
+IBP_0_11_0_IV2(-1),
+
+// Introduced LeaderAndIsrRequest V1, UpdateMetadataRequest V4 and 
FetchRequest V6 via KIP-112
+IBP_1_0_IV0(-1),
+
+// Introduced DeleteGroupsRequest V0 via KIP-229, plus KIP-227 incremental 
fetch requests,
+// and KafkaStorageException for fetch requests.
+IBP_1_1_IV0(-1),
+
+// Introduced OffsetsForLeaderEpochRequest V1 via KIP-279 (Fix log 
divergence between leader and follower after fast leader fail over)
+IBP_2_0_IV0(-1),
+
+// Several request versions were bumped due to KIP-219 (Improve quota 
communication)
+IBP_2_0_IV1(-1),
+
+// Introduced new schemas for group offset (v2) and group metadata (v2) 
(KIP-211)
+IBP_2_1_IV0(-1),
+
+// New Fetch, OffsetsForLeaderEpoch, and ListOffsets schemas (KIP-320)
+IBP_2_1_IV1(-1),
+
+// Support ZStandard Compression Codec (KIP-110)
+IBP_2_1_IV2(-1),
+
+// Introduced broker generation (KIP-380), and
+// LeaderAdnIsrRequest V2, UpdateMetadataRequest V5, StopReplicaRequest V1
+IBP_2_2_IV0(-1),
+
+// New error 

[GitHub] [kafka] guozhangwang merged pull request #12094: MINOR: fix html generation syntax errors

2022-04-26 Thread GitBox


guozhangwang merged PR #12094:
URL: https://github.com/apache/kafka/pull/12094


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy

2022-04-26 Thread GitBox


RivenSun2 commented on code in PR #12052:
URL: https://github.com/apache/kafka/pull/12052#discussion_r859296011


##
docs/design.html:
##
@@ -125,6 +125,9 @@ 
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
 
+TLS/SSL libraries operate at the user space (in-kernel 
SSL_sendfile is currently not supported by Kafka). Due to this 
restriction, sendfile could not be used when transport layer 
enables SSL protocol. For enabling

Review Comment:
   @ijuma Thanks for your review.
   Do I need to recreate a PR to improve the semantics of this documentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] ijuma commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy

2022-04-26 Thread GitBox


ijuma commented on code in PR #12052:
URL: https://github.com/apache/kafka/pull/12052#discussion_r859326941


##
docs/design.html:
##
@@ -125,6 +125,9 @@ 
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
 
+TLS/SSL libraries operate at the user space (in-kernel 
SSL_sendfile is currently not supported by Kafka). Due to this 
restriction, sendfile could not be used when transport layer 
enables SSL protocol. For enabling

Review Comment:
   You can submit a new one with just this change if you agree it reads better.



##
docs/design.html:
##
@@ -125,6 +125,9 @@ 
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
 
+TLS/SSL libraries operate at the user space (in-kernel 
SSL_sendfile is currently not supported by Kafka). Due to this 
restriction, sendfile could not be used when transport layer 
enables SSL protocol. For enabling

Review Comment:
   Nit: "could not be used when transport layer enables SSL protocol" -> "is 
not used when SSL is enabled"



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13824:
--
Description: (was: In [https://github.com/apache/kafka/pull/11896,] for 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java)

> Pass time object from constructor so that we can mock it if needed
> --
>
> Key: KAFKA-13824
> URL: https://issues.apache.org/jira/browse/KAFKA-13824
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13824:
--
Labels: test  (was: )

> Pass time object from constructor so that we can mock it if needed
> --
>
> Key: KAFKA-13824
> URL: https://issues.apache.org/jira/browse/KAFKA-13824
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Priority: Minor
>  Labels: test
>




--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[jira] [Updated] (KAFKA-13824) Pass time object from constructor so that we can mock it if needed

2022-04-26 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13824?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-13824:
--
Description: 
We pass in the {{Time}} object in the StreamThread, which is used to measure 
various metrics so that we can mock the time in tests. We should extend this to 
lower-level metrics, more specifically:

* KafkaStreams (instance-level): already done.
* StreamThread (thread-level): already done, in runtime passed through from 
instance.
* ProcessingContext (task-level).
* MeteredStore (store-level).
* StreamTask (task-level).
* ProcessorNode (processor-node-level): only a few processors that need to 
expose this level latency would need to have the {{Time}} object passed through.

> Pass time object from constructor so that we can mock it if needed
> --
>
> Key: KAFKA-13824
> URL: https://issues.apache.org/jira/browse/KAFKA-13824
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Hao Li
>Priority: Minor
>  Labels: test
>
> We pass in the {{Time}} object in the StreamThread, which is used to measure 
> various metrics so that we can mock the time in tests. We should extend this 
> to lower-level metrics, more specifically:
> * KafkaStreams (instance-level): already done.
> * StreamThread (thread-level): already done, in runtime passed through from 
> instance.
> * ProcessingContext (task-level).
> * MeteredStore (store-level).
> * StreamTask (task-level).
> * ProcessorNode (processor-node-level): only a few processors that need to 
> expose this level latency would need to have the {{Time}} object passed 
> through.



--
This message was sent by Atlassian Jira
(v8.20.7#820007)


[GitHub] [kafka] RivenSun2 commented on a diff in pull request #12052: KAFKA-13799: Improve documentation for Kafka zero-copy

2022-04-26 Thread GitBox


RivenSun2 commented on code in PR #12052:
URL: https://github.com/apache/kafka/pull/12052#discussion_r859353961


##
docs/design.html:
##
@@ -125,6 +125,9 @@ 
 
 This combination of pagecache and sendfile means that on a Kafka cluster 
where the consumers are mostly caught up you will see no read activity on the 
disks whatsoever as they will be serving data entirely from cache.
 
+TLS/SSL libraries operate at the user space (in-kernel 
SSL_sendfile is currently not supported by Kafka). Due to this 
restriction, sendfile could not be used when transport layer 
enables SSL protocol. For enabling

Review Comment:
   Sure , thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] RivenSun2 opened a new pull request, #12099: MINOR: Improve ssl description in zero-copy docs

2022-04-26 Thread GitBox


RivenSun2 opened a new pull request, #12099:
URL: https://github.com/apache/kafka/pull/12099

   Improvements to PR #12052, improve ssl description in zero-copy docs
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a diff in pull request #11896: KAFKA-13785: [6/N][Emit final] emit final for TimeWindowedKStreamImpl

2022-04-26 Thread GitBox


guozhangwang commented on code in PR #11896:
URL: https://github.com/apache/kafka/pull/11896#discussion_r859364452


##
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java:
##
@@ -184,6 +247,78 @@ public void process(final Record record) {
 droppedRecordsSensor.record();
 }
 }
+
+tryEmitFinalResult(record, closeTime);
+}
+
+private void tryEmitFinalResult(final Record record, final 
long closeTime) {
+if (emitStrategy.type() != StrategyType.ON_WINDOW_CLOSE) {
+return;
+}
+
+final long now = internalProcessorContext.currentSystemTimeMs();
+// Throttle emit frequency
+if (now < timeTracker.nextTimeToEmit) {
+return;
+}
+
+// Schedule next emit time based on now to avoid the case that if 
system time jumps a lot,
+// this can be triggered everytime
+timeTracker.nextTimeToEmit = now;
+timeTracker.advanceNextTimeToEmit();
+
+// Close time does not progress
+if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP && 
lastEmitCloseTime >= closeTime) {
+return;
+}
+
+final long emitRangeUpperBoundInclusive = closeTime - 
windows.size();
+if (emitRangeUpperBoundInclusive < 0) {
+// If emitRangeUpperBoundInclusive is 0, it means first window 
closes since windowEndTime
+// is exclusive
+return;
+}
+
+// Because we only get here when emitRangeUpperBoundInclusive > 0 
which means closeTime > windows.size()
+// Since we set lastEmitCloseTime to closeTime before storing to 
processor metadata
+// lastEmitCloseTime - windows.size() is always > 0
+// Set emitRangeLowerBoundInclusive to -1L if not set so that when 
we fetchAll, we fetch from 0L
+final long emitRangeLowerBoundInclusive = lastEmitCloseTime == 
ConsumerRecord.NO_TIMESTAMP ?
+-1L : lastEmitCloseTime - windows.size();
+
+if (lastEmitCloseTime != ConsumerRecord.NO_TIMESTAMP) {
+final Map matchedCloseWindows = 
windows.windowsFor(emitRangeUpperBoundInclusive);
+final Map matchedEmitWindows = 
windows.windowsFor(emitRangeLowerBoundInclusive);
+
+// Don't fetch store if the new emit window close time doesn't 
progress enough to cover next
+// window
+if (matchedCloseWindows.equals(matchedEmitWindows)) {
+log.trace("no new windows to emit. LastEmitCloseTime={}, 
newCloseTime={}",
+lastEmitCloseTime, closeTime);
+return;
+}
+}
+
+final long startNs = time.nanoseconds();

Review Comment:
   If we do intend to use nanoseconds instead of milliseconds, then we should 
name the metrics name as "...-latency-ns" and also in the description to 
emphasize it is measured in nanos, since by default all latency are measured in 
millis across AK package unless otherwise explicitly named / described.
   
   Personally I think it's sufficient to measure in milis. WDYT @lihaosky 
@mjsax ?



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java:
##
@@ -62,6 +67,17 @@ private ProcessorNodeMetrics() {}
 private static final String FORWARD_RATE_DESCRIPTION =
 RATE_DESCRIPTION_PREFIX + FORWARD_DESCRIPTION + 
RATE_DESCRIPTION_SUFFIX;
 
+private static final String EMITTED_RECORDS = 
"window-aggregate-final-emit";
+private static final String EMITTED_RECORDS_DESCRIPTION = "emit final 
records";
+private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = 
TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION;
+private static final String EMITTED_RECORDS_RATE_DESCRIPTION =
+RATE_DESCRIPTION_PREFIX + EMITTED_RECORDS_DESCRIPTION + 
RATE_DESCRIPTION_SUFFIX;
+
+private static final String EMIT_FINAL_LATENCY = 
"window-aggregate-final-emit" + LATENCY_SUFFIX;
+private static final String EMIT_FINAL_DESCRIPTION = "calls to emit final";

Review Comment:
   We can replace with `EMITTED_RECORDS + LATENCY_SUFFIX`.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java:
##
@@ -62,6 +67,17 @@ private ProcessorNodeMetrics() {}
 private static final String FORWARD_RATE_DESCRIPTION =
 RATE_DESCRIPTION_PREFIX + FORWARD_DESCRIPTION + 
RATE_DESCRIPTION_SUFFIX;
 
+private static final String EMITTED_RECORDS = 
"window-aggregate-final-emit";
+private static final String EMITTED_RECORDS_DESCRIPTION = "emit final 
records";
+private static final String EMITTED_RECORDS_TOTAL_DESCRIPTION = 
TOTAL_DESCRIPTION + EMITTED_RECORDS_DESCRIPTION;

[GitHub] [kafka] guozhangwang opened a new pull request, #12100: KAFKA-13785: [6/N][Emit final] Copy: Emit final for TimeWindowedKStreamImpl

2022-04-26 Thread GitBox


guozhangwang opened a new pull request, #12100:
URL: https://github.com/apache/kafka/pull/12100

   This is a copy PR of https://github.com/apache/kafka/pull/11896.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] guozhangwang commented on pull request #12100: KAFKA-13785: [6/N][Emit final] Copy: Emit final for TimeWindowedKStreamImpl

2022-04-26 Thread GitBox


guozhangwang commented on PR #12100:
URL: https://github.com/apache/kafka/pull/12100#issuecomment-1110551346

   The only differences between this copy PR and the original ones are 1) 
resolved conflicts, 2) incorporated latest comments. cc @mjsax 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] showuon commented on a diff in pull request #11955: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2022-04-26 Thread GitBox


showuon commented on code in PR #11955:
URL: https://github.com/apache/kafka/pull/11955#discussion_r859425060


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##
@@ -222,6 +223,14 @@ public void stop() {
 connectorStatusMetricsGroup.close();
 
 workerConfigTransformer.close();
+executor.shutdown();
+try {
+if 
(!executor.awaitTermination(EXECUTOR_SHUTDOWN_TERMINATION_TIMEOUT_MS, 
TimeUnit.MILLISECONDS)) {
+executor.shutdownNow();
+}
+} catch (InterruptedException e) {
+executor.shutdownNow();
+}

Review Comment:
   Not sure if we should follow the 2-phase shutdown in "usage example" in java 
doc?
   
https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/ExecutorService.html



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org