[GitHub] [kafka] ning2008wisc commented on pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0
ning2008wisc commented on pull request #7577: URL: https://github.com/apache/kafka/pull/7577#issuecomment-632527159

@ryannedolan @mimaison I added the integration tests for the automated consumer offset sync in MM 2.0. When you are available, I would appreciate a first pass of review. Thanks
[GitHub] [kafka] tombentley commented on a change in pull request #8699: KAFKA-9673: Filter and Conditional SMTs
tombentley commented on a change in pull request #8699: URL: https://github.com/apache/kafka/pull/8699#discussion_r429081838

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java

@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import java.util.Map;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
+
+/**
+ * Decorator for a {@link Transformation} which applies the delegate only when a
+ * {@link Predicate} is true (or false, according to {@code negate}).
+ * @param <R>
+ */
+class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    /*test*/ final Predicate<R> predicate;
+    /*test*/ final Transformation<R> delegate;
+    /*test*/ final boolean negate;
+
+    PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) {
+        this.predicate = predicate;
+        this.negate = negate;
+        this.delegate = delegate;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public R apply(R record) {
+        if (negate ^ predicate.test(record)) {
+            return delegate.apply(record);
+        }
+        return record;
+    }
+
+    @Override
+    public ConfigDef config() {
+        return null;

Review comment: This is related to the issue discussed about `configure()`. We could return an empty `ConfigDef` here, but that would be a lie which could ultimately lead to some other error if someone tried to use it with `configure()`. We can't invent a `ConfigDef` schema for this because the `PredicatedTransformation` would need to know about the `Transformation` it was going to be wrapping, but it can't know that before it's been configured with at least the `Transformation`'s `ConfigDef`, and it can't be configured before `config()` has been called. So we have a chicken-and-egg problem. Something (`ConnectorConfig`) must have some _a priori_ knowledge of either `PredicatedTransformation`'s `ConfigDef`, or know how to configure it without needing to call `config()` at all. Since `PredicatedTransformation` is a purely internal class which will never be directly exposed to Connect users, we're not obliged to stick to the contract of `config()` and `configure()`. So both `PredicatedTransformation.config` and `PredicatedTransformation.configure` can throw when called, since we know no one else can call them and we know `ConnectorConfig` never will.
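For readers following along, here is a minimal sketch of the fail-fast approach Tom describes, assuming Connect's `ConnectException` as the error type; the actual merged implementation may differ:

```java
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;

// Inside PredicatedTransformation<R> from the diff above: since this internal
// class is constructed fully configured by ConnectorConfig, both contract
// methods can simply fail fast instead of returning a misleading ConfigDef.
@Override
public void configure(Map<String, ?> configs) {
    throw new ConnectException("PredicatedTransformation.configure() should never be called; "
            + "it is instantiated pre-configured by ConnectorConfig.");
}

@Override
public ConfigDef config() {
    throw new ConnectException("PredicatedTransformation.config() should never be called; "
            + "its configuration is handled by ConnectorConfig.");
}
```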
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429082038

## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

@@ -261,6 +261,43 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }

+    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
+        appID = testId + "-with-force-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);

Review comment: Without the `"" +` to convert the value to a String, we get an exception like the one below, because `STREAMS_CONSUMER_TIMEOUT = 2000L` is a `Long`. The `"" +` idiom is widely used in this test, so I just followed it here without any change to avoid enlarging the scope of this PR. I can create a Jira to improve it if we think this workaround is not quite intuitive~
```
org.apache.kafka.common.config.ConfigException: Invalid value 20 for configuration session.timeout.ms: Expected value to be a 32-bit integer, but it was a java.lang.Long
	at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:672)
	at org.apache.kafka.common.config.ConfigDef.parseValue(ConfigDef.java:474)
	at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:467)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:108)
	at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:129)
	at org.apache.kafka.clients.consumer.ConsumerConfig.<init>(ConsumerConfig.java:606)
	at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
	at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getRestoreConsumer(DefaultKafkaClientSupplier.java:56)
	at org.apache.kafka.streams.processor.internals.StreamThread.create(StreamThread.java:313)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:766)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:652)
	at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:562)
	at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(AbstractResetIntegrationTest.java:270)
	at org.apache.kafka.streams.integration.ResetIntegrationTest.testResetWhenLongSessionTimeoutConfiguredWithForceOption(ResetIntegrationTest.java:77)
```
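To make the type issue concrete, here is a hedged, self-contained sketch (variable names are illustrative, not the test's actual fields) of why the `"" +` workaround is needed, plus two arguably clearer alternatives:

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class SessionTimeoutConfigExample {
    public static void main(String[] args) {
        final long streamsConsumerTimeout = 2000L; // mirrors STREAMS_CONSUMER_TIMEOUT in the test
        final Properties config = new Properties();

        // Rejected once the config map is parsed: session.timeout.ms is declared
        // as Type.INT, so a boxed Long fails with "Expected value to be a 32-bit integer".
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, streamsConsumerTimeout * 100);

        // Accepted: ConfigDef parses String values into the declared type.
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + streamsConsumerTimeout * 100);

        // Arguably clearer alternatives to the "" + idiom:
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Long.toString(streamsConsumerTimeout * 100));
        config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, (int) (streamsConsumerTimeout * 100));
    }
}
```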
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429082038

## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@@ -1155,12 +1192,15 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       assertTrue(deleteResult.deletedGroups().containsKey(testGroupId))
       assertNull(deleteResult.deletedGroups().get(testGroupId).get())
-      } finally {
-        consumerThread.interrupt()
-        consumerThread.join()
-      }
     } finally {
-      Utils.closeQuietly(consumer, "consumer")
+      consumerThreads.foreach {
+        case consumerThread =>
+          consumerThread.interrupt()
+          consumerThread.join()
+      }
+    }
+  } finally {

Review comment: Thanks, but I wonder what the **template** refers to here?
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429083674

## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@@ -1017,47 +1017,70 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       assertTrue(0 == list1.errors().get().size())
       assertTrue(0 == list1.valid().get().size())
       val testTopicName = "test_topic"
+      val testTopicName1 = testTopicName + "1"
+      val testTopicName2 = testTopicName + "2"
       val testNumPartitions = 2
-      client.createTopics(Collections.singleton(
-        new NewTopic(testTopicName, testNumPartitions, 1.toShort))).all().get()
-      waitForTopics(client, List(testTopicName), List())
+
+      client.createTopics(util.Arrays.asList(
+        new NewTopic(testTopicName, testNumPartitions, 1.toShort),
+        new NewTopic(testTopicName1, testNumPartitions, 1.toShort),
+        new NewTopic(testTopicName2, testNumPartitions, 1.toShort)
+      )).all().get()
+      waitForTopics(client, List(testTopicName, testTopicName1, testTopicName2), List())

       val producer = createProducer()
       try {
         producer.send(new ProducerRecord(testTopicName, 0, null, null)).get()
       } finally {
         Utils.closeQuietly(producer, "producer")
       }
+
+      val EMPTY_GROUP_INSTANCE_ID = ""
       val testGroupId = "test_group_id"
       val testClientId = "test_client_id"
       val testInstanceId = "test_instance_id"
+      val testInstanceId1 = testInstanceId + "1"

Review comment: Fixed
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429083564

## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@@ -1138,7 +1165,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       val validMemberFuture = removeMembersResult.memberResult(new MemberToRemove(testInstanceId))
       assertNull(validMemberFuture.get())
-      // The group should contain no member now.
+      // The group's active members number should decrease by 1

Review comment: Removed, but curious about the reason :)

## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@@ -1147,6 +1174,16 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       assertEquals(testGroupId, testGroupDescription.groupId)
       assertFalse(testGroupDescription.isSimpleConsumerGroup)

+      assertEquals(consumerSet.size -1, testGroupDescription.members().size())

Review comment: Fixed
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429083960

## File path: core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala

@@ -1075,13 +1098,17 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
       assertEquals(testGroupId, testGroupDescription.groupId())
       assertFalse(testGroupDescription.isSimpleConsumerGroup)
-      assertEquals(1, testGroupDescription.members().size())
+      assertEquals(groupInstanceSet.size, testGroupDescription.members().size())
       val member = testGroupDescription.members().iterator().next()
       assertEquals(testClientId, member.clientId())
-      val topicPartitions = member.assignment().topicPartitions()
-      assertEquals(testNumPartitions, topicPartitions.size())
-      assertEquals(testNumPartitions, topicPartitions.asScala.
-        count(tp => tp.topic().equals(testTopicName)))
+      val members = testGroupDescription.members()
+      assertEquals(testClientId, members.asScala.head.clientId())

Review comment: Thanks, all members' clientIds are checked now
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429084559

## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@@ -2411,6 +2411,50 @@ public void testRemoveMembersFromGroup() throws Exception {
         assertNull(noErrorResult.all().get());
         assertNull(noErrorResult.memberResult(memberOne).get());
         assertNull(noErrorResult.memberResult(memberTwo).get());
+
+        // Return with success for "removeAll" scenario

Review comment: Good catch! Added the test for partial failure
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429084219

## File path: core/src/main/scala/kafka/tools/StreamsResetter.java

@@ -186,9 +190,15 @@ private void validateNoActiveConsumers(final String groupId,
         final List<MemberDescription> members = new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
         if (!members.isEmpty()) {
-            throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
-                + "and has following members: " + members + ". "
-                + "Make sure to stop all running application instances before running the reset tool.");
+            if (options.has(forceOption)) {
+                System.out.println("Force deleting all active members in the group: " + groupId);
+                adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all();

Review comment: Agreed, fixed
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429084786

## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java

@@ -37,7 +37,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
         this.members = new HashSet<>(members);
     }

+    public RemoveMembersFromConsumerGroupOptions() {
+        this.members = new HashSet<>();

Review comment: Makes sense, fixed
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429087051

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();

         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+        List<MemberIdentity> members;
+        if (options.removeAll()) {
+            members = getMembersFromGroup(groupId);
+        } else {
+            members = options.members().stream().map(
+                MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+        }

         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> getRemoveMembersFromGroupCall(context));
+            () -> getRemoveMembersFromGroupCall(context, members));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);

         return new RemoveMembersFromConsumerGroupResult(future, options.members());
     }

-    private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) {
+    private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>,
+        RemoveMembersFromConsumerGroupOptions> context, List<MemberIdentity> allMembers) {

Review comment: Yeah, fixed
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429086933

## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java

@@ -46,26 +46,42 @@
      * If not, the first member error shall be returned.
      */
     public KafkaFuture<Void> all() {
-        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
-        this.future.whenComplete((memberErrors, throwable) -> {
-            if (throwable != null) {
-                result.completeExceptionally(throwable);
-            } else {
-                for (MemberToRemove memberToRemove : memberInfos) {
-                    if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
-                        return;
+        if (removeAll()) {

Review comment: Yeah, just as you surmised; but you are right, we should scan the removal results as well. Slightly updated: following the convention of the `non-removeAll` scenario, it now just returns the first exception.
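For context, a rough sketch of what the `removeAll` branch of `RemoveMembersFromConsumerGroupResult.all()` described here boils down to, assuming the future completes with a `Map<MemberIdentity, Errors>` as in the diff (illustrative, not necessarily the merged code):

```java
// Sketch: scan every member's removal result and surface the first failure,
// mirroring the convention of the non-removeAll path.
public KafkaFuture<Void> all() {
    final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
    this.future.whenComplete((memberErrors, throwable) -> {
        if (throwable != null) {
            // The whole request failed (e.g. coordinator lookup error).
            result.completeExceptionally(throwable);
        } else {
            for (Map.Entry<MemberIdentity, Errors> entry : memberErrors.entrySet()) {
                if (entry.getValue() != Errors.NONE) {
                    // Complete with the first member-level error encountered.
                    result.completeExceptionally(entry.getValue().exception());
                    return;
                }
            }
            result.complete(null);
        }
    });
    return result;
}
```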
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429087492

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
             || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }

+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members;
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {

Review comment: Makes sense, fixed~
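A sketch of the narrower exception handling presumably adopted here: catching `Throwable` would also swallow `Error`s, so catching only the checked exceptions thrown by `KafkaFuture.get()` is safer (illustrative only; the merged code may differ):

```java
private List<MemberIdentity> getMembersFromGroup(String groupId) {
    final Collection<MemberDescription> members;
    try {
        members = describeConsumerGroups(Collections.singleton(groupId))
                .describedGroups().get(groupId).get().members();
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
        throw new KafkaException("Interrupted while getting members of group: " + groupId, ex);
    } catch (ExecutionException ex) {
        throw new KafkaException("Encountered exception when trying to get members of group: " + groupId, ex.getCause());
    }

    final List<MemberIdentity> membersToRemove = new ArrayList<>();
    for (final MemberDescription member : members) {
        // Prefer group.instance.id for static members; fall back to member.id.
        if (member.groupInstanceId().isPresent()) {
            membersToRemove.add(new MemberIdentity().setGroupInstanceId(member.groupInstanceId().get()));
        } else {
            membersToRemove.add(new MemberIdentity().setMemberId(member.consumerId()));
        }
    }
    return membersToRemove;
}
```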
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429087282

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
             || resource.type() == ConfigResource.Type.BROKER_LOGGER;
     }

+    private List<MemberIdentity> getMembersFromGroup(String groupId) {
+        Collection<MemberDescription> members;
+        try {
+            members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+        } catch (Throwable ex) {
+            throw new KafkaException("Encounter exception when trying to get members from group: " + groupId, ex);
+        }
+
+        List<MemberIdentity> memberToRemove = new ArrayList<>();
+        for (final MemberDescription member : members) {
+            if (member.groupInstanceId().isPresent()) {
+                memberToRemove.add(new MemberIdentity().setGroupInstanceId(member.groupInstanceId().get())
+                );

Review comment: Fixed
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429087133

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();

         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+        List<MemberIdentity> members;
+        if (options.removeAll()) {
+            members = getMembersFromGroup(groupId);
+        } else {
+            members = options.members().stream().map(
+                MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+        }

         Call findCoordinatorCall = getFindCoordinatorCall(context,
-            () -> getRemoveMembersFromGroupCall(context));
+            () -> getRemoveMembersFromGroupCall(context, members));
         runnable.call(findCoordinatorCall, startFindCoordinatorMs);

         return new RemoveMembersFromConsumerGroupResult(future, options.members());
     }

-    private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context) {
+    private Call getRemoveMembersFromGroupCall(ConsumerGroupOperationContext<Map<MemberIdentity, Errors>,
+        RemoveMembersFromConsumerGroupOptions> context, List<MemberIdentity> allMembers) {
         return new Call("leaveGroup", context.deadline(), new ConstantNodeIdProvider(context.node().get().id())) {

             @Override
             LeaveGroupRequest.Builder createRequest(int timeoutMs) {
-                return new LeaveGroupRequest.Builder(context.groupId(),
-                    context.options().members().stream().map(
-                        MemberToRemove::toMemberIdentity).collect(Collectors.toList()));
+                return new LeaveGroupRequest.Builder(context.groupId(),

Review comment: Updated
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429087216

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(String groupId,
         KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();

         ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-            new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+                new ConsumerGroupOperationContext<>(groupId, options, deadline, future);

Review comment: Fixed
[GitHub] [kafka] jiameixie opened a new pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random
jiameixie opened a new pull request #8711: URL: https://github.com/apache/kafka/pull/8711

When applicable, using ThreadLocalRandom rather than shared Random objects in concurrent programs will typically incur much less overhead and contention. A short example follows below.

Change-Id: Idf56adc8cbb4c611e327e639a49d90827a23d947
Signed-off-by: Jiamei Xie

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
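A small self-contained example of the difference the PR description refers to (standard JDK classes only; not code from the PR itself):

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class RandomContentionExample {
    // A shared Random is thread-safe, but all threads contend on one internal
    // AtomicLong seed via compare-and-swap, which hurts throughput under concurrency.
    private static final Random SHARED = new Random();

    public static void main(String[] args) {
        int a = SHARED.nextInt(100);

        // ThreadLocalRandom keeps per-thread state, so concurrent callers never
        // contend; note it is always accessed via current(), never stored in a field.
        int b = ThreadLocalRandom.current().nextInt(100);

        System.out.println(a + " " + b);
    }
}
```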
[GitHub] [kafka] tombentley commented on pull request #8699: KAFKA-9673: Filter and Conditional SMTs
tombentley commented on pull request #8699: URL: https://github.com/apache/kafka/pull/8699#issuecomment-632551475

@C0urante thanks for the review, some excellent points there! I think an integration test is a great idea, which I'll work on next. I've addressed all your other comments.
[GitHub] [kafka] chia7712 commented on pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random
chia7712 commented on pull request #8711: URL: https://github.com/apache/kafka/pull/8711#issuecomment-632552331

Is this PR different from #8531?
[GitHub] [kafka] chia7712 commented on a change in pull request #8705: KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException
chia7712 commented on a change in pull request #8705: URL: https://github.com/apache/kafka/pull/8705#discussion_r429100323

## File path: core/src/test/scala/unit/kafka/network/SocketServerTest.scala

@@ -1742,6 +1746,12 @@ class SocketServerTest {
       selector = Some(testableSelector)
       testableSelector
     }
+
+    override private[network] def processException(errorMessage: String, throwable: Throwable, isUncaught: Boolean): Unit = {

Review comment: `isUncaught` is used only by tests, so it is a bit awkward in production code. Could you check the `errorMessage` instead of adding a new argument? For example:
```scala
if (errorMessage == "Processor got uncaught exception.")
  uncaughtExceptions += 1
```
[GitHub] [kafka] jiameixie commented on pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random
jiameixie commented on pull request #8711: URL: https://github.com/apache/kafka/pull/8711#issuecomment-632558263

> Is this PR different from #8531?

@chia7712 It's the same issue. #8531 has KAFKA misspelled in its title, and I didn't see any PR linked to the issue at https://issues.apache.org/jira/browse/KAFKA-9904, so I thought there was no PR. I'll close this PR.
[GitHub] [kafka] jiameixie closed pull request #8711: KAFKA-9904:Use ThreadLocalConcurrent to Replace Random
jiameixie closed pull request #8711: URL: https://github.com/apache/kafka/pull/8711
[jira] [Assigned] (KAFKA-10006) Streams should not attempt to create internal topics that may exist
[ https://issues.apache.org/jira/browse/KAFKA-10006?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen reassigned KAFKA-10006:
---------------------------------

    Assignee: Luke Chen

> Streams should not attempt to create internal topics that may exist
> --------------------------------------------------------------------
>
>                 Key: KAFKA-10006
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10006
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Assignee: Luke Chen
>            Priority: Major
>
> During assignment, Streams will attempt to validate all internal topics and their number of partitions, and create them if necessary. However, we have seen that the InternalTopicManager will occasionally try to recreate internal topics that already exist when the broker is unavailable and the describeTopics request fails.
> This is because we catch both UnknownTopicOrPartitionException AND LeaderNotAvailableException and treat them the same, i.e. we assume the topic does not exist. We shouldn't try to create topics until we've validated that they don't actually exist. If we can't connect to the brokers to check whether a topic exists, there's no reason to believe we would be able to create it anyway.
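A hypothetical helper (names invented here, not the actual `InternalTopicManager` code) illustrating the distinction the ticket asks for:

```java
import java.util.concurrent.ExecutionException;

import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

public class TopicExistenceCheck {
    enum TopicStatus { EXISTS, MISSING, UNKNOWN_RETRY }

    // Classify the outcome of a describeTopics() future for one internal topic.
    static TopicStatus classify(KafkaFuture<?> describedTopic) throws InterruptedException {
        try {
            describedTopic.get();
            return TopicStatus.EXISTS;            // topic is there: validate, don't create
        } catch (ExecutionException e) {
            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
                return TopicStatus.MISSING;       // broker confirmed it's missing: safe to create
            }
            if (e.getCause() instanceof LeaderNotAvailableException) {
                return TopicStatus.UNKNOWN_RETRY; // broker unreachable: retry later, don't create
            }
            throw new RuntimeException(e.getCause());
        }
    }
}
```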
[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state
mimaison commented on a change in pull request #8238: URL: https://github.com/apache/kafka/pull/8238#discussion_r429131002

## File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java

@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()
+     * If not set, all groups are returned without their states
+     * throw IllegalArgumentException if states is empty
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        if (states == null || states.isEmpty()) {
+            throw new IllegalArgumentException("states should not be null or empty");
+        }
+        this.states = Optional.of(states);
+        return this;
+    }
+
+    /**
+     * All groups with their states will be returned by listConsumerGroups()
+     */
+    public ListConsumerGroupsOptions inAnyState() {
+        this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment: That's a good point, so I agree: it makes sense to return all states when `null` (or an empty list) is used.
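A sketch of the relaxed semantics agreed on here, assuming the same `Optional<Set<ConsumerGroupState>>` field as in the diff (the final API may differ):

```java
// null or an empty set now means "return groups in any state" instead of throwing.
public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
    this.states = Optional.of(
            states == null || states.isEmpty()
                    ? EnumSet.allOf(ConsumerGroupState.class)
                    : EnumSet.copyOf(states));
    return this;
}
```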
[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state
mimaison commented on a change in pull request #8238: URL: https://github.com/apache/kafka/pull/8238#discussion_r429133529

## File path: clients/src/main/resources/common/message/ListGroupsRequest.json

@@ -20,8 +20,14 @@
   // Version 1 and 2 are the same as version 0.
   //
   // Version 3 is the first flexible version.
-  "validVersions": "0-3",
+  //
+  // Version 4 adds the States flexible field (KIP-518).
+  "validVersions": "0-4",
   "flexibleVersions": "3+",
   "fields": [
+    { "name": "States", "type": "[]string", "versions": "4+", "tag": 0, "taggedVersions": "4+",

Review comment: Yes, the version bump is necessary to detect whether this field is supported. As we're bumping the version, and as you said in https://github.com/apache/kafka/pull/8238#discussion_r426869755 the overhead of the extra field on this API is not a concern, it's probably simpler to use a regular field.
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429133442

## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java

@@ -46,26 +46,42 @@
      * If not, the first member error shall be returned.
      */
     public KafkaFuture<Void> all() {
-        final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
-        this.future.whenComplete((memberErrors, throwable) -> {
-            if (throwable != null) {
-                result.completeExceptionally(throwable);
-            } else {
-                for (MemberToRemove memberToRemove : memberInfos) {
-                    if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
-                        return;
+        if (removeAll()) {
+            final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
+            this.future.whenComplete((memberErrors, throwable) -> {
+                if (throwable != null) {
+                    result.completeExceptionally(throwable);
+                } else {
+                    System.out.println("Remove all active members succeeded, removed " + memberErrors.size() + " members: " + memberErrors.keySet());

Review comment: Fixed
[GitHub] [kafka] mimaison commented on a change in pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state
mimaison commented on a change in pull request #8238: URL: https://github.com/apache/kafka/pull/8238#discussion_r429136058

## File path: clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java

@@ -26,4 +31,34 @@
  */
 @InterfaceStability.Evolving
 public class ListConsumerGroupsOptions extends AbstractOptions<ListConsumerGroupsOptions> {
+
+    private Optional<Set<ConsumerGroupState>> states = Optional.empty();
+
+    /**
+     * Only groups in these states will be returned by listConsumerGroups()
+     * If not set, all groups are returned without their states
+     * throw IllegalArgumentException if states is empty
+     */
+    public ListConsumerGroupsOptions inStates(Set<ConsumerGroupState> states) {
+        if (states == null || states.isEmpty()) {
+            throw new IllegalArgumentException("states should not be null or empty");
+        }
+        this.states = Optional.of(states);
+        return this;
+    }
+
+    /**
+     * All groups with their states will be returned by listConsumerGroups()
+     */
+    public ListConsumerGroupsOptions inAnyState() {
+        this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class));

Review comment: This can also be argued for the state value in the response. Currently `ConsumerGroupDescription` stores the state as `ConsumerGroupState`, so states the client isn't aware of are mapped to `UNKNOWN`; I'm doing the same in `ConsumerGroupListing`.
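Illustrative of the fallback behaviour described (a hypothetical helper, not Kafka's own parsing code):

```java
import org.apache.kafka.common.ConsumerGroupState;

public class GroupStateParser {
    // Map a state string from the ListGroups response to the client-side enum,
    // falling back to UNKNOWN for states this client version does not know about.
    static ConsumerGroupState toClientState(String wireState) {
        for (ConsumerGroupState state : ConsumerGroupState.values()) {
            if (state.toString().equalsIgnoreCase(wireState)) {
                return state;
            }
        }
        return ConsumerGroupState.UNKNOWN;
    }
}
```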
[GitHub] [kafka] mimaison commented on pull request #8238: KAFKA-9130: KIP-518 Allow listing consumer groups per state
mimaison commented on pull request #8238: URL: https://github.com/apache/kafka/pull/8238#issuecomment-632591659

Thanks @hachikuji for the feedback. You brought up some interesting points that overall simplify the KIP/logic a bit. While initially it seemed a perfect use case for tagged fields, the need to bump the version to check for compatibility made it a bit confusing. I've pushed an update following your suggestions to use a regular field and default to seeing all groups. I think it's easier to reason about and keeps the API simple. Let me know what you think; I'll update the KIP accordingly if we agree that's the best option.
[GitHub] [kafka] kowshik commented on a change in pull request #8680: KAFKA-10027: Implement read path for feature versioning system (KIP-584)
kowshik commented on a change in pull request #8680: URL: https://github.com/apache/kafka/pull/8680#discussion_r428573407

## File path: clients/src/main/java/org/apache/kafka/common/feature/BaseVersionRange.java

@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Represents an immutable basic version range using 2 attributes: min and max of type long.
+ * The min and max attributes are expected to be >= 1, and with max >= min.
+ *
+ * The class also provides API to serialize/deserialize the version range to/from a map.
+ * The class allows for configurable labels for the min/max attributes, which can be specialized by
+ * sub-classes (if needed).
+ */
+class BaseVersionRange {

Review comment: It is thoroughly tested in its child class test suite: `SupportedVersionRangeTest`. Personally I feel it is good enough this way, because to test this class at all we need to inherit it into a subclass (since the constructor is `protected`), and by testing via `SupportedVersionRangeTest` we achieve exactly the same. I have now added top-level documentation in the test suite of `SupportedVersionRangeTest`, explaining the above.

## File path: clients/src/main/java/org/apache/kafka/common/feature/Features.java

@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.feature;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.Objects;
+
+import static java.util.stream.Collectors.joining;
+
+/**
+ * Represents an immutable dictionary with key being feature name, and value being VersionRangeType.
+ * Also provides API to serialize/deserialize the features and their version ranges to/from a map.
+ *
+ * This class can be instantiated only using its factory functions, with the important ones being:
+ * Features.supportedFeatures(...) and Features.finalizedFeatures(...).
+ *
+ * @param <VersionRangeType> is the type of version range.
+ * @see SupportedVersionRange
+ * @see FinalizedVersionRange
+ */
+public class Features<VersionRangeType extends BaseVersionRange> {
+    private final Map<String, VersionRangeType> features;
+
+    /**
+     * Constructor is made private, as for readability it is preferred the caller uses one of the
+     * static factory functions for instantiation (see below).
+     *
+     * @param features Map of feature name to type of VersionRange, as the backing data structure
+     *                 for the Features object.
+     */
+    private Features(Map<String, VersionRangeType> features) {
+        this.features = features;
+    }
+
+    /**
+     * @param features Map of feature name to VersionRange, as the backing data structure
+     *                 for the Features object.
+     * @return Returns a new Features object representing "supported" features.
+     */
+    public static Features<SupportedVersionRange> supportedFeatures(Map<String, SupportedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    /**
+     * @param features Map of feature name to FinalizedVersionRange, as the backing data structure
+     *                 for the Features object.
+     * @return Returns a new Features object representing "finalized" features.
+     */
+    public static Features<FinalizedVersionRange> finalizedFeatures(Map<String, FinalizedVersionRange> features) {
+        return new Features<>(features);
+    }
+
+    // Visible for testi
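To make the factory-function usage concrete, a hedged sketch assuming the constructor shapes implied by the quoted javadoc (long-typed min/max bounds, public `SupportedVersionRange` constructor); the merged API may well differ:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.common.feature.Features;
import org.apache.kafka.common.feature.SupportedVersionRange;

public class FeaturesUsageSketch {
    public static void main(String[] args) {
        // One supported feature, "group_coordinator", spanning versions 1 through 3.
        Map<String, SupportedVersionRange> supported =
                Collections.singletonMap("group_coordinator", new SupportedVersionRange(1, 3));

        // Instantiate via the factory function rather than a constructor.
        Features<SupportedVersionRange> supportedFeatures = Features.supportedFeatures(supported);
        System.out.println(supportedFeatures);
    }
}
```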
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429145185

## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

@@ -3660,7 +3686,7 @@ void handleResponse(AbstractResponse abstractResponse) {
             // We set member.id to empty here explicitly, so that the lookup will succeed as user doesn't
             // know the exact member.id.
             memberErrors.put(new MemberIdentity()
-                .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
+                .setMemberId(memberResponse.memberId())

Review comment: For removing static members, this is still true because we put the memberId as `""` in the request, and the server will respond with the same field (verified in `GroupCoordinator#handleLeaveGroup`). For removing dynamic members, we need this change so that the caller knows the memberId. I suppose the `individual check` here is just to check the response against the members to be removed (for the `removeAll` scenario)? Previously I thought of putting all members obtained from `KafkaAdminClient#getMembersFromGroup` into the `RemoveMembersFromConsumerGroupResult` for checking, but in the `removeAll` scenario we get members as `MemberIdentity`, which cannot be converted back to `MemberToRemove`, so I'm hesitant to do it that way.
[jira] [Created] (KAFKA-10031) Exist client create wrong topic automatically for a total-fresh restarted Kafka cluster
jiamei xie created KAFKA-10031:
----------------------------------

             Summary: Exist client create wrong topic automatically for a total-fresh restarted Kafka cluster
                 Key: KAFKA-10031
                 URL: https://issues.apache.org/jira/browse/KAFKA-10031
             Project: Kafka
          Issue Type: Bug
          Components: clients
            Reporter: jiamei xie
            Assignee: jiamei xie

Kill all zookeeper and kafka processes, clear the zookeeper and kafka data dirs, and restart zookeeper and kafka. If there is any active client, the topic used by that client will be auto-created.

How to reproduce?

1. Start zookeeper and kafka with their config files:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &

2. Create topic test with 2 partitions:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 2 --replication-factor 1

3. Produce some data to topic test:
bin/kafka-producer-perf-test.sh --topic test --num-records 5000 --record-size 100 --throughput=-1 --producer-props bootstrap.servers=localhost:9092

4. Kill zookeeper and kafka while ProducerPerformance is still running:
jps
21072 QuorumPeerMain
21704 ProducerPerformance
21230 Kafka
21854 Jps
kill -9 21072 21230

5. Remove the zookeeper and kafka data:
rm -rf /tmp/zookeeper/
rm -rf /tmp/kafka-logs/

6. Start zookeeper and kafka:
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
nohup bin/kafka-server-start.sh config/server.properties &

7. Check the topics and you'll see there is a topic named test with 1 partition, and the ProducerPerformance process continues to run normally:
bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic: test	PartitionCount: 1	ReplicationFactor: 1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

Some output of the ProducerPerformance process:

1995632 records sent, 399126.4 records/sec (38.06 MB/sec), 378.6 ms avg latency, 435.0 ms max latency.
org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s) for test-1:12 ms has passed since batch creation
…..
org.apache.kafka.common.errors.TimeoutException: Expiring 148 record(s) for test-1:121774 ms has passed since batch creation
1711254 records sent, 342250.8 records/sec (32.64 MB/sec), 2324.5 ms avg latency, 123473.0 ms max latency.
[jira] [Updated] (KAFKA-10031) Exist client automatically create wrong topic for a total-fresh restarted Kafka cluster
[ https://issues.apache.org/jira/browse/KAFKA-10031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jiamei xie updated KAFKA-10031:
-------------------------------
    Summary: Exist client automatically create wrong topic for a total-fresh restarted Kafka cluster  (was: Exist client create wrong topic automatically for a total-fresh restarted Kafka cluster)
[jira] [Updated] (KAFKA-10031) Exist client create wrong topic for a total-fresh restarted Kafka cluster automatically
[ https://issues.apache.org/jira/browse/KAFKA-10031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jiamei xie updated KAFKA-10031:
-------------------------------
    Summary: Exist client create wrong topic for a total-fresh restarted Kafka cluster automatically  (was: Exist client automatically create wrong topic for a total-fresh restarted Kafka cluster)
[jira] [Updated] (KAFKA-10031) Exist client automatically create wrong topic for a total-fresh restarted Kafka cluster
[ https://issues.apache.org/jira/browse/KAFKA-10031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

jiamei xie updated KAFKA-10031:
-------------------------------
    Summary: Exist client automatically create wrong topic for a total-fresh restarted Kafka cluster  (was: Exist client create wrong topic for a total-fresh restarted Kafka cluster automatically)
[GitHub] [kafka] showuon opened a new pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon opened a new pull request #8712: URL: https://github.com/apache/kafka/pull/8712

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] cadonna commented on a change in pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)
cadonna commented on a change in pull request #8697: URL: https://github.com/apache/kafka/pull/8697#discussion_r429158357 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java ## @@ -642,6 +642,30 @@ public static void addAvgAndMaxToSensor(final Sensor sensor, ); } +public static void addMaxAndMinToSensor(final Sensor sensor, Review comment: prop (super-nit): Could you call the method `addMinAndMaxToSensor()` since we have already one method that is called `addAvgAndMinAndMaxToSensor()`? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java ## @@ -356,4 +354,27 @@ public void shouldGetDroppedRecordsSensorOrLateRecordDropSensor() { shouldGetDroppedRecordsSensor(); } } + +@Test +public void shouldGetRecordE2ELatencySensor() { +final String operation = "record-e2e-latency"; +expect(streamsMetrics.taskLevelSensor(THREAD_ID, TASK_ID, operation, RecordingLevel.INFO)) +.andReturn(expectedSensor); +expect(streamsMetrics.taskLevelTagMap(THREAD_ID, TASK_ID)).andReturn(tagMap); +StreamsMetricsImpl.addMaxAndMinToSensor( +expectedSensor, +TASK_LEVEL_GROUP, +tagMap, +operation, +RECORD_E2E_LATENCY_MAX_DESCRIPTION, Review comment: req: Please do not use the constant here. The point of this test is also to check the correctness of the description, i.e., the content of that constant. ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetricsTest.java ## @@ -14,15 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.kstream.internals.metrics; +package org.apache.kafka.streams.processor.internals.metrics; Review comment: Yes, very nice! ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ## @@ -86,6 +87,14 @@ private TaskMetrics() {} private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " + "from consumer and not yet processed for this active task"; +private static final String RECORD_E2E_LATENCY = "record-e2e-latency"; +static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = +"The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the " ++ "system time when it has been fully processed by the task"; +static final String RECORD_E2E_LATENCY_MIN_DESCRIPTION = Review comment: See my comment above. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ## @@ -86,6 +87,14 @@ private TaskMetrics() {} private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " + "from consumer and not yet processed for this active task"; +private static final String RECORD_E2E_LATENCY = "record-e2e-latency"; +static final String RECORD_E2E_LATENCY_MAX_DESCRIPTION = Review comment: req: Please define this constant as private as all the others. I left a request in `TaskMetricsTest` which makes this request clearer. 
## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ## @@ -137,6 +138,7 @@ public StreamTask(final TaskId id, } processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics); processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics); +recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, taskId, streamsMetrics); Review comment: Q: Is there a specific reason to init the sensor here and not in `SinkNode`? You can init and store it there. That was one motivation to make `*Metrics` classes (e.g. `TaskMetrics`) static, so that you do not need any code in the processor context to get specific sensors. If there is no specific reason, you could get rid of the changes in the `*Context*` classes. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
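As a rough illustration of the helper discussed in the review above (a sketch with assumed parameter names, not the merged patch), registering both a min and a max stat on one sensor in the style of the existing `addAvgAndMaxToSensor()` could look like this:
```java
import java.util.Map;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;

// Sketch only: register a min and a max stat on one sensor, mirroring the
// existing avg/max helpers in StreamsMetricsImpl. All names are assumptions.
final class MinMaxSensorSketch {
    static void addMinAndMaxToSensor(final Sensor sensor,
                                     final String group,
                                     final Map<String, String> tags,
                                     final String operation,
                                     final String descriptionOfMin,
                                     final String descriptionOfMax) {
        // each stat gets its own metric name suffix, e.g. record-e2e-latency-min
        sensor.add(new MetricName(operation + "-min", group, descriptionOfMin, tags), new Min());
        sensor.add(new MetricName(operation + "-max", group, descriptionOfMax, tags), new Max());
    }
}
```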
[GitHub] [kafka] mimaison commented on pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs
mimaison commented on pull request #8312: URL: https://github.com/apache/kafka/pull/8312#issuecomment-632632500 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker
[ https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17113958#comment-17113958 ] Bruno Cadonna commented on KAFKA-9173: -- The issue that all 10 tasks are assigned to the same node is fixed with KIP-441 and the following PR adds unit tests that verify it: https://github.com/apache/kafka/pull/8689 For the other issues, new tickets should be created as [~mjsax] proposed. I will close this ticket as fixed in 2.6. > StreamsPartitionAssignor assigns partitions to only one worker > -- > > Key: KAFKA-9173 > URL: https://issues.apache.org/jira/browse/KAFKA-9173 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.2.1 >Reporter: Oleg Muravskiy >Priority: Critical > Labels: user-experience > Attachments: StreamsPartitionAssignor.log > > > I'm running a distributed KafkaStreams application on 10 worker nodes, > subscribed to 21 topics with 10 partitions in each. I'm only using a > Processor interface, and a persistent state store. > However, only one worker gets assigned partitions, all other workers get > nothing. Restarting the application, or cleaning local state stores does not > help. StreamsPartitionAssignor migrates to other nodes, and eventually picks > up other node to assign partitions to, but still only one node. > It's difficult to figure out where to look for the signs of problems, I'm > attaching the log messages from the StreamsPartitionAssignor. Let me know > what else I could provide to help resolve this. > [^StreamsPartitionAssignor.log] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-9173) StreamsPartitionAssignor assigns partitions to only one worker
[ https://issues.apache.org/jira/browse/KAFKA-9173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bruno Cadonna resolved KAFKA-9173. -- Fix Version/s: 2.6.0 Resolution: Fixed > StreamsPartitionAssignor assigns partitions to only one worker > -- > > Key: KAFKA-9173 > URL: https://issues.apache.org/jira/browse/KAFKA-9173 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.3.0, 2.2.1 >Reporter: Oleg Muravskiy >Priority: Critical > Labels: user-experience > Fix For: 2.6.0 > > Attachments: StreamsPartitionAssignor.log > > > I'm running a distributed KafkaStreams application on 10 worker nodes, > subscribed to 21 topics with 10 partitions in each. I'm only using a > Processor interface, and a persistent state store. > However, only one worker gets assigned partitions, all other workers get > nothing. Restarting the application, or cleaning local state stores does not > help. StreamsPartitionAssignor migrates to other nodes, and eventually picks > up other node to assign partitions to, but still only one node. > It's difficult to figure out where to look for the signs of problems, I'm > attaching the log messages from the StreamsPartitionAssignor. Let me know > what else I could provide to help resolve this. > [^StreamsPartitionAssignor.log] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness
cadonna commented on a change in pull request #8696: URL: https://github.com/apache/kafka/pull/8696#discussion_r429196241 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java ## @@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCau return movementsNeeded; } +static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients, + final Map<UUID, ClientState> clientStates, + final AtomicInteger remainingWarmupReplicas, + final Map<UUID, Set<TaskId>> warmups) { +final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate = +(client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients); + +final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new ConstrainedPrioritySet( +caughtUpPredicate, +client -> clientStates.get(client).assignedTaskLoad() +); + +final Queue<TaskMovement> taskMovements = new PriorityQueue<>( + Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task) +); + +for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) { +final UUID destination = clientStateEntry.getKey(); +final ClientState state = clientStateEntry.getValue(); +for (final TaskId task : state.standbyTasks()) { +if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) { +// this is a warmup, so we won't move it. +} else if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, tasksToCaughtUpClients)) { Review comment: prop: Could you add a method `taskIsNotCaughtUpOnClientAndCaughtUpClientsExist()`? Applying De Morgan's law every time I read this code gives me a headache. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java ## @@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients, configs.numStandbyReplicas ); -final boolean probingRebalanceNeeded = assignTaskMovements( -tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag), +final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas); + +final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients( +statefulTasks, +clientStates, +configs.acceptableRecoveryLag +); + +// We temporarily need to know which standby tasks were intended as warmups +// for active tasks, so that we don't move them (again) when we plan standby +// task movements. We can then immediately treat warmups exactly the same as +// hot-standby replicas, so we just track it right here as metadata, rather +// than add "warmup" assignments to ClientState, for example. +final Map<UUID, Set<TaskId>> warmups = new TreeMap<>(); + +final int neededActiveTaskMovements = assignActiveTaskMovements( +tasksToCaughtUpClients, clientStates, -configs.maxWarmupReplicas +warmups, +remainingWarmupReplicas +); + +final int neededStandbyTaskMovements = assignStandbyTaskMovements( +tasksToCaughtUpClients, +clientStates, +remainingWarmupReplicas, +warmups ); assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks)); +// We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any +// due to being configured for no warmups. Review comment: To be clear, according to `StreamsConfig`, we do NOT allow `max.warmup.replicas = 0`. It must be at least 1. Or was your statement hypothetical, that it would be OK to allow it? Anyway, I am in favour of keeping the `> 0` check here. This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
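For readers following the De Morgan discussion above, a minimal sketch of the proposed negated helper could look like the following; the predicate body and the `SortedSet<UUID>` value type are assumptions based on the surrounding diff, not the merged code:
```java
import java.util.Map;
import java.util.SortedSet;
import java.util.UUID;

import org.apache.kafka.streams.processor.TaskId;

// Sketch of the negated helper cadonna proposes, so call sites read positively
// instead of negating the existing predicate at each use.
final class CaughtUpPredicateSketch {
    // assumed semantics: true if no client is caught up on this task, or the
    // given client is among the caught-up ones
    static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final TaskId task,
                                                                  final UUID client,
                                                                  final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
        final SortedSet<UUID> caughtUpClients = tasksToCaughtUpClients.get(task);
        return caughtUpClients == null || caughtUpClients.isEmpty() || caughtUpClients.contains(client);
    }

    static boolean taskIsNotCaughtUpOnClientAndCaughtUpClientsExist(final TaskId task,
                                                                    final UUID client,
                                                                    final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
        // the negation lives in exactly one place
        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
    }
}
```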
[GitHub] [kafka] mimaison commented on a change in pull request #8312: KAFKA-9432 automated protocol for DescribeConfigs
mimaison commented on a change in pull request #8312: URL: https://github.com/apache/kafka/pull/8312#discussion_r429211490 ## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java ## @@ -174,19 +97,25 @@ public boolean isReadOnly() { } public enum ConfigSource { -UNKNOWN_CONFIG((byte) 0), -TOPIC_CONFIG((byte) 1), -DYNAMIC_BROKER_CONFIG((byte) 2), -DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3), -STATIC_BROKER_CONFIG((byte) 4), -DEFAULT_CONFIG((byte) 5), -DYNAMIC_BROKER_LOGGER_CONFIG((byte) 6); +UNKNOWN_CONFIG((byte) 0, org.apache.kafka.clients.admin.ConfigEntry.ConfigSource.UNKNOWN), Review comment: It's unfortunate the public enum cannot do the mapping and we have to keep a copy here. Could we maybe rename this one so this looks less crazy? WDYT? ## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ## @@ -1912,53 +1915,51 @@ public DescribeConfigsResult describeConfigs(Collection configRe // The non-BROKER resources which we want to describe. These resources can be described by a // single, unified DescribeConfigs request. -final Collection unifiedRequestResources = new ArrayList<>(configResources.size()); +final DescribeConfigsRequestData unifiedRequestResources = new DescribeConfigsRequestData(); for (ConfigResource resource : configResources) { if (dependsOnSpecificNode(resource)) { brokerFutures.put(resource, new KafkaFutureImpl<>()); brokerResources.add(resource); } else { unifiedRequestFutures.put(resource, new KafkaFutureImpl<>()); -unifiedRequestResources.add(resource); +unifiedRequestResources.resources().add(new DescribeConfigsResource() +.setResourceName(resource.name()) +.setResourceType(resource.type().id())); } } final long now = time.milliseconds(); -if (!unifiedRequestResources.isEmpty()) { +if (!unifiedRequestResources.resources().isEmpty()) { runnable.call(new Call("describeConfigs", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) { @Override DescribeConfigsRequest.Builder createRequest(int timeoutMs) { -return new DescribeConfigsRequest.Builder(unifiedRequestResources) -.includeSynonyms(options.includeSynonyms()); +return new DescribeConfigsRequest.Builder(unifiedRequestResources.setIncludeSynoyms(options.includeSynonyms())); Review comment: I wonder if it would read better if `unifiedRequestResources` was instead a `List` and we were creating the `DescribeConfigsRequestData` object and calling all its setters here. WDYT? 
## File path: clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java ## @@ -220,145 +153,66 @@ public ConfigSource source() { } } +public Map resultMap() { +return data().results().stream().collect(Collectors.toMap( +configsResult -> +new ConfigResource(ConfigResource.Type.forId(configsResult.resourceType()), +configsResult.resourceName()), +Function.identity())); +} -private final int throttleTimeMs; -private final Map configs; +private final DescribeConfigsResponseData data; -public DescribeConfigsResponse(int throttleTimeMs, Map configs) { -this.throttleTimeMs = throttleTimeMs; -this.configs = Objects.requireNonNull(configs, "configs"); +public DescribeConfigsResponse(DescribeConfigsResponseData data) { +this.data = data; } -public DescribeConfigsResponse(Struct struct) { -throttleTimeMs = struct.get(THROTTLE_TIME_MS); -Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); -configs = new HashMap<>(resourcesArray.length); -for (Object resourceObj : resourcesArray) { -Struct resourceStruct = (Struct) resourceObj; - -ApiError error = new ApiError(resourceStruct); -ConfigResource.Type resourceType = ConfigResource.Type.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME)); -String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME); -ConfigResource resource = new ConfigResource(resourceType, resourceName); - -Object[] configEntriesArray = resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME); -List configEntries = new ArrayList<>(configEntriesArray.length); -for (Object configEntriesObj: configEntriesArray) { -Struct configEntriesStruct = (Struct) configEntriesObj; -Stri
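A rough sketch of mimaison's suggestion above: keep the collected resources in a plain `List` and assemble the `DescribeConfigsRequestData` only at request-creation time. The generated accessor names here are assumptions based on the diff (which, note, spells the setter `setIncludeSynoyms` at this point in the PR):
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.message.DescribeConfigsRequestData;
import org.apache.kafka.common.message.DescribeConfigsRequestData.DescribeConfigsResource;

// Sketch only: collect resources first, build the request data in one place
// (e.g. inside createRequest(int timeoutMs)) together with includeSynonyms.
final class DescribeConfigsRequestSketch {
    static DescribeConfigsRequestData buildRequestData(final List<ConfigResource> resources,
                                                       final boolean includeSynonyms) {
        final List<DescribeConfigsResource> requestResources = new ArrayList<>();
        for (final ConfigResource resource : resources) {
            requestResources.add(new DescribeConfigsResource()
                .setResourceName(resource.name())
                .setResourceType(resource.type().id()));
        }
        return new DescribeConfigsRequestData()
            .setResources(requestResources)
            .setIncludeSynonyms(includeSynonyms);
    }
}
```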
[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
rnpridgeon commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r429216813 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java ## @@ -16,19 +16,24 @@ */ package org.apache.kafka.connect.util; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.InvalidRecordException; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.WorkerConfig; +import org.apache.kafka.connect.runtime.distributed.DistributedConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; import java.util.concurrent.ExecutionException; public final class ConnectUtils { private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class); +public static final String CONNECT_KAFKA_CLUSTER_ID = "connect.kafka.cluster.id"; +public static final String CONNECT_GROUP_ID = "connect.group.id"; Review comment: Maybe it would be helpful to add CONNECT_VERSION as well. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #8713: KAFKA-6145: Add unit tests for assignments of only stateless tasks
cadonna commented on pull request #8713: URL: https://github.com/apache/kafka/pull/8713#issuecomment-632676433 Call for review: @ableegoldman @vvcephei This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request #8713: KAFKA-6145: Add unit tests for assignments of only stateless tasks
cadonna opened a new pull request #8713: URL: https://github.com/apache/kafka/pull/8713 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma opened a new pull request #8714: MINOR: Improve broker registration message
ijuma opened a new pull request #8714: URL: https://github.com/apache/kafka/pull/8714 It was previously: > INFO Registered broker 0 at path /brokers/ids/0 with addresses: ArraySeq(EndPoint(localhost,9092,ListenerName(PLAINTEXT),PLAINTEXT),EndPoint(localhost,9093,ListenerName(SSL),SSL)), czxid (broker epoch): 4294967320 (kafka.zk.KafkaZkClient) It's now: > INFO Registered broker 0 at path /brokers/ids/0 with addresses: PLAINTEXT://localhost:9092,SSL://localhost:9093, czxid (broker epoch): 4294967320 (kafka.zk.KafkaZkClient) ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
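The change itself lives in Scala (`EndPoint`/`KafkaZkClient`), but the formatting idea is simple enough to sketch; the class below is purely illustrative and not the actual broker code:
```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: render each endpoint as listener://host:port and join with
// commas, instead of relying on a case class's default toString.
public class EndpointFormatDemo {
    static final class Endpoint {
        final String listenerName;
        final String host;
        final int port;

        Endpoint(final String listenerName, final String host, final int port) {
            this.listenerName = listenerName;
            this.host = host;
            this.port = port;
        }
    }

    static String connectionString(final List<Endpoint> endpoints) {
        return endpoints.stream()
            .map(e -> e.listenerName + "://" + e.host + ":" + e.port)
            .collect(Collectors.joining(","));
    }

    public static void main(final String[] args) {
        System.out.println(connectionString(List.of(
            new Endpoint("PLAINTEXT", "localhost", 9092),
            new Endpoint("SSL", "localhost", 9093))));
        // prints: PLAINTEXT://localhost:9092,SSL://localhost:9093
    }
}
```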
[jira] [Created] (KAFKA-10032) Response to ApiVersionRequest returns wrong Produce(0) version
Antonio Pires created KAFKA-10032: - Summary: Response to ApiVersionRequest returns wrong Produce(0) version Key: KAFKA-10032 URL: https://issues.apache.org/jira/browse/KAFKA-10032 Project: Kafka Issue Type: Bug Affects Versions: 2.4.0, 0.11.0.0 Reporter: Antonio Pires The response to {{ApiVersionRequest}} returns incorrect information for the Production {{_ApiKey_}} as it is not considering {{_log.message.format.version_}} config when being overwritten. While the internals, {{_Log.append()_}}, use {{log.message.format.version}} when adding a new record set to the active segment, the version exposed to clients is not the same, which can generate unexpected behaviour. For example Using version _>0.11.0_ with {{_log.message.format.version_}} set to a previous version (0.10.2), the broker returns {{(id: 1 rack: null) -> (}} {{ Produce(0): 0 to 3 [usable: 3],}} {{ Fetch(1): 0 to 5 [usable: 5],}} {{ ...}} \{{ )}} and should instead be responding with {{(id: 1 rack: null) -> (}} {{ Produce(0): 0 to 2 [usable: 2],}} {{ Fetch(1): 0 to 5 [usable: 5],}} {{ ...}} \{{ )}} Shouldn't a single source of truth be used to get this version? In {{_Log.append()_}}, instead of directly getting the version from the config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, in turn, should take the config in consideration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10032) Response to ApiVersionRequest returns wrong Produce(0) version
[ https://issues.apache.org/jira/browse/KAFKA-10032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antonio Pires updated KAFKA-10032: -- Description: The response to {{ApiVersionRequest}} returns incorrect information for the Production {{_ApiKey_}} as it is not considering {{_log.message.format.version_}} config when being overwritten. While the internals, {{_Log.append()_}}, use {{_log.message.format.version_}} when adding a new record set to the active segment, the version exposed to clients is not the same, which can generate unexpected behaviour. For example Using version _>0.11.0_ with {{_log.message.format.version_}} set to a previous version (0.10.2), the broker returns {{(id: 1 rack: null) -> (}} {{ Produce(0): 0 to 3 [usable: 3],}} {{ Fetch(1): 0 to 5 [usable: 5],}} {{ ...}} ) and should instead be responding with {{(id: 1 rack: null) -> (}} {{ Produce(0): 0 to 2 [usable: 2],}} {{ Fetch(1): 0 to 5 [usable: 5],}} {{ ...}} ) Shouldn't a single source of truth be used to get this version? In {{_Log.append()_}}, instead of directly getting the version from the config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, in turn, should take the config in consideration. was: The response to {{ApiVersionRequest}} returns incorrect information for the Production {{_ApiKey_}} as it is not considering {{_log.message.format.version_}} config when being overwritten. While the internals, {{_Log.append()_}}, use {{log.message.format.version}} when adding a new record set to the active segment, the version exposed to clients is not the same, which can generate unexpected behaviour. For example Using version _>0.11.0_ with {{_log.message.format.version_}} set to a previous version (0.10.2), the broker returns {{(id: 1 rack: null) -> (}} {{ Produce(0): 0 to 3 [usable: 3],}} {{ Fetch(1): 0 to 5 [usable: 5],}} {{ ...}} ) and should instead be responding with {{(id: 1 rack: null) -> (}} {{ Produce(0): 0 to 2 [usable: 2],}} {{ Fetch(1): 0 to 5 [usable: 5],}} {{ ...}} ) Shouldn't a single source of truth be used to get this version? In {{_Log.append()_}}, instead of directly getting the version from the config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, in turn, should take the config in consideration. > Response to ApiVersionRequest returns wrong Produce(0) version > -- > > Key: KAFKA-10032 > URL: https://issues.apache.org/jira/browse/KAFKA-10032 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.11.0.0, 2.4.0 >Reporter: Antonio Pires >Priority: Major > > The response to {{ApiVersionRequest}} returns incorrect information for the > Production {{_ApiKey_}} as it is not considering > {{_log.message.format.version_}} config when being overwritten. While the > internals, {{_Log.append()_}}, use {{_log.message.format.version_}} when > adding a new record set to the active segment, the version exposed to clients > is not the same, which can generate unexpected behaviour. > For example > Using version _>0.11.0_ with {{_log.message.format.version_}} set to a > previous version (0.10.2), the broker returns > {{(id: 1 rack: null) -> (}} > {{ Produce(0): 0 to 3 [usable: 3],}} > {{ Fetch(1): 0 to 5 [usable: 5],}} > {{ ...}} > ) > and should instead be responding with > {{(id: 1 rack: null) -> (}} > {{ Produce(0): 0 to 2 [usable: 2],}} > {{ Fetch(1): 0 to 5 [usable: 5],}} > {{ ...}} > ) > Shouldn't a single source of truth be used to get this version? 
> In {{_Log.append()_}}, instead of directly getting the version from the > config, shouldn't we use the {{_ApiVersions.maxUsableProduceMagic()_}} that, > in turn, should take the config in consideration. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10033) AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic
Gregory Koshelev created KAFKA-10033: Summary: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic Key: KAFKA-10033 URL: https://issues.apache.org/jira/browse/KAFKA-10033 Project: Kafka Issue Type: Improvement Components: admin Affects Versions: 2.4.0 Reporter: Gregory Koshelev Currently, altering configs of non-existing topic leads to {{UnknownServerException}}: {code} java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnknownServerException: Topic "kgn_test" does not exist. at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at ru.kontur.vostok.hercules.stream.manager.kafka.KafkaManager.changeTtl(KafkaManager.java:130) ... 10 common frames omitted Caused by: org.apache.kafka.common.errors.UnknownServerException: Topic "kgn_test" does not exist. {code} The output above is produced due to {{AdminZkClient.validateTopicConfig}} method: {code} def validateTopicConfig(topic: String, configs: Properties): Unit = { Topic.validate(topic) if (!zkClient.topicExists(topic)) throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) // remove the topic overrides LogConfig.validate(configs) } {code} {{UnknownServerException}} is common exception but in this case cause is pretty clear. So this can be fixed easily by using {{UnknownTopicOrPartitionException}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
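Until a fix lands, a caller can surface a clearer error itself by describing the topic before altering its configs, so a missing topic fails with `UnknownTopicOrPartitionException` rather than `UnknownServerException`. This is a hedged workaround sketch, not part of any proposed patch, and it is inherently racy (the topic could disappear between the two calls):
```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;

// Workaround sketch: describeTopics() fails for a missing topic with an
// ExecutionException caused by UnknownTopicOrPartitionException.
final class AlterTopicConfigSketch {
    static void ensureTopicExists(final Admin admin, final String topic) throws Exception {
        admin.describeTopics(Collections.singleton(topic)).all().get();
    }

    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            ensureTopicExists(admin, "kgn_test");
            // ... safer to alter configs for the topic here (still racy)
        }
    }
}
```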
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r429276664 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -157,16 +161,16 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams } -if (!topicsNotReady.isEmpty()) { -log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, retries); +if (isNeedRetry(topicsNotReady)) { +log.info("Topics {} can not be made ready with {} retries left", topicsNotReady, remainingRetries); Review comment: The log should use `remainingRetries`, since it's logging `with {} retries left`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on pull request #8589: URL: https://github.com/apache/kafka/pull/8589#issuecomment-632722266 @abbccdda Updated and requesting review again. Really appreciate your help picking out so many style violations; also wondering if we could use a formatting tool like `scalafmt` to format automatically~ This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] gnkoshelev opened a new pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existin
gnkoshelev opened a new pull request #8715: URL: https://github.com/apache/kafka/pull/8715 Fixes KAFKA-10033. Replaces AdminOperationException with UnknownTopicOrPartitionException if the topic does not exist when validating the altered configs of a topic in AdminZkClient. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] feyman2016 commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
feyman2016 commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r429276600 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java ## @@ -51,9 +52,21 @@ if (throwable != null) { result.completeExceptionally(throwable); } else { -for (MemberToRemove memberToRemove : memberInfos) { -if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) { -return; +if (removeAll()) { +for (Map.Entry<MemberIdentity, Errors> entry: memberErrors.entrySet()) { +Exception exception = entry.getValue().exception(); +if (exception != null) { +Throwable ex = new KafkaException("Encounter exception when trying to remove: " Review comment: Wrapped so that the failed member's info is available to callers like `StreamsResetter`. Only the first member error found is captured, as in the non-`removeAll` scenario. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8714: MINOR: Improve broker registration and Log logging
ijuma commented on a change in pull request #8714: URL: https://github.com/apache/kafka/pull/8714#discussion_r429285942 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1694,9 +1694,10 @@ class Log(@volatile private var _dir: File, private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = { lock synchronized { val deletable = deletableSegments(predicate) - if (deletable.nonEmpty) + if (deletable.nonEmpty) { info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason") - deleteSegments(deletable) +deleteSegments(deletable) + } else 0 Review comment: This doesn't change behavior, but makes the code a bit less weird. Previously, we would special case the log, but rely on the called method to check again if `deletable` is empty. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r429288449 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java ## @@ -330,6 +331,12 @@ synchronized public DescribeTopicsResult describeTopics(Collection topic future.completeExceptionally(new UnknownTopicOrPartitionException("Topic " + requestedTopic + " not found.")); topicDescriptions.put(requestedTopic, future); } +// try to simulate the leader not available situation when topic name is "LeaderNotAvailableTopic" +if (requestedTopic.equals("LeaderNotAvailableTopic")) { +KafkaFutureImpl<TopicDescription> future = new KafkaFutureImpl<>(); +future.completeExceptionally(new LeaderNotAvailableException("The leader of Topic " + requestedTopic + " is not available.")); Review comment: Try to simulate the `LeaderNotAvailableException` in the `MockAdminClient` if the topic name is `LeaderNotAvailableTopic`. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on a change in pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on a change in pull request #8712: URL: https://github.com/apache/kafka/pull/8712#discussion_r429289380 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ## @@ -247,11 +261,19 @@ public InternalTopicManager(final Admin adminClient, final StreamsConfig streams log.error(errorMsg); throw new StreamsException(errorMsg); } -} else { +} else if (!needRetryTopics.contains(topicName)) { topicsToCreate.add(topicName); } } return topicsToCreate; } + +private boolean isNeedRetry(final Set<String> topicsNotReady) { +return !topicsNotReady.isEmpty() || hasNeedRetryTopic(); Review comment: If there's a topic with `LeaderNotAvailableException`, we also need to retry. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #8712: KAFKA-10006: Don't create internal topics when LeaderNotAvailableException
showuon commented on pull request #8712: URL: https://github.com/apache/kafka/pull/8712#issuecomment-632729141 Hi @ableegoldman , could you review this PR? Thanks. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)
mjsax commented on pull request #8697: URL: https://github.com/apache/kafka/pull/8697#issuecomment-632758838 Build failed with checkstyle: ``` [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13/clients/src/main/java/org/apache/kafka/common/metrics/stats/Percentiles.java:23:8: Unused import - org.apache.kafka.common.metrics.Measurable. [UnusedImports] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax commented on pull request #8679: URL: https://github.com/apache/kafka/pull/8679#issuecomment-632759415 Java 8 and Java 11 passed. Java 14 failed with: ``` org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true] ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mjsax merged pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated
mjsax merged pull request #8679: URL: https://github.com/apache/kafka/pull/8679 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Resolved] (KAFKA-10003) Deprecate KStream#through in favor of KStream#repartition
[ https://issues.apache.org/jira/browse/KAFKA-10003?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-10003. - Fix Version/s: 2.6.0 Resolution: Fixed > Deprecate KStream#through in favor of KStream#repartition > - > > Key: KAFKA-10003 > URL: https://issues.apache.org/jira/browse/KAFKA-10003 > Project: Kafka > Issue Type: Task > Components: streams >Reporter: Levani Kokhreidze >Assignee: Matthias J. Sax >Priority: Major > Labels: kip > Fix For: 2.6.0 > > > After introducing `KStream#repartition` in KIP-221 > ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint)|https://cwiki.apache.org/confluence/display/KAFKA/KIP-221%3A+Enhance+DSL+with+Connecting+Topic+Creation+and+Repartition+Hint], > it makes sense to deprecate `KStream#through` in favor of new operator (see > voting thread for more context: > [https://www.mail-archive.com/dev@kafka.apache.org/msg107645.html)|https://www.mail-archive.com/dev@kafka.apache.org/msg107645.html] -- This message was sent by Atlassian Jira (v8.3.4#803005)
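For users migrating off the deprecated API, a hedged sketch of the replacement: `KStream#repartition()` (KIP-221) creates and manages the repartition topic itself, whereas `through()` required a pre-created user topic. Topic and operator names below are illustrative:
```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;

// Migration sketch: through(topic) -> repartition(Repartitioned...).
// Note the semantic difference: repartition() uses an internal, Streams-managed
// topic (named <application.id>-<name>-repartition) rather than a user topic.
final class ThroughToRepartitionSketch {
    static KStream<String, String> rekey(final StreamsBuilder builder) {
        final KStream<String, String> stream = builder.stream("input-topic");
        // before (now deprecated): stream.through("manually-created-topic");
        return stream.repartition(
            Repartitioned.<String, String>as("rekeyed")
                .withNumberOfPartitions(4));
    }
}
```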
[jira] [Assigned] (KAFKA-10033) AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic
[ https://issues.apache.org/jira/browse/KAFKA-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Brian Byrne reassigned KAFKA-10033: --- Assignee: Brian Byrne > AdminClient should throw UnknownTopicOrPartitionException instead of > UnknownServerException if altering configs of non-existing topic > - > > Key: KAFKA-10033 > URL: https://issues.apache.org/jira/browse/KAFKA-10033 > Project: Kafka > Issue Type: Improvement > Components: admin >Affects Versions: 2.4.0 >Reporter: Gregory Koshelev >Assignee: Brian Byrne >Priority: Major > > Currently, altering configs of non-existing topic leads to > {{UnknownServerException}}: > {code} > java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.UnknownServerException: Topic "kgn_test" does > not exist. > at > org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) > at > org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) > at > org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) > at > org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) > at > ru.kontur.vostok.hercules.stream.manager.kafka.KafkaManager.changeTtl(KafkaManager.java:130) > ... 10 common frames omitted > Caused by: org.apache.kafka.common.errors.UnknownServerException: Topic > "kgn_test" does not exist. > {code} > The output above is produced due to {{AdminZkClient.validateTopicConfig}} > method: > {code} > def validateTopicConfig(topic: String, configs: Properties): Unit = { > Topic.validate(topic) > if (!zkClient.topicExists(topic)) > throw new AdminOperationException("Topic \"%s\" does not > exist.".format(topic)) > // remove the topic overrides > LogConfig.validate(configs) > } > {code} > {{UnknownServerException}} is common exception but in this case cause is > pretty clear. So this can be fixed easily by using > {{UnknownTopicOrPartitionException}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on a change in pull request #8376: KAFKA-9724 Newer clients not always sending fetch request to older brokers
mumrah commented on a change in pull request #8376: URL: https://github.com/apache/kafka/pull/8376#discussion_r429323395 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ## @@ -422,8 +427,29 @@ public synchronized void position(TopicPartition tp, FetchPosition position) { assignedState(tp).position(position); } -public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) { -return assignedState(tp).maybeValidatePosition(leaderAndEpoch); +/** + * Enter the offset validation state if the leader for this partition is known to support a usable version of the + * OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation. + * + * @param apiVersions + * @param tp + * @param leaderAndEpoch + * @return true if we enter the offset validation state + */ +public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp, + Metadata.LeaderAndEpoch leaderAndEpoch) { +if (leaderAndEpoch.leader.isPresent()) { +NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString()); +if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { +return assignedState(tp).maybeValidatePosition(leaderAndEpoch); +} else { +// If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation +completeValidation(tp); +return false; +} +} else { +return assignedState(tp).maybeValidatePosition(leaderAndEpoch); Review comment: I wonder, do we really need this call here? If the leader is not present the epoch shouldn't be present either -- right? If that's the case, then the call to maybeValidatePosition will short circuit ```java private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEpoch) { if (this.fetchState.equals(FetchStates.AWAIT_RESET)) { return false; } if (!currentLeaderAndEpoch.leader.isPresent() && !currentLeaderAndEpoch.epoch.isPresent()) { return false; } if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) { FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch); validatePosition(newPosition); preferredReadReplica = null; } return this.fetchState.equals(FetchStates.AWAIT_VALIDATION); } ``` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #8376: KAFKA-9724 Newer clients not always sending fetch request to older brokers
mumrah commented on a change in pull request #8376: URL: https://github.com/apache/kafka/pull/8376#discussion_r429325049 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ## @@ -422,8 +427,29 @@ public synchronized void position(TopicPartition tp, FetchPosition position) { assignedState(tp).position(position); } -public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) { -return assignedState(tp).maybeValidatePosition(leaderAndEpoch); +/** + * Enter the offset validation state if the leader for this partition is known to support a usable version of the + * OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation. + * + * @param apiVersions + * @param tp + * @param leaderAndEpoch + * @return true if we enter the offset validation state + */ +public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp, + Metadata.LeaderAndEpoch leaderAndEpoch) { +if (leaderAndEpoch.leader.isPresent()) { +NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString()); +if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { +return assignedState(tp).maybeValidatePosition(leaderAndEpoch); +} else { +// If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation +completeValidation(tp); +return false; +} +} else { +return assignedState(tp).maybeValidatePosition(leaderAndEpoch); Review comment: Oh, actually looking at the javadoc for LeaderAndEpoch, I see > It is also possible that we know of the leader epoch, but not the leader when it is derived from an external source (e.g. a committed offset). Also in Metadata, we do return a LeaderAndEpoch with the last-seen epoch, but no leader if the metadata is stale. So, I guess it makes sense to keep this call in maybeValidatePositionForCurrentLeader This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound
hachikuji commented on a change in pull request #8702: URL: https://github.com/apache/kafka/pull/8702#discussion_r429325325 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -565,8 +566,8 @@ private void recordRebalanceFailure() { // Note that we override the request timeout using the rebalance timeout since that is the // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays. - -int joinGroupTimeoutMs = Math.max(rebalanceConfig.rebalanceTimeoutMs, rebalanceConfig.rebalanceTimeoutMs + 5000); +int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(), Review comment: I added the request timeout to the send message in `NetworkClient`. Also made some tweaks for more consistent logging. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound
hachikuji commented on a change in pull request #8702: URL: https://github.com/apache/kafka/pull/8702#discussion_r429327443 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -502,14 +502,8 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long String destination = clientRequest.destination(); RequestHeader header = clientRequest.makeHeader(request.version()); if (log.isDebugEnabled()) { -int latestClientVersion = clientRequest.apiKey().latestVersion(); -if (header.apiVersion() == latestClientVersion) { -log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request, Review comment: Given that we have to work with all client versions, it's just as common for the client not to match the broker version, so it's not really useful for the behavior to be different when they do match. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound
hachikuji commented on a change in pull request #8702: URL: https://github.com/apache/kafka/pull/8702#discussion_r429328416 ## File path: clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ## @@ -312,8 +313,27 @@ public void testJoinGroupRequestTimeout() { mockTime.sleep(REQUEST_TIMEOUT_MS + 1); assertFalse(consumerClient.poll(future, mockTime.timer(0))); -mockTime.sleep(REBALANCE_TIMEOUT_MS - REQUEST_TIMEOUT_MS + 5000); +mockTime.sleep(REBALANCE_TIMEOUT_MS - REQUEST_TIMEOUT_MS + AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE); assertTrue(consumerClient.poll(future, mockTime.timer(0))); +assertTrue(future.exception() instanceof DisconnectException); +} + +@Test +public void testJoinGroupRequestTimeoutLowerBoundedByDefaultRequestTimeout() { +int rebalanceTimeoutMs = REQUEST_TIMEOUT_MS - 1; +setupCoordinator(RETRY_BACKOFF_MS, rebalanceTimeoutMs, Optional.empty()); +mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); +coordinator.ensureCoordinatorReady(mockTime.timer(0)); + +RequestFuture future = coordinator.sendJoinGroupRequest(); + +long expectedRequestDeadline = mockTime.milliseconds() + REQUEST_TIMEOUT_MS; +mockTime.sleep(rebalanceTimeoutMs + AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE + 1); +assertFalse(consumerClient.poll(future, mockTime.timer(0))); + +mockTime.sleep(expectedRequestDeadline - mockTime.milliseconds() + 1); Review comment: We need to take into account the time that has already passed. I was a little annoyed at having to write `REQUEST_TIMEOUT - rebalanceTimeoutMs - AbstractCoordinator.JOIN_GROUP_TIMEOUT_LAPSE`. A bit annoying either way I guess. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] hachikuji commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound
hachikuji commented on a change in pull request #8702: URL: https://github.com/apache/kafka/pull/8702#discussion_r429327946 ## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ## @@ -839,20 +833,22 @@ private void handleCompletedReceives(List responses, long now) { InFlightRequest req = inFlightRequests.completeNext(source); Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now); -if (log.isTraceEnabled()) { Review comment: Letting some requests be debug level, but making the responses be trace often means we are left with only half of the picture. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter
abbccdda commented on a change in pull request #8589: URL: https://github.com/apache/kafka/pull/8589#discussion_r42955

## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java

@@ -379,6 +379,22 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors
     MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
 }
+private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, List<String> groupInstances,
+                                                                            List<TopicPartition> topicPartitions) {
+    final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
+    byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+    List<DescribedGroupMember> describedGroupMembers = groupInstances.stream().map(groupInstance -> DescribeGroupsResponse.groupMember("0", groupInstance, "clientId0", "clientHost", memberAssignmentBytes, null)).collect(Collectors.toList());

Review comment: nit: we could set `"0"` to `JoinGroupRequest.UNKNOWN_MEMBER_ID` if we don't want to test it out. Having all members use the same member.id is a bit weird.

## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java

@@ -51,9 +52,21 @@
 if (throwable != null) {
     result.completeExceptionally(throwable);
 } else {
-    for (MemberToRemove memberToRemove : memberInfos) {
-        if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
-            return;
+    if (removeAll()) {
+        for (Map.Entry<MemberIdentity, Errors> entry : memberErrors.entrySet()) {
+            Exception exception = entry.getValue().exception();
+            if (exception != null) {
+                Throwable ex = new KafkaException("Encounter exception when trying to remove: "

Review comment: Let's put the exception in the cause so that we could verify the cause in `KafkaAdminClientTest`, as:
```
if (exception != null) {
    result.completeExceptionally(new KafkaException(
        "Encounter exception when trying to remove: " + entry.getKey(), exception));
    return;
}
```

## File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java

@@ -37,7 +38,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
     this.members = new HashSet<>(members);
 }
+public RemoveMembersFromConsumerGroupOptions() {
+    this.members = Collections.emptySet();;

Review comment: nit: extra semi-colon

## File path: core/src/main/scala/kafka/tools/StreamsResetter.java

@@ -119,7 +122,9 @@
 + "* This tool will not clean up the local state on the stream application instances (the persisted "
 + "stores used to cache aggregation results).\n"
 + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the "
-+ "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/ by default).\n\n"
++ "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/ by default).\n"
++ "*Please use the \"--force\" option to force remove active members in case long session "

Review comment: nit: space after `*`. Also I feel we could make the context more concrete, e.g.:
```
When a long session timeout has been configured, active members can take longer to expire on the broker, which blocks the reset job from completing. Using the \"--force\" option removes those left-over members immediately. Make sure to stop all stream applications when this option is specified, to avoid unexpected disruptions.
```

## File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java

@@ -261,6 +261,43 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
     Assert.assertEquals(1, exitCode);
 }
+public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
+    appID = testId + "-with-force-option";
+    streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+    streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);

Review comment: I see, this is indeed weird; please file a JIRA so that we can clean it up in a follow-up PR if others feel the same way.
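For context, a minimal usage sketch of the KIP-571 remove-all path discussed above, assuming the no-argument options constructor shown in the diff signals "remove every active member" (the group id is hypothetical):

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;

// Hedged sketch: force-remove all active members of a consumer group,
// e.g. before running a reset against a group with a long session timeout.
public static void forceRemoveAllMembers(Properties adminProps, String groupId) throws Exception {
    try (Admin admin = Admin.create(adminProps)) {
        admin.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions())
             .all()
             .get(); // completes exceptionally if removing any member failed
    }
}
```

This is the same operation the StreamsResetter's proposed `--force` flag relies on, per the review thread above.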
[GitHub] [kafka] vvcephei commented on pull request #8713: KAFKA-6145: Add unit tests for assignments of only stateless tasks
vvcephei commented on pull request #8713: URL: https://github.com/apache/kafka/pull/8713#issuecomment-632787649

Test this please
[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness
vvcephei commented on a change in pull request #8696: URL: https://github.com/apache/kafka/pull/8696#discussion_r429346085

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java

@@ -57,14 +59,42 @@ public boolean assign(final Map<UUID, ClientState> clients,
     configs.numStandbyReplicas
 );
-final boolean probingRebalanceNeeded = assignTaskMovements(
-    tasksToCaughtUpClients(statefulTasks, clientStates, configs.acceptableRecoveryLag),
+final AtomicInteger remainingWarmupReplicas = new AtomicInteger(configs.maxWarmupReplicas);
+
+final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients = tasksToCaughtUpClients(
+    statefulTasks,
+    clientStates,
+    configs.acceptableRecoveryLag
+);
+
+// We temporarily need to know which standby tasks were intended as warmups
+// for active tasks, so that we don't move them (again) when we plan standby
+// task movements. We can then immediately treat warmups exactly the same as
+// hot-standby replicas, so we just track it right here as metadata, rather
+// than add "warmup" assignments to ClientState, for example.
+final Map<UUID, Set<TaskId>> warmups = new TreeMap<>();
+
+final int neededActiveTaskMovements = assignActiveTaskMovements(
+    tasksToCaughtUpClients,
     clientStates,
-    configs.maxWarmupReplicas
+    warmups,
+    remainingWarmupReplicas
+);
+
+final int neededStandbyTaskMovements = assignStandbyTaskMovements(
+    tasksToCaughtUpClients,
+    clientStates,
+    remainingWarmupReplicas,
+    warmups
 );
 assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
+// We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
+// due to being configured for no warmups.

Review comment: Yeah, I've just realized this, too. And upon second consideration, I don't think the warmup=0 really provides a good mechanism for what I was thinking of. I think we'd better leave it as "at least one". Thanks!
[GitHub] [kafka] vvcephei commented on a change in pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness
vvcephei commented on a change in pull request #8696: URL: https://github.com/apache/kafka/pull/8696#discussion_r429346417

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java

@@ -132,22 +132,97 @@ static boolean assignTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
     return movementsNeeded;
 }
+static int assignStandbyTaskMovements(final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients,
+                                      final Map<UUID, ClientState> clientStates,
+                                      final AtomicInteger remainingWarmupReplicas,
+                                      final Map<UUID, Set<TaskId>> warmups) {
+    final BiFunction<UUID, TaskId, Boolean> caughtUpPredicate =
+        (client, task) -> taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+
+    final ConstrainedPrioritySet caughtUpClientsByTaskLoad = new ConstrainedPrioritySet(
+        caughtUpPredicate,
+        client -> clientStates.get(client).assignedTaskLoad()
+    );
+
+    final Queue<TaskMovement> taskMovements = new PriorityQueue<>(
+        Comparator.comparing(TaskMovement::numCaughtUpClients).thenComparing(TaskMovement::task)
+    );
+
+    for (final Map.Entry<UUID, ClientState> clientStateEntry : clientStates.entrySet()) {
+        final UUID destination = clientStateEntry.getKey();
+        final ClientState state = clientStateEntry.getValue();
+        for (final TaskId task : state.standbyTasks()) {
+            if (warmups.getOrDefault(destination, Collections.emptySet()).contains(task)) {
+                // this is a warmup, so we won't move it.
+            } else if (!taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, destination, tasksToCaughtUpClients)) {

Review comment: Haha, sure :)
[GitHub] [kafka] vvcephei commented on pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness
vvcephei commented on pull request #8696: URL: https://github.com/apache/kafka/pull/8696#issuecomment-632791703

Hey @cadonna, thanks for the feedback! I think I'll go ahead and merge this, because I've already fixed the "max warmup" thing in a separate bugfix PR, which builds on this one. I'll also apply your DeMorgan's law suggestion in the follow-up.
[GitHub] [kafka] vvcephei merged pull request #8696: KAFKA-6145: KIP-441: Enforce Standby Task Stickiness
vvcephei merged pull request #8696: URL: https://github.com/apache/kafka/pull/8696
[GitHub] [kafka] vvcephei opened a new pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough
vvcephei opened a new pull request #8716: URL: https://github.com/apache/kafka/pull/8716

Also fixes a system test by configuring the HATA to perform a one-shot balanced assignment.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] bdbyrne opened a new pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
bdbyrne opened a new pull request #8717: URL: https://github.com/apache/kafka/pull/8717

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] bdbyrne commented on pull request #8715: KAFKA-10033: AdminClient should throw UnknownTopicOrPartitionException instead of UnknownServerException if altering configs of non-existing topic
bdbyrne commented on pull request #8715: URL: https://github.com/apache/kafka/pull/8715#issuecomment-632799086

Hi @gnkoshelev - it's assumed the topic has already been validated by the time its config is altered via AdminZKClient. I've posted the proper fix at https://github.com/apache/kafka/pull/8717. Thanks for finding and reporting!
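For reference, a hedged sketch of the client-visible behavior the fix targets, using the public Admin API (the topic name is illustrative, and which exception ends up as the cause follows the PR's intent, not a verified run):

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

import static java.util.Collections.singleton;
import static java.util.Collections.singletonMap;

// Hedged sketch: after KAFKA-10033, altering configs of a topic that does not
// exist should surface UnknownTopicOrPartitionException rather than
// UnknownServerException.
public static void demo(Properties adminProps) throws InterruptedException {
    ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "no-such-topic");
    AlterConfigOp op = new AlterConfigOp(new ConfigEntry("retention.ms", "86400000"), AlterConfigOp.OpType.SET);
    try (Admin admin = Admin.create(adminProps)) {
        admin.incrementalAlterConfigs(singletonMap(topic, singleton(op))).all().get();
    } catch (ExecutionException e) {
        // expected cause with the fix applied:
        assert e.getCause() instanceof UnknownTopicOrPartitionException;
    }
}
```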
[GitHub] [kafka] bdbyrne commented on pull request #8717: KAFKA-10033: Throw UnknownTopicOrPartitionException when modifying a non-existent topic's config
bdbyrne commented on pull request #8717: URL: https://github.com/apache/kafka/pull/8717#issuecomment-632799858

@cmccabe this is ready for review. The test location doesn't quite fit, as it lives in the module that tests dynamic config changes on the broker, but it's "near" the old error. Any ideas on where it might be better situated?
[GitHub] [kafka] guozhangwang commented on a change in pull request #8702: MINOR: Fix join group request timeout lower bound
guozhangwang commented on a change in pull request #8702: URL: https://github.com/apache/kafka/pull/8702#discussion_r429352451

## File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java

@@ -502,14 +502,8 @@ private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long
     String destination = clientRequest.destination();
     RequestHeader header = clientRequest.makeHeader(request.version());
     if (log.isDebugEnabled()) {
-        int latestClientVersion = clientRequest.apiKey().latestVersion();
-        if (header.apiVersion() == latestClientVersion) {
-            log.trace("Sending {} {} with correlation id {} to node {}", clientRequest.apiKey(), request,

Review comment: SG.
[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough
vvcephei commented on a change in pull request #8716: URL: https://github.com/apache/kafka/pull/8716#discussion_r429352759

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java

@@ -334,6 +334,7 @@ public String toString() {
     ") prevStandbyTasks: (" + prevStandbyTasks +
     ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() +
     ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
+    ") taskLagTotals: (" + taskLagTotals.entrySet() +

Review comment: I found this useful while debugging the system test.

## File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java

@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightRequests)
     consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
     consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
     consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+    consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+    consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+    consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment: This is where we forgot to copy over the configs.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java

@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
     final Integer maxWarmupReplicas,
     final Integer numStandbyReplicas,
     final Long probingRebalanceIntervalMs) {
+    if (maxWarmupReplicas < 1) {
+        throw new IllegalArgumentException("must configure at least one warmup replica");
+    }
+

Review comment: I added this constraint to mirror the constraint we already apply in StreamsConfig. It's not critical, but I was disappointed that I had written a bunch of tests that included a technically invalid configuration. I'll write a test for this...

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java

@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
     assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
-// We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-// due to being configured for no warmups.
-final boolean probingRebalanceNeeded =
-    configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;
 log.info("Decided on assignment: " +
     clientStates +
-    " with " +
-    (probingRebalanceNeeded ? "" : "no") +
+    " with" +
+    (probingRebalanceNeeded ? "" : " no") +

Review comment: Fixing a double space we were printing when there was a followup. (It would say `with  followup`, with two spaces.)

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java

@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
     assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
-// We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-// due to being configured for no warmups.
-final boolean probingRebalanceNeeded =
-    configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;

Review comment: Since we can't have zero warmups, we don't need this condition (that I added in https://github.com/apache/kafka/pull/8696).

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java

@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap
     final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
     for (final TaskId task : statefulTasks) {
-
+        final TreeSet<UUID> caughtUpClients = new TreeSet<>();

Review comment: A short-lived, empty TreeSet costs practically nothing, and I found the other logic (with null meaning empty) a bit confusing during debugging.

## File path: streams/src/main/java/org
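Stepping back from the individual hunks: a minimal sketch of setting the KIP-441 configs being passed through above, using the public StreamsConfig constant names from the diff (the values are illustrative, not recommendations):

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Hedged sketch: the assignor reads these on the consumer side, which is
// why they must be copied into the consumer properties in StreamsConfig above.
Properties props = new Properties();
props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 10_000L);        // max changelog lag to count as "caught up"
props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 2);                  // must be >= 1 per the new check
props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 600_000L); // 10 minutes between probing rebalances
```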
[GitHub] [kafka] xvrl commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
xvrl commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r429366836

## File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java

@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.metrics;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An implementation of MetricsContext; it encapsulates the required metrics context properties for Kafka services and clients.
+ */
+public class KafkaMetricsContext implements MetricsContext {
+    /**
+     * Client or service's metadata map.
+     */
+    private Map<String, String> metadata = new HashMap<>();
+
+    /**
+     * Create a MetricsContext with namespace, no service or client properties
+     * @param namespace value for _namespace key
+     */
+    public KafkaMetricsContext(String namespace) {
+        this(namespace, new HashMap<>());
+    }
+
+    /**
+     * Create a MetricsContext with namespace, service or client properties
+     * @param namespace value for _namespace key
+     * @param metadata  additional entries to add to the context;
+     *                  values will be converted to string using Object.toString()
+     */
+    public KafkaMetricsContext(String namespace, Map<String, Object> metadata) {
+        this.metadata.put(MetricsContext.NAMESPACE, namespace);
+        metadata.forEach((key, value) -> this.metadata.put(key, value.toString()));

Review comment: I think it's ok for the component that owns the reporter to take precedence over the labels passed from upstream. We did not specify the behavior in the KIP, so implementations should use namespacing of labels to avoid this. If in practice we find this behavior is less desirable, we can file a follow-on KIP, since the interface is still evolving.
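A small sketch of the namespacing convention suggested in the comment, using the two-argument constructor shown in the diff (the label keys and values are hypothetical):

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricsContext;

// Hedged sketch: prefix label keys with the owning component, so an upstream
// component (e.g. a Connect worker) and a client it embeds cannot collide.
Map<String, Object> contextLabels = new HashMap<>();
contextLabels.put("connect.worker.id", "worker-1");        // hypothetical upstream label
contextLabels.put("producer.client.id", "dlq-producer-1"); // hypothetical downstream label, distinct key space

MetricsContext context = new KafkaMetricsContext("kafka.connect", contextLabels);
```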
[GitHub] [kafka] wj1918 closed pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect
wj1918 closed pull request #8612: URL: https://github.com/apache/kafka/pull/8612
[GitHub] [kafka] wj1918 commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect
wj1918 commented on pull request #8612: URL: https://github.com/apache/kafka/pull/8612#issuecomment-632819461

@kkonstantine thanks for the comment. Understood; I will close this PR and create separate PRs when ready.
[GitHub] [kafka] rnpridgeon commented on a change in pull request #8691: KAFKA-9960: implement KIP-606 to add metadata context to MetricsReporter
rnpridgeon commented on a change in pull request #8691: URL: https://github.com/apache/kafka/pull/8691#discussion_r429370889

## File path: clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetricsContext.java

(quoting the same KafkaMetricsContext hunk as in the comment above)

Review comment: Mostly I'm concerned about the case where some composite may share similar labels with the underlying client it manages. If we allow the downstream client to overwrite such a label, we will lose a portion of the upstream component's context.
[GitHub] [kafka] kkonstantine commented on pull request #8612: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect
kkonstantine commented on pull request #8612: URL: https://github.com/apache/kafka/pull/8612#issuecomment-632820983

Thanks @wj1918! Happy to review after they are ready.
[GitHub] [kafka] rhauch commented on pull request #8663: KAFKA-9985: Sink connector may exhaust broker when writing in DLQ
rhauch commented on pull request #8663: URL: https://github.com/apache/kafka/pull/8663#issuecomment-632824180

ok to test
[GitHub] [kafka] rhauch removed a comment on pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation
rhauch removed a comment on pull request #8630: URL: https://github.com/apache/kafka/pull/8630#issuecomment-632824360

retest this please
[GitHub] [kafka] rhauch commented on pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation
rhauch commented on pull request #8630: URL: https://github.com/apache/kafka/pull/8630#issuecomment-632824360

retest this please
[GitHub] [kafka] rhauch commented on pull request #8630: KAFKA-9969: Exclude ConnectorClientConfigRequest from class loading isolation
rhauch commented on pull request #8630: URL: https://github.com/apache/kafka/pull/8630#issuecomment-632824461

ok to test
[jira] [Assigned] (KAFKA-10005) Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
[ https://issues.apache.org/jira/browse/KAFKA-10005?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang reassigned KAFKA-10005:
-------------------------------------
    Assignee: (was: Guozhang Wang)

> Decouple RestoreListener from RestoreCallback and not enable bulk loading for RocksDB
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10005
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10005
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Priority: Major
>
> In Kafka Streams we have two restoration callbacks:
> * RestoreCallback (BatchingRestoreCallback): specified per store via registration, to specify the logic of applying a batch of records read from the changelog to the store. Used both for updating standby tasks and for restoring active tasks.
> * RestoreListener: specified per instance via `setRestoreListener`, to specify the logic for `onRestoreStart / onRestoreEnd / onBatchRestored`.
> As we can see, these two callbacks serve quite different purposes. Today, however, we allow users to register a per-store RestoreCallback that also implements the RestoreListener. This odd mixing is actually motivated by Streams' internal usage, to enable / disable bulk loading inside RocksDB. For users it is less meaningful to make a callback double as a listener, since `onRestoreStart / End` has the storeName passed in, so users can simply define different listening logic per store if needed.
> On the other hand, this mixing of the two callbacks forces Streams to check internally whether the passed-in per-store callback also implements the listener and, if so, trigger its calls, which increases complexity. Besides, toggling RocksDB bulk loading requires us to open / close / reopen / re-close the store 4 times during restoration, which can also be costly.
> Given that we have KIP-441 in place, I think we should consider ways other than toggling bulk loading during restoration for Streams (e.g. using different threads for restoration).
> The proposal of this ticket is to completely decouple the listener from the callback -- i.e. we would no longer presume that users pass in a callback that implements both RestoreCallback and RestoreListener, and for RocksDB we would replace the bulk loading mechanism with other optimizations: https://rockset.com/blog/optimizing-bulk-load-in-rocksdb/
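For readers unfamiliar with the second callback, a minimal sketch of a restore listener, i.e. the org.apache.kafka.streams.processor.StateRestoreListener interface (the logging is illustrative):

```java
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;

// Hedged sketch: a per-instance listener that only observes restoration progress.
// Note that storeName is passed to every callback, so store-specific logic can
// live here without mixing it into the per-store RestoreCallback.
public class LoggingRestoreListener implements StateRestoreListener {
    @Override
    public void onRestoreStart(TopicPartition partition, String storeName, long startingOffset, long endingOffset) {
        System.out.printf("Restore of %s from %s starting at offset %d (end %d)%n",
            storeName, partition, startingOffset, endingOffset);
    }

    @Override
    public void onBatchRestored(TopicPartition partition, String storeName, long batchEndOffset, long numRestored) {
        System.out.printf("Restored a batch of %d records into %s (up to offset %d)%n",
            numRestored, storeName, batchEndOffset);
    }

    @Override
    public void onRestoreEnd(TopicPartition partition, String storeName, long totalRestored) {
        System.out.printf("Finished restoring %s: %d records total%n", storeName, totalRestored);
    }
}
```

Per the ticket, such a listener is registered once per instance (e.g. via `KafkaStreams#setGlobalStateRestoreListener`), while the RestoreCallback stays per store.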
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17114291#comment-17114291 ]

Guozhang Wang commented on KAFKA-9168:
--------------------------------------
Just adding a reference: this feature is only available in RocksDB 6.8.1+.

> Integrate JNI direct buffer support to RocksDBStore
> ---------------------------------------------------
>
>                 Key: KAFKA-9168
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9168
>             Project: Kafka
>          Issue Type: Task
>          Components: streams
>            Reporter: Sagar Rao
>            Priority: Blocker
>              Labels: perfomance
>             Fix For: 3.0.0
>
> A PR has been created on the RocksDB Java client to support direct ByteBuffers in Java. We can look at integrating it whenever it gets merged. Link to PR: https://github.com/facebook/rocksdb/pull/2283
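A hedged sketch of the direct-ByteBuffer read path in the RocksDB Java client (available in 6.8.1+, per the comment above); the database path and key are illustrative:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

// Hedged sketch: read through direct ByteBuffers so the JNI boundary does not
// have to copy through an intermediate byte[].
public static void directBufferGet() throws RocksDBException {
    try (Options options = new Options().setCreateIfMissing(true);
         RocksDB db = RocksDB.open(options, "/tmp/rocksdb-demo");
         ReadOptions readOptions = new ReadOptions()) {
        ByteBuffer key = ByteBuffer.allocateDirect(16);
        key.put("some-key".getBytes(StandardCharsets.UTF_8)).flip();
        ByteBuffer value = ByteBuffer.allocateDirect(1024);
        int size = db.get(readOptions, key, value); // fills `value` in place
        if (size == RocksDB.NOT_FOUND) {
            System.out.println("key not found");
        }
    }
}
```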
[jira] [Commented] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17114309#comment-17114309 ]

Ryanne Dolan commented on KAFKA-7500:
-------------------------------------
[~qq619618919] I usually recommend using a load balancer and health checks for this purpose, but you can also get away with just two VIPs (dc1-kafka, dc2-kafka) and KIP-302.

> MirrorMaker 2.0 (KIP-382)
> -------------------------
>
>                 Key: KAFKA-7500
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7500
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect, mirrormaker
>    Affects Versions: 2.4.0
>            Reporter: Ryanne Dolan
>            Assignee: Ryanne Dolan
>            Priority: Major
>              Labels: pull-request-available, ready-to-commit
>             Fix For: 2.4.0
>
>         Attachments: Active-Active XDCR setup.png
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect framework.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
> https://github.com/apache/kafka/pull/6295
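A hedged sketch of the two-VIP setup with KIP-302 (`client.dns.lookup=use_all_dns_ips`), assuming `dc1-kafka` and `dc2-kafka` are VIPs that each resolve to the brokers in one DC:

```java
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;

// Hedged sketch: with use_all_dns_ips (KIP-302), the client tries every IP a
// bootstrap name resolves to, so one VIP per DC gives basic failover without
// a dedicated load balancer. Host names are illustrative.
Properties props = new Properties();
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "dc1-kafka:9092,dc2-kafka:9092");
props.put(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, "use_all_dns_ips");
```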
[GitHub] [kafka] rhauch opened a new pull request #8718: [WIP] MINOR: Improve failure message in Kafka cluster used in Connect integration tests
rhauch opened a new pull request #8718: URL: https://github.com/apache/kafka/pull/8718

DO NOT MERGE. Debugging build failures.
[GitHub] [kafka] ableegoldman commented on a change in pull request #8697: [WIP] KAFKA-9983: KIP-613, add task-level e2e latency metrics (min and max)
ableegoldman commented on a change in pull request #8697: URL: https://github.com/apache/kafka/pull/8697#discussion_r429396524

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java

@@ -137,6 +138,7 @@ public StreamTask(final TaskId id,
 }
 processRatioSensor = TaskMetrics.activeProcessRatioSensor(threadId, taskId, streamsMetrics);
 processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
+recordE2ELatencySensor = TaskMetrics.recordE2ELatencySensor(threadId, taskId, streamsMetrics);

Review comment: I modified the proposal slightly to make these all processor-node-level (will push the changes in a minute), but this question is still relevant, so here's the answer: we can't record the e2e latency in the sink node because not all topologies _have_ a sink node. For that reason we also can't record at the record collector. We need to figure out the terminal nodes when processing the topology, then record this metric after `child.process` in `ProcessorContext#forward`.
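A minimal sketch of the measurement itself, using illustrative names (`time`, `record`, and the sensor from the diff above); per the comment, the recording point for terminal nodes is after `child.process` in `ProcessorContext#forward`:

```java
// Hedged sketch (KIP-613): end-to-end latency is the wall-clock time at the
// terminal node minus the record's event timestamp, which was assigned at the source.
long nowMs = time.milliseconds();               // wall-clock time at the terminal node
long e2eLatencyMs = nowMs - record.timestamp(); // record timestamp set upstream
recordE2ELatencySensor.record(e2eLatencyMs, nowMs); // feeds the min/max task metrics
```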
[GitHub] [kafka] rhauch opened a new pull request #8719: [WIP] MINOR: Improve failure message in Kafka cluster used in Connect integration tests (pre Gradle memory change)
rhauch opened a new pull request #8719: URL: https://github.com/apache/kafka/pull/8719

DO NOT MERGE. Debugging build failures. See #8718.