[ https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551030#comment-16551030 ]
ASF GitHub Bot commented on KAFKA-5037: --------------------------------------- guozhangwang closed pull request #5399: KAFKA-5037 Follow-up: move Scala test to Java URL: https://github.com/apache/kafka/pull/5399 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java index 5fc768dc82b..61bbb8b5dac 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java @@ -16,11 +16,14 @@ */ package org.apache.kafka.streams.integration; +import org.apache.kafka.streams.KafkaStreamsWrapper; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.test.IntegrationTest; +import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -31,6 +34,8 @@ import java.util.Collections; import java.util.List; +import static org.junit.Assert.assertEquals; + /** * Tests all available joins of Kafka Streams DSL. @@ -56,6 +61,31 @@ public void prepareTopology() throws InterruptedException { leftStream = builder.stream(INPUT_TOPIC_LEFT); } + @Test + public void testShouldAutoShutdownOnIncompleteMetadata() throws InterruptedException { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-incomplete"); + + final KStream<Long, String> notExistStream = builder.stream(INPUT_TOPIC_LEFT + "-not-existed"); + + final KTable<Long, String> aggregatedTable = notExistStream.leftJoin(rightTable, valueJoiner) + .groupBy((key, value) -> key) + .reduce((value1, value2) -> value1 + value2); + + // Write the (continuously updating) results to the output topic. + aggregatedTable.toStream().to(OUTPUT_TOPIC); + + final KafkaStreamsWrapper streams = new KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG); + final IntegrationTestUtils.StateListenerStub listener = new IntegrationTestUtils.StateListenerStub(); + streams.setStreamThreadStateListener(listener); + streams.start(); + + TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, "Did not seen thread state transited to PENDING_SHUTDOWN"); + + streams.close(); + assertEquals(listener.runningToRevokedSeen(), true); + assertEquals(listener.revokedToPendingShutdownSeen(), true); + } + @Test public void testInner() throws Exception { STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner"); diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala deleted file mode 100644 index 3bf597738a9..00000000000 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com> - * Copyright (C) 2017-2018 Alexis Seigneurin. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.scala - -import java.util.Properties - -import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization._ -import org.apache.kafka.common.utils.MockTime -import org.apache.kafka.streams._ -import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, IntegrationTestUtils} -import org.apache.kafka.streams.processor.internals.StreamThread -import org.apache.kafka.streams.scala.ImplicitConversions._ -import org.apache.kafka.streams.scala.kstream._ -import org.apache.kafka.test.TestUtils -import org.junit.Assert._ -import org.junit._ -import org.junit.rules.TemporaryFolder -import org.scalatest.junit.JUnitSuite - -/** - * Test suite that verifies the shutdown of StreamThread when metadata is incomplete during stream-table joins in Kafka Streams - * <p> - */ -class StreamToTableJoinWithIncompleteMetadataIntegrationTest extends StreamToTableJoinScalaIntegrationTestBase { - - @Test def testShouldAutoShutdownOnIncompleteMetadata(): Unit = { - - // DefaultSerdes brings into scope implicit serdes (mostly for primitives) that will set up all Serialized, Produced, - // Consumed and Joined instances. So all APIs below that accept Serialized, Produced, Consumed or Joined will - // get these instances automatically - import Serdes._ - - val streamsConfiguration: Properties = getStreamsConfiguration() - - val builder = new StreamsBuilder() - - val userClicksStream: KStream[String, Long] = builder.stream(userClicksTopic) - - val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic + "1") - - // Compute the total per region by summing the individual click counts per region. - val clicksPerRegion: KTable[String, Long] = - userClicksStream - - // Join the stream against the table. - .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) "UNKNOWN" else region, clicks)) - - // Change the stream from <user> -> <region, clicks> to <region> -> <clicks> - .map((_, regionWithClicks) => regionWithClicks) - - // Compute the total per region by summing the individual click counts per region. - .groupByKey - .reduce(_ + _) - - // Write the (continuously updating) results to the output topic. - clicksPerRegion.toStream.to(outputTopic) - - val streams: KafkaStreamsWrapper = new KafkaStreamsWrapper(builder.build(), streamsConfiguration) - val listener = new IntegrationTestUtils.StateListenerStub() - streams.setStreamThreadStateListener(listener) - streams.start() - - val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] = - produceNConsume(userClicksTopic, userRegionsTopic, outputTopic, false) - while (!listener.revokedToPendingShutdownSeen()) { - Thread.sleep(3) - } - streams.close() - assertEquals(listener.runningToRevokedSeen(), true) - assertEquals(listener.revokedToPendingShutdownSeen(), true) - } -} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Infinite loop if all input topics are unknown at startup > -------------------------------------------------------- > > Key: KAFKA-5037 > URL: https://issues.apache.org/jira/browse/KAFKA-5037 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.2.0 > Reporter: Matthias J. Sax > Assignee: Ted Yu > Priority: Major > Labels: newbie++, user-experience > Fix For: 2.1.0 > > Attachments: 5037.v2.txt, 5037.v4.txt > > > See discusion: https://github.com/apache/kafka/pull/2815 > We will need some rewrite on {{StreamPartitionsAssignor}} and to add much > more test for all kind of corner cases, including pattern subscriptions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)