[GitHub] [kafka] dongjinleekr closed pull request #10642: KAFKA-12756: Update Zookeeper to 3.6.3 or higher

2021-05-31 Thread GitBox


dongjinleekr closed pull request #10642:
URL: https://github.com/apache/kafka/pull/10642


   






[GitHub] [kafka] dongjinleekr commented on pull request #10678: TRIVIAL: Fix type inconsistencies, unthrown exceptions, etc

2021-05-31 Thread GitBox


dongjinleekr commented on pull request #10678:
URL: https://github.com/apache/kafka/pull/10678#issuecomment-851263147


   Rebased onto the latest trunk, along with fixes for additional glitches I 
found while working on other issues.
   
   @chia7712 @kowshik @ijuma Could you have a look? :pray:






[GitHub] [kafka] chia7712 commented on pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …

2021-05-31 Thread GitBox


chia7712 commented on pull request #10791:
URL: https://github.com/apache/kafka/pull/10791#issuecomment-851271639


   > The build definition doesn't require changes to mention this annotation?
   
   The doc (https://jqwik.net/docs/current/user-guide.html#tagging-tests) 
explains how to tag a test, but it does not mention the package of this 
annotation. I filed a PR for that (https://github.com/jlink/jqwik/pull/193).
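   For readers unfamiliar with jqwik's tagging, below is a small, hand-written 
illustration (not taken from this PR or the Kafka code base; the class name and 
the tag value "property-tests" are made up) of a jqwik property annotated with 
`net.jqwik.api.Tag` rather than `org.junit.jupiter.api.Tag`:

```
import net.jqwik.api.ForAll;
import net.jqwik.api.Property;
import net.jqwik.api.Tag;  // note: net.jqwik.api.Tag, not org.junit.jupiter.api.Tag

// Hypothetical example class, only to show where the annotation goes.
class StringReverseProperties {

    @Property
    @Tag("property-tests")  // tag values are matched as plain strings by test-engine tag filters
    boolean reversingTwiceYieldsTheOriginal(@ForAll String s) {
        String reversed = new StringBuilder(s).reverse().toString();
        return new StringBuilder(reversed).reverse().toString().equals(s);
    }
}
```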
   






[GitHub] [kafka] chia7712 opened a new pull request #10796: HOTFIX: fix build error

2021-05-31 Thread GitBox


chia7712 opened a new pull request #10796:
URL: https://github.com/apache/kafka/pull/10796


   See the CI build 
(https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-10585/5/pipeline/12/); 
the error is related to commit 6b005b2b4eece81a5500fb0080ef5354b4240681.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Created] (KAFKA-12867) Trogdor ConsumeBenchWorker quits prematurely with maxMessages config

2021-05-31 Thread Kowshik Prakasam (Jira)
Kowshik Prakasam created KAFKA-12867:


 Summary: Trogdor ConsumeBenchWorker quits prematurely with 
maxMessages config
 Key: KAFKA-12867
 URL: https://issues.apache.org/jira/browse/KAFKA-12867
 Project: Kafka
  Issue Type: Bug
Reporter: Kowshik Prakasam


The Trogdor 
[ConsumeBenchWorker|https://github.com/apache/kafka/commits/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java]
 has a bug. If one of the consumption tasks completes successfully 
due to [maxMessages being 
consumed|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L245],
 then the consumption task [notifies the 
doneFuture|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L285]
 causing the ConsumeBenchWorker to halt. This becomes a problem when more than 
one consumption task is running in parallel, because the successful completion 
of one of the tasks shuts down the entire worker while the other tasks are still 
running. When the worker is shut down, it 
[kills|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L482]
 all the active consumption tasks, which is not the desired behavior.

The fix is to not notify the doneFuture when one of the consumption tasks 
completes without error. Instead, we should defer the notification to the 
[CloseStatusUpdater|https://github.com/apache/kafka/blob/fc405d792de12a50956195827eaf57bbf6c9/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConsumeBenchWorker.java#L299]
 thread.
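
To make the intended change concrete, here is a minimal, hand-written sketch of 
the completion pattern described above (the class and method names are invented 
and this is not the actual Trogdor code): a successful task only records that it 
finished, a failed task still fails the worker immediately, and a separate 
status-updater, playing the role of the CloseStatusUpdater thread, completes the 
shared future once every task is done.

```
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Hand-written sketch of the desired completion behavior; all names are invented.
class WorkerCompletionSketch {
    private final CompletableFuture<String> doneFuture = new CompletableFuture<>();
    private final CountDownLatch remainingTasks;

    WorkerCompletionSketch(int taskCount) {
        this.remainingTasks = new CountDownLatch(taskCount);
    }

    // Called by each consumption task when it finishes, successfully or not.
    void onTaskFinished(Throwable error) {
        if (error != null) {
            // A failure should still stop the whole worker right away.
            doneFuture.complete(error.getMessage());
        }
        // A successful task only records that it is done; it must not
        // complete doneFuture while sibling tasks are still running.
        remainingTasks.countDown();
    }

    // Analogue of the CloseStatusUpdater thread: signal overall completion
    // only after every task has finished.
    void awaitAllTasksThenSignal() throws InterruptedException {
        remainingTasks.await();
        doneFuture.complete("");  // empty string meaning "no error" in this sketch
    }

    CompletableFuture<String> doneFuture() {
        return doneFuture;
    }
}
```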





[GitHub] [kafka] kowshik opened a new pull request #10797: KAFKA-12867: Fix ConsumeBenchWorker exit behavior for maxMessages config

2021-05-31 Thread GitBox


kowshik opened a new pull request #10797:
URL: https://github.com/apache/kafka/pull/10797


   The Trogdor `ConsumeBenchWorker` has a bug. If one of the consumption tasks 
completes successfully because `maxMessages` has been consumed, the consumption 
task notifies the `doneFuture`, causing the entire `ConsumeBenchWorker` to halt. 
This becomes a problem when more than one consumption task is running in 
parallel, because the successful completion of one of the tasks shuts down the 
entire worker while the other tasks are still 
running. When the worker is shut down, it kills all the active consumption 
tasks, which is not the desired behavior.
   
   The fix is to not notify the `doneFuture` when one of the consumption tasks 
completes without error. Instead, we should defer the notification to the 
`CloseStatusUpdater` thread.






[GitHub] [kafka] kowshik commented on pull request #10797: KAFKA-12867: Fix ConsumeBenchWorker exit behavior for maxMessages config

2021-05-31 Thread GitBox


kowshik commented on pull request #10797:
URL: https://github.com/apache/kafka/pull/10797#issuecomment-851293196


   cc @junrao @apovzner @rajinisivaram for review.
   
   @apovzner It appears this behavior has been around since 
`ConsumeBenchWorker` was first implemented: 
https://github.com/apache/kafka/pull/4775. Please let me know your thoughts.
   
   






[jira] [Created] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648

2021-05-31 Thread Yuneng Xie (Jira)
Yuneng Xie created KAFKA-12868:
--

 Summary: log cleaner failed with 
java.lang.ArrayIndexOutOfBoundsException: -2147483648
 Key: KAFKA-12868
 URL: https://issues.apache.org/jira/browse/KAFKA-12868
 Project: Kafka
  Issue Type: Bug
  Components: log cleaner
Affects Versions: 2.4.0
Reporter: Yuneng Xie


Our broker spent too long loading the offset partition:

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

So I checked the partition data; it's too big:

```

106G /var/lib/kafka/data/__consumer_offsets-47

```

 

The retention time is 7 days:

```

log.retention.hours=168

```

 

And I found this error log from the log cleaner:

```

[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-47. (kafka.log.LogCleaner)

[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for 
__consumer_offsets-47... (kafka.log.LogCleaner)

[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). 
(kafka.log.LogCleaner)

[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, 
partition=47, highWatermark=1021411007, lastStableOffset=1021411007, 
logStartOffset=0, logEndOffset=1021411007). Marking its partition 
(__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)

kafka.log.LogCleaningException: -2147483648

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)

        at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:324)

        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:313)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -2147483648

        at kafka.utils.CoreUtils$.readInt(CoreUtils.scala:241)

        at kafka.log.SkimpyOffsetMap.positionOf(OffsetMap.scala:183)

        at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:101)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2(LogCleaner.scala:947)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2$adapted(LogCleaner.scala:944)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1(LogCleaner.scala:944)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1$adapted(LogCleaner.scala:933)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:933)

        at kafka.log.Cleaner.$anonfun$buildOffsetMap$3(LogCleaner.scala:894)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMap$3$adapted(LogCleaner.scala:890)

        at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)

        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)

        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:890)

        at kafka.log.Cleaner.doClean(LogCleaner.scala:514)

        at kafka.log.Cleaner.clean(LogCleaner.scala:502)

        at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:371)

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:344)

        ... 3 more

```
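
A side note for readers puzzled by the -2147483648 in the stack trace: that 
value is Integer.MIN_VALUE. The snippet below is a generic Java illustration 
(not the SkimpyOffsetMap code and not a diagnosis of this particular bug) of how 
a hash-derived slot index can end up negative, because Math.abs(Integer.MIN_VALUE) 
overflows and stays negative:

```
// Generic illustration only; not Kafka code.
public class NegativeIndexDemo {
    public static void main(String[] args) {
        int hash = Integer.MIN_VALUE;                     // -2147483648, the value in the stack trace
        System.out.println(Math.abs(hash));               // prints -2147483648: abs() overflows for MIN_VALUE

        int slots = 1000;
        System.out.println(Math.abs(hash) % slots);       // prints -648: unusable as an array index
        System.out.println((hash & 0x7fffffff) % slots);  // prints 0: masking the sign bit keeps it non-negative
    }
}
```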

 





[jira] [Updated] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648

2021-05-31 Thread Yuneng Xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuneng Xie updated KAFKA-12868:
---
Description: 
Our broker spent too long loading the offset partition:

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

So I checked the partition data; it's too big:

```

106G /var/lib/kafka/data/__consumer_offsets-47

```

 

The retention time is 7 days:

```

log.retention.hours=168

```

 

And I found this error log from the log cleaner:

```

[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-47. (kafka.log.LogCleaner)

[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for 
__consumer_offsets-47... (kafka.log.LogCleaner)

[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). 
(kafka.log.LogCleaner)

[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, 
partition=47, highWatermark=1021411007, lastStableOffset=1021411007, 
logStartOffset=0, logEndOffset=1021411007). Marking its partition 
(__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)

kafka.log.LogCleaningException: -2147483648

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)

        at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:324)

        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:313)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -2147483648

        at kafka.utils.CoreUtils$.readInt(CoreUtils.scala:241)

        at kafka.log.SkimpyOffsetMap.positionOf(OffsetMap.scala:183)

        at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:101)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2(LogCleaner.scala:947)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2$adapted(LogCleaner.scala:944)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1(LogCleaner.scala:944)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1$adapted(LogCleaner.scala:933)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:933)

        at kafka.log.Cleaner.$anonfun$buildOffsetMap$3(LogCleaner.scala:894)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMap$3$adapted(LogCleaner.scala:890)

        at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)

        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)

        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:890)

        at kafka.log.Cleaner.doClean(LogCleaner.scala:514)

        at kafka.log.Cleaner.clean(LogCleaner.scala:502)

        at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:371)

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:344)

        ... 3 more

```

 

Any suggestions on this?

 

  was:
our broker spent too long loading the offset partition.

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

so i checked the partition data , it's too big 

```

106G /var/lib/kafka/data/__consumer_offsets-47

```

 

the retention time is 7 days

```

log.retention.hours=168

```

 

and i found error log about log cleaner:

```

[2021-04-

[jira] [Updated] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648

2021-05-31 Thread Yuneng Xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuneng Xie updated KAFKA-12868:
---
Description: 
Our broker spent too long loading the offset partition:

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

So I checked the partition data; it's too big:

```

106G /var/lib/kafka/data/__consumer_offsets-47

```

 

The retention time is 7 days:

```

log.retention.hours=168

```

 

And I found this error log from the log cleaner:

```

[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-47. (kafka.log.LogCleaner)

[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for 
__consumer_offsets-47... (kafka.log.LogCleaner)

[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). 
(kafka.log.LogCleaner)

[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, 
partition=47, highWatermark=1021411007, lastStableOffset=1021411007, 
logStartOffset=0, logEndOffset=1021411007). Marking its partition 
(__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)

kafka.log.LogCleaningException: -2147483648

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)

        at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:324)

        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:313)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -2147483648

        at kafka.utils.CoreUtils$.readInt(CoreUtils.scala:241)

        at kafka.log.SkimpyOffsetMap.positionOf(OffsetMap.scala:183)

        at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:101)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2(LogCleaner.scala:947)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2$adapted(LogCleaner.scala:944)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1(LogCleaner.scala:944)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1$adapted(LogCleaner.scala:933)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:933)

        at kafka.log.Cleaner.$anonfun$buildOffsetMap$3(LogCleaner.scala:894)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMap$3$adapted(LogCleaner.scala:890)

        at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)

        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)

        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:890)

        at kafka.log.Cleaner.doClean(LogCleaner.scala:514)

        at kafka.log.Cleaner.clean(LogCleaner.scala:502)

        at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:371)

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:344)

        ... 3 more

```

 

It seems the cleaner failed to compact __consumer_offsets-47.

Any suggestions on this?

 

  was:
our broker spent too long loading the offset partition.

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

so i checked the partition data , it's too big 

```

106G /var/lib/kafka/data/__consumer_offsets-47

```

 

the retention time is 7 days

```

log.retention.hours=168

[jira] [Updated] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648

2021-05-31 Thread Yuneng Xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuneng Xie updated KAFKA-12868:
---
Description: 
Our broker spent too long loading the offset partition:

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

So I checked the partition data; it's too big:

```

106G /var/lib/kafka/data/__consumer_offsets-47

```

 

The retention time is 7 days:

```

log.retention.hours=168

```

But I found many segments in /var/lib/kafka/data/__consumer_offsets-47 that 
should already have been cleaned.

 

And I found this error log from the log cleaner:

```

[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-47. (kafka.log.LogCleaner)

[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for 
__consumer_offsets-47... (kafka.log.LogCleaner)

[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). 
(kafka.log.LogCleaner)

[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, 
partition=47, highWatermark=1021411007, lastStableOffset=1021411007, 
logStartOffset=0, logEndOffset=1021411007). Marking its partition 
(__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)

kafka.log.LogCleaningException: -2147483648

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)

        at 
kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:324)

        at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:313)

        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)

Caused by: java.lang.ArrayIndexOutOfBoundsException: -2147483648

        at kafka.utils.CoreUtils$.readInt(CoreUtils.scala:241)

        at kafka.log.SkimpyOffsetMap.positionOf(OffsetMap.scala:183)

        at kafka.log.SkimpyOffsetMap.put(OffsetMap.scala:101)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2(LogCleaner.scala:947)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$2$adapted(LogCleaner.scala:944)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1(LogCleaner.scala:944)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMapForSegment$1$adapted(LogCleaner.scala:933)

        at scala.collection.Iterator.foreach(Iterator.scala:941)

        at scala.collection.Iterator.foreach$(Iterator.scala:941)

        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)

        at scala.collection.IterableLike.foreach(IterableLike.scala:74)

        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)

        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)

        at kafka.log.Cleaner.buildOffsetMapForSegment(LogCleaner.scala:933)

        at kafka.log.Cleaner.$anonfun$buildOffsetMap$3(LogCleaner.scala:894)

        at 
kafka.log.Cleaner.$anonfun$buildOffsetMap$3$adapted(LogCleaner.scala:890)

        at 
scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)

        at 
scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)

        at 
scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)

        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)

        at 
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)

        at kafka.log.Cleaner.buildOffsetMap(LogCleaner.scala:890)

        at kafka.log.Cleaner.doClean(LogCleaner.scala:514)

        at kafka.log.Cleaner.clean(LogCleaner.scala:502)

        at kafka.log.LogCleaner$CleanerThread.cleanLog(LogCleaner.scala:371)

        at 
kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:344)

        ... 3 more

```

 

It seems the cleaner failed to compact __consumer_offsets-47.

Any suggestions on this?

 

  was:
our broker spent too long loading the offset partition.

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

so i checked the partition data , it's too big 

```

106G 

[jira] [Updated] (KAFKA-12868) log cleaner failed with java.lang.ArrayIndexOutOfBoundsException: -2147483648

2021-05-31 Thread Yuneng Xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12868?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuneng Xie updated KAFKA-12868:
---
Description: 
Our broker spent too long loading the offset partition:

```

[2021-05-30 03:18:20,505] INFO [GroupMetadataManager brokerId=2] Finished 
loading offsets and group metadata from __consumer_offsets-47 in 1236029 
milliseconds. (kafka.coordinator.group.GroupMetadataManager)

```

 

So I checked the partition data; it's too big:

```

106G /var/lib/kafka/data/__consumer_offsets-47

```

 

The retention time is 7 days:

```

log.retention.hours=168

```

But I found many segments in /var/lib/kafka/data/__consumer_offsets-47 that 
should already have been cleaned.

 

And I found this error log from the log cleaner:

```

 

[2021-04-09 00:41:48,327] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-47. (kafka.log.LogCleaner)
[2021-04-09 00:41:48,327] INFO Cleaner 0: Building offset map for 
__consumer_offsets-47... (kafka.log.LogCleaner)
[2021-04-09 00:41:48,353] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-47 for 1 segments in offset range [1010155298, 1010757586). 
(kafka.log.LogCleaner)
[2021-04-09 13:02:09,425] INFO Cleaner 0: Offset map for log 
__consumer_offsets-47 complete. (kafka.log.LogCleaner)
[2021-04-09 13:02:09,426] INFO Cleaner 0: Cleaning log __consumer_offsets-47 
(cleaning prior to Fri Apr 09 00:41:34 SRET 2021, discarding tombstones prior 
to Wed Apr 07 23:19:50 SRET 2021)... (kafka.log.LogCleaner)
[2021-04-09 13:02:09,427] INFO Cleaner 0: Cleaning LogSegment(baseOffset=0, 
size=51681, lastModifiedTime=1617890796000, largestTime=1617890765851) in log 
__consumer_offsets-47 into 0 with deletion horizon 161780879, retaining 
deletes. (kafka.log.LogCleaner)
[2021-04-09 13:02:27,573] INFO Cleaner 0: Cleaning 
LogSegment(baseOffset=100989, size=65218, lastModifiedTime=161789519, 
largestTime=1617895190754) in log __consumer_offsets-47 into 0 with deletion 
horizon 161780879, retaining deletes. (kafka.log.LogCleaner)
[2021-04-09 13:02:52,991] INFO Cleaner 0: Swapping in cleaned segment 
LogSegment(baseOffset=0, size=53871, lastModifiedTime=161789519, 
largestTime=1617894932233) for segment(s) List(LogSegment(baseOffset=0, 
size=51681, lastModifiedTime=1617890796000, largestTime=1617890765851), 
LogSegment(baseOffset=100989, size=65218, lastModifiedTime=161789519, 
largestTime=1617895190754)) in log 
Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, 
partition=47, highWatermark=1015632787, lastStableOffset=1015632787, 
logStartOffset=0, logEndOffset=1015632789) (kafka.log.LogCleaner)
[2021-04-09 13:02:52,992] INFO Cleaner 0: Cleaning 
LogSegment(baseOffset=1010155298, size=104857448, 
lastModifiedTime=1617900094000, largestTime=1617900094618) in log 
__consumer_offsets-47 into 1010155298 with deletion horizon 161780879, 
retaining deletes. (kafka.log.LogCleaner)
[2021-04-10 00:28:24,476] INFO Cleaner 0: Swapping in cleaned segment 
LogSegment(baseOffset=1010155298, size=63028, lastModifiedTime=1617900094000, 
largestTime=1617900094618) for segment(s) 
List(LogSegment(baseOffset=1010155298, size=104857448, 
lastModifiedTime=1617900094000, largestTime=1617900094618)) in log 
Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, 
partition=47, highWatermark=1021404662, lastStableOffset=1021404662, 
logStartOffset=0, logEndOffset=1021404663) (kafka.log.LogCleaner)
[2021-04-10 00:28:24,492] INFO [kafka-log-cleaner-thread-0]:
 Log cleaner thread 0 cleaned log __consumer_offsets-47 (dirty section = 
[1010155298, 1010155298])
 100.1 MB of log processed in 85,596.2 seconds (0.0 MB/sec).
 Indexed 100.0 MB in 44421.1 seconds (0.0 Mb/sec, 51.9% of total time)
 Buffer utilization: 0.0%
 Cleaned 100.1 MB in 41175.1 seconds (0.0 Mb/sec, 48.1% of total time)
 Start size: 100.1 MB (602,913 messages)
 End size: 0.1 MB (625 messages)
 99.9% size reduction (99.9% fewer messages)
 (kafka.log.LogCleaner)
[2021-04-10 00:28:24,507] INFO Cleaner 0: Beginning cleaning of log 
__consumer_offsets-47. (kafka.log.LogCleaner)
[2021-04-10 00:28:24,507] INFO Cleaner 0: Building offset map for 
__consumer_offsets-47... (kafka.log.LogCleaner)
[2021-04-10 00:28:24,526] INFO Cleaner 0: Building offset map for log 
__consumer_offsets-47 for 17 segments in offset range [1010757586, 1020967392). 
(kafka.log.LogCleaner)
[2021-04-10 00:29:22,669] WARN [kafka-log-cleaner-thread-0]: Unexpected 
exception thrown when cleaning log 
Log(dir=/var/lib/kafka/data/__consumer_offsets-47, topic=__consumer_offsets, 
partition=47, highWatermark=1021411007, lastStableOffset=1021411007, 
logStartOffset=0, logEndOffset=1021411007). Marking its partition 
(__consumer_offsets-47) as uncleanable (kafka.log.LogCleaner)
kafka.log.LogCleaningException: -2147483648
 at kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:348)
 at 
kafk

[jira] [Commented] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes

2021-05-31 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354293#comment-17354293
 ] 

Chia-Ping Tsai commented on KAFKA-12847:


Could you try running the Kafka tests as a non-root user? IIRC, not only the 
system tests but also the unit and integration tests (UT/IT) expect to be run as 
non-root. 

 

>  Also, I wonder whether or not upstream Dockerfile & System tests are part of 
> CI/CD and get tested for every PR. If so, this issue should have been caught.

We don't run system tests for each PR since it is too expensive. Instead, we 
make sure all system tests pass before release.

> Dockerfile needed for kafka system tests needs changes
> --
>
> Key: KAFKA-12847
> URL: https://issues.apache.org/jira/browse/KAFKA-12847
> Project: Kafka
>  Issue Type: Bug
>  Components: system tests
>Affects Versions: 2.8.0, 2.7.1
> Environment: Issue tested in environments below but is independent of 
> h/w arch. or Linux flavor: -
> 1.) RHEL-8.3 on x86_64 
> 2.) RHEL-8.3 on IBM Power (ppc64le)
> 3.) apache/kafka branch tested: trunk (master)
>Reporter: Abhijit Mane
>Assignee: Abhijit Mane
>Priority: Major
>  Labels: easyfix
> Attachments: Dockerfile.upstream
>
>
> Hello,
> I tried apache/kafka system tests as per documentation: -
> ([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])
> =
>  PROBLEM
>  ~~
> 1.) As root user, clone kafka github repo and start "kafka system tests"
>  # git clone [https://github.com/apache/kafka.git]
>  # cd kafka
>  # ./gradlew clean systemTestLibs
>  # bash tests/docker/run_tests.sh
> 2.) Dockerfile issue - 
> [https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]
> This file has an *UID* entry as shown below: -
>  ---
>  ARG *UID*="1000"
>  RUN useradd -u $*UID* ducker
> // {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
> unique, root user id is 0
>  ---
>  I ran everything as root which means the built-in bash environment variable 
> 'UID' always
> resolves to 0 and can't be changed. Hence, the docker build fails. The issue 
> should be seen even if run as non-root.
> 3.) Next, as root, as per README, I ran: -
> server:/kafka> *bash tests/docker/run_tests.sh*
> The ducker tool builds the container images & switches to user '*ducker*' 
> inside the container
> & maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the 
> container.
> Ref: 
> [https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak]
> Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 
> This fails as the 'ducker' user has *no write permissions* to create files 
> under 'kafka' root dir. Hence, it needs to be made writeable.
> // *chmod -R a+w kafka* 
>  – needed as container is run as 'ducker' and needs write access since kafka 
> root volume from host is mapped to container as "/opt/kafka-dev" where the 
> 'ducker' user writes logs
>  =
> =
>  *FIXES needed*
>  ~
>  1.) Dockerfile - 
> [https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]
>  Change 'UID' to '*UID_DUCKER*'.
> This won't conflict with built in bash env. var UID and the docker image 
> build should succeed.
>  ---
>  ARG *UID_DUCKER*="1000"
>  RUN useradd -u $*UID_DUCKER* ducker
> // *{color:#57d9a3}No Error{color}* => No conflict with built-in UID
>  ---
> 2.) README needs an update where we must ensure the kafka root dir from where 
> the tests 
>  are launched is writeable to allow the 'ducker' user to create results/logs.
>  # chmod -R a+w kafka
> With this, I was able to get the docker images built and system tests started 
> successfully.
>  =
> Also, I wonder whether or not upstream Dockerfile & System tests are part of 
> CI/CD and get tested for every PR. If so, this issue should have been caught.
>  
> *Question to kafka SME*
>  -
>  Do you believe this is a valid problem with the Dockerfile and the fix is 
> acceptable? 
>  Please let me know and I am happy to submit a PR with this fix.
> Thanks,
>  Abhijit





[GitHub] [kafka] jlprat commented on pull request #10702: MINOR: Switch Jenkins from JDK 15 to JDK 16

2021-05-31 Thread GitBox


jlprat commented on pull request #10702:
URL: https://github.com/apache/kafka/pull/10702#issuecomment-851334730


   For reference, there is an issue in PowerMock for this: 
https://github.com/powermock/powermock/issues/1099






[GitHub] [kafka] cadonna commented on pull request #10796: HOTFIX: fix build error

2021-05-31 Thread GitBox


cadonna commented on pull request #10796:
URL: https://github.com/apache/kafka/pull/10796#issuecomment-851444665


   The test failures are unrelated and known to be flaky.
   






[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851446115


   Tests added. Thanks.






[GitHub] [kafka] cadonna merged pull request #10796: HOTFIX: fix build error

2021-05-31 Thread GitBox


cadonna merged pull request #10796:
URL: https://github.com/apache/kafka/pull/10796


   






[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642461619



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -198,15 +199,15 @@ class DefaultAutoTopicCreationManager(
 
   // Borrow client information such as client id and correlation id from 
the original request,
   // in order to correlate the create request with the original metadata 
request.
-  val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+  requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
 requestVersion,
 context.clientId,
 context.correlationId)
   ForwardingManager.buildEnvelopeRequest(context,
 
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
 }.getOrElse(createTopicsRequest)
 
-channelManager.sendRequest(request, requestCompletionHandler)
+channelManager.sendRequest(request, requestCompletionHandler, 
requestHeader)

Review comment:
   This is 1 of 2 places we use `EnvelopeRequest`, so add `requestHeader`








[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642461619



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -198,15 +199,15 @@ class DefaultAutoTopicCreationManager(
 
   // Borrow client information such as client id and correlation id from 
the original request,
   // in order to correlate the create request with the original metadata 
request.
-  val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+  requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
 requestVersion,
 context.clientId,
 context.correlationId)
   ForwardingManager.buildEnvelopeRequest(context,
 
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
 }.getOrElse(createTopicsRequest)
 
-channelManager.sendRequest(request, requestCompletionHandler)
+channelManager.sendRequest(request, requestCompletionHandler, 
requestHeader)

Review comment:
   This is 1st of 2 places we use `EnvelopeRequest`, so add `requestHeader`








[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642462291



##
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##
@@ -125,7 +125,7 @@ class ForwardingManagerImpl(
   }
 }
 
-channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
+channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler, 
request.header)

Review comment:
   This is 2nd of 2 places we use `EnvelopeRequest`, add `requestHeader`.








[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642461619



##
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##
@@ -198,15 +199,15 @@ class DefaultAutoTopicCreationManager(
 
   // Borrow client information such as client id and correlation id from 
the original request,
   // in order to correlate the create request with the original metadata 
request.
-  val requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
+  requestHeader = new RequestHeader(ApiKeys.CREATE_TOPICS,
 requestVersion,
 context.clientId,
 context.correlationId)
   ForwardingManager.buildEnvelopeRequest(context,
 
createTopicsRequest.build(requestVersion).serializeWithHeader(requestHeader))
 }.getOrElse(createTopicsRequest)
 
-channelManager.sendRequest(request, requestCompletionHandler)
+channelManager.sendRequest(request, requestCompletionHandler, 
requestHeader)

Review comment:
   This is the 1st of 2 places we use `EnvelopeRequest`, so add 
`requestHeader`








[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642462291



##
File path: core/src/main/scala/kafka/server/ForwardingManager.scala
##
@@ -125,7 +125,7 @@ class ForwardingManagerImpl(
   }
 }
 
-channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
+channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler, 
request.header)

Review comment:
   This is the 2nd of 2 places we use `EnvelopeRequest`, add 
`requestHeader`.








[GitHub] [kafka] lkokhreidze commented on a change in pull request #10785: KIP-708 / A Rack awareness for Kafka Streams

2021-05-31 Thread GitBox


lkokhreidze commented on a change in pull request #10785:
URL: https://github.com/apache/kafka/pull/10785#discussion_r642467096



##
File path: clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
##
@@ -33,8 +31,10 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.function.BiConsumer;
+import java.util.function.Function;

Review comment:
   Sorry for the unrelated changes :( Happy to revert if it adds too much noise 
during the review.








[GitHub] [kafka] cadonna commented on a change in pull request #10507: KAFKA-8410: Migrating stateful operators to new Processor API

2021-05-31 Thread GitBox


cadonna commented on a change in pull request #10507:
URL: https://github.com/apache/kafka/pull/10507#discussion_r642403697



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
##
@@ -32,7 +32,7 @@
 private static final Logger log = 
LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
 
 @Override
-public DeserializationHandlerResponse handle(final ProcessorContext 
context,
+public DeserializationHandlerResponse handle(final ProcessorContext 
context,

Review comment:
   Do we also need to deprecate this method and add a new one? Technically, 
this is a public API class that can be extended.  

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -48,21 +48,20 @@
 CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
 this.builder = builder;
 }
-@SuppressWarnings("unchecked")
- KTable build(final Map, 
Aggregator> groupPatterns,
-final Initializer initializer,
-final NamedInternal named,
-final StoreBuilder storeBuilder,
-final Serde keySerde,
-final Serde valueSerde,
-final String queryableName) {
+ KTable build(final Map, 
Aggregator> groupPatterns,
+final Initializer initializer,
+final NamedInternal named,
+final StoreBuilder storeBuilder,
+final Serde keySerde,
+final Serde valueSerde,
+final String queryableName) {
 processRepartitions(groupPatterns, storeBuilder);
 final Collection processors = new ArrayList<>();
-final Collection parentProcessors = new 
ArrayList<>();
+final Collection parentProcessors = 
new ArrayList<>();
 boolean stateCreated = false;
 int counter = 0;
 for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) {
-final KStreamAggProcessorSupplier parentProcessor =
+final KStreamAggregateProcessorSupplier 
parentProcessor =

Review comment:
   Shouldn't this be `KStreamAggregateProcessorSupplier`? The 
positions of the parameters `KOut` and `VIn` on 
`KStreamAggregateProcessorSupplier` changed with respect to 
`KStreamAggProcessorSupplier`.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
##
@@ -183,7 +182,9 @@
 
 final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME);
 return doAggregate(
-new KStreamAggregate<>(materializedInternal.storeName(), 
aggregateBuilder.countInitializer, aggregateBuilder.countAggregator),
+new KStreamAggregate<>(materializedInternal.storeName(),
+aggregateBuilder.countInitializer,
+aggregateBuilder.countAggregator),

Review comment:
   ```suggestion
   new KStreamAggregate<>(
   materializedInternal.storeName(),
   aggregateBuilder.countInitializer,
   aggregateBuilder.countAggregator
   ),
   ```
   or
   ```suggestion
   new KStreamAggregate<>(
   materializedInternal.storeName(),
   aggregateBuilder.countInitializer,
   aggregateBuilder.countAggregator),
   ```

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
##
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Objects;
+import java.util.Set;

Review comment:
   See my comment above about import order.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
##
@@ -16,6 +16,10 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;

Review comment:
   In [KAFKA-10787](https://issues.apache.org/jira/browse/KAFKA-10787) we 
agreed on the import order `kafka`, `org.apache.kafka`, `com`, `net`, `org`, 
`java`, `javax`, and static imports. Additionally, there should be an empty line 
between import blocks.
   
   Note that PR #10428 introduces a check and a formatter for this.
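   For illustration, a hand-written example of that grouping (the imported 
classes are arbitrary placeholders, chosen only to show one block per group and 
the blank line between blocks):

```
// Example of the agreed import grouping; one block per group, in order.
import kafka.utils.Logging;                          // 1. kafka

import org.apache.kafka.common.utils.Time;           // 2. org.apache.kafka

import com.fasterxml.jackson.databind.ObjectMapper;  // 3. com

import net.jqwik.api.Property;                       // 4. net

import org.slf4j.Logger;                             // 5. org

import java.util.List;                               // 6. java

import javax.management.ObjectName;                  // 7. javax

import static java.util.Collections.emptyList;       // 8. static imports last

public class ImportOrderExample { }
```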

##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##
@@ -26,29 +26,29 @@
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.processor.ConnectedStoreProvider;
 import org.apache.kafka.streams.processor.Processor;
-import org.

[GitHub] [kafka] vamossagar12 opened a new pull request #10798: KAFKA-9168: Adding direct byte buffer support to rocksdb state store

2021-05-31 Thread GitBox


vamossagar12 opened a new pull request #10798:
URL: https://github.com/apache/kafka/pull/10798


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-31 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354490#comment-17354490
 ] 

Sagar Rao commented on KAFKA-9168:
--

[~ableegoldman] I modified the put API for the single-column-family case. The 
changes are minimal right now, but they will need to evolve. Before that, I was 
thinking you could run the internal benchmarking tool just for this API, and we 
can then take a call. I will come up with JMH benchmarks as well. Here is the 
draft PR:

https://github.com/apache/kafka/pull/10798

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]
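
As a rough illustration of what the integration might look like, here is a 
hand-written sketch (not the draft PR's code; in particular, the ByteBuffer-based 
`put` overload used below is an assumption about the rocksdbjni API rather than 
a verified signature, and a real store would grow or pool the buffers instead of 
using fixed sizes):

```
import java.nio.ByteBuffer;

import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteOptions;

// Sketch only: assumes rocksdbjni exposes a ByteBuffer-based put overload,
// as the linked RocksDB PR proposes; the signature is not verified here.
class DirectBufferPutSketch {
    private final RocksDB db;
    private final WriteOptions writeOptions;
    // Reused direct buffers so the JNI layer can avoid copying through byte[].
    private final ByteBuffer keyBuffer = ByteBuffer.allocateDirect(64);
    private final ByteBuffer valueBuffer = ByteBuffer.allocateDirect(1024);

    DirectBufferPutSketch(RocksDB db, WriteOptions writeOptions) {
        this.db = db;
        this.writeOptions = writeOptions;
    }

    void put(byte[] key, byte[] value) throws RocksDBException {
        keyBuffer.clear();
        keyBuffer.put(key).flip();
        valueBuffer.clear();
        valueBuffer.put(value).flip();
        // Assumed overload: put(WriteOptions, ByteBuffer, ByteBuffer).
        db.put(writeOptions, keyBuffer, valueBuffer);
    }
}
```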





[GitHub] [kafka] ijuma commented on pull request #10796: HOTFIX: fix build error

2021-05-31 Thread GitBox


ijuma commented on pull request #10796:
URL: https://github.com/apache/kafka/pull/10796#issuecomment-851537672


   Thanks for the fix, my bad for missing it. I did merge trunk and build 
locally before merging the PR, but didn't check Scala 2.12.






[GitHub] [kafka] ijuma commented on pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …

2021-05-31 Thread GitBox


ijuma commented on pull request #10791:
URL: https://github.com/apache/kafka/pull/10791#issuecomment-851538731


   I was asking if our build has to change. Looks like it's string based, so it 
doesn't?
   
   https://github.com/apache/kafka/blob/trunk/build.gradle#L404






[GitHub] [kafka] ijuma edited a comment on pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …

2021-05-31 Thread GitBox


ijuma edited a comment on pull request #10791:
URL: https://github.com/apache/kafka/pull/10791#issuecomment-851538731


   I was asking if our build has to change. Looks like it's string based, so it 
doesn't?
   
   https://github.com/apache/kafka/blob/trunk/build.gradle#L404
   
   Have we tested?






[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2021-05-31 Thread GitBox


viktorsomogyi commented on pull request #10738:
URL: https://github.com/apache/kafka/pull/10738#issuecomment-851544134


   Finished rebasing and coding (and therefore force-pushed); I'll switch the 
PR into the "ready for review" state now.






[GitHub] [kafka] viktorsomogyi commented on pull request #10738: KAFKA-6945: KIP-373, allow users to create delegation token for others

2021-05-31 Thread GitBox


viktorsomogyi commented on pull request #10738:
URL: https://github.com/apache/kafka/pull/10738#issuecomment-851545252


   @omkreddy @rajinisivaram Would you please help with the review to get this 
long-outstanding KIP done?






[GitHub] [kafka] dengziming commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


dengziming commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642556841



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
 }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: 
RequestHeader, responseBody: AbstractResponse): Boolean = {
+if (responseBody.isInstanceOf[EnvelopeResponse] && requestHeader != null) {
+  info(s"Trying to find NOT_CONTROLLER exception inside envelope response")
+  val envelopeResponse = responseBody.asInstanceOf[EnvelopeResponse]
+  val envelopeError = envelopeResponse.error()
+
+  if (envelopeError == Errors.NONE) {
+val response = 
AbstractResponse.parseResponse(envelopeResponse.responseData, requestHeader)
+envelopeResponse.responseData().rewind()
+return response.errorCounts().containsKey(Errors.NOT_CONTROLLER)
+  }
+}
+
+false;

Review comment:
   nit: unnecessary `;`

##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
 }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: 
RequestHeader, responseBody: AbstractResponse): Boolean = {

Review comment:
   How about moving this to `ForwardingManager`, since the envelope response and envelope request are both handled in `ForwardingManager`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #10799: WIP

2021-05-31 Thread GitBox


dajac opened a new pull request #10799:
URL: https://github.com/apache/kafka/pull/10799


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-31 Thread GitBox


jlprat commented on pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#issuecomment-851594252


   Hi @ableegoldman, as you reviewed the last change in that area (fixing the license files), do you think you can review this one? Thanks in advance!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10783: MINOR: Dependency updates around Scala libraries

2021-05-31 Thread GitBox


jlprat commented on pull request #10783:
URL: https://github.com/apache/kafka/pull/10783#issuecomment-851594384


   @ijuma You can review now


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9497: KAFKA-10619: Producer will enable idempotence and acks all by default

2021-05-31 Thread GitBox


ijuma commented on pull request #9497:
URL: https://github.com/apache/kafka/pull/9497#issuecomment-851596119


   @d8tltanc can we rebase this PR so that we can get it merged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma removed a comment on pull request #9497: KAFKA-10619: Producer will enable idempotence and acks all by default

2021-05-31 Thread GitBox


ijuma removed a comment on pull request #9497:
URL: https://github.com/apache/kafka/pull/9497#issuecomment-851596119


   @d8tltanc can we rebase this PR so that we can get it merged?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9497: KAFKA-10619: Producer will enable idempotence and acks all by default

2021-05-31 Thread GitBox


ijuma commented on pull request #9497:
URL: https://github.com/apache/kafka/pull/9497#issuecomment-851597938


   @d8tltanc I notice that the Jira ticket was assigned to you, but this PR was 
submitted by @warrenzhu25. Are you working together on this?
   
   I am asking because we need to get this change into trunk soonish, and changing the default is not the only change we require. We also need to check and potentially update the tests so that we still get coverage of non-idempotent cases after the default is switched.
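   
   For reference, a minimal sketch (illustrative only; the bootstrap address and serializers are made up and not part of this PR) of the settings that the proposed defaults correspond to when configured explicitly today:
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerConfig;
   import org.apache.kafka.common.serialization.StringSerializer;
   
   public class IdempotentProducerExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
           props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
           // What this PR proposes to make the out-of-the-box behaviour:
           props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
           props.put(ProducerConfig.ACKS_CONFIG, "all");
           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               // Send records as usual; idempotence prevents retries from
               // introducing duplicates within a partition.
           }
       }
   }
   ```
   
   The non-idempotent test coverage mentioned above would then have to set `enable.idempotence=false` explicitly.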


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on pull request #10795: KAFKA-12866: Avoid root access to Zookeeper

2021-05-31 Thread GitBox


soarez commented on pull request #10795:
URL: https://github.com/apache/kafka/pull/10795#issuecomment-851598218


   @omkreddy can you have a look at this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12869) Update vulnerable dependencies

2021-05-31 Thread Pavel Kuznetsov (Jira)
Pavel Kuznetsov created KAFKA-12869:
---

 Summary: Update vulnerable dependencies
 Key: KAFKA-12869
 URL: https://issues.apache.org/jira/browse/KAFKA-12869
 Project: Kafka
  Issue Type: Bug
  Components: core, KafkaConnect
Affects Versions: 2.7.1
Reporter: Pavel Kuznetsov


*Description*
I checked the kafka_2.13-2.7.1.tgz distribution with WhiteSource and found that 
some libraries have vulnerabilities.
Here they are:
* jetty-io-9.4.38.v20210224.jar has CVE-2021-28165 vulnerability. The way to 
fix it is to upgrade to org.eclipse.jetty:jetty-io:9.4.39 or 
org.eclipse.jetty:jetty-io:10.0.2 or org.eclipse.jetty:jetty-io:11.0.2
* jersey-common-2.31.jar has CVE-2021-28168 vulnerability. The way to fix it is 
to upgrade to org.glassfish.jersey.core:jersey-common:2.34
* jetty-server-9.4.38.v20210224.jar has CVE-2021-28164 vulnerability. The way 
to fix it is to upgrade to org.eclipse.jetty:jetty-webapp:9.4.39

*To Reproduce*
Download kafka_2.13-2.7.1.tgz and find the jars listed above.
Check that these jars, in the versions given, are mentioned in the 
corresponding vulnerability descriptions.

*Expected*
* jetty-io upgraded to 9.4.39 or higher
* jersey-common upgraded to 2.34 or higher
* jetty-server upgraded to jetty-webapp:9.4.39 or higher

*Actual*
* jetty-io is 9.4.38
* jersey-common is 2.31
* jetty-server is 9.4.38





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-31 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354586#comment-17354586
 ] 

Bruno Cadonna commented on KAFKA-9168:
--

[~sagarrao] and [~ableegoldman] I took the liberty of starting the internal 
benchmark on the draft PR.

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] omkreddy commented on pull request #10795: KAFKA-12866: Avoid root access to Zookeeper

2021-05-31 Thread GitBox


omkreddy commented on pull request #10795:
URL: https://github.com/apache/kafka/pull/10795#issuecomment-851613121


   cc @rondagostino 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12869) Update vulnerable dependencies

2021-05-31 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354604#comment-17354604
 ] 

Ismael Juma commented on KAFKA-12869:
-

Thanks for the report. Have you checked if these have already been fixed in 
trunk?

> Update vulnerable dependencies
> --
>
> Key: KAFKA-12869
> URL: https://issues.apache.org/jira/browse/KAFKA-12869
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.7.1
>Reporter: Pavel Kuznetsov
>Priority: Major
>  Labels: security
>
> *Description*
> I checked kafka_2.13-2.7.1.tgz distribution with WhiteSource and find out 
> that some libraries have vulnerabilities.
> Here they are:
> * jetty-io-9.4.38.v20210224.jar has CVE-2021-28165 vulnerability. The way to 
> fix it is to upgrade to org.eclipse.jetty:jetty-io:9.4.39 or 
> org.eclipse.jetty:jetty-io:10.0.2 or org.eclipse.jetty:jetty-io:11.0.2
> * jersey-common-2.31.jar has CVE-2021-28168 vulnerability. The way to fix it 
> is to upgrade to org.glassfish.jersey.core:jersey-common:2.34
> * jetty-server-9.4.38.v20210224.jar has CVE-2021-28164 vulnerability. The way 
> to fix it is to upgrade to org.eclipse.jetty:jetty-webapp:9.4.39
> *To Reproduce*
> Download kafka_2.13-2.7.1.tgz and find jars, listed above.
> Check that these jars with corresponding versions are mentioned in 
> corresponding vulnerability description.
> *Expected*
> * jetty-io upgraded to 9.4.39 or higher
> * jersey-common upgraded to 2.34 or higher
> * jetty-server upgraded to jetty-webapp:9.4.39 or higher
> *Actual*
> * jetty-io is 9.4.38
> * jersey-common is 2.31
> * jetty-server is 9.4.38



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma opened a new pull request #10800: MINOR: Update jmh for async profiler 2.0 support

2021-05-31 Thread GitBox


ijuma opened a new pull request #10800:
URL: https://github.com/apache/kafka/pull/10800


   Async profiler 2.0 outputs HTML5 flame graph files
   and supports simultaneous collection of CPU,
   allocation, and lock profiles in JFR format.
   
   Updated the README to include an example of the
   latter and verified that the README commands
   work with async profiler 2.0.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4

2021-05-31 Thread GitBox


cadonna commented on pull request #10765:
URL: https://github.com/apache/kafka/pull/10765#issuecomment-851639965


   @showuon and @guozhangwang Do you want to have a second look after my last 
update of the unit tests? Otherwise I would move on and merge.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10783: MINOR: Dependency updates for Scala libraries for improved Scala 3.0 support

2021-05-31 Thread GitBox


ijuma commented on pull request #10783:
URL: https://github.com/apache/kafka/pull/10783#issuecomment-851640073


   The JDK 8 and Scala 2.12 build has two unrelated failures.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma merged pull request #10783: MINOR: Dependency updates for Scala libraries for improved Scala 3.0 support

2021-05-31 Thread GitBox


ijuma merged pull request #10783:
URL: https://github.com/apache/kafka/pull/10783


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10765: KAFKA-12519: Remove built-in Streams metrics for versions 0.10.0-2.4

2021-05-31 Thread GitBox


cadonna commented on pull request #10765:
URL: https://github.com/apache/kafka/pull/10765#issuecomment-851641644


   Test failures are unrelated and known to be flaky:
   ```
   JDK 8 and Scala 2.12 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   JDK 8 and Scala 2.12 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopics()
   JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateAndManyTopicsWithManyPartitions()
   JDK 15 and Scala 2.13 / 
kafka.server.RaftClusterTest.testCreateClusterAndCreateListDeleteTopic()
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10783: MINOR: Dependency updates for Scala libraries for improved Scala 3.0 support

2021-05-31 Thread GitBox


jlprat commented on pull request #10783:
URL: https://github.com/apache/kafka/pull/10783#issuecomment-851642412


   Thanks a lot for the review @ijuma !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams

2021-05-31 Thread GitBox


cadonna commented on pull request #10785:
URL: https://github.com/apache/kafka/pull/10785#issuecomment-851652903


   @lkokhreidze Thank you for the PR!
   This is quite a large PR. Could you split it up into smaller PRs that are 
easier to review?
   One option could be to have a separate PR for:
   - standby task assignor
   - config changes 
   - subscription info changes
   - ... 
   
   Opening small PRs usually increases the review quality and decreases time to 
the first review. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-31 Thread GitBox


ableegoldman commented on pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#issuecomment-851732066


   Hey @jlprat , can you elaborate on (a) the motivation behind adding this 
`NOTICE-binary` file (are we missing licenses completely? or we had them but 
not in the correct format? etc), (b) why you put this in a new `NOTICE-binary` 
file instead of the existing `NOTICE` file (do we need both? if we do add this 
`NOTICE-binary` then should we remove some or all of the `NOTICE` file 
contents?), and (c) how the `NOTICE-binary` file relates to the existing stuff 
under `licenses/`
   
   I'll extend the same disclaimer: make no assumption that I know what I'm 
talking about 🙂 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642743186



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
 }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: 
RequestHeader, responseBody: AbstractResponse): Boolean = {
+if (responseBody.isInstanceOf[EnvelopeResponse] && requestHeader != null) {
+  info(s"Trying to find NOT_CONTROLLER exception inside envelope response")
+  val envelopeResponse = responseBody.asInstanceOf[EnvelopeResponse]
+  val envelopeError = envelopeResponse.error()
+
+  if (envelopeError == Errors.NONE) {
+val response = 
AbstractResponse.parseResponse(envelopeResponse.responseData, requestHeader)
+envelopeResponse.responseData().rewind()
+return response.errorCounts().containsKey(Errors.NOT_CONTROLLER)
+  }
+}
+
+false;

Review comment:
   Nice catch! Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642748185



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -363,6 +386,22 @@ class BrokerToControllerRequestThread(
 }
   }
 
+  def maybeCheckNotControllerErrorInsideEnvelopeResponse(requestHeader: 
RequestHeader, responseBody: AbstractResponse): Boolean = {

Review comment:
   Yes, I tried, but it can't be moved. The `activeController` is stored in `BrokerToControllerRequestThread`, which is owned by `BrokerToControllerChannelManager`. So only `BrokerToControllerRequestThread#handleResponse` can update the `activeController` and put the request back at the top of the queue (retry).




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-31 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354778#comment-17354778
 ] 

Sagar Rao edited comment on KAFKA-9168 at 6/1/21, 3:42 AM:
---

Thanks [~cadonna]! 


was (Author: sagarrao):
Thanks [~cadonna]! would like to see the results.

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-31 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354778#comment-17354778
 ] 

Sagar Rao commented on KAFKA-9168:
--

Thanks [~cadonna]! would like to see the results.

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #10585: MINOR: cleanTest ought to remove output of unitTest task and integrat…

2021-05-31 Thread GitBox


chia7712 merged pull request #10585:
URL: https://github.com/apache/kafka/pull/10585


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …

2021-05-31 Thread GitBox


chia7712 commented on pull request #10791:
URL: https://github.com/apache/kafka/pull/10791#issuecomment-851797315


   > I was asking if our build has to change. Looks like it's string based, so 
it doesn't?
   
   Sorry for my incorrect response :(
   
   > Have we tested?
   
   yes. 
   
   *BEFORE*
   https://user-images.githubusercontent.com/6234750/120266901-6e57c300-c2d5-11eb-9df2-90ebbc209bd8.png
   
   *AFTER*
   https://user-images.githubusercontent.com/6234750/120267072-cd1d3c80-c2d5-11eb-9802-46007dacdd5d.png
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12869) Update vulnerable dependencies

2021-05-31 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17354800#comment-17354800
 ] 

Dongjin Lee commented on KAFKA-12869:
-

[~ijuma] I checked the versions. All of them are already fixed in trunk, but 
not included in 2.7.1.

> Update vulnerable dependencies
> --
>
> Key: KAFKA-12869
> URL: https://issues.apache.org/jira/browse/KAFKA-12869
> Project: Kafka
>  Issue Type: Bug
>  Components: core, KafkaConnect
>Affects Versions: 2.7.1
>Reporter: Pavel Kuznetsov
>Priority: Major
>  Labels: security
>
> *Description*
> I checked kafka_2.13-2.7.1.tgz distribution with WhiteSource and find out 
> that some libraries have vulnerabilities.
> Here they are:
> * jetty-io-9.4.38.v20210224.jar has CVE-2021-28165 vulnerability. The way to 
> fix it is to upgrade to org.eclipse.jetty:jetty-io:9.4.39 or 
> org.eclipse.jetty:jetty-io:10.0.2 or org.eclipse.jetty:jetty-io:11.0.2
> * jersey-common-2.31.jar has CVE-2021-28168 vulnerability. The way to fix it 
> is to upgrade to org.glassfish.jersey.core:jersey-common:2.34
> * jetty-server-9.4.38.v20210224.jar has CVE-2021-28164 vulnerability. The way 
> to fix it is to upgrade to org.eclipse.jetty:jetty-webapp:9.4.39
> *To Reproduce*
> Download kafka_2.13-2.7.1.tgz and find jars, listed above.
> Check that these jars with corresponding versions are mentioned in 
> corresponding vulnerability description.
> *Expected*
> * jetty-io upgraded to 9.4.39 or higher
> * jersey-common upgraded to 2.34 or higher
> * jetty-server upgraded to jetty-webapp:9.4.39 or higher
> *Actual*
> * jetty-io is 9.4.38
> * jersey-common is 2.31
> * jetty-server is 9.4.38



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 merged pull request #10791: MINOR: replace by org.junit.jupiter.api.Tag by net.jqwik.api.Tag for …

2021-05-31 Thread GitBox


chia7712 merged pull request #10791:
URL: https://github.com/apache/kafka/pull/10791


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #10176: KAFKA-12359: Update Jetty to 11

2021-05-31 Thread GitBox


dongjinleekr commented on pull request #10176:
URL: https://github.com/apache/kafka/pull/10176#issuecomment-851818497


   For those who are interested in this issue, here is some context:
   
   As you can see in the table below (from [here](https://www.eclipse.org/jetty/)), the difference between Jetty 9.4 and Jetty 11 is not only the Servlet version; it involves the Servlet API version, the package namespace, API changes, and related dependencies. For this reason, upgrading Jetty from 9.4 is not a simple task but a complicated one that requires lots of changes, tests, etc.
   
   
![Eclipse-Jetty-The-Eclipse-Foundation](https://user-images.githubusercontent.com/2375128/120270720-03ae8380-c2e5-11eb-97ea-52e2d2f12fdf.png)
   
   The problem is: **the end of life of the current 9.4 line may not be in the too-distant future**. For instance, [Jetty 9.2 reached end of life in March 2018](https://www.eclipse.org/lists/jetty-announce/msg00116.html), and [9.3 did so in February 2020](https://www.eclipse.org/lists/jetty-announce/msg00140.html) (i.e., two years later). 9.3 had also dropped Java 7 support before its final release. Although the Jetty team does not say so explicitly, the community seems to expect 9.4 to reach EOL in 2022, once Jetty 10.x and 11.x become stable and widespread. If that happens, we will also have to migrate to Jetty 11, along with Java 11.
   
   I agree with @ijuma that it is too early to discuss this issue. However, I think we have enough reason to recognize the problem. **I will keep an eye on Jetty 9.4's EOL, and as soon as it becomes certain, I will reopen the PR.**
   
   +1. For those who hope to test Kafka Connect on Servlet 5: I will start providing a preview soon, like the [log4j2 appender](http://home.apache.org/~dongjin/post/apache-kafka-log4j2-support/).
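   
   To make the package-namespace part of the change concrete, here is a minimal sketch (a hypothetical no-op filter, not Kafka code) of what moving from the Servlet API shipped with Jetty 9.4/10 to the one shipped with Jetty 11 looks like at the source level:
   
   ```java
   // Jetty 9.4 / 10 (javax.servlet.* namespace):
   // import javax.servlet.Filter;
   // import javax.servlet.FilterChain;
   // import javax.servlet.ServletException;
   // import javax.servlet.ServletRequest;
   // import javax.servlet.ServletResponse;
   
   // Jetty 11 (Servlet 5.0): the same types live under jakarta.servlet.*
   import jakarta.servlet.Filter;
   import jakarta.servlet.FilterChain;
   import jakarta.servlet.ServletException;
   import jakarta.servlet.ServletRequest;
   import jakarta.servlet.ServletResponse;
   import java.io.IOException;
   
   public class NoOpFilter implements Filter {
       @Override
       public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
               throws IOException, ServletException {
           // Behaviour is unchanged; only the import namespace differs.
           chain.doFilter(request, response);
       }
   }
   ```
   
   Every class that touches the Servlet API (and every dependency that does, e.g. Jersey) has to move in lockstep, which is why the upgrade is more than a version bump.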


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat commented on pull request #10693: KAFKA-12625: Fix the NOTICE file

2021-05-31 Thread GitBox


jlprat commented on pull request #10693:
URL: https://github.com/apache/kafka/pull/10693#issuecomment-851831451


   Thanks for looking at it, @ableegoldman.
   I'll do my best to explain:
   
   a) I added a NOTICE-binary file following the same pattern used for the LICENSE patch. The purpose of the NOTICE-binary file is to be included in the distribution archives only.
   b) The content of our NOTICE is what Kafka itself declares, which I left untouched. The purpose of the NOTICE-binary file is to collect the notices from all of our dependencies and paste them there.
   c) I'm no expert at all, but I'd say the licenses folder is merely there to collect the texts of all the different licenses so we can reference them from our LICENSE-binary (I think the licenses say their text needs to be included).
   
   I looked at https://github.com/apache/hadoop and attempted to use the same approach.
   
   In short, NOTICE contains Kafka's notices, while NOTICE-binary contains Kafka's and its dependencies' notices so it can be packaged in our distribution files.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642195443



##
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##
@@ -36,10 +37,16 @@
 private final String message;
 
 public static ApiError fromThrowable(Throwable t) {
+Throwable throwableToBeDecode = t;
+// Future will wrap the original exception with completionException, 
which will return unexpected UNKNOWN_SERVER_ERROR.
+if (t instanceof CompletionException) {
+throwableToBeDecode = t.getCause();

Review comment:
   fix 1: unwrap the `CompletionException` to get the original exception 
inside.

##
File path: clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
##
@@ -36,10 +37,16 @@
 private final String message;
 
 public static ApiError fromThrowable(Throwable t) {
+Throwable throwableToBeEncode = t;
+// Future will wrap the original exception with completionException, 
which will return unexpected UNKNOWN_SERVER_ERROR.
+if (t instanceof CompletionException) {
+throwableToBeEncode = t.getCause();

Review comment:
   fix 1: unwrap the CompletionException to get the original exception 
inside.
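   
   For illustration, a small standalone sketch (not Kafka code) of why the unwrap is needed: an exception thrown inside a future's stage surfaces wrapped in a `CompletionException`, so decoding the wrapper instead of its cause would map to `UNKNOWN_SERVER_ERROR`:
   
   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   
   public class CompletionExceptionDemo {
       public static void main(String[] args) {
           CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
               // Stand-in for the broker-side error we actually care about.
               throw new IllegalStateException("not the controller");
           });
           try {
               future.join();
           } catch (CompletionException e) {
               // The wrapper hides the real error type ...
               System.out.println("wrapper: " + e.getClass().getSimpleName());
               // ... so it is the cause that should be decoded into an ApiError.
               System.out.println("cause:   " + e.getCause().getClass().getSimpleName());
           }
       }
   }
   ```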




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on a change in pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#discussion_r642803795



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -239,18 +244,34 @@ class BrokerToControllerChannelManagerImpl(
*
* @param request The request to be sent.
* @param callbackRequest completion callback.
+   * @param requestHeader   The request header to be sent, used for parsing 
the envelop response
*/
   def sendRequest(
 request: AbstractRequest.Builder[_ <: AbstractRequest],
-callback: ControllerRequestCompletionHandler
+callback: ControllerRequestCompletionHandler,
+requestHeader: RequestHeader

Review comment:
   add a `requestHeader` parameter to do envelope response parsing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851845194


   The Jenkins PR build results show that the flaky tests don't fail anymore:
   `#4`: 1 failed test:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStop
   ```
   
   `#5`: all tests passed
   `#6`: all tests passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon edited a comment on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon edited a comment on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851845194


   The Jenkins PR build results show that the `RaftClusterTest` tests don't fail 
anymore:
   `#4`: 1 failed test:
   ```
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.connect.integration.BlockingConnectorTest.testBlockInSinkTaskStop
   ```
   
   `#5`: all tests passed
   `#6`: all tests passed
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10794: KAFKA-12677: parse envelope response to check if not_controller error existed

2021-05-31 Thread GitBox


showuon commented on pull request #10794:
URL: https://github.com/apache/kafka/pull/10794#issuecomment-851846366


   @dengziming, thanks for the comments. I've updated the PR accordingly. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #10579: KAFKA-9555 Added default RLMM implementation based on internal topic storage.

2021-05-31 Thread GitBox


satishd commented on a change in pull request #10579:
URL: https://github.com/apache/kafka/pull/10579#discussion_r642095314



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static 
org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from remote log metadata 
topic ({@link 
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintain the state of the remote log segment metadata. It 
gives an API to add or remove
+ * for what topic partition's metadata should be consumed by this instance 
using
+ * {{@link #addAssignmentsForPartitions(Set)}} and {@link 
#removeAssignmentsForPartitions(Set)} respectively.
+ * 
+ * When a broker is started, controller sends topic partitions that this 
broker is leader or follower for and the
+ * partitions to be deleted. This class receives those notifications with
+ * {@link #addAssignmentsForPartitions(Set)} and {@link 
#removeAssignmentsForPartitions(Set)} assigns consumer for the
+ * respective remote log metadata partitions by using {@link 
RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any leadership changes later are called through the same API. We will 
remove the partitions that ard deleted from
+ * this broker which are received through {@link 
#removeAssignmentsForPartitions(Set)}.
+ * 
+ * After receiving these events it invokes {@link 
RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains in-memory representation of the state of {@link 
RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+private static final Logger log = 
LoggerFactory.getLogger(ConsumerTask.class);
+
+private static final long POLL_INTERVAL_MS = 30L;
+
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+private final KafkaConsumer consumer;
+private final RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler;
+private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+private volatile boolean close = false;
+private volatile boolean assignPartitions = false;
+
+private final Object assignPartitionsLock = new Object();
+
+// Remote log metadata topic partitions that consumer is assigned to.
+private volatile Set assignedMetaPartitions = 
Collections.emptySet();
+
+// User topic partitions that this broker is a leader/follower for.
+private Set assignedTopicPartitions = 
Collections.emptySet();
+
+// Map of remote log metadata topic partition to target end offsets to be 
consumed.
+private final Map partitionToTargetEndOffsets = new 
ConcurrentHashMap<>();
+
+// Map of remote log metadata topic partition to consumed offsets.
+private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
+
+public ConsumerTask(KafkaConsumer consumer,
+RemotePartitionMetadataEventHandler

[GitHub] [kafka] lkokhreidze commented on pull request #10785: KIP-708 / A Rack awareness for Kafka Streams

2021-05-31 Thread GitBox


lkokhreidze commented on pull request #10785:
URL: https://github.com/apache/kafka/pull/10785#issuecomment-851871480


   Hi @cadonna,
   Thanks for the reply and suggestion. Will it be okay to have smaller PRs merged into trunk without having the full functionality in place? AFAIR, when I worked on https://github.com/apache/kafka/pull/7170, one of the motivations was to land the full functionality in trunk in one go.
   If having a partial implementation in trunk is OK, I can split this PR into smaller chunks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org