[jira] [Commented] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542 ] Seungchan Ahn commented on KAFKA-13217: --- [~ableegoldman] [~guozhang] Hello, can I have a chance to work on this? If yes, could you add me to the contributor list? I've sent a mail to the dev mailing list. * title: Request to contribute * link: https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3 > Reconsider skipping the LeaveGroup on close() or add an overload that does so > - > > Key: KAFKA-13217 > URL: https://issues.apache.org/jira/browse/KAFKA-13217 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > Labels: needs-kip, newbie, newbie++ > > In Kafka Streams, when an instance is shut down via the close() API, we > intentionally skip sending a LeaveGroup request. This is because the shutdown is > often not due to a scale-down event but instead some transient closure, such as > during a rolling bounce. In cases where the instance is expected to start up again > shortly after, we originally wanted to avoid having that member's tasks > redistributed across the remaining group members, since this would disturb the > stable assignment and could cause unnecessary state migration and restoration. We > also hoped to limit the disruption to just a single rebalance, rather than forcing > the group to rebalance once when the member shuts down and then again when it > comes back up. So it's really an optimization for the case in which the > shutdown is temporary. > > That said, many of those optimizations are no longer necessary, or at least much > less useful, given recent features and improvements. For example, rebalances are > now lightweight, so skipping the second rebalance is no longer as worthwhile to > optimize for, and the new assignor takes into account the actual underlying state > for each task/partition assignment, rather than just the previous assignment, so > the assignment should be considerably more stable across bounces and rolling > restarts. > > Given that, it might be time to reconsider this optimization. Alternatively, > we could introduce another form of the close() API that forces the member to > leave the group, to be used in the event of an actual scale-down rather than a > transient bounce. -- This message was sent by Atlassian Jira (v8.20.1#820001)
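The overload proposed in the description might look roughly like the sketch below. This is a hypothetical illustration only: the CloseOptions type and its method names are invented here for the sake of the example and are not a committed Kafka Streams API.

{code:java}
import java.time.Duration;

// Hypothetical sketch of a close() overload that forces a LeaveGroup;
// every name here is illustrative, not a committed API.
public final class CloseOptions {
    private boolean leaveGroup = false;  // send LeaveGroup on close when true
    private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);

    public CloseOptions leaveGroup(final boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }

    public CloseOptions timeout(final Duration timeout) {
        this.timeout = timeout;
        return this;
    }
}

// Caller side, on an actual scale-down rather than a transient bounce:
// streams.close(new CloseOptions().leaveGroup(true).timeout(Duration.ofSeconds(30)));
{code}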
[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542 ] Seungchan Ahn edited comment on KAFKA-13217 at 1/11/22, 8:23 AM: - [~ableegoldman] [~guozhang] Hello 👋 can I have a chance to work on this? If yes, could you add me to the contributor list? I've sent a mail to the dev mailing list. * title: Request to contribute * link: [https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3] was (Author: JIRAUSER283196): [~ableegoldman] [~guozhang] Hello, can I have a chance to work on this? If yes, could you add me to the contributor list? I've sent a mail to the dev mailing list. * title: Request to contribute * link: https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3 -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies
ableegoldman commented on a change in pull request #11600: URL: https://github.com/apache/kafka/pull/11600#discussion_r781948695 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // parse the topology to determine the repartition source topics, // making sure they are created with the number of partitions as // the maximum of the depending sub-topologies source topics' number of partitions -final Map allRepartitionTopicPartitions = prepareRepartitionTopics(metadata); +final RepartitionTopics repartitionTopics = new RepartitionTopics( +taskManager.topologyMetadata(), +internalTopicManager, +copartitionedTopicsEnforcer, +metadata, +logPrefix +); + +final Map> missingExternalSourceTopicsPerTopology = repartitionTopics.setup(); +if (!missingExternalSourceTopicsPerTopology.isEmpty()) { +log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " + + "have been pre-created before starting the Streams application. ", + taskManager.topologyMetadata().hasNamedTopologies() ? + missingExternalSourceTopicsPerTopology : + missingExternalSourceTopicsPerTopology.values() +); +if (!taskManager.topologyMetadata().hasNamedTopologies()) { +throw new MissingSourceTopicException("Missing source topics."); +} +} + +final Map allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo(); Review comment: ok, ok, you guys convinced me like 4 comments ago :P -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest
dajac commented on a change in pull request #11566: URL: https://github.com/apache/kafka/pull/11566#discussion_r781968801 ## File path: core/src/main/scala/kafka/server/KafkaApis.scala ## @@ -1690,6 +1690,7 @@ class KafkaApis(val requestChannel: RequestChannel, joinGroupRequest.data.protocolType, protocols, sendResponseCallback, +Option(joinGroupRequest.data.reason()), Review comment: nit: You can remove the parentheses after `reason`. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) { final RuntimeException exception = future.exception(); resetJoinGroupFuture(); +rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")"; Review comment: Also, it might be better to use `synchronized (AbstractCoordinator.this) { }` to mutate both `rejoinReason` and `rejoinNeeded` in order to ensure that they are consistent with each other. ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java ## @@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) { final RuntimeException exception = future.exception(); resetJoinGroupFuture(); +rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")"; Review comment: nit: Would it make sense to use `String.format` like we did at L460? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
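Taken together, the two suggestions on `AbstractCoordinator` amount to something like the fragment below. This is a sketch of the reviewer's proposal in context, assuming `rejoinReason` and `rejoinNeeded` are the fields being mutated; it is not the merged code.

```java
// Sketch of the review suggestions, not the merged change: build the
// reason with String.format, and mutate both fields under the coordinator
// lock so they stay consistent with each other.
synchronized (AbstractCoordinator.this) {
    rejoinReason = String.format("rebalance failed due to '%s' (%s)",
        exception.getMessage(), exception.getClass().getSimpleName());
    rejoinNeeded = true;
}
```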
[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest
dajac commented on a change in pull request #11571: URL: https://github.com/apache/kafka/pull/11571#discussion_r782071080 ## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ## @@ -3907,6 +3909,44 @@ public void testRemoveMembersFromGroup() throws Exception { } } +@Test +public void testRemoveMembersFromGroupReason() throws Exception { +final Cluster cluster = mockCluster(3, 0); +final Time time = new MockTime(); + +try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) { + +env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller())); +env.kafkaClient().prepareResponse(body -> { +if (!(body instanceof LeaveGroupRequest)) { +return false; +} +LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data(); + +return leaveGroupRequest.members().stream().allMatch( +member -> member.reason().equals("member was removed by an admin: testing remove members reason") +); +}, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers( +Arrays.asList( +new MemberResponse().setGroupInstanceId("instance-1"), +new MemberResponse().setGroupInstanceId("instance-2") +)) +)); + +Collection membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2")); + +RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(membersToRemove); +options.reason("testing remove members reason"); Review comment: Should we also add a test that does not specify the reason to test the default case? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
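The companion test the reviewer asks for could look roughly like the fragment below. This is illustrative only: the exact default reason string the admin client sends when no custom reason is given is an assumption here, not a verified constant.

```java
// Hypothetical variant of the test above: build the options without
// calling reason(...), then assert the request carries the default reason.
// The expected string below is an assumption for the sketch.
RemoveMembersFromConsumerGroupOptions options =
    new RemoveMembersFromConsumerGroupOptions(membersToRemove); // no reason set

env.kafkaClient().prepareResponse(body -> {
    if (!(body instanceof LeaveGroupRequest)) {
        return false;
    }
    LeaveGroupRequestData data = ((LeaveGroupRequest) body).data();
    return data.members().stream().allMatch(
        member -> member.reason().equals("member was removed by an admin")
    );
}, leaveGroupResponse); // leaveGroupResponse built as in the test above
```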
[jira] [Assigned] (KAFKA-13585) Fix `kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` flaky test
[ https://issues.apache.org/jira/browse/KAFKA-13585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-13585: --- Assignee: David Jacot > Fix > `kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` > flaky test > - > > Key: KAFKA-13585 > URL: https://issues.apache.org/jira/browse/KAFKA-13585 > Project: Kafka > Issue Type: Improvement >Reporter: David Jacot >Assignee: David Jacot >Priority: Major > > {noformat} > org.opentest4j.AssertionFailedError: expected: but was: > at > app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40) > at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35) > at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162) > at > app//kafka.server.ReplicaManagerTest.assertFetcherHasTopicId(ReplicaManagerTest.scala:3502) > at > app//kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds(ReplicaManagerTest.scala:3572) > {noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Updated] (KAFKA-13585) Fix `kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` flaky test
[ https://issues.apache.org/jira/browse/KAFKA-13585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot updated KAFKA-13585: Priority: Minor (was: Major) -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] YeonCheolGit closed pull request #11660: MINOR: fix comment in ClusterConnectionStates
YeonCheolGit closed pull request #11660: URL: https://github.com/apache/kafka/pull/11660 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] YeonCheolGit commented on a change in pull request #11660: MINOR: fix comment in ClusterConnectionStates
YeonCheolGit commented on a change in pull request #11660: URL: https://github.com/apache/kafka/pull/11660#discussion_r782195849 ## File path: clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ## @@ -68,7 +68,7 @@ public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMax } /** - * Return true iff we can currently initiate a new connection. This will be the case if we are not + * Return true if we can currently initiate a new connection. This will be the case if we are not Review comment: @junrao Oh, I really didn't know about that. Sorry for interrupting you, and thank you for letting me know :) I will close this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies
ableegoldman commented on a change in pull request #11600: URL: https://github.com/apache/kafka/pull/11600#discussion_r781814872 ## File path: checkstyle/suppressions.xml ## @@ -158,7 +158,7 @@ files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/> + files="(KTableImpl|StreamsPartitionAssignor).java"/> Review comment: I would prefer to leave out any nontrivial refactoring such as breaking up the assignor's ridiculously long `assign` method, though I do agree that it needs to be broken up (again -- I actually did a pretty hefty refactoring to clean this method up a while back, I guess it's just gotten out of control again since then :/ ) edit: nevermind, I will clean it up in this PR and remove the suppression. Lol it's hard to argue against something you and Guozhang left like 5 comments to request -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies
ableegoldman commented on a change in pull request #11600: URL: https://github.com/apache/kafka/pull/11600#discussion_r782209624 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // parse the topology to determine the repartition source topics, // making sure they are created with the number of partitions as // the maximum of the depending sub-topologies source topics' number of partitions -final Map allRepartitionTopicPartitions = prepareRepartitionTopics(metadata); +final RepartitionTopics repartitionTopics = new RepartitionTopics( Review comment: I wanted to break up the `prepareRepartitionTopics` call into explicit steps so I could get the missing source topics returned by the middle step, but I think there's a better way to achieve what we need here, so I'll clean it up -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies
ableegoldman commented on a change in pull request #11600: URL: https://github.com/apache/kafka/pull/11600#discussion_r782215325 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr // parse the topology to determine the repartition source topics, // making sure they are created with the number of partitions as // the maximum of the depending sub-topologies source topics' number of partitions -final Map allRepartitionTopicPartitions = prepareRepartitionTopics(metadata); +final RepartitionTopics repartitionTopics = new RepartitionTopics( +taskManager.topologyMetadata(), +internalTopicManager, +copartitionedTopicsEnforcer, +metadata, +logPrefix +); + +final Map> missingExternalSourceTopicsPerTopology = repartitionTopics.setup(); +if (!missingExternalSourceTopicsPerTopology.isEmpty()) { +log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " + + "have been pre-created before starting the Streams application. ", + taskManager.topologyMetadata().hasNamedTopologies() ? + missingExternalSourceTopicsPerTopology : + missingExternalSourceTopicsPerTopology.values() +); Review comment: Good point, I think I changed how things worked halfway through this PR and forgot to update/put some things back the way they were when the changes were no longer necessary (like this and inlining the `prepareRepartitionTopics` method) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-13591) Fix flaky test ControllerIntegrationTest.testTopicIdCreatedOnUpgrade
David Jacot created KAFKA-13591: --- Summary: Fix flaky test ControllerIntegrationTest.testTopicIdCreatedOnUpgrade Key: KAFKA-13591 URL: https://issues.apache.org/jira/browse/KAFKA-13591 Project: Kafka Issue Type: Bug Reporter: David Jacot {noformat} org.opentest4j.AssertionFailedError: expected: not equal but was: at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39) at org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:276) at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:265) at org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:260) at org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2798) at kafka.controller.ControllerIntegrationTest.testTopicIdCreatedOnUpgrade(ControllerIntegrationTest.scala:1140) {noformat} -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] dajac opened a new pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`
dajac opened a new pull request #11665: URL: https://github.com/apache/kafka/pull/11665 `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` fails regularly because `assertFetcherHasTopicId` can't validate the topic id in the fetch state. The issue is quite subtle. Under the hood, `assertFetcherHasTopicId` acquires the `partitionMapLock` in the fetcher thread. `partitionMapLock` is also acquired by the `processFetchRequest` method. If `processFetchRequest` acquires it before `assertFetcherHasTopicId` can check the fetch state, `assertFetcherHasTopicId` has no chance to verify the state anymore, because `processFetchRequest` removes the fetch state before releasing the lock. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
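The race described above generalizes to any state inspected under a lock that another thread may clear under the same lock. A simplified, self-contained illustration of the pattern (not the real fetcher internals) is:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration of the race, not the actual fetcher code.
class FetchStateRace {
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    private final Map<String, String> fetchState = new ConcurrentHashMap<>();

    // Test side: reads the state under the lock. If the fetcher thread
    // won the race, the state is already gone and the assertion fails.
    String readState(String topicPartition) {
        partitionMapLock.lock();
        try {
            return fetchState.get(topicPartition);
        } finally {
            partitionMapLock.unlock();
        }
    }

    // Fetcher side: completing the fetch removes the state under the
    // same lock before releasing it.
    void processFetchRequest(String topicPartition) {
        partitionMapLock.lock();
        try {
            fetchState.remove(topicPartition);
        } finally {
            partitionMapLock.unlock();
        }
    }
}
```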
[GitHub] [kafka] dajac opened a new pull request #11666: KAFKA-13591; Fix flaky test `ControllerIntegrationTest.testTopicIdCreatedOnUpgrade`
dajac opened a new pull request #11666: URL: https://github.com/apache/kafka/pull/11666 The issue is that when `zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)` is called after the new controller is brought up, there is no guarantee that the controller has already written the topic id to the topic znode. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
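The usual remedy for this kind of race is to poll for the condition with a deadline rather than reading it once. A generic sketch of that pattern (not the actual Scala test change) is:

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

// Generic poll-until-present helper; the timeout and sleep step are
// arbitrary values for the sketch.
final class WaitUntil {
    static <T> T awaitPresent(Supplier<Optional<T>> read, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() < deadline) {
            Optional<T> value = read.get();
            if (value.isPresent()) {
                return value.get();
            }
            Thread.sleep(50); // back off briefly before re-reading
        }
        throw new AssertionError("condition not met within " + timeout);
    }
}

// In the spirit of the fix: keep re-reading the znode until the
// controller has written the topic id, instead of asserting immediately.
```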
[GitHub] [kafka] nicktelford commented on pull request #11610: KAFKA-13549: Add min.repartition.purge.interval.ms
nicktelford commented on pull request #11610: URL: https://github.com/apache/kafka/pull/11610#issuecomment-1010106545 KIP available at: https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+separate+min.repartition.purge.interval.ms+to+Kafka+Streams The commit and PR description have been updated to the latest specification from the KIP, based on the dev list discussion. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
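For context, wiring such a config into a Streams application would look like the snippet below. The property name is taken from the KIP title and is an assumption here; the name ultimately adopted in the KIP discussion may differ.

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

// Illustrative only: the config key is assumed from the KIP-811 title and
// may not match what was finally merged.
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Ask Streams to purge consumed repartition records at most once a minute.
props.put("min.repartition.purge.interval.ms", "60000");
```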
[GitHub] [kafka] C0urante commented on a change in pull request #11615: KAFKA-13546: Do not fail connector if default topic creation group is explicitly specified
C0urante commented on a change in pull request #11615: URL: https://github.com/apache/kafka/pull/11615#discussion_r782322703 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java ## @@ -64,6 +67,18 @@ return props; } +@Test +public void shouldNotFailWithExplicitlySpecifiedDefaultTopicCreationGroup() { +Map props = defaultConnectorProps(); +//Ensure default group is omitted even if its specified multiple times. +props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", DEFAULT_TOPIC_CREATION_GROUP, +TOPIC_CREATION_GROUP_1, DEFAULT_TOPIC_CREATION_GROUP, TOPIC_CREATION_GROUP_2)); +props.put(TOPIC_CREATION_GROUPS_CONFIG, DEFAULT_TOPIC_CREATION_GROUP); Review comment: Was this left in accidentally? Looks like it overrides the line above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] C0urante commented on a change in pull request #11615: KAFKA-13546: Do not fail connector if default topic creation group is explicitly specified
C0urante commented on a change in pull request #11615: URL: https://github.com/apache/kafka/pull/11615#discussion_r782322703 ## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java ## @@ -64,6 +67,18 @@ return props; } +@Test +public void shouldNotFailWithExplicitlySpecifiedDefaultTopicCreationGroup() { +Map props = defaultConnectorProps(); +//Ensure default group is omitted even if its specified multiple times. +props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", DEFAULT_TOPIC_CREATION_GROUP, +TOPIC_CREATION_GROUP_1, DEFAULT_TOPIC_CREATION_GROUP, TOPIC_CREATION_GROUP_2)); +props.put(TOPIC_CREATION_GROUPS_CONFIG, DEFAULT_TOPIC_CREATION_GROUP); Review comment: Was this left in accidentally? Looks like it overrides the line above, which seems more suitable for this test case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
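If the second `put` is indeed a leftover, the setup implied by the in-test comment is presumably the multi-group line alone, as sketched below (a guess at the intent, not a confirmed fix):

```java
// Presumed intent (illustrative): a single put whose value repeats the
// default group, so the config must tolerate the duplicate entries.
props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",",
    DEFAULT_TOPIC_CREATION_GROUP, TOPIC_CREATION_GROUP_1,
    DEFAULT_TOPIC_CREATION_GROUP, TOPIC_CREATION_GROUP_2));
```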
[jira] [Assigned] (KAFKA-13591) Fix flaky test ControllerIntegrationTest.testTopicIdCreatedOnUpgrade
[ https://issues.apache.org/jira/browse/KAFKA-13591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-13591: --- Assignee: David Jacot -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] diegoazevedo commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
diegoazevedo commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1010167705 @vlsi the problem is that version 1.* has been deprecated since 2015. Details: https://blogs.apache.org/foundation/entry/apache_logging_services_project_announces The vulnerability reported by security scanners (like Tenable Nessus) is exactly that: the use of an "unsupported version". So I think the move to 2.* is correct. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vlsi commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
vlsi commented on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1010208088 > problem is that version 1.* is deprecated since 2015 @diegoazevedo , let me explain: if there are people willing to support log4j 1.x, then it could be supported and maintained just fine. https://reload4j.qos.ch/ is a fork by @ceki (the one who created log4j in the first place!) As I highlighted above, there are individuals (including ASF committers like myself, or even ASF members) who are willing to volunteer to support log4j 1.x. Currently, the question has not yet been decided by the ASF; however, I do not see how they can "forbid" maintaining 1.x provided the version is widely used in the industry, there's high demand for the fixes, and there are individuals to work on that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`
jolshan commented on pull request #11665: URL: https://github.com/apache/kafka/pull/11665#issuecomment-1010212921 Is this test useful if we can't verify the topic ID? I think we have a similar one that doesn't check topic ID? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dajac commented on pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`
dajac commented on pull request #11665: URL: https://github.com/apache/kafka/pull/11665#issuecomment-1010214440 > Is this test useful if we can't verify the topic ID? I think we have a similar one that doesn't check topic ID? Do we? I haven't found it... -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] jolshan commented on pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`
jolshan commented on pull request #11665: URL: https://github.com/apache/kafka/pull/11665#issuecomment-1010224534 Strange -- I might be misremembering because I can't seem to find it either. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] vlsi edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2
vlsi edited a comment on pull request #7898: URL: https://github.com/apache/kafka/pull/7898#issuecomment-1010208088 > problem is that version 1.* is deprecated since 2015 @diegoazevedo , let me explain: if there are people willing to support log4j 1.x, then it could be supported and maintained just fine. https://reload4j.qos.ch/ is a fork by @ceki (the one who created log4j in the first place!) As I highlighted above, there are individuals (including ASF committers like myself, or even ASF members) who are willing to volunteer to support log4j 1.x. Currently, the question has not yet been decided by the ASF; however, I do not see how they can "forbid" maintaining 1.x provided the version is widely used in the industry, there's high demand for the fixes, and there are individuals to work on that. Of course, if the ASF allows the volunteers to maintain 1.x, then reload4j will not be required. However, if the ASF blocks 1.x (for any reason), then reload4j might be way better for the consumers than migrating to 2.x or something else. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473149#comment-17473149 ] Matthias J. Sax commented on KAFKA-13289: - {quote}The app uses a 0ms join window with a 0ms grace period{quote} That is pretty aggressive and will result in a retention of 0ms, too. Thus, every out-of-order record will be considered late and would be dropped (and you would see the corresponding logging). {quote}Here's a question - could a large difference between message timestamp (used for joining) and system time be causing the "Skipping record" messages? {quote} It should not. System time should be totally out of the picture. {quote}I (accidentally) noticed that one way to reproduce "Skipping record for expired segment" messages seems to be adding messages to the input topics with older timestamps than those of earlier messages, i.e. going back in time. {quote} That is expected behavior. – Not sure what you mean by "would not mind". Kafka Streams can process out-of-order data, but of course it will drop data that is older than the configured retention period (for this case, you see the corresponding logs). > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Matthew Sheppard >Priority: Minor > > When pushing bulk data through a kafka-streams app, I see it log the following > message many times... > {noformat} > WARN > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - > Skipping record for expired segment. > {noformat} > ...and data which I expect to have been joined through a leftJoin step > appears to be lost. > I've seen this in practice either when my application has been shut down for > a while and then is brought back up, or when I've used something like the > [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) > in an attempt to have the application reprocess past data. > I was able to reproduce this behaviour in isolation by generating 1000 > messages to two topics spaced an hour apart (with the original timestamps in > order), then having kafka streams select a key for them and try to leftJoin > the two rekeyed streams. > Self-contained source code for that reproduction is available at > https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java > The actual kafka-streams topology in there looks like this. > {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final KStream leftStream = > builder.stream(leftTopic); > final KStream rightStream = > builder.stream(rightTopic); > final KStream rekeyedLeftStream = leftStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > final KStream rekeyedRightStream = rightStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); > final KStream joined = rekeyedLeftStream.leftJoin( > rekeyedRightStream, > (left, right) -> left + "/" + right, > joinWindow > ); > {code} > ...and the eventual output I produce looks like this... > {code} > ... > 523 [523,left/null] > 524 [524,left/null, 524,left/524,right] > 525 [525,left/525,right] > 526 [526,left/null] > 527 [527,left/null] > 528 [528,left/528,right] > 529 [529,left/null] > 530 [530,left/null] > 531 [531,left/null, 531,left/531,right] > 532 [532,left/null] > 533 [533,left/null] > 534 [534,left/null, 534,left/534,right] > 535 [535,left/null] > 536 [536,left/null] > 537 [537,left/null, 537,left/537,right] > 538 [538,left/null] > 539 [539,left/null] > 540 [540,left/null] > 541 [541,left/null] > 542 [542,left/null] > 543 [543,left/null] > ... > {code} > ...whereas, given the input data, I expect to see every row end with the two > values joined, rather than the right value being null. > Note that I understand it's expected that we initially get the left/null > values for many values since that's the expected semantics of kafka-streams > left join, at least until > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious > I've noticed that if I set a very large grace value on the join window the > problem is solved, but since the input I provide is not out of order I did not > expect to need to do that, and I'm wary of the resource requirements of doing > so in practice on an application with a lot of volume.
[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft
rondagostino commented on a change in pull request #11606: URL: https://github.com/apache/kafka/pull/11606#discussion_r782538135 ## File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala ## @@ -196,14 +199,16 @@ abstract class QuorumTestHarness extends Logging { } } - def createAndStartBroker(config: KafkaConfig, - time: Time = Time.SYSTEM): KafkaBroker = { -implementation.createAndStartBroker(config, - time) + def createBroker(config: KafkaConfig, + time: Time = Time.SYSTEM, + startup: Boolean = true): KafkaBroker = { +implementation.createBroker(config, time, startup) } def shutdownZooKeeper(): Unit = asZk().shutdown() + def shutdownKRaftController(): Unit = asKRaft().shutdown() Review comment: Good point. There is no `shutdown()` method that we can invoke directly, but here we do want the RaftManager to not be shut down, so I've adjusted the method accordingly and kept the name. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft
rondagostino commented on a change in pull request #11606: URL: https://github.com/apache/kafka/pull/11606#discussion_r782539950 ## File path: core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala ## @@ -253,4 +248,32 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness { getController().kafkaController.controllerContext.topicNames.toMap } + private def createBrokers(startup: Boolean): Unit = { +// Add each broker to `servers` buffer as soon as it is created to ensure that brokers +// are shutdown cleanly in tearDown even if a subsequent broker fails to start +for (config <- configs) { + val broker = createBrokerFromConfig(config) + _brokers += broker + if (startup) { +broker.startup() Review comment: This PR is not changing how this works in general, so this comment applies to the pre-PR implementation as well. I think the assumption is that the list of alive brokers is only valid if **all** brokers get created and are (now optionally) started -- and the test should be failed/torn down if an exception should occur that prevents all brokers from being processed. I'll add a comment to this effect in `recreateBrokers()` (which is the only place this gets invoked aside from in `setUp()`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Assigned] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Seungchan Ahn reassigned KAFKA-13217: - Assignee: Seungchan Ahn -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542 ] Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 12:23 AM: -- [~ableegoldman] -- [~guozhang] -Hello 👋 can I have a chance to work on this? If yes, could you add me to the contributor list? I've sent a mail to the dev mailing list.- * -title: Request to contribute- * -link:- [-https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3-] FYI, it's done by another member. * on: https://lists.apache.org/thread/hyo61vnx2d65511trbr179sqmdgcgmyk was (Author: JIRAUSER283196): [~ableegoldman] [~guozhang] Hello 👋 can I have a chance to work on this? If yes, could you add me to the contributor list? I've sent a mail to the dev mailing list. * title: Request to contribute * link: [https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3] -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so
[ https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542 ] Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 12:24 AM: -- [~ableegoldman] – [~guozhang] -Hello 👋 can I have a chance to work on this? If yes, could you add me to the contributor list? I've sent a mail to the dev mailing list.- * -title: Request to contribute- * -link: [https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3]- FYI, it's done by another member. * on: [https://lists.apache.org/thread/hyo61vnx2d65511trbr179sqmdgcgmyk] was (Author: JIRAUSER283196): [~ableegoldman] -- [~guozhang] -Hello 👋 can I have a chance to work on this? If yes, could you add me to the contributor list? I've sent a mail to the dev mailing list.- * -title: Request to contribute- * -link:- [-https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3-] FYI, it's done by another member. * on: https://lists.apache.org/thread/hyo61vnx2d65511trbr179sqmdgcgmyk -- This message was sent by Atlassian Jira (v8.20.1#820001)
[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473235#comment-17473235 ] Eugen Dück commented on KAFKA-13289: {quote}bq. The app uses a 0ms join window with a 0ms grace period. That is pretty aggressive and will result in a retention of 0ms, too. Thus, every out-of-order record will be considered late and would be dropped (and you would see the corresponding logging).{quote} In terms of joining (which is done per key), that is the behavior we want. However, it seems the dropping of messages is based on "partition time", i.e. per partition? As we don't have monotonically increasing timestamps across keys, that would mean we have an actual problem... So a solution would be to increase the window size or the grace period (though I honestly still don't know what the difference between them is; a link or so to an explanation would be awesome), at the cost of a massively increased number of join pairs.
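For the window-versus-grace question above: the window bounds how far apart the timestamps of two records may be and still join, while the grace period bounds how far behind stream time a record may arrive before it is dropped as expired. Widening only the grace period therefore does not create extra join pairs. A sketch against the reproduction topology quoted earlier, with illustrative values:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

// Illustrative values: keep the 5s join window from the reproduction, but
// accept records up to 10 minutes behind stream time before dropping them.
JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5))
        .grace(Duration.ofMinutes(10));

final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
        rekeyedRightStream,
        (left, right) -> left + "/" + right,
        joinWindow
);
{code}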
[GitHub] [kafka] dengziming opened a new pull request #11667: MINOR; Enable Kraft in ApiVersionTest
dengziming opened a new pull request #11667: URL: https://github.com/apache/kafka/pull/11667 *More detailed description of your change* 1. Replace ClusterTestExtensions with QuorumTestHarness 2. Enable Kraft in ApiVersionTest *Summary of testing strategy (including rationale)* QA ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Updated] (KAFKA-13242) KRaft Controller doesn't handle UpdateFeaturesRequest
[ https://issues.apache.org/jira/browse/KAFKA-13242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] dengziming updated KAFKA-13242: --- Parent: KAFKA-13410 Issue Type: Sub-task (was: Bug) > KRaft Controller doesn't handle UpdateFeaturesRequest > - > > Key: KAFKA-13242 > URL: https://issues.apache.org/jira/browse/KAFKA-13242 > Project: Kafka > Issue Type: Sub-task >Reporter: dengziming >Assignee: dengziming >Priority: Major > Fix For: 3.0.1 > > -- This message was sent by Atlassian Jira (v8.20.1#820001)
[GitHub] [kafka] jsancio opened a new pull request #11668: MINOR: Update KRaft controller's event processing time
jsancio opened a new pull request #11668: URL: https://github.com/apache/kafka/pull/11668 Make sure that the event queue processing time histogram gets updated and add tests that verify that the update methods modify the correct histogram. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`
[ https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473235#comment-17473235 ] Eugen Dück edited comment on KAFKA-13289 at 1/12/22, 3:04 AM: -- {quote} bq. The app uses a 0ms join window with a 0ms grace period That is pretty aggressive and will result in a retention of 0ms, too. Thus, every out-of-order record will be considered late and would be dropped (and you would see the corresponding logging). {quote} In terms of joining (which is done per key), that is the behavior we want. However, it seems the dropping of messages is based on "partition time", i.e. per partition? As we don't have monotonically increasing timestamps across keys, that would mean we have an actual problem... So a solution would perhaps be to increase the grace period. I will check this out. was (Author: eugendueck): {quote} bq. The app uses a 0ms join window with a 0ms grace period That is pretty aggressive and will result in a retention of 0ms, too. Thus, every out-of-order record will be considered late and would be dropped (and you would see the corresponding logging). {quote} In terms of joining (which is done per key), that is the behavior we want. However, it seems the dropping of messages is based on "partition time", i.e. per partition? As we don't have monotonically increasing timestamps across keys, that would mean we have an actual problem... So a solution would be to increase the window size or the grace period (though I honestly still don't know what the difference is - a link or so to an explanation would be awesome), at the cost of a massively increased number of join pairs. > Bulk processing correctly ordered input data through a join with > kafka-streams results in `Skipping record for expired segment` > --- > > Key: KAFKA-13289 > URL: https://issues.apache.org/jira/browse/KAFKA-13289 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.8.0 >Reporter: Matthew Sheppard >Priority: Minor > > When pushing bulk data through a kafka-streams app, I see it log the following > message many times... > {noformat} > WARN > org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore - > Skipping record for expired segment. > {noformat} > ...and data which I expect to have been joined through a leftJoin step > appears to be lost. > I've seen this in practice either when my application has been shut down for > a while and then is brought back up, or when I've used something like the > [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html) > in an attempt to have the application reprocess past data. > I was able to reproduce this behaviour in isolation by generating 1000 > messages to two topics spaced an hour apart (with the original timestamps in > order), then having kafka streams select a key for them and try to leftJoin > the two rekeyed streams. > Self-contained source code for that reproduction is available at > https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java > The actual kafka-streams topology in there looks like this:
> {code:java} > final StreamsBuilder builder = new StreamsBuilder(); > final KStream<String, String> leftStream = > builder.stream(leftTopic); > final KStream<String, String> rightStream = > builder.stream(rightTopic); > final KStream<String, String> rekeyedLeftStream = leftStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > final KStream<String, String> rekeyedRightStream = rightStream > .selectKey((k, v) -> v.substring(0, v.indexOf(":"))); > JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5)); > final KStream<String, String> joined = rekeyedLeftStream.leftJoin( > rekeyedRightStream, > (left, right) -> left + "/" + right, > joinWindow > ); > {code} > ...and the eventual output I produce looks like this... > {code} > ... > 523 [523,left/null] > 524 [524,left/null, 524,left/524,right] > 525 [525,left/525,right] > 526 [526,left/null] > 527 [527,left/null] > 528 [528,left/528,right] > 529 [529,left/null] > 530 [530,left/null] > 531 [531,left/null, 531,left/531,right] > 532 [532,left/null] > 533 [533,left/null] > 534 [534,left/null, 534,left/534,right] > 535 [535,left/null] > 536 [536,left/null] > 537 [537,left/null, 537,left/537,right] > 538 [538,left/null] > 539 [539,left/null] > 540 [540,left/null] > 541 [541,left/null] > 542 [542,left/null] > 543 [543,left/null] > ... > {code} > ...whereas, given the input data, I expect to see every row end with the two > values joined, rather than the right value being null.
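To make the "partition time" question above concrete: the expiry check behind the warning compares each record's timestamp against the stream time observed per task/partition, not per key. A deliberately simplified paraphrase (not the actual AbstractRocksDBSegmentedBytesStore code), assuming retention = window size + grace:

{code:scala}
object ExpirySketch {
  val windowSizeMs = 0L
  val graceMs = 0L
  val retentionMs = windowSizeMs + graceMs // 0ms window + 0ms grace => 0ms retention

  var observedStreamTimeMs = Long.MinValue

  def put(recordTimestampMs: Long): Unit = {
    observedStreamTimeMs = math.max(observedStreamTimeMs, recordTimestampMs)
    if (recordTimestampMs < observedStreamTimeMs - retentionMs) {
      // With 0ms retention, any record older than the newest timestamp seen
      // on this partition -- even one for a different key -- lands here.
      println("Skipping record for expired segment.")
    } else {
      // ...insert into the window store and attempt the join...
    }
  }
}
{code}

Under this model, interleaved keys whose timestamps are not globally monotonic within the partition will trip the check even though each key's records are in order, which matches the observed behaviour.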
[GitHub] [kafka] cmccabe commented on pull request #11668: MINOR: Update KRaft controller's event processing time
cmccabe commented on pull request #11668: URL: https://github.com/apache/kafka/pull/11668#issuecomment-1010658277 Test failures are not related -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cmccabe merged pull request #11668: MINOR: Fix confusion between EventQueueProcessingTimeMs and EventQueueTimeMs
cmccabe merged pull request #11668: URL: https://github.com/apache/kafka/pull/11668 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] lmr3796 opened a new pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics
lmr3796 opened a new pull request #11669: URL: https://github.com/apache/kafka/pull/11669 The patch apache#4196 (commit f300480f) replaced if/else chains with case matches in `KafkaZkClient`. The method `getPartitionAssignmentForTopics` seems to have been missed in that patch, while similar methods like `getFullReplicaAssignmentForTopics`, `getReplicaAssignmentAndTopicIdForTopics`, etc. adopted the `case-match` style. This PR changes it to make the code style more consistent. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
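For illustration, a minimal self-contained sketch of the style change in question, using stand-in types rather than the real KafkaZkClient/ZooKeeper response classes:

{code:scala}
sealed trait Code
case object Ok extends Code
case object NoNode extends Code
final case class Response(resultCode: Code, data: String)

object MatchStyleSketch {
  // Before: chained if/else on the response code
  def assignmentIfElse(resp: Response): Option[String] =
    if (resp.resultCode == Ok) Some(resp.data)
    else if (resp.resultCode == NoNode) None
    else throw new IllegalStateException(s"Unexpected result code: ${resp.resultCode}")

  // After: the case-match style the sibling methods already use
  def assignmentMatch(resp: Response): Option[String] =
    resp.resultCode match {
      case Ok     => Some(resp.data)
      case NoNode => None
      case code   => throw new IllegalStateException(s"Unexpected result code: $code")
    }
}
{code}

The match form enumerates the handled result codes in a single expression, which is the consistency the PR is after.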
[GitHub] [kafka] lmr3796 commented on pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics
lmr3796 commented on pull request #11669: URL: https://github.com/apache/kafka/pull/11669#issuecomment-1010717423 Hi @ijuma, this serves as a follow-up to #4196. I saw you reviewed that PR and was wondering if you could also review this one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org