[jira] [Commented] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-11 Thread Seungchan Ahn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542
 ] 

Seungchan Ahn commented on KAFKA-13217:
---

[~ableegoldman] [~guozhang] Hello, can I have a chance to work on this? If yes, 
could you add me to the contributor list? I've sent a mail to the dev mailing 
list.
 * title: Request to contribute
 * link: https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3

> Reconsider skipping the LeaveGroup on close() or add an overload that does so
> -
>
> Key: KAFKA-13217
> URL: https://issues.apache.org/jira/browse/KAFKA-13217
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip, newbie, newbie++
>
> In Kafka Streams, when an instance is shut down via the close() API, we
> intentionally skip sending a LeaveGroup request. This is because the shutdown
> is often not due to a scale-down event but to some transient closure, such as
> a rolling bounce. In cases where the instance is expected to start up again
> shortly, we originally wanted to prevent that member's tasks from being
> redistributed across the remaining group members, since this would disturb
> the stable assignment and could cause unnecessary state migration and
> restoration. We also hoped to limit the disruption to a single rebalance,
> rather than forcing the group to rebalance once when the member shuts down
> and again when it comes back up. So it's really an optimization for the case
> in which the shutdown is temporary.
>
> That said, many of those optimizations are no longer necessary, or at least
> much less useful, given recent features and improvements. For example,
> rebalances are now lightweight, so skipping the second rebalance is less
> worth optimizing for, and the new assignor takes into account the actual
> underlying state for each task/partition assignment, rather than just the
> previous assignment, so the assignment should be considerably more stable
> across bounces and rolling restarts.
>
> Given that, it might be time to reconsider this optimization. Alternatively,
> we could introduce another form of the close() API that forces the member to
> leave the group, to be used in the event of an actual scale-down rather than
> a transient bounce.
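
A hedged sketch of what the second option could look like; the {{CloseOptions}} class and {{leaveGroup}} method below are illustrative names only, not a committed API (the ticket only proposes the idea).

{code:java}
import java.time.Duration;

// Illustrative sketch of a close() variant that can force a LeaveGroup on
// shutdown. All names here are hypothetical; KAFKA-13217 only proposes the idea.
public class CloseOptions {
    private Duration timeout = Duration.ofMillis(Long.MAX_VALUE);
    private boolean leaveGroup = false;

    public CloseOptions timeout(final Duration timeout) {
        this.timeout = timeout;
        return this;
    }

    // true: send LeaveGroup so the member's tasks are redistributed right away
    // (real scale-down); false: today's behavior, optimized for transient bounces.
    public CloseOptions leaveGroup(final boolean leaveGroup) {
        this.leaveGroup = leaveGroup;
        return this;
    }
}
{code}

A caller scaling down for good would then use something like {{streams.close(new CloseOptions().leaveGroup(true))}}, while a rolling bounce would keep the default.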



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-11 Thread Seungchan Ahn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542
 ] 

Seungchan Ahn edited comment on KAFKA-13217 at 1/11/22, 8:23 AM:
-

[~ableegoldman] [~guozhang] Hello 👋 can I have a chance to work on this? If 
yes, could you add me to the contributor list? I've sent a mail to the dev 
mailing list.
 * title: Request to contribute
 * link: [https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3]


was (Author: JIRAUSER283196):
[~ableegoldman] [~guozhang] Hello, can I have a chance to work on this? If yes, 
could you add me to the contributor list? I've sent a mail to the dev mailing 
list.
 * title: Request to contribute
 * link: https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3






[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

2022-01-11 Thread GitBox


ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781948695



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
         // parse the topology to determine the repartition source topics,
         // making sure they are created with the number of partitions as
         // the maximum of the depending sub-topologies source topics' number of partitions
-        final Map allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            taskManager.topologyMetadata(),
+            internalTopicManager,
+            copartitionedTopicsEnforcer,
+            metadata,
+            logPrefix
+        );
+
+        final Map> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+        if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+            log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                      "have been pre-created before starting the Streams application. ",
+                      taskManager.topologyMetadata().hasNamedTopologies() ?
+                          missingExternalSourceTopicsPerTopology :
+                          missingExternalSourceTopicsPerTopology.values()
+            );
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source topics.");
+            }
+        }
+
+        final Map allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();

Review comment:
   ok, ok, you guys convinced me like 4 comments ago :P




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #11566: KAFKA-13495: add reason to JoinGroupRequest

2022-01-11 Thread GitBox


dajac commented on a change in pull request #11566:
URL: https://github.com/apache/kafka/pull/11566#discussion_r781968801



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -1690,6 +1690,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       joinGroupRequest.data.protocolType,
       protocols,
       sendResponseCallback,
+      Option(joinGroupRequest.data.reason()),

Review comment:
   nit: You can remove the parenthesis after `reason`.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 final RuntimeException exception = future.exception();
 
                 resetJoinGroupFuture();
+                rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")";

Review comment:
   Also, it might be better to use `synchronized (AbstractCoordinator.this) { }` 
to mutate both `rejoinReason` and `rejoinNeeded`, in order to ensure that they 
are consistent with each other.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -467,6 +468,7 @@ boolean joinGroupIfNeeded(final Timer timer) {
                 final RuntimeException exception = future.exception();
 
                 resetJoinGroupFuture();
+                rejoinReason = "rebalance failed due to '" + exception.getMessage() + "' (" + exception.getClass().getSimpleName() + ")";

Review comment:
   nit: Would it make sense to use `String.format` like we did at L460?
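
For reference, a minimal sketch of what the two suggestions above could look like combined; the surrounding class is elided and the `rejoinNeeded` handling is assumed, so this is illustrative rather than the actual patch:

```java
// Sketch only: mutate rejoinReason and rejoinNeeded under one lock so readers
// never observe a reason that does not match the rejoin flag, and build the
// reason with String.format instead of concatenation.
final class CoordinatorSketch {
    private String rejoinReason = "";
    private boolean rejoinNeeded = false;

    void markRejoin(final RuntimeException exception) {
        synchronized (this) {
            rejoinReason = String.format("rebalance failed due to '%s' (%s)",
                exception.getMessage(), exception.getClass().getSimpleName());
            rejoinNeeded = true;
        }
    }
}
```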








[GitHub] [kafka] dajac commented on a change in pull request #11571: KAFKA-13496: add reason to LeaveGroupRequest

2022-01-11 Thread GitBox


dajac commented on a change in pull request #11571:
URL: https://github.com/apache/kafka/pull/11571#discussion_r782071080



##
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -3907,6 +3909,44 @@ public void testRemoveMembersFromGroup() throws Exception {
         }
     }
 
+    @Test
+    public void testRemoveMembersFromGroupReason() throws Exception {
+        final Cluster cluster = mockCluster(3, 0);
+        final Time time = new MockTime();
+
+        try (AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(time, cluster)) {
+
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+
+            env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
+            env.kafkaClient().prepareResponse(body -> {
+                if (!(body instanceof LeaveGroupRequest)) {
+                    return false;
+                }
+                LeaveGroupRequestData leaveGroupRequest = ((LeaveGroupRequest) body).data();
+
+                return leaveGroupRequest.members().stream().allMatch(
+                    member -> member.reason().equals("member was removed by an admin: testing remove members reason")
+                );
+            }, new LeaveGroupResponse(new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(
+                Arrays.asList(
+                    new MemberResponse().setGroupInstanceId("instance-1"),
+                    new MemberResponse().setGroupInstanceId("instance-2")
+                ))
+            ));
+
+            Collection<MemberToRemove> membersToRemove = Arrays.asList(new MemberToRemove("instance-1"), new MemberToRemove("instance-2"));
+
+            RemoveMembersFromConsumerGroupOptions options = new RemoveMembersFromConsumerGroupOptions(membersToRemove);
+            options.reason("testing remove members reason");

Review comment:
   Should we also add a test that does not specify the reason to test the 
default case?
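
A sketch of the check such a default-case test might make; the exact default string is an assumption inferred from the "member was removed by an admin" prefix visible in the hunk above, not taken from the Kafka source:

```java
import java.util.List;

// Sketch only: the predicate a default-case test would assert, with no custom
// reason appended. DEFAULT_REASON here is an assumption.
final class DefaultReasonCheck {
    static final String DEFAULT_REASON = "member was removed by an admin";

    static boolean allMatchDefaultReason(final List<String> memberReasons) {
        return memberReasons.stream().allMatch(DEFAULT_REASON::equals);
    }
}
```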








[jira] [Assigned] (KAFKA-13585) Fix `kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` flaky test

2022-01-11 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-13585:
---

Assignee: David Jacot

> Fix 
> `kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` 
> flaky test
> -
>
> Key: KAFKA-13585
> URL: https://issues.apache.org/jira/browse/KAFKA-13585
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Jacot
>Assignee: David Jacot
>Priority: Major
>
> {noformat}
> org.opentest4j.AssertionFailedError: expected: <true> but was: <false>
>   at 
> app//org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:55)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:40)
>   at app//org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:35)
>   at app//org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:162)
>   at 
> app//kafka.server.ReplicaManagerTest.assertFetcherHasTopicId(ReplicaManagerTest.scala:3502)
>   at 
> app//kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds(ReplicaManagerTest.scala:3572)
> {noformat}





[jira] [Updated] (KAFKA-13585) Fix `kafka.server.ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` flaky test

2022-01-11 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot updated KAFKA-13585:

Priority: Minor  (was: Major)






[GitHub] [kafka] YeonCheolGit closed pull request #11660: MINOR: fix comment in ClusterConnectionStates

2022-01-11 Thread GitBox


YeonCheolGit closed pull request #11660:
URL: https://github.com/apache/kafka/pull/11660


   






[GitHub] [kafka] YeonCheolGit commented on a change in pull request #11660: MINOR: fix comment in ClusterConnectionStates

2022-01-11 Thread GitBox


YeonCheolGit commented on a change in pull request #11660:
URL: https://github.com/apache/kafka/pull/11660#discussion_r782195849



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -68,7 +68,7 @@ public ClusterConnectionStates(long reconnectBackoffMs, long reconnectBackoffMax
 }
 
 /**
- * Return true iff we can currently initiate a new connection. This will be the case if we are not
+ * Return true if we can currently initiate a new connection. This will be the case if we are not

Review comment:
   @junrao 
   Oh, I really didn't know about that. Sorry for interrupting you, and thank 
you for letting me know :)
   I will close this PR.








[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

2022-01-11 Thread GitBox


ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r781814872



##
File path: checkstyle/suppressions.xml
##
@@ -158,7 +158,7 @@
   files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread).java"/>
 
+  files="(KTableImpl|StreamsPartitionAssignor).java"/>

Review comment:
   I would prefer to leave out any nontrivial refactoring such as breaking 
up the assignor's ridiculously long `assign` method, though I do agree that it 
needs to be broken up (again -- I actually did a pretty hefty refactoring to 
clean this method up a while back, I guess it's just gotten out of control 
again since then :/ )
   
   edit: nevermind, I will clean it up in this PR and remove the suppression. 
Lol it's hard to argue against something you and Guozhang left like 5 comments 
to request 








[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

2022-01-11 Thread GitBox


ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r782209624



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
         // parse the topology to determine the repartition source topics,
         // making sure they are created with the number of partitions as
         // the maximum of the depending sub-topologies source topics' number of partitions
-        final Map allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(

Review comment:
   I wanted to break up the `prepareRepartitionTopics` call into explicit steps 
so I could get the missing source topics returned by the middle step, but I 
think there's a better way to achieve what we need here, so I'll clean it up.








[GitHub] [kafka] ableegoldman commented on a change in pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

2022-01-11 Thread GitBox


ableegoldman commented on a change in pull request #11600:
URL: https://github.com/apache/kafka/pull/11600#discussion_r782215325



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -380,15 +380,37 @@ public GroupAssignment assign(final Cluster metadata, final GroupSubscription gr
         // parse the topology to determine the repartition source topics,
         // making sure they are created with the number of partitions as
         // the maximum of the depending sub-topologies source topics' number of partitions
-        final Map allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            taskManager.topologyMetadata(),
+            internalTopicManager,
+            copartitionedTopicsEnforcer,
+            metadata,
+            logPrefix
+        );
+
+        final Map> missingExternalSourceTopicsPerTopology = repartitionTopics.setup();
+        if (!missingExternalSourceTopicsPerTopology.isEmpty()) {
+            log.error("The following source topics are missing/unknown: {}. Please make sure all source topics " +
+                      "have been pre-created before starting the Streams application. ",
+                      taskManager.topologyMetadata().hasNamedTopologies() ?
+                          missingExternalSourceTopicsPerTopology :
+                          missingExternalSourceTopicsPerTopology.values()
+            );

Review comment:
   Good point, I think I changed how things worked halfway through this PR 
and forgot to update/put some things back the way they were when the changes 
were no longer necessary (like this and inlining the `prepareRepartitionTopics` 
method)








[jira] [Created] (KAFKA-13591) Fix flaky test ControllerIntegrationTest.testTopicIdCreatedOnUpgrade

2022-01-11 Thread David Jacot (Jira)
David Jacot created KAFKA-13591:
---

 Summary: Fix flaky test 
ControllerIntegrationTest.testTopicIdCreatedOnUpgrade
 Key: KAFKA-13591
 URL: https://issues.apache.org/jira/browse/KAFKA-13591
 Project: Kafka
  Issue Type: Bug
Reporter: David Jacot



{noformat}
org.opentest4j.AssertionFailedError: expected: not equal but was: 
at org.junit.jupiter.api.AssertionUtils.fail(AssertionUtils.java:39)
at 
org.junit.jupiter.api.AssertNotEquals.failEqual(AssertNotEquals.java:276)
at 
org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:265)
at 
org.junit.jupiter.api.AssertNotEquals.assertNotEquals(AssertNotEquals.java:260)
at 
org.junit.jupiter.api.Assertions.assertNotEquals(Assertions.java:2798)
at 
kafka.controller.ControllerIntegrationTest.testTopicIdCreatedOnUpgrade(ControllerIntegrationTest.scala:1140)
{noformat}






[GitHub] [kafka] dajac opened a new pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`

2022-01-11 Thread GitBox


dajac opened a new pull request #11665:
URL: https://github.com/apache/kafka/pull/11665


   `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds` fails regularly 
because `assertFetcherHasTopicId` can't validate the topic id in the fetch 
state. The issue is quite subtle. Under the hood, `assertFetcherHasTopicId` 
acquires the `partitionMapLock` in the fetcher thread. `partitionMapLock` is 
also acquired by the `processFetchRequest` method. If `processFetchRequest` 
acquires it before `assertFetcherHasTopicId` can check the fetch state, 
`assertFetcherHasTopicId` has no chance to verify the state anymore, because 
`processFetchRequest` will remove the fetch state before releasing the lock.
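
   A minimal sketch of the race as described (all names are illustrative stand-ins; the real code lives in the replica fetcher internals):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Sketch only: if processFetchRequest() wins partitionMapLock first, it removes
// the fetch state, so the asserting thread can never observe the topic id.
final class FetcherRaceSketch {
    private final ReentrantLock partitionMapLock = new ReentrantLock();
    private final Map<String, String> fetchState =
        new ConcurrentHashMap<>(Map.of("topic-0", "some-topic-id"));

    boolean assertFetcherHasTopicId(final String partition) {
        partitionMapLock.lock();
        try {
            return fetchState.containsKey(partition); // false if the fetcher ran first
        } finally {
            partitionMapLock.unlock();
        }
    }

    void processFetchRequest(final String partition) {
        partitionMapLock.lock();
        try {
            fetchState.remove(partition); // state is gone before the lock is released
        } finally {
            partitionMapLock.unlock();
        }
    }
}
```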
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] dajac opened a new pull request #11666: KAFKA-13591; Fix flaky test `ControllerIntegrationTest.testTopicIdCreatedOnUpgrade`

2022-01-11 Thread GitBox


dajac opened a new pull request #11666:
URL: https://github.com/apache/kafka/pull/11666


   The issue is that when 
`zkClient.getTopicIdsForTopics(Set(tp.topic)).get(tp.topic)` is called after 
the new controller is brought up, there is no guarantee that the controller 
has already written the topic id to the topic znode.
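
   The usual remedy for this kind of check is to poll until the condition holds rather than reading once. A hedged sketch follows (the helper is written out by hand so it stays self-contained; the Scala test itself would use its existing wait utilities):

```java
import java.util.function.BooleanSupplier;

final class WaitForTopicId {
    // Sketch only: hasTopicId stands in for the test's
    // zkClient.getTopicIdsForTopics(Set(tp.topic)).contains(tp.topic) lookup.
    static void await(final BooleanSupplier hasTopicId) throws InterruptedException {
        final long deadlineMs = System.currentTimeMillis() + 15_000L;
        while (!hasTopicId.getAsBoolean()) {
            if (System.currentTimeMillis() > deadlineMs) {
                throw new AssertionError("controller never wrote the topic id to the znode");
            }
            Thread.sleep(100L);
        }
    }
}
```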
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] nicktelford commented on pull request #11610: KAFKA-13549: Add min.repartition.purge.interval.ms

2022-01-11 Thread GitBox


nicktelford commented on pull request #11610:
URL: https://github.com/apache/kafka/pull/11610#issuecomment-1010106545


   KIP available at: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-811%3A+Add+separate+min.repartition.purge.interval.ms+to+Kafka+Streams
   
   The commit and PR description have been updated to the latest specification 
from the KIP, based on the dev list discussion.






[GitHub] [kafka] C0urante commented on a change in pull request #11615: KAFKA-13546: Do not fail connector if default topic creation group is explicitly specified

2022-01-11 Thread GitBox


C0urante commented on a change in pull request #11615:
URL: https://github.com/apache/kafka/pull/11615#discussion_r782322703



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceConnectorConfigTest.java
##
@@ -64,6 +67,18 @@
 return props;
 }
 
+@Test
+public void 
shouldNotFailWithExplicitlySpecifiedDefaultTopicCreationGroup() {
+Map props = defaultConnectorProps();
+//Ensure default group is omitted even if its specified multiple times.
+props.put(TOPIC_CREATION_GROUPS_CONFIG, String.join(",", 
DEFAULT_TOPIC_CREATION_GROUP,
+TOPIC_CREATION_GROUP_1, DEFAULT_TOPIC_CREATION_GROUP, 
TOPIC_CREATION_GROUP_2));
+props.put(TOPIC_CREATION_GROUPS_CONFIG, DEFAULT_TOPIC_CREATION_GROUP);

Review comment:
   Was this left in accidentally? Looks like it overrides the line above, which 
seems more suitable for this test case.
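
   For what it's worth, a tiny standalone demonstration of the override being flagged (the literal config key is `topic.creation.groups`; the values are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: the second put() replaces the first value entirely, so the
// multi-group line in the test above never takes effect.
final class OverrideDemo {
    public static void main(final String[] args) {
        final Map<String, String> props = new HashMap<>();
        props.put("topic.creation.groups", "default,group1,default,group2");
        props.put("topic.creation.groups", "default");
        System.out.println(props.get("topic.creation.groups")); // prints "default"
    }
}
```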












[jira] [Assigned] (KAFKA-13591) Fix flaky test ControllerIntegrationTest.testTopicIdCreatedOnUpgrade

2022-01-11 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-13591:
---

Assignee: David Jacot






[GitHub] [kafka] diegoazevedo commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-01-11 Thread GitBox


diegoazevedo commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1010167705


   @vlsi the problem is that version 1.* has been deprecated since 2015.
   Details: 
https://blogs.apache.org/foundation/entry/apache_logging_services_project_announces
   
   The vulnerability reported by security scanners (like Tenable Nessus) is 
exactly that: the use of an "unsupported version". So I think the move to 2.* 
is correct.






[GitHub] [kafka] vlsi commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-01-11 Thread GitBox


vlsi commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1010208088


   > problem is that version 1.* is deprecated since 2015
   
   @diegoazevedo , let me explain: if there are people willing to support log4j 
1.x, then it could be supported and maintained just fine.
   
   https://reload4j.qos.ch/ is a fork by @ceki (the one who created log4j in 
the first place!)
   
   As I highlighted above, there are individuals (including ASF committers like 
myself and even ASF members) who are willing to volunteer to support log4j 1.x. 
The question has not yet been decided by the ASF; however, I do not see how they 
can "forbid" maintaining 1.x given that the version is widely used in the 
industry, there is high demand for the fixes, and there are individuals willing 
to work on that.
   






[GitHub] [kafka] jolshan commented on pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`

2022-01-11 Thread GitBox


jolshan commented on pull request #11665:
URL: https://github.com/apache/kafka/pull/11665#issuecomment-1010212921


   Is this test useful if we can't verify the topic ID? I think we have a 
similar one that doesn't check topic ID?






[GitHub] [kafka] dajac commented on pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`

2022-01-11 Thread GitBox


dajac commented on pull request #11665:
URL: https://github.com/apache/kafka/pull/11665#issuecomment-1010214440


   > Is this test useful if we can't verify the topic ID? I think we have a 
similar one that doesn't check topic ID?
   
   Do we? I haven't found it...






[GitHub] [kafka] jolshan commented on pull request #11665: KAFKA-13585; Fix flaky test `ReplicaManagerTest.testReplicaAlterLogDirsWithAndWithoutIds`

2022-01-11 Thread GitBox


jolshan commented on pull request #11665:
URL: https://github.com/apache/kafka/pull/11665#issuecomment-1010224534


   Strange -- I might be misremembering because I can't seem to find it either.






[GitHub] [kafka] vlsi edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2022-01-11 Thread GitBox


vlsi edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-1010208088


   > problem is that version 1.* is deprecated since 2015
   
   @diegoazevedo , let me explain: if there are people willing to support log4j 
1.x, then it could be supported and maintained just fine.
   
   https://reload4j.qos.ch/ is a fork by @ceki (the one who created log4j in 
the first place!)
   
   As I highlighted above, there are individuals (including ASF committers like 
myself and even ASF members) who are willing to volunteer to support log4j 1.x. 
The question has not yet been decided by the ASF; however, I do not see how they 
can "forbid" maintaining 1.x given that the version is widely used in the 
industry, there is high demand for the fixes, and there are individuals willing 
to work on that.
   
   Of course, if the ASF allows the volunteers to maintain 1.x, then reload4j 
will not be required. However, if the ASF blocks 1.x (for any reason), then 
reload4j might be way better for the consumers than migrating to 2.x or 
something else.






[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-11 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473149#comment-17473149
 ] 

Matthias J. Sax commented on KAFKA-13289:
-

{quote}The app uses a 0ms join window with a 0ms
{quote}
That is pretty aggressive and will result in a retention of 0ms, too. Thus, 
every out-of-order record will be considered late and would be dropped (and you 
would see the corresponding logging).
{quote}Here's a question - could a large difference between message timestamp 
(used for joining) and system time be causing the "Skipping record" messages?
{quote}
It should not. System time should be totally out of the picture.
{quote}I (accidentally) noticed that one way to reproduce "Skipping record for 
expired segment" messages seems to be adding messages to the input topics with 
older timestamps than those of earlier messages, i.e. going back in time.
{quote}
That is expected behavior. Not sure what you mean by "would not mind". Kafka 
Streams can process out-of-order data, but of course it will drop data that is 
older than the configured retention period (in this case, you see the 
corresponding logs).

> Bulk processing correctly ordered input data through a join with 
> kafka-streams results in `Skipping record for expired segment`
> ---
>
> Key: KAFKA-13289
> URL: https://issues.apache.org/jira/browse/KAFKA-13289
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.8.0
>Reporter: Matthew Sheppard
>Priority: Minor
>
> When pushing bulk data through a kafka-streams app, I see it log the following
> message many times...
> {noformat}
> WARN org.apache.kafka.streams.state.internals.AbstractRocksDBSegmentedBytesStore -
> Skipping record for expired segment.
> {noformat}
> ...and data which I expect to have been joined through a leftJoin step
> appears to be lost.
> I've seen this in practice either when my application has been shut down for
> a while and then is brought back up, or when I've used something like the
> [app-reset-tool](https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html)
> in an attempt to have the application reprocess past data.
> I was able to reproduce this behaviour in isolation by generating 1000
> messages to two topics spaced an hour apart (with the original timestamps in
> order), then having kafka streams select a key for them and try to leftJoin
> the two rekeyed streams.
> Self-contained source code for that reproduction is available at
> https://github.com/mattsheppard/ins14809/blob/main/src/test/java/ins14809/Ins14809Test.java
> The actual kafka-streams topology in there looks like this.
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KStream<String, String> leftStream = builder.stream(leftTopic);
> final KStream<String, String> rightStream = builder.stream(rightTopic);
> final KStream<String, String> rekeyedLeftStream = leftStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> final KStream<String, String> rekeyedRightStream = rightStream
>         .selectKey((k, v) -> v.substring(0, v.indexOf(":")));
> JoinWindows joinWindow = JoinWindows.of(Duration.ofSeconds(5));
> final KStream<String, String> joined = rekeyedLeftStream.leftJoin(
>         rekeyedRightStream,
>         (left, right) -> left + "/" + right,
>         joinWindow
> );
> {code}
> ...and the eventual output I produce looks like this...
> {code}
> ...
> 523 [523,left/null]
> 524 [524,left/null, 524,left/524,right]
> 525 [525,left/525,right]
> 526 [526,left/null]
> 527 [527,left/null]
> 528 [528,left/528,right]
> 529 [529,left/null]
> 530 [530,left/null]
> 531 [531,left/null, 531,left/531,right]
> 532 [532,left/null]
> 533 [533,left/null]
> 534 [534,left/null, 534,left/534,right]
> 535 [535,left/null]
> 536 [536,left/null]
> 537 [537,left/null, 537,left/537,right]
> 538 [538,left/null]
> 539 [539,left/null]
> 540 [540,left/null]
> 541 [541,left/null]
> 542 [542,left/null]
> 543 [543,left/null]
> ...
> {code}
> ...whereas, given the input data, I expect to see every row end with the two
> values joined, rather than the right value being null.
> Note that I understand it's expected that we initially get the left/null
> values for many values, since that's the expected semantics of kafka-streams
> left join, at least until
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-ImprovedLeft/OuterStream-StreamJoin(v3.1.xandnewer)spurious
> I've noticed that if I set a very large grace value on the join window the
> problem is solved, but since the input I provide is not out of order I did
> not expect to need to do that, and I'm wary of the resource requirements of
> doing so in practice on an application with a lot of volume.

[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2022-01-11 Thread GitBox


rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r782538135



##
File path: core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
##
@@ -196,14 +199,16 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
-  def createAndStartBroker(config: KafkaConfig,
-                           time: Time = Time.SYSTEM): KafkaBroker = {
-    implementation.createAndStartBroker(config, time)
+  def createBroker(config: KafkaConfig,
+                   time: Time = Time.SYSTEM,
+                   startup: Boolean = true): KafkaBroker = {
+    implementation.createBroker(config, time, startup)
   }
 
   def shutdownZooKeeper(): Unit = asZk().shutdown()
 
+  def shutdownKRaftController(): Unit = asKRaft().shutdown()

Review comment:
   Good point. There is no `shutdown()` method that we can invoke directly, 
but here we do want the RaftManager to not be shut down, so I've adjusted the 
method accordingly and kept the name.








[GitHub] [kafka] rondagostino commented on a change in pull request #11606: MINOR: Add shutdown tests for KRaft

2022-01-11 Thread GitBox


rondagostino commented on a change in pull request #11606:
URL: https://github.com/apache/kafka/pull/11606#discussion_r782539950



##
File path: 
core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
##
@@ -253,4 +248,32 @@ abstract class KafkaServerTestHarness extends QuorumTestHarness {
     getController().kafkaController.controllerContext.topicNames.toMap
   }
 
+  private def createBrokers(startup: Boolean): Unit = {
+    // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
+    // are shutdown cleanly in tearDown even if a subsequent broker fails to start
+    for (config <- configs) {
+      val broker = createBrokerFromConfig(config)
+      _brokers += broker
+      if (startup) {
+        broker.startup()

Review comment:
   This PR is not changing how this works in general, so this comment 
applies to the pre-PR implementation as well.  I think the assumption is that 
the list of alive brokers is only valid if **all** brokers get created and are 
(now optionally) started -- and the test should be failed/torn down if an 
exception should occur that prevents all brokers from being processed.  I'll 
add a comment to this effect in `recreateBrokers()` (which is the only place 
this gets invoked aside from in `setUp()`).








[jira] [Assigned] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-11 Thread Seungchan Ahn (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seungchan Ahn reassigned KAFKA-13217:
-

Assignee: Seungchan Ahn






[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-11 Thread Seungchan Ahn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542
 ] 

Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 12:23 AM:
--

[~ableegoldman] -- [~guozhang] -Hello 👋 can I have a chance to work on this? If 
yes, could you add me to the contributor list? I've sent a mail to the dev 
mailing list.-
 * -title: Request to contribute-
 * -link:- [-https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3-]

FYI, it's done by another member.
 * on: https://lists.apache.org/thread/hyo61vnx2d65511trbr179sqmdgcgmyk


was (Author: JIRAUSER283196):
[~ableegoldman] [~guozhang] Hello 👋 can I have a chance to work on this? If 
yes, could you add me to the contributor list? I've sent a mail to the dev 
mailing list.
 * title: Request to contribute
 * link: [https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3]






[jira] [Comment Edited] (KAFKA-13217) Reconsider skipping the LeaveGroup on close() or add an overload that does so

2022-01-11 Thread Seungchan Ahn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17472542#comment-17472542
 ] 

Seungchan Ahn edited comment on KAFKA-13217 at 1/12/22, 12:24 AM:
--

[~ableegoldman] – [~guozhang] -Hello 👋 can I have a chance to work on this? If 
yes, could you add me to the contributor list? I've sent a mail to the dev 
mailing list.-
 * -title: Request to contribute-
 * -link: [https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3]-

FYI, it's done by another member.
 * on: [https://lists.apache.org/thread/hyo61vnx2d65511trbr179sqmdgcgmyk]


was (Author: JIRAUSER283196):
[~ableegoldman] -- [~guozhang] -Hello 👋 can I have a chance to work on this? If 
yes, could you add me to the contributor list? I've sent a mail to the dev 
mailing list.-
 * -title: Request to contribute-
 * -link:- [-https://lists.apache.org/thread/hq2szmlg0c1hh9w4rpxjw3mh82ml6qw3-]

FYI, it's done by another member.
 * on: https://lists.apache.org/thread/hyo61vnx2d65511trbr179sqmdgcgmyk






[jira] [Commented] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-11 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473235#comment-17473235
 ] 

Eugen Dück commented on KAFKA-13289:


{quote}
bq. The app uses a 0ms join window with a 0ms

That is pretty aggressive and will result in a retention of 0ms, too. Thus, 
every out-of-order record will be considered late and would be dropped (and you 
would see the corresponding logging).
{quote}

In terms of joining (which is done per key), that is the behavior we want. 
However, it seems the dropping of messages is based on "partition time", i.e. 
per partition? As we don't have monotonically increasing timestamps across 
keys, that would mean we have an actual problem...
So a solution would be to increase the window size or the grace period 
(honestly, I still don't know what the difference between the two is; a link 
to an explanation would be awesome), at the cost of a massively increased 
number of join pairs.


[GitHub] [kafka] dengziming opened a new pull request #11667: MINOR; Enable Kraft in ApiVersionTest

2022-01-11 Thread GitBox


dengziming opened a new pull request #11667:
URL: https://github.com/apache/kafka/pull/11667


   *More detailed description of your change*
   1. Replace ClusterTestExtensions with QuorumTestHarness
   2. Enable KRaft in ApiVersionTest
   
   *Summary of testing strategy (including rationale)*
   QA
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-13242) KRaft Controller doesn't handle UpdateFeaturesRequest

2022-01-11 Thread dengziming (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

dengziming updated KAFKA-13242:
---
Parent: KAFKA-13410
Issue Type: Sub-task  (was: Bug)

> KRaft Controller doesn't handle UpdateFeaturesRequest
> -
>
> Key: KAFKA-13242
> URL: https://issues.apache.org/jira/browse/KAFKA-13242
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: dengziming
>Assignee: dengziming
>Priority: Major
> Fix For: 3.0.1
>
>




--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] jsancio opened a new pull request #11668: MINOR: Update KRaft controller's event processing time

2022-01-11 Thread GitBox


jsancio opened a new pull request #11668:
URL: https://github.com/apache/kafka/pull/11668


   Make sure that the event queue processing time histogram gets updated
   and add tests that verify that the update methods modify the correct
   histogram.
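   
   For context, a self-contained sketch of the distinction at stake (hypothetical 
names; not the actual controller metrics code): queue time measures how long an 
event waits before it is dequeued, while processing time measures how long the 
handler runs once it has been dequeued.
   
   {code:java}
   import java.util.concurrent.TimeUnit;
   
   public final class EventTimingSketch {
       public static void main(String[] args) throws InterruptedException {
           long enqueueNs = System.nanoTime();
           Thread.sleep(5);   // the event waits in the queue
           long dequeueNs = System.nanoTime();
           Thread.sleep(10);  // the event handler runs
           long doneNs = System.nanoTime();
   
           // An EventQueueTimeMs-style measurement: time spent queued.
           long queueMs = TimeUnit.NANOSECONDS.toMillis(dequeueNs - enqueueNs);
           // An EventQueueProcessingTimeMs-style measurement: time spent running.
           long processingMs = TimeUnit.NANOSECONDS.toMillis(doneNs - dequeueNs);
   
           System.out.println("queue=" + queueMs + "ms, processing=" + processingMs + "ms");
       }
   }
   {code}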
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-13289) Bulk processing correctly ordered input data through a join with kafka-streams results in `Skipping record for expired segment`

2022-01-11 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-13289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17473235#comment-17473235
 ] 

Eugen Dück edited comment on KAFKA-13289 at 1/12/22, 3:04 AM:
--

{quote}
bq. The app uses a 0ms join window with a 0ms

That is pretty aggressive and will result in a retention of 0ms, too. Thus, 
every out-of-order record will be considered late and would be dropped (and you 
would see the corresponding logging).
{quote}

In terms of joining (which is done per key), that is the behavior we want. 
However, it seems the dropping of messages is based on "partition time", i.e. 
per partition? As we don't have monotonically increasing timestamps across 
keys, that would mean we have an actual problem...
So a solution would perhaps be to increase the grace period. I will check this 
out.
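
A self-contained sketch of the expiry check under discussion (illustrative only; 
the variable names are stand-ins, not the actual 
AbstractRocksDBSegmentedBytesStore code):

{code:java}
public final class ExpirySketch {
    public static void main(String[] args) {
        // Stream time only moves forward: it tracks the max record
        // timestamp observed so far on the partition, across all keys.
        long observedStreamTime = Long.MIN_VALUE;
        long retentionPeriod = 0L; // 0ms window + 0ms grace => 0ms retention

        long[] timestamps = {100L, 200L, 150L}; // 150 arrives out of order
        for (long ts : timestamps) {
            observedStreamTime = Math.max(observedStreamTime, ts);
            if (ts < observedStreamTime - retentionPeriod) {
                // Corresponds to "Skipping record for expired segment."
                System.out.println("Dropped record with ts=" + ts);
            }
        }
    }
}
{code}

As for window size vs. grace: the window size bounds how far apart two records' 
timestamps may be and still join, while grace bounds how late a record may 
arrive (relative to stream time) and still be processed; store retention is 
roughly window size plus grace.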


was (Author: eugendueck):
{quote}
bq. The app uses a 0ms join window with a 0ms

That is pretty aggressive and will result in a retention of 0ms, too. Thus, 
every out-of-order record will be considered late and would be dropped (and you 
would see the corresponding logging).
{quote}

In terms of joining (which is done per key), that is the behavior we want. 
However, it seems the dropping of messages is based on "partition time", i.e. 
per partition? As we don't have monotonically increasing timestamps across 
keys, that would mean we have an actual problem...
So a solution would be to increase the window size or the grace period (though I 
honestly still don't know what the difference between the two is - a link to an 
explanation would be awesome), at the cost of a massively increased number of 
join pairs.


[GitHub] [kafka] cmccabe commented on pull request #11668: MINOR: Update KRaft controller's event processing time

2022-01-11 Thread GitBox


cmccabe commented on pull request #11668:
URL: https://github.com/apache/kafka/pull/11668#issuecomment-1010658277


   Test failures are not related


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe merged pull request #11668: MINOR: Fix confusion between EventQueueProcessingTimeMs and EventQueueTimeMs

2022-01-11 Thread GitBox


cmccabe merged pull request #11668:
URL: https://github.com/apache/kafka/pull/11668


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lmr3796 opened a new pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics

2022-01-11 Thread GitBox


lmr3796 opened a new pull request #11669:
URL: https://github.com/apache/kafka/pull/11669


   The patch apache#4196 (commit f300480f) replaced if/else blocks with case 
matches in `KafkaZkClient`.
   The method `getPartitionAssignmentForTopics` seems to have been missed by that 
patch, while similar methods like `getFullReplicaAssignmentForTopics`, 
`getReplicaAssignmentAndTopicIdForTopics`, etc. adopted the case-match style.
   
   This PR changes it to make the code style more consistent.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lmr3796 commented on pull request #11669: MINOR: Replace if/else with match in KafkaZkClient#getPartitionAssignmentForTopics

2022-01-11 Thread GitBox


lmr3796 commented on pull request #11669:
URL: https://github.com/apache/kafka/pull/11669#issuecomment-1010717423


   Hi @ijuma, this serves as a follow-up to #4196.
   
   I saw you reviewed that PR and was wondering if you could also review this one.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org