[ https://issues.apache.org/jira/browse/KAFKA-17112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865693#comment-17865693 ]
Ao Li commented on KAFKA-17112:
-------------------------------

[~cadonna] Thanks for your reply!

I've tried moving `stateUpdater.start();` to `StreamThread::start`. This does not work for many tests because `StreamThread::start` is never called.

{code}
@Test
public void shouldLogAndRecordSkippedRecordsForInvalidTimestamps() {
    internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);

    final Properties properties = configProps(false);
    properties.setProperty(
        StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        LogAndSkipOnInvalidTimestamp.class.getName()
    );
    final StreamsConfig config = new StreamsConfig(properties);
    thread = createStreamThread(CLIENT_ID, config);

    thread.setState(StreamThread.State.STARTING);
    thread.setState(StreamThread.State.PARTITIONS_REVOKED);

    final TaskId task1 = new TaskId(0, t1p1.partition());
    final Set<TopicPartition> assignedPartitions = Collections.singleton(t1p1);
    thread.taskManager().handleAssignment(
        Collections.singletonMap(
            task1,
            assignedPartitions),
        emptyMap());

    final MockConsumer<byte[], byte[]> mockConsumer = (MockConsumer<byte[], byte[]>) thread.mainConsumer();
    mockConsumer.assign(Collections.singleton(t1p1));
    mockConsumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L));
    thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
    runOnce();
{code}

Tests like the one above drive the thread through `setState` and `runOnce()` without ever calling `StreamThread::start`. If we move `stateUpdater.start();` to `StreamThread::start`, they will hang because they may wait for the stateUpdater, which is then never started.

I also tried replacing every `thread.setState(StreamThread.State.STARTING);` with `thread.start();` to see if that would be an easy fix, but it also breaks many tests.

Also, the processingThread (TaskExecutor) is created in `StreamThread::create` and then passed to `TaskManager`, so StreamThread loses access to it after the `StreamThread::create` call. This could be fixed easily by implementing a start method in TaskManager.
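To illustrate the idea of a start method in TaskManager, here is a minimal, self-contained sketch. It is not the Kafka Streams code: `SketchTaskExecutor` and `SketchTaskManager` are hypothetical stand-ins, and the sleep loop is only a placeholder for real processing. The sketch assumes the executor's thread is created in the constructor but only started through the manager, and that shutdown always stops it, whatever state the owner is in.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a TaskExecutor; NOT the Kafka Streams class.
class SketchTaskExecutor {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private volatile boolean stopRequested = false;
    private final Thread thread;

    SketchTaskExecutor(final String name) {
        // The Thread object is created here but deliberately not started,
        // so constructing the executor leaves no running thread behind.
        this.thread = new Thread(() -> {
            while (!stopRequested) {
                try {
                    Thread.sleep(10); // placeholder for processing work
                } catch (final InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, name);
    }

    // Idempotent: only the first call starts the underlying thread.
    void start() {
        if (started.compareAndSet(false, true)) {
            thread.start();
        }
    }

    // Safe to call whether or not start() was ever called.
    void shutdown() {
        stopRequested = true;
        thread.interrupt();
        try {
            thread.join(1000);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() {
        return thread.isAlive();
    }
}

// Hypothetical stand-in for a TaskManager that owns the executor and
// exposes start/shutdown so the owning thread does not need direct access.
class SketchTaskManager {
    private final SketchTaskExecutor executor;

    SketchTaskManager(final SketchTaskExecutor executor) {
        this.executor = executor;
    }

    void start() {
        executor.start();
    }

    void shutdown() {
        executor.shutdown();
    }

    boolean executorAlive() {
        return executor.isAlive();
    }
}
```

With this shape, a test that never calls `start()` would not leave an executor thread running, and a test that does call it can rely on `shutdown()` to stop the thread. Whether the real TaskManager can take this shape depends on the tests that currently assume the executor is already running.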

> StreamThread shutdown calls completeShutdown only in CREATED state
> ------------------------------------------------------------------
>
>                 Key: KAFKA-17112
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17112
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams, unit tests
>    Affects Versions: 3.9.0
>            Reporter: Ao Li
>            Priority: Minor
>
> While running tests in `StreamThreadTest.java` in kafka/streams, I noticed
> the test left many lingering threads. Though the class runs `shutdown` after
> each test, the shutdown only executes `completeShutdown` if the StreamThread
> is in CREATED state. See
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java#L231]
> and
> [https://github.com/apache/kafka/blob/0b11971f2c94f7aadc3fab2c51d94642065a72e5/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1435]
>
> For example, you may run test
> org.apache.kafka.streams.processor.internals.StreamThreadTest#shouldNotCloseTaskProducerWhenSuspending
> with commit 0b11971f2c94f7aadc3fab2c51d94642065a72e5. When the test calls
> `thread.shutdown()`, the thread is in `PARTITIONS_REVOKED` state. Thus,
> `completeShutdown` is not called. The test creates three lingering threads: 2
> `StateUpdater` and 1 `TaskExecutor`.
>
> This means that calls to `thread.shutdown` have no effect in
> `StreamThreadTest.java`.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)