vvcephei commented on a change in pull request #8938: URL: https://github.com/apache/kafka/pull/8938#discussion_r446551977
########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { Review comment: I inlined these utilities to make this class "portable". I.e., we can copy and paste it into the "upgrade test" modules without also dragging in a dependency on the client utils. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/SmokeTestDriverIntegrationTest.java ########## @@ -104,10 +104,6 @@ public void shouldWorkWithRebalance() throws InterruptedException { clients.add(smokeTestClient); smokeTestClient.start(props); - while (!clients.get(clients.size() - 1).started()) { - Thread.sleep(100); - } Review comment: This isn't needed anymore, as `SmokeTestClient#start` now blocks until the instance goes to running the first time. ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } } - public boolean started() { - return started; + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; } public boolean closed() { return closed; } public void start(final Properties streamsProperties) { - streams = createKafkaStreams(streamsProperties); + final Topology build = getTopology(); + streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + countDownLatch.countDown(); + } + + if (newState == KafkaStreams.State.NOT_RUNNING) { + closed = true; + } + }); + streams.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + e.printStackTrace(System.out); uncaughtException = true; - e.printStackTrace(); + streams.close(Duration.ofSeconds(30)); }); - Exit.addShutdownHook("streams-shutdown-hook", () -> close()); + addShutdownHook("streams-shutdown-hook", this::close); - thread = new Thread(() -> streams.start()); - thread.start(); + streams.start(); + try { + if (!countDownLatch.await(1, TimeUnit.MINUTES)) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); + } + } catch (final InterruptedException e) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); + e.printStackTrace(System.out); + } + System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); + System.out.println(name + " started at " + Instant.now()); } public void closeAsync() { streams.close(Duration.ZERO); } public void close() { - streams.close(Duration.ofSeconds(5)); - // do not remove these printouts since they are needed for health scripts - if (!uncaughtException) { + final boolean wasClosed = streams.close(Duration.ofMinutes(1)); Review comment: 5 seconds seems a bit stingy :) . Note that we were previously ignoring the case where we didn't close within the timeout, so we have no idea if 5 seconds was ever long enough. I figured a minute is more reasonable. ########## File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java ########## @@ -0,0 +1,632 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; + +public class SmokeTestDriver extends SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + +public class StreamsSmokeTest { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowStore; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; + +public class SmokeTestClient extends SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } } - public boolean started() { - return started; + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; } public boolean closed() { return closed; } public void start(final Properties streamsProperties) { - streams = createKafkaStreams(streamsProperties); + final Topology build = getTopology(); + streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + countDownLatch.countDown(); + } + + if (newState == KafkaStreams.State.NOT_RUNNING) { + closed = true; + } + }); + streams.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + e.printStackTrace(System.out); uncaughtException = true; - e.printStackTrace(); + streams.close(Duration.ofSeconds(30)); }); - Exit.addShutdownHook("streams-shutdown-hook", () -> close()); + addShutdownHook("streams-shutdown-hook", this::close); - thread = new Thread(() -> streams.start()); - thread.start(); + streams.start(); + try { + if (!countDownLatch.await(1, TimeUnit.MINUTES)) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); + } + } catch (final InterruptedException e) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); + e.printStackTrace(System.out); + } + System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); Review comment: Blocking and then printing this gives the system tests the ability to explicitly wait until the instances are joined. ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; Review comment: no longer needed, since start() is now blocking ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; Review comment: This thread was actually pointless, since StreamThreads are already user (not daemon) threads. ########## File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; + +public class SmokeTestDriver extends SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowStore; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; + +public class SmokeTestClient extends SmokeTestUtil { Review comment: Everything in the `upgrade-system-tests...` directories is just copy/pasted from the main SmokeTest implementations. ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } } - public boolean started() { - return started; + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; } public boolean closed() { return closed; } public void start(final Properties streamsProperties) { - streams = createKafkaStreams(streamsProperties); + final Topology build = getTopology(); + streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + countDownLatch.countDown(); + } + + if (newState == KafkaStreams.State.NOT_RUNNING) { + closed = true; + } + }); + streams.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + e.printStackTrace(System.out); uncaughtException = true; - e.printStackTrace(); + streams.close(Duration.ofSeconds(30)); }); - Exit.addShutdownHook("streams-shutdown-hook", () -> close()); + addShutdownHook("streams-shutdown-hook", this::close); - thread = new Thread(() -> streams.start()); - thread.start(); + streams.start(); + try { + if (!countDownLatch.await(1, TimeUnit.MINUTES)) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); + } + } catch (final InterruptedException e) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); + e.printStackTrace(System.out); + } + System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); + System.out.println(name + " started at " + Instant.now()); } public void closeAsync() { streams.close(Duration.ZERO); } public void close() { - streams.close(Duration.ofSeconds(5)); - // do not remove these printouts since they are needed for health scripts - if (!uncaughtException) { + final boolean wasClosed = streams.close(Duration.ofMinutes(1)); + + if (wasClosed && !uncaughtException) { Review comment: I just happened to notice that we weren't previously checking that Streams actually finished closing. It looks like we were expecting some kind of exception to get thrown during the `join()` below, but that's not the way it works. ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } } - public boolean started() { - return started; + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; } public boolean closed() { return closed; } public void start(final Properties streamsProperties) { - streams = createKafkaStreams(streamsProperties); + final Topology build = getTopology(); + streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + countDownLatch.countDown(); + } + + if (newState == KafkaStreams.State.NOT_RUNNING) { + closed = true; + } + }); + streams.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + e.printStackTrace(System.out); uncaughtException = true; - e.printStackTrace(); + streams.close(Duration.ofSeconds(30)); }); - Exit.addShutdownHook("streams-shutdown-hook", () -> close()); + addShutdownHook("streams-shutdown-hook", this::close); - thread = new Thread(() -> streams.start()); - thread.start(); + streams.start(); + try { + if (!countDownLatch.await(1, TimeUnit.MINUTES)) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); + } + } catch (final InterruptedException e) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); + e.printStackTrace(System.out); + } + System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); + System.out.println(name + " started at " + Instant.now()); } public void closeAsync() { streams.close(Duration.ZERO); } public void close() { - streams.close(Duration.ofSeconds(5)); - // do not remove these printouts since they are needed for health scripts - if (!uncaughtException) { + final boolean wasClosed = streams.close(Duration.ofMinutes(1)); + + if (wasClosed && !uncaughtException) { System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED"); - } - try { - thread.join(); - } catch (final Exception ex) { - // do not remove these printouts since they are needed for health scripts + } else if (wasClosed) { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); - // ignore + } else { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close"); Review comment: So here's what we print if there was no uncaught exception, but we also didn't close in time. ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } } - public boolean started() { - return started; + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; } public boolean closed() { return closed; } public void start(final Properties streamsProperties) { - streams = createKafkaStreams(streamsProperties); Review comment: Inlining this function actually fixed a bug in which the next line was setting a handler that actually replaced the handler registered in createKafkaStreams. The function was only used from right here anyway, so it was needless complexity. ########## File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.time.Instant; + +public class SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } } - public boolean started() { - return started; + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; } public boolean closed() { return closed; } public void start(final Properties streamsProperties) { - streams = createKafkaStreams(streamsProperties); + final Topology build = getTopology(); + streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + countDownLatch.countDown(); + } + + if (newState == KafkaStreams.State.NOT_RUNNING) { + closed = true; + } + }); + streams.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + e.printStackTrace(System.out); uncaughtException = true; - e.printStackTrace(); + streams.close(Duration.ofSeconds(30)); }); - Exit.addShutdownHook("streams-shutdown-hook", () -> close()); + addShutdownHook("streams-shutdown-hook", this::close); - thread = new Thread(() -> streams.start()); - thread.start(); + streams.start(); + try { + if (!countDownLatch.await(1, TimeUnit.MINUTES)) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't start in one minute"); + } + } catch (final InterruptedException e) { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e); + e.printStackTrace(System.out); + } + System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED"); + System.out.println(name + " started at " + Instant.now()); } public void closeAsync() { streams.close(Duration.ZERO); } public void close() { - streams.close(Duration.ofSeconds(5)); - // do not remove these printouts since they are needed for health scripts - if (!uncaughtException) { + final boolean wasClosed = streams.close(Duration.ofMinutes(1)); + + if (wasClosed && !uncaughtException) { System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED"); - } - try { - thread.join(); - } catch (final Exception ex) { - // do not remove these printouts since they are needed for health scripts + } else if (wasClosed) { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); - // ignore + } else { + System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't close"); } } private Properties getStreamsConfig(final Properties props) { final Properties fullProps = new Properties(props); fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest"); fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name); - fullProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3); - fullProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2); - fullProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 100); - fullProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); - fullProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3); - fullProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - fullProps.put(ProducerConfig.ACKS_CONFIG, "all"); Review comment: Moved these to the python code, where the "properties file" itself is built. I left only the properties that are better off dynamically generated in the java code. ########## File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.time.Instant; + +public class SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -38,107 +37,128 @@ import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import org.apache.kafka.test.TestUtils; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; import java.time.Duration; import java.time.Instant; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SmokeTestClient extends SmokeTestUtil { private final String name; - private Thread thread; private KafkaStreams streams; private boolean uncaughtException = false; - private boolean started; - private boolean closed; + private volatile boolean closed; - public SmokeTestClient(final String name) { - super(); - this.name = name; + private static void addShutdownHook(final String name, final Runnable runnable) { + if (name != null) { + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + } else { + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } } - public boolean started() { - return started; + private static File tempDirectory() { + final String prefix = "kafka-"; + final File file; + try { + file = Files.createTempDirectory(prefix).toFile(); + } catch (final IOException ex) { + throw new RuntimeException("Failed to create a temp dir", ex); + } + file.deleteOnExit(); + + addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (final IOException e) { + System.out.println("Error deleting " + file.getAbsolutePath()); + e.printStackTrace(System.out); + } + }); + + return file; + } + + public SmokeTestClient(final String name) { + this.name = name; } public boolean closed() { return closed; } public void start(final Properties streamsProperties) { - streams = createKafkaStreams(streamsProperties); + final Topology build = getTopology(); + streams = new KafkaStreams(build, getStreamsConfig(streamsProperties)); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + streams.setStateListener((newState, oldState) -> { + System.out.printf("%s %s: %s -> %s%n", name, Instant.now(), oldState, newState); + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + countDownLatch.countDown(); + } + + if (newState == KafkaStreams.State.NOT_RUNNING) { + closed = true; + } + }); + streams.setUncaughtExceptionHandler((t, e) -> { System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION"); + System.out.println(name + ": FATAL: An unexpected exception is encountered on thread " + t + ": " + e); + e.printStackTrace(System.out); uncaughtException = true; - e.printStackTrace(); + streams.close(Duration.ofSeconds(30)); Review comment: This is logic from the inlined function that had gotten lost when we set the handler again. ########## File path: streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ########## @@ -75,7 +75,7 @@ public void process(final Object key, final Object value) { @Override public void close() { - System.out.printf("Close processor for task %s", context().taskId()); + System.out.printf("Close processor for task %s%n", context().taskId()); Review comment: I just happened to notice that the newline was missing when I looked at the stdout. It didn't affect the tests' ability to grep. ########## File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.time.Instant; + +public class SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; + +public class SmokeTestDriver extends SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-23/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowStore; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; + +public class SmokeTestClient extends SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-22/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + +public class StreamsSmokeTest { Review comment: copy/pasted ########## File path: tests/kafkatest/services/streams.py ########## @@ -305,23 +305,62 @@ def start_node(self, node): class StreamsSmokeTestBaseService(StreamsTestBaseService): """Base class for Streams Smoke Test services providing some common settings and functionality""" - def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3): + def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3): super(StreamsSmokeTestBaseService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StreamsSmokeTest", command) self.NUM_THREADS = num_threads self.PROCESSING_GUARANTEE = processing_guarantee + self.KAFKA_STREAMS_VERSION = "" + self.UPGRADE_FROM = None + self.REPLICATION_FACTOR = replication_factor + + def set_version(self, kafka_streams_version): + self.KAFKA_STREAMS_VERSION = kafka_streams_version + + def set_upgrade_from(self, upgrade_from): + self.UPGRADE_FROM = upgrade_from Review comment: some other stuff for the upgrade tests. ########## File path: streams/upgrade-system-tests-24/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + +public class StreamsSmokeTest { Review comment: copy/pasted ########## File path: tests/kafkatest/services/streams.py ########## @@ -305,23 +305,62 @@ def start_node(self, node): class StreamsSmokeTestBaseService(StreamsTestBaseService): """Base class for Streams Smoke Test services providing some common settings and functionality""" - def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3): + def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3): Review comment: Added the ability to set this, so that we can just run one broker from the upgrade tests. ########## File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java ########## @@ -0,0 +1,622 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Random; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.Collections.emptyMap; +import static org.apache.kafka.common.utils.Utils.mkEntry; + +public class SmokeTestDriver extends SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java ########## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.kstream.Initializer; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +import java.time.Instant; + +public class SmokeTestUtil { Review comment: copy/pasted ########## File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java ########## @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.KafkaThread; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Suppressed.BufferConfig; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.WindowStore; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.time.Duration; +import java.time.Instant; +import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; + +public class SmokeTestClient extends SmokeTestUtil { Review comment: copy/pasted ########## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ########## @@ -37,6 +37,9 @@ # can be replaced with metadata_2_versions backward_compatible_metadata_2_versions = [str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1)] metadata_3_or_higher_versions = [str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5), str(DEV_VERSION)] +smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4), str(LATEST_2_5)] Review comment: Unfortunately, 2.1 doesn't work. See https://issues.apache.org/jira/browse/KAFKA-10203 ########## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ########## @@ -189,8 +192,8 @@ def test_upgrade_downgrade_brokers(self, from_version, to_version): processor.stop() processor.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False) - @matrix(from_version=metadata_2_versions, to_version=metadata_2_versions) - def test_simple_upgrade_downgrade(self, from_version, to_version): + @matrix(from_version=smoke_test_versions, to_version=dev_version) + def test_app_upgrade(self, from_version, to_version): Review comment: Changed the name to reflect that we're just testing upgrades now. ########## File path: tests/kafkatest/services/streams.py ########## @@ -305,23 +305,62 @@ def start_node(self, node): class StreamsSmokeTestBaseService(StreamsTestBaseService): """Base class for Streams Smoke Test services providing some common settings and functionality""" - def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3): + def __init__(self, test_context, kafka, command, processing_guarantee = 'at_least_once', num_threads = 3, replication_factor = 3): super(StreamsSmokeTestBaseService, self).__init__(test_context, kafka, "org.apache.kafka.streams.tests.StreamsSmokeTest", command) self.NUM_THREADS = num_threads self.PROCESSING_GUARANTEE = processing_guarantee + self.KAFKA_STREAMS_VERSION = "" + self.UPGRADE_FROM = None + self.REPLICATION_FACTOR = replication_factor + + def set_version(self, kafka_streams_version): + self.KAFKA_STREAMS_VERSION = kafka_streams_version + + def set_upgrade_from(self, upgrade_from): + self.UPGRADE_FROM = upgrade_from def prop_file(self): properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), streams_property.PROCESSING_GUARANTEE: self.PROCESSING_GUARANTEE, - streams_property.NUM_THREADS: self.NUM_THREADS} + streams_property.NUM_THREADS: self.NUM_THREADS, + "replication.factor": self.REPLICATION_FACTOR, + "num.standby.replicas": 2, + "buffered.records.per.partition": 100, + "commit.interval.ms": 1000, + "auto.offset.reset": "earliest", + "acks": "all"} Review comment: moved from Java ########## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ########## @@ -201,14 +204,29 @@ def test_simple_upgrade_downgrade(self, from_version, to_version): self.zk = ZookeeperService(self.test_context, num_nodes=1) self.zk.start() - self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics=self.topics) + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, topics={ + 'echo' : { 'partitions': 5, 'replication-factor': 1 }, + 'data' : { 'partitions': 5, 'replication-factor': 1 }, + 'min' : { 'partitions': 5, 'replication-factor': 1 }, + 'min-suppressed' : { 'partitions': 5, 'replication-factor': 1 }, + 'min-raw' : { 'partitions': 5, 'replication-factor': 1 }, + 'max' : { 'partitions': 5, 'replication-factor': 1 }, + 'sum' : { 'partitions': 5, 'replication-factor': 1 }, + 'sws-raw' : { 'partitions': 5, 'replication-factor': 1 }, + 'sws-suppressed' : { 'partitions': 5, 'replication-factor': 1 }, + 'dif' : { 'partitions': 5, 'replication-factor': 1 }, + 'cnt' : { 'partitions': 5, 'replication-factor': 1 }, + 'avg' : { 'partitions': 5, 'replication-factor': 1 }, + 'wcnt' : { 'partitions': 5, 'replication-factor': 1 }, + 'tagg' : { 'partitions': 5, 'replication-factor': 1 } + }) Review comment: required setup for the smoke test app ########## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ########## @@ -349,56 +370,42 @@ def get_version_string(self, version): def start_all_nodes_with(self, version): kafka_version_str = self.get_version_string(version) - # start first with <version> self.prepare_for(self.processor1, version) - node1 = self.processor1.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor: - with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor: - self.processor1.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) - monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - - # start second with <version> self.prepare_for(self.processor2, version) - node2 = self.processor2.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: - with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: - with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor: - self.processor2.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account)) - first_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - - # start third with <version> self.prepare_for(self.processor3, version) - node3 = self.processor3.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: - with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: - with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor: - with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor: - self.processor3.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account)) - first_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - third_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account)) + + self.processor1.start() + self.processor2.start() + self.processor3.start() Review comment: I changed the "first time startup" method to just start all the instances at the same time, rather than waiting for them to start up and process one at a time. For the upgrade part of the test, we still do rolling upgrades. ########## File path: streams/upgrade-system-tests-25/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.IOException; +import java.time.Duration; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.apache.kafka.streams.tests.SmokeTestDriver.generate; +import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually; + +public class StreamsSmokeTest { Review comment: copy/pasted ########## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ########## @@ -349,56 +370,42 @@ def get_version_string(self, version): def start_all_nodes_with(self, version): kafka_version_str = self.get_version_string(version) - # start first with <version> self.prepare_for(self.processor1, version) - node1 = self.processor1.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor: - with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor: - self.processor1.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) - monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - - # start second with <version> self.prepare_for(self.processor2, version) - node2 = self.processor2.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: - with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: - with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor: - self.processor2.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account)) - first_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - - # start third with <version> self.prepare_for(self.processor3, version) - node3 = self.processor3.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: - with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: - with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor: - with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor: - self.processor3.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account)) - first_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - third_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account)) + + self.processor1.start() + self.processor2.start() + self.processor3.start() + + # double-check the version + self.wait_for_verification(self.processor1, kafka_version_str, self.processor1.LOG_FILE) + self.wait_for_verification(self.processor2, kafka_version_str, self.processor2.LOG_FILE) + self.wait_for_verification(self.processor3, kafka_version_str, self.processor3.LOG_FILE) + + # wait for the members to join + self.wait_for_verification(self.processor1, "SMOKE-TEST-CLIENT-STARTED", self.processor1.STDOUT_FILE) + self.wait_for_verification(self.processor2, "SMOKE-TEST-CLIENT-STARTED", self.processor2.STDOUT_FILE) + self.wait_for_verification(self.processor3, "SMOKE-TEST-CLIENT-STARTED", self.processor3.STDOUT_FILE) + + # make sure they've processed something + self.wait_for_verification(self.processor1, self.processed_msg, self.processor1.STDOUT_FILE) + self.wait_for_verification(self.processor2, self.processed_msg, self.processor2.STDOUT_FILE) + self.wait_for_verification(self.processor3, self.processed_msg, self.processor3.STDOUT_FILE) + + def wait_for_verification(self, processor, message, file, num_lines=1): + wait_until(lambda: self.verify_from_file(processor, message, file) >= num_lines, + timeout_sec=60, + err_msg="Did expect to read '%s' from %s" % (message, processor.node.account)) + + @staticmethod + def verify_from_file(processor, message, file): Review comment: Copied this over here so that we don't have to mess with monitors if we just want to search from the beginning of the files. ########## File path: tests/kafkatest/tests/streams/streams_upgrade_test.py ########## @@ -349,56 +370,42 @@ def get_version_string(self, version): def start_all_nodes_with(self, version): kafka_version_str = self.get_version_string(version) - # start first with <version> self.prepare_for(self.processor1, version) - node1 = self.processor1.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor: - with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor: - self.processor1.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " " + str(node1.account)) - monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - - # start second with <version> self.prepare_for(self.processor2, version) - node2 = self.processor2.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: - with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: - with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor: - self.processor2.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " on " + str(node2.account)) - first_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - - # start third with <version> self.prepare_for(self.processor3, version) - node3 = self.processor3.node - with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor: - with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor: - with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor: - with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor: - self.processor3.start() - log_monitor.wait_until(kafka_version_str, - timeout_sec=60, - err_msg="Could not detect Kafka Streams version " + version + " on " + str(node3.account)) - first_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node1.account)) - second_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node2.account)) - third_monitor.wait_until(self.processed_msg, - timeout_sec=60, - err_msg="Never saw output '%s' on " % self.processed_msg + str(node3.account)) + + self.processor1.start() + self.processor2.start() + self.processor3.start() + + # double-check the version + self.wait_for_verification(self.processor1, kafka_version_str, self.processor1.LOG_FILE) + self.wait_for_verification(self.processor2, kafka_version_str, self.processor2.LOG_FILE) + self.wait_for_verification(self.processor3, kafka_version_str, self.processor3.LOG_FILE) + + # wait for the members to join Review comment: Here's where we use that new output line after synchronously waiting to join the group in `start()` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org