vcrfxia commented on code in PR #13942:
URL: https://github.com/apache/kafka/pull/13942#discussion_r1251360190


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() {
         );
     }
 
+    @Test
+    public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one");
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build);
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() {

Review Comment:
   I believe those already exist in this file -- there's 
`shouldFailIfTableIsNotVersioned()` to check the negative case and 
`shouldDelayJoinByGracePeriod()` for the positive case -- so I think we should 
be good?
   
   A minor suggestion: several of these new tests look similar even though they 
test different setups, so it's difficult for an unfamiliar reader to pick out 
which lines of each test are important. You could add a comment to the 
`filter()` line in the topology to call out that the materialization as a 
versioned store is inherited through this node; that's the crux of this test, 
and it's what distinguishes it from the other test that already exists.
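   
   As a rough illustration of the kind of comment meant here, the topology 
could be annotated as in the sketch below. This is not the PR's actual code: 
the wrapper class `InheritedVersionedStoreSketch` and its `buildTopology()` 
method are hypothetical names added only to make the snippet self-contained, 
and it assumes a Kafka Streams version that includes the grace-period overload 
of `Joined.with(...)` used in this PR.

```java
import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.Stores;

// Hypothetical standalone wrapper; in the PR these lines sit inside the test method.
public class InheritedVersionedStoreSketch {

    static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> streamA =
            builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));

        // The source table is explicitly materialized as a versioned store.
        final KTable<String, String> source = builder.table(
            "topic2",
            Consumed.with(Serdes.String(), Serdes.String()),
            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));

        // Crux of this test: filter() is not materialized itself, so the joined
        // table inherits the versioned materialization of `source` through this node.
        final KTable<String, String> tableB = source.filter((k, v) -> true);

        // Stream-table join with a grace period, which requires a versioned table.
        streamA.join(
            tableB,
            (value1, value2) -> value1 + value2,
            Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6)))
            .to("out-one");

        // The PR's test expects this call not to throw.
        return builder.build();
    }

    public static void main(final String[] args) {
        System.out.println(buildTopology().describe());
    }
}
```

   The same one-line comment on `filter()` would apply to the negative test, 
with the wording adjusted to say that the inherited store is not versioned.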



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -164,6 +165,53 @@ public void shouldFailIfTableIsNotVersioned() {
         );
     }
 
+    @Test
+    public void shouldFailIfTableIsNotVersionedButMaterializationIsInherited() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.inMemoryKeyValueStore("tableB")));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one");
+
+        final IllegalArgumentException exception = assertThrows(IllegalArgumentException.class, builder::build);
+        assertThat(
+            exception.getMessage(),
+            is("KTable must be versioned to use a grace period in a stream table join.")
+        );
+    }
+
+    @Test
+    public void shouldNotFailIfTableIsVersionedButMaterializationIsInherited() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties props = new Properties();
+        props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.NO_OPTIMIZATION);
+        final KStream<String, String> streamA = builder.stream("topic", Consumed.with(Serdes.String(), Serdes.String()));
+        final KTable<String, String> source = builder.table("topic2", Consumed.with(Serdes.String(), Serdes.String()),
+            Materialized.as(Stores.persistentVersionedKeyValueStore("tableB", Duration.ofMinutes(5))));
+        final KTable<String, String> tableB = source.filter((k, v) -> true);
+        streamA.join(tableB, (value1, value2) -> value1 + value2, Joined.with(Serdes.String(), Serdes.String(), Serdes.String(), "first-join", Duration.ofMillis(6))).to("out-one");
+
+        //should not throw an error
+        builder.build();
+    }
+
+    @Test
+    public void shouldFailIfGracePeriodIsLongerThanHistoryRetention() {

Review Comment:
   Can you also add the equivalent of this test for the case where 
materialization as a versioned store is inherited?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
