ableegoldman commented on code in PR #18048: URL: https://github.com/apache/kafka/pull/18048#discussion_r1879285311
########## streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java: ########## @@ -1816,6 +1816,102 @@ public void shouldWrapProcessorsForStreamTableJoinWithGracePeriod() { assertThat(counter.numConnectedStateStores(), CoreMatchers.is(2)); } + @Test + public void shouldWrapProcessorsForTableTableInnerJoin() { + final Map<Object, Object> props = dummyStreamsConfigMap(); + props.put(PROCESSOR_WRAPPER_CLASS_CONFIG, RecordingProcessorWrapper.class); + + final WrapperRecorder counter = new WrapperRecorder(); + props.put(PROCESSOR_WRAPPER_COUNTER_CONFIG, counter); + + final StreamsBuilder builder = new StreamsBuilder(new TopologyConfig(new StreamsConfig(props))); + + final KTable<String, String> t1 = builder.table("input1", Consumed.as("input1")); // 1 + final KTable<String, String> t2 = builder.table("input2", Consumed.as("input2")); // 2 + + t1.join(t2, (v1, v2) -> v1 + v2, Named.as("join-processor")) // 3 (this), 4 (other), 5 (merger) Review Comment: ok while investigating [this](https://github.com/apache/kafka/pull/18048/files#r1878665564) my whole understanding of table-table joins unraveled and came back together, which lead me to realize that we're actually not testing the new `#stores` methods being implemented in this PR because we aren't materializing the join. Apparently the two stores here are just the original upstream KTable stores being materialized, and the join is performed via a "value getter" on the upstream KTable state store rather than on a state store belonging to the join itself (it's an optimization) The stores we're returning in the new `#stores` implementations are not what's used to perform the join itself, but essentially contain the output of the join. If you don't materialize the join, this output just gets forwarded but not persisted -- all this is to say, we should pass a `Materialized` into these joins -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org