mjsax commented on code in PR #19580: URL: https://github.com/apache/kafka/pull/19580#discussion_r2062743713
########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java: ########## @@ -248,10 +263,54 @@ public void shouldNotJoinOnNullKeyMapperValues() { pushToStream(4, "XXX", false, false); processor.checkAndClearProcessResult(EMPTY); + + assertThat( + driver.metrics().get( + new MetricName( + "dropped-records-total", + "stream-task-metrics", + "", + mkMap( + mkEntry("thread-id", "main"), + mkEntry("task-id", "0_0") + ) + )) + .metricValue(), + is(4.0) + ); + } + + @Test + public void shouldNotJoinOnNullKeyMapperValuesWithNullKeys() { Review Comment: New test to improve test coverage -- this does not expose any bugs. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java: ########## @@ -255,11 +274,66 @@ public void shouldNotJoinOnNullKeyMapperValues() { // this should not produce any item. pushToStream(4, "XXX", false, false); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "XXX0+null", 0), + new KeyValueTimestamp<>(1, "XXX1+null", 1), + new KeyValueTimestamp<>(2, "XXX2+null", 2), + new KeyValueTimestamp<>(3, "XXX3+null", 3) + ); + + assertThat( + driver.metrics().get( + new MetricName( + "dropped-records-total", + "stream-task-metrics", + "", + mkMap( + mkEntry("thread-id", "main"), + mkEntry("task-id", "0_0") + ) + )) + .metricValue(), + is(0.0) + ); + } + + @Test + public void shouldJoinOnNullKeyMapperValuesWithNullKeys() { Review Comment: Adding missing test case ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ########## @@ -126,13 +126,13 @@ protected void updateObservedStreamTime(final long timestamp) { private void doJoin(final Record<StreamKey, StreamValue> record) { final TableKey mappedKey = keyMapper.apply(record.key(), record.value()); - final TableValue value2 = getValue2(record, mappedKey); + final TableValue value2 = getTableValue(record, mappedKey); Review Comment: Side 
improvement ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java: ########## @@ -199,4 +207,65 @@ public void shouldClearTableEntryOnNullValueUpdates() { new KeyValueTimestamp<>(3, "XX3+Y3", 3)); } + @Test + public void shouldNotDropLeftNullKey() { Review Comment: Adding two missing test cases; they do not expose any bug. For a stream-table join, `mappedKey == key`, so not dropping records with a null stream key already worked as expected. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java: ########## @@ -150,7 +150,7 @@ private boolean maybeDropRecord(final Record<StreamKey, StreamValue> record) { // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored final TableKey mappedKey = keyMapper.apply(record.key(), record.value()); - if (leftJoin && record.key() == null && record.value() != null) { + if (leftJoin && mappedKey == null && record.value() != null) { Review Comment: This is the actual fix. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java: ########## @@ -236,15 +253,17 @@ public void shouldClearGlobalTableEntryOnNullValueUpdates() { // push all four items to the primary stream. this should produce four items.
- pushToStream(4, "XX", true, false); - processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "XX0,FKey0+null", 4), - new KeyValueTimestamp<>(1, "XX1,FKey1+null", 5), - new KeyValueTimestamp<>(2, "XX2,FKey2+Y2", 6), - new KeyValueTimestamp<>(3, "XX3,FKey3+Y3", 7)); + pushToStream(4, "X", true, false); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0,FKey0+null", 4), + new KeyValueTimestamp<>(1, "X1,FKey1+null", 5), + new KeyValueTimestamp<>(2, "X2,FKey2+Y2", 6), + new KeyValueTimestamp<>(3, "X3,FKey3+Y3", 7) + ); } @Test - public void shouldNotJoinOnNullKeyMapperValues() { + public void shouldJoinOnNullKeyMapperValues() { Review Comment: Adjusting the test to match the correct behavior. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java: ########## @@ -248,10 +263,54 @@ public void shouldNotJoinOnNullKeyMapperValues() { pushToStream(4, "XXX", false, false); processor.checkAndClearProcessResult(EMPTY); + + assertThat( + driver.metrics().get( + new MetricName( + "dropped-records-total", + "stream-task-metrics", + "", + mkMap( + mkEntry("thread-id", "main"), + mkEntry("task-id", "0_0") + ) + )) + .metricValue(), + is(4.0) Review Comment: Add new check ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java: ########## @@ -169,8 +174,10 @@ public void shouldNotJoinOnGlobalTableUpdates() { // push all four items to the primary stream. this should produce two items. pushToStream(4, "X", true, false); - processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2), - new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3)); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0,FKey0+Y0", 2), + new KeyValueTimestamp<>(1, "X1,FKey1+Y1", 3) + ); Review Comment: I had a hard time reading the test code, so I reformatted it to make it easier to read.
Similar elsewhere. ########## streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java: ########## @@ -354,19 +357,23 @@ public void shouldNotJoinOnTableUpdates() { // push all four items to the primary stream. this should produce two items. pushToStream(4, "X"); - processor.checkAndClearProcessResult(new KeyValueTimestamp<>(0, "X0+Y0", 0), - new KeyValueTimestamp<>(1, "X1+Y1", 1)); + processor.checkAndClearProcessResult( + new KeyValueTimestamp<>(0, "X0+Y0", 0), + new KeyValueTimestamp<>(1, "X1+Y1", 1) + ); Review Comment: Just some cleanup in this file, to align it with the related tests. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org