guozhangwang commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r626253159
##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -863,6 +920,60 @@ public void
streamStreamLeftJoinTopologyWithCustomStoresNames() {
describe.toString());
}
+ @Test
+ public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<Integer, String> stream1;
+ final KStream<Integer, String> stream2;
+
+ stream1 = builder.stream("input-topic1");
+ stream2 = builder.stream("input-topic2");
+
+ final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+ final WindowBytesStoreSupplier thisStoreSupplier =
Stores.inMemoryWindowStore("in-memory-join-store",
+ Duration.ofMillis(joinWindows.size() +
joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ final WindowBytesStoreSupplier otherStoreSupplier =
Stores.inMemoryWindowStore("in-memory-join-store-other",
+ Duration.ofMillis(joinWindows.size() +
joinWindows.gracePeriodMs()),
+ Duration.ofMillis(joinWindows.size()), true);
+
+ stream1.leftJoin(
+ stream2,
+ MockValueJoiner.TOSTRING_JOINER,
+ joinWindows,
+ StreamJoined.with(Serdes.Integer(), Serdes.String(),
Serdes.String())
+ .withThisStoreSupplier(thisStoreSupplier)
+ .withOtherStoreSupplier(otherStoreSupplier));
+
+ final TopologyDescription describe = builder.build().describe();
+
+ assertEquals(
+ "Topologies:\n" +
+ " Sub-topology: 0\n" +
+ " Source: KSTREAM-SOURCE-0000000000 (topics:
[input-topic1])\n" +
+ " --> KSTREAM-WINDOWED-0000000002\n" +
+ " Source: KSTREAM-SOURCE-0000000001 (topics:
[input-topic2])\n" +
+ " --> KSTREAM-WINDOWED-0000000003\n" +
+ " Processor: KSTREAM-WINDOWED-0000000002 (stores:
[in-memory-join-store])\n" +
+ " --> KSTREAM-JOINTHIS-0000000004\n" +
+ " <-- KSTREAM-SOURCE-0000000000\n" +
+ " Processor: KSTREAM-WINDOWED-0000000003 (stores:
[in-memory-join-store-other])\n" +
+ " --> KSTREAM-OUTEROTHER-0000000005\n" +
+ " <-- KSTREAM-SOURCE-0000000001\n" +
+ " Processor: KSTREAM-JOINTHIS-0000000004 (stores:
[in-memory-join-store-other, KSTREAM-OUTERSHARED-0000000004-memory-store])\n" +
Review comment:
Maybe we can also make it a bit simpler to just rely on the left side:
if the supplier is given on the left side (which would always provide the
name), then we name the shared as `thisStoreSupplier.name() +
"outerJoinSuffix"`; if it is not, then we look into
`streamJoinedInternal.storeName()`, and the last resort as
`outerJoinStoreGeneratedName`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]