ableegoldman commented on code in PR #19400: URL: https://github.com/apache/kafka/pull/19400#discussion_r2053464025
########## streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java: ########## @@ -1205,45 +1200,9 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads()); log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread); resizeThreadCache(cacheSizePerThread); - if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) { - final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get()); - final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove); - final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = - adminClient.removeMembersFromConsumerGroup( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), - new RemoveMembersFromConsumerGroupOptions(membersToRemove) - ); - try { - final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs); - removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS); - } catch (final java.util.concurrent.TimeoutException exception) { - log.error( - String.format( - "Could not remove static member %s from consumer group %s due to a timeout:", - groupInstanceID.get(), - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - ), - exception - ); - throw new TimeoutException(exception.getMessage(), exception); - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (final ExecutionException exception) { - log.error( - String.format( - "Could not remove static member %s from consumer group %s due to:", - groupInstanceID.get(), - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - ), - exception - ); - throw new StreamsException( - "Could not remove static member " + groupInstanceID.get() - + " from consumer group " + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) - + " for the following reason: ", - exception.getCause() - ); - } Review Comment: nice! love to see all this code get deleted for one line 😄 ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1805,12 +1816,12 @@ private void completeShutdown(final boolean cleanRun) { log.error("Failed to unsubscribe due to the following error: ", e); } try { - mainConsumer.close(); + mainConsumer.close(closeOptions); } catch (final Throwable e) { log.error("Failed to close consumer due to the following error:", e); } try { - restoreConsumer.close(); + restoreConsumer.close(closeOptions); Review Comment: the restore consumer direct assignment so it's not part of the consumer group and doesn't need the close options (I don't remember, what happens if you pass in LEAVE_GROUP or REMAIN_IN_GROUP for a consumer that doesn't use group subscription?) ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -1994,6 +2005,13 @@ public Map<String, KafkaFuture<Uuid>> clientInstanceIds(final Duration timeout) return result; } + public void closeConsumer(final long timeoutMs, final boolean leaveGroup) { + final CloseOptions.GroupMembershipOperation operation = leaveGroup ? LEAVE_GROUP : REMAIN_IN_GROUP; + final CloseOptions closeOptions = CloseOptions.timeout(Duration.ofMillis(timeoutMs <= 0 ? 0 : timeoutMs)) Review Comment: it's definitely a bit questionable that we don't pass the timeout in to consumer#close in the current code (interestingly we also don't for producer#close but do use the timeout for admin#close) But, I don't want to change that in this PR, on the off chance there was an actual reason for it I know we had some issues with the producer timeout on close in the past but I don't remember the specifics and I'm not personally aware of any similar issues for the consumer's close. It's worth following up on, but separately from this. Let's just use the default for now (ie don't set the timeout at all) ########## tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java: ########## @@ -256,9 +255,8 @@ public void testResetWhenLongSessionTimeoutConfiguredWithForceOption(final TestI streams = new KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig); streams.cleanUp(); - // Reset would fail since long session timeout has been configured final boolean cleanResult = tryCleanGlobal(false, null, null, appID); - assertFalse(cleanResult); + assertTrue(cleanResult); Review Comment: not 100% sure i follow, why does this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org