mjsax commented on code in PR #19580:
URL: https://github.com/apache/kafka/pull/19580#discussion_r2062743713


##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java:
##########
@@ -248,10 +263,54 @@ public void shouldNotJoinOnNullKeyMapperValues() {
 
         pushToStream(4, "XXX", false, false);
         processor.checkAndClearProcessResult(EMPTY);
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", "main"),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(4.0)
+        );
+    }
+
+    @Test
+    public void shouldNotJoinOnNullKeyMapperValuesWithNullKeys() {

Review Comment:
   New test to improve test coverage -- this does not expose any bugs.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java:
##########
@@ -255,11 +274,66 @@ public void shouldNotJoinOnNullKeyMapperValues() {
         // this should not produce any item.
 
         pushToStream(4, "XXX", false, false);
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "XXX0+null", 0),
+            new KeyValueTimestamp<>(1, "XXX1+null", 1),
+            new KeyValueTimestamp<>(2, "XXX2+null", 2),
+            new KeyValueTimestamp<>(3, "XXX3+null", 3)
+        );
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", "main"),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(0.0)
+        );
+    }
+
+    @Test
+    public void shouldJoinOnNullKeyMapperValuesWithNullKeys() {

Review Comment:
   Adding missing test case



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -126,13 +126,13 @@ protected void updateObservedStreamTime(final long 
timestamp) {
 
     private void doJoin(final Record<StreamKey, StreamValue> record) {
         final TableKey mappedKey = keyMapper.apply(record.key(), 
record.value());
-        final TableValue value2 = getValue2(record, mappedKey);
+        final TableValue value2 = getTableValue(record, mappedKey);

Review Comment:
   Side improvement 



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java:
##########
@@ -199,4 +207,65 @@ public void shouldClearTableEntryOnNullValueUpdates() {
                 new KeyValueTimestamp<>(3, "XX3+Y3", 3));
     }
 
+    @Test
+    public void shouldNotDropLeftNullKey() {

Review Comment:
   Adding two missing test cases -- does not expose any bug -- for 
stream-table-join `mappedkey == key` so not dropping on stream-null-key did 
work as expected



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java:
##########
@@ -150,7 +150,7 @@ private boolean maybeDropRecord(final Record<StreamKey, 
StreamValue> record) {
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() 
indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null 
values are ignored
         final TableKey mappedKey = keyMapper.apply(record.key(), 
record.value());
-        if (leftJoin && record.key() == null && record.value() != null) {
+        if (leftJoin && mappedKey == null && record.value() != null) {

Review Comment:
   This is the actual fix.



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java:
##########
@@ -236,15 +253,17 @@ public void 
shouldClearGlobalTableEntryOnNullValueUpdates() {
 
         // push all four items to the primary stream. this should produce four 
items.
 
-        pushToStream(4, "XX", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"XX0,FKey0+null", 4),
-                new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5),
-                new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6),
-                new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7));
+        pushToStream(4, "X", true, false);
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+null", 4),
+            new KeyValueTimestamp<>(1, "X1,FKey1+null", 5),
+            new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 6),
+            new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 7)
+        );
     }
 
     @Test
-    public void shouldNotJoinOnNullKeyMapperValues() {
+    public void shouldJoinOnNullKeyMapperValues() {

Review Comment:
   Adjusting the test, to the correct behavior



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java:
##########
@@ -248,10 +263,54 @@ public void shouldNotJoinOnNullKeyMapperValues() {
 
         pushToStream(4, "XXX", false, false);
         processor.checkAndClearProcessResult(EMPTY);
+
+        assertThat(
+            driver.metrics().get(
+                    new MetricName(
+                        "dropped-records-total",
+                        "stream-task-metrics",
+                        "",
+                        mkMap(
+                            mkEntry("thread-id", "main"),
+                            mkEntry("task-id", "0_0")
+                        )
+                    ))
+                .metricValue(),
+            is(4.0)

Review Comment:
   Add new check



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java:
##########
@@ -169,8 +174,10 @@ public void shouldNotJoinOnGlobalTableUpdates() {
         // push all four items to the primary stream. this should produce two 
items.
 
         pushToStream(4, "X", true, false);
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0,FKey0+Y0", 2),
-                new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2),
+            new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3)
+        );

Review Comment:
   I had a hard timer reading the test code, so I reformatted it to make it 
easier to read. Similar elsewhere



##########
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java:
##########
@@ -354,19 +357,23 @@ public void shouldNotJoinOnTableUpdates() {
 
         // push all four items to the primary stream. this should produce two 
items.
         pushToStream(4, "X");
-        processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, 
"X0+Y0", 0),
-                new KeyValueTimestamp<>(1, "X1+Y1", 1));
+        processor.checkAndClearProcessResult(
+            new KeyValueTimestamp<>(0, "X0+Y0", 0),
+            new KeyValueTimestamp<>(1, "X1+Y1", 1)
+        );

Review Comment:
   Just some cleanup in this file, to align to the related tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to