vcrfxia commented on code in PR #13942: URL: https://github.com/apache/kafka/pull/13942#discussion_r1251360190
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() { ); } + @Test + public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.inMemoryKeyValueStore("tableB"))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + assertThat( + exception.getMessage(), + is("KTable must be versioned to use a grace period in a stream table join.") + ); + } + + @Test + public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() { Review Comment: I believe those already exist in this file -- there's `shouldFailIfTableIsNotVersioned()` to check the negative case and `shouldDelayJoinByGracePeriod()` for the positive case -- so I think we should be good? A minor suggestion: a bunch of these new tests look similar even though they test different setups and to an unfamiliar reader it's difficult to pick out which lines of the test are important. You could add a comment to the `filter()` line in the topology to call out that the materialization as a versioned store is inherited through this node; that's the crux of this test that distinguishes it from the other test which already exists. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() { ); } + @Test + public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.inMemoryKeyValueStore("tableB"))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build); + assertThat( + exception.getMessage(), + is("KTable must be versioned to use a grace period in a stream table join.") + ); + } + + @Test + public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() { + final StreamsBuilder builder = new StreamsBuilder(); + final Properties props = new Properties(); + props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION); + final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String())); + final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()), + Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5)))); + final KTable<String, String> tableB = source.filter((k, v) -> true); + streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one"); + + //should not throw an error + builder.build(); + } + + @Test + public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() { Review Comment: Can you also add the equivalent of this test, for the case where materialization as a versioned store is inherited? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org