ableegoldman commented on code in PR #18048:
URL: https://github.com/apache/kafka/pull/18048#discussion_r1879285311


##########
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java:
##########
@@ -1816,6 +1816,102 @@ public void 
shouldWrapProcessorsForStreamTableJoinWithGracePeriod() {
         assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2));
     }
 
+    @Test
+    public void shouldWrapProcessorsForTableTableInnerJoin() {
+        final Map<Object, Object> props = dummyStreamsConfigMap();
+        props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, 
RecordingProcessorWrapper.class);
+
+        final WrapperRecorder counter = new WrapperRecorder();
+        props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter);
+
+        final StreamsBuilder builder = new StreamsBuilder(new 
TopologyConfig(new StreamsConfig(props)));
+
+        final KTable<String, String> t1 = builder.table("input1", 
Consumed.as("input1")); // 1
+        final KTable<String, String> t2 = builder.table("input2", 
Consumed.as("input2")); // 2
+
+        t1.join(t2, (v1, v2) -> v1 + v2, Named.as("join-processor")) // 3 
(this), 4 (other), 5 (merger)

Review Comment:
   ok while investigating 
[this](https://github.com/apache/kafka/pull/18048/files#r1878665564) my whole 
understanding of table-table joins unraveled and came back together, which lead 
me to realize that we're actually not testing the new `#stores` methods being 
implemented in this PR because we aren't materializing the join. Apparently the 
two stores here are just the original upstream KTable stores being 
materialized, and the join is performed via a "value getter" on the upstream 
KTable state store rather than on a state store belonging to the join itself 
(it's an optimization)
   
   The stores we're returning in the new `#stores` implementations are not 
what's used to perform the join itself, but essentially contain the output of 
the join. If you don't materialize the join, this output just gets forwarded 
but not persisted -- all this is to say, we should pass a `Materialized` into 
these joins



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to