spena commented on a change in pull request #10613: URL: https://github.com/apache/kafka/pull/10613#discussion_r625833095
##########
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##########
@@ -863,6 +920,60 @@ public void streamStreamLeftJoinTopologyWithCustomStoresNames() {
             describe.toString());
     }
 
+    @Test
+    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                " Sub-topology: 0\n" +
+                " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic1])\n" +
+                " --> KSTREAM-WINDOWED-0000000002\n" +
+                " Source: KSTREAM-SOURCE-0000000001 (topics: [input-topic2])\n" +
+                " --> KSTREAM-WINDOWED-0000000003\n" +
+                " Processor: KSTREAM-WINDOWED-0000000002 (stores: [in-memory-join-store])\n" +
+                " --> KSTREAM-JOINTHIS-0000000004\n" +
+                " <-- KSTREAM-SOURCE-0000000000\n" +
+                " Processor: KSTREAM-WINDOWED-0000000003 (stores: [in-memory-join-store-other])\n" +
+                " --> KSTREAM-OUTEROTHER-0000000005\n" +
+                " <-- KSTREAM-SOURCE-0000000001\n" +
+                " Processor: KSTREAM-JOINTHIS-0000000004 (stores: [in-memory-join-store-other, KSTREAM-OUTERSHARED-0000000004-memory-store])\n" +

Review comment:
I just moved the name builder inside `sharedOuterJoinWindowStoreBuilder`. I did not derive the name from the store supplier, though: I would have to append a suffix, and I wasn't sure whether the result would be too long. For instance, if the store supplier is named `in-memory-join-store`, the shared outer store would be named `in-memory-join-store-left-shared-join-store`.

Also, which name should I use for the full outer join: the `this` supplier's or the `other` supplier's? Say the suppliers are named `mem-left-store` and `mem-right-store`. Would I end up with `mem-left-store-outer-shared-join-store` or `mem-right-store-outer-shared-join-store`?

What do you think?
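
To make the naming question concrete, here is a minimal sketch of deriving the shared store name from a supplier name. The class `SharedOuterStoreNames` and the method `sharedStoreName` are hypothetical and not part of the Kafka Streams API or this PR; only the `-left-shared-join-store` and `-outer-shared-join-store` suffixes are taken from the examples in the comment.

```java
// Hypothetical helper, for illustration only; not part of Kafka Streams.
final class SharedOuterStoreNames {

    private SharedOuterStoreNames() { }

    // Appends a join-type suffix to the name of a user-provided store supplier.
    // Assumption: the caller has already decided which supplier's name to use.
    static String sharedStoreName(final String supplierName, final boolean leftJoin) {
        final String joinType = leftJoin ? "-left" : "-outer";
        return supplierName + joinType + "-shared-join-store";
    }

    public static void main(final String[] args) {
        // Left join: only the `this` supplier's name is a candidate.
        System.out.println(sharedStoreName("in-memory-join-store", true));

        // Full outer join: either supplier's name could serve as the base.
        System.out.println(sharedStoreName("mem-left-store", false));
        System.out.println(sharedStoreName("mem-right-store", false));
    }
}
```

The sketch leaves the choice between the `this` and `other` supplier to the caller, which is exactly the open question for the full outer join.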