mjsax commented on code in PR #13564: URL: https://github.com/apache/kafka/pull/13564#discussion_r1166102301
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -39,6 +39,9 @@
  */
 public class KeyValueStoreWrapper<K, V> implements StateStore {

+    public static final long PUT_RETURN_CODE_IS_LATEST

Review Comment:
   So we need this variable?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java:
##########
@@ -112,10 +114,13 @@ public void process(final Record<K, V> record) {
                 newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
             }

-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Same as KStreamAggregate.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java:
##########
@@ -42,12 +48,13 @@ public boolean equals(final Object o) {
             return false;
         }
         final Change<?> change = (Change<?>) o;
-        return Objects.equals(newValue, change.newValue) &&
-                Objects.equals(oldValue, change.oldValue);
+        return Objects.equals(newValue, change.newValue)
+            && Objects.equals(oldValue, change.oldValue)
+            && isLatest == change.isLatest;

Review Comment:
   Tricky one -- if we don't use it, maybe we should just delete it (including `hashCode`) and sidestep the question?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java:
##########
@@ -47,7 +47,8 @@ public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> record) {
                 .withValue(
                     new Change<>(
                         getValueOrNull(record.value().newValue),
-                        getValueOrNull(record.value().oldValue)))
+                        getValueOrNull(record.value().oldValue),
+                        record.value().isLatest))

Review Comment:
   Seems we also need to change the caching layer to store the boolean `isLatest` flag? Otherwise, it just sets it to `false` using the existing `ChangeValue` constructor.

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java:
##########
@@ -154,27 +154,25 @@ public void process(final Record<K, Change<V>> record) {
             final KeyValue<? extends K1, ? extends V1> oldPair = record.value().oldValue == null ? null :
                 mapper.apply(record.key(), record.value().oldValue);

+            final boolean isLatest = record.value().isLatest;

Review Comment:
   Should we move this before the `if` and use it instead of `!record.value().isLatest`?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java:
##########
@@ -117,13 +120,16 @@ public void init(final ProcessorContext<K, Change<V>> context) {
         @Override
         public void process(final Record<K, Change<V>> record) {
             if (queryableName != null) {
-                store.put(record.key(), record.value().newValue, record.timestamp());
-                tupleForwarder.maybeForward(record);
+                final long putReturnCode = store.put(record.key(), record.value().newValue, record.timestamp());
+                // if not put to store, do not forward downstream either

Review Comment:
   Could this ever happen?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -118,10 +120,13 @@ public void process(final Record<KIn, VIn> record) {

             newAgg = aggregator.apply(record.key(), record.value(), oldAgg);

-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
+                tupleForwarder.maybeForward(
+                    record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null, putReturnCode == PUT_RETURN_CODE_IS_LATEST))

Review Comment:
   `putReturnCode == PUT_RETURN_CODE_IS_LATEST` should always be `true`?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -139,7 +139,7 @@ public void process(final Record<K, Change<V1>> record) {
                 oldValue = joiner.apply(record.value().oldValue, valueRight);
             }

-            context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
+            context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)).withTimestamp(resultTimestamp));

Review Comment:
   Given that we drop out-of-order records for versioned stores, can we just pass in `true` as "is latest"? Guess I am also ok to keep the code as-is -- might be more future-proof in case we support "repairing history" in the future?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -118,10 +120,13 @@ public void process(final Record<KIn, VIn> record) {

             newAgg = aggregator.apply(record.key(), record.value(), oldAgg);

-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);

Review Comment:
   Given that we compute `newTimestamp` as max, it should only move forward, even for out-of-order records, so I think this processor does not need an update? It actually also implies that we don't really "fix the history" for this case either (which I think is fine), but we can simplify this code (we might also want to call this out on the KIP?)

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##########
@@ -116,10 +118,13 @@ public void process(final Record<KIn, Change<VIn>> record) {
             }

             // update the store with the new value
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Cf my comments above -- not 100% sure if keeping the code is better or worse; tend to prefer to remove it and just add a comment about it?

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##########
@@ -118,11 +120,14 @@ public void process(final Record<K, Change<V>> record) {
             if (queryableName == null) {

Review Comment:
   Side cleanup: all other processors use `queryableName != null` -- not sure why it's reversed here. Kinda annoying.
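
For reference, the KStreamAggregate argument above (that `newTimestamp`, computed as a max, only moves forward) can be checked with a small standalone sketch. Everything here is hypothetical and for illustration only: `MaxTimestampSketch`, its toy `put`, and the `IS_LATEST` / `NOT_LATEST` codes are stand-ins, not the Kafka Streams store API or the constants from this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class MaxTimestampSketch {
    // Hypothetical return codes; stand-ins for the constants discussed in the review.
    static final long IS_LATEST = -1L;
    static final long NOT_LATEST = 0L;

    // Toy store (an assumption, not a real versioned store): it only tracks
    // the latest timestamp seen per key.
    private final Map<String, Long> latestTimestamp = new HashMap<>();

    // Reports NOT_LATEST when the timestamp is older than the one already stored.
    long put(final String key, final long timestamp) {
        final Long previous = latestTimestamp.get(key);
        if (previous != null && timestamp < previous) {
            return NOT_LATEST;
        }
        latestTimestamp.put(key, timestamp);
        return IS_LATEST;
    }

    // Mirrors the aggregate rule: the new timestamp is the max of the record
    // timestamp and the timestamp of the existing aggregate, if there is one.
    static long newTimestamp(final long recordTimestamp, final Long oldTimestamp) {
        return oldTimestamp == null ? recordTimestamp : Math.max(recordTimestamp, oldTimestamp);
    }

    // Feeds record timestamps (possibly out of order) through the max rule and
    // checks that every put into the toy store is reported as the latest one.
    static boolean allPutsAreLatest(final long[] recordTimestamps) {
        final MaxTimestampSketch store = new MaxTimestampSketch();
        Long oldTimestamp = null;
        for (final long recordTimestamp : recordTimestamps) {
            final long next = newTimestamp(recordTimestamp, oldTimestamp);
            if (store.put("key", next) != IS_LATEST) {
                return false;
            }
            oldTimestamp = next;
        }
        return true;
    }

    public static void main(final String[] args) {
        // Records at 5 and 15 arrive out of order, but the max rule hides that from the store.
        System.out.println(allPutsAreLatest(new long[] {10L, 5L, 20L, 15L}));
    }
}
```

Under these assumptions the timestamp handed to `put` never decreases for a key, so the "not latest" branch is never reached in the aggregate path. The same reasoning would not carry over to a processor that passes `record.timestamp()` straight through.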