[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16551030#comment-16551030
 ] 

ASF GitHub Bot commented on KAFKA-5037:
---------------------------------------

guozhangwang closed pull request #5399: KAFKA-5037 Follow-up: move Scala test 
to Java
URL: https://github.com/apache/kafka/pull/5399
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 5fc768dc82b..61bbb8b5dac 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.streams.KafkaStreamsWrapper;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -31,6 +34,8 @@
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 
 /**
  * Tests all available joins of Kafka Streams DSL.
@@ -56,6 +61,31 @@ public void prepareTopology() throws InterruptedException {
         leftStream = builder.stream(INPUT_TOPIC_LEFT);
     }
 
+    @Test
+    public void testShouldAutoShutdownOnIncompleteMetadata() throws 
InterruptedException {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-incomplete");
+
+        final KStream<Long, String> notExistStream = 
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
+
+        final KTable<Long, String> aggregatedTable = 
notExistStream.leftJoin(rightTable, valueJoiner)
+                .groupBy((key, value) -> key)
+                .reduce((value1, value2) -> value1 + value2);
+
+        // Write the (continuously updating) results to the output topic.
+        aggregatedTable.toStream().to(OUTPUT_TOPIC);
+
+        final KafkaStreamsWrapper streams = new 
KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG);
+        final IntegrationTestUtils.StateListenerStub listener = new 
IntegrationTestUtils.StateListenerStub();
+        streams.setStreamThreadStateListener(listener);
+        streams.start();
+
+        TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, 
"Did not seen thread state transited to PENDING_SHUTDOWN");
+
+        streams.close();
+        assertEquals(listener.runningToRevokedSeen(), true);
+        assertEquals(listener.revokedToPendingShutdownSeen(), true);
+    }
+
     @Test
     public void testInner() throws Exception {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
deleted file mode 100644
index 3bf597738a9..00000000000
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import java.util.Properties
-
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization._
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.streams._
-import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, 
IntegrationTestUtils}
-import org.apache.kafka.streams.processor.internals.StreamThread
-import org.apache.kafka.streams.scala.ImplicitConversions._
-import org.apache.kafka.streams.scala.kstream._
-import org.apache.kafka.test.TestUtils
-import org.junit.Assert._
-import org.junit._
-import org.junit.rules.TemporaryFolder
-import org.scalatest.junit.JUnitSuite
-
-/**
- * Test suite that verifies the shutdown of StreamThread when metadata is 
incomplete during stream-table joins in Kafka Streams
- * <p>
- */
-class StreamToTableJoinWithIncompleteMetadataIntegrationTest extends 
StreamToTableJoinScalaIntegrationTestBase {
-
-  @Test def testShouldAutoShutdownOnIncompleteMetadata(): Unit = {
-
-    // DefaultSerdes brings into scope implicit serdes (mostly for primitives) 
that will set up all Serialized, Produced,
-    // Consumed and Joined instances. So all APIs below that accept 
Serialized, Produced, Consumed or Joined will
-    // get these instances automatically
-    import Serdes._
-
-    val streamsConfiguration: Properties = getStreamsConfiguration()
-
-    val builder = new StreamsBuilder()
-
-    val userClicksStream: KStream[String, Long] = 
builder.stream(userClicksTopic)
-
-    val userRegionsTable: KTable[String, String] = 
builder.table(userRegionsTopic + "1")
-
-    // Compute the total per region by summing the individual click counts per 
region.
-    val clicksPerRegion: KTable[String, Long] =
-      userClicksStream
-
-      // Join the stream against the table.
-        .leftJoin(userRegionsTable)((clicks, region) => (if (region == null) 
"UNKNOWN" else region, clicks))
-
-        // Change the stream from <user> -> <region, clicks> to <region> -> 
<clicks>
-        .map((_, regionWithClicks) => regionWithClicks)
-
-        // Compute the total per region by summing the individual click counts 
per region.
-        .groupByKey
-        .reduce(_ + _)
-
-    // Write the (continuously updating) results to the output topic.
-    clicksPerRegion.toStream.to(outputTopic)
-
-    val streams: KafkaStreamsWrapper = new 
KafkaStreamsWrapper(builder.build(), streamsConfiguration)
-    val listener = new IntegrationTestUtils.StateListenerStub()
-    streams.setStreamThreadStateListener(listener)
-    streams.start()
-
-    val actualClicksPerRegion: java.util.List[KeyValue[String, Long]] =
-      produceNConsume(userClicksTopic, userRegionsTopic, outputTopic, false)
-    while (!listener.revokedToPendingShutdownSeen()) {
-      Thread.sleep(3)
-    }
-    streams.close()
-    assertEquals(listener.runningToRevokedSeen(), true)
-    assertEquals(listener.revokedToPendingShutdownSeen(), true)
-  }
-}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Infinite loop if all input topics are unknown at startup
> --------------------------------------------------------
>
>                 Key: KAFKA-5037
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5037
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Matthias J. Sax
>            Assignee: Ted Yu
>            Priority: Major
>              Labels: newbie++, user-experience
>             Fix For: 2.1.0
>
>         Attachments: 5037.v2.txt, 5037.v4.txt
>
>
> See discussion: https://github.com/apache/kafka/pull/2815
> We will need to rewrite parts of {{StreamPartitionsAssignor}} and add many 
> more tests for all kinds of corner cases, including pattern subscriptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to