mjsax commented on code in PR #13564:
URL: https://github.com/apache/kafka/pull/13564#discussion_r1166102301


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/KeyValueStoreWrapper.java:
##########
@@ -39,6 +39,9 @@
  */
 public class KeyValueStoreWrapper<K, V> implements StateStore {
 
+    public static final long PUT_RETURN_CODE_IS_LATEST

Review Comment:
   So we need this variable?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamReduce.java:
##########
@@ -112,10 +114,13 @@ public void process(final Record<K, V> record) {
                 newTimestamp = Math.max(record.timestamp(), oldAggAndTimestamp.timestamp());
             }
 
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Same as KStreamAggregate.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/Change.java:
##########
@@ -42,12 +48,13 @@ public boolean equals(final Object o) {
             return false;
         }
         final Change<?> change = (Change<?>) o;
-        return Objects.equals(newValue, change.newValue) &&
-                Objects.equals(oldValue, change.oldValue);
+        return Objects.equals(newValue, change.newValue)
+            && Objects.equals(oldValue, change.oldValue)
+            && isLatest == change.isLatest;

Review Comment:
   Tricky one -- if we don't use it, maybe we should just delete it (including 
`hashCode`) and sidestep the question?
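
   If the flag does stay in `equals`, then `hashCode` must be updated in lockstep so that equal objects hash alike. A minimal self-contained stand-in (field names follow the diff above; everything else is illustrative, not the real class):

   ```java
   import java.util.Objects;

   // Minimal stand-in for Streams' Change, only to illustrate keeping
   // equals() and hashCode() in sync once isLatest joins equals().
   class Change<T> {
       final T newValue;
       final T oldValue;
       final boolean isLatest;

       Change(final T newValue, final T oldValue, final boolean isLatest) {
           this.newValue = newValue;
           this.oldValue = oldValue;
           this.isLatest = isLatest;
       }

       @Override
       public boolean equals(final Object o) {
           if (this == o) {
               return true;
           }
           if (o == null || getClass() != o.getClass()) {
               return false;
           }
           final Change<?> change = (Change<?>) o;
           return Objects.equals(newValue, change.newValue)
               && Objects.equals(oldValue, change.oldValue)
               && isLatest == change.isLatest;
       }

       @Override
       public int hashCode() {
           // must include isLatest, or two equal objects could hash differently
           return Objects.hash(newValue, oldValue, isLatest);
       }
   }
   ```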



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimestampedCacheFlushListener.java:
##########
@@ -47,7 +47,8 @@ public void apply(final Record<KOut, Change<ValueAndTimestamp<VOut>>> record) {
                     .withValue(
                         new Change<>(
                             getValueOrNull(record.value().newValue),
-                            getValueOrNull(record.value().oldValue)))
+                            getValueOrNull(record.value().oldValue),
+                            record.value().isLatest))

Review Comment:
   Seems we also need to change the caching layer to store the boolean 
`isLatest` flag? Otherwise, it just sets it to `false` via the existing 
`ChangeValue` constructor.
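
   To illustrate the concern with a hypothetical sketch (constructor shape assumed, not taken from the PR): if the caching layer rebuilds change objects through a two-argument constructor that defaults the flag, `isLatest` is silently dropped on flush.

   ```java
   // Hypothetical sketch: a legacy two-arg constructor that hard-codes the flag.
   // Any layer still calling it -- e.g. a cache rebuilding changes on flush --
   // silently emits isLatest == false, which is the concern raised above.
   class Change<T> {
       final T newValue;
       final T oldValue;
       final boolean isLatest;

       Change(final T newValue, final T oldValue) {
           this(newValue, oldValue, false); // flag defaulted, information lost
       }

       Change(final T newValue, final T oldValue, final boolean isLatest) {
           this.newValue = newValue;
           this.oldValue = oldValue;
           this.isLatest = isLatest;
       }
   }
   ```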



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java:
##########
@@ -154,27 +154,25 @@ public void process(final Record<K, Change<V>> record) {
             final KeyValue<? extends K1, ? extends V1> oldPair = record.value().oldValue == null ? null :
                 mapper.apply(record.key(), record.value().oldValue);
 
+            final boolean isLatest = record.value().isLatest;

Review Comment:
   Should we move this before the `if` and use it instead of 
`!record.value().isLatest`?
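
   A sketch of the suggested reordering (the surrounding guard is assumed from context, not shown in the hunk above):

   ```
   // hypothetical sketch: hoist the flag above the if,
   // then reuse the local instead of !record.value().isLatest
   final boolean isLatest = record.value().isLatest;
   if (/* existing guard */ !isLatest) {
       // ... existing handling path ...
   }
   ```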



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java:
##########
@@ -117,13 +120,16 @@ public void init(final ProcessorContext<K, Change<V>> context) {
         @Override
         public void process(final Record<K, Change<V>> record) {
             if (queryableName != null) {
-                store.put(record.key(), record.value().newValue, record.timestamp());
-                tupleForwarder.maybeForward(record);
+                final long putReturnCode = store.put(record.key(), record.value().newValue, record.timestamp());
+                // if not put to store, do not forward downstream either

Review Comment:
   Could this ever happen?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -118,10 +120,13 @@ public void process(final Record<KIn, VIn> record) {
 
             newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
 
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {
+                tupleForwarder.maybeForward(
+                    record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null, putReturnCode == PUT_RETURN_CODE_IS_LATEST))

Review Comment:
   `putReturnCode == PUT_RETURN_CODE_IS_LATEST` should always be `true` ?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoin.java:
##########
@@ -139,7 +139,7 @@ public void process(final Record<K, Change<V1>> record) {
                 oldValue = joiner.apply(record.value().oldValue, valueRight);
             }
 
-            context().forward(record.withValue(new Change<>(newValue, oldValue)).withTimestamp(resultTimestamp));
+            context().forward(record.withValue(new Change<>(newValue, oldValue, record.value().isLatest)).withTimestamp(resultTimestamp));

Review Comment:
   Given that we drop out-of-order for versioned stores, can we just pass in 
`true` as "is latest"?
   
   Guess I am also ok to keep the code as-is -- might be more future-proof in 
case we support "repairing history" in the future?
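
   The alternative sketched here, for reference (a non-authoritative fragment, adapted from the hunk above):

   ```
   // out-of-order records are already dropped for versioned stores before
   // reaching this point, so the result change could hard-code the flag
   context().forward(record.withValue(new Change<>(newValue, oldValue, true)).withTimestamp(resultTimestamp));
   ```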



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java:
##########
@@ -118,10 +120,13 @@ public void process(final Record<KIn, VIn> record) {
 
             newAgg = aggregator.apply(record.key(), record.value(), oldAgg);
 
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);

Review Comment:
   Given that we compute `newTimestamp` as a max, it should only move forward, 
even for out-of-order records, so I think this processor does not need an 
update?
   
   It actually also implies that we don't really "fix the history" for this 
case either (which I think is fine), but we can simplify this code (we might 
also want to call this out on the KIP?)
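
   The simplification suggested here could look roughly like this (a sketch only; whether the three-argument `Change` is still needed at this point depends on how the KIP settles):

   ```
   // newTimestamp is a running max, so the put always lands as latest
   // and the return code can be ignored
   store.put(record.key(), newAgg, newTimestamp);
   tupleForwarder.maybeForward(
       record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null, true))
           .withTimestamp(newTimestamp));
   ```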



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java:
##########
@@ -116,10 +118,13 @@ public void process(final Record<KIn, Change<VIn>> record) {
             }
 
             // update the store with the new value
-            store.put(record.key(), newAgg, newTimestamp);
-            tupleForwarder.maybeForward(
-                record.withValue(new Change<>(newAgg, sendOldValues ? oldAgg : null))
-                    .withTimestamp(newTimestamp));
+            final long putReturnCode = store.put(record.key(), newAgg, newTimestamp);
+            // if not put to store, do not forward downstream either
+            if (putReturnCode != PUT_RETURN_CODE_NOT_PUT) {

Review Comment:
   Cf. my comments above -- not 100% sure if keeping the code is better or 
worse; I tend to prefer removing it and just adding a comment about it?



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableTransformValues.java:
##########
@@ -118,11 +120,14 @@ public void process(final Record<K, Change<V>> record) {
 
             if (queryableName == null) {

Review Comment:
   Side cleanup: all other processors use `queryableName != null` -- not sure 
why it's reversed here. Kinda annoying.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
