[GitHub] [kafka] dongjinleekr closed pull request #10642: KAFKA-12756: Update Zookeeper to 3.6.3 or higher
dongjinleekr closed pull request #10642: URL: https://github.com/apache/kafka/pull/10642 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dongjinleekr commented on pull request #10678: TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc
dongjinleekr commented on pull request #10678: URL: https://github.com/apache/kafka/pull/10678#issuecomment-851263147 Rebased onto the latest trunk and fixed additional glitches I found while working on other issues. @chia7712 @kowshik @ijuma Could you have a look? :pray:
[GitHub] [kafka] chia7712 commented on pull request #10791: MINOR: replace org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …
chia7712 commented on pull request #10791: URL: https://github.com/apache/kafka/pull/10791#issuecomment-851271639

> The build definition doesn't require changes to mention this annotation?

The doc (https://jqwik.net/docs/current/user-guide.html#tagging-tests) explains how to tag a test, but it does not mention the package of this annotation. I filed a PR for that (https://github.com/jlink/jqwik/pull/193).
[GitHub] [kafka] chia7712 opened a new pull request #10796: HOTFIX: fix build error
chia7712 opened a new pull request #10796: URL: https://github.com/apache/kafka/pull/10796

See the CI run (https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-10585/5/pipeline/12/); the error is related to commit 6b005b2b4eece81a5500fb0080ef5354b4240681.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Created] (KAFKA-12867) Trogdor ConsumeBenchWorker quits prematurely with maxMessages config
Kowshik Prakasam created KAFKA-12867:

Summary: Trogdor ConsumeBenchWorker quits prematurely with maxMessages config
Key: KAFKA-12867
URL: https://issues.apache.org/jira/browse/KAFKA-12867
Project: Kafka
Issue Type: Bug
Reporter: Kowshik Prakasam

The Trogdor [ConsumeBenchWorker|https://github.com/apache/kafka/commits/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java] has a bug. If one of the consumption tasks completes successfully because [maxMessages has been consumed|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L245], the task [notifies the doneFuture|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L285], causing the ConsumeBenchWorker to halt. This becomes a problem when more than one consumption task is running in parallel: the successful completion of one task shuts down the entire worker while the other tasks are still running. When the worker is shut down, it [kills|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L482] all active consumption tasks, which is not the desired behavior. The fix is to not notify the doneFuture when one of the consumption tasks completes without error. Instead, we should defer the notification to the [CloseStatusUpdater|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L299] thread. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kowshik opened a new pull request #10797: KAFKA-12867: Fix ConsumeBenchWorker exit behavior for maxMessages config
kowshik opened a new pull request #10797: URL: https://github.com/apache/kafka/pull/10797

The Trogdor `ConsumeBenchWorker` has a bug. If one of the consumption tasks completes successfully because `maxMessages` has been consumed, the task notifies the `doneFuture`, causing the entire `ConsumeBenchWorker` to halt. This becomes a problem when more than one consumption task is running in parallel: the successful completion of one task shuts down the entire worker while the other tasks are still running. When the worker is shut down, it kills all active consumption tasks, which is not the desired behavior. The fix is to not notify the `doneFuture` when one of the consumption tasks completes without error. Instead, we should defer the notification to the `CloseStatusUpdater` thread.
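The deferred-completion idea can be sketched as follows. This is a hypothetical illustration, not the actual Trogdor source: the class and method names (`WorkerCompletionSketch`, `runTasks`) are invented, and a simple countdown stands in for the `CloseStatusUpdater` thread. Several tasks share one done-future, and only the last task to finish completes it, so one task hitting its message limit no longer halts its siblings.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the proposed fix, NOT the actual Trogdor code:
// several consume tasks share one doneFuture, and only the last task to
// finish completes it, so one task reaching its maxMessages limit no
// longer shuts down the worker while sibling tasks are still running.
public class WorkerCompletionSketch {

    public static CompletableFuture<String> runTasks(int taskCount) {
        CompletableFuture<String> doneFuture = new CompletableFuture<>();
        AtomicInteger remaining = new AtomicInteger(taskCount);
        for (int i = 0; i < taskCount; i++) {
            Thread task = new Thread(() -> {
                // ... consume records up to maxMessages here ...
                // Buggy behavior: completing doneFuture right here would
                // halt the worker while other tasks are still running.
                // Deferred behavior: only the last finisher completes it.
                if (remaining.decrementAndGet() == 0) {
                    doneFuture.complete("done");
                }
            });
            task.start();
        }
        return doneFuture;
    }

    public static void main(String[] args) {
        // join() blocks until every task has finished, not just the first.
        System.out.println(runTasks(3).join());
    }
}
```

The key design point is that no individual task ever completes the shared future on success; a single completion point (here the countdown, in Trogdor the status-updater thread) decides when the worker as a whole is done.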
[GitHub] [kafka] kowshik commented on pull request #10797: KAFKA-12867: Fix ConsumeBenchWorker exit behavior for maxMessages config
kowshik commented on pull request #10797: URL: https://github.com/apache/kafka/pull/10797#issuecomment-851293196 cc @junrao @apovzner @rajinisivaram for review. @apovzner It appears this behavior has been around since `ConsumeBenchWorker` was first implemented: https://github.com/apache/kafka/pull/4775. Please let me know your thoughts.
[jira] [Created] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648
Yuneng Xie created KAFKA-12868:

Summary: log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648
Key: KAFKA-12868
URL: https://issues.apache.org/jira/browse/KAFKA-12868
Project: Kafka
Issue Type: Bug
Components: log cleaner
Affects Versions: 2.4.0
Reporter: Yuneng Xie

Our broker spent too long loading the offsets partition:
```
[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished loading offsets and group metadata from __consumer_offsets-47 in 1236029 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
```
So I checked the partition data; it is too big:
```
106G /var/lib/kafka/data/__consumer_offsets-47
```
The retention time is 7 days:
```
log.retention.hours=168
```
And I found error logs from the log cleaner:
```
[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-47. (kafka.log.LogCleaner)
[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for __consumer_offsets-47... (kafka.log.LogCleaner)
[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log __consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). (kafka.log.LogCleaner)
[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected exception thrown when cleaning log Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, partition=47, highWatermark=1021411007, lastStableOffset=1021411007, logStartOffset=0, logEndOffset=1021411007). Marking its partition (__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)
kafka.log.LogCleaningException: -2147483648
	at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)
	at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:324)
	at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:313)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2147483648
	at kafka.utils.CoreUtils$.readInt(CoreUtils.scala:241)
	at kafka.log.SkimpyOffsetMap.positionOf(OffsetMap.scala:183)
	at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:101)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2(LogCleaner.scala:947)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2$adapted(LogCleaner.scala:944)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1(LogCleaner.scala:944)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1$adapted(LogCleaner.scala:933)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:933)
	at kafka.log.Cleaner.$anonfun$buildOffsetMap$3(LogCleaner.scala:894)
	at kafka.log.Cleaner.$anonfun$buildOffsetMap$3$adapted(LogCleaner.scala:890)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:890)
	at kafka.log.Cleaner.doClean(LogCleaner.scala:514)
	at kafka.log.Cleaner.clean(LogCleaner.scala:502)
	at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:371)
	at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:344)
	... 3 more
```
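An index of exactly -2147483648 (Integer.MIN_VALUE) is the signature of a well-known Java integer-overflow pitfall: `Math.abs(Integer.MIN_VALUE)` is still negative, so any slot computation of the form `abs(hash) % size` can produce a negative array index for that one hash value. Whether `SkimpyOffsetMap` fails on precisely this path is an assumption here; the stack trace only shows the resulting `ArrayIndexOutOfBoundsException` inside `positionOf`. A minimal demonstration:

```java
// Demonstrates the overflow class of bug behind an index of -2147483648:
// Math.abs(Integer.MIN_VALUE) returns Integer.MIN_VALUE (still negative),
// so "abs(hash) % size" style indexing can go out of bounds for that hash.
public class AbsOverflowDemo {

    // A common defensive variant: mask off the sign bit before the modulus,
    // which keeps the slot non-negative for every possible int hash.
    public static int safeSlot(int hash, int slots) {
        return (hash & 0x7fffffff) % slots;
    }

    public static void main(String[] args) {
        int h = Integer.MIN_VALUE;
        System.out.println(Math.abs(h));        // prints -2147483648
        System.out.println(safeSlot(h, 1024));  // prints 0
    }
}
```

The mask works because two's-complement integers have a single extra negative value with no positive counterpart; clearing the sign bit maps it to 0 instead of leaving it negative.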
[jira] [Updated] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648
[ https://issues.apache.org/jira/browse/KAFKA-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuneng Xie updated KAFKA-12868:
---
Description:

Our broker spent too long loading the offsets partition:
```
[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished loading offsets and group metadata from __consumer_offsets-47 in 1236029 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
```
So I checked the partition data; it is too big:
```
106G /var/lib/kafka/data/__consumer_offsets-47
```
The retention time is 7 days:
```
log.retention.hours=168
```
But I found many segment files in /var/lib/kafka/data/__consumer_offsets-47 that were supposed to have been cleaned, and error logs from the log cleaner:
```
[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-47. (kafka.log.LogCleaner)
[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for __consumer_offsets-47... (kafka.log.LogCleaner)
[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log __consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). (kafka.log.LogCleaner)
[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected exception thrown when cleaning log Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, partition=47, highWatermark=1021411007, lastStableOffset=1021411007, logStartOffset=0, logEndOffset=1021411007). Marking its partition (__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)
kafka.log.LogCleaningException: -2147483648
	at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)
	at kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:324)
	at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:313)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -2147483648
	at kafka.utils.CoreUtils$.readInt(CoreUtils.scala:241)
	at kafka.log.SkimpyOffsetMap.positionOf(OffsetMap.scala:183)
	at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:101)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2(LogCleaner.scala:947)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2$adapted(LogCleaner.scala:944)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1(LogCleaner.scala:944)
	at kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1$adapted(LogCleaner.scala:933)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:933)
	at kafka.log.Cleaner.$anonfun$buildOffsetMap$3(LogCleaner.scala:894)
	at kafka.log.Cleaner.$anonfun$buildOffsetMap$3$adapted(LogCleaner.scala:890)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:890)
	at kafka.log.Cleaner.doClean(LogCleaner.scala:514)
	at kafka.log.Cleaner.clean(LogCleaner.scala:502)
	at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:371)
	at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:344)
	... 3 more
```
It seems the cleaner failed to compact __consumer_offsets-47. Any suggestion on this?
[jira] [Updated] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648
[ https://issues.apache.org/jira/browse/KAFKA-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yuneng Xie updated KAFKA-12868:
---
Description:

Our broker spent too long loading the offsets partition:
```
[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished loading offsets and group metadata from __consumer_offsets-47 in 1236029 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
```
So I checked the partition data; it is too big:
```
106G /var/lib/kafka/data/__consumer_offsets-47
```
The retention time is 7 days:
```
log.retention.hours=168
```
But I found many segment files in /var/lib/kafka/data/__consumer_offsets-47 that were supposed to have been cleaned, and error logs from the log cleaner:
```
[2021-04-09 00:41:48,327] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-47. (kafka.log.LogCleaner)
[2021-04-09 00:41:48,327] INFO Cleaner 0: Building offset map for __consumer_offsets-47... (kafka.log.LogCleaner)
[2021-04-09 00:41:48,353] INFO Cleaner 0: Building offset map for log __consumer_offsets-47 for 1 segments in offset range [1010155298, 1010757586). (kafka.log.LogCleaner)
[2021-04-09 13:02:09,425] INFO Cleaner 0: Offset map for log __consumer_offsets-47 complete. (kafka.log.LogCleaner)
[2021-04-09 13:02:09,426] INFO Cleaner 0: Cleaning log __consumer_offsets-47 (cleaning prior to Fri Apr 09 00:41:34 SRET 2021, discarding tombstones prior to Wed Apr 07 23:19:50 SRET 2021)... (kafka.log.LogCleaner)
[2021-04-09 13:02:09,427] INFO Cleaner 0: Cleaning LogSegment(baseOffset=0, size=51681, lastModifiedTime=1617890796000, largestTime=1617890765851) in log __consumer_offsets-47 into 0 with deletion horizon 161780879, retaining deletes. (kafka.log.LogCleaner)
[2021-04-09 13:02:27,573] INFO Cleaner 0: Cleaning LogSegment(baseOffset=100989, size=65218, lastModifiedTime=161789519, largestTime=1617895190754) in log __consumer_offsets-47 into 0 with deletion horizon 161780879, retaining deletes. (kafka.log.LogCleaner)
[2021-04-09 13:02:52,991] INFO Cleaner 0: Swapping in cleaned segment LogSegment(baseOffset=0, size=53871, lastModifiedTime=161789519, largestTime=1617894932233) for segment(s) List(LogSegment(baseOffset=0, size=51681, lastModifiedTime=1617890796000, largestTime=1617890765851), LogSegment(baseOffset=100989, size=65218, lastModifiedTime=161789519, largestTime=1617895190754)) in log Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, partition=47, highWatermark=1015632787, lastStableOffset=1015632787, logStartOffset=0, logEndOffset=1015632789) (kafka.log.LogCleaner)
[2021-04-09 13:02:52,992] INFO Cleaner 0: Cleaning LogSegment(baseOffset=1010155298, size=104857448, lastModifiedTime=1617900094000, largestTime=1617900094618) in log __consumer_offsets-47 into 1010155298 with deletion horizon 161780879, retaining deletes. (kafka.log.LogCleaner)
[2021-04-10 00:28:24,476] INFO Cleaner 0: Swapping in cleaned segment LogSegment(baseOffset=1010155298, size=63028, lastModifiedTime=1617900094000, largestTime=1617900094618) for segment(s) List(LogSegment(baseOffset=1010155298, size=104857448, lastModifiedTime=1617900094000, largestTime=1617900094618)) in log Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, partition=47, highWatermark=1021404662, lastStableOffset=1021404662, logStartOffset=0, logEndOffset=1021404663) (kafka.log.LogCleaner)
[2021-04-10 00:28:24,492] INFO [kafka-log-cleaner-thread-0]: Log cleaner thread 0 cleaned log __consumer_offsets-47 (dirty section = [1010155298, 1010155298])
	100.1 MB of log processed in 85,596.2 seconds (0.0 MB/sec).
	Indexed 100.0 MB in 44421.1 seconds (0.0 Mb/sec, 51.9% of total time)
	Buffer utilization: 0.0%
	Cleaned 100.1 MB in 41175.1 seconds (0.0 Mb/sec, 48.1% of total time)
	Start size: 100.1 MB (602,913 messages)
	End size: 0.1 MB (625 messages)
	99.9% size reduction (99.9% fewer messages) (kafka.log.LogCleaner)
[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log __consumer_offsets-47. (kafka.log.LogCleaner)
[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for __consumer_offsets-47... (kafka.log.LogCleaner)
[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log __consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). (kafka.log.LogCleaner)
[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected exception thrown when cleaning log Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, partition=47, highWatermark=1021411007, lastStableOffset=1021411007, logStartOffset=0, logEndOffset=1021411007). Marking its partition (__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)
kafka.log.LogCleaningException: -2147483648
	at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)
	at kafk
```
[jira] [Commented] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes
[ https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354293#comment-17354293 ] Chia-Ping Tsai commented on KAFKA-12847: Could you try to use non-root to run kafka tests? IIRC, Not only system tests but also UT/IT expect to be run by non-root. > Also, I wonder whether or not upstream Dockerfile & System tests are part of >CI/CD and get tested for every PR. If so, this issue should have been caught. We don't run system tests for each PR since it is too expensive. Instead, we make sure all system tests pass before release. > Dockerfile needed for kafka system tests needs changes > -- > > Key: KAFKA-12847 > URL: https://issues.apache.org/jira/browse/KAFKA-12847 > Project: Kafka > Issue Type: Bug > Components: system tests >Affects Versions: 2.8.0, 2.7.1 > Environment: Issue tested in environments below but is independent of > h/w arch. or Linux flavor: - > 1.) RHEL-8.3 on x86_64 > 2.) RHEL-8.3 on IBM Power (ppc64le) > 3.) apache/kafka branch tested: trunk (master) >Reporter: Abhijit Mane >Assignee: Abhijit Mane >Priority: Major > Labels: easyfix > Attachments: Dockerfile.upstream > > > Hello, > I tried apache/kafka system tests as per documentation: - > ([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_]) > = > PROBLEM > ~~ > 1.) As root user, clone kafka github repo and start "kafka system tests" > # git clone [https://github.com/apache/kafka.git] > # cd kafka > # ./gradlew clean systemTestLibs > # bash tests/docker/run_tests.sh > 2.) 
Dockerfile issue - > [https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile] > This file has an *UID* entry as shown below: - > --- > ARG *UID*="1000" > RUN useradd -u $*UID* ducker > // {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not > unique, root user id is 0 > --- > I ran everything as root which means the built-in bash environment variable > 'UID' always > resolves to 0 and can't be changed. Hence, the docker build fails. The issue > should be seen even if run as non-root. > 3.) Next, as root, as per README, I ran: - > server:/kafka> *bash tests/docker/run_tests.sh* > The ducker tool builds the container images & switches to user '*ducker*' > inside the container > & maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the > container. > Ref: > [https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak] > Ex: docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* > This fails as the 'ducker' user has *no write permissions* to create files > under 'kafka' root dir. Hence, it needs to be made writeable. > // *chmod -R a+w kafka* > – needed as container is run as 'ducker' and needs write access since kafka > root volume from host is mapped to container as "/opt/kafka-dev" where the > 'ducker' user writes logs > = > = > *FIXES needed* > ~ > 1.) Dockerfile - > [https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile] > Change 'UID' to '*UID_DUCKER*'. > This won't conflict with built in bash env. var UID and the docker image > build should succeed. > --- > ARG *UID_DUCKER*="1000" > RUN useradd -u $*UID_DUCKER* ducker > // *{color:#57d9a3}No Error{color}* => No conflict with built-in UID > --- > 2.) README needs an update where we must ensure the kafka root dir from where > the tests > are launched is writeable to allow the 'ducker' user to create results/logs. 
> # chmod -R a+w kafka > With this, I was able to get the docker images built and system tests started > successfully. > = > Also, I wonder whether or not upstream Dockerfile & System tests are part of > CI/CD and get tested for every PR. If so, this issue should have been caught. > > *Question to kafka SME* > - > Do you believe this is a valid problem with the Dockerfile and the fix is > acceptable? > Please let me know and I am happy to submit a PR with this fix. > Thanks, > Abhijit -- This message was sent by Atlassian Jira (v8.3.4#803005)
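[Editor's note] The fix described above amounts to renaming the build argument so it no longer shadows bash's read-only built-in `UID` (which is 0 when the build is driven as root). A minimal illustrative fragment, not the actual tests/docker/Dockerfile:

```dockerfile
# Illustrative fragment only, not the real tests/docker/Dockerfile.
# Bash treats UID as a read-only built-in variable, so a build argument
# named UID collides with it; UID_DUCKER does not.
ARG UID_DUCKER="1000"

# Create the unprivileged user that the system tests run as.
RUN useradd -u "$UID_DUCKER" ducker
```

On the host, the proposed README change (`chmod -R a+w kafka`) then lets the `ducker` user write logs under the kafka directory mapped into the container as `/opt/kafka-dev`.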
[GitHub] [kafka] jlprat commented on pull request #10702: MINOR: Switch Jenkins from JDK 15 to JDK 16
jlprat commented on pull request #10702: URL: https://github.com/apache/kafka/pull/10702#issuecomment-851334730 For reference, there is an issue in PowerMock for this: https://github.com/powermock/powermock/issues/1099 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #10796: HOTFIX: fix build error
cadonna commented on pull request #10796: URL: https://github.com/apache/kafka/pull/10796#issuecomment-851444665 The test failures are unrelated and known to be flaky.
[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on pull request #10794: URL: https://github.com/apache/kafka/pull/10794#issuecomment-851446115 Tests added. Thanks.
[GitHub] [kafka] cadonna merged pull request #10796: HOTFIX: fix build error
cadonna merged pull request #10796: URL: https://github.com/apache/kafka/pull/10796
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642461619 ## File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala ## @@ -198,15 +199,15 @@ class DefaultAutoTopicCreationManager( // Borrow client information such as client id and correlation id from the original request, // in order to correlate the create request with the original metadata request. - val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, + requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS, requestVersion, context.clientId, context.correlationId) ForwardingManager.buildEnvelopeRequest(context, createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader)) }.getOrElse(createTopicsRequest) -channelManager.sendRequest(request, requestCompletionHandler) +channelManager.sendRequest(request, requestCompletionHandler, requestHeader) Review comment: This is the 1st of 2 places we use `EnvelopeRequest`, so add `requestHeader`
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642462291 ## File path: core/src/main/scala/kafka/server/ForwardingManager.scala ## @@ -125,7 +125,7 @@ class ForwardingManagerImpl( } } -channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler) +channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler, request.header) Review comment: This is the 2nd of 2 places we use `EnvelopeRequest`, add `requestHeader`.
[GitHub] [kafka] lkokhreidze commented on a change in pull request #10785: KIP-708 / A Rack awareness for Kafka Streams
lkokhreidze commented on a change in pull request #10785: URL: https://github.com/apache/kafka/pull/10785#discussion_r642467096 ## File path: clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ## @@ -33,8 +31,10 @@ import java.util.Map; import java.util.Set; import java.util.function.BiConsumer; +import java.util.function.Function; Review comment: Sorry for the unrelated changes :( happy to revert if it adds too much noise during the review.
[GitHub] [kafka] cadonna commented on a change in pull request #10507: KAFKA-8410: Migrating stateful operators to new Processor API
cadonna commented on a change in pull request #10507: URL: https://github.com/apache/kafka/pull/10507#discussion_r642403697 ## File path: streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java ## @@ -32,7 +32,7 @@ private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class); @Override -public DeserializationHandlerResponse handle(final ProcessorContext context, +public DeserializationHandlerResponse handle(final ProcessorContext context, Review comment: Do we need to deprecate also this method and add a new one? Technically, it is a class of the public API that can be extended. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ## @@ -48,21 +48,20 @@ CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) { this.builder = builder; } -@SuppressWarnings("unchecked") - KTable build(final Map, Aggregator> groupPatterns, -final Initializer initializer, -final NamedInternal named, -final StoreBuilder storeBuilder, -final Serde keySerde, -final Serde valueSerde, -final String queryableName) { + KTable build(final Map, Aggregator> groupPatterns, +final Initializer initializer, +final NamedInternal named, +final StoreBuilder storeBuilder, +final Serde keySerde, +final Serde valueSerde, +final String queryableName) { processRepartitions(groupPatterns, storeBuilder); final Collection processors = new ArrayList<>(); -final Collection parentProcessors = new ArrayList<>(); +final Collection parentProcessors = new ArrayList<>(); boolean stateCreated = false; int counter = 0; for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) { -final KStreamAggProcessorSupplier parentProcessor = +final KStreamAggregateProcessorSupplier parentProcessor = Review comment: Shouldn't this be `KStreamAggregateProcessorSupplier`? 
The positions of the parameters `KOut` and `VIn` on `KStreamAggregateProcessorSupplier` changed with respect to `KStreamAggProcessorSupplier`. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ## @@ -183,7 +182,9 @@ final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); return doAggregate( -new KStreamAggregate<>(materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), +new KStreamAggregate<>(materializedInternal.storeName(), +aggregateBuilder.countInitializer, +aggregateBuilder.countAggregator), Review comment: ```suggestion new KStreamAggregate<>( materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator ), ``` or ```suggestion new KStreamAggregate<>( materializedInternal.storeName(), aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), ``` ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ## @@ -16,6 +16,8 @@ +import java.util.Objects; +import java.util.Set; Review comment: See my comment above about import order. ## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java ## @@ -16,6 +16,10 @@ */ package org.apache.kafka.streams.kstream.internals; +import java.util.Collection; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; Review comment: In [KAFKA-10787](https://issues.apache.org/jira/browse/KAFKA-10787) we agreed on an import order `kafka`, `org.apache.kafka`, `com`, `net`, `org`, `java`, `javax` and static imports. Additionally, there should be an empty line between import blocks. Note, PR #10428 introduces a check and a formatter for this. 
## File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ## @@ -26,29 +26,29 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.ConnectedStoreProvider; import org.apache.kafka.streams.processor.Processor; -import org.
[GitHub] [kafka] vamossagar12 opened a new pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store
vamossagar12 opened a new pull request #10798: URL: https://github.com/apache/kafka/pull/10798 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354490#comment-17354490 ] Sagar Rao commented on KAFKA-9168: -- [~ableegoldman] I modified the put API for a single column family. The changes are minimal right now but they need to evolve. But before that, I was thinking that if you could run the internal benchmarking tool just for this API, we could then take a call. I will come up with jmh benchmarks as well. Here is the draft PR: https://github.com/apache/kafka/pull/10798 > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283]
[GitHub] [kafka] ijuma commented on pull request #10796: HOTFIX: fix build error
ijuma commented on pull request #10796: URL: https://github.com/apache/kafka/pull/10796#issuecomment-851537672 Thanks for the fix, my bad for missing it. I did merge trunk and build locally before merging the PR, but didn't check Scala 2.12.
[GitHub] [kafka] ijuma edited a comment on pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …
ijuma edited a comment on pull request #10791: URL: https://github.com/apache/kafka/pull/10791#issuecomment-851538731 I was asking if our build has to change. Looks like it's string based, so it doesn't? https://github.com/apache/kafka/blob/trunk/build.gradle#L404 Have we tested?
[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others
viktorsomogyi commented on pull request #10738: URL: https://github.com/apache/kafka/pull/10738#issuecomment-851544134 Finished rebasing and coding (and therefore force-pushing); I'll switch the PR into "ready for review" state now.
[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others
viktorsomogyi commented on pull request #10738: URL: https://github.com/apache/kafka/pull/10738#issuecomment-851545252 @omkreddy @rajinisivaram would you please help with the review to get this long-outstanding KIP done?
[GitHub] [kafka] dengziming commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
dengziming commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642556841 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -363,6 +386,22 @@ class BrokerToControllerRequestThread( } } + def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = { +if (responseBody.isInstanceOf[EnvelopeResponse] && requestHeader != null) { + info(s"Trying to find NOT_CONTROLLER exception inside envelope response") + val envelopeResponse = responseBody.asInstanceOf[EnvelopeResponse] + val envelopeError = envelopeResponse.error() + + if (envelopeError == Errors.NONE) { +val response = AbstractResponse.parseResponse(envelopeResponse.responseData, requestHeader) +envelopeResponse.responseData().rewind() +return response.errorCounts().containsKey(Errors.NOT_CONTROLLER) + } +} + +false; Review comment: nit: unnecessary `;` ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -363,6 +386,22 @@ class BrokerToControllerRequestThread( } } + def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = { Review comment: How about moving this to `ForwardingManager`, since the envelopeResponse and envelopeRequest are both in `ForwardingManager`?
[GitHub] [kafka] dajac opened a new pull request #10799: WIP
dajac opened a new pull request #10799: URL: https://github.com/apache/kafka/pull/10799 *More detailed description of your change, if necessary. The PR title and PR message become the squashed commit message, so use a separate comment to ping reviewers.* *Summary of testing strategy (including rationale) for the feature or bug fix. Unit and/or integration tests are expected for any behaviour change and system tests should be considered for larger changes.* ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file
jlprat commented on pull request #10693: URL: https://github.com/apache/kafka/pull/10693#issuecomment-851594252 Hi @ableegoldman, as you reviewed the last change in that area (fixing the license files), do you think you could review this one? Thanks in advance!
[GitHub] [kafka] jlprat commented on pull request #10783: MINOR: Dependency updates around Scala libraries
jlprat commented on pull request #10783: URL: https://github.com/apache/kafka/pull/10783#issuecomment-851594384 @ijuma You can review now.
[GitHub] [kafka] ijuma commented on pull request #9497: KAFKA-10619: Producer will enable idempotence and acks all by default
ijuma commented on pull request #9497: URL: https://github.com/apache/kafka/pull/9497#issuecomment-851596119 @d8tltanc can we rebase this PR so that we can get it merged?
[GitHub] [kafka] ijuma removed a comment on pull request #9497: KAFKA-10619: Producer will enable idempotence and acks all by default
ijuma removed a comment on pull request #9497: URL: https://github.com/apache/kafka/pull/9497#issuecomment-851596119 @d8tltanc can we rebase this PR so that we can get it merged?
[GitHub] [kafka] ijuma commented on pull request #9497: KAFKA-10619: Producer will enable idempotence and acks all by default
ijuma commented on pull request #9497: URL: https://github.com/apache/kafka/pull/9497#issuecomment-851597938 @d8tltanc I notice that the Jira ticket was assigned to you, but this PR was submitted by @warrenzhu25. Are you working together on this? I am asking because we need to get this change into trunk soonish, and changing the default is not the only change we require. We also need to check and potentially update the tests so that we get coverage of non-idempotent cases too after the default is switched.
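[Editor's note] The two producer settings this PR would change defaults for can already be set explicitly. A minimal sketch of a producer properties fragment (values shown are the proposed new defaults, not the defaults at the time of this thread):

```properties
# Settings KAFKA-10619 proposes as the new producer defaults; pinning them
# explicitly keeps behavior stable regardless of the client version in use.
enable.idempotence=true
acks=all
```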
[GitHub] [kafka] soarez commented on pull request #10795: KAFKA-12866: Avoid root access to Zookeeper
soarez commented on pull request #10795: URL: https://github.com/apache/kafka/pull/10795#issuecomment-851598218 @omkreddy can you have a look at this one?
[jira] [Created] (KAFKA-12869) Update vulnerable dependencies
Pavel Kuznetsov created KAFKA-12869: --- Summary: Update vulnerable dependencies Key: KAFKA-12869 URL: https://issues.apache.org/jira/browse/KAFKA-12869 Project: Kafka Issue Type: Bug Components: core, KafkaConnect Affects Versions: 2.7.1 Reporter: Pavel Kuznetsov *Description* I checked the kafka_2.13-2.7.1.tgz distribution with WhiteSource and found that some libraries have vulnerabilities. Here they are: * jetty-io-9.4.38.v20210224.jar has the CVE-2021-28165 vulnerability. The way to fix it is to upgrade to org.eclipse.jetty:jetty-io:9.4.39 or org.eclipse.jetty:jetty-io:10.0.2 or org.eclipse.jetty:jetty-io:11.0.2 * jersey-common-2.31.jar has the CVE-2021-28168 vulnerability. The way to fix it is to upgrade to org.glassfish.jersey.core:jersey-common:2.34 * jetty-server-9.4.38.v20210224.jar has the CVE-2021-28164 vulnerability. The way to fix it is to upgrade to org.eclipse.jetty:jetty-webapp:9.4.39 *To Reproduce* Download kafka_2.13-2.7.1.tgz and find the jars listed above. Check that these jars with the corresponding versions are mentioned in the corresponding vulnerability descriptions. *Expected* * jetty-io upgraded to 9.4.39 or higher * jersey-common upgraded to 2.34 or higher * jetty-server upgraded to jetty-webapp:9.4.39 or higher *Actual* * jetty-io is 9.4.38 * jersey-common is 2.31 * jetty-server is 9.4.38
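[Editor's note] One way such upgrades can be forced in a Gradle build is via dependency constraints; a hedged sketch only (Kafka's own build centralizes versions in gradle/dependencies.gradle instead, and the Jetty artifact versions carry a date suffix omitted here):

```groovy
// Illustrative Gradle constraints pinning the patched versions named in
// the report above; not how the actual Kafka build declares dependencies.
dependencies {
    constraints {
        implementation('org.eclipse.jetty:jetty-io:9.4.39')            // CVE-2021-28165
        implementation('org.eclipse.jetty:jetty-server:9.4.39')        // CVE-2021-28164
        implementation('org.glassfish.jersey.core:jersey-common:2.34') // CVE-2021-28168
    }
}
```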
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354586#comment-17354586 ] Bruno Cadonna commented on KAFKA-9168: -- [~sagarrao] and [~ableegoldman] I took the liberty to start the internal benchmark on the draft PR.
[GitHub] [kafka] omkreddy commented on pull request #10795: KAFKA-12866: Avoid root access to Zookeeper
omkreddy commented on pull request #10795: URL: https://github.com/apache/kafka/pull/10795#issuecomment-851613121 cc @rondagostino
[jira] [Commented] (KAFKA-12869) Update vulnerable dependencies
[ https://issues.apache.org/jira/browse/KAFKA-12869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354604#comment-17354604 ] Ismael Juma commented on KAFKA-12869: - Thanks for the report. Have you checked if these have already been fixed in trunk?
[GitHub] [kafka] ijuma opened a new pull request #10800: MINOR: Update jmh for async profiler 2.0 support
ijuma opened a new pull request #10800: URL: https://github.com/apache/kafka/pull/10800 Async profiler 2.0 outputs html5 flame graph files and supports simultaneous collection of cpu, allocation and lock profiles in jfr format. Updated the readme to include an example of the latter and verified that the Readme commands work with async profiler 2.0. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4
cadonna commented on pull request #10765: URL: https://github.com/apache/kafka/pull/10765#issuecomment-851639965 @showuon and @guozhangwang Do you want to have a second look after my last update of the unit tests? Otherwise I would move on and merge.
[GitHub] [kafka] ijuma commented on pull request #10783: MINOR: Dependency updates for Scala libraries for improved Scala 3.0 support
ijuma commented on pull request #10783: URL: https://github.com/apache/kafka/pull/10783#issuecomment-851640073 The JDK 8 and Scala 2.12 build has two unrelated failures.
[GitHub] [kafka] ijuma merged pull request #10783: MINOR: Dependency updates for Scala libraries for improved Scala 3.0 support
ijuma merged pull request #10783: URL: https://github.com/apache/kafka/pull/10783
[GitHub] [kafka] cadonna commented on pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4
cadonna commented on pull request #10765: URL: https://github.com/apache/kafka/pull/10765#issuecomment-851641644 Test failures are unrelated and known to be flaky:
```
JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
JDK 8 and Scala 2.12 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
JDK 15 and Scala 2.13 / kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
```
[GitHub] [kafka] jlprat commented on pull request #10783: MINOR: Dependency updates for Scala libraries for improved Scala 3.0 support
jlprat commented on pull request #10783: URL: https://github.com/apache/kafka/pull/10783#issuecomment-851642412 Thanks a lot for the review @ijuma !
[GitHub] [kafka] cadonna commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams
cadonna commented on pull request #10785: URL: https://github.com/apache/kafka/pull/10785#issuecomment-851652903 @lkokhreidze Thank you for the PR! This is quite a large PR. Could you split it up into smaller PRs that are easier to review? One option could be to have a separate PR for:
- standby task assignor
- config changes
- subscription info changes
- ...
Opening small PRs usually increases the review quality and decreases time to the first review.
[GitHub] [kafka] ableegoldman commented on pull request #10693: KAFKA-12625: Fix the NOTICE file
ableegoldman commented on pull request #10693: URL: https://github.com/apache/kafka/pull/10693#issuecomment-851732066 Hey @jlprat , can you elaborate on (a) the motivation behind adding this `NOTICE-binary` file (are we missing licenses completely? or we had them but not in the correct format? etc), (b) why you put this in a new `NOTICE-binary` file instead of the existing `NOTICE` file (do we need both? if we do add this `NOTICE-binary` then should we remove some or all of the `NOTICE` file contents?), and (c) how the `NOTICE-binary` file relates to the existing stuff under `licenses/`. I'll extend the same disclaimer, make no assumption that I know what I'm talking about 🙂
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642743186 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
```
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
     }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = {
+    if (responseBody.isInstanceOf[EnvelopeResponse] && requestHeader != null) {
+      info(s"Trying to find NOT_CONTROLLER exception inside envelope response")
+      val envelopeResponse = responseBody.asInstanceOf[EnvelopeResponse]
+      val envelopeError = envelopeResponse.error()
+
+      if (envelopeError == Errors.NONE) {
+        val response = AbstractResponse.parseResponse(envelopeResponse.responseData, requestHeader)
+        envelopeResponse.responseData().rewind()
+        return response.errorCounts().containsKey(Errors.NOT_CONTROLLER)
+      }
+    }
+
+    false
```
Review comment: Nice catch! Thanks.
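As background for the `rewind()` call in the patch above: parsing the embedded response advances the `ByteBuffer`'s position, so the buffer must be reset before anyone else reads it. A minimal standalone sketch of that behavior (demo names are hypothetical, not Kafka code):

```java
import java.nio.ByteBuffer;

public class RewindDemo {
    public static void main(String[] args) {
        // Write one int and flip the buffer so it is ready for reading.
        ByteBuffer buf = ByteBuffer.allocate(4).putInt(42);
        buf.flip();

        int first = buf.getInt();  // reading advances position to the limit
        buf.rewind();              // reset position to 0 without touching the contents
        int second = buf.getInt(); // the same bytes can be read again

        System.out.println(first + " " + second); // prints "42 42"
    }
}
```

Without the rewind, a second reader of the same buffer would see it as exhausted, which is why the patch restores the position after its peek at the envelope contents.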
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642748185 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -363,6 +386,22 @@ class BrokerToControllerRequestThread( } } + def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: RequestHeader, responseBody: AbstractResponse): Boolean = { Review comment: Yes, I tried, but it can't. The `activeController` is stored in `BrokerToControllerRequestThread`, which is owned by `BrokerToControllerChannelManager`. So only `BrokerToControllerRequestThread#handleResponse` can update the `activeController` and put the request back at the top of the queue (retry).
[jira] [Comment Edited] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354778#comment-17354778 ] Sagar Rao edited comment on KAFKA-9168 at 6/1/21, 3:42 AM: --- Thanks [~cadonna]! was (Author: sagarrao): Thanks [~cadonna]! would like to see the results. > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354778#comment-17354778 ] Sagar Rao commented on KAFKA-9168: -- Thanks [~cadonna]! would like to see the results.
[GitHub] [kafka] chia7712 merged pull request #10585: MINOR: cleanTest ought to remove output of unitTest task and integrat…
chia7712 merged pull request #10585: URL: https://github.com/apache/kafka/pull/10585
[GitHub] [kafka] chia7712 commented on pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …
chia7712 commented on pull request #10791: URL: https://github.com/apache/kafka/pull/10791#issuecomment-851797315 > I was asking if our build has to change. Looks like it's string based, so it doesn't? Sorry for my incorrect response :( > Have we tested? Yes. *BEFORE* https://user-images.githubusercontent.com/6234750/120266901-6e57c300-c2d5-11eb-9df2-90ebbc209bd8.png *AFTER* https://user-images.githubusercontent.com/6234750/120267072-cd1d3c80-c2d5-11eb-9802-46007dacdd5d.png
[jira] [Commented] (KAFKA-12869) Update vulnerable dependencies
[ https://issues.apache.org/jira/browse/KAFKA-12869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354800#comment-17354800 ] Dongjin Lee commented on KAFKA-12869: - [~ijuma] I checked the versions. All of them are already fixed in trunk, but not included in 2.7.1. > Update vulnerable dependencies > -- > > Key: KAFKA-12869 > URL: https://issues.apache.org/jira/browse/KAFKA-12869 > Project: Kafka > Issue Type: Bug > Components: core, KafkaConnect >Affects Versions: 2.7.1 >Reporter: Pavel Kuznetsov >Priority: Major > Labels: security > > *Description* > I checked kafka_2.13-2.7.1.tgz distribution with WhiteSource and find out > that some libraries have vulnerabilities. > Here they are: > * jetty-io-9.4.38.v20210224.jar has CVE-2021-28165 vulnerability. The way to > fix it is to upgrade to org.eclipse.jetty:jetty-io:9.4.39 or > org.eclipse.jetty:jetty-io:10.0.2 or org.eclipse.jetty:jetty-io:11.0.2 > * jersey-common-2.31.jar has CVE-2021-28168 vulnerability. The way to fix it > is to upgrade to org.glassfish.jersey.core:jersey-common:2.34 > * jetty-server-9.4.38.v20210224.jar has CVE-2021-28164 vulnerability. The way > to fix it is to upgrade to org.eclipse.jetty:jetty-webapp:9.4.39 > *To Reproduce* > Download kafka_2.13-2.7.1.tgz and find jars, listed above. > Check that these jars with corresponding versions are mentioned in > corresponding vulnerability description. > *Expected* > * jetty-io upgraded to 9.4.39 or higher > * jersey-common upgraded to 2.34 or higher > * jetty-server upgraded to jetty-webapp:9.4.39 or higher > *Actual* > * jetty-io is 9.4.38 > * jersey-common is 2.31 > * jetty-server is 9.4.38
[GitHub] [kafka] chia7712 merged pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …
chia7712 merged pull request #10791: URL: https://github.com/apache/kafka/pull/10791
[GitHub] [kafka] dongjinleekr commented on pull request #10176: KAFKA-12359: Update Jetty to 11
dongjinleekr commented on pull request #10176: URL: https://github.com/apache/kafka/pull/10176#issuecomment-851818497 For those who are interested in this issue, here is some context: As you can see in the table from [here](https://www.eclipse.org/jetty/), the change between Jetty 9.4 and Jetty 11 is not only the Servlet version; it involves the Servlet version, package namespace, API changes, and related dependencies. For this reason, upgrading Jetty from 9.4 is not a simple task but a complicated one that requires lots of changes, tests, etc. The problem is: **Jetty 9.4's end of life does not seem to be in the too-distant future.** For instance, [Jetty 9.2 reached end of life in March 2018](https://www.eclipse.org/lists/jetty-announce/msg00116.html), and [9.3 did in February 2020](https://www.eclipse.org/lists/jetty-announce/msg00140.html) (i.e., within two years). 9.3 also dropped Java 7 support before its final release. Although the Jetty team does not mention it explicitly, the community seems to expect 9.4 to reach EOL in 2022, when Jetty 10.x and 11.x become stable and widespread. If this happens, we should also migrate to Jetty 11, along with Java 11. I agree with @ijuma that it is too early to discuss this issue. However, I think we have enough reason to recognize this problem. **I will keep my eyes on Jetty 9.4's EOL issue, and as soon as it becomes certain, I will reopen the PR.** +1. For those who hope to test Kafka Connect on Servlet 5: I will start providing a preview soon, like the [log4j2 appender](http://home.apache.org/~dongjin/post/apache-kafka-log4j2-support/).
[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file
jlprat commented on pull request #10693: URL: https://github.com/apache/kafka/pull/10693#issuecomment-851831451 Thanks for looking at it @ableegoldman , I'll do my best explaining: a) I added a NOTICE-binary file following the same pattern done for the LICENSE patch. The purpose of the NOTICE-binary is to be included in the distribution file only. b) The content of our NOTICE is what Kafka declares, which I left untouched. The purpose of the NOTICE-binary is to collect all notices from all our dependencies and paste them there. c) I'm no expert at all, but I'd say the licenses folder was merely to collect all the different license texts so we could reference them in our LICENSE-binary (I think licenses say their text needs to be included). I looked at https://github.com/apache/hadoop and attempted to use the same approach. In short, NOTICE contains Kafka's notices while NOTICE-binary contains Kafka's and its dependencies' notices so it can be packaged in our distribution files.
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642195443 ## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
```
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeDecode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeDecode = t.getCause();
```
Review comment: fix 1: unwrap the `CompletionException` to get the original exception inside. ## File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
```
@@ -36,10 +37,16 @@
     private final String message;
 
     public static ApiError fromThrowable(Throwable t) {
+        Throwable throwableToBeEncode = t;
+        // Future will wrap the original exception with completionException, which will return unexpected UNKNOWN_SERVER_ERROR.
+        if (t instanceof CompletionException) {
+            throwableToBeEncode = t.getCause();
```
Review comment: fix 1: unwrap the `CompletionException` to get the original exception inside.
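For context on why this fix is needed: `CompletableFuture.join()` rethrows a failure wrapped in a `CompletionException`, so error-mapping code that inspects the thrown type directly sees the wrapper instead of the real exception. A minimal sketch of the unwrapping idea, using a hypothetical `unwrap` helper (not the actual `ApiError` code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnwrapDemo {
    // Hypothetical helper mirroring the fix: peel off the CompletionException wrapper.
    static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(new IllegalStateException("not controller"));
        try {
            future.join(); // join() rethrows the failure wrapped in CompletionException
        } catch (CompletionException e) {
            // Without unwrapping, error classification sees only the CompletionException
            // wrapper; with unwrapping, it sees the original exception type.
            System.out.println(unwrap(e).getClass().getSimpleName()); // prints "IllegalStateException"
        }
    }
}
```

In the Kafka patch, the same cause-inspection keeps `fromThrowable` from collapsing every wrapped exception into `UNKNOWN_SERVER_ERROR`.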
[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on a change in pull request #10794: URL: https://github.com/apache/kafka/pull/10794#discussion_r642803795 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
```
@@ -239,18 +244,34 @@ class BrokerToControllerChannelManagerImpl(
    *
    * @param request The request to be sent.
    * @param callbackRequest completion callback.
+   * @param requestHeader The request header to be sent, used for parsing the envelope response
    */
   def sendRequest(
     request: AbstractRequest.Builder[_ <: AbstractRequest],
-    callback: ControllerRequestCompletionHandler
+    callback: ControllerRequestCompletionHandler,
+    requestHeader: RequestHeader
```
Review comment: add a `requestHeader` parameter to do envelope response parsing.
[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on pull request #10794: URL: https://github.com/apache/kafka/pull/10794#issuecomment-851845194 Jenkins PR build results proved the flaky tests don't fail anymore: `#4`: 1 failed test:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStop
```
`#5`: all tests passed `#6`: all tests passed
[GitHub] [kafka] showuon edited a comment on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon edited a comment on pull request #10794: URL: https://github.com/apache/kafka/pull/10794#issuecomment-851845194 Jenkins PR build results proved the `RaftClusterTest` tests don't fail anymore: `#4`: 1 failed test:
```
Build / JDK 11 and Scala 2.13 / org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStop
```
`#5`: all tests passed `#6`: all tests passed
[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed
showuon commented on pull request #10794: URL: https://github.com/apache/kafka/pull/10794#issuecomment-851846366 @dengziming , thanks for the comments. I've updated. Thanks.
[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.
satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r642095314 ## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
```
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It gives an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance using
+ * {{@link #addAssignmentsForPartitions(Set)}} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ *
+ * When a broker is started, the controller sends the topic partitions that this broker is leader or follower for and the
+ * partitions to be deleted. This class receives those notifications with
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns a consumer for the
+ * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any leadership changes later are communicated through the same API. We will remove the partitions that are deleted from
+ * this broker, which are received through {@link #removeAssignmentsForPartitions(Set)}.
+ *
+ * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that the consumer is assigned to.
+    private volatile Set assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to target end offsets to be consumed.
+    private final Map partitionToTargetEndOffsets = new ConcurrentHashMap<>();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer consumer,
+                        RemotePartitionMetadataEventHandler
```
[GitHub] [kafka] lkokhreidze commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams
lkokhreidze commented on pull request #10785: URL: https://github.com/apache/kafka/pull/10785#issuecomment-851871480 Hi @cadonna Thanks for the reply and suggestion. Will it be okay to have smaller PRs merged into trunk without having full functionality in place? Afair, when I worked on https://github.com/apache/kafka/pull/7170, it was one of the motivations to have full functionality in trunk in one go. If having partial implementation in trunk is OK, I can split this PR into smaller chunks.