bbejeck commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r428343055



##########
File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -77,6 +79,38 @@ public void shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
         shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
     }
 
+
+    @Test
+    public void shouldReuseRepartitionTopicWithGeneratedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100))).to("out-to");
+        assertEquals(expectedTopologyWithGeneratedRepartitionTopic, builder.build(props).describe().toString());
+    }
+
+    @Test
+    public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> stream1 = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream2 = builder.stream("topic2", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> stream3 = builder.stream("topic3", Consumed.with(Serdes.String(), Serdes.String()));
+        final KStream<String, String> newStream = stream1.map((k, v) -> new KeyValue<>(v, k));
+        final StreamJoined<String, String, String> streamJoined = StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+        newStream.join(stream2, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("first-join")).to("out-one");
+        newStream.join(stream3, (value1, value2) -> value1 + value2, JoinWindows.of(ofMillis(100)), streamJoined.withName("second-join")).to("out-two");
+        final Topology topology =  builder.build(props);
+        System.out.println(topology.describe().toString());
+        assertEquals(expectedTopologyWithUserNamedRepartitionTopics, topology.describe().toString());

Review comment:
       >This could be fixed by inserting a repartition() in the new code 
enforcing the old name -- however, this makes me wonder if we might want to 
throw a "naming conflict" (i.e., cannot pick a name) exception based on the 
original topology for this case when both operators are named, and tell people 
to insert repartition() right away? For this case, if they later remove a join 
it's clear what is happening to them.
   
   I see your point, but I think that is a bad user experience and IMHO leaks 
too much detail about an operation we want to handle automatically.
   
   I'm leaning towards the simpler case of what we had before.  With generated 
names, reuse the repartition node, but if the user creates a new join with 
explicit names, just go ahead and create two repartition topics.
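   
   As a rough illustration of that rule, here is a toy model in plain Java. It 
is only a sketch and not the Kafka Streams internals: the class name, the 
`GENERATED_NAME` placeholder, and the `-repartition` suffix are all 
hypothetical and chosen just to make the two cases visible.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model only: this is NOT Kafka Streams' internal code. It mirrors the rule
// proposed above for several joins that share one key-changed upstream stream.
final class RepartitionNamingSketch {

    // Hypothetical placeholder for a generated name; real generated names differ.
    static final String GENERATED_NAME = "generated-repartition";

    // Each element is the user-supplied join name, or empty if none was given.
    // Returns the distinct repartition topics the joins would end up using.
    static List<String> repartitionTopicsFor(final List<Optional<String>> joinNames) {
        final List<String> topics = new ArrayList<>();
        for (final Optional<String> joinName : joinNames) {
            if (joinName.isPresent()) {
                // Explicitly named join: gets its own repartition topic
                // (the "-repartition" suffix is an assumption of this sketch).
                topics.add(joinName.get() + "-repartition");
            } else if (!topics.contains(GENERATED_NAME)) {
                // First unnamed join creates the shared, generated repartition node.
                topics.add(GENERATED_NAME);
            }
            // Later unnamed joins reuse the generated node, so nothing is added.
        }
        return topics;
    }
}
```

   In this model, two unnamed joins yield a single shared topic, while joins 
named "first-join" and "second-join" yield two separate topics.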
   
   WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

