ableegoldman commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2053464025


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1205,45 +1200,9 @@ private Optional<String> removeStreamThread(final long 
timeoutMs) throws Timeout
                         final long cacheSizePerThread = 
cacheSizePerThread(numLiveStreamThreads());
                         log.info("Resizing thread cache due to thread removal, 
new cache size per thread is {}", cacheSizePerThread);
                         resizeThreadCache(cacheSizePerThread);
-                        if (groupInstanceID.isPresent() && 
callingThreadIsNotCurrentStreamThread) {
-                            final MemberToRemove memberToRemove = new 
MemberToRemove(groupInstanceID.get());
-                            final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
-                            final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
-                                adminClient.removeMembersFromConsumerGroup(
-                                    
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
-                                    new 
RemoveMembersFromConsumerGroupOptions(membersToRemove)
-                                );
-                            try {
-                                final long remainingTimeMs = timeoutMs - 
(time.milliseconds() - startMs);
-                                
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs,
 TimeUnit.MILLISECONDS);
-                            } catch (final 
java.util.concurrent.TimeoutException exception) {
-                                log.error(
-                                    String.format(
-                                        "Could not remove static member %s 
from consumer group %s due to a timeout:",
-                                        groupInstanceID.get(),
-                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                    ),
-                                        exception
-                                );
-                                throw new 
TimeoutException(exception.getMessage(), exception);
-                            } catch (final InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                            } catch (final ExecutionException exception) {
-                                log.error(
-                                    String.format(
-                                        "Could not remove static member %s 
from consumer group %s due to:",
-                                        groupInstanceID.get(),
-                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                    ),
-                                        exception
-                                );
-                                throw new StreamsException(
-                                        "Could not remove static member " + 
groupInstanceID.get()
-                                            + " from consumer group " + 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                            + " for the following reason: ",
-                                        exception.getCause()
-                                );
-                            }

Review Comment:
   nice! love to see all this code get deleted for one line 😄 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1805,12 +1816,12 @@ private void completeShutdown(final boolean cleanRun) {
             log.error("Failed to unsubscribe due to the following error: ", e);
         }
         try {
-            mainConsumer.close();
+            mainConsumer.close(closeOptions);
         } catch (final Throwable e) {
             log.error("Failed to close consumer due to the following error:", 
e);
         }
         try {
-            restoreConsumer.close();
+            restoreConsumer.close(closeOptions);

Review Comment:
   the restore consumer direct assignment so it's not part of the consumer 
group and doesn't need the close options (I don't remember, what happens if you 
pass in LEAVE_GROUP or REMAIN_IN_GROUP for a consumer that doesn't use group 
subscription?)



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1994,6 +2005,13 @@ public Map<String, KafkaFuture<Uuid>> 
clientInstanceIds(final Duration timeout)
         return result;
     }
 
+    public void closeConsumer(final long timeoutMs, final boolean leaveGroup) {
+        final CloseOptions.GroupMembershipOperation operation = leaveGroup ? 
LEAVE_GROUP : REMAIN_IN_GROUP;
+        final CloseOptions closeOptions = 
CloseOptions.timeout(Duration.ofMillis(timeoutMs <= 0 ? 0 : timeoutMs))

Review Comment:
    it's definitely a bit questionable that we don't pass the timeout in to 
consumer#close in the current code (interestingly we also don't for 
producer#close but do use the timeout for admin#close) But, I don't want to 
change that in this PR, on the off chance there was an actual reason for it 
   
   I know we had some issues with the producer timeout on close in the past but 
I don't remember the specifics and I'm not personally aware of any similar 
issues for the consumer's close. It's worth following up on, but separately 
from this. Let's just use the default for now (ie don't set the timeout at all)



##########
tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java:
##########
@@ -256,9 +255,8 @@ public void 
testResetWhenLongSessionTimeoutConfiguredWithForceOption(final TestI
         streams = new 
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
         streams.cleanUp();
 
-        // Reset would fail since long session timeout has been configured
         final boolean cleanResult = tryCleanGlobal(false, null, null, appID);
-        assertFalse(cleanResult);
+        assertTrue(cleanResult);

Review Comment:
   not 100% sure i follow, why does this change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to