cadonna commented on code in PR #19275: URL: https://github.com/apache/kafka/pull/19275#discussion_r2016429245
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -372,15 +371,13 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Runnable shutdownErrorHook, final BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler) { - final boolean stateUpdaterEnabled = InternalConfig.stateUpdaterEnabled(config.originals()); Review Comment: Could you please also remove the internal config `_state.updater.enabled_` and all corresponding code from `StreamsConfig`? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -237,13 +237,13 @@ State setState(final State newState) { if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) { log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " + - "only DEAD state is a valid next state", newState); + "only DEAD state is a valid next state", newState); // when the state is already in PENDING_SHUTDOWN, all other transitions will be // refused but we do not throw exception here return null; } else if (state == State.DEAD) { log.debug("Ignoring request to transit from DEAD to {}: " + - "no valid next state after DEAD", newState); + "no valid next state after DEAD", newState); Review Comment: Could you revert this change, please? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( - time, - config, - restorationLogContext, - adminClient, - restoreConsumer, - userStateRestoreListener, - userStandbyUpdateListener + time, + config, + restorationLogContext, + adminClient, + restoreConsumer, + userStateRestoreListener, + userStandbyUpdateListener ); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - cache, - time, - clientSupplier, - threadId, - threadIdx, - processId, - log, - stateUpdaterEnabled, - proceessingThreadsEnabled + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + cache, + time, + clientSupplier, + threadId, + threadIdx, + processId, + log, + proceessingThreadsEnabled ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - threadId, - log, - stateUpdaterEnabled); + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + threadId, + log); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( - time, - config, - restorationLogContext, - adminClient, - restoreConsumer, - userStateRestoreListener, - userStandbyUpdateListener + time, + config, + restorationLogContext, + adminClient, + restoreConsumer, + userStateRestoreListener, + userStandbyUpdateListener ); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - cache, - time, - clientSupplier, - threadId, - threadIdx, - processId, - log, - stateUpdaterEnabled, - proceessingThreadsEnabled + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + cache, + time, + clientSupplier, + threadId, + threadIdx, + processId, + log, + proceessingThreadsEnabled ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - threadId, - log, - stateUpdaterEnabled); + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + threadId, + log); final Tasks tasks = new Tasks(new LogContext(logPrefix)); final boolean processingThreadsEnabled = - InternalConfig.processingThreadsEnabled(config.originals()); + InternalConfig.processingThreadsEnabled(config.originals()); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -615,27 +601,27 @@ public StreamThread(final Time time, ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); ThreadMetrics.addThreadStartTimeMetric( - threadId, - streamsMetrics, - time.milliseconds() + threadId, + streamsMetrics, + time.milliseconds() ); ThreadMetrics.addThreadStateTelemetryMetric( - processId.toString(), - threadId, - streamsMetrics, - (metricConfig, now) -> this.state().ordinal()); + processId.toString(), + threadId, + streamsMetrics, + (metricConfig, now) -> this.state().ordinal()); ThreadMetrics.addThreadStateMetric( - threadId, - streamsMetrics, - (metricConfig, now) -> this.state()); + threadId, + streamsMetrics, + (metricConfig, now) -> this.state()); ThreadMetrics.addThreadBlockedTimeMetric( - threadId, - new StreamThreadTotalBlockedTime( - mainConsumer, - restoreConsumer, - taskManager::totalProducerBlockedTime - ), - streamsMetrics + threadId, + new StreamThreadTotalBlockedTime( + mainConsumer, + restoreConsumer, + taskManager::totalProducerBlockedTime + ), + streamsMetrics Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -743,7 +728,7 @@ boolean runLoop() { } } catch (final TaskCorruptedException e) { log.warn("Detected the states of tasks " + e.corruptedTasks() + " are corrupted. " + - "Will close the task as dirty and re-create and bootstrap from scratch.", e); + "Will close the task as dirty and re-create and bootstrap from scratch.", e); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() { } } else { producerInstanceIdFuture.completeExceptionally( - new TimeoutException("Could not retrieve thread producer client instance id.") + new TimeoutException("Could not retrieve thread producer client instance id.") Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() { } } else { producerInstanceIdFuture.completeExceptionally( - new TimeoutException("Could not retrieve thread producer client instance id.") + new TimeoutException("Could not retrieve thread producer client instance id.") ); } } if (mainConsumerInstanceIdFuture.isDone() - && (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone()) - && producerInstanceIdFuture.isDone()) { + && restoreConsumerInstanceIdFuture.isDone() + && producerInstanceIdFuture.isDone()) { Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( - time, - config, - restorationLogContext, - adminClient, - restoreConsumer, - userStateRestoreListener, - userStandbyUpdateListener + time, + config, + restorationLogContext, + adminClient, + restoreConsumer, + userStateRestoreListener, + userStandbyUpdateListener ); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - cache, - time, - clientSupplier, - threadId, - threadIdx, - processId, - log, - stateUpdaterEnabled, - proceessingThreadsEnabled + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + cache, + time, + clientSupplier, + threadId, + threadIdx, + processId, + log, + proceessingThreadsEnabled Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -237,13 +237,13 @@ State setState(final State newState) { if (state == State.PENDING_SHUTDOWN && newState != State.DEAD) { log.debug("Ignoring request to transit from PENDING_SHUTDOWN to {}: " + - "only DEAD state is a valid next state", newState); + "only DEAD state is a valid next state", newState); Review Comment: Could you revert this change, please? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( - time, - config, - restorationLogContext, - adminClient, - restoreConsumer, - userStateRestoreListener, - userStandbyUpdateListener + time, + config, + restorationLogContext, + adminClient, + restoreConsumer, + userStateRestoreListener, + userStandbyUpdateListener Review Comment: Could you please revert this change? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( - time, - config, - restorationLogContext, - adminClient, - restoreConsumer, - userStateRestoreListener, - userStandbyUpdateListener + time, + config, + restorationLogContext, + adminClient, + restoreConsumer, + userStateRestoreListener, + userStandbyUpdateListener ); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - cache, - time, - clientSupplier, - threadId, - threadIdx, - processId, - log, - stateUpdaterEnabled, - proceessingThreadsEnabled + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + cache, + time, + clientSupplier, + threadId, + threadIdx, + processId, + log, + proceessingThreadsEnabled ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - threadId, - log, - stateUpdaterEnabled); + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + threadId, + log); final Tasks tasks = new Tasks(new LogContext(logPrefix)); final boolean processingThreadsEnabled = - InternalConfig.processingThreadsEnabled(config.originals()); + InternalConfig.processingThreadsEnabled(config.originals()); final DefaultTaskManager schedulingTaskManager = - maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); + maybeCreateSchedulingTaskManager(processingThreadsEnabled, topologyMetadata, time, threadId, tasks); final StateUpdater stateUpdater = - maybeCreateAndStartStateUpdater( - stateUpdaterEnabled, - streamsMetrics, - config, - restoreConsumer, - changelogReader, - topologyMetadata, - time, - clientId, - threadIdx - ); + maybeCreateAndStartStateUpdater( + streamsMetrics, + config, + restoreConsumer, + changelogReader, + topologyMetadata, + time, + clientId, + threadIdx + ); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, streamsMetrics.metricsRegistry().addReporter(reporter); final StreamThread streamThread = new StreamThread( - time, - config, - adminClient, - mainConsumer, - restoreConsumer, - changelogReader, - originalReset, - taskManager, - stateUpdater, - streamsMetrics, - topologyMetadata, - processId, - threadId, - logContext, - referenceContainer.assignmentErrorCode, - referenceContainer.nextScheduledRebalanceMs, - referenceContainer.nonFatalExceptionsToHandle, - shutdownErrorHook, - streamsUncaughtExceptionHandler, - cache::resize + time, + config, + adminClient, + mainConsumer, + restoreConsumer, + changelogReader, + originalReset, + taskManager, + stateUpdater, + streamsMetrics, + topologyMetadata, + processId, + threadId, + logContext, + referenceContainer.assignmentErrorCode, + referenceContainer.nextScheduledRebalanceMs, + referenceContainer.nonFatalExceptionsToHandle, + shutdownErrorHook, + streamsUncaughtExceptionHandler, + cache::resize ); return streamThread.updateThreadMetadata(adminClientId(clientId)); } private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled, - final boolean stateUpdaterEnabled, final TopologyMetadata topologyMetadata, final Time time, final String threadId, final Tasks tasks) { if (processingThreadsEnabled) { - if (!stateUpdaterEnabled) { - throw new IllegalStateException("Processing threads require the state updater to be enabled"); - } final DefaultTaskManager defaultTaskManager = new DefaultTaskManager( - time, - threadId, - tasks, - new DefaultTaskExecutorCreator(), - topologyMetadata.taskExecutionMetadata(), - 1 + time, + threadId, + tasks, + new DefaultTaskExecutorCreator(), + topologyMetadata.taskExecutionMetadata(), + 1 Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, streamsMetrics.metricsRegistry().addReporter(reporter); final StreamThread streamThread = new StreamThread( - time, - config, - adminClient, - mainConsumer, - restoreConsumer, - changelogReader, - originalReset, - taskManager, - stateUpdater, - streamsMetrics, - topologyMetadata, - processId, - threadId, - logContext, - referenceContainer.assignmentErrorCode, - referenceContainer.nextScheduledRebalanceMs, - referenceContainer.nonFatalExceptionsToHandle, - shutdownErrorHook, - streamsUncaughtExceptionHandler, - cache::resize + time, + config, + adminClient, + mainConsumer, + restoreConsumer, + changelogReader, + originalReset, + taskManager, + stateUpdater, + streamsMetrics, + topologyMetadata, + processId, + threadId, + logContext, + referenceContainer.assignmentErrorCode, + referenceContainer.nextScheduledRebalanceMs, + referenceContainer.nonFatalExceptionsToHandle, + shutdownErrorHook, + streamsUncaughtExceptionHandler, + cache::resize ); return streamThread.updateThreadMetadata(adminClientId(clientId)); } private static DefaultTaskManager maybeCreateSchedulingTaskManager(final boolean processingThreadsEnabled, - final boolean stateUpdaterEnabled, final TopologyMetadata topologyMetadata, final Time time, final String threadId, final Tasks tasks) { if (processingThreadsEnabled) { - if (!stateUpdaterEnabled) { - throw new IllegalStateException("Processing threads require the state updater to be enabled"); - } final DefaultTaskManager defaultTaskManager = new DefaultTaskManager( - time, - threadId, - tasks, - new DefaultTaskExecutorCreator(), - topologyMetadata.taskExecutionMetadata(), - 1 + time, + threadId, + tasks, + new DefaultTaskExecutorCreator(), + topologyMetadata.taskExecutionMetadata(), + 1 ); defaultTaskManager.startTaskExecutors(); return defaultTaskManager; } return null; } - private static StateUpdater maybeCreateAndStartStateUpdater(final boolean stateUpdaterEnabled, - final StreamsMetricsImpl streamsMetrics, + private static StateUpdater maybeCreateAndStartStateUpdater(final StreamsMetricsImpl streamsMetrics, Review Comment: Since this method now always creates and starts a state updater, could you please rename it to `createAndStartStateUpdater()`? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( - time, - config, - restorationLogContext, - adminClient, - restoreConsumer, - userStateRestoreListener, - userStandbyUpdateListener + time, + config, + restorationLogContext, + adminClient, + restoreConsumer, + userStateRestoreListener, + userStandbyUpdateListener ); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - cache, - time, - clientSupplier, - threadId, - threadIdx, - processId, - log, - stateUpdaterEnabled, - proceessingThreadsEnabled + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + cache, + time, + clientSupplier, + threadId, + threadIdx, + processId, + log, + proceessingThreadsEnabled ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - threadId, - log, - stateUpdaterEnabled); + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + threadId, + log); final Tasks tasks = new Tasks(new LogContext(logPrefix)); final boolean processingThreadsEnabled = - InternalConfig.processingThreadsEnabled(config.originals()); + InternalConfig.processingThreadsEnabled(config.originals()); final DefaultTaskManager schedulingTaskManager = - maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); + maybeCreateSchedulingTaskManager(processingThreadsEnabled, topologyMetadata, time, threadId, tasks); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -394,76 +391,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(restoreConsumerConfigs); final StoreChangelogReader changelogReader = new StoreChangelogReader( - time, - config, - restorationLogContext, - adminClient, - restoreConsumer, - userStateRestoreListener, - userStandbyUpdateListener + time, + config, + restorationLogContext, + adminClient, + restoreConsumer, + userStateRestoreListener, + userStandbyUpdateListener ); final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); final boolean proceessingThreadsEnabled = InternalConfig.processingThreadsEnabled(config.originals()); final ActiveTaskCreator activeTaskCreator = new ActiveTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - cache, - time, - clientSupplier, - threadId, - threadIdx, - processId, - log, - stateUpdaterEnabled, - proceessingThreadsEnabled + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + cache, + time, + clientSupplier, + threadId, + threadIdx, + processId, + log, + proceessingThreadsEnabled ); final StandbyTaskCreator standbyTaskCreator = new StandbyTaskCreator( - topologyMetadata, - config, - streamsMetrics, - stateDirectory, - changelogReader, - threadId, - log, - stateUpdaterEnabled); + topologyMetadata, + config, + streamsMetrics, + stateDirectory, + changelogReader, + threadId, + log); final Tasks tasks = new Tasks(new LogContext(logPrefix)); final boolean processingThreadsEnabled = - InternalConfig.processingThreadsEnabled(config.originals()); + InternalConfig.processingThreadsEnabled(config.originals()); final DefaultTaskManager schedulingTaskManager = - maybeCreateSchedulingTaskManager(processingThreadsEnabled, stateUpdaterEnabled, topologyMetadata, time, threadId, tasks); + maybeCreateSchedulingTaskManager(processingThreadsEnabled, topologyMetadata, time, threadId, tasks); final StateUpdater stateUpdater = - maybeCreateAndStartStateUpdater( - stateUpdaterEnabled, - streamsMetrics, - config, - restoreConsumer, - changelogReader, - topologyMetadata, - time, - clientId, - threadIdx - ); + maybeCreateAndStartStateUpdater( + streamsMetrics, + config, + restoreConsumer, + changelogReader, + topologyMetadata, + time, + clientId, + threadIdx + ); final TaskManager taskManager = new TaskManager( - time, - changelogReader, - new ProcessId(processId), - logPrefix, - activeTaskCreator, - standbyTaskCreator, - tasks, - topologyMetadata, - adminClient, - stateDirectory, - stateUpdater, - schedulingTaskManager + time, + changelogReader, + new ProcessId(processId), + logPrefix, + activeTaskCreator, + standbyTaskCreator, + tasks, + topologyMetadata, + adminClient, + stateDirectory, + stateUpdater, + schedulingTaskManager Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -615,27 +601,27 @@ public StreamThread(final Time time, ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); ThreadMetrics.addThreadStartTimeMetric( - threadId, - streamsMetrics, - time.milliseconds() + threadId, + streamsMetrics, + time.milliseconds() ); ThreadMetrics.addThreadStateTelemetryMetric( - processId.toString(), - threadId, - streamsMetrics, - (metricConfig, now) -> this.state().ordinal()); + processId.toString(), + threadId, + streamsMetrics, + (metricConfig, now) -> this.state().ordinal()); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -615,27 +601,27 @@ public StreamThread(final Time time, ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); ThreadMetrics.addThreadStartTimeMetric( - threadId, - streamsMetrics, - time.milliseconds() + threadId, + streamsMetrics, + time.milliseconds() ); ThreadMetrics.addThreadStateTelemetryMetric( - processId.toString(), - threadId, - streamsMetrics, - (metricConfig, now) -> this.state().ordinal()); + processId.toString(), + threadId, + streamsMetrics, + (metricConfig, now) -> this.state().ordinal()); ThreadMetrics.addThreadStateMetric( - threadId, - streamsMetrics, - (metricConfig, now) -> this.state()); + threadId, + streamsMetrics, + (metricConfig, now) -> this.state()); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -760,12 +745,12 @@ boolean runLoop() { } catch (final UnsupportedVersionException e) { final String errorMessage = e.getMessage(); if (errorMessage != null && - errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { + errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -486,81 +480,73 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, streamsMetrics.metricsRegistry().addReporter(reporter); final StreamThread streamThread = new StreamThread( - time, - config, - adminClient, - mainConsumer, - restoreConsumer, - changelogReader, - originalReset, - taskManager, - stateUpdater, - streamsMetrics, - topologyMetadata, - processId, - threadId, - logContext, - referenceContainer.assignmentErrorCode, - referenceContainer.nextScheduledRebalanceMs, - referenceContainer.nonFatalExceptionsToHandle, - shutdownErrorHook, - streamsUncaughtExceptionHandler, - cache::resize + time, + config, + adminClient, + mainConsumer, + restoreConsumer, + changelogReader, + originalReset, + taskManager, + stateUpdater, + streamsMetrics, + topologyMetadata, + processId, + threadId, + logContext, + referenceContainer.assignmentErrorCode, + referenceContainer.nextScheduledRebalanceMs, + referenceContainer.nonFatalExceptionsToHandle, + shutdownErrorHook, + streamsUncaughtExceptionHandler, + cache::resize Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -615,27 +601,27 @@ public StreamThread(final Time time, ThreadMetrics.closeTaskSensor(threadId, streamsMetrics); ThreadMetrics.addThreadStartTimeMetric( - threadId, - streamsMetrics, - time.milliseconds() + threadId, + streamsMetrics, + time.milliseconds() Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() { } } else { mainConsumerInstanceIdFuture.completeExceptionally( - new TimeoutException("Could not retrieve main consumer client instance id.") + new TimeoutException("Could not retrieve main consumer client instance id.") ); } } - if (!stateUpdaterEnabled && !restoreConsumerInstanceIdFuture.isDone()) { + if (!restoreConsumerInstanceIdFuture.isDone()) { Review Comment: Shouldn't you delete the whole `if`-clause? Since the state updater will be always enabled after this PR, this block should always be skipped. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -657,13 +643,12 @@ public StreamThread(final Time time, this.pollTime = Duration.ofMillis(config.getLong(StreamsConfig.POLL_MS_CONFIG)); final int dummyThreadIdx = 1; this.maxPollTimeMs = new InternalConsumerConfig(config.getMainConsumerConfigs("dummyGroupId", "dummyClientId", dummyThreadIdx)) - .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); + .getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -831,7 +816,7 @@ void maybeGetClientInstanceIds() { if (fetchDeadlineClientInstanceId >= time.milliseconds()) { try { producerInstanceIdFuture.complete( - taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO) + taskManager.streamsProducer().kafkaProducer().clientInstanceId(Duration.ZERO) Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -760,12 +745,12 @@ boolean runLoop() { } catch (final UnsupportedVersionException e) { final String errorMessage = e.getMessage(); if (errorMessage != null && - errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { + errorMessage.startsWith("Broker unexpectedly doesn't support requireStable flag on version ")) { log.error("Shutting down because the Kafka cluster seems to be on a too old version. " + - "Setting {}=\"{}\" requires broker version 2.5 or higher.", - StreamsConfig.PROCESSING_GUARANTEE_CONFIG, - StreamsConfig.EXACTLY_ONCE_V2); + "Setting {}=\"{}\" requires broker version 2.5 or higher.", + StreamsConfig.PROCESSING_GUARANTEE_CONFIG, + StreamsConfig.EXACTLY_ONCE_V2); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -822,7 +807,7 @@ void maybeGetClientInstanceIds() { } } else { restoreConsumerInstanceIdFuture.completeExceptionally( - new TimeoutException("Could not retrieve restore consumer client instance id.") + new TimeoutException("Could not retrieve restore consumer client instance id.") Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -801,13 +786,13 @@ void maybeGetClientInstanceIds() { } } else { mainConsumerInstanceIdFuture.completeExceptionally( - new TimeoutException("Could not retrieve main consumer client instance id.") + new TimeoutException("Could not retrieve main consumer client instance id.") Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -918,8 +903,8 @@ public void sendShutdownRequest(final AssignorError assignorError) { private void handleTaskMigrated(final TaskMigratedException e) { log.warn("Detected that the thread is being fenced. " + - "This implies that this thread missed a rebalance and dropped out of the consumer group. " + - "Will close out all assigned tasks and rejoin the consumer group.", e); + "This implies that this thread missed a rebalance and dropped out of the consumer group. " + + "Will close out all assigned tasks and rejoin the consumer group.", e); Review Comment: Could you please revert the additional indentation? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ########## @@ -844,14 +829,14 @@ void maybeGetClientInstanceIds() { } } else { producerInstanceIdFuture.completeExceptionally( - new TimeoutException("Could not retrieve thread producer client instance id.") + new TimeoutException("Could not retrieve thread producer client instance id.") ); } } if (mainConsumerInstanceIdFuture.isDone() - && (!stateUpdaterEnabled && restoreConsumerInstanceIdFuture.isDone()) - && producerInstanceIdFuture.isDone()) { + && restoreConsumerInstanceIdFuture.isDone() Review Comment: I believe you can also remove this line. With the state updater fetching this instance ID is done within the state updater. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org