[GitHub] [kafka] ning2008wisc commented on pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-05-22 Thread GitBox


ning2008wisc commented on pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#issuecomment-632527159


   @ryannedolan @mimaison I added integration tests for the automated 
consumer offset sync in MM 2.0. When you have time, I would appreciate a 
first-pass review. Thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-22 Thread GitBox


tombentley commented on a change in pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#discussion_r429081838



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
##
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+/*test*/ final Predicate<R> predicate;
+/*test*/ final Transformation<R> delegate;
+/*test*/ final boolean negate;
+
+PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+this.predicate = predicate;
+this.negate = negate;
+this.delegate = delegate;
+}
+
+@Override
+public void configure(Map<String, ?> configs) {
+
+}
+
+@Override
+public R apply(R record) {
+if (negate ^ predicate.test(record)) {
+return delegate.apply(record);
+}
+return record;
+}
+
+@Override
+public ConfigDef config() {
+return null;

Review comment:
   This is related to the issue discussed about `configure()`.
   
   We could return an empty `ConfigDef` here, but that would be a lie which 
could ultimately lead to some other error if someone tried to use it with 
`configure()`.
   
   We can't invent a `ConfigDef` schema for this because the 
`PredicatedTransformation` would need to know about the `Transformation` it was 
going to wrap, but it can't know that before it's been configured with at 
least the `Transformation`'s `ConfigDef`, and it can't be configured before 
`config()` has been called. So we have a chicken-and-egg problem. Something 
(`ConnectorConfig`) must either have some _a priori_ knowledge of 
`PredicatedTransformation`'s `ConfigDef`, or know how to configure it without 
needing to call `config()` at all.
   
   Since `PredicatedTransformation` is a purely internal class which will never 
be directly exposed to Connect users, we're not obliged to stick to the 
contract of `config()` and `configure()`. That is, both 
`PredicatedTransformation.config` and `PredicatedTransformation.configure` can 
throw when called, since we know no one else can call them and we know 
`ConnectorConfig` never will.
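
A minimal sketch of the idea in this comment, assuming simplified stand-ins: `PredicatedSketch` is a hypothetical class, and `java.util.function.Predicate` and `UnaryOperator` replace the Connect `Predicate` and `Transformation` interfaces. It is not the code in the PR; it only illustrates a wrapper that applies its delegate conditionally and throws from `config()` and `configure()` rather than returning `null` or an empty definition.

```java
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for the internal decorator; not the Connect API.
final class PredicatedSketch<R> {
    private final Predicate<R> predicate;
    private final boolean negate;
    private final UnaryOperator<R> delegate;

    PredicatedSketch(Predicate<R> predicate, boolean negate, UnaryOperator<R> delegate) {
        this.predicate = predicate;
        this.negate = negate;
        this.delegate = delegate;
    }

    // Apply the delegate only when the (possibly negated) predicate matches.
    R apply(R record) {
        return (negate ^ predicate.test(record)) ? delegate.apply(record) : record;
    }

    // Fail loudly instead of returning null or an empty definition.
    Object config() {
        throw new UnsupportedOperationException("config() is not supported on this internal wrapper");
    }

    // Same reasoning: the wrapper is built fully configured by its owner.
    void configure(Map<String, ?> configs) {
        throw new UnsupportedOperationException("configure() is not supported on this internal wrapper");
    }
}
```

Throwing makes an accidental call visible immediately, whereas a `null` return would surface later as an unrelated `NullPointerException`.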
   









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429082038



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -261,6 +261,43 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 Assert.assertEquals(1, exitCode);
 }
 
+public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {
+appID = testId + "-with-force-option";
+streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
STREAMS_CONSUMER_TIMEOUT * 100);

Review comment:
   Without the `"" +` to convert the value to a String, we get the exception 
below, because `STREAMS_CONSUMER_TIMEOUT = 2000L` is a `long`. The `"" +` idiom 
is widely used in this test, so I followed it here to avoid enlarging the scope 
of this PR. I can create a Jira to improve it if we think this workaround is 
not intuitive.
   
   ```
   org.apache.kafka.common.config.ConfigException: Invalid value 20 for configuration session.timeout.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
   at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
   at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
   at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
   at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
   at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
   at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:606)
   at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
   at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
   at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
   at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:766)
   at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652)
   at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:562)
   at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
   at org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)
   
   ``` 
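
The failure mode above can be reproduced in miniature without Kafka on the classpath. The sketch below is an assumption-laden stand-in: `IntConfigSketch.parseInt32` is a hypothetical helper that mimics a strict INT-typed config parser (it accepts an `Integer` or a numeric `String` and rejects a boxed `Long`); it is not Kafka's `ConfigDef`. It also shows `String.valueOf(...)` as a more explicit equivalent of the `"" +` idiom.

```java
// Hypothetical stand-in for a strict INT-typed config parser; not Kafka's ConfigDef.
final class IntConfigSketch {
    static int parseInt32(String name, Object value) {
        if (value instanceof Integer) {
            return (Integer) value;
        }
        if (value instanceof String) {
            // A numeric string is parsed, which is why the "" + idiom works.
            return Integer.parseInt(((String) value).trim());
        }
        // Anything else, including a boxed Long, is rejected.
        String type = (value == null) ? "null" : value.getClass().getName();
        throw new IllegalArgumentException("Invalid value " + value + " for configuration " + name
                + ": expected a 32-bit integer, but it was a " + type);
    }
}
```

With `long timeout = 2000L`, passing `String.valueOf(timeout * 100)` parses cleanly, while passing `timeout * 100` directly autoboxes to `Long` and is rejected.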









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429082487



##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -1155,12 +1192,15 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
   assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
   assertNull(deleteResult.deletedGroups().get(testGroupId).get())
-} finally {
-  consumerThread.interrupt()
-  consumerThread.join()
-}
   } finally {
-Utils.closeQuietly(consumer, "consumer")
+consumerThreads.foreach {
+  case consumerThread =>
+consumerThread.interrupt()
+consumerThread.join()
+}
+  }
+  }finally {

Review comment:
   Thanks, but what does the **template** refer to here?









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429083674



##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -1017,47 +1017,70 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   assertTrue(0 == list1.errors().get().size())
   assertTrue(0 == list1.valid().get().size())
   val testTopicName = "test_topic"
+  val testTopicName1 = testTopicName + "1"
+  val testTopicName2 = testTopicName + "2"
   val testNumPartitions = 2
-  client.createTopics(Collections.singleton(
-new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get()
-  waitForTopics(client, List(testTopicName), List())
+
+  client.createTopics(util.Arrays.asList(new NewTopic(testTopicName, 
testNumPartitions, 1.toShort),
+new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
+new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
+  )).all().get()
+  waitForTopics(client, List(testTopicName, testTopicName1, 
testTopicName2), List())
 
   val producer = createProducer()
   try {
 producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
   } finally {
 Utils.closeQuietly(producer, "producer")
   }
+
+  val EMPTY_GROUP_INSTANCE_ID = ""
   val testGroupId = "test_group_id"
   val testClientId = "test_client_id"
   val testInstanceId = "test_instance_id"
+  val testInstanceId1 = testInstanceId + "1"

Review comment:
   Fixed









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429083564



##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -1138,7 +1165,7 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
   val validMemberFuture = removeMembersResult.memberResult(new 
MemberToRemove(testInstanceId))
   assertNull(validMemberFuture.get())
 
-  // The group should contain no member now.
+  // The group's active members number should decrease by 1

Review comment:
   Removed, but curious about the reason :)

##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -1147,6 +1174,16 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
   assertEquals(testGroupId, testGroupDescription.groupId)
   assertFalse(testGroupDescription.isSimpleConsumerGroup)
+  assertEquals(consumerSet.size -1, 
testGroupDescription.members().size())

Review comment:
   Fixed









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429083960



##
File path: 
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
##
@@ -1075,13 +1098,17 @@ class PlaintextAdminIntegrationTest extends 
BaseAdminIntegrationTest {
 
   assertEquals(testGroupId, testGroupDescription.groupId())
   assertFalse(testGroupDescription.isSimpleConsumerGroup)
-  assertEquals(1, testGroupDescription.members().size())
+  assertEquals(groupInstanceSet.size, 
testGroupDescription.members().size())
   val member = testGroupDescription.members().iterator().next()
   assertEquals(testClientId, member.clientId())
-  val topicPartitions = member.assignment().topicPartitions()
-  assertEquals(testNumPartitions, topicPartitions.size())
-  assertEquals(testNumPartitions, topicPartitions.asScala.
-count(tp => tp.topic().equals(testTopicName)))
+  val members = testGroupDescription.members()
+  assertEquals(testClientId, members.asScala.head.clientId())

Review comment:
   Thanks, the `clientId` of every member is checked now.









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429084559



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -2411,6 +2411,50 @@ public void testRemoveMembersFromGroup() throws 
Exception {
 assertNull(noErrorResult.all().get());
 assertNull(noErrorResult.memberResult(memberOne).get());
 assertNull(noErrorResult.memberResult(memberTwo).get());
+
+// Return with success for "removeAll" scenario

Review comment:
   Good catch! Added the test for partial failure 









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429084219



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -186,9 +190,15 @@ private void validateNoActiveConsumers(final String 
groupId,
 final List members =
 new 
ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
 if (!members.isEmpty()) {
-throw new IllegalStateException("Consumer group '" + groupId + "' 
is still active "
-+ "and has following members: " + members + ". "
-+ "Make sure to stop all running application instances 
before running the reset tool.");
+if (options.has(forceOption)) {
+System.out.println("Force deleting all active members in the 
group: " + groupId);
+adminClient.removeMembersFromConsumerGroup(groupId, new 
RemoveMembersFromConsumerGroupOptions()).all();

Review comment:
   Agreed, fixed 









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429084786



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -37,7 +37,15 @@ public 
RemoveMembersFromConsumerGroupOptions(Collection members)
 this.members = new HashSet<>(members);
 }
 
+public RemoveMembersFromConsumerGroupOptions() {
+this.members = new HashSet<>();

Review comment:
   Makes sense, fixed.









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429087051



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
 KafkaFutureImpl> future = new 
KafkaFutureImpl<>();
 
 ConsumerGroupOperationContext, 
RemoveMembersFromConsumerGroupOptions> context =
-new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
+new ConsumerGroupOperationContext<>(groupId, options, 
deadline, future);
 
+List members;
+if (options.removeAll()) {
+members = getMembersFromGroup(groupId);
+} else {
+members = options.members().stream().map(
+
MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+}
 Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+() -> getRemoveMembersFromGroupCall(context, members));
 runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
 return new RemoveMembersFromConsumerGroupResult(future, 
options.members());
 }
 
-private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext, RemoveMembersFromConsumerGroupOptions> context) {
+private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext,
+RemoveMembersFromConsumerGroupOptions> context, 
List allMembers) {

Review comment:
   Yeah, fixed









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429086933



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -46,26 +46,42 @@
  * If not, the first member error shall be returned.
  */
 public KafkaFuture all() {
-final KafkaFutureImpl result = new KafkaFutureImpl<>();
-this.future.whenComplete((memberErrors, throwable) -> {
-if (throwable != null) {
-result.completeExceptionally(throwable);
-} else {
-for (MemberToRemove memberToRemove : memberInfos) {
-if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-return;
+if (removeAll()) {

Review comment:
   Yes, it was as you surmised, but you are right: we should scan the 
removal results as well. I updated it slightly to follow the convention of the 
non-`removeAll` scenario and return the first exception.
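
As a rough illustration of the "return the first exception" convention, the sketch below uses `CompletableFuture` in place of `KafkaFuture`, plain strings as member ids, and a `null` map value to mean "no error". `RemovalResultSketch` and its `all` method are hypothetical names, not the code in the PR.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: member ids are plain strings and a null value means "no error".
final class RemovalResultSketch {
    static CompletableFuture<Void> all(CompletableFuture<Map<String, Throwable>> memberErrors) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        memberErrors.whenComplete((errors, throwable) -> {
            if (throwable != null) {
                // The whole request failed, so propagate that failure.
                result.completeExceptionally(throwable);
                return;
            }
            // Scan every per-member result and surface the first error found.
            for (Throwable error : errors.values()) {
                if (error != null) {
                    result.completeExceptionally(error);
                    return;
                }
            }
            result.complete(null);
        });
        return result;
    }
}
```

The same scan applies whether the member list was supplied by the caller or discovered for a remove-all request, which keeps the two paths consistent.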









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429087492



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List getMembersFromGroup(String groupId) {
+Collection members;
+try {
+members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {

Review comment:
   Makes sense, fixed.









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429087282



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List getMembersFromGroup(String groupId) {
+Collection members;
+try {
+members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {
+throw new KafkaException("Encounter exception when trying to get 
members from group: " + groupId, ex);
+}
+
+List memberToRemove = new ArrayList<>();
+for (final MemberDescription member : members) {
+if (member.groupInstanceId().isPresent()) {
+memberToRemove.add(new 
MemberIdentity().setGroupInstanceId(member.groupInstanceId().get())
+);

Review comment:
   Fixed









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429087133



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
 KafkaFutureImpl> future = new 
KafkaFutureImpl<>();
 
 ConsumerGroupOperationContext, 
RemoveMembersFromConsumerGroupOptions> context =
-new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
+new ConsumerGroupOperationContext<>(groupId, options, 
deadline, future);
 
+List members;
+if (options.removeAll()) {
+members = getMembersFromGroup(groupId);
+} else {
+members = options.members().stream().map(
+
MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+}
 Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+() -> getRemoveMembersFromGroupCall(context, members));
 runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
 return new RemoveMembersFromConsumerGroupResult(future, 
options.members());
 }
 
-private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext, RemoveMembersFromConsumerGroupOptions> context) {
+private Call 
getRemoveMembersFromGroupCall(ConsumerGroupOperationContext,
+RemoveMembersFromConsumerGroupOptions> context, 
List allMembers) {
 return new Call("leaveGroup",
 context.deadline(),
 new ConstantNodeIdProvider(context.node().get().id())) 
{
 @Override
 LeaveGroupRequest.Builder createRequest(int timeoutMs) {
-return new LeaveGroupRequest.Builder(context.groupId(),
- 
context.options().members().stream().map(
- 
MemberToRemove::toMemberIdentity).collect(Collectors.toList()));
+return new LeaveGroupRequest.Builder(context.groupId(),

Review comment:
   Updated









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429087216



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
 KafkaFutureImpl> future = new 
KafkaFutureImpl<>();
 
 ConsumerGroupOperationContext, 
RemoveMembersFromConsumerGroupOptions> context =
-new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
+new ConsumerGroupOperationContext<>(groupId, options, 
deadline, future);

Review comment:
   Fixed









[GitHub] [kafka] jiameixie opened a new pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random

2020-05-22 Thread GitBox


jiameixie opened a new pull request #8711:
URL: https://github.com/apache/kafka/pull/8711


   When applicable, use of ThreadLocalRandom rather
   than shared Random objects in concurrent programs
   will typically encounter much less overhead and
   contention.
   
   Change-Id: Idf56adc8cbb4c611e327e639a49d90827a23d947
   Signed-off-by: Jiamei Xie 
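
A small sketch of the substitution the description refers to. `BackoffJitterSketch` and `jitteredDelayMs` are hypothetical names chosen for illustration and do not come from the PR; the only library call used is `ThreadLocalRandom.current().nextLong(bound)`, which requires a positive bound.

```java
import java.util.concurrent.ThreadLocalRandom;

final class BackoffJitterSketch {
    // Hypothetical helper: picks a delay in [baseMs, baseMs + maxJitterMs).
    // maxJitterMs must be positive, as nextLong(bound) rejects bound <= 0.
    static long jitteredDelayMs(long baseMs, long maxJitterMs) {
        // current() returns the calling thread's own generator, so threads
        // do not contend on a shared seed as they would with one Random.
        return baseMs + ThreadLocalRandom.current().nextLong(maxJitterMs);
    }
}
```

Note that `ThreadLocalRandom.current()` should be called at the point of use rather than cached in a field, since the instance is tied to the calling thread.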
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs

2020-05-22 Thread GitBox


tombentley commented on pull request #8699:
URL: https://github.com/apache/kafka/pull/8699#issuecomment-632551475


   @C0urante thanks for the review, some excellent points there! I think an 
integration test is a great idea, which I'll work on next. I've addressed all 
your other comments. 







[GitHub] [kafka] chia7712 commented on pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random

2020-05-22 Thread GitBox


chia7712 commented on pull request #8711:
URL: https://github.com/apache/kafka/pull/8711#issuecomment-632552331


   Is this PR different from #8531?







[GitHub] [kafka] chia7712 commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException

2020-05-22 Thread GitBox


chia7712 commented on a change in pull request #8705:
URL: https://github.com/apache/kafka/pull/8705#discussion_r429100323



##
File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala
##
@@ -1742,6 +1746,12 @@ class SocketServerTest {
selector = Some(testableSelector)
testableSelector
 }
+
+override private[network] def processException(errorMessage: String, 
throwable: Throwable, isUncaught: Boolean): Unit = {

Review comment:
   `isUncaught` is used only by tests, so it is a bit awkward in production 
code. Could you check the `errorMessage` instead of adding a new argument? For 
example:
   
   ```scala
   if (errorMessage == "Processor got uncaught exception.") uncaughtExceptions += 1
   ```









[GitHub] [kafka] jiameixie commented on pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random

2020-05-22 Thread GitBox


jiameixie commented on pull request #8711:
URL: https://github.com/apache/kafka/pull/8711#issuecomment-632558263


   > Is this PR different from #8531?
   
   @chia7712 It's the same issue. #8531 misspells KAFKA, and I didn't see 
any PR linked for this issue in 
https://issues.apache.org/jira/browse/KAFKA-9904, so I thought there was no 
PR. I'll close this PR. 







[GitHub] [kafka] jiameixie closed pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random

2020-05-22 Thread GitBox


jiameixie closed pull request #8711:
URL: https://github.com/apache/kafka/pull/8711


   







[jira] [Assigned] (KAFKA-10006) Streams should not attempt to create internal topics that may exist

2020-05-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen reassigned KAFKA-10006:
-

Assignee: Luke Chen

> Streams should not attempt to create internal topics that may exist
> ---
>
> Key: KAFKA-10006
> URL: https://issues.apache.org/jira/browse/KAFKA-10006
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sophie Blee-Goldman
>Assignee: Luke Chen
>Priority: Major
>
> During assignment, Streams will attempt to validate all internal topics and 
> their number of partitions, and create them if necessary. However we have 
> seen that the InternalTopicManager will occasionally try to recreate internal 
> topics that already exist when the broker is unavailable, and the 
> describeTopics request fails. 
> This is because we catch both UnknownTopicOrPartitionException AND 
> LeaderNotAvailableException and treat them the same, ie we assume the topic 
> does not exist. We shouldn't try to create topics until we've validated that 
> they don't actually exist. If we can't connect to the brokers to check 
> whether the topic exists, there's no reason to believe we would be able to 
> create the topic anyway.
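
A minimal sketch of the distinction the ticket asks for, not the actual InternalTopicManager code. It assumes that only an UnknownTopicOrPartitionException counts as proof that a topic is missing, while a LeaderNotAvailableException (or any other failure) leaves the status unverified, so no creation is attempted. The two exception classes are local stand-ins that only mirror the names in the ticket, and `TopicExistenceCheck`, `TopicStatus`, `classify` and `shouldCreate` are hypothetical names.

```java
// Local stand-ins that only mirror the exception names from the ticket;
// the real classes live in org.apache.kafka.common.errors.
class UnknownTopicOrPartitionException extends RuntimeException { }
class LeaderNotAvailableException extends RuntimeException { }

// Hypothetical helper, for illustration only.
class TopicExistenceCheck {
    enum TopicStatus { EXISTS, MISSING, UNKNOWN }

    // Map the outcome of a describe call (null means it succeeded) to a status.
    static TopicStatus classify(RuntimeException describeError) {
        if (describeError == null) {
            return TopicStatus.EXISTS;
        }
        if (describeError instanceof UnknownTopicOrPartitionException) {
            return TopicStatus.MISSING; // the broker answered: no such topic
        }
        // LeaderNotAvailableException and anything else: existence not verified.
        return TopicStatus.UNKNOWN;
    }

    // Only create a topic once its absence has been confirmed.
    static boolean shouldCreate(TopicStatus status) {
        return status == TopicStatus.MISSING;
    }
}
```

With this split, an unavailable broker leads to a retry of the describe step rather than to a create request that would likely fail as well.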



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-05-22 Thread GitBox


mimaison commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r429131002



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+/**
+ * Only groups in these states will be returned by listConsumerGroups()
+ * If not set, all groups are returned without their states
+ * throw IllegalArgumentException if states is empty
+ */
+public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+if (states == null || states.isEmpty()) {
+throw new IllegalArgumentException("states should not be null or empty");
+}
+this.states = Optional.of(states);
+return this;
+}
+
+/**
+ * All groups with their states will be returned by listConsumerGroups()
+ */
+public ListConsumerGroupsOptions inAnyState() {
+this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment:
   That's a good point so I agree, it makes sense to return all states when 
`null` (or an empty list) is used.
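
The behaviour agreed on here could look roughly like the sketch below, in which `null` or an empty set means "no filter". This is an assumption-laden illustration rather than the code in the PR: `GroupState` is a simplified stand-in for `ConsumerGroupState`, and `ListGroupsOptionsSketch` and `matches` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for ConsumerGroupState; the constants are illustrative.
enum GroupState { STABLE, EMPTY, DEAD, UNKNOWN }

// Hypothetical options class, not the actual ListConsumerGroupsOptions.
class ListGroupsOptionsSketch {
    private Set<GroupState> states = new HashSet<>();

    // null or an empty set means "no filter", i.e. groups in any state.
    ListGroupsOptionsSketch inStates(Set<GroupState> states) {
        this.states = (states == null) ? new HashSet<>() : new HashSet<>(states);
        return this;
    }

    // True if a group in the given state passes the filter.
    boolean matches(GroupState state) {
        return states.isEmpty() || states.contains(state);
    }
}
```

Treating the empty set as "all states" removes the need for the IllegalArgumentException and for a separate `inAnyState()` method.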









[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-05-22 Thread GitBox


mimaison commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r429133529



##
File path: clients/src/main/resources/common/message/ListGroupsRequest.json
##
@@ -20,8 +20,14 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the States flexible field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+{ "name": "States", "type": "[]string", "versions": "4+", "tag": 0, 
"taggedVersions": "4+",

Review comment:
   Yes the version bump is necessary to detect if this field was supported.
   
   As we're bumping the version and as you said in 
https://github.com/apache/kafka/pull/8238#discussion_r426869755 the overhead of 
the extra field on this API is not a concern, it's probably simpler to use a 
regular field.
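
Why the version bump matters can be sketched as follows: the request version negotiated with the broker is the only signal a client has for whether the broker understands the new field. The class and method names below are hypothetical; the value 4 is taken from the diff above.

```java
// Hypothetical helper, for illustration only.
class ListGroupsVersionCheck {
    // First ListGroups request version that carries the States field (per the diff above).
    static final short STATES_MIN_VERSION = 4;

    // A state filter can only be sent if the negotiated version has the field.
    static boolean canFilterByState(short negotiatedVersion) {
        return negotiatedVersion >= STATES_MIN_VERSION;
    }
}
```

Once the version gates the feature anyway, a regular versioned field expresses the same thing with less machinery than a tagged one.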
   
   









[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429133442



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -46,26 +46,42 @@
  * If not, the first member error shall be returned.
  */
 public KafkaFuture all() {
-final KafkaFutureImpl result = new KafkaFutureImpl<>();
-this.future.whenComplete((memberErrors, throwable) -> {
-if (throwable != null) {
-result.completeExceptionally(throwable);
-} else {
-for (MemberToRemove memberToRemove : memberInfos) {
-if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-return;
+if (removeAll()) {
+final KafkaFutureImpl result = new KafkaFutureImpl<>();
+this.future.whenComplete((memberErrors, throwable) -> {
+if (throwable != null) {
+result.completeExceptionally(throwable);
+} else {
+System.out.println("Remove all active members succeeded, 
removed " + memberErrors.size() + " members: " + memberErrors.keySet());

Review comment:
   Fixed









[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-05-22 Thread GitBox


mimaison commented on a change in pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#discussion_r429136058



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
##
@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+/**
+ * Only groups in these states will be returned by listConsumerGroups()
+ * If not set, all groups are returned without their states
+ * throw IllegalArgumentException if states is empty
+ */
+public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+if (states == null || states.isEmpty()) {
+throw new IllegalArgumentException("states should not be null or empty");
+}
+this.states = Optional.of(states);
+return this;
+}
+
+/**
+ * All groups with their states will be returned by listConsumerGroups()
+ */
+public ListConsumerGroupsOptions inAnyState() {
+this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment:
   This can also be argued for the state value in the response. Currently 
`ConsumerGroupDescription` stores the state as `ConsumerGroupState` so states 
the client isn't aware of are mapped to `UNKNOWN` so I'm doing the same in 
`ConsumerGroupListing`.
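
The mapping described here can be sketched as a lenient parse that falls back to `UNKNOWN`. The enum below is a simplified stand-in with illustrative constants, not the real `ConsumerGroupState`, and the `parse` shown is only an assumption about how such a fallback might be written.

```java
import java.util.Locale;

// Simplified stand-in for ConsumerGroupState; the constants are illustrative.
enum GroupStateSketch {
    STABLE, EMPTY, DEAD, UNKNOWN;

    // Map a state name from a response to a constant; names this client
    // does not know (or a missing name) become UNKNOWN instead of failing.
    static GroupStateSketch parse(String name) {
        if (name == null) {
            return UNKNOWN;
        }
        try {
            return valueOf(name.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return UNKNOWN;
        }
    }
}
```

An older client talking to a newer broker then keeps working and simply reports states it cannot name as `UNKNOWN`.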









[GitHub] [kafka] mimaison commented on pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state

2020-05-22 Thread GitBox


mimaison commented on pull request #8238:
URL: https://github.com/apache/kafka/pull/8238#issuecomment-632591659


   Thanks @hachikuji for the feedback. You brought up some interesting points 
that overall simplify the KIP/logic a bit. 
   While it initially seemed a perfect use case for tagged fields, the need to 
bump the version to check for compatibility made it a bit confusing. 
   
   I've pushed an update following your suggestions to use a regular field and 
to return all groups by default. I think it's easier to reason about and keeps 
the API simple. Let me know what you think. I'll update the KIP accordingly if 
we think that's the best option.







[GitHub] [kafka] kowshik commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)

2020-05-22 Thread GitBox


kowshik commented on a change in pull request #8680:
URL: https://github.com/apache/kafka/pull/8680#discussion_r428573407



##
File path: 
clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java
##
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max 
of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range 
to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which 
can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {

Review comment:
   It is thoroughly tested in its child class's test suite: 
`SupportedVersionRangeTest`.
   Personally, I feel this is good enough, because to test this class we would 
need to inherit from it in a sub-class anyway (since the constructor is 
`protected`). Testing via `SupportedVersionRangeTest` achieves exactly the same.
   
   I have now added top-level documentation to the `SupportedVersionRangeTest` 
test suite explaining the above.

##
File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java
##
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value 
being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version 
ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the 
important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param  is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features {
+private final Map features;
+
+/**
+ * Constructor is made private, as for readability it is preferred the 
caller uses one of the
+ * static factory functions for instantiation (see below).
+ *
+ * @param features   Map of feature name to type of VersionRange, as the 
backing data structure
+ *   for the Features object.
+ */
+private Features(Map features) {
+this.features = features;
+}
+
+/**
+ * @param features   Map of feature name to VersionRange, as the backing 
data structure
+ *   for the Features object.
+ * @return   Returns a new Features object representing 
"supported" features.
+ */
+public static Features 
supportedFeatures(Map features) {
+return new Features<>(features);
+}
+
+/**
+ * @param features   Map of feature name to FinalizedVersionRange, as the 
backing data structure
+ *   for the Features object.
+ * @return   Returns a new Features object representing 
"finalized" features.
+ */
+public static Features 
finalizedFeatures(Map features) {
+return new Features<>(features);
+}
+
+// Visible for testi

[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429145185



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3660,7 +3686,7 @@ void handleResponse(AbstractResponse abstractResponse) {
 // We set member.id to empty here explicitly, so that the 
lookup will succeed as user doesn't
 // know the exact member.id.
 memberErrors.put(new MemberIdentity()
- 
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+ 
.setMemberId(memberResponse.memberId())

Review comment:
   For removing static members, this is still true because we put memberId as 
`""` in the request, and the server will respond with the same value in that 
field. (Verified in `GroupCoordinator#handleLeaveGroup`.)
   For removing dynamic members, we need this change so that the caller knows 
the memberId.
   I suppose the `individual check` here is just to check the response against 
the members to be removed (for the `removeAll` scenario)? Previously I thought 
of putting all members obtained from `KafkaAdminClient#getMembersFromGroup` 
into the RemoveMembersFromConsumerGroupResult for checking, but in the 
`removeAll` scenario we get the members as `MemberIdentity`, which cannot be 
converted back to `MemberToRemove`, so I'm hesitant to do it this way.









[jira] [Created] (KAFKA-10031) Exist client create wrong topic automatically for a total-fresh restarted Kafka cluster

2020-05-22 Thread jiamei xie (Jira)
jiamei xie created KAFKA-10031:
--

 Summary: Exist client create wrong topic automatically for a 
total-fresh  restarted Kafka cluster
 Key: KAFKA-10031
 URL: https://issues.apache.org/jira/browse/KAFKA-10031
 Project: Kafka
  Issue Type: Bug
  Components: clients
Reporter: jiamei xie
Assignee: jiamei xie


Kill all ZooKeeper and Kafka processes, clear the ZooKeeper and Kafka data 
directories, and restart ZooKeeper and Kafka. If any client is still active, 
the topics it uses will be auto-created.

How to reproduce?

1.  Start ZooKeeper and Kafka with their config files.
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &

2.  Create topic test with 2 partitions
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test 
--partitions 2 --replication-factor 1

3.  Produce some data to topic test
bin/kafka-producer-perf-test.sh --topic test --num-records 5000 
--record-size 100 --throughput=-1 --producer-props 
bootstrap.servers=localhost:9092

4.  Kill zookeeper and kafka. ProducerPerformance is still running.
jps
21072 QuorumPeerMain
21704 ProducerPerformance
21230 Kafka
21854 Jps
kill -9 21072 21230

5.  Remove Zookeeper and Kafka data
rm -rf /tmp/zookeeper/
rm -rf /tmp/kafka-logs/

6.  Start zookeeper and kafka 
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &

7.  Describe the topics and you’ll see a topic named test with 1 partition 
instead of 2. The ProducerPerformance process continues to run normally.
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic: test   PartitionCount: 1   ReplicationFactor: 1   Configs:
Topic: test   Partition: 0   Leader: 0   Replicas: 0   Isr: 0

Some output of ProducerPerformance process.
1995632 records sent, 399126.4 records/sec (38.06 MB/sec), 378.6 ms avg 
latency, 435.0 ms max latency.
org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s) for 
test-1:12 ms has passed since batch creation
…..
org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s) for 
test-1:121774 ms has passed since batch creation
1711254 records sent, 342250.8 records/sec (32.64 MB/sec), 2324.5 ms avg 
latency, 123473.0 ms max latency.





[jira] [Updated] (KAFKA-10031) Exist client automatically create wrong topic for a total-fresh restarted Kafka cluster

2020-05-22 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-10031:
---
Summary: Exist client automatically create wrong topic for a total-fresh  
restarted Kafka cluster  (was: Exist client create wrong topic automatically 
for a total-fresh  restarted Kafka cluster)

> Exist client automatically create wrong topic for a total-fresh  restarted 
> Kafka cluster
> 
>
> Key: KAFKA-10031
> URL: https://issues.apache.org/jira/browse/KAFKA-10031
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>





[jira] [Updated] (KAFKA-10031) Exist client create wrong topic for a total-fresh restarted Kafka cluster automatically

2020-05-22 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-10031:
---
Summary: Exist client create wrong topic for a total-fresh  restarted Kafka 
cluster automatically  (was: Exist client automatically create wrong topic for 
a total-fresh  restarted Kafka cluster)

> Exist client create wrong topic for a total-fresh  restarted Kafka cluster 
> automatically
> 
>
> Key: KAFKA-10031
> URL: https://issues.apache.org/jira/browse/KAFKA-10031
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>





[jira] [Updated] (KAFKA-10031) Exist client automatically create wrong topic for a total-fresh restarted Kafka cluster

2020-05-22 Thread jiamei xie (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

jiamei xie updated KAFKA-10031:
---
Summary: Exist client  automatically create wrong topic for a total-fresh  
restarted Kafka cluster  (was: Exist client create wrong topic for a 
total-fresh  restarted Kafka cluster automatically)

> Exist client  automatically create wrong topic for a total-fresh  restarted 
> Kafka cluster
> -
>
> Key: KAFKA-10031
> URL: https://issues.apache.org/jira/browse/KAFKA-10031
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: jiamei xie
>Assignee: jiamei xie
>Priority: Major
>





[GitHub] [kafka] showuon opened a new pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-05-22 Thread GitBox


showuon opened a new pull request #8712:
URL: https://github.com/apache/kafka/pull/8712


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] cadonna commented on a change in pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

2020-05-22 Thread GitBox


cadonna commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429158357



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##
@@ -642,6 +642,30 @@ public static void addAvgAndMaxToSensor(final Sensor 
sensor,
 );
 }
 
+public static void addMaxAndMinToSensor(final Sensor sensor,

Review comment:
   prop (super-nit): Could you call the method `addMinAndMaxToSensor()` 
since we have already one method that is called `addAvgAndMinAndMaxToSensor()`?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##
@@ -356,4 +354,27 @@ public void 
shouldGetDroppedRecordsSensorOrLateRecordDropSensor() {
 shouldGetDroppedRecordsSensor();
 }
 }
+
+@Test
+public void shouldGetRecordE2ELatencySensor() {
+final String operation = "record-e2e-latency";
+expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, 
RecordingLevel.INFO))
+.andReturn(expectedSensor);
+expect(streamsMetrics.taskLevelTagMap(THREAD_ID, 
TASK_ID)).andReturn(tagMap);
+StreamsMetricsImpl.addMaxAndMinToSensor(
+expectedSensor,
+TASK_LEVEL_GROUP,
+tagMap,
+operation,
+RECORD_E2E_LATENCY_MAX_DESCRIPTION,

Review comment:
   req: Please do not use the constant here. The point of this test is also 
to check the correctness of the description, i.e., the content of that constant.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java
##
@@ -14,15 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.streams.kstream.internals.metrics;
+package org.apache.kafka.streams.processor.internals.metrics;

Review comment:
   Yes, very nice!

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##
@@ -86,6 +87,14 @@ private TaskMetrics() {}
 private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count 
of buffered records that are polled " +
 "from consumer and not yet processed for this active task";
 
+private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =
+"The maximum end-to-end latency of a record, measuring by comparing 
the record timestamp with the "
++ "system time when it has been fully processed by the task";
+static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION =

Review comment:
   See my comment above.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##
@@ -86,6 +87,14 @@ private TaskMetrics() {}
 private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count 
of buffered records that are polled " +
 "from consumer and not yet processed for this active task";
 
+private static final String RECORD_E2E_LATENCY = "record-e2e-latency";
+static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION =

Review comment:
   req: Please define this constant as private, like all the others. I left a 
request in `TaskMetricsTest` which makes this request clearer.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -137,6 +138,7 @@ public StreamTask(final TaskId id,
 }
 processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, 
taskId, streamsMetrics);
 processLatencySensor = TaskMetrics.processLatencySensor(threadId, 
taskId, streamsMetrics);
+recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, 
taskId, streamsMetrics);

Review comment:
   Q: Is there a specific reason to init the sensor here and not in 
`SinkNode`? You can init and store it there. That was one motivation for making 
the `*Metrics` classes (e.g. `TaskMetrics`) static, so that you do not need any 
code in the processor context to get specific sensors. If there is no specific 
reason, you could get rid of the changes in the `*Context*` classes.









[GitHub] [kafka] mimaison commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-05-22 Thread GitBox


mimaison commented on pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#issuecomment-632632500


   retest this please







[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker

2020-05-22 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113958#comment-17113958
 ] 

Bruno Cadonna commented on KAFKA-9173:
--

The issue that all 10 tasks are assigned to the same node is fixed with KIP-441 
and the following PR adds unit tests that verify it:
https://github.com/apache/kafka/pull/8689

For the other issues, new tickets should be created as [~mjsax] proposed.

I will close this ticket as fixed in 2.6.

> StreamsPartitionAssignor assigns partitions to only one worker
> --
>
> Key: KAFKA-9173
> URL: https://issues.apache.org/jira/browse/KAFKA-9173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Oleg Muravskiy
>Priority: Critical
>  Labels: user-experience
> Attachments: StreamsPartitionAssignor.log
>
>
> I'm running a distributed KafkaStreams application on 10 worker nodes, 
> subscribed to 21 topics with 10 partitions in each. I'm only using a 
> Processor interface, and a persistent state store.
> However, only one worker gets assigned partitions, all other workers get 
> nothing. Restarting the application, or cleaning local state stores does not 
> help. StreamsPartitionAssignor migrates to other nodes, and eventually picks 
> up other node to assign partitions to, but still only one node.
> It's difficult to figure out where to look for the signs of problems, I'm 
> attaching the log messages from the StreamsPartitionAssignor. Let me know 
> what else I could provide to help resolve this.
> [^StreamsPartitionAssignor.log]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker

2020-05-22 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-9173.
--
Fix Version/s: 2.6.0
   Resolution: Fixed

> StreamsPartitionAssignor assigns partitions to only one worker
> --
>
> Key: KAFKA-9173
> URL: https://issues.apache.org/jira/browse/KAFKA-9173
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.0, 2.2.1
>Reporter: Oleg Muravskiy
>Priority: Critical
>  Labels: user-experience
> Fix For: 2.6.0
>
> Attachments: StreamsPartitionAssignor.log
>
>
> I'm running a distributed KafkaStreams application on 10 worker nodes, 
> subscribed to 21 topics with 10 partitions in each. I'm only using a 
> Processor interface, and a persistent state store.
> However, only one worker gets assigned partitions, all other workers get 
> nothing. Restarting the application, or cleaning local state stores does not 
> help. StreamsPartitionAssignor migrates to other nodes, and eventually picks 
> up other node to assign partitions to, but still only one node.
> It's difficult to figure out where to look for the signs of problems, I'm 
> attaching the log messages from the StreamsPartitionAssignor. Let me know 
> what else I could provide to help resolve this.
> [^StreamsPartitionAssignor.log]





[GitHub] [kafka] cadonna commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-22 Thread GitBox


cadonna commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r429196241



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map> tasksToCau
 return movementsNeeded;
 }
 
+static int assignStandbyTaskMovements(final Map> 
tasksToCaughtUpClients,
+  final Map 
clientStates,
+  final AtomicInteger 
remainingWarmupReplicas,
+  final Map> 
warmups) {
+final BiFunction caughtUpPredicate =
+(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, 
tasksToCaughtUpClients);
+
+final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new 
ConstrainedPrioritySet(
+caughtUpPredicate,
+client -> clientStates.get(client).assignedTaskLoad()
+);
+
+final Queue taskMovements = new PriorityQueue<>(
+
Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
+);
+
+for (final Map.Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID destination = clientStateEntry.getKey();
+final ClientState state = clientStateEntry.getValue();
+for (final TaskId task : state.standbyTasks()) {
+if (warmups.getOrDefault(destination, 
Collections.emptySet()).contains(task)) {
+// this is a warmup, so we won't move it.
+} else if 
(!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, 
tasksToCaughtUpClients)) {

Review comment:
   prop: Could you add a method 
`taskIsNotCaughtUpOnClientAndCaughtUpClientsExist()`? Applying De Morgan's law 
every time I read this code gives me a headache.
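
A minimal sketch of the helper proposed above, assuming the existing predicate returns true when no caught-up clients are recorded for the task or when the given client is among them. The class name `CaughtUpChecks`, the `String` task ids, and the body of the existing predicate are illustrative assumptions, not the code in `TaskMovement.java`.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;

final class CaughtUpChecks {

    // Assumed shape of the existing predicate: true if nobody is caught up on
    // the task, or if this particular client is caught up on it.
    static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(
            final String task,
            final UUID client,
            final Map<String, Set<UUID>> tasksToCaughtUpClients) {
        final Set<UUID> caughtUpClients = tasksToCaughtUpClients.get(task);
        return caughtUpClients == null
            || caughtUpClients.isEmpty()
            || caughtUpClients.contains(client);
    }

    // Named negation of the predicate above, so call sites read positively:
    // caught-up clients exist for the task AND this client is not one of them.
    static boolean taskIsNotCaughtUpOnClientAndCaughtUpClientsExist(
            final String task,
            final UUID client,
            final Map<String, Set<UUID>> tasksToCaughtUpClients) {
        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
    }
}
```

With such a helper, the `else if` branch in the diff could call the positively named method directly instead of negating the existing one.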

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -57,14 +59,42 @@ public boolean assign(final Map clients,
 configs.numStandbyReplicas
 );
 
-final boolean probingRebalanceNeeded = assignTaskMovements(
-tasksToCaughtUpClients(statefulTasks, clientStates, 
configs.acceptableRecoveryLag),
+final AtomicInteger remainingWarmupReplicas = new 
AtomicInteger(configs.maxWarmupReplicas);
+
+final Map> tasksToCaughtUpClients = 
tasksToCaughtUpClients(
+statefulTasks,
+clientStates,
+configs.acceptableRecoveryLag
+);
+
+// We temporarily need to know which standby tasks were intended as 
warmups
+// for active tasks, so that we don't move them (again) when we plan 
standby
+// task movements. We can then immediately treat warmups exactly the 
same as
+// hot-standby replicas, so we just track it right here as metadata, 
rather
+// than add "warmup" assignments to ClientState, for example.
+final Map> warmups = new TreeMap<>();
+
+final int neededActiveTaskMovements = assignActiveTaskMovements(
+tasksToCaughtUpClients,
 clientStates,
-configs.maxWarmupReplicas
+warmups,
+remainingWarmupReplicas
+);
+
+final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+tasksToCaughtUpClients,
+clientStates,
+remainingWarmupReplicas,
+warmups
 );
 
 assignStatelessActiveTasks(clientStates, diff(TreeSet::new, 
allTaskIds, statefulTasks));
 
+// We shouldn't plan a probing rebalance if we _needed_ task 
movements, but couldn't do any
+// due to being configured for no warmups.

Review comment:
   To be clear, according to `StreamsConfig`, we do NOT allow 
`max.warmup.replicas = 0`. It must be at least 1.  Or was your statement 
hypothetical, that it would be OK to allow it? Anyway, I am in favour of 
keeping the `> 0` check here. 
   
   









[GitHub] [kafka] mimaison commented on a change in pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs

2020-05-22 Thread GitBox


mimaison commented on a change in pull request #8312:
URL: https://github.com/apache/kafka/pull/8312#discussion_r429211490



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
##
@@ -174,19 +97,25 @@ public boolean isReadOnly() {
 }
 
 public enum ConfigSource {
-UNKNOWN_CONFIG((byte) 0),
-TOPIC_CONFIG((byte) 1),
-DYNAMIC_BROKER_CONFIG((byte) 2),
-DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
-STATIC_BROKER_CONFIG((byte) 4),
-DEFAULT_CONFIG((byte) 5),
-DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6);
+UNKNOWN_CONFIG((byte) 0, 
org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN),

Review comment:
   It's unfortunate the public enum cannot do the mapping and we have to 
keep a copy here. Could we maybe rename this one so this looks less crazy? WDYT?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -1912,53 +1915,51 @@ public DescribeConfigsResult 
describeConfigs(Collection configRe
 
 // The non-BROKER resources which we want to describe.  These 
resources can be described by a
 // single, unified DescribeConfigs request.
-final Collection unifiedRequestResources = new 
ArrayList<>(configResources.size());
+final DescribeConfigsRequestData unifiedRequestResources = new 
DescribeConfigsRequestData();
 
 for (ConfigResource resource : configResources) {
 if (dependsOnSpecificNode(resource)) {
 brokerFutures.put(resource, new KafkaFutureImpl<>());
 brokerResources.add(resource);
 } else {
 unifiedRequestFutures.put(resource, new KafkaFutureImpl<>());
-unifiedRequestResources.add(resource);
+unifiedRequestResources.resources().add(new 
DescribeConfigsResource()
+.setResourceName(resource.name())
+.setResourceType(resource.type().id()));
 }
 }
 
 final long now = time.milliseconds();
-if (!unifiedRequestResources.isEmpty()) {
+if (!unifiedRequestResources.resources().isEmpty()) {
 runnable.call(new Call("describeConfigs", calcDeadlineMs(now, 
options.timeoutMs()),
 new LeastLoadedNodeProvider()) {
 
 @Override
 DescribeConfigsRequest.Builder createRequest(int timeoutMs) {
-return new 
DescribeConfigsRequest.Builder(unifiedRequestResources)
-.includeSynonyms(options.includeSynonyms());
+return new 
DescribeConfigsRequest.Builder(unifiedRequestResources.setIncludeSynoyms(options.includeSynonyms()));

Review comment:
   I wonder if it would read better if `unifiedRequestResources` was 
instead a `List` and we were creating the 
`DescribeConfigsRequestData` object and calling all its setters here. WDYT?

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
##
@@ -220,145 +153,66 @@ public ConfigSource source() {
 }
 }
 
+public Map resultMap() {
+return data().results().stream().collect(Collectors.toMap(
+configsResult ->
+new 
ConfigResource(ConfigResource.Type.forId(configsResult.resourceType()),
+configsResult.resourceName()),
+Function.identity()));
+}
 
-private final int throttleTimeMs;
-private final Map configs;
+private final DescribeConfigsResponseData data;
 
-public DescribeConfigsResponse(int throttleTimeMs, Map configs) {
-this.throttleTimeMs = throttleTimeMs;
-this.configs = Objects.requireNonNull(configs, "configs");
+public DescribeConfigsResponse(DescribeConfigsResponseData data) {
+this.data = data;
 }
 
-public DescribeConfigsResponse(Struct struct) {
-throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
-configs = new HashMap<>(resourcesArray.length);
-for (Object resourceObj : resourcesArray) {
-Struct resourceStruct = (Struct) resourceObj;
-
-ApiError error = new ApiError(resourceStruct);
-ConfigResource.Type resourceType = 
ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
-String resourceName = 
resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
-ConfigResource resource = new ConfigResource(resourceType, 
resourceName);
-
-Object[] configEntriesArray = 
resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
-List configEntries = new 
ArrayList<>(configEntriesArray.length);
-for (Object configEntriesObj: configEntriesArray) {
-Struct configEntriesStruct = (Struct) configEntriesObj;
-Stri

[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-22 Thread GitBox


rnpridgeon commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429216813



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
##
@@ -16,19 +16,24 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
 public final class ConnectUtils {
 private static final Logger log = 
LoggerFactory.getLogger(ConnectUtils.class);
+public static final String CONNECT_KAFKA_CLUSTER_ID = 
"connect.kafka.cluster.id";
+public static final String CONNECT_GROUP_ID = "connect.group.id";

Review comment:
   Maybe it would be helpful to add CONNECT_VERSION as well. 









[GitHub] [kafka] cadonna commented on pull request #8713: KAFKA-6145: Add unit tests for assignments of only stateless tasks

2020-05-22 Thread GitBox


cadonna commented on pull request #8713:
URL: https://github.com/apache/kafka/pull/8713#issuecomment-632676433


   Call for review: @ableegoldman @vvcephei 







[GitHub] [kafka] cadonna opened a new pull request #8713: KAFKA-6145: Add unit tests for assignments of only stateless tasks

2020-05-22 Thread GitBox


cadonna opened a new pull request #8713:
URL: https://github.com/apache/kafka/pull/8713


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma opened a new pull request #8714: MINOR: Improve broker registration message

2020-05-22 Thread GitBox


ijuma opened a new pull request #8714:
URL: https://github.com/apache/kafka/pull/8714


   It was previously:
   
   > INFO Registered broker 0 at path /brokers/ids/0 with addresses: 
ArraySeq(EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(localhost,9093,ListenerName(SSL),SSL)),
 czxid (broker epoch):
   4294967320 (kafka.zk.KafkaZkClient)
   
   It's now:
   
   > INFO Registered broker 0 at path /brokers/ids/0 with addresses: 
PLAINTEXT://localhost:9092,SSL://localhost:9093, czxid (broker epoch):
   4294967320 (kafka.zk.KafkaZkClient)
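
The change in format can be sketched as follows: each endpoint is rendered as `LISTENER://host:port` and the results are joined with commas. The `Endpoint` class here is a hypothetical stand-in for the broker's Scala `EndPoint`, and the sketch is in Java rather than the Scala of the actual change.

```java
import java.util.List;
import java.util.stream.Collectors;

final class EndpointFormat {

    // Hypothetical stand-in for the broker's EndPoint type.
    static final class Endpoint {
        final String host;
        final int port;
        final String listenerName;

        Endpoint(final String host, final int port, final String listenerName) {
            this.host = host;
            this.port = port;
            this.listenerName = listenerName;
        }
    }

    // Renders endpoints in the compact form shown in the new log message.
    static String describe(final List<Endpoint> endpoints) {
        return endpoints.stream()
            .map(e -> e.listenerName + "://" + e.host + ":" + e.port)
            .collect(Collectors.joining(","));
    }
}
```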
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-10032) Response to ApiVersionRequest returns wrong Produce(0) version

2020-05-22 Thread Antonio Pires (Jira)
Antonio Pires created KAFKA-10032:
-

 Summary: Response to ApiVersionRequest returns wrong Produce(0) 
version
 Key: KAFKA-10032
 URL: https://issues.apache.org/jira/browse/KAFKA-10032
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.4.0, 0.11.0.0
Reporter: Antonio Pires


The response to {{ApiVersionRequest}} returns incorrect information for the 
Produce {{_ApiKey_}} because it does not take the 
{{_log.message.format.version_}} config into account when that config is 
overridden. While the internals, {{_Log.append()_}}, use 
{{log.message.format.version}} when adding a new record set to the active 
segment, the version exposed to clients is not the same, which can cause 
unexpected behaviour.

For example
 Using version _>0.11.0_ with {{_log.message.format.version_}} set to a 
previous version (0.10.2), the broker returns

{{(id: 1 rack: null) -> (}}
 {{    Produce(0): 0 to 3 [usable: 3],}}
 {{    Fetch(1): 0 to 5 [usable: 5],}}
 {{    ...}}
 \{{ )}}

and should instead be responding with

{{(id: 1 rack: null) -> (}}
 {{    Produce(0): 0 to 2 [usable: 2],}}
 {{    Fetch(1): 0 to 5 [usable: 5],}}
 {{    ...}}
 \{{ )}}

Shouldn't a single source of truth be used to get this version?
 In {{_Log.append()_}}, instead of getting the version directly from the 
config, shouldn't we use {{_ApiVersions.maxUsableProduceMagic()_}}, which in 
turn should take the config into consideration?
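
As a rough illustration of the behaviour requested here, the advertised maximum Produce version could be capped by the configured message format. This is only a sketch: the class and method names are hypothetical, and the mapping assumes that Produce v0 and v1 carry magic 0, v2 carries magic 1, and v3 and later carry magic 2. It is not how the broker currently builds its ApiVersions response.

```java
final class ProduceVersionCap {

    // Returns the highest Produce version that should be advertised for a
    // given message-format magic value, never exceeding the broker's latest.
    // Assumption: v0-v1 -> magic 0, v2 -> magic 1, v3+ -> magic 2.
    static short maxProduceVersionForMagic(final byte magic, final short brokerLatest) {
        final short formatLimit;
        if (magic <= 0) {
            formatLimit = 1;
        } else if (magic == 1) {
            formatLimit = 2;
        } else {
            formatLimit = brokerLatest;
        }
        return (short) Math.min(formatLimit, brokerLatest);
    }
}
```

Under these assumptions, a broker whose latest Produce version is 3 but whose message format is 0.10.2 (magic 1) would advertise `0 to 2`, matching the expected output in the report.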





[jira] [Updated] (KAFKA-10032) Response to ApiVersionRequest returns wrong Produce(0) version

2020-05-22 Thread Antonio Pires (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antonio Pires updated KAFKA-10032:
--
Description: 
The response to {{ApiVersionRequest}} returns incorrect information for the 
Production {{_ApiKey_}} as it is not considering 
{{_log.message.format.version_}} config when being overwritten. While the 
internals, {{_Log.append()_}}, use {{log.message.format.version}} when adding a 
new record set to the active segment, the version exposed to clients is not the 
same, which can generate unexpected behaviour.

For example
 Using version _>0.11.0_ with {{_log.message.format.version_}} set to a 
previous version (0.10.2), the broker returns

{{(id: 1 rack: null) -> (}}
 {{    Produce(0): 0 to 3 [usable: 3],}}
 {{    Fetch(1): 0 to 5 [usable: 5],}}
 {{    ...}}
)

and should instead be responding with

{{(id: 1 rack: null) -> (}}
 {{    Produce(0): 0 to 2 [usable: 2],}}
 {{    Fetch(1): 0 to 5 [usable: 5],}}
 {{    ...}}
)

Shouldn't a single source of truth be used to get this version?
 In {{_Log.append()_}}, instead of directly getting the version from the 
config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, in 
turn, should take the config in consideration.


> Response to ApiVersionRequest returns wrong Produce(0) version
> --
>
> Key: KAFKA-10032
> URL: https://issues.apache.org/jira/browse/KAFKA-10032
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 2.4.0
>Reporter: Antonio Pires
>Priority: Major
>
> The response to {{ApiVersionRequest}} returns incorrect information for the 
> Production {{_ApiKey_}} as it is not considering 
> {{_log.message.format.version_}} config when being overwritten. While the 
> internals, {{_Log.append()_}}, use {{log.message.format.version}} when adding 
> a new record set to the active segment, the version exposed to clients is not 
> the same, which can generate unexpected behaviour.
> For example
>  Using version _>0.11.0_ with {{_log.message.format.version_}} set to a 
> previous version (0.10.2), the broker returns
> {{(id: 1 rack: null) -> (}}
>  {{    Produce(0): 0 to 3 [usable: 3],}}
>  {{    Fetch(1): 0 to 5 [usable: 5],}}
>  {{    ...}}
> )
> and should instead be responding with
> {{(id: 1 rack: null) -> (}}
>  {{    Produce(0): 0 to 2 [usable: 2],}}
>  {{    Fetch(1): 0 to 5 [usable: 5],}}
>  {{    ...}}
> )
> Shouldn't a single source of truth be used to get this version?
>  In {{_Log.append()_}}, instead of directly getting the version from the 
> config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, 
> in turn, should take the config in consideration.





[jira] [Updated] (KAFKA-10032) Response to ApiVersionRequest returns wrong Produce(0) version

2020-05-22 Thread Antonio Pires (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Antonio Pires updated KAFKA-10032:
--
Description: 
The response to {{ApiVersionRequest}} returns incorrect information for the 
Production {{_ApiKey_}} as it is not considering 
{{_log.message.format.version_}} config when being overwritten. While the 
internals, {{_Log.append()_}}, use {{_log.message.format.version_}} when adding 
a new record set to the active segment, the version exposed to clients is not 
the same, which can generate unexpected behaviour.

For example
 Using version _>0.11.0_ with {{_log.message.format.version_}} set to a 
previous version (0.10.2), the broker returns

{{(id: 1 rack: null) -> (}}
 {{    Produce(0): 0 to 3 [usable: 3],}}
 {{    Fetch(1): 0 to 5 [usable: 5],}}
 {{    ...}}
)

and should instead be responding with

{{(id: 1 rack: null) -> (}}
 {{    Produce(0): 0 to 2 [usable: 2],}}
 {{    Fetch(1): 0 to 5 [usable: 5],}}
 {{    ...}}
)

Shouldn't a single source of truth be used to get this version?
 In {{_Log.append()_}}, instead of directly getting the version from the 
config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, in 
turn, should take the config in consideration.


> Response to ApiVersionRequest returns wrong Produce(0) version
> --
>
> Key: KAFKA-10032
> URL: https://issues.apache.org/jira/browse/KAFKA-10032
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 2.4.0
>Reporter: Antonio Pires
>Priority: Major
>
> The response to {{ApiVersionRequest}} returns incorrect information for the 
> Production {{_ApiKey_}} as it is not considering 
> {{_log.message.format.version_}} config when being overwritten. While the 
> internals, {{_Log.append()_}}, use {{_log.message.format.version_}} when 
> adding a new record set to the active segment, the version exposed to clients 
> is not the same, which can generate unexpected behaviour.
> For example
>  Using version _>0.11.0_ with {{_log.message.format.version_}} set to a 
> previous version (0.10.2), the broker returns
> {{(id: 1 rack: null) -> (}}
>  {{    Produce(0): 0 to 3 [usable: 3],}}
>  {{    Fetch(1): 0 to 5 [usable: 5],}}
>  {{    ...}}
> )
> and should instead be responding with
> {{(id: 1 rack: null) -> (}}
>  {{    Produce(0): 0 to 2 [usable: 2],}}
>  {{    Fetch(1): 0 to 5 [usable: 5],}}
>  {{    ...}}
> )
> Shouldn't a single source of truth be used to get this version?
>  In {{_Log.append()_}}, instead of directly getting the version from the 
> config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, 
> in turn, should take the config in consideration.





[jira] [Created] (KAFKA-10033) AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic

2020-05-22 Thread Gregory Koshelev (Jira)
Gregory Koshelev created KAFKA-10033:


 Summary: AdminClient should throw UnknownTopicOrPartitionException 
instead of UnknownServerException if altering configs of non-existing topic
 Key: KAFKA-10033
 URL: https://issues.apache.org/jira/browse/KAFKA-10033
 Project: Kafka
  Issue Type: Improvement
  Components: admin
Affects Versions: 2.4.0
Reporter: Gregory Koshelev


Currently, altering the configs of a non-existent topic leads to 
{{UnknownServerException}}:

{code}
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: Topic "kgn_test" does not exist.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at ru.kontur.vostok.hercules.stream.manager.kafka.KafkaManager.changeTtl(KafkaManager.java:130)
    ... 10 common frames omitted
Caused by: org.apache.kafka.common.errors.UnknownServerException: Topic "kgn_test" does not exist.
{code}

The output above is produced by the {{AdminZkClient.validateTopicConfig}} 
method:
{code}
  def validateTopicConfig(topic: String, configs: Properties): Unit = {
    Topic.validate(topic)
    if (!zkClient.topicExists(topic))
      throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic))
    // remove the topic overrides
    LogConfig.validate(configs)
  }
{code}

{{UnknownServerException}} is a generic exception, but in this case the cause 
is clear, so this can be fixed easily by using 
{{UnknownTopicOrPartitionException}}.
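
To illustrate why the more specific exception helps callers, here is a small sketch of client-side handling. The two exception classes are local stand-ins for the ones in `org.apache.kafka.common.errors`, and `CompletableFuture` stands in for `KafkaFuture`, so that the example is self-contained. The names `AlterConfigErrorHandling` and `classify` are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class AlterConfigErrorHandling {

    // Local stand-ins for the Kafka exception types named in this issue.
    static class UnknownServerException extends RuntimeException {
        UnknownServerException(final String message) { super(message); }
    }

    static class UnknownTopicOrPartitionException extends RuntimeException {
        UnknownTopicOrPartitionException(final String message) { super(message); }
    }

    // Waits for the result and maps the failure cause to a coarse outcome.
    static String classify(final CompletableFuture<Void> result) {
        try {
            result.get();
            return "ok";
        } catch (final ExecutionException e) {
            final Throwable cause = e.getCause();
            if (cause instanceof UnknownTopicOrPartitionException) {
                return "missing-topic";       // actionable: the topic does not exist
            }
            if (cause instanceof UnknownServerException) {
                return "unknown-server-error"; // opaque: caller would have to parse the message
            }
            return "other";
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

With the current behaviour a caller lands in the `unknown-server-error` branch and can only inspect the message text. With the proposed change it can react to the missing topic directly.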





[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-05-22 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429276664



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -157,16 +161,16 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
 }
 
 
-if (!topicsNotReady.isEmpty()) {
-log.info("Topics {} can not be made ready with {} retries 
left", topicsNotReady, retries);
+if (isNeedRetry(topicsNotReady)) {
+log.info("Topics {} can not be made ready with {} retries 
left", topicsNotReady, remainingRetries);

Review comment:
   The log should print `remainingRetries`, since the message reads `with {} 
retries left`.









[GitHub] [kafka] feyman2016 commented on pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#issuecomment-632722266


   @abbccdda Updated; requesting review again. I really appreciate your help 
picking out so many style violations. I am also wondering whether we could use 
a formatting tool like `scalafmt` to format the code automatically.







[GitHub] [kafka] gnkoshelev opened a new pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existin

2020-05-22 Thread GitBox


gnkoshelev opened a new pull request #8715:
URL: https://github.com/apache/kafka/pull/8715


   Fixes KAFKA-10033.
   
   Replace AdminOperationException with UnknownTopicOrPartitionException if 
the topic does not exist when validating a topic config alteration in 
AdminZkClient.







[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


feyman2016 commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429276600



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -51,9 +52,21 @@
 if (throwable != null) {
 result.completeExceptionally(throwable);
 } else {
-for (MemberToRemove memberToRemove : memberInfos) {
-if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-return;
+if (removeAll()) {
+for (Map.Entry entry: 
memberErrors.entrySet()) {
+Exception exception = entry.getValue().exception();
+if (exception != null) {
+Throwable ex = new KafkaException("Encounter 
exception when trying to remove: "

Review comment:
   Wrap the exception so that the failed member info is available to callers 
such as `StreamsResetter`. Only capture the first member error found, as in the 
non-`removeAll` scenario.









[GitHub] [kafka] ijuma commented on a change in pull request #8714: MINOR: Improve broker registration and Log logging

2020-05-22 Thread GitBox


ijuma commented on a change in pull request #8714:
URL: https://github.com/apache/kafka/pull/8714#discussion_r429285942



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1694,9 +1694,10 @@ class Log(@volatile private var _dir: File,
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean, reason: String): Int = {
 lock synchronized {
   val deletable = deletableSegments(predicate)
-  if (deletable.nonEmpty)
+  if (deletable.nonEmpty) {
 info(s"Found deletable segments with base offsets 
[${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
-  deleteSegments(deletable)
+deleteSegments(deletable)
+  } else 0

Review comment:
   This doesn't change behavior, but makes the code a bit less weird. 
Previously, we would special case the log, but rely on the called method to 
check again if `deletable` is empty.









[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-05-22 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429288449



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult 
describeTopics(Collection topic
 future.completeExceptionally(new 
UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
 topicDescriptions.put(requestedTopic, future);
 }
+// try to simulate the leader not available situation when topic 
name is "LeaderNotAvailableTopic"
+if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new 
LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not 
available."));

Review comment:
   Try to simulate the `LeaderNotAvailableException` in the 
`MockAdminClient`, by setting the topic name to `LeaderNotAvailableTopic`.









[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-05-22 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429288449



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
##
@@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult 
describeTopics(Collection topic
 future.completeExceptionally(new 
UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found."));
 topicDescriptions.put(requestedTopic, future);
 }
+// try to simulate the leader not available situation when topic 
name is "LeaderNotAvailableTopic"
+if (requestedTopic.equals("LeaderNotAvailableTopic")) {
+KafkaFutureImpl future = new 
KafkaFutureImpl<>();
+future.completeExceptionally(new 
LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not 
available."));

Review comment:
   Try to simulate the `LeaderNotAvailableException` in the 
`MockAdminClient`, if the topic name is `LeaderNotAvailableTopic`.
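
The sentinel-name approach described in this comment can be sketched outside Kafka with plain JDK types. This is only an illustration: `MockTopicLookupSketch`, `describe`, and `failsWithSimulatedError` are hypothetical names, and `IllegalStateException` stands in for `LeaderNotAvailableException`, while `CompletableFuture` stands in for `KafkaFutureImpl`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class MockTopicLookupSketch {
    // Sentinel topic name taken from the review comment.
    static final String LEADER_NOT_AVAILABLE_TOPIC = "LeaderNotAvailableTopic";

    // Hypothetical stand-in for a mocked describeTopics(): one future per topic.
    static CompletableFuture<String> describe(String requestedTopic) {
        CompletableFuture<String> future = new CompletableFuture<>();
        if (requestedTopic.equals(LEADER_NOT_AVAILABLE_TOPIC)) {
            // Fail the future instead of throwing, so the caller sees the error on get().
            future.completeExceptionally(new IllegalStateException(
                "The leader of Topic " + requestedTopic + " is not available."));
        } else {
            future.complete("description of " + requestedTopic);
        }
        return future;
    }

    // Returns true if waiting on the future surfaces the simulated failure.
    static boolean failsWithSimulatedError(String topic) {
        try {
            describe(topic).get();
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof IllegalStateException;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Keying the failure on a well-known topic name keeps the mock stateless, at the cost of reserving that name in tests.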









[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-05-22 Thread GitBox


showuon commented on a change in pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#discussion_r429289380



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
##
@@ -247,11 +261,19 @@ public InternalTopicManager(final Admin adminClient, 
final StreamsConfig streams
 log.error(errorMsg);
 throw new StreamsException(errorMsg);
 }
-} else {
+} else if (!needRetryTopics.contains(topicName)) {
 topicsToCreate.add(topicName);
 }
 }
 
 return topicsToCreate;
 }
+
+private boolean isNeedRetry(final Set topicsNotReady) {
+return !topicsNotReady.isEmpty() || hasNeedRetryTopic();

Review comment:
   If there is a topic with `LeaderNotAvailableException`, we also need to 
retry.
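
A rough sketch of the decision this diff encodes, using plain sets. The class and parameter names are hypothetical, and `hasNeedRetryTopic()` is assumed here to mean "the need-retry set is non-empty", which the excerpt does not show.

```java
import java.util.Set;
import java.util.TreeSet;

final class InternalTopicRetrySketch {
    // A topic is created only if it does not exist yet AND is not waiting for a leader.
    static Set<String> topicsToCreate(Set<String> requested, Set<String> existing, Set<String> needRetry) {
        Set<String> toCreate = new TreeSet<>();
        for (String topic : requested) {
            if (!existing.contains(topic) && !needRetry.contains(topic)) {
                toCreate.add(topic);
            }
        }
        return toCreate;
    }

    // Retry if some topics are not ready yet, or if any topic reported a missing leader.
    static boolean isNeedRetry(Set<String> topicsNotReady, Set<String> needRetry) {
        return !topicsNotReady.isEmpty() || !needRetry.isEmpty();
    }
}
```

The point of the second condition is that a topic whose leader is temporarily unavailable is neither "created" nor "missing", so without it the loop could exit while that topic is still unresolved.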









[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException

2020-05-22 Thread GitBox


showuon commented on pull request #8712:
URL: https://github.com/apache/kafka/pull/8712#issuecomment-632729141


   Hi @ableegoldman , could you review this PR? Thanks.







[GitHub] [kafka] mjsax commented on pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

2020-05-22 Thread GitBox


mjsax commented on pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#issuecomment-632758838


   Build failed with checkstyle:
   ```
   [ant:checkstyle] [ERROR] 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java:23:8:
 Unused import - org.apache.kafka.common.metrics.Measurable. [UnusedImports]
   ```







[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-22 Thread GitBox


mjsax commented on pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#issuecomment-632759415


   Java 8 and Java 11 passed.
   Java 14:
   ```
   
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]
   ```







[GitHub] [kafka] mjsax merged pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-22 Thread GitBox


mjsax merged pull request #8679:
URL: https://github.com/apache/kafka/pull/8679


   







[jira] [Resolved] (KAFKA-10003) Deprecate KStream#through in favor of KStream#repartition

2020-05-22 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-10003.
-
Fix Version/s: 2.6.0
   Resolution: Fixed

> Deprecate KStream#through in favor of KStream#repartition
> -
>
> Key: KAFKA-10003
> URL: https://issues.apache.org/jira/browse/KAFKA-10003
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Levani Kokhreidze
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: kip
> Fix For: 2.6.0
>
>
> After introducing `KStream#repartition` in KIP-221 
> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint),
>  it makes sense to deprecate `KStream#through` in favor of the new operator (see 
> voting thread for more context: 
> https://www.mail-archive.com/dev@kafka.apache.org/msg107645.html)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10033) AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic

2020-05-22 Thread Brian Byrne (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Brian Byrne reassigned KAFKA-10033:
---

Assignee: Brian Byrne

> AdminClient should throw UnknownTopicOrPartitionException instead of 
> UnknownServerException if altering configs of non-existing topic
> -
>
> Key: KAFKA-10033
> URL: https://issues.apache.org/jira/browse/KAFKA-10033
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 2.4.0
>Reporter: Gregory Koshelev
>Assignee: Brian Byrne
>Priority: Major
>
> Currently, altering configs of non-existing topic leads to 
> {{UnknownServerException}}:
> {code}
> java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.UnknownServerException: Topic "kgn_test" does 
> not exist.
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>   at 
> org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>   at 
> ru.kontur.vostok.hercules.stream.manager.kafka.KafkaManager.changeTtl(KafkaManager.java:130)
>   ... 10 common frames omitted
> Caused by: org.apache.kafka.common.errors.UnknownServerException: Topic 
> "kgn_test" does not exist.
> {code}
> The output above is produced due to {{AdminZkClient.validateTopicConfig}} 
> method:
> {code}
>   def validateTopicConfig(topic: String, configs: Properties): Unit = {
> Topic.validate(topic)
> if (!zkClient.topicExists(topic))
>   throw new AdminOperationException("Topic \"%s\" does not 
> exist.".format(topic))
> // remove the topic overrides
> LogConfig.validate(configs)
>   }
> {code}
> {{UnknownServerException}} is a common exception, but in this case the cause is 
> pretty clear. So this can be fixed easily by using 
> {{UnknownTopicOrPartitionException}}.





[GitHub] [kafka] mumrah commented on a change in pull request #8376: KAFKA-9724 Newer clients not always sending fetch request to older brokers

2020-05-22 Thread GitBox


mumrah commented on a change in pull request #8376:
URL: https://github.com/apache/kafka/pull/8376#discussion_r429323395



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -422,8 +427,29 @@ public synchronized void position(TopicPartition tp, 
FetchPosition position) {
 assignedState(tp).position(position);
 }
 
-public synchronized boolean 
maybeValidatePositionForCurrentLeader(TopicPartition tp, 
Metadata.LeaderAndEpoch leaderAndEpoch) {
-return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+/**
+ * Enter the offset validation state if the leader for this partition is 
known to support a usable version of the
+ * OffsetsForLeaderEpoch API. If the leader node does not support the API, 
simply complete the offset validation.
+ *
+ * @param apiVersions
+ * @param tp
+ * @param leaderAndEpoch
+ * @return true if we enter the offset validation state
+ */
+public synchronized boolean 
maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition 
tp,
+  
Metadata.LeaderAndEpoch leaderAndEpoch) {
+if (leaderAndEpoch.leader.isPresent()) {
+NodeApiVersions nodeApiVersions = 
apiVersions.get(leaderAndEpoch.leader.get().idString());
+if (nodeApiVersions == null || 
hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+} else {
+// If the broker does not support a newer version of 
OffsetsForLeaderEpoch, we skip validation
+completeValidation(tp);
+return false;
+}
+} else {
+return assignedState(tp).maybeValidatePosition(leaderAndEpoch);

Review comment:
   I wonder, do we really need this call here? If the leader is not present 
the epoch shouldn't be present either -- right? If that's the case, then the 
call to maybeValidatePosition will short circuit
   
   ```java
   private boolean maybeValidatePosition(Metadata.LeaderAndEpoch 
currentLeaderAndEpoch) {
   if (this.fetchState.equals(FetchStates.AWAIT_RESET)) {
   return false;
   }
   
   if (!currentLeaderAndEpoch.leader.isPresent() && 
!currentLeaderAndEpoch.epoch.isPresent()) {
   return false;
   }
   
   if (position != null && 
!position.currentLeader.equals(currentLeaderAndEpoch)) {
   FetchPosition newPosition = new 
FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
   validatePosition(newPosition);
   preferredReadReplica = null;
   }
   return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
   }
   ```









[GitHub] [kafka] mumrah commented on a change in pull request #8376: KAFKA-9724 Newer clients not always sending fetch request to older brokers

2020-05-22 Thread GitBox


mumrah commented on a change in pull request #8376:
URL: https://github.com/apache/kafka/pull/8376#discussion_r429325049



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -422,8 +427,29 @@ public synchronized void position(TopicPartition tp, 
FetchPosition position) {
 assignedState(tp).position(position);
 }
 
-public synchronized boolean 
maybeValidatePositionForCurrentLeader(TopicPartition tp, 
Metadata.LeaderAndEpoch leaderAndEpoch) {
-return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+/**
+ * Enter the offset validation state if the leader for this partition is 
known to support a usable version of the
+ * OffsetsForLeaderEpoch API. If the leader node does not support the API, 
simply complete the offset validation.
+ *
+ * @param apiVersions
+ * @param tp
+ * @param leaderAndEpoch
+ * @return true if we enter the offset validation state
+ */
+public synchronized boolean 
maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition 
tp,
+  
Metadata.LeaderAndEpoch leaderAndEpoch) {
+if (leaderAndEpoch.leader.isPresent()) {
+NodeApiVersions nodeApiVersions = 
apiVersions.get(leaderAndEpoch.leader.get().idString());
+if (nodeApiVersions == null || 
hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
+return assignedState(tp).maybeValidatePosition(leaderAndEpoch);
+} else {
+// If the broker does not support a newer version of 
OffsetsForLeaderEpoch, we skip validation
+completeValidation(tp);
+return false;
+}
+} else {
+return assignedState(tp).maybeValidatePosition(leaderAndEpoch);

Review comment:
   Oh, actually looking at the javadoc for LeaderAndEpoch, I see
   
   > It is also possible that we know of the leader epoch, but not the leader 
when it is derived from an external source (e.g. a committed offset).
   
   Also in Metadata, we do return a LeaderAndEpoch with the last-seen epoch, 
but no leader if the metadata is stale. So, I guess it makes sense to keep this 
call in maybeValidatePositionForCurrentLeader
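
To make the case in question concrete, here is a small sketch of the guard from `maybeValidatePosition`. `LeaderAndEpochSketch` is a hypothetical stand-in: the leader is modelled as an `Optional<String>` rather than an optional node, and only the "both absent" check from the snippet quoted in the earlier comment is reproduced.

```java
import java.util.Optional;

final class LeaderAndEpochSketch {
    final Optional<String> leader;   // hypothetical: a node id instead of a Node object
    final Optional<Integer> epoch;

    LeaderAndEpochSketch(Optional<String> leader, Optional<Integer> epoch) {
        this.leader = leader;
        this.epoch = epoch;
    }

    // Mirrors the quoted guard: validation is skipped only when BOTH are absent.
    boolean shortCircuits() {
        return !leader.isPresent() && !epoch.isPresent();
    }
}
```

With an epoch derived from a committed offset but no known leader, `shortCircuits()` is false, so the call still does work in the no-leader branch.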
   
   









[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-22 Thread GitBox


hachikuji commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r429325325



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -565,8 +566,8 @@ private void recordRebalanceFailure() {
 
 // Note that we override the request timeout using the rebalance 
timeout since that is the
 // maximum time that it may block on the coordinator. We add an extra 
5 seconds for small delays.
-
-int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, 
rebalanceConfig.rebalanceTimeoutMs + 5000);
+int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),

Review comment:
   I added the request timeout to the send message in `NetworkClient`. Also 
made some tweaks for more consistent logging.









[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-22 Thread GitBox


hachikuji commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r429327443



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -502,14 +502,8 @@ private void doSend(ClientRequest clientRequest, boolean 
isInternalRequest, long
 String destination = clientRequest.destination();
 RequestHeader header = clientRequest.makeHeader(request.version());
 if (log.isDebugEnabled()) {
-int latestClientVersion = clientRequest.apiKey().latestVersion();
-if (header.apiVersion() == latestClientVersion) {
-log.trace("Sending {} {} with correlation id {} to node {}", 
clientRequest.apiKey(), request,

Review comment:
   Given that we have to work with all client versions, it's just as common 
for the client not to match the broker version, so it's not really useful for 
the behavior to be different when they do match.









[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-22 Thread GitBox


hachikuji commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r429327946



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -839,20 +833,22 @@ private void handleCompletedReceives(List 
responses, long now) {
 InFlightRequest req = inFlightRequests.completeNext(source);
 Struct responseStruct = 
parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
 throttleTimeSensor, now);
-if (log.isTraceEnabled()) {

Review comment:
   Letting some requests be debug level, but making the responses be trace 
often means we are left with only half of the picture.









[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-22 Thread GitBox


hachikuji commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r429328416



##
File path: 
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
##
@@ -312,8 +313,27 @@ public void testJoinGroupRequestTimeout() {
 mockTime.sleep(REQUEST_TIMEOUT_MS + 1);
 assertFalse(consumerClient.poll(future, mockTime.timer(0)));
 
-mockTime.sleep(REBALANCE_TIMEOUT_MS - REQUEST_TIMEOUT_MS + 5000);
+mockTime.sleep(REBALANCE_TIMEOUT_MS - REQUEST_TIMEOUT_MS + 
AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE);
 assertTrue(consumerClient.poll(future, mockTime.timer(0)));
+assertTrue(future.exception() instanceof DisconnectException);
+}
+
+@Test
+public void 
testJoinGroupRequestTimeoutLowerBoundedByDefaultRequestTimeout() {
+int rebalanceTimeoutMs = REQUEST_TIMEOUT_MS - 1;
+setupCoordinator(RETRY_BACKOFF_MS, rebalanceTimeoutMs, 
Optional.empty());
+mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
+coordinator.ensureCoordinatorReady(mockTime.timer(0));
+
+RequestFuture future = coordinator.sendJoinGroupRequest();
+
+long expectedRequestDeadline = mockTime.milliseconds() + 
REQUEST_TIMEOUT_MS;
+mockTime.sleep(rebalanceTimeoutMs + 
AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE + 1);
+assertFalse(consumerClient.poll(future, mockTime.timer(0)));
+
+mockTime.sleep(expectedRequestDeadline - mockTime.milliseconds() + 1);

Review comment:
   We need to take into account the time that has already passed. I was a 
little annoyed at having to write `REQUEST_TIMEOUT - rebalanceTimeoutMs - 
AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE`. A bit annoying either way I 
guess.
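
The timeout arithmetic under discussion can be sketched as follows. This rests on assumptions: the second argument of `Math.max` is cut off in the diff excerpt, so the "rebalance timeout plus lapse" operand and the overflow guard are guesses, the 5000 ms lapse comes from the "extra 5 seconds" code comment, and all names are hypothetical.

```java
final class JoinGroupTimeoutSketch {
    // Assumed value, based on the "extra 5 seconds" mentioned in the code comment.
    static final int JOIN_GROUP_TIMEOUT_LAPSE = 5000;

    // Lower-bounds the JoinGroup timeout by the client's default request timeout.
    static int joinGroupTimeoutMs(int defaultRequestTimeoutMs, int rebalanceTimeoutMs) {
        // The inner max guards against int overflow when the rebalance timeout is very large.
        int padded = Math.max(rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE, rebalanceTimeoutMs);
        return Math.max(defaultRequestTimeoutMs, padded);
    }

    // Time left until the request deadline, accounting for time that has already passed.
    static long remainingUntilDeadline(long sentAtMs, int timeoutMs, long nowMs) {
        return sentAtMs + timeoutMs - nowMs;
    }
}
```

For example, with an illustrative 40000 ms request timeout and a 30000 ms rebalance timeout, the effective timeout is 40000 ms. If 35001 ms have already elapsed, 4999 ms remain, which is why the test computes the second sleep from an absolute deadline rather than from the configured values.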









[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-22 Thread GitBox


hachikuji commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r429327946



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -839,20 +833,22 @@ private void handleCompletedReceives(List 
responses, long now) {
 InFlightRequest req = inFlightRequests.completeNext(source);
 Struct responseStruct = 
parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
 throttleTimeSensor, now);
-if (log.isTraceEnabled()) {

Review comment:
   Letting some requests be debug level, but making the responses be trace 
often means we are left with only half of the picture.









[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-22 Thread GitBox


hachikuji commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r429327946



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -839,20 +833,22 @@ private void handleCompletedReceives(List 
responses, long now) {
 InFlightRequest req = inFlightRequests.completeNext(source);
 Struct responseStruct = 
parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
 throttleTimeSensor, now);
-if (log.isTraceEnabled()) {

Review comment:
   Letting some requests be debug level, but making the responses be trace 
often means we are left with only half of the picture.









[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r42955



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -379,6 +379,22 @@ private static MetadataResponse 
prepareMetadataResponse(Cluster cluster, Errors
 MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
 }
 
+private static DescribeGroupsResponseData 
prepareDescribeGroupsResponseData(String groupId, List groupInstances,
+   
 List topicPartitions) {
+final ByteBuffer memberAssignment = 
ConsumerProtocol.serializeAssignment(new 
ConsumerPartitionAssignor.Assignment(topicPartitions));
+byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+List describedGroupMembers = 
groupInstances.stream().map(groupInstance -> 
DescribeGroupsResponse.groupMember("0", groupInstance, "clientId0", 
"clientHost", memberAssignmentBytes, null)).collect(Collectors.toList());

Review comment:
   nit: we could set `"0"` to `JoinGroupRequest.UNKNOWN_MEMBER_ID` if we 
don't want to test it out. Having all members use the same member.id is a bit 
weird.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -51,9 +52,21 @@
 if (throwable != null) {
 result.completeExceptionally(throwable);
 } else {
-for (MemberToRemove memberToRemove : memberInfos) {
-if (maybeCompleteExceptionally(memberErrors, 
memberToRemove.toMemberIdentity(), result)) {
-return;
+if (removeAll()) {
+for (Map.Entry entry: 
memberErrors.entrySet()) {
+Exception exception = entry.getValue().exception();
+if (exception != null) {
+Throwable ex = new KafkaException("Encounter 
exception when trying to remove: "

Review comment:
   Let's put the exception in the cause so that we could verify the cause 
in `KafkaAdminClientTest`, as:
   ```
   if (exception != null) {
 result.completeExceptionally(new KafkaException(
"Encounter exception when trying to remove: " + entry.getKey(), exception));
 return;
   }
   ```

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -37,7 +38,15 @@ public 
RemoveMembersFromConsumerGroupOptions(Collection members)
 this.members = new HashSet<>(members);
 }
 
+public RemoveMembersFromConsumerGroupOptions() {
+this.members = Collections.emptySet();;

Review comment:
   nit: extra semi-colon

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -119,7 +122,9 @@
 + "* This tool will not clean up the local state on the stream 
application instances (the persisted "
 + "stores used to cache aggregation results).\n"
 + "You need to call KafkaStreams#cleanUp() in your application or 
manually delete them from the "
-+ "directory specified by \"state.dir\" configuration 
(/tmp/kafka-streams/ by default).\n\n"
++ "directory specified by \"state.dir\" configuration 
(/tmp/kafka-streams/ by default).\n"
++ "*Please use the \"--force\" option to force remove active 
members in case long session "

Review comment:
   nit: space after `*`. Also I feel we could make the context more 
concrete by:
   ```
   When a long session timeout has been configured, active members could take 
longer to expire on the broker, thus blocking the reset job from completing. 
Using the \"--force\" option removes those left-over members immediately. 
Make sure to stop all stream applications when this option is specified to 
avoid unexpected disruptions.
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -261,6 +261,43 @@ public void 
shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 Assert.assertEquals(1, exitCode);
 }
 
+public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() 
throws Exception {
+appID = testId + "-with-force-option";
+streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + 
STREAMS_CONSUMER_TIMEOUT * 100);

Review comment:
   I see, this is indeed weird. Please file a JIRA so that we can clean it up 
in a follow-up PR if others feel the same way.






[GitHub] [kafka] vvcephei commented on pull request #8713: KAFKA-6145: Add unit tests for assignments of only stateless tasks

2020-05-22 Thread GitBox


vvcephei commented on pull request #8713:
URL: https://github.com/apache/kafka/pull/8713#issuecomment-632787649


   Test this please







[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-22 Thread GitBox


vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r429346085



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -57,14 +59,42 @@ public boolean assign(final Map clients,
 configs.numStandbyReplicas
 );
 
-final boolean probingRebalanceNeeded = assignTaskMovements(
-tasksToCaughtUpClients(statefulTasks, clientStates, 
configs.acceptableRecoveryLag),
+final AtomicInteger remainingWarmupReplicas = new 
AtomicInteger(configs.maxWarmupReplicas);
+
+final Map> tasksToCaughtUpClients = 
tasksToCaughtUpClients(
+statefulTasks,
+clientStates,
+configs.acceptableRecoveryLag
+);
+
+// We temporarily need to know which standby tasks were intended as 
warmups
+// for active tasks, so that we don't move them (again) when we plan 
standby
+// task movements. We can then immediately treat warmups exactly the 
same as
+// hot-standby replicas, so we just track it right here as metadata, 
rather
+// than add "warmup" assignments to ClientState, for example.
+final Map> warmups = new TreeMap<>();
+
+final int neededActiveTaskMovements = assignActiveTaskMovements(
+tasksToCaughtUpClients,
 clientStates,
-configs.maxWarmupReplicas
+warmups,
+remainingWarmupReplicas
+);
+
+final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+tasksToCaughtUpClients,
+clientStates,
+remainingWarmupReplicas,
+warmups
 );
 
 assignStatelessActiveTasks(clientStates, diff(TreeSet::new, 
allTaskIds, statefulTasks));
 
+// We shouldn't plan a probing rebalance if we _needed_ task 
movements, but couldn't do any
+// due to being configured for no warmups.

Review comment:
   Yeah, I've just realized this, too. And upon second consideration, I 
don't think the warmup=0 really provides a good mechanism for what I was 
thinking of. I think we'd better leave it as "at least one". Thanks!









[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-22 Thread GitBox


vvcephei commented on a change in pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#discussion_r429346417



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##
@@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map> tasksToCau
 return movementsNeeded;
 }
 
+static int assignStandbyTaskMovements(final Map> 
tasksToCaughtUpClients,
+  final Map 
clientStates,
+  final AtomicInteger 
remainingWarmupReplicas,
+  final Map> 
warmups) {
+final BiFunction caughtUpPredicate =
+(client, task) -> 
taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, 
tasksToCaughtUpClients);
+
+final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new 
ConstrainedPrioritySet(
+caughtUpPredicate,
+client -> clientStates.get(client).assignedTaskLoad()
+);
+
+final Queue taskMovements = new PriorityQueue<>(
+
Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
+);
+
+for (final Map.Entry clientStateEntry : 
clientStates.entrySet()) {
+final UUID destination = clientStateEntry.getKey();
+final ClientState state = clientStateEntry.getValue();
+for (final TaskId task : state.standbyTasks()) {
+if (warmups.getOrDefault(destination, 
Collections.emptySet()).contains(task)) {
+// this is a warmup, so we won't move it.
+} else if 
(!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, 
tasksToCaughtUpClients)) {

Review comment:
   Haha, sure :) 









[GitHub] [kafka] vvcephei commented on pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-22 Thread GitBox


vvcephei commented on pull request #8696:
URL: https://github.com/apache/kafka/pull/8696#issuecomment-632791703


   Hey @cadonna , thanks for the feedback! I think I'll go ahead and merge this 
because I've already fixed the "max warmup" thing in a separate bugfix PR, 
which builds on this one.
   
   I'll also apply your De Morgan's law suggestion in the follow-up.







[GitHub] [kafka] vvcephei merged pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness

2020-05-22 Thread GitBox


vvcephei merged pull request #8696:
URL: https://github.com/apache/kafka/pull/8696


   







[GitHub] [kafka] vvcephei opened a new pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

2020-05-22 Thread GitBox


vvcephei opened a new pull request #8716:
URL: https://github.com/apache/kafka/pull/8716


   Also fixes a system test by configuring the HATA to perform a one-shot 
balanced assignment
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] bdbyrne opened a new pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config

2020-05-22 Thread GitBox


bdbyrne opened a new pull request #8717:
URL: https://github.com/apache/kafka/pull/8717


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] bdbyrne commented on pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing t

2020-05-22 Thread GitBox


bdbyrne commented on pull request #8715:
URL: https://github.com/apache/kafka/pull/8715#issuecomment-632799086


   Hi @gnkoshelev - it's assumed the topic has been validated by the time it 
attempts to alter its config via AdminZKClient. I've posted the proper fix at 
https://github.com/apache/kafka/pull/8717. Thanks for finding and reporting!







[GitHub] [kafka] bdbyrne commented on pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config

2020-05-22 Thread GitBox


bdbyrne commented on pull request #8717:
URL: https://github.com/apache/kafka/pull/8717#issuecomment-632799858


   @cmccabe this is ready for review.
   
   The test location doesn't quite fit, as it's in the module that tests dynamic 
config changes on the broker, but it's "near" the old error. Any ideas on where 
it might be better situated?







[GitHub] [kafka] guozhangwang commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound

2020-05-22 Thread GitBox


guozhangwang commented on a change in pull request #8702:
URL: https://github.com/apache/kafka/pull/8702#discussion_r429352451



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -502,14 +502,8 @@ private void doSend(ClientRequest clientRequest, boolean 
isInternalRequest, long
 String destination = clientRequest.destination();
 RequestHeader header = clientRequest.makeHeader(request.version());
 if (log.isDebugEnabled()) {
-int latestClientVersion = clientRequest.apiKey().latestVersion();
-if (header.apiVersion() == latestClientVersion) {
-log.trace("Sending {} {} with correlation id {} to node {}", 
clientRequest.apiKey(), request,

Review comment:
   SG.









[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

2020-05-22 Thread GitBox


vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429352759



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##
@@ -334,6 +334,7 @@ public String toString() {
 ") prevStandbyTasks: (" + prevStandbyTasks +
 ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() +
 ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
+") taskLagTotals: (" + taskLagTotals.entrySet() +

Review comment:
   I found this useful while debugging the system test.

##
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final 
Object maxInFlightReque
 consumerProps.put(REPLICATION_FACTOR_CONFIG, 
getInt(REPLICATION_FACTOR_CONFIG));
 consumerProps.put(APPLICATION_SERVER_CONFIG, 
getString(APPLICATION_SERVER_CONFIG));
 consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, 
getInt(NUM_STANDBY_REPLICAS_CONFIG));
+consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, 
getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, 
getInt(MAX_WARMUP_REPLICAS_CONFIG));
+consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, 
getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
   This is where we forgot to copy over the configs.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
   final Integer maxWarmupReplicas,
   final Integer numStandbyReplicas,
   final Long probingRebalanceIntervalMs) {
+if (maxWarmupReplicas < 1) {
+throw new IllegalArgumentException("must configure at least 
one warmup replica");
+}
+

Review comment:
   I added this constraint to mirror the constraint we already apply in 
StreamsConfig. It's not critical, but I was disappointed that I had written a 
bunch of tests that included a technically invalid configuration.
   
   I'll write a test for this...
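
A minimal sketch of the kind of constraint and test described above, assuming a hypothetical `AssignmentConfigsSketch` class rather than Kafka's actual `AssignmentConfigs`. Only the `< 1` check and its message are taken from the diff; the class name, field, and `rejects` helper are illustrative.

```java
// Hypothetical stand-in for the constructor check quoted in the diff above.
// The class and helper names are assumptions, not Kafka's API.
final class AssignmentConfigsSketch {
    final int maxWarmupReplicas;

    AssignmentConfigsSketch(final int maxWarmupReplicas) {
        // Same constraint as in the diff: zero or negative warmups are invalid.
        if (maxWarmupReplicas < 1) {
            throw new IllegalArgumentException("must configure at least one warmup replica");
        }
        this.maxWarmupReplicas = maxWarmupReplicas;
    }

    // Returns true when the constructor rejects the given value.
    static boolean rejects(final int value) {
        try {
            new AssignmentConfigsSketch(value);
            return false;
        } catch (final IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(rejects(0)); // prints "true"
        System.out.println(rejects(1)); // prints "false"
    }
}
```

A test along these lines would have flagged any fixture that passed zero warmup replicas.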

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -90,15 +91,12 @@ public boolean assign(final Map clients,
 
 assignStatelessActiveTasks(clientStates, diff(TreeSet::new, 
allTaskIds, statefulTasks));
 
-// We shouldn't plan a probing rebalance if we _needed_ task 
movements, but couldn't do any
-// due to being configured for no warmups.
-final boolean probingRebalanceNeeded =
-configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + 
neededStandbyTaskMovements > 0;
+final boolean probingRebalanceNeeded = neededActiveTaskMovements + 
neededStandbyTaskMovements > 0;
 
 log.info("Decided on assignment: " +
  clientStates +
- " with " +
- (probingRebalanceNeeded ? "" : "no") +
+ " with" +
+ (probingRebalanceNeeded ? "" : " no") +

Review comment:
   Fixing a double-space we were printing when there was a followup. (It 
would say `with  followup`)
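
To make the spacing fix concrete, here is a small sketch of the two concatenations. The trailing `" followup"` text is assumed from the comment above, not copied from the Kafka source, and the class and method names are hypothetical.

```java
// Hypothetical helper contrasting the old and new log-suffix concatenation.
// The " followup" suffix is an assumption based on the review comment.
final class FollowupMessageSketch {
    // Old form: the space after "with" is always emitted, so an empty
    // middle segment leaves two consecutive spaces.
    static String oldSuffix(final boolean probingRebalanceNeeded) {
        return " with " + (probingRebalanceNeeded ? "" : "no") + " followup";
    }

    // New form: the space travels with the optional "no".
    static String newSuffix(final boolean probingRebalanceNeeded) {
        return " with" + (probingRebalanceNeeded ? "" : " no") + " followup";
    }

    public static void main(final String[] args) {
        System.out.println("[" + oldSuffix(true) + "]");  // prints "[ with  followup]"
        System.out.println("[" + newSuffix(true) + "]");  // prints "[ with followup]"
        System.out.println("[" + newSuffix(false) + "]"); // prints "[ with no followup]"
    }
}
```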

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -90,15 +91,12 @@ public boolean assign(final Map clients,
 
 assignStatelessActiveTasks(clientStates, diff(TreeSet::new, 
allTaskIds, statefulTasks));
 
-// We shouldn't plan a probing rebalance if we _needed_ task 
movements, but couldn't do any
-// due to being configured for no warmups.
-final boolean probingRebalanceNeeded =
-configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + 
neededStandbyTaskMovements > 0;
+final boolean probingRebalanceNeeded = neededActiveTaskMovements + 
neededStandbyTaskMovements > 0;

Review comment:
   Since we can't have zero warmups, we don't need this condition (that I 
added in https://github.com/apache/kafka/pull/8696)

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final 
TreeMap
 final Map> taskToCaughtUpClients = new 
HashMap<>();
 
 for (final TaskId task : statefulTasks) {
-
+final TreeSet caughtUpClients = new TreeSet<>();

Review comment:
   A short-lived, empty TreeSet costs practically nothing, and I found the 
other logic (with null meaning empty) a bit confusing during debugging.
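
A rough illustration of the "empty set instead of null" choice, using hypothetical string ids and a made-up lag map rather than Kafka's `TaskId` and `ClientState` types. Every task gets an entry, so callers never need a null check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: types and names are illustrative, not Kafka's.
final class CaughtUpClientsSketch {
    // lags maps task -> (client -> lag). A client counts as caught up
    // when its lag is at most acceptableLag.
    static Map<String, SortedSet<String>> tasksToCaughtUpClients(
            final Map<String, Map<String, Long>> lags,
            final long acceptableLag) {
        final Map<String, SortedSet<String>> result = new HashMap<>();
        for (final Map.Entry<String, Map<String, Long>> taskEntry : lags.entrySet()) {
            // Always create the set, even if it stays empty.
            final SortedSet<String> caughtUp = new TreeSet<>();
            for (final Map.Entry<String, Long> clientEntry : taskEntry.getValue().entrySet()) {
                if (clientEntry.getValue() <= acceptableLag) {
                    caughtUp.add(clientEntry.getKey());
                }
            }
            result.put(taskEntry.getKey(), caughtUp);
        }
        return result;
    }
}
```

With this shape, "no caught-up clients" is simply `isEmpty()` on the set, which tends to be easier to read in a debugger than a missing key.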

##
File path: 
streams/src/main/java/org

[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-22 Thread GitBox


xvrl commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429366836



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics 
context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+/**
+ * Client or Service's metadata map.
+ */
+private Map metadata = new HashMap<>();
+
+/**
+ * Create a MetricsContext with namespace, no service or client properties
+ * @param namespace value for _namespace key
+ */
+public KafkaMetricsContext(String namespace) {
+this(namespace, new HashMap<>());
+}
+
+/**
+ * Create a MetricsContext with namespace, service or client properties
+ * @param namespace value for _namespace key
+ * @param metadata  metadata additional entries to add to the context.
+ *  values will be converted to string using 
Object.toString()
+ */
+public KafkaMetricsContext(String namespace, Map metadata) {
+this.metadata.put(MetricsContext.NAMESPACE, namespace);
+metadata.forEach((key, value) -> this.metadata.put(key, 
value.toString()));

Review comment:
   I think it's OK for the component that owns the reporter to take 
precedence over the labels passed from upstream. We did not specify the 
behavior in the KIP, so implementations should use namespacing of labels to 
avoid collisions. If in practice we find this behavior is less desirable, we can file 
a follow-on KIP, since the interface is still evolving.









[GitHub] [kafka] wj1918 closed pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

2020-05-22 Thread GitBox


wj1918 closed pull request #8612:
URL: https://github.com/apache/kafka/pull/8612


   







[GitHub] [kafka] wj1918 commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

2020-05-22 Thread GitBox


wj1918 commented on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-632819461


   @kkonstantine thanks for the comment. Understood; I will close this PR and 
create separate PRs when ready.







[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter

2020-05-22 Thread GitBox


rnpridgeon commented on a change in pull request #8691:
URL: https://github.com/apache/kafka/pull/8691#discussion_r429370889



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java
##
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A implementation of MetricsContext, it encapsulates required metrics 
context properties for Kafka services and clients
+ */
+public class KafkaMetricsContext implements MetricsContext {
+/**
+ * Client or Service's metadata map.
+ */
+private Map metadata = new HashMap<>();
+
+/**
+ * Create a MetricsContext with namespace, no service or client properties
+ * @param namespace value for _namespace key
+ */
+public KafkaMetricsContext(String namespace) {
+this(namespace, new HashMap<>());
+}
+
+/**
+ * Create a MetricsContext with namespace, service or client properties
+ * @param namespace value for _namespace key
+ * @param metadata  metadata additional entries to add to the context.
+ *  values will be converted to string using 
Object.toString()
+ */
+public KafkaMetricsContext(String namespace, Map metadata) {
+this.metadata.put(MetricsContext.NAMESPACE, namespace);
+metadata.forEach((key, value) -> this.metadata.put(key, 
value.toString()));

Review comment:
   Mostly I'm concerned about the case where some composite may share 
similar labels with the underlying client it manages. If we allow the downstream 
client to overwrite such a label, we will lose a portion of the upstream 
component's context.
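
The trade-off under discussion can be sketched with two merge policies over plain label maps. This is not the `KafkaMetricsContext` implementation; the class, method names, and label keys below are hypothetical and only illustrate what "overwrite" versus "preserve" means for a colliding key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of two ways to combine label maps.
final class LabelMergeSketch {
    // Downstream entries replace upstream entries that share a key.
    static Map<String, String> downstreamWins(final Map<String, String> upstream,
                                              final Map<String, String> downstream) {
        final Map<String, String> merged = new HashMap<>(upstream);
        merged.putAll(downstream);
        return merged;
    }

    // Upstream entries are kept; downstream only fills in missing keys.
    static Map<String, String> upstreamWins(final Map<String, String> upstream,
                                            final Map<String, String> downstream) {
        final Map<String, String> merged = new HashMap<>(upstream);
        downstream.forEach(merged::putIfAbsent);
        return merged;
    }
}
```

Namespacing the keys, as suggested earlier in the thread, avoids the collision altogether, so either policy would then give the same result.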









[GitHub] [kafka] kkonstantine commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect

2020-05-22 Thread GitBox


kkonstantine commented on pull request #8612:
URL: https://github.com/apache/kafka/pull/8612#issuecomment-632820983


   Thanks @wj1918 !
   Happy to review after they are ready. 







[GitHub] [kafka] rhauch commented on pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ

2020-05-22 Thread GitBox


rhauch commented on pull request #8663:
URL: https://github.com/apache/kafka/pull/8663#issuecomment-632824180


   ok to test







[GitHub] [kafka] rhauch removed a comment on pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation

2020-05-22 Thread GitBox


rhauch removed a comment on pull request #8630:
URL: https://github.com/apache/kafka/pull/8630#issuecomment-632824360


   retest this please







[GitHub] [kafka] rhauch commented on pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation

2020-05-22 Thread GitBox


rhauch commented on pull request #8630:
URL: https://github.com/apache/kafka/pull/8630#issuecomment-632824360


   retest this please







[GitHub] [kafka] rhauch commented on pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation

2020-05-22 Thread GitBox


rhauch commented on pull request #8630:
URL: https://github.com/apache/kafka/pull/8630#issuecomment-632824461


   ok to test







[jira] [Assigned] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB

2020-05-22 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-10005:
-

Assignee: (was: Guozhang Wang)

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for 
> RocksDB
> -
>
> Key: KAFKA-10005
> URL: https://issues.apache.org/jira/browse/KAFKA-10005
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per-store via 
> registration to specify the logic of applying a batch of records read from 
> the changelog to the store. Used for both updating standby tasks and 
> restoring active tasks.
> * RestoreListener: specified per-instance via `setRestoreListener`, to 
> specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see, these two callbacks serve quite different purposes. However, 
> today we allow users to register a per-store RestoreCallback that also 
> implements the RestoreListener. This odd mixing is actually motivated by 
> Streams' internal usage to enable / disable bulk loading inside RocksDB. For 
> users, however, it is less meaningful to make a callback a listener as well, 
> since `onRestoreStart / End` has the storeName passed in, so users 
> can just define different listening logic for different stores if needed.
> On the other hand, this mixing of two callbacks forces Streams to check 
> internally whether the passed-in per-store callback also implements the 
> listener and, if so, trigger its calls, which increases complexity. Besides, 
> toggling RocksDB for bulk loading requires us to open / close / reopen / 
> reclose 4 times during the restoration, which could also be costly.
> Given that we have KIP-441 in place, I think we should consider 
> alternatives to toggling bulk loading during restoration for Streams (e.g. 
> using different threads for restoration).
> The proposal for this ticket is to completely decouple the listener from the 
> callback, i.e. we would no longer presume that users pass in a callback 
> that implements both RestoreCallback and RestoreListener, and for 
> RocksDB we would replace the bulk loading mechanism with other 
> optimizations: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2020-05-22 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17114291#comment-17114291
 ] 

Guozhang Wang commented on KAFKA-9168:
--

Just adding a note that this feature is only available in 6.8.1+.

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Sagar Rao
>Priority: Blocker
>  Labels: perfomance
> Fix For: 3.0.0
>
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]





[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2020-05-22 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17114309#comment-17114309
 ] 

Ryanne Dolan commented on KAFKA-7500:
-

[~qq619618919] I usually recommend using load balancers and health checks for 
this purpose, but you can also get away with just two VIPs (dc1-kafka, 
dc2-kafka) and KIP-302.

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Affects Versions: 2.4.0
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Major
>  Labels: pull-request-available, ready-to-commit
> Fix For: 2.4.0
>
> Attachments: Active-Active XDCR setup.png
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]





[GitHub] [kafka] rhauch opened a new pull request #8718: [WIP] MINOR: Improve failure message in Kafka cluster used in Connect integration tests

2020-05-22 Thread GitBox


rhauch opened a new pull request #8718:
URL: https://github.com/apache/kafka/pull/8718


   DO NOT MERGE
   
   Debugging build failures
   







[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)

2020-05-22 Thread GitBox


ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429396524



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -137,6 +138,7 @@ public StreamTask(final TaskId id,
 }
 processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, 
taskId, streamsMetrics);
 processLatencySensor = TaskMetrics.processLatencySensor(threadId, 
taskId, streamsMetrics);
+recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, 
taskId, streamsMetrics);

Review comment:
   I modified the proposal slightly to make these all processor-node level 
(will push the changes in a minute) but this question is still relevant, so 
here's the answer:
   We can't record the e2e latency in the sink node because not all topologies 
_have_ a sink node. For that reason we also can't record at the record 
collector. We need to figure out the terminal nodes when processing the 
topology, then record this metric after `child.process` in 
`ProcessorContext#forward`
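
As a sketch of what "figuring out the terminal nodes" could look like, the snippet below treats a topology as a map from node name to child names and collects every node that has no children. The graph representation, class name, and node names are assumptions for illustration; this is not how Kafka Streams models its topology internally.

```java
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical sketch: a topology as an adjacency map of node -> children.
final class TerminalNodesSketch {
    // A node is terminal when it forwards to no children. Nodes that only
    // appear as children (and so have no entry of their own) also count.
    static SortedSet<String> terminalNodes(final Map<String, List<String>> childrenByNode) {
        final SortedSet<String> terminals = new TreeSet<>();
        for (final Map.Entry<String, List<String>> entry : childrenByNode.entrySet()) {
            if (entry.getValue().isEmpty()) {
                terminals.add(entry.getKey());
            }
            for (final String child : entry.getValue()) {
                if (!childrenByNode.containsKey(child)) {
                    terminals.add(child);
                }
            }
        }
        return terminals;
    }
}
```

In a topology where a branch ends in a plain processor rather than a sink, that processor still shows up as terminal here, which is the case a sink-only measurement would miss.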









[GitHub] [kafka] rhauch opened a new pull request #8719: [WIP] MINOR: Improve failure message in Kafka cluster used in Connect integration tests (pre Gradle memory change)

2020-05-22 Thread GitBox


rhauch opened a new pull request #8719:
URL: https://github.com/apache/kafka/pull/8719


   DO NOT MERGE
   
   Debugging build failures
   
   See #8718 






