mjsax commented on a change in pull request #10668: URL: https://github.com/apache/kafka/pull/10668#discussion_r630602355
########## File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ########## @@ -553,141 +552,152 @@ public void testInitializesAndDestroysMetricsReporters() { @Test public void testCloseIsIdempotent() { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.close(); - final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + streams.close(); + final int closeCount = MockMetricsReporter.CLOSE_COUNT.get(); - streams.close(); - Assert.assertEquals("subsequent close() calls should do nothing", - closeCount, MockMetricsReporter.CLOSE_COUNT.get()); + streams.close(); + Assert.assertEquals("subsequent close() calls should do nothing", + closeCount, MockMetricsReporter.CLOSE_COUNT.get()); + } } @Test public void shouldAddThreadWhenRunning() throws InterruptedException { props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.start(); - final int oldSize = streams.threads.size(); - TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); - assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2))); - assertThat(streams.threads.size(), equalTo(oldSize + 1)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + streams.start(); + final int oldSize = streams.threads.size(); + TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, "wait until running"); + assertThat(streams.addStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 2))); + assertThat(streams.threads.size(), equalTo(oldSize + 1)); + } } @Test public void shouldNotAddThreadWhenCreated() { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - final int oldSize = streams.threads.size(); - assertThat(streams.addStreamThread(), equalTo(Optional.empty())); - assertThat(streams.threads.size(), equalTo(oldSize)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + final int oldSize = streams.threads.size(); + assertThat(streams.addStreamThread(), equalTo(Optional.empty())); + assertThat(streams.threads.size(), equalTo(oldSize)); + } } @Test public void shouldNotAddThreadWhenClosed() { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - final int oldSize = streams.threads.size(); - streams.close(); - assertThat(streams.addStreamThread(), equalTo(Optional.empty())); - assertThat(streams.threads.size(), equalTo(oldSize)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + final int oldSize = streams.threads.size(); + streams.close(); + assertThat(streams.addStreamThread(), equalTo(Optional.empty())); + assertThat(streams.threads.size(), equalTo(oldSize)); + } } @Test public void shouldNotAddThreadWhenError() { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - final int oldSize = streams.threads.size(); - streams.start(); - globalStreamThread.shutdown(); - assertThat(streams.addStreamThread(), equalTo(Optional.empty())); - assertThat(streams.threads.size(), equalTo(oldSize)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + final int oldSize = streams.threads.size(); + streams.start(); + globalStreamThread.shutdown(); + assertThat(streams.addStreamThread(), equalTo(Optional.empty())); + assertThat(streams.threads.size(), equalTo(oldSize)); + } } @Test public void shouldNotReturnDeadThreads() { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.start(); - streamThreadOne.shutdown(); - final Set<ThreadMetadata> threads = streams.localThreadsMetadata(); + final Set<ThreadMetadata> threads; + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + streams.start(); + streamThreadOne.shutdown(); + threads = streams.localThreadsMetadata(); + } assertThat(threads.size(), equalTo(1)); assertThat(threads, hasItem(streamThreadTwo.threadMetadata())); } @Test public void shouldRemoveThread() throws InterruptedException { props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2); - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.start(); - final int oldSize = streams.threads.size(); - TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, - "Kafka Streams client did not reach state RUNNING"); - assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1))); - assertThat(streams.threads.size(), equalTo(oldSize - 1)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + streams.start(); + final int oldSize = streams.threads.size(); + TestUtils.waitForCondition(() -> streams.state() == KafkaStreams.State.RUNNING, 15L, + "Kafka Streams client did not reach state RUNNING"); + assertThat(streams.removeStreamThread(), equalTo(Optional.of("processId-StreamThread-" + 1))); + assertThat(streams.threads.size(), equalTo(oldSize - 1)); + } } @Test public void shouldNotRemoveThreadWhenNotRunning() { props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1); - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - assertThat(streams.removeStreamThread(), equalTo(Optional.empty())); - assertThat(streams.threads.size(), equalTo(1)); + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { + assertThat(streams.removeStreamThread(), equalTo(Optional.empty())); + assertThat(streams.threads.size(), equalTo(1)); + } } @Test public void testCannotStartOnceClosed() { - final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time); - streams.start(); - streams.close(); - try { + try (final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) { streams.start(); - fail("Should have throw IllegalStateException"); - } catch (final IllegalStateException expected) { - // this is ok - } finally { streams.close(); + try { + streams.start(); + fail("Should have throw IllegalStateException"); Review comment: +1 -- also below in other tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org