bbejeck commented on a change in pull request #8504:
URL: https://github.com/apache/kafka/pull/8504#discussion_r428343055
##########
File path:
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
##########
@@ -77,6 +79,38 @@ public void
shouldLogAndMeterOnSkippedRecordsWithNullValueWithBuiltInMetricsVers
shouldLogAndMeterOnSkippedRecordsWithNullValue(StreamsConfig.METRICS_LATEST);
}
+
+ @Test
+ public void shouldReuseRepartitionTopicWithGeneratedName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.NO_OPTIMIZATION);
+ final KStream<String, String> stream1 = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream2 = builder.stream("topic2",
Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream3 = builder.stream("topic3",
Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> newStream = stream1.map((k, v) -> new
KeyValue<>(v, k));
+ newStream.join(stream2, (value1, value2) -> value1 + value2,
JoinWindows.of(ofMillis(100))).to("out-one");
+ newStream.join(stream3, (value1, value2) -> value1 + value2,
JoinWindows.of(ofMillis(100))).to("out-to");
+ assertEquals(expectedTopologyWithGeneratedRepartitionTopic,
builder.build(props).describe().toString());
+ }
+
+ @Test
+ public void shouldCreateRepartitionTopicsWithUserProvidedName() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Properties props = new Properties();
+ props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION,
StreamsConfig.NO_OPTIMIZATION);
+ final KStream<String, String> stream1 = builder.stream("topic",
Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream2 = builder.stream("topic2",
Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> stream3 = builder.stream("topic3",
Consumed.with(Serdes.String(), Serdes.String()));
+ final KStream<String, String> newStream = stream1.map((k, v) -> new
KeyValue<>(v, k));
+ final StreamJoined<String, String, String> streamJoined =
StreamJoined.with(Serdes.String(), Serdes.String(), Serdes.String());
+ newStream.join(stream2, (value1, value2) -> value1 + value2,
JoinWindows.of(ofMillis(100)),
streamJoined.withName("first-join")).to("out-one");
+ newStream.join(stream3, (value1, value2) -> value1 + value2,
JoinWindows.of(ofMillis(100)),
streamJoined.withName("second-join")).to("out-two");
+ final Topology topology = builder.build(props);
+ System.out.println(topology.describe().toString());
+ assertEquals(expectedTopologyWithUserNamedRepartitionTopics,
topology.describe().toString());
Review comment:
>This could be fixed by inserting a repartition() in the new code
enforcing the old name -- however, this makes me wonder if we might want to
throw a "naming conflict" (i.e., cannot pick a name) exception based on the
original topology for this case when both operators are named, and tell people
to insert repartition() right away? For this case, if they later remove a join
it's clear what is happening to them.
I see your point, but I think that is a bad user experience and IMHO leaks
too much detail about an operation we want to handle automatically.
I'm leaning towards the simpler case of what we had before. With generated
names, reuse the repartition node, but if the user creates a new join with
explicit names, just go ahead and create two repartition topics.
WDYT?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]