JoelWee commented on a change in pull request #9186:
URL: https://github.com/apache/kafka/pull/9186#discussion_r475198868



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinProcessor.java
##########
@@ -58,29 +60,46 @@ public void init(final ProcessorContext context) {
 
     @Override
     public void process(final K1 key, final V1 value) {
-        // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record
-        // If {@code keyMapper} returns {@code null} it implies there is no match,
-        // so ignore unless it is a left join
+        // we do join iff the joining keys are equal, thus, if the mappedKey is null we cannot join
+        // and just ignore the record.
         //
         // we also ignore the record if value is null, because in a key-value data model a null-value indicates
         // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics
         // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record --
         // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored
-        if (key == null || value == null) {
+        final Optional<K2> maybeMappedKey = maybeExtractMappedKey(key, value);
+        if (!maybeMappedKey.isPresent()) {
             LOG.warn(
                 "Skipping record due to null key or value. key=[{}] value=[{}] topic=[{}] partition=[{}] offset=[{}]",
                 key, value, context().topic(), context().partition(), context().offset()
             );
             droppedRecordsSensor.record();
         } else {
-            final K2 mappedKey = keyMapper.apply(key, value);
-            final V2 value2 = mappedKey == null ? null : getValueOrNull(valueGetter.get(mappedKey));
+            final K2 mappedKey = maybeMappedKey.get();
+            final V2 value2 = getValueOrNull(valueGetter.get(mappedKey));
             if (leftJoin || value2 != null) {
                 context().forward(key, joiner.apply(value, value2));
             }
         }
     }
 
+    private Optional<K2> maybeExtractMappedKey(final K1 key, final V1 value) {
+        if (value == null) {
+            return Optional.empty();
+        }
+
+        // we allow the case where the key is null but mappedKey is not null and thus
+        // we need to guard against nullPointerExceptions. This may happen for GlobalKTables.
+        // For KTables, the keyMapper simply returns the key, so this will never happen
+        Optional<K2> maybeMappedKey;
+        try {
+            maybeMappedKey = Optional.ofNullable(keyMapper.apply(key, value));
+        } catch (final NullPointerException e) {
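
The skip/forward rules in the hunk above can be sketched in plain Java outside Kafka Streams. This is a hypothetical simplification, not Kafka API. `BiFunction` stands in for `KeyValueMapper` and `ValueJoiner`, a `Map` stands in for the table's value getter, and a `List` collects what `context().forward(...)` would emit. The class name `JoinDecisionSketch` is made up for illustration. It follows the direction agreed in the review comment, so the keyMapper is called without a try/catch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical, simplified model of KStreamKTableJoinProcessor#process as changed in the hunk.
// BiFunction stands in for KeyValueMapper/ValueJoiner, Map for the table's value getter,
// and the List collects what context().forward(...) would emit. Not Kafka API.
public class JoinDecisionSketch {

    static <K1, V1, K2, V2, R> boolean join(
            final K1 key,
            final V1 value,
            final BiFunction<K1, V1, K2> keyMapper,
            final Map<K2, V2> table,
            final BiFunction<V1, V2, R> joiner,
            final boolean leftJoin,
            final List<R> forwarded) {
        // A null value is an empty message: nothing to join, so the record is skipped.
        if (value == null) {
            return false;
        }
        // The key itself may be null; only the mapped key must be non-null.
        // No try/catch: an NPE thrown by the keyMapper propagates to the caller.
        final K2 mappedKey = keyMapper.apply(key, value);
        if (mappedKey == null) {
            // As in the new code, a null mapped key skips the record even for left joins.
            return false;
        }
        final V2 value2 = table.get(mappedKey);
        // Inner joins need a match; left joins forward with value2 == null.
        if (leftJoin || value2 != null) {
            forwarded.add(joiner.apply(value, value2));
            return true;
        }
        return false;
    }

    public static void main(final String[] args) {
        final Map<String, String> table = Map.of("a", "A");
        final BiFunction<String, String, String> joiner = (v1, v2) -> v1 + "+" + v2;
        // GlobalKTable-style mapper: derives the lookup key from the value, so a null key is fine.
        final BiFunction<String, String, String> fromValue = (k, v) -> v.substring(0, 1);

        final List<String> forwarded = new ArrayList<>();
        join(null, "a1", fromValue, table, joiner, false, forwarded); // null key, match on "a": joined
        join("k", null, fromValue, table, joiner, true, forwarded);   // null value: skipped
        join("k", "b1", fromValue, table, joiner, false, forwarded);  // inner join, no match: skipped
        join("k", "b1", fromValue, table, joiner, true, forwarded);   // left join, no match: forwarded
        System.out.println(forwarded); // [a1+A, b1+null]
    }
}
```

The null-key record still joins here because the hypothetical mapper derives the lookup key from the value. That is the GlobalKTable case the new code comments describe.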

Review comment:
       Yeah, I think what you say makes sense. When I wrote this I was thinking about backwards compatibility: previously, records with null keys were never passed to keyMappers, but now they are. Existing users might be surprised if their existing keyMappers start throwing NPEs. But it would also be surprising if an NPE thrown by a user's keyMapper got swallowed, as you mentioned.
   
   I agree that it's better not to catch the NPE. It also looks like users arguably should always have been handling the case where the key is null anyway, so it should be fine?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
