[GitHub] [kafka] mimaison commented on pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-07-16 Thread GitBox


mimaison commented on pull request #8921:
URL: https://github.com/apache/kafka/pull/8921#issuecomment-659272353


   Test failure is unrelated:
   - org.apache.kafka.streams.integration.BranchedMultiLevelRepartitionConnectedTopologyTest.testTopologyBuild







[GitHub] [kafka] mimaison merged pull request #8921: KAFKA-10160: Kafka MM2 consumer configuration

2020-07-16 Thread GitBox


mimaison merged pull request #8921:
URL: https://github.com/apache/kafka/pull/8921


   







[jira] [Resolved] (KAFKA-10160) Kafka MM2 consumer configuration

2020-07-16 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10160?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10160.

Fix Version/s: 2.7.0
   Resolution: Fixed

> Kafka MM2 consumer configuration
> 
>
> Key: KAFKA-10160
> URL: https://issues.apache.org/jira/browse/KAFKA-10160
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Pavol Ipoth
>Assignee: sats
>Priority: Major
>  Labels: configuration, kafka, mirror-maker
> Fix For: 2.7.0
>
>
> [https://github.com/apache/kafka/blob/trunk/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java#L51,]
>  according to this, producer/consumer-level properties should be configured as 
> e.g. somesource->sometarget.consumer.client.id. I tried to set 
> somesource->sometarget.consumer.auto.offset.reset=latest, but without 
> success; the consumer always tries to fetch from the earliest offset. I am not 
> sure if this is a bug or my misconfiguration, but at least an update to the 
> documentation would be useful.
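
For reference, a minimal mm2.properties sketch of the per-flow override being 
discussed (cluster aliases and bootstrap servers are illustrative, not taken 
from the issue):
{code}
clusters = somesource, sometarget
somesource.bootstrap.servers = source-kafka:9092
sometarget.bootstrap.servers = target-kafka:9092

somesource->sometarget.enabled = true
somesource->sometarget.topics = .*

# per-flow consumer override that this fix makes effective
somesource->sometarget.consumer.auto.offset.reset = latest
{code}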





[GitHub] [kafka] rajinisivaram commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers

2020-07-16 Thread GitBox


rajinisivaram commented on a change in pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#discussion_r455644987



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -712,10 +708,13 @@ class ReplicaManager(val config: KafkaConfig,
     } catch {
       case e@(_: InvalidTopicException |
               _: LogDirNotFoundException |
-              _: ReplicaNotAvailableException |
               _: KafkaStorageException) =>
-        warn("Unable to alter log dirs for %s".format(topicPartition), e)
+        warn(s"Unable to alter log dirs for $topicPartition", e)
         (topicPartition, Errors.forException(e))
+      case e: NotLeaderOrFollowerException =>
+        warn(s"Unable to alter log dirs for $topicPartition", e)
+        // Retaining REPLICA_NOT_AVAILABLE exception for ALTER_REPLICA_LOG_DIRS for older versions for compatibility
+        (topicPartition, if (config.interBrokerProtocolVersion >= KAFKA_2_7_IV0) Errors.NOT_LEADER_OR_FOLLOWER else Errors.REPLICA_NOT_AVAILABLE)

Review comment:
   Reverted.









[GitHub] [kafka] rajinisivaram commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers

2020-07-16 Thread GitBox


rajinisivaram commented on a change in pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#discussion_r455645402



##
File path: clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java
##
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+/**
+ * This server is not the leader or follower for the given partition.
+ * This could be a transient exception during reassignments.
+ */
+@SuppressWarnings("deprecation")
+public class NotLeaderOrFollowerException extends NotLeaderForPartitionException {

Review comment:
   Updated javadoc.









[GitHub] [kafka] rajinisivaram commented on pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers

2020-07-16 Thread GitBox


rajinisivaram commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-659288156


   @hachikuji @ijuma Thanks for the reviews, have addressed the comments so far.







[jira] [Commented] (KAFKA-9973) __consumer_offsets record is invalid lead to log clean failed and __consumer_offsets grows too big

2020-07-16 Thread zhangzhisheng (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159059#comment-17159059
 ] 

zhangzhisheng commented on KAFKA-9973:
--

__consumer_offsets cannot be cleaned.

kafka version 2.12-2.4.1

jdk version jdk1.8.0_231
{code:java}
// 
21G ./data/kafka-logs/topic-2018066952-7
27G ./data/kafka-logs/__consumer_offsets-25
30G ./data/kafka-logs/topic-2018066951-0
30G ./data/kafka-logs/topic-2018066951-1
30G ./data/kafka-logs/topic-2018066951-3
30G ./data/kafka-logs/topic-2018066951-5
30G ./data/kafka-logs/topic-2018066951-6
30G ./data/kafka-logs/topic-2018066951-7
32G ./data/kafka-logs/__consumer_offsets-30
32G ./data/kafka-logs/topic-201911251114-0
32G ./data/kafka-logs/topic-201911251114-2
{code}

> __consumer_offsets record is invalid lead to log clean failed and 
> __consumer_offsets grows too big
> --
>
> Key: KAFKA-9973
> URL: https://issues.apache.org/jira/browse/KAFKA-9973
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Affects Versions: 2.1.1
>Reporter: leibo
>Priority: Major
> Attachments: dump.png, log-cleaner.log.1, screenshot-1.png
>
>
> __consumer_offsets-34 grows to 13GB and can't be compacted successfully; the 
> error log is as below:
> {code:java}
> // code placeholder
> [2019-12-30 19:21:41,047] INFO [kafka-log-cleaner-thread-6]: Starting 
> (kafka.log.LogCleaner)
> [2019-12-30 19:21:41,079] INFO [kafka-log-cleaner-thread-7]: Starting 
> (kafka.log.LogCleaner)
> [2019-12-30 19:21:42,825] WARN [kafka-log-cleaner-thread-0]: Unexpected 
> exception thrown when cleaning log 
> Log(/home/kafka_2.12-2.1.1/data/kafka-logs/__consumer_offsets-34). Marking 
> its partition (__consumer_offsets-34) as uncleanable (kafka.log.LogCleaner)
> org.apache.kafka.common.record.InvalidRecordException: Found invalid number 
> of record headers -47
> --
> [2019-12-30 20:23:51,340] INFO [kafka-log-cleaner-thread-5]: Starting 
> (kafka.log.LogCleaner)
> [2019-12-30 20:23:51,361] INFO Cleaner 0: Building offset map for log 
> __consumer_offsets-34 for 52 segments in offset range [26842714, 73423730). 
> (kafka.log.LogCleaner)
> [2019-12-30 20:23:51,398] INFO [kafka-log-cleaner-thread-6]: Starting 
> (kafka.log.LogCleaner)
> [2019-12-30 20:23:51,493] INFO [kafka-log-cleaner-thread-7]: Starting 
> (kafka.log.LogCleaner)
> [2019-12-30 20:23:59,596] WARN [kafka-log-cleaner-thread-0]: Unexpected 
> exception thrown when cleaning log 
> Log(/home/kafka_2.12-2.1.1/data/kafka-logs/__consumer_offsets-34). Marking 
> its partition (__consumer_offsets-34) as uncleanable (kafka.log.LogCleaner)
> org.apache.kafka.common.record.InvalidRecordException: Found invalid number 
> of record headers -47
> {code}
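
One way to inspect the offending segment for the bad batch is the standard dump 
tool (a sketch; the segment file name is illustrative, derived from the offset 
range in the log above):
{code}
bin/kafka-run-class.sh kafka.tools.DumpLogSegments \
  --files /home/kafka_2.12-2.1.1/data/kafka-logs/__consumer_offsets-34/00000000000026842714.log \
  --offsets-decoder --deep-iteration
{code}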





[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r455669663



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -892,136 +895,162 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
      handleListOffsetRequestV0(request)
     else
      handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
-      k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
-      k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
-          s"failed because the partition is duplicated in the request.")
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-

[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r455671140



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -994,30 +1023,29 @@ public void onSuccess(ClientResponse response, RequestFuture f
      *   value of each partition may be null only for v0. In v1 and later the ListOffset API would not
      *   return a null timestamp (-1 is returned instead when necessary).
      */
-    private void handleListOffsetResponse(Map timestampsToSearch,
+    private void handleListOffsetResponse(Map timestampsToSearch,
                                           ListOffsetResponse listOffsetResponse,
                                           RequestFuture future) {
         Map fetchedOffsets = new HashMap<>();
         Set partitionsToRetry = new HashSet<>();
         Set unauthorizedTopics = new HashSet<>();
 
-        for (Map.Entry entry : timestampsToSearch.entrySet()) {
+        Map partitionsData = byTopicPartitions(listOffsetResponse.responseData());

Review comment:
   I actually switched the logic to loop over the response, as you initially 
suggested.
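
   For illustration, iterating over the response with the generated protocol 
   looks roughly like this (class and accessor names follow the pattern in the 
   diff above but are assumptions, not the exact PR code):

       ListOffsetResponseData data = listOffsetResponse.data(); // accessor name assumed
       for (ListOffsetTopicResponse topic : data.topics()) {
           for (ListOffsetPartitionResponse partition : topic.partitions()) {
               TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
               Errors error = Errors.forCode(partition.errorCode());
               // record the partition's offset/timestamp, or queue it for
               // retry on a retriable error
           }
       }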









[GitHub] [kafka] showuon opened a new pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-16 Thread GitBox


showuon opened a new pull request #9029:
URL: https://github.com/apache/kafka/pull/9029


   In the original test, we sleep for a static 5 seconds to ensure the 
automated group offset sync is complete. The sync is sometimes fast (less than 
1 second) and sometimes slow (~20 seconds). I rewrote the sleep as a wait for 
specific conditions:  
   1. `consumer.endOffsets` to make sure the topic partition metadata is synced
   2. `backupClient.listConsumerGroupOffsets` to make sure the 
consumerGroupOffset is also synced
   
   I've tested it many times in my local environment, and it makes the test 
more stable.
   
   Thanks.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r455671951



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1077,7 +1077,7 @@ class ReplicaManager(val config: KafkaConfig,
           // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
           val readInfo: LogReadInfo = partition.readRecords(
             fetchOffset = fetchInfo.fetchOffset,
-            currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
+            currentLeaderEpoch = toScalaOption(fetchInfo.currentLeaderEpoch),

Review comment:
   I think it's ok to keep the Scala Option here. 
https://github.com/apache/kafka/pull/9008 can just update the field if its 
name changes.









[GitHub] [kafka] showuon commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-16 Thread GitBox


showuon commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r455672187



##
File path: connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
             backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
     }
 
+    private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+        throws InterruptedException {
+        Admin backupClient = backup.kafka().createAdminClient();
+        List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+        IntStream.range(0, NUM_PARTITIONS).forEach(
+            partitionInd -> {
+                for (String topic: topics) {
+                    tps.add(new TopicPartition(topic, partitionInd));
+                }
+            }
+        );
+        long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size();
+
+        waitForCondition(() -> {
+            Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
+                backupClient.listConsumerGroupOffsets("consumer-group-1").partitionsToOffsetAndMetadata().get();
+            long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream().mapToLong(metadata -> metadata.offset()).sum();
+
+            Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, Duration.ofMillis(500));
+            long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
+
+            // make sure the consumer group offsets are synced to expected number
+            return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0;
+        }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
+    }
 
     @Test
-    public void testOneWayReplicationWithAutorOffsetSync1() throws InterruptedException {
+    public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {

Review comment:
   Renamed the test to fix the typo in its name.









[GitHub] [kafka] showuon commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-16 Thread GitBox


showuon commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659310965


   @ning2008wisc @mimaison , could you help review this PR to fix the flaky 
test? Thanks.







[GitHub] [kafka] cadonna commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing

2020-07-16 Thread GitBox


cadonna commented on a change in pull request #9027:
URL: https://github.com/apache/kafka/pull/9027#discussion_r455681362



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -181,6 +193,7 @@ bootstrap.serverscache.max.bytes.buffering
 Medium
 Maximum number of memory bytes to be used for record caches across all threads.
+Maximum number of memory bytes to be used for record caches across all threads.

Review comment:
   This line seems to be a duplicate.

##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -270,21 +296,26 @@ bootstrap.serversThe amount of time in milliseconds, before a request is retried. This applies if the retries parameter is configured to be greater than 0. 
 
   100
   
-  rocksdb.config.setter
+  rocksdb.config.setter
 Medium
 The RocksDB configuration.
 
   
-  state.cleanup.delay.ms
+  state.cleanup.delay.ms
 Low
 The amount of time in milliseconds to wait before deleting state when a partition has migrated.
 60 milliseconds
   
-  state.dir
+  state.dir
 High
 Directory location for state stores.
 /tmp/kafka-streams
+  topology.optimization

Review comment:
   I guess it tells the renderer which CSS to use to have alternating row 
colours. Apparently, CSS3 already offers ways to accomplish this without these 
classes (see 
https://www.textfixer.com/tutorials/css-table-alternating-rows.php).
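
   For reference, a tiny sketch of the CSS3 approach (selector only; the actual 
   class and colour choices in the docs may differ):

       /* alternate row colours without per-row classes */
       table tr:nth-child(even) { background-color: #eee; }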









[jira] [Created] (KAFKA-10278) kafka-configs does not show the current properties of running kafka broker upon describe.

2020-07-16 Thread kaushik srinivas (Jira)
kaushik srinivas created KAFKA-10278:


 Summary: kafka-configs does not show the current properties of 
running kafka broker upon describe.
 Key: KAFKA-10278
 URL: https://issues.apache.org/jira/browse/KAFKA-10278
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.1
Reporter: kaushik srinivas


kafka-configs.sh does not list the properties 
(read-only/per-broker/cluster-wide) with which the kafka broker is currently 
running.

The command returns nothing.

Only those properties added or updated via kafka-configs.sh are listed by the 
describe command.

bash-4.2$ env -i bin/kafka-configs.sh --bootstrap-server kf-test-0.kf-test-headless.test.svc.cluster.local:9092 --entity-type brokers --entity-default --describe
Default config for brokers in the cluster are:
  log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:log.cleaner.threads=2}
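
For comparison, newer releases of kafka-configs.sh can include static and 
default values in the describe output via the --all flag (flag availability 
depends on the tooling version, so treat this as a sketch):

bash-4.2$ bin/kafka-configs.sh --bootstrap-server kf-test-0.kf-test-headless.test.svc.cluster.local:9092 --entity-type brokers --entity-name 0 --describe --all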

 





[jira] [Commented] (KAFKA-10278) kafka-configs does not show the current properties of running kafka broker upon describe.

2020-07-16 Thread kaushik srinivas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10278?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159129#comment-17159129
 ] 

kaushik srinivas commented on KAFKA-10278:
--

Hi [~rajinisiva...@gmail.com] 

[~rsivaram], [~ijuma]

Is this a known issue already?

 

> kafka-configs does not show the current properties of running kafka broker 
> upon describe.
> -
>
> Key: KAFKA-10278
> URL: https://issues.apache.org/jira/browse/KAFKA-10278
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.1
>Reporter: kaushik srinivas
>Priority: Critical
>  Labels: kafka-configs.sh
>
> kafka-configs.sh does not list the properties 
> (read-only/per-broker/cluster-wide) with which the kafka broker is currently 
> running.
> The command returns nothing.
> Only those properties added or updated via kafka-configs.sh are listed by the 
> describe command.
> bash-4.2$ env -i bin/kafka-configs.sh --bootstrap-server kf-test-0.kf-test-headless.test.svc.cluster.local:9092 --entity-type brokers --entity-default --describe
> Default config for brokers in the cluster are:
>   log.cleaner.threads=2 sensitive=false synonyms={DYNAMIC_DEFAULT_BROKER_CONFIG:log.cleaner.threads=2}
>  





[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-659347339


   Thanks @dajac for your comments, I've pushed an update.
   
   @abbccdda Can you take a look?







[GitHub] [kafka] rajinisivaram opened a new pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server

2020-07-16 Thread GitBox


rajinisivaram opened a new pull request #9030:
URL: https://github.com/apache/kafka/pull/9030


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2020-07-16 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159164#comment-17159164
 ] 

Ivan Ponomarev commented on KAFKA-5488:
---

Hi everyone, 
{quote}I think there are some unnecessary interfaces

I don't think the return type needs to be a Map,
{quote}
[~high.lee], concerning your comment about the API: the current API is the 
result of an extensive discussion (you can find the link to the discussion in 
the KIP itself). The first versions of this KIP didn't have a Map return type 
and a Function parameter, but there was a concern that all the branches would 
end up in separate variable scopes, which is inconvenient in many cases. It was 
a really hard discussion, with a number of ideas proposed and rejected; what we 
have now seems to be the best choice from many points of view.
{quote}Are you willing to continue working?
{quote}
Sure, since I proposed this KIP, I'm going to implement it. I've been quite 
busy recently, but I really hope to post a PR in one or at most two weeks.
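
For readers following along, the API shape under discussion looks roughly like 
this (a sketch based on the KIP discussion; names such as Named, Branched, and 
the Event type are illustrative and not final):
{code:java}
Map<String, KStream<String, Event>> branches = stream
    .split(Named.as("event-"))
    .branch((key, event) -> event.isValid(), Branched.as("valid"))
    .defaultBranch(Branched.as("invalid"));

branches.get("event-valid").to("topicValidData");
branches.get("event-invalid").to("topicInvalidData");
{code}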

 

 

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: highluck
>Priority: Major
>  Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
> public static void main(String... args) {
> KStream[] branchedStreams= new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> (k, v) -> EventType::validData
> (k, v) -> true
> );
> 
> branchedStreams[0]
> .to("topicValidData");
> 
> branchedStreams[1]
> .to("topicInvalidData");
> }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition, 
> Consumer>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> Branch.create(
> (k, v) -> EventType::validData,
> stream -> stream.to("topicValidData")
> ),
> Branch.create(
> (k, v) -> true,
> stream -> stream.to("topicInvalidData")
> )
> );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]





[jira] [Assigned] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2020-07-16 Thread Ivan Ponomarev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev reassigned KAFKA-5488:
-

Assignee: Ivan Ponomarev  (was: highluck)

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
> public static void main(String... args) {
> KStream[] branchedStreams= new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> (k, v) -> EventType::validData
> (k, v) -> true
> );
> 
> branchedStreams[0]
> .to("topicValidData");
> 
> branchedStreams[1]
> .to("topicInvalidData");
> }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition, 
> Consumer>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> Branch.create(
> (k, v) -> EventType::validData,
> stream -> stream.to("topicValidData")
> ),
> Branch.create(
> (k, v) -> true,
> stream -> stream.to("topicInvalidData")
> )
> );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]





[jira] [Assigned] (KAFKA-8296) Kafka Streams branch method raises type warnings

2020-07-16 Thread Ivan Ponomarev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev reassigned KAFKA-8296:
-

Assignee: Ivan Ponomarev

> Kafka Streams branch method raises type warnings
> 
>
> Key: KAFKA-8296
> URL: https://issues.apache.org/jira/browse/KAFKA-8296
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Assignee: Ivan Ponomarev
>Priority: Minor
>
> Because the branch method in the DSL takes varargs, using it as follows 
> raises an unchecked type warning:
> {code:java}
> KStream<String, User>[] branches = builder.<String, User>stream(inputTopic)
> .branch((key, user) -> "united states".equals(user.getCountry()),
> (key, user) -> "germany".equals(user.getCountry()),
> (key, user) -> "mexico".equals(user.getCountry()),
> (key, user) -> true);
> {code}
> The compiler warns with:
> {code:java}
> Warning:(39, 24) java: unchecked generic array creation for varargs parameter 
> of type org.apache.kafka.streams.kstream.Predicate<? super String, ? super io.confluent.developer.avro.User>[]
> {code}
> {code}
> This is unfortunate because of the way Java's type system + generics work. We 
> could possibly fix this by adding the @SafeVarargs annotation to the branch 
> method signatures.
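
As a self-contained illustration of why the annotation helps (a toy class, not 
Kafka's API; @SafeVarargs is only permitted on methods that cannot be 
overridden, i.e. static, final, or private ones):
{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public final class SafeVarargsDemo {
    // Without @SafeVarargs, every call site that creates the implicit
    // Predicate<T>[] array gets an "unchecked generic array creation" warning.
    @SafeVarargs
    public static <T> List<Predicate<T>> branches(Predicate<T>... predicates) {
        return Arrays.asList(predicates);
    }
}
{code}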





[jira] [Comment Edited] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2020-07-16 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159164#comment-17159164
 ] 

Ivan Ponomarev edited comment on KAFKA-5488 at 7/16/20, 12:38 PM:
--

Hi everyone,
{quote}I think there are some unnecessary interfaces

I don't think the return type needs to be a Map,
{quote}
[~high.lee], concerning your comment about the API: the current API is the 
result of an extensive discussion (you can find the link to the discussion in 
the KIP itself). The first versions of this KIP didn't have a Map return type 
and a Function parameter, but there was a concern that all the branches would 
end up in separate variable scopes, which is inconvenient in many cases. It was 
a really hard discussion, with a number of ideas proposed and rejected; what we 
have now seems to be the best choice from many points of view.
{quote}Are you willing to continue working?
{quote}
Sure, since I proposed this KIP, I'm going to implement it. I've been quite 
busy recently, but I really hope to post a PR in one or at most two weeks.

 

 


was (Author: iponomarev):
Hi everyone, 
{quote}I think there are some unnecessary interfaces

I don't think the return type needs to be a Map,
{quote}
[~high.lee] , concerning your comment about the API: the current API is a 
result of the extensive discussion (you can find the link to the discussion in 
the KIP itself).  The first versions of this KIP didn't have Map return type 
and Function as a parameter, but there was a concern that all the branches will 
be in separate variable scopes, which is inconvenient in many cases. There was 
a really hard discussion with a number of ideas proposed and rejected, what we 
have now seems to be the best choice from many points of view.
{quote}Are you willing to continue working?
{quote}
Sure, since I proposed this KIP, I'm going to implement it. I've been quite 
busy recently, but I really hope that I'll be able to post a PR from me in one 
or maximum two weeks.

 

 

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
> public static void main(String... args) {
> KStream[] branchedStreams= new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> (k, v) -> EventType::validData
> (k, v) -> true
> );
> 
> branchedStreams[0]
> .to("topicValidData");
> 
> branchedStreams[1]
> .to("topicInvalidData");
> }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition, 
> Consumer>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> Branch.create(
> (k, v) -> EventType::validData,
> stream -> stream.to("topicValidData")
> ),
> Branch.create(
> (k, v) -> true,
> stream -> stream.to("topicInvalidData")
> )
> );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]





[GitHub] [kafka] dajac commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server

2020-07-16 Thread GitBox


dajac commented on a change in pull request #9030:
URL: https://github.com/apache/kafka/pull/9030#discussion_r455745104



##
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -197,18 +197,20 @@
 
 public class RequestResponseTest {
 
+    private UnknownServerException unknownServerException = new UnknownServerException("secret");

Review comment:
   nit: I would add a small comment to explain why we do this here.









[GitHub] [kafka] dajac commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


dajac commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r455763010



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -892,136 +895,162 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
      handleListOffsetRequestV0(request)
     else
      handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
-      k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
-      k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
-          s"failed because the partition is duplicated in the request.")
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-

[GitHub] [kafka] dajac commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


dajac commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r455765374



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1077,7 +1077,7 @@ class ReplicaManager(val config: KafkaConfig,
           // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
           val readInfo: LogReadInfo = partition.readRecords(
             fetchOffset = fetchInfo.fetchOffset,
-            currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
+            currentLeaderEpoch = toScalaOption(fetchInfo.currentLeaderEpoch),

Review comment:
   Btw, you can use `.asScala` on a Java `Optional` to convert it directly.









[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r455782710



##
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##
@@ -1077,7 +1077,7 @@ class ReplicaManager(val config: KafkaConfig,
           // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
           val readInfo: LogReadInfo = partition.readRecords(
             fetchOffset = fetchInfo.fetchOffset,
-            currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
+            currentLeaderEpoch = toScalaOption(fetchInfo.currentLeaderEpoch),

Review comment:
   Yes, but it gives us an `Option[java.lang.Integer]` object while we want 
`Option[Int]`.
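
   To make the distinction concrete, a small Scala sketch (the import assumes 
   the scala-java8-compat converters; only the extra `map` differs):

       import java.util.Optional
       import scala.compat.java8.OptionConverters._

       val epoch: Optional[Integer] = Optional.of(Int.box(5))
       val boxed: Option[Integer] = epoch.asScala                // Option[java.lang.Integer]
       val unboxed: Option[Int] = epoch.asScala.map(_.intValue)  // what the Scala API wants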









[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-07-16 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r455791789



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -892,136 +895,162 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
      handleListOffsetRequestV0(request)
     else
      handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
-      k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] = {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) =>
-      k -> new ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client $clientId on partition $topicPartition " +
-          s"failed because the partition is duplicated in the request.")
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-

[GitHub] [kafka] rondagostino commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server

2020-07-16 Thread GitBox


rondagostino commented on a change in pull request #9030:
URL: https://github.com/apache/kafka/pull/9030#discussion_r455792482



##
File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
##
@@ -66,10 +66,11 @@ public DescribeClientQuotasResponse(Map>
     }
 
     public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) {
+        ApiError apiError = ApiError.fromThrowable(e);

Review comment:
   Should the same thing happen in `public 
AlterClientQuotasResponse(Collection<ClientQuotaEntity> entities, int 
throttleTimeMs, Throwable e)` for consistency?  And also maybe in 
`IncrementalAlterConfigsResponse` (`public static 
IncrementalAlterConfigsResponseData toResponseData`)?









[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455828890



##
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));
+        }));
+        return Collections.unmodifiableMap(result);
+    }
+
+    private List<TopicPartition> toForgottonTopicList(List<FetchRequestData.ForgottenTopic> forgottenTopics) {

Review comment:
   Typo "Forgotton"









[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455835380



##
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
   Let's open a jira for getting rid of the toPartitionDataMap if we don't 
address it in this PR. It's a pretty large part of the cost here and there are 
only a few places we would have to deal with it.









[GitHub] [kafka] lbradstreet commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


lbradstreet commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r455835380



##
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -273,6 +99,28 @@ public boolean equals(Object o) {
         }
     }
 
+    private Map<TopicPartition, PartitionData> toPartitionDataMap(List<FetchRequestData.FetchTopic> fetchableTopics) {
+        Map<TopicPartition, PartitionData> result = new LinkedHashMap<>();
+        fetchableTopics.forEach(fetchTopic -> fetchTopic.partitions().forEach(fetchPartition -> {
+            Optional<Integer> leaderEpoch = Optional.of(fetchPartition.currentLeaderEpoch())
+                .filter(epoch -> epoch != RecordBatch.NO_PARTITION_LEADER_EPOCH);
+            result.put(new TopicPartition(fetchTopic.topic(), fetchPartition.partition()),
+                new PartitionData(fetchPartition.fetchOffset(), fetchPartition.logStartOffset(),
+                    fetchPartition.partitionMaxBytes(), leaderEpoch));

Review comment:
   Let's open a jira for getting rid of the toPartitionDataMap if we don't 
address it in this PR. It's a pretty large part of the cost here and there are 
only a few places we would have to deal with it. I think we should fix it 
sooner rather than later too.









[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2020-07-16 Thread highluck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159285#comment-17159285
 ] 

highluck commented on KAFKA-5488:
-

@Ivan Ponomarev
thanks for the comment!
Call me whenever you need help :)

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
> public static void main(String... args) {
> KStream[] branchedStreams= new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> (k, v) -> EventType::validData
> (k, v) -> true
> );
> 
> branchedStreams[0]
> .to("topicValidData");
> 
> branchedStreams[1]
> .to("topicInvalidData");
> }
> }
> {code}
> Quick idea, something like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> Branch.create(
> (k, v) -> EventType::validData,
> stream -> stream.to("topicValidData")
> ),
> Branch.create(
> (k, v) -> true,
> stream -> stream.to("topicInvalidData")
> )
> );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on a change in pull request #8933: KAFKA-10163; Throttle Create Topic, Create Partition and Delete Topic Operations (KIP-599, Part I, Broker Changes)

2020-07-16 Thread GitBox


rajinisivaram commented on a change in pull request #8933:
URL: https://github.com/apache/kafka/pull/8933#discussion_r455832706



##
File path: core/src/main/scala/kafka/server/AdminManager.scala
##
@@ -295,34 +312,44 @@ class AdminManager(val config: KafkaConfig,
   throw new InvalidPartitionsException(s"Topic already has 
$oldNumPartitions partitions.")
 }
 
-val newPartitionsAssignment = Option(newPartition.assignments)
-  .map { assignmentMap =>
-val assignments = assignmentMap.asScala.map {
-  createPartitionAssignment => 
createPartitionAssignment.brokerIds.asScala.map(_.toInt)
-}
-val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
-if (unknownBrokers.nonEmpty)
-  throw new InvalidReplicaAssignmentException(
-s"Unknown broker(s) in replica assignment: 
${unknownBrokers.mkString(", ")}.")
-
-if (assignments.size != numPartitionsIncrement)
-  throw new InvalidReplicaAssignmentException(
-s"Increasing the number of partitions by 
$numPartitionsIncrement " +
-  s"but ${assignments.size} assignments provided.")
-
-assignments.zipWithIndex.map { case (replicas, index) =>
-  existingAssignment.size + index -> replicas
-}.toMap
+val newPartitionsAssignment = Option(newPartition.assignments).map { 
assignmentMap =>
+  val assignments = assignmentMap.asScala.map {
+createPartitionAssignment => 
createPartitionAssignment.brokerIds.asScala.map(_.toInt)
+  }
+  val unknownBrokers = assignments.flatten.toSet -- allBrokerIds
+  if (unknownBrokers.nonEmpty)
+throw new InvalidReplicaAssignmentException(
+  s"Unknown broker(s) in replica assignment: 
${unknownBrokers.mkString(", ")}.")
+
+  if (assignments.size != numPartitionsIncrement)
+throw new InvalidReplicaAssignmentException(
+  s"Increasing the number of partitions by $numPartitionsIncrement 
" +
+s"but ${assignments.size} assignments provided.")
+
+  assignments.zipWithIndex.map { case (replicas, index) =>
+existingAssignment.size + index -> replicas
+  }.toMap
 }
 
-val updatedReplicaAssignment = adminZkClient.addPartitions(topic, 
existingAssignment, allBrokers,
-  newPartition.count, newPartitionsAssignment, validateOnly = 
validateOnly)
-CreatePartitionsMetadata(topic, updatedReplicaAssignment.keySet, 
ApiError.NONE)
+val assignmentForNewPartitions = 
adminZkClient.createNewPartitionsAssignment(
+  topic, existingAssignment, allBrokers, newPartition.count, 
newPartitionsAssignment)
+
+if (validateOnly) {
+  CreatePartitionsMetadata(topic, (existingAssignment ++ 
assignmentForNewPartitions).keySet)

Review comment:
   Shouldn't validateOnly tell you how much you would have been throttled?

##
File path: core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
##
@@ -20,21 +20,32 @@ import java.util.concurrent.TimeUnit
 
 import kafka.network.RequestChannel
 import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Sensor.QuotaEnforcementType
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.quota.ClientQuotaCallback
 
 import scala.jdk.CollectionConverters._
 
+object ClientRequestQuotaManager {
+  val QuotaRequestPercentDefault = Int.MaxValue.toDouble
+  val NanosToPercentagePerSecond = 100.0 / TimeUnit.SECONDS.toNanos(1)
+}
 
 class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
 private val metrics: Metrics,
 private val time: Time,
-threadNamePrefix: String,
-quotaCallback: Option[ClientQuotaCallback])
-extends ClientQuotaManager(config, metrics, 
QuotaType.Request, time, threadNamePrefix, quotaCallback) {
-  val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
-  def exemptSensor = getOrCreateSensor(exemptSensorName, exemptMetricName)
+private val threadNamePrefix: String,
+private val quotaCallback: 
Option[ClientQuotaCallback])
+extends ClientQuotaManager(config, metrics, QuotaType.Request, 
QuotaEnforcementType.PERMISSIVE,
+  time, threadNamePrefix, quotaCallback) {
+
+  private val maxThrottleTimeMs = 
TimeUnit.SECONDS.toMillis(this.config.quotaWindowSizeSeconds)
+  private val exemptMetricName = metrics.metricName("exempt-request-time",
+QuotaType.Request.toString, "Tracking exempt-request-time utilization 
percentage")
+  private val exemptSensorName = "exempt-" + QuotaType.Request

Revie

[GitHub] [kafka] vvcephei commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing

2020-07-16 Thread GitBox


vvcephei commented on a change in pull request #9027:
URL: https://github.com/apache/kafka/pull/9027#discussion_r455299303



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -181,6 +193,7 @@ bootstrap.serverscache.max.bytes.buffering
 Medium
 Maximum number of memory bytes to be used for 
record caches across all threads.
+Maximum number of memory bytes to be used for 
record caches across all threads.

Review comment:
   duplicate?

##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -270,43 +319,47 @@ bootstrap.serversThe amount of time in milliseconds, before a 
request is retried. This applies if the retries parameter is configured to be greater than 0. 

   100
   
-  rocksdb.config.setter
+  rocksdb.config.setter
 Medium
 The RocksDB configuration.
 
   
-  state.cleanup.delay.ms
+  state.cleanup.delay.ms
 Low
 The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.
 60 milliseconds
   
-  state.dir
+  state.dir
 High
 Directory location for state stores.
 /tmp/kafka-streams
   
-  timestamp.extractor
+  topology.optimization
 Medium
-Timestamp extractor class that implements the 
TimestampExtractor interface.
-See Timestamp Extractor
+A configuration telling Kafka Streams if it should 
optimize the topology
+none
   
-  upgrade.from
+  upgrade.from
 Medium
 The version you are upgrading from during a 
rolling upgrade.
 See Upgrade 
From
   
-  value.serde
-Medium
-Default serializer/deserializer class for record 
values, implements the Serde interface (see also key.serde).
-Serdes.ByteArray().getClass().getName()
-  
   windowstore.changelog.additional.retention.ms
 Low
 Added to a windows maintainMs to ensure data is 
not deleted from the log prematurely. Allows for clock drift.
 8640 milliseconds = 1 day
   
   
 
+
+  acceptable.recovery.lag
+  
+
+  The maximum acceptable lag (total number of offsets to catch up 
from the changelog) for an instance to be considered caught-up and able to 
receive an active task. Streams will only assign
+  stateful active tasks to instances whose state stores are within 
the acceptable recovery lag, if any exist, and assign warmup replicas to 
restore state in the background for instances
+  that are not yet caught up. Should correspond to a recovery time 
of well under a minute for a given workload. Must be at least 0.

Review comment:
   Maybe we can also mention that if you set it to Long.MAX_VALUE, it 
effectively disables warmups and HA task migration, allowing Streams to produce 
a balanced assignment in one shot.
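   
   As a usage illustration, a minimal config sketch (the application id, 
bootstrap servers, and the 10,000-offset threshold are placeholders):
   
   ```java
   import java.util.Properties;
   
   import org.apache.kafka.streams.StreamsConfig;
   
   final Properties props = new Properties();
   props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
   props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   // Instances within 10,000 offsets of the changelog head count as caught up.
   props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);
   // Long.MAX_VALUE effectively disables warmups and HA task migration,
   // letting Streams produce a balanced assignment in one shot:
   // props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, Long.MAX_VALUE);
   ```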

##
File path: docs/streams/developer-guide/running-app.html
##
@@ -110,6 +110,18 @@ Removing capacity 
from your applicationIf a local state store exists, the changelog is 
replayed from the previously checkpointed offset. The changes are applied and 
the state is restored to the most recent snapshot. This method takes less time 
because it is applying a smaller portion of the changelog.
   
   For more information, see Standby Replicas.
+  
+  As of version 2.6, Streams will now do most of a task's 
restoration in the background through warmup replicas. These will be assigned 
to instances that need to restore a lot of state for a task.
+  A stateful active task will only be assigned to an 
instance once it's state is within the configured

Review comment:
   ```suggestion
 A stateful active task will only be assigned to an 
instance once its state is within the configured
   ```

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -639,16 +643,16 @@
 Serdes.ByteArraySerde.class.getName(),
 Importance.MEDIUM,
 DEFAULT_VALUE_SERDE_CLASS_DOC)
-.define(NUM_STANDBY_REPLICAS_CONFIG,
-Type.INT,
-0,
+.define(DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+Type.CLASS,
+null,
 Importance.MEDIUM,
-NUM_STANDBY_REPLICAS_DOC)
-.define(NUM_STREAM_THREADS_CONFIG,
-Type.INT,
-1,
+DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC)
+   

[GitHub] [kafka] cmccabe commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests

2020-07-16 Thread GitBox


cmccabe commented on pull request #8948:
URL: https://github.com/apache/kafka/pull/8948#issuecomment-659508365


   ok, that works for me.  LGTM



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests

2020-07-16 Thread GitBox


cmccabe merged pull request #8948:
URL: https://github.com/apache/kafka/pull/8948


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-10174) Prefer --bootstrap-server ducktape tests using kafka_configs.sh

2020-07-16 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10174?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-10174.
--
Fix Version/s: 2.7
 Reviewer: Colin McCabe
   Resolution: Fixed

> Prefer --bootstrap-server ducktape tests using kafka_configs.sh
> ---
>
> Key: KAFKA-10174
> URL: https://issues.apache.org/jira/browse/KAFKA-10174
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Vinoth Chandar
>Assignee: Vinoth Chandar
>Priority: Major
> Fix For: 2.7
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ning2008wisc commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-16 Thread GitBox


ning2008wisc commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r455902217



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
 backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
 }
 
+private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+throws InterruptedException {
+Admin backupClient = backup.kafka().createAdminClient();
+List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+IntStream.range(0, NUM_PARTITIONS).forEach(
+partitionInd -> {

Review comment:
   Could we have a more intuitive variable name than `partitionInd`? E.g. 
`partitionId` or `partitionIndex`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on a change in pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-16 Thread GitBox


ning2008wisc commented on a change in pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#discussion_r455903116



##
File path: 
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
##
@@ -315,9 +319,34 @@ public void testReplication() throws InterruptedException {
 backup.kafka().consume(NUM_RECORDS_PRODUCED, 2 * 
RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count());
 }
 
+private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer, List<String> topics)
+throws InterruptedException {
+Admin backupClient = backup.kafka().createAdminClient();
+List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
+IntStream.range(0, NUM_PARTITIONS).forEach(
+partitionInd -> {
+for (String topic: topics) {
+tps.add(new TopicPartition(topic, partitionInd));
+}
+}
+);
+long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size();
+
+waitForCondition(() -> {
+Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
+
backupClient.listConsumerGroupOffsets("consumer-group-1").partitionsToOffsetAndMetadata().get();

Review comment:
   Could we consider passing in the consumer group name as an input variable 
of `waitForConsumerGroupOffsetSync`, so that `waitForConsumerGroupOffsetSync` 
looks more generic?
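   
   Something like this rough sketch, which assumes the surrounding test-class 
fields (`backup`, `NUM_PARTITIONS`, `NUM_RECORDS_PRODUCED`) and a hypothetical 
`OFFSET_SYNC_DURATION_MS` timeout:
   
   ```java
   private void waitForConsumerGroupOffsetSync(Consumer<byte[], byte[]> consumer,
                                               List<String> topics,
                                               String consumerGroupId)
           throws InterruptedException {
       Admin backupClient = backup.kafka().createAdminClient();
       List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
       for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
           for (String topic : topics) {
               tps.add(new TopicPartition(topic, partitionIndex));
           }
       }
       long expectedTotalOffsets = NUM_RECORDS_PRODUCED * topics.size();
       waitForCondition(() -> {
           // The group id is now a parameter instead of the hard-coded
           // "consumer-group-1", so the helper can serve other tests too.
           Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
               backupClient.listConsumerGroupOffsets(consumerGroupId)
                   .partitionsToOffsetAndMetadata().get();
           long totalOffsets = consumerGroupOffsets.values().stream()
               .mapToLong(OffsetAndMetadata::offset).sum();
           return totalOffsets == expectedTotalOffsets
               && consumerGroupOffsets.keySet().containsAll(tps);
       }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync did not complete in time");
   }
   ```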





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

2020-07-16 Thread GitBox


vvcephei commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455904752



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##
@@ -20,62 +20,49 @@
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class StreamThreadStateStoreProvider {
 
 private final StreamThread streamThread;
-private final InternalTopologyBuilder internalTopologyBuilder;
 
-public StreamThreadStateStoreProvider(final StreamThread streamThread,
-  final InternalTopologyBuilder 
internalTopologyBuilder) {
+public StreamThreadStateStoreProvider(final StreamThread streamThread) {
 this.streamThread = streamThread;
-this.internalTopologyBuilder = internalTopologyBuilder;
 }
 
 @SuppressWarnings("unchecked")
 public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
 final String storeName = storeQueryParams.storeName();
 final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
-final TaskId keyTaskId = createKeyTaskId(storeName, 
storeQueryParams.partition());
 if (streamThread.state() == StreamThread.State.DEAD) {
 return Collections.emptyList();
 }
 final StreamThread.State state = streamThread.state();
 if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == 
StreamThread.State.RUNNING) {
 final Map<TaskId, Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
-final List<T> stores = new ArrayList<>();
-if (keyTaskId != null) {
-final Task task = tasks.get(keyTaskId);
-if (task == null) {
+if (storeQueryParams.partition() != null) {
+final Task streamTask = findStreamTask(tasks, storeName, 
storeQueryParams.partition());
+if (streamTask == null) {
 return Collections.emptyList();
 }
-final T store = 
validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, 
keyTaskId);
-if (store != null) {
-return Collections.singletonList(store);
-}
-} else {
-for (final Task streamTask : tasks.values()) {
-final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
-if (store != null) {
-stores.add(store);
-}
-}
+final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
+return store != null ? Collections.singletonList(store) : 
Collections.emptyList();

Review comment:
   The nested early-return pattern is pretty hard to follow. Do you mind 
rewriting it to use if/else blocks? I know it was previously doing some early 
returns; it'd be better to migrate to a more maintainable style when we update 
the code, though.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final 
StoreQueryParameters storeQueryParamet
 public <T> List<T> stores(final String storeName,
   final QueryableStoreType<T> queryableStoreType) {
 final List<T> allStores = new ArrayList<>();
-for (final StreamThreadStateStoreProvider provider : storeProviders) {
-final List<T> stores = provider.stores(storeQueryParameters);
-allStores.addAll(stores);
+for (final StreamThreadStateStoreProvider storeProvider : 
storeProviders) {
+final List<T> stores = storeProvider.stores(storeQueryParameters);
+if (!stores.isEmpty()) {
+allStores.addAll(stores);
+if (storeQueryParameters.partition() != null) {
+break;
+}
+}
  

[GitHub] [kafka] ning2008wisc edited a comment on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-16 Thread GitBox


ning2008wisc edited a comment on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659516303


   Hi @showuon, thanks for the fix, it looks like a good start. Another minor and 
non-blocking comment: if it is a small fix, a single commit in the PR would 
look neater.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc commented on pull request #9029: KAFKA-10255: Fix flaky testOneWayReplicationWithAutoOffsetSync test

2020-07-16 Thread GitBox


ning2008wisc commented on pull request #9029:
URL: https://github.com/apache/kafka/pull/9029#issuecomment-659516303


   Hi @showuon, thanks for the fix, it looks like a good start. Another minor and 
non-blocking comment: if it is a small fix, a single commit in the PR would 
look neater.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask

2020-07-16 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159332#comment-17159332
 ] 

Sophie Blee-Goldman commented on KAFKA-10205:
-

Given how many people have hit this, maybe we can add a null check here and 
throw a more descriptive exception or log a more helpful error (e.g. "...was 
null; this may be the result of a non-deterministic topology ordering").
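
A rough sketch of such a guard in the StreamTask constructor -- the exception 
type and message are placeholders, not a committed fix:

{code:java}
// Hypothetical guard around the lookup suspected of returning null.
final SourceNode source = topology.source(partition.topic());
if (source == null) {
    throw new TopologyException("Source node for topic " + partition.topic()
        + " was null; this may be the result of a non-deterministic topology ordering.");
}
{code}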

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Assignee: John Roesler
>Priority: Minor
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: nullError caught 
> during partition assignment, will abort the current process and re-throw at 
> the end of rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server

2020-07-16 Thread GitBox


rajinisivaram commented on a change in pull request #9030:
URL: https://github.com/apache/kafka/pull/9030#discussion_r455916021



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeClientQuotasResponse.java
##
@@ -66,10 +66,11 @@ public DescribeClientQuotasResponse(Map>
 }
 
 public DescribeClientQuotasResponse(int throttleTimeMs, Throwable e) {
+ApiError apiError = ApiError.fromThrowable(e);

Review comment:
   @rondagostino Thanks for the review. Updated 
`AlterClientQuotasResponse`. I think IncrementalAlterConfigsResponse is ok 
since it uses ApiError; we create the error with the right message when 
converting from Throwable. Can you just verify that the method with ApiError 
was the one you meant? Thanks,





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #9030: MINOR: Filter out quota configs for ConfigCommand using --bootstrap-server

2020-07-16 Thread GitBox


rajinisivaram commented on a change in pull request #9030:
URL: https://github.com/apache/kafka/pull/9030#discussion_r455916169



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -197,18 +197,20 @@
 
 public class RequestResponseTest {
 
+private UnknownServerException unknownServerException = new 
UnknownServerException("secret");

Review comment:
   @dajac Thanks for the review, added comment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei opened a new pull request #9031: POC: replace abstract Windows with a proper interface

2020-07-16 Thread GitBox


vvcephei opened a new pull request #9031:
URL: https://github.com/apache/kafka/pull/9031


   Just a POC for illustrative purposes. No need to review.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vinothchandar commented on pull request #8948: KAFKA-10174: Prefer --bootstrap-server for configs command in ducker tests

2020-07-16 Thread GitBox


vinothchandar commented on pull request #8948:
URL: https://github.com/apache/kafka/pull/8948#issuecomment-659531282


   Thanks @cmccabe 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-07-16 Thread GitBox


ableegoldman commented on a change in pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#discussion_r455916364



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -151,7 +151,7 @@ private void prepareConfigs() {
 streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
 streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
 streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
STREAMS_CONSUMER_TIMEOUT);

Review comment:
   Hah, this was pretty janky. Good catch.
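   
   For reference, one way the workaround can be avoided -- a sketch, assuming 
the constant fits in the 32-bit range `session.timeout.ms` expects:
   
   ```java
   // Before: string concatenation to dodge the Long-vs-int config type check.
   streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT);
   
   // After: hand the config the int it expects; toIntExact fails loudly on overflow.
   streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Math.toIntExact(STREAMS_CONSUMER_TIMEOUT));
   ```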

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -265,7 +265,7 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {

Review comment:
   Why are none of these tests...actually tests? Can you also fix this, i.e. 
add `@Test` annotations to all the tests here?
   
   I think you can then simplify the two tests that extend this abstract test 
class (`ResetIntegrationTest` and `ResetIntegrationWithSslTest`) and just 
remove all the tests that just call `super.testXXX` -- they should 
automatically run all of the tests in this class.
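   
   To illustrate the pattern with a self-contained toy (JUnit 4, as used in 
this module; class names are made up):
   
   ```java
   import static org.junit.Assert.assertEquals;
   
   import org.junit.Test;
   
   // @Test methods declared on the abstract class are inherited and executed
   // in every concrete subclass, so pass-through overrides that merely call
   // super.testXXX() are unnecessary.
   abstract class AbstractToyTest {
       abstract int answer();
   
       @Test
       public void testAnswer() {
           assertEquals(42, answer());
       }
   }
   
   public class ConcreteToyTest extends AbstractToyTest {
       @Override
       int answer() {
           return 42;
       }
   }
   ```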





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

2020-07-16 Thread GitBox


dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455934580



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##
@@ -20,62 +20,49 @@
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.processor.internals.Task;
 import org.apache.kafka.streams.state.QueryableStoreType;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.TimestampedWindowStore;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class StreamThreadStateStoreProvider {
 
 private final StreamThread streamThread;
-private final InternalTopologyBuilder internalTopologyBuilder;
 
-public StreamThreadStateStoreProvider(final StreamThread streamThread,
-  final InternalTopologyBuilder 
internalTopologyBuilder) {
+public StreamThreadStateStoreProvider(final StreamThread streamThread) {
 this.streamThread = streamThread;
-this.internalTopologyBuilder = internalTopologyBuilder;
 }
 
 @SuppressWarnings("unchecked")
 public <T> List<T> stores(final StoreQueryParameters storeQueryParams) {
 final String storeName = storeQueryParams.storeName();
 final QueryableStoreType<T> queryableStoreType = storeQueryParams.queryableStoreType();
-final TaskId keyTaskId = createKeyTaskId(storeName, 
storeQueryParams.partition());
 if (streamThread.state() == StreamThread.State.DEAD) {
 return Collections.emptyList();
 }
 final StreamThread.State state = streamThread.state();
 if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == 
StreamThread.State.RUNNING) {
 final Map<TaskId, Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
-final List<T> stores = new ArrayList<>();
-if (keyTaskId != null) {
-final Task task = tasks.get(keyTaskId);
-if (task == null) {
+if (storeQueryParams.partition() != null) {
+final Task streamTask = findStreamTask(tasks, storeName, 
storeQueryParams.partition());
+if (streamTask == null) {
 return Collections.emptyList();
 }
-final T store = 
validateAndListStores(task.getStore(storeName), queryableStoreType, storeName, 
keyTaskId);
-if (store != null) {
-return Collections.singletonList(store);
-}
-} else {
-for (final Task streamTask : tasks.values()) {
-final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
-if (store != null) {
-stores.add(store);
-}
-}
+final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
+return store != null ? Collections.singletonList(store) : 
Collections.emptyList();

Review comment:
   sure





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

2020-07-16 Thread GitBox


dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455939491



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/WrappingStoreProvider.java
##
@@ -46,11 +46,22 @@ public void setStoreQueryParameters(final 
StoreQueryParameters storeQueryParamet
 public <T> List<T> stores(final String storeName,
   final QueryableStoreType<T> queryableStoreType) {
 final List<T> allStores = new ArrayList<>();
-for (final StreamThreadStateStoreProvider provider : storeProviders) {
-final List<T> stores = provider.stores(storeQueryParameters);
-allStores.addAll(stores);
+for (final StreamThreadStateStoreProvider storeProvider : 
storeProviders) {
+final List<T> stores = storeProvider.stores(storeQueryParameters);
+if (!stores.isEmpty()) {
+allStores.addAll(stores);
+if (storeQueryParameters.partition() != null) {
+break;
+}
+}
 }
 if (allStores.isEmpty()) {
+if (storeQueryParameters.partition() != null) {
+throw new InvalidStateStoreException(
+String.format("The specified partition %d for store %s 
does not exist.",

Review comment:
   L65 catches the rebalancing case, while L60 is parameter validation for 
the incorrect-partition case.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

2020-07-16 Thread GitBox


vvcephei commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455948704



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##
@@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread 
streamThread) {
 final StreamThread.State state = streamThread.state();
 if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == 
StreamThread.State.RUNNING) {
 final Map<TaskId, Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
+final List<T> stores = new ArrayList<>();
 if (storeQueryParams.partition() != null) {
 final Task streamTask = findStreamTask(tasks, storeName, 
storeQueryParams.partition());
-if (streamTask == null) {
-return Collections.emptyList();
+if (streamTask != null) {
+final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
+if (store != null) {
+stores.add(store);
+}
 }
-final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
-return store != null ? Collections.singletonList(store) : 
Collections.emptyList();
+} else {
+tasks.values().stream().
+map(streamTask -> 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id())).
+filter(Objects::nonNull).
+forEach(stores::add);
 }
-return tasks.values().stream().
-map(streamTask -> 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id())).
-filter(Objects::nonNull).
-collect(Collectors.toList());
+return Collections.unmodifiableList(stores);

Review comment:
   Ah, sorry, I can see that my prior comment was ambiguous. This is what I 
meant:
   ```java
   if (storeQueryParams.partition() == null) {
   return tasks.values().stream().
   map(streamTask -> 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id())).
   filter(Objects::nonNull).
   collect(Collectors.toList());
   } else {
   final Task streamTask = findStreamTask(tasks, storeName, 
storeQueryParams.partition());
   if (streamTask == null) {
   return Collections.emptyList();
   } else {
   final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
   return store == null ? Collections.emptyList() : 
Collections.singletonList(store);
   }
   }
   ```
   
   The reason this is better for maintenance is that you only have to trace a 
path through the nested conditionals into a single inner block to understand 
what gets returned. I.e., code comprehension complexity is only the depth of 
the conditional tree.
   
   In contrast, if we do early returns, you have to fully read all the 
conditional blocks that lead up to the one you're interested in (depth-first 
traversal), so code comprehension is linear instead of logarithmic. If we 
mutate the collection, you actually have to read _all_ the conditionals to 
understand what is going to happen, so code comprehension is also linear 
instead of logarithmic.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask

2020-07-16 Thread Brian Forkan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159365#comment-17159365
 ] 

Brian Forkan commented on KAFKA-10205:
--

[~mjsax] this fix appears to do the trick. I will keep monitoring of course and 
will let you know if we see it again. Thanks for your help everyone.

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Assignee: John Roesler
>Priority: Minor
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and 
> re-throw at the end of 
> rebalance","stack_trace":"java.lang.NullPointerException: nullError caught 
> during partition assignment, will abort the current process and re-throw at 
> the end of rebalance","stack_trace":"java.lang.NullPointerException: null at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:186)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.(StreamTask.java:115)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>  at 
> org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>  at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>  at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243) 
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173) 
> at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86) at 
> brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80) at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9011: KAFKA-10134: Still depends on existence of any fetchable partitions to block on join

2020-07-16 Thread GitBox


hachikuji commented on pull request #9011:
URL: https://github.com/apache/kafka/pull/9011#issuecomment-659560017


   @guozhangwang There is one detail I think we're missing. If 
`updateAssignmentMetadataIfNeeded` does not block, then execution will fall 
through to `pollForFetches`. I would like to understand why `pollForFetches` is 
not blocking. As far as I can tell, the only thing that would cause that is if 
`Heartbeat.timeToNextHeartbeat` is returning 0. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dima5rr commented on a change in pull request #9020: KAFKA-10271 Performance degradation while fetching a key from a single partition

2020-07-16 Thread GitBox


dima5rr commented on a change in pull request #9020:
URL: https://github.com/apache/kafka/pull/9020#discussion_r455974476



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProvider.java
##
@@ -51,18 +51,22 @@ public StreamThreadStateStoreProvider(final StreamThread 
streamThread) {
 final StreamThread.State state = streamThread.state();
 if (storeQueryParams.staleStoresEnabled() ? state.isAlive() : state == 
StreamThread.State.RUNNING) {
 final Map<TaskId, Task> tasks = storeQueryParams.staleStoresEnabled() ? streamThread.allTasks() : streamThread.activeTaskMap();
+final List<T> stores = new ArrayList<>();
 if (storeQueryParams.partition() != null) {
 final Task streamTask = findStreamTask(tasks, storeName, 
storeQueryParams.partition());
-if (streamTask == null) {
-return Collections.emptyList();
+if (streamTask != null) {
+final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
+if (store != null) {
+stores.add(store);
+}
 }
-final T store = 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id());
-return store != null ? Collections.singletonList(store) : 
Collections.emptyList();
+} else {
+tasks.values().stream().
+map(streamTask -> 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id())).
+filter(Objects::nonNull).
+forEach(stores::add);
 }
-return tasks.values().stream().
-map(streamTask -> 
validateAndListStores(streamTask.getStore(storeName), queryableStoreType, 
storeName, streamTask.id())).
-filter(Objects::nonNull).
-collect(Collectors.toList());
+return Collections.unmodifiableList(stores);

Review comment:
   Will condense it into a functional style.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10035) Improve the AbstractResetIntegrationTest

2020-07-16 Thread Sergey (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159387#comment-17159387
 ] 

Sergey commented on KAFKA-10035:


Thanks!

> Improve the AbstractResetIntegrationTest
> 
>
> Key: KAFKA-10035
> URL: https://issues.apache.org/jira/browse/KAFKA-10035
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: feyman
>Assignee: Sergei
>Priority: Minor
>
> In the test: AbstractResetIntegrationTest, there are several places like 
> below:
>  
> {code:java}
> streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
> STREAMS_CONSUMER_TIMEOUT * 100);
> {code}
> which leverage `Long` to `String` conversion as a workaround.
>  
> {code:java}
> streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
> STREAMS_CONSUMER_TIMEOUT * 100);
> {code}
> otherwise an exception like the following will be thrown:
> {code:java}
> {{org.apache.kafka.common.config.ConfigException: Invalid value 20 for 
> configuration session.timeout.ms: Expected value to be a 32-bit integer, but 
> it was a java.lang.Long
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129)
>  at 
> org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:606)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630)
>  at 
> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:766)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:652)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:562)
>  at 
> org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
>  at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)}}{code}
> This may not seem very intuitive and needs enhancement. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Issue Comment Deleted] (KAFKA-10035) Improve the AbstractResetIntegrationTest

2020-07-16 Thread Sergey (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey updated KAFKA-10035:
---
Comment: was deleted

(was: Thanks!)

> Improve the AbstractResetIntegrationTest
> 
>
> Key: KAFKA-10035
> URL: https://issues.apache.org/jira/browse/KAFKA-10035
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: feyman
>Assignee: Sergei
>Priority: Minor
>
> In the test: AbstractResetIntegrationTest, there are several places like 
> below:
>  
> {code:java}
> streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
> STREAMS_CONSUMER_TIMEOUT * 100);
> {code}
> which leverage `Long` to `String` conversion as a workaround.
>  
> {code:java}
> streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
> STREAMS_CONSUMER_TIMEOUT * 100);
> {code}
> otherwise an exception like the following will be thrown:
> {code:java}
> {{org.apache.kafka.common.config.ConfigException: Invalid value 20 for 
> configuration session.timeout.ms: Expected value to be a 32-bit integer, but 
> it was a java.lang.Long
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129)
>  at 
> org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:606)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630)
>  at 
> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:766)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:652)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:562)
>  at 
> org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
>  at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)}}{code}
> This may not seem very intuitive and needs enhancement. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10035) Improve the AbstractResetIntegrationTest

2020-07-16 Thread Sergei (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159388#comment-17159388
 ] 

Sergei commented on KAFKA-10035:


Thanks!

> Improve the AbstractResetIntegrationTest
> 
>
> Key: KAFKA-10035
> URL: https://issues.apache.org/jira/browse/KAFKA-10035
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: feyman
>Assignee: Sergei
>Priority: Minor
>
> In the test: AbstractResetIntegrationTest, there are several places like 
> below:
>  
> {code:java}
> streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
> STREAMS_CONSUMER_TIMEOUT * 100);
> {code}
> which leverage `Long` to `String` conversion as a workaround.
>  
> {code:java}
> streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 
> STREAMS_CONSUMER_TIMEOUT * 100);
> {code}
> otherwise an exception like the following will be thrown:
> {code:java}
> {{org.apache.kafka.common.config.ConfigException: Invalid value 20 for 
> configuration session.timeout.ms: Expected value to be a 32-bit integer, but 
> it was a java.lang.Long
>  at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
>  at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
>  at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:108)
>  at 
> org.apache.kafka.common.config.AbstractConfig.(AbstractConfig.java:129)
>  at 
> org.apache.kafka.clients.consumer.ConsumerConfig.(ConsumerConfig.java:606)
>  at 
> org.apache.kafka.clients.consumer.KafkaConsumer.(KafkaConsumer.java:630)
>  at 
> org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:766)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:652)
>  at org.apache.kafka.streams.KafkaStreams.(KafkaStreams.java:562)
>  at 
> org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
>  at 
> org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)}}{code}
> This may not seem very intuitive and needs enhancement. 
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-4620) Connection exceptions in JMXTool do not make it to the top level

2020-07-16 Thread Kowshik Prakasam (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159392#comment-17159392
 ] 

Kowshik Prakasam commented on KAFKA-4620:
-

My observation is that this issue seems to be resolved now, as I see the 
following code: 
[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/JmxTool.scala#L131-L154].

Perhaps this was fixed as early as this PR: 
[https://github.com/apache/kafka/pull/3547]
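
For reference, a sketch of retry logic that keeps connection failures inside 
the loop, loosely mirroring what the trunk JmxTool now does; the url format 
and retry budget here are illustrative:

{code:java}
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public final class JmxRetryExample {

    // Catches each attempt's exception at the call site so a refused
    // connection cannot escape the loop; rethrows the last failure once
    // the retry budget is exhausted.
    static MBeanServerConnection connectWithRetries(String url, int maxNumRetries)
            throws Exception {
        Exception lastException = null;
        for (int retries = 0; retries < maxNumRetries; retries++) {
            try {
                System.err.println("Trying to connect to JMX url: " + url);
                JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(url), null);
                return jmxc.getMBeanServerConnection();
            } catch (Exception e) {
                lastException = e;
                System.err.println("Could not connect to JMX url: " + url
                    + ". Exception: " + e.getMessage());
                Thread.sleep(500);
            }
        }
        if (lastException != null) {
            throw lastException;
        }
        throw new IllegalArgumentException("maxNumRetries must be positive");
    }
}
{code}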

> Connection exceptions in JMXTool do not make it to the top level
> 
>
> Key: KAFKA-4620
> URL: https://issues.apache.org/jira/browse/KAFKA-4620
> Project: Kafka
>  Issue Type: Bug
>Reporter: Apurva Mehta
>Assignee: Apurva Mehta
>Priority: Major
>
> If you run JMXTool before the target process is initialized, the JMX 
> connection is refused and the tool quits. 
> Adding the following retry:
> {code:java}
> while (retries < maxNumRetries && !connected) {
>   try {
> System.err.println("Trying to connect to JMX url: %s".format(url))
> jmxc = JMXConnectorFactory.connect(url, null)
> mbsc = jmxc.getMBeanServerConnection()
> connected = true
>   } catch {
> case e : Exception => {
>   System.err.println("Could not connect to JMX url: %s. Exception 
> %s".format(url, e.getMessage))
>   retries += 1
>   Thread.sleep(500)
> }
>   }
> }
> {code}
> does not work because the exceptions do not make it to the top level. Running 
> the above code results in the following output on stderr
> {noformat}
> Trying to connect to JMX url: 
> service:jmx:rmi:///jndi/rmi://127.0.0.1:9192/jmxrmi
> Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin restart
> WARNING: Failed to restart: java.io.IOException: Failed to get a RMI stub: 
> javax.naming.ServiceUnavailableException [Root exception is 
> java.rmi.ConnectException: Connection refused to host: 127.0.0.1; nested 
> exception is:
> java.net.ConnectException: Connection refused]
> Jan 11, 2017 8:20:33 PM RMIConnector RMIClientCommunicatorAdmin-doStop
> WARNING: Failed to call the method close():java.rmi.ConnectException: 
> Connection refused to host: 172.31.15.109; nested exception is:
> java.net.ConnectException: Connection refused
> Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin Checker-run
> WARNING: Failed to check connection: java.net.ConnectException: Connection 
> refused
> Jan 11, 2017 8:20:33 PM ClientCommunicatorAdmin Checker-run
> WARNING: stopping
> {noformat}
> We need to add working retry logic to JMXTool so that it can start correctly 
> even if the target process is not ready initially. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bdbyrne commented on a change in pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-16 Thread GitBox


bdbyrne commented on a change in pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#discussion_r455998374



##
File path: 
core/src/test/scala/unit/kafka/admin/TopicCommandWithAdminClientTest.scala
##
@@ -56,12 +56,17 @@ class TopicCommandWithAdminClientTest extends 
KafkaServerTestHarness with Loggin
 zkConnect = zkConnect,
 rackInfo = Map(0 -> "rack1", 1 -> "rack2", 2 -> "rack2", 3 -> "rack1", 4 
-> "rack3", 5 -> "rack3"),
 numPartitions = numPartitions,
-defaultReplicationFactor = defaultReplicationFactor
+defaultReplicationFactor = defaultReplicationFactor,
+replicaFetchMaxBytes = replicaFetchMaxBytes(),
   ).map(KafkaConfig.fromProps)
 
   private val numPartitions = 1
   private val defaultReplicationFactor = 1.toShort
 
+  private def replicaFetchMaxBytes() =
+if (testName.getMethodName == 
"testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress") Some(1)

Review comment:
   Agreed, the `KafkaServerTestHarness` makes it more difficult. I've 
updated the test to set max fetch bytes to 1 for all tests, which is fine given 
none of the other tests produce data.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers

2020-07-16 Thread GitBox


hachikuji commented on a change in pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#discussion_r456008354



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -58,8 +58,8 @@
  *
  * - {@link Errors#OFFSET_OUT_OF_RANGE} If the fetch offset is out of range 
for a requested partition
  * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have READ 
access to a requested topic
- * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a 
broker which is not a replica
- * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and 
either the provided leader epoch
+ * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a 
broker with version 2.4 to 2.6 which is not a replica

Review comment:
   Is the note about the range from 2.4 to 2.6 correct? I think this error 
has always been possible in the case of a reassignment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10279) Allow dynamic update of certificates with additional SubjectAltNames

2020-07-16 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-10279:
--

 Summary: Allow dynamic update of certificates with additional 
SubjectAltNames
 Key: KAFKA-10279
 URL: https://issues.apache.org/jira/browse/KAFKA-10279
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.7.0


At the moment, we don't allow dynamic keystore update in brokers if DN and 
SubjectAltNames don't match exactly. This is to ensure that existing clients 
and inter-broker communication don't break. Since addition of new entries to 
SubjectAltNames will not break any authentication, we should allow that and 
just verify that the new SubjectAltNames are a superset of the old ones.
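
As a rough illustration of the superset check (hypothetical standalone Java, not 
the actual broker validation code), using the standard {{X509Certificate}} API:

{code:java}
import java.security.cert.CertificateParsingException;
import java.security.cert.X509Certificate;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SubjectAltNameCheck {

    // True if every SubjectAltName entry of oldCert also appears in newCert,
    // i.e. the new certificate only ever adds entries.
    static boolean isSuperset(X509Certificate newCert, X509Certificate oldCert)
            throws CertificateParsingException {
        return toSet(newCert.getSubjectAlternativeNames())
            .containsAll(toSet(oldCert.getSubjectAlternativeNames()));
    }

    private static Set<List<?>> toSet(Collection<List<?>> sans) {
        // getSubjectAlternativeNames() returns null when the extension is absent.
        return sans == null ? Collections.<List<?>>emptySet() : new HashSet<>(sans);
    }
}
{code}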



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9022: KAFKA-10158: Improve test behavior and resolve flakiness of test...

2020-07-16 Thread GitBox


hachikuji commented on pull request #9022:
URL: https://github.com/apache/kafka/pull/9022#issuecomment-659626413


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10279) Allow dynamic update of certificates with additional SubjectAltNames

2020-07-16 Thread Maulin Vasavada (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10279?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159436#comment-17159436
 ] 

Maulin Vasavada commented on KAFKA-10279:
-

+1 Rajini. 

> Allow dynamic update of certificates with additional SubjectAltNames
> 
>
> Key: KAFKA-10279
> URL: https://issues.apache.org/jira/browse/KAFKA-10279
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.7.0
>
>
> At the moment, we don't allow dynamic keystore update in brokers if DN and 
> SubjectAltNames don't match exactly. This is to ensure that existing clients 
> and inter-broker communication don't break. Since addition of new entries to 
> SubjectAltNames will not break any authentication, we should allow that and 
> just verify that the new SubjectAltNames are a superset of the old ones.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10280) Message filtering support based on keys format/Headers

2020-07-16 Thread Raj (Jira)
Raj created KAFKA-10280:
---

 Summary: Message filtering support based on keys format/Headers
 Key: KAFKA-10280
 URL: https://issues.apache.org/jira/browse/KAFKA-10280
 Project: Kafka
  Issue Type: Improvement
  Components: consumer, core
Reporter: Raj


There are many scenarios where consumers need to subscribe to messages based on 
a pattern, e.g. a key format or headers. 
Typically, this scenario gets solved using external components implementing the 
filtering logic.

In a deployment with a large number of consumers, this becomes a huge 
performance (network/IO) overhead if most messages get discarded by 
the consumer based on the filter. With many consumers subscribing 
to the topic partitions, there is unnecessary IO that could be avoided if the 
broker could apply the filter per subscription.
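
For context, a sketch of the client-side filtering that is typically done today, 
and whose network/IO cost this proposal would avoid (illustrative Java; the 
header name is hypothetical):

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;
import java.time.Duration;

public class HeaderFilteringConsumer {

    // Keeps only records whose "event-type" header (hypothetical name) matches.
    // Note that every record still crosses the network before the filter runs.
    static void pollAndFilter(KafkaConsumer<String, String> consumer, String wantedType) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            Header type = record.headers().lastHeader("event-type");
            if (type == null
                    || !wantedType.equals(new String(type.value(), StandardCharsets.UTF_8))) {
                continue; // discarded after the network/IO cost was already paid
            }
            System.out.printf("matched record at offset %d%n", record.offset());
        }
    }
}
{code}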
 
I wanted to hear insights from the Kafka community around how they are solving 
this problem, and to gauge interest in formally submitting a KIP around 
filtering as part of the core capability.
 
Thanks,
Raj


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeffkbkim commented on a change in pull request #8935: KAFKA-10189: reset event queue time histogram when queue is empty

2020-07-16 Thread GitBox


jeffkbkim commented on a change in pull request #8935:
URL: https://github.com/apache/kafka/pull/8935#discussion_r456064275



##
File path: core/src/main/scala/kafka/controller/ControllerEventManager.scala
##
@@ -139,4 +140,19 @@ class ControllerEventManager(controllerId: Int,
 }
   }
 
+  private def pollFromEventQueue(): QueuedEvent = {

Review comment:
   unfortunately, the metrics version Kafka uses (v2.2.0) is no longer 
supported by Yammer: https://github.com/dropwizard/metrics/issues/1618





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] serjchebotarev commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-07-16 Thread GitBox


serjchebotarev commented on a change in pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#discussion_r456067480



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -265,7 +265,7 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {

Review comment:
   I tried this and it partially went well, but several tests fail under 
`ResetIntegrationWithSslTest`, namely: 
   - `shouldNotAllowToResetWhileStreamsIsRunning`
   - `shouldNotAllowToResetWhenInputTopicAbsent`
   - `shouldNotAllowToResetWhenIntermediateTopicAbsent`
   
   From output of `shouldNotAllowToResetWhileStreamsIsRunning` there are some 
log lines that mention SSL handshake failure:
   
   ```
   ...
   [2020-07-16 23:00:20,764] INFO stream-thread 
[reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d-StreamThread-1]
 State transition from PARTITIONS_ASSIGNED to RUNNING 
(org.apache.kafka.streams.processor.internals.StreamThread:220)
   [2020-07-16 23:00:20,766] INFO stream-client 
[reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d]
 State transition from REBALANCING to RUNNING 
(org.apache.kafka.streams.KafkaStreams:283)
   [2020-07-16 23:00:20,771] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   [2020-07-16 23:00:21,067] INFO [Consumer 
clientId=reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d-StreamThread-1-consumer,
 groupId=reset-with-ssl-integration-test-not-reset-during-runtime] Found no 
committed offset for partition inputTopic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:1349)
   [2020-07-16 23:00:21,093] INFO [Consumer 
clientId=reset-with-ssl-integration-test-not-reset-during-runtime-6971acc7-e471-4973-a652-5a09b7dce10d-StreamThread-1-consumer,
 groupId=reset-with-ssl-integration-test-not-reset-during-runtime] Resetting 
offset for partition inputTopic-0 to offset 0. 
(org.apache.kafka.clients.consumer.internals.SubscriptionState:397)
   [2020-07-16 23:00:21,174] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   [2020-07-16 23:00:21,548] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   [2020-07-16 23:00:21,798] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   [2020-07-16 23:00:22,028] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   [2020-07-16 23:00:22,329] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   [2020-07-16 23:00:22,556] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   [2020-07-16 23:00:22,793] INFO [SocketServer brokerId=0] Failed 
authentication with /127.0.0.1 (SSL handshake failed) 
(org.apache.kafka.common.network.Selector:616)
   
   org.apache.kafka.streams.integration.ResetIntegrationWithSslTest > 
shouldNotAllowToResetWhileStreamsIsRunning SKIPPED
   
   > Task :streams:test FAILED
   :streams:test (Thread[Execution worker for ':',5,main]) completed. Took 
14.586 secs.
   
   FAILURE: Build failed with an exception.
   ```
   
   I was not able to dig deeper into what the reason for this could be. For now I 
just left these three tests in the base class without the `@Test` annotation and 
call them as before in `ResetIntegrationTest` only.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] serjchebotarev commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-07-16 Thread GitBox


serjchebotarev commented on a change in pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#discussion_r456068727



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -151,7 +151,7 @@ private void prepareConfigs() {
 streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
 streamsConfig.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100);
 streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
STREAMS_CONSUMER_TIMEOUT);

Review comment:
   thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659665576


   Updated the benchmarks with @lbradstreet's suggestions. Here are the results 
for 3 partitions, 10 topics. GC profiles included.
   
   On this branch: 
   ```
   Benchmark
(partitionCount)  (topicCount)  Mode  Cnt  ScoreError   
Units
   FetchRequestBenchmark.testFetchRequestForConsumer
   310  avgt   15   2110.741 ± 27.935   
ns/op
   FetchRequestBenchmark.testFetchRequestForReplica 
   310  avgt   15   2021.114 ±  7.816   
ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer   
   310  avgt   15   3452.799 ± 16.013   
ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica
   310  avgt   15   3691.157 ± 60.260   
ns/op
   
   GC Profile   
 (partitionCount)  (topicCount)  Mode  Cnt  ScoreError  
 Units
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 
   310  avgt   15   4295.532 ± 56.061  
MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm
   310  avgt   15   9984.000 ±  0.001   
 B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space
   310  avgt   15   4292.525 ± 56.341  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm  
310  avgt   15   9977.037 ± 28.311
B/op
   
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space   
310  avgt   15  0.187 ±  0.027  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm
  310  avgt   15  0.435 ±  0.060B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count  
   310  avgt   15   2335.000   
counts
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time   
   310  avgt   15   1375.000
   ms
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate  
   310  avgt   15   4416.855 ± 16.429  
MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 
   310  avgt   15   9832.000 ±  0.001   
 B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space 
   310  avgt   15   4417.032 ± 24.858  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm   
310  avgt   15   9832.358 ± 28.932
B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space 
   310  avgt   15  0.186 ±  0.015  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm
   310  avgt   15  0.415 ±  0.033
B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.count   
   310  avgt   15   2280.000   
counts
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.time
   310  avgt   15   1376.000
   ms
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate
   310  avgt   15   3256.172 ± 15.524  
MB/sec
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm  
310  avgt   15  12384.000 ±  0.001
B/op
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space
  310  avgt   15   3255.019 ± 21.484  MB/sec
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm
 310  avgt   15  12379.587 ± 49.161B/op
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space
  310  avgt   15  0.122 ±  0.022  MB/sec
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm
 310  avgt   15  0.462 ±  0.084B/op
   Fetch

[GitHub] [kafka] mumrah edited a comment on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


mumrah edited a comment on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659665576


   Updated the benchmarks with @lbradstreet's suggestions. Here are the results 
for 3 partitions, 10 topics. GC profiles included.
   
   On this branch: 
   ```
   Benchmark
(partitionCount)  (topicCount)  Mode  Cnt  ScoreError   
Units
   FetchRequestBenchmark.testFetchRequestForConsumer
   310  avgt   15   2110.741 ± 27.935   
ns/op
   FetchRequestBenchmark.testFetchRequestForReplica 
   310  avgt   15   2021.114 ±  7.816   
ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer   
   310  avgt   15   3452.799 ± 16.013   
ns/op
   FetchRequestBenchmark.testSerializeFetchRequestForReplica
   310  avgt   15   3691.157 ± 60.260   
ns/op
   
   GC Profile   
 (partitionCount)  (topicCount)  Mode  Cnt  ScoreError  
 Units
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate 
   310  avgt   15   4295.532 ± 56.061  
MB/sec
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm
   310  avgt   15   9984.000 ±  0.001   
 B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space
   310  avgt   15   4292.525 ± 56.341  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm  
310  avgt   15   9977.037 ± 28.311
B/op
   
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space   
310  avgt   15  0.187 ±  0.027  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm
  310  avgt   15  0.435 ±  0.060B/op
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count  
   310  avgt   15   2335.000   
counts
   FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time   
   310  avgt   15   1375.000
   ms
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate  
   310  avgt   15   4416.855 ± 16.429  
MB/sec
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm 
   310  avgt   15   9832.000 ±  0.001   
 B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space 
   310  avgt   15   4417.032 ± 24.858  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm   
310  avgt   15   9832.358 ± 28.932
B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space 
   310  avgt   15  0.186 ±  0.015  
MB/sec
   
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm
   310  avgt   15  0.415 ±  0.033
B/op
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.count   
   310  avgt   15   2280.000   
counts
   FetchRequestBenchmark.testFetchRequestForReplica:·gc.time
   310  avgt   15   1376.000
   ms
   FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate
   310  avgt   15   3256.172 ± 15.524  
MB/sec
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm  
310  avgt   15  12384.000 ±  0.001
B/op
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space
  310  avgt   15   3255.019 ± 21.484  MB/sec
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm
 310  avgt   15  12379.587 ± 49.161B/op
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space
  310  avgt   15  0.122 ±  0.022  MB/sec
   
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm
 310  avgt   15  0.462 ±  0.084B/op
 

[GitHub] [kafka] ijuma commented on a change in pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers

2020-07-16 Thread GitBox


ijuma commented on a change in pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#discussion_r456074355



##
File path: clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
##
@@ -139,13 +139,15 @@
 InvalidFetchSizeException::new),
 LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we 
are in the middle of a leadership election.",
 LeaderNotAvailableException::new),
-NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that 
topic-partition.",
-NotLeaderForPartitionException::new),
+NOT_LEADER_OR_FOLLOWER(6, "For requests intended only for the leader, this 
error indicates that the broker is not the current leader. " +
+"For requests intended for any replica, this error indicates that 
the broker is not a replica of the topic partition.",
+NotLeaderOrFollowerException::new),
 REQUEST_TIMED_OUT(7, "The request timed out.",
 TimeoutException::new),
 BROKER_NOT_AVAILABLE(8, "The broker is not available.",
 BrokerNotAvailableException::new),
-REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested 
topic-partition.",
+REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested 
topic-partition. This is used for backward compatibility for " +

Review comment:
   @hachikuji @rajinisivaram Since this is still used outside of 
produce/fetch, maybe the backwards compatibility message needs to be qualified?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers

2020-07-16 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-659670683


   @rajinisivaram With regards to MetadataResponse, it's currently documented 
as:
   
   ```
   /**
* Possible topic-level error codes:
*  UnknownTopic (3)
*  LeaderNotAvailable (5)
*  InvalidTopic (17)
*  TopicAuthorizationFailed (29)
   
* Possible partition-level error codes:
*  LeaderNotAvailable (5)
*  ReplicaNotAvailable (9)
*/
   ```
   
   I don't think we should change it, but it does raise the question of whether 
we should make `ReplicaNotAvailable` retriable.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


lbradstreet commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-659673793


   > Updated the benchmarks with @lbradstreet's suggestions. Here are the 
results for 3 partitions, 10 topics. GC profiles included.
   > 
   > On this branch:
   > 
   > ```
   > Benchmark  
  (partitionCount)  (topicCount)  Mode  Cnt  ScoreError 
  Units
   > FetchRequestBenchmark.testFetchRequestForConsumer  
 310  avgt   15   2110.741 ± 27.935 
  ns/op
   > FetchRequestBenchmark.testFetchRequestForReplica   
 310  avgt   15   2021.114 ±  7.816 
  ns/op
   > FetchRequestBenchmark.testSerializeFetchRequestForConsumer 
 310  avgt   15   3452.799 ± 16.013 
  ns/op
   > FetchRequestBenchmark.testSerializeFetchRequestForReplica  
 310  avgt   15   3691.157 ± 60.260 
  ns/op
   > 
   > GC Profile 
   (partitionCount)  (topicCount)  Mode  Cnt  Score
Error   Units
   > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate   
 310  avgt   15   4295.532 ± 56.061 
 MB/sec
   > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.alloc.rate.norm  
 310  avgt   15   9984.000 ±  0.001 
   B/op
   > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space  
 310  avgt   15   4292.525 ± 56.341 
 MB/sec
   > 
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm  
310  avgt   15   9977.037 ± 28.311
B/op
   > 
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space   
310  avgt   15  0.187 ±  0.027  
MB/sec
   > 
FetchRequestBenchmark.testFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm
  310  avgt   15  0.435 ±  0.060B/op
   > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.count
 310  avgt   15   2335.000  
 counts
   > FetchRequestBenchmark.testFetchRequestForConsumer:·gc.time 
 310  avgt   15   1375.000  
 ms
   > FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate
 310  avgt   15   4416.855 ± 16.429 
 MB/sec
   > FetchRequestBenchmark.testFetchRequestForReplica:·gc.alloc.rate.norm   
 310  avgt   15   9832.000 ±  0.001 
   B/op
   > FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space   
 310  avgt   15   4417.032 ± 24.858 
 MB/sec
   > 
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Eden_Space.norm   
310  avgt   15   9832.358 ± 28.932
B/op
   > 
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space
310  avgt   15  0.186 ±  0.015  
MB/sec
   > 
FetchRequestBenchmark.testFetchRequestForReplica:·gc.churn.PS_Survivor_Space.norm
   310  avgt   15  0.415 ±  0.033
B/op
   > FetchRequestBenchmark.testFetchRequestForReplica:·gc.count 
 310  avgt   15   2280.000  
 counts
   > FetchRequestBenchmark.testFetchRequestForReplica:·gc.time  
 310  avgt   15   1376.000  
 ms
   > FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate  
 310  avgt   15   3256.172 ± 15.524 
 MB/sec
   > 
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.alloc.rate.norm  
310  avgt   15  12384.000 ±  0.001
B/op
   > 
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space
  310  avgt   15   3255.019 ± 21.484  MB/sec
   > 
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Eden_Space.norm
 310  avgt   15  12379.587 ± 49.161B/op
   > 
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space
  310  avgt   15  0.122 ±  0.022  MB/sec
   > 
FetchRequestBenchmark.testSerializeFetchRequestForConsumer:·gc.churn.PS_Survivor_Space.norm
   

[GitHub] [kafka] hachikuji commented on pull request #8979: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers

2020-07-16 Thread GitBox


hachikuji commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-659676133


   I think it would make sense to change `ReplicaNotAvailableException` to 
extend `InvalidMetadataException`.
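
   For illustration, a minimal sketch of that change (assuming the existing 
constructors are kept; `InvalidMetadataException` extends `RetriableException`, 
which is what would make this error retriable for clients):
   
   ```java
   package org.apache.kafka.common.errors;
   
   // Sketch only: re-parent the exception so clients treat the error as
   // stale metadata (retriable) instead of a fatal condition.
   public class ReplicaNotAvailableException extends InvalidMetadataException {
   
       private static final long serialVersionUID = 1L;
   
       public ReplicaNotAvailableException(String message) {
           super(message);
       }
   
       public ReplicaNotAvailableException(String message, Throwable cause) {
           super(message, cause);
       }
   }
   ```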



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456076927



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+/**
+ * Return the auto-generated `Message` instance if this request/response 
relies on one for
+ * serialization/deserialization. If this class has not yet been updated 
to rely on the auto-generated protocol
+ * classes, return `null`.
+ * @return
+ */
+default ApiMessage data() {

Review comment:
   Is there an advantage to pulling this up? Seems like we still need to 
update a bunch more classes. Until we have all the protocols converted, it 
might be safer to find another approach. 

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -1249,26 +1249,26 @@ private CompletedFetch 
initializeCompletedFetch(CompletedFetch nextCompletedFetc
 }
 }
 
-if (partition.highWatermark >= 0) {
-log.trace("Updating high watermark for partition {} to 
{}", tp, partition.highWatermark);
-subscriptions.updateHighWatermark(tp, 
partition.highWatermark);
+if (partition.highWatermark() >= 0) {
+log.trace("Updating high watermark for partition {} to 
{}", tp, partition.highWatermark());
+subscriptions.updateHighWatermark(tp, 
partition.highWatermark());
 }
 
-if (partition.logStartOffset >= 0) {
-log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset);
-subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset);
+if (partition.logStartOffset() >= 0) {
+log.trace("Updating log start offset for partition {} to 
{}", tp, partition.logStartOffset());
+subscriptions.updateLogStartOffset(tp, 
partition.logStartOffset());
 }
 
-if (partition.lastStableOffset >= 0) {
-log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset);
-subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset);
+if (partition.lastStableOffset() >= 0) {
+log.trace("Updating last stable offset for partition {} to 
{}", tp, partition.lastStableOffset());
+subscriptions.updateLastStableOffset(tp, 
partition.lastStableOffset());
 }
 
-if (partition.preferredReadReplica.isPresent()) {
-
subscriptions.updatePreferredReadReplica(completedFetch.partition, 
partition.preferredReadReplica.get(), () -> {
+if (partition.preferredReadReplica().isPresent()) {

Review comment:
   nit: could probably change this to use `ifPresent`

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -492,74 +327,51 @@ public int maxBytes() {
 }
 
 public boolean isFromFollower() {
-return replicaId >= 0;
+return replicaId() >= 0;
 }
 
 public IsolationLevel isolationLevel() {
-return isolationLevel;
+return IsolationLevel.forId(data.isolationLevel());
 }
 
 public FetchMetadata metadata() {
 return metadata;
 }
 
 public String rackId() {
-return rackId;
+return data.rackId();
 }
 
 public static FetchRequest parse(ByteBuffer buffer, short version) {
-return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), 
version);
+ByteBufferAccessor accessor = new ByteBufferAccessor(buffer);
+FetchRequestData message = new FetchRequestData();
+message.read(accessor, version);
+return new FetchRequest(message, version);
+}
+
+@Override
+public ByteBuffer serialize(RequestHeader header) {

Review comment:
   Are we overriding this so that we save the conversion to `Struct`? As 
far as I can tell, there's nothing specific to `FetchRequest` below. I wonder 
if we can move this implementation to `AbstractRequest.serialize` so that we 
save the conversion to Struct for all APIs that have been converted?

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
##
@@ -492,74 +327,51 @@ public int maxBytes() {
 }
 
 public boolean isFromFollower() {
-return replicaId >= 0;
+return replicaId() >= 0;
 }
 
 public IsolationLevel isolationLevel() {
-return isolationLevel;
+return IsolationLevel.forId(data.isolationLevel());
 }
 
 pu

[GitHub] [kafka] ijuma commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


ijuma commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456091241



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+/**
+ * Return the auto-generated `Message` instance if this request/response 
relies on one for
+ * serialization/deserialization. If this class has not yet been updated 
to rely on the auto-generated protocol
+ * classes, return `null`.
+ * @return
+ */
+default ApiMessage data() {

Review comment:
   I have a PR that does that. I really need to get it over the line.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-16 Thread GitBox


hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r456092037



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+/**
+ * Return the auto-generated `Message` instance if this request/response 
relies on one for
+ * serialization/deserialization. If this class has not yet been updated 
to rely on the auto-generated protocol
+ * classes, return `null`.
+ * @return
+ */
+default ApiMessage data() {

Review comment:
   Perhaps instead we could add this to a mixin type. Then if we find cases 
where getting access to the `ApiMessage` generally would be useful, we could 
just use `instanceof` checks. These would ultimately go away after the 
conversions are finished.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-07-16 Thread GitBox


ableegoldman commented on a change in pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#discussion_r456101096



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -265,7 +265,7 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {

Review comment:
   Thanks for looking into this! It definitely doesn't sound good that some 
of these tests don't pass with SSL. Maybe there's just some additional setup 
that's needed for these tests? 🤔
   
   Anyway, we don't need to solve all that in this PR. We can revisit the 
issue once this is merged.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-07-16 Thread GitBox


vvcephei commented on pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#issuecomment-659701606


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-07-16 Thread GitBox


vvcephei commented on pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#issuecomment-659701794


   Ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino opened a new pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-07-16 Thread GitBox


rondagostino opened a new pull request #9032:
URL: https://github.com/apache/kafka/pull/9032


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10281) KIP-640: Add log compression analysis tool

2020-07-16 Thread Chris Beard (Jira)
Chris Beard created KAFKA-10281:
---

 Summary: KIP-640: Add log compression analysis tool
 Key: KAFKA-10281
 URL: https://issues.apache.org/jira/browse/KAFKA-10281
 Project: Kafka
  Issue Type: Improvement
  Components: tools
Reporter: Chris Beard
Assignee: Chris Beard


Link to KIP: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-640%3A+Add+log+compression+analysis+tool]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG

2020-07-16 Thread GitBox


ableegoldman commented on pull request #9024:
URL: https://github.com/apache/kafka/pull/9024#issuecomment-659747346


   Well, it looks like we still get too many logs from the brokers even at INFO 
level. I think we should demote all broker logs to WARN and bump them back up 
to INFO only if really necessary.
   
   On the other hand, it seems like turning DEBUG logs on for Streams caused over 
100 tests to fail... not sure what that's about.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment

2020-07-16 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159570#comment-17159570
 ] 

Viktor Utkin commented on KAFKA-9841:
-

Hi guys, will the fix be back-ported to older versions of Kafka?

> Connector and Task duplicated when a worker join with old generation 
> assignment
> ---
>
> Key: KAFKA-9841
> URL: https://issues.apache.org/jira/browse/KAFKA-9841
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1, 2.4.1
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> When using IncrementalCooperativeAssignor.class to assign connectors and 
> tasks.
> Suppose there is a worker 'W' got some connection issue with the coordinator.
> During the connection issue, the connectors/tasks on 'W' are assigned to the 
> others worker
> When the connection issue disappear, 'W' will join the group with an old 
> generation assignment. Then the group leader will get duplicated 
> connectors/tasks in the metadata sent by the workers. But the duplicated 
> connectors/tasks will not be revoked.
>  
> Generation 3:
> Worker1:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], 
> taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
> with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>  
> Generation 4:
> Worker1:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.r

[GitHub] [kafka] vutkin commented on pull request #8453: KAFKA-9841: Revoke duplicate connectors and tasks when zombie workers return with an outdated assignment

2020-07-16 Thread GitBox


vutkin commented on pull request #8453:
URL: https://github.com/apache/kafka/pull/8453#issuecomment-659747738


   Hi guys, will the fix be back-ported to older versions of Kafka?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing

2020-07-16 Thread GitBox


ableegoldman commented on a change in pull request #9027:
URL: https://github.com/apache/kafka/pull/9027#discussion_r456147419



##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -181,6 +193,7 @@ bootstrap.serverscache.max.bytes.buffering
 Medium
 Maximum number of memory bytes to be used for 
record caches across all threads.
+Maximum number of memory bytes to be used for 
record caches across all threads.

Review comment:
   How did that happen 🤔 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10172) [Kafka connect] connectors, tasks metrics doubled

2020-07-16 Thread Viktor Utkin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Utkin updated KAFKA-10172:
-
Affects Version/s: 2.4.0

> [Kafka connect] connectors, tasks metrics doubled
> -
>
> Key: KAFKA-10172
> URL: https://issues.apache.org/jira/browse/KAFKA-10172
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: image-2020-06-16-12-42-47-753.png
>
>
> After a re-balance of the Connect cluster (4 nodes in total) we noticed that the metrics
>  * kafka_connect_connect_worker_metrics_task_count
>  * kafka_connect_connect_worker_metrics_connector_count
> are doubled (2x, 3x, 4x, etc.), so Connect shows wrong values.
>  
> !image-2020-06-16-12-42-47-753.png|width=481,height=240!
>  
> But when we request the number of connectors via the REST API it shows only 36:
> {code:java}
> › curl -s 127.0.0.1:8083/connectors | jq '.[]' | sort | wc -l
> 36{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10172) [Kafka connect] connectors, tasks metrics doubled

2020-07-16 Thread Viktor Utkin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159573#comment-17159573
 ] 

Viktor Utkin commented on KAFKA-10172:
--

Looks like there is a bugfix: https://issues.apache.org/jira/browse/KAFKA-9841

> [Kafka connect] connectors, tasks metrics doubled
> -
>
> Key: KAFKA-10172
> URL: https://issues.apache.org/jira/browse/KAFKA-10172
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1
>Reporter: Viktor Utkin
>Priority: Critical
> Attachments: image-2020-06-16-12-42-47-753.png
>
>
> After a re-balance of the Connect cluster (4 nodes in total) we noticed that the metrics
>  * kafka_connect_connect_worker_metrics_task_count
>  * kafka_connect_connect_worker_metrics_connector_count
> are doubled (2x, 3x, 4x, etc.), so Connect shows wrong values.
>  
> !image-2020-06-16-12-42-47-753.png|width=481,height=240!
>  
> But when we request the number of connectors via the REST API it shows only 36:
> {code:java}
> › curl -s 127.0.0.1:8083/connectors | jq '.[]' | sort | wc -l
> 36{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #9027: KAFKA-9161: add docs for KIP-441 and KIP-613 and other configs that need fixing

2020-07-16 Thread GitBox


ableegoldman commented on a change in pull request #9027:
URL: https://github.com/apache/kafka/pull/9027#discussion_r456148936



##
File path: docs/streams/developer-guide/running-app.html
##
@@ -110,6 +110,18 @@ Removing capacity 
from your applicationIf a local state store exists, the changelog is 
replayed from the previously checkpointed offset. The changes are applied and 
the state is restored to the most recent snapshot. This method takes less time 
because it is applying a smaller portion of the changelog.
   
   For more information, see Standby Replicas.
+  
+  As of version 2.6, Streams will now do most of a task's 
restoration in the background through warmup replicas. These will be assigned 
to instances that need to restore a lot of state for a task.
+  A stateful active task will only be assigned to an 
instance once it's state is within the configured

Review comment:
   Thanks





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-16 Thread GitBox


abbccdda commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-659752993


   retest this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10282) Log metrics are removed if a log is deleted and re-created quickly enough

2020-07-16 Thread Bob Barrett (Jira)
Bob Barrett created KAFKA-10282:
---

 Summary: Log metrics are removed if a log is deleted and 
re-created quickly enough
 Key: KAFKA-10282
 URL: https://issues.apache.org/jira/browse/KAFKA-10282
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 2.6.0
Reporter: Bob Barrett
Assignee: Bob Barrett
 Fix For: 2.7.0, 2.6.1


When we delete a local log, we mark it for asynchronous deletion by renaming it 
with a `.delete` extension, and then wait `LogConfig.FileDeleteDelayMs` 
milliseconds before actually deleting the files on disk. We don't remove the 
Log metrics from the metrics registry until the actual deletion takes place. If 
we recreate a log of the same topic partition (for example, if we reassign the 
partition away from the broker and quickly reassign it back), the metrics are 
registered when the new log is created, but then unregistered when the async 
deletion of the original log takes place. This leaves us with a partition that 
is not reporting any Log metrics (size, offsets, number of segments, etc).

To fix this, the LogManager should check when creating new logs to see if a log 
for the same topic partition is marked for deletion, and if so, signal to that 
log not to unregister its metrics when it is deleted.
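
A rough sketch of the proposed check (illustrative Java with hypothetical names; 
the actual LogManager is Scala):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LogMetricsGuard {

    // Hypothetical stand-in for a log renamed with the .delete extension
    // that is still waiting out file.delete.delay.ms.
    static class PendingDeletion {
        volatile boolean unregisterMetricsOnDelete = true;
    }

    // topicPartition -> log marked for async deletion (hypothetical bookkeeping).
    private final Map<String, PendingDeletion> logsMarkedForDeletion = new ConcurrentHashMap<>();

    // Called when a log is (re-)created for a topic partition. If an older log
    // for the same partition is still awaiting deletion, tell it not to tear
    // down the metrics the new log has just re-registered under the same names.
    void onLogCreated(String topicPartition) {
        PendingDeletion pending = logsMarkedForDeletion.get(topicPartition);
        if (pending != null)
            pending.unregisterMetricsOnDelete = false;
    }
}
{code}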



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10283) Consolidate client-level and consumer-level assignment within ClientState

2020-07-16 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-10283:
-

 Summary: Consolidate client-level and consumer-level assignment 
within ClientState
 Key: KAFKA-10283
 URL: https://issues.apache.org/jira/browse/KAFKA-10283
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


In StreamsPartitionAssignor, we do a two-level assignment: one on the 
client level, and then, after that assignment is done, we further decide within 
each client how to distribute tasks among its consumers if there are more than one.

The {{ClientState}} class is used for book-keeping the assigned tasks; however, 
it is only used for the first level, while the second level is done 
outside of the class and we only keep track of the results in a few maps for 
logging purposes. This leaves us with a bunch of hierarchical maps, e.g. some 
on the client level and some on the consumer level.

We would like to consolidate some of these maps into a single data structure 
to better keep track of the assignment information, and to reduce the risk of 
bugs that leave the assignment information inconsistent. 
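
As a rough illustration of the direction (hypothetical Java, not the actual 
Streams code; task ids are shown as plain ints for brevity), the consumer-level 
breakdown would live inside the client-level state, and the client-level view 
would be derived from it so the two levels can never disagree:

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ConsolidatedClientState {

    // consumerId -> assigned tasks, kept inside the client-level state.
    private final Map<String, Set<Integer>> activeTasksPerConsumer = new HashMap<>();
    private final Map<String, Set<Integer>> standbyTasksPerConsumer = new HashMap<>();

    void assignActive(String consumerId, int taskId) {
        activeTasksPerConsumer.computeIfAbsent(consumerId, c -> new HashSet<>()).add(taskId);
    }

    void assignStandby(String consumerId, int taskId) {
        standbyTasksPerConsumer.computeIfAbsent(consumerId, c -> new HashSet<>()).add(taskId);
    }

    // The client-level assignment is derived, not book-kept separately.
    Set<Integer> activeTasks() {
        Set<Integer> all = new HashSet<>();
        activeTasksPerConsumer.values().forEach(all::addAll);
        return all;
    }
}
{code}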



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #9024: [DO NOT MERGE (yet)] MINOR: bump Streams integration test log level to DEBUG

2020-07-16 Thread GitBox


guozhangwang commented on pull request #9024:
URL: https://github.com/apache/kafka/pull/9024#issuecomment-659774428


   @ableegoldman probably because the excessive logging causes some of the 
operations to not complete within the timeout; let's demote brokers to WARN 
and I will trigger Jenkins again.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8997: MINOR: Improve log4j for per-consumer assignment

2020-07-16 Thread GitBox


guozhangwang commented on pull request #8997:
URL: https://github.com/apache/kafka/pull/8997#issuecomment-659775048


   @abbccdda updated per your comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-16 Thread Boyang Chen (Jira)
Boyang Chen created KAFKA-10284:
---

 Summary: Group membership update due to static member rejoin 
should be persisted
 Key: KAFKA-10284
 URL: https://issues.apache.org/jira/browse/KAFKA-10284
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.5.0, 2.4.0, 2.3.0, 2.6.0
Reporter: Boyang Chen
Assignee: Boyang Chen
 Fix For: 2.6.1


When a known static member rejoins, we update its corresponding member.id 
without triggering a new rebalance. This avoids unnecessary rebalances for 
static membership, and also serves as fencing in case something still uses the 
old member.id.

The bug is that we don't actually persist the membership update, so if no 
rebalance is triggered afterwards, the new member.id is lost during group 
coordinator immigration, resurrecting the zombie member identity.
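
A sketch of the intended fix, in Java for illustration (the real 
GroupCoordinator is Scala; {{Group}}, {{GroupMetadataStore}}, and the method 
names below are hypothetical):

{code:java}
import java.util.function.Consumer;

// Sketch: the member.id swap for a rejoining static member must be written to
// the group metadata log before the rejoin is acknowledged, so that a
// coordinator failover replays the new member.id instead of the old one.
class StaticRejoinSketch {
    interface Group {
        // Swaps in the new member.id and returns the old one.
        String replaceStaticMember(String groupInstanceId, String newMemberId);
    }

    interface GroupMetadataStore {
        // Appends the group metadata; invokes the callback with null on success.
        void appendGroupMetadata(Group group, Consumer<Exception> callback);
    }

    void onStaticMemberRejoin(Group group, GroupMetadataStore store,
                              String groupInstanceId, String newMemberId,
                              Runnable respondToMember) {
        String oldMemberId = group.replaceStaticMember(groupInstanceId, newMemberId);
        // Buggy behavior: responding right here, without the append below,
        // leaves the update in memory only and loses it on coordinator failover.
        store.appendGroupMetadata(group, error -> {
            if (error != null) {
                group.replaceStaticMember(groupInstanceId, oldMemberId); // roll back
            }
            respondToMember.run(); // acknowledge only after the write completes
        });
    }
}
{code}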



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-16 Thread GitBox


abbccdda commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-659793027


   retest this



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10205) NullPointerException in StreamTask

2020-07-16 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159615#comment-17159615
 ] 

Matthias J. Sax commented on KAFKA-10205:
-

[~ableegoldman] Good idea. Not everybody reads the docs in detail; still, we 
should update the docs as well.

[~vvcephei] This ticket is assigned to you atm. Do you want to do the PR? Or 
should we re-assign?

> NullPointerException in StreamTask
> --
>
> Key: KAFKA-10205
> URL: https://issues.apache.org/jira/browse/KAFKA-10205
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: Brian Forkan
>Assignee: John Roesler
>Priority: Minor
>
> In our Kafka Streams application we have been experiencing a 
> NullPointerException when deploying a new version of our application. This 
> does not happen during a normal rolling restart.
> The exception is:
> {code:java}
> Error caught during partition assignment, will abort the current process and
> re-throw at the end of rebalance
> java.lang.NullPointerException: null
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:186)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:115)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:352)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:310)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:295)
>     at org.apache.kafka.streams.processor.internals.TaskManager.addNewActiveTasks(TaskManager.java:160)
>     at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:120)
>     at org.apache.kafka.streams.processor.internals.StreamsRebalanceListener.onPartitionsAssigned(StreamsRebalanceListener.java:77)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsAssigned(ConsumerCoordinator.java:278)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:419)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:439)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:490)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1275)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1173)
>     at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:86)
>     at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:80)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:853)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670)
> {code}
> And the relevant lines of code - 
> [https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L184-L196]
> I suspect "topology.source(partition.topic());" is returning null.
> Has anyone experienced this issue before? I suspect there is a problem with 
> our topology but I can't replicate this on my machine so I can't tell.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-8296) Kafka Streams branch method raises type warnings

2020-07-16 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8296?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8296.

Resolution: Duplicate

Closing this ticket as duplicate.

> Kafka Streams branch method raises type warnings
> 
>
> Key: KAFKA-8296
> URL: https://issues.apache.org/jira/browse/KAFKA-8296
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Assignee: Ivan Ponomarev
>Priority: Minor
>
> Because the branch method in the DSL takes varargs, using it as follows 
> raises an unchecked type warning:
> {code:java}
> KStream<String, User>[] branches = builder.<String, User>stream(inputTopic)
>     .branch((key, user) -> "united states".equals(user.getCountry()),
>             (key, user) -> "germany".equals(user.getCountry()),
>             (key, user) -> "mexico".equals(user.getCountry()),
>             (key, user) -> true);
> {code}
> The compiler warns with:
> {code:java}
> Warning:(39, 24) java: unchecked generic array creation for varargs parameter 
> of type org.apache.kafka.streams.kstream.Predicate<? super java.lang.String, 
> ? super io.confluent.developer.avro.User>[]
> {code}
> This is unfortunate because of the way Java's type system + generics work. We 
> could possibly fix this by adding the @SafeVarargs annotation to the branch 
> method signatures.
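
For context, a minimal demo of the mechanism being proposed (generic code, not 
Kafka's API; note that {{@SafeVarargs}} is only permitted on static, final, or 
private methods, which is a constraint for an interface method like 
{{KStream#branch}}):

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Demo: annotating a generic varargs method with @SafeVarargs suppresses the
// "unchecked generic array creation" warning at its call sites.
class SafeVarargsDemo {
    @SafeVarargs
    static <T> List<T> firstMatches(List<T> items, Predicate<? super T>... predicates) {
        List<T> result = new ArrayList<>();
        for (Predicate<? super T> p : predicates) {
            for (T item : items) {
                if (p.test(item)) {
                    result.add(item);
                    break;
                }
            }
        }
        return result;
    }
}
{code}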



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-16 Thread GitBox


abbccdda commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-659793614


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10284) Group membership update due to static member rejoin should be persisted

2020-07-16 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17159616#comment-17159616
 ] 

Boyang Chen commented on KAFKA-10284:
-

[https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L1042]

> Group membership update due to static member rejoin should be persisted
> ---
>
> Key: KAFKA-10284
> URL: https://issues.apache.org/jira/browse/KAFKA-10284
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.3.0, 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.6.1
>
>
> When a known static member rejoins, we update its corresponding member.id 
> without triggering a new rebalance. This avoids unnecessary rebalances for 
> static membership, and also serves as fencing in case something still uses 
> the old member.id. 
> The bug is that we don't actually persist the membership update, so if no 
> rebalance is triggered afterwards, the new member.id is lost during group 
> coordinator immigration, resurrecting the zombie member identity.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

