big-andy-coates commented on a change in pull request #9156:
URL: https://github.com/apache/kafka/pull/9156#discussion_r487049443



##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) 
processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for 
the `source` to be aware of this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a 
consensus we should flip, then happy enough to do it.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment 
above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).  So if we don't call 
`enableSendingOldValues` on it, it won't swallow the output when things haven't 
changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used 
to both signify that the upstream is sending old values, and to control if the 
filter should forward old values. I'll split these into two variables.

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) 
processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for 
the `source` to be aware of this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a 
consensus we should flip, then happy enough to do it.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment 
above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).  So if we don't call 
`enableSendingOldValues` on it, it won't swallow the output when things haven't 
changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used 
to both signify that the upstream is sending old values, and to control if the 
filter should forward old values. I'll split these into two variables.

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) 
processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for 
the `source` to be aware of this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a 
consensus we should flip, then happy enough to do it.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment 
above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).  So if we don't call 
`enableSendingOldValues` on it, it won't swallow the output when things haven't 
changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used 
to both signify that the upstream is sending old values, and to control if the 
filter should forward old values. I'll split these into two variables.

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) 
processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for 
the `source` to be aware of this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a 
consensus we should flip, then happy enough to do it.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment 
above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).  So if we don't call 
`enableSendingOldValues` on it, it won't swallow the output when things haven't 
changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used 
to both signify that the upstream is sending old values, and to control if the 
filter should forward old values. I'll split these into two variables.

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) 
processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for 
the `source` to be aware of this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a 
consensus we should flip, then happy enough to do it.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment 
above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).  So if we don't call 
`enableSendingOldValues` on it, it won't swallow the output when things haven't 
changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used 
to both signify that the upstream is sending old values, and to control if the 
filter should forward old values. I'll split these into two variables.

##########
File path: 
clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##########
@@ -17,11 +17,12 @@
 
 package org.apache.kafka.server.authorizer;
 
-import java.net.InetAddress;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
+import java.net.InetAddress;

Review comment:
       Done. (Must sort out Kafka code style settings at some point)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -832,18 +834,25 @@ public String queryableStoreName() {
     }
 
     @SuppressWarnings("unchecked")
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
         if (!sendOldValues) {
             if (processorSupplier instanceof KTableSource) {
                 final KTableSource<K, ?> source = (KTableSource<K, V>) 
processorSupplier;
+                if (onlyIfMaterialized && !source.materialized()) {
+                    return false;
+                }
                 source.enableSendingOldValues();

Review comment:
       Nope.  The `if` above is handling the `boolean`, there is no need for 
the `source` to be aware of this.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, as outlined above, is to stick with the pattern that's here.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableAggregate.java
##########
@@ -47,8 +47,10 @@
     }
 
     @Override
-    public void enableSendingOldValues() {
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {

Review comment:
       I did initially go with this. However, it seems really unintuitive. 
   
   Given a param such as `forceMaterialization` then it's clear that 
`enableSendingOldValues(true)` will force materialization, but what does 
`enabledSendingOldValues(false)` mean? OK, we're not forcing materialization, 
but actually the method won't enable sending old values if the param is `false` 
and its not already materialized.
   
   Even if we go with `maybeEnableSendingOldValues(enforceMaterialization)` I 
still would argue the semantics are not intuitive from the names alone.  
(Though of course JavaDocs would help).
   
   However, given `enableSendingOldValues(onlyIfMaterialized)` is, IMHO, very 
intuitive. `enableSendingOldValues(true)` will only enable sending old values 
if already materialized, whereas `enableSendingOldValues(false)` will always 
enable sending old values, materializing as necessary, must like the previous 
`enableSendingOldValues()` method did.
   
   Likewise, if we stick with `enableSendingOldValues(onlyIfMaterialized)` then 
I don't think we need to include a `maybe` or `IfPossible` in the name as this 
is implied already by the `onlyIf`.
   
   That said, this is not code I have to look at every day. If there's a 
consensus we should flip, then happy enough to do it.
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](|https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       My view, [as outlined 
above](https://github.com/apache/kafka/pull/9156#discussion_r487057186), is to 
stick with the pattern that's here.
   
   

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableAbstractJoin.java
##########
@@ -39,9 +39,15 @@
     }
 
     @Override
-    public final void enableSendingOldValues() {
-        table1.enableSendingOldValues();
-        table2.enableSendingOldValues();
+    public final boolean enableSendingOldValues(final boolean 
onlyIfMaterialized) {
+        if (!table1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!table2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinMerger.java
##########
@@ -77,10 +77,16 @@ public String getQueryableName() {
     }
 
     @Override
-    public void enableSendingOldValues() {
-        parent1.enableSendingOldValues();
-        parent2.enableSendingOldValues();
+    public boolean enableSendingOldValues(final boolean onlyIfMaterialized) {
+        if (!parent1.enableSendingOldValues(onlyIfMaterialized)) {
+            throw new IllegalStateException("Table-table joins should always 
be materialized");
+        }
+
+        if (!parent2.enableSendingOldValues(onlyIfMaterialized)) {

Review comment:
       as above. :p

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See https://github.com/apache/kafka/pull/9156#discussion_r487057805

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
##########
@@ -295,12 +295,39 @@ public void shouldNotSendOldValuesOnMaterialization() {
         doTestNotSendingOldValue(builder, table1, table2, topic1);
     }
 
+    @Test
+    public void 
shouldNotSendOldValuesWithoutMaterializationIfOptionallyRequested() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String topic1 = "topic1";
+
+        final KTableImpl<String, Integer, Integer> table1 =
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, 
consumed);
+        final KTableImpl<String, Integer, Integer> table2 = 
(KTableImpl<String, Integer, Integer>) table1.filter(predicate);
+
+        table2.enableSendingOldValues(true);

Review comment:
       See [my comment 
above[(https://github.com/apache/kafka/pull/9156#discussion_r487057805)

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
##########
@@ -182,6 +182,8 @@ public String queryableStoreName() {
         final KTableProcessorSupplier<K, V, V> processorSupplier =
             new KTableFilter<>(this, predicate, filterNot, queryableStoreName);
 
+        processorSupplier.enableSendingOldValues(true);

Review comment:
       `KTableFilter` has a boolean flag internally that also needs to be set 
if it is to correctly handle old values, (existing code).  So if we don't call 
`enableSendingOldValues` on it, it won't swallow the output when things haven't 
changed.
   
   to put it another way, the `sendOldValues` field of `KTableFilter` is used 
to both signify that the upstream is sending old values, and to control if the 
filter should forward old values. I'll split these into two variables.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to