[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627891#comment-16627891 ]

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

guozhangwang closed pull request #5687: KAFKA-7223: add tests in preparation for suppression
URL: https://github.com/apache/kafka/pull/5687
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the
sake of provenance:

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
index 7488ef6ff37..49fe96ba20c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java
@@ -155,6 +155,6 @@ static StrictBufferConfig unbounded() {
      * @return a suppression configuration
      */
     static <K> Suppressed<K> untilTimeLimit(final Duration 
timeToWaitForMoreEvents, final BufferConfig bufferConfig) {
-        return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, 
null);
+        return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, 
null, false);
     }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
new file mode 100644
index 00000000000..8a2e619b7b5
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.ByteBufferDeserializer;
+import org.apache.kafka.common.serialization.ByteBufferSerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class FullChangeSerde<T> implements Serde<Change<T>> {
+    private final Serde<T> inner;
+
+    public FullChangeSerde(final Serde<T> inner) {
+        this.inner = requireNonNull(inner);
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        inner.configure(configs, isKey);
+    }
+
+    @Override
+    public void close() {
+        inner.close();
+    }
+
+    @Override
+    public Serializer<Change<T>> serializer() {
+        final Serializer<T> innerSerializer = inner.serializer();
+        final ByteBufferSerializer byteBufferSerializer = new 
ByteBufferSerializer();
+
+        return new Serializer<Change<T>>() {
+            @Override
+            public void configure(final Map<String, ?> configs, final boolean 
isKey) {
+                innerSerializer.configure(configs, isKey);
+            }
+
+            @Override
+            public byte[] serialize(final String topic, final Change<T> data) {
+                if (data == null) {
+                    return null;
+                }
+                final byte[] oldBytes = data.oldValue == null ? null : 
innerSerializer.serialize(topic, data.oldValue);
+                final int oldSize = oldBytes == null ? -1 : oldBytes.length;
+                final byte[] newBytes = data.newValue == null ? null : 
innerSerializer.serialize(topic, data.newValue);
+                final int newSize = newBytes == null ? -1 : newBytes.length;
+
+                final ByteBuffer buffer = ByteBuffer.allocate(
+                    4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 
: newSize)
+                );
+                buffer.putInt(oldSize);
+                if (oldBytes != null) {
+                    buffer.put(oldBytes);
+                }
+                buffer.putInt(newSize);
+                if (newBytes != null) {
+                    buffer.put(newBytes);
+                }
+                return byteBufferSerializer.serialize(null, buffer);
+            }
+
+            @Override
+            public void close() {
+                innerSerializer.close();
+            }
+        };
+    }
+
+    @Override
+    public Deserializer<Change<T>> deserializer() {
+        final Deserializer<T> innerDeserializer = inner.deserializer();
+        final ByteBufferDeserializer byteBufferDeserializer = new 
ByteBufferDeserializer();
+        return new Deserializer<Change<T>>() {
+            @Override
+            public void configure(final Map<String, ?> configs, final boolean 
isKey) {
+                innerDeserializer.configure(configs, isKey);
+            }
+
+            @Override
+            public Change<T> deserialize(final String topic, final byte[] 
data) {
+                if (data == null) {
+                    return null;
+                }
+                final ByteBuffer buffer = 
byteBufferDeserializer.deserialize(null, data);
+
+                final int oldSize = buffer.getInt();
+                final byte[] oldBytes = oldSize == -1 ? null : new 
byte[oldSize];
+                if (oldBytes != null) {
+                    buffer.get(oldBytes);
+                }
+                final T oldValue = oldBytes == null ? null : 
innerDeserializer.deserialize(topic, oldBytes);
+
+                final int newSize = buffer.getInt();
+                final byte[] newBytes = newSize == -1 ? null : new 
byte[newSize];
+                if (newBytes != null) {
+                    buffer.get(newBytes);
+                }
+                final T newValue = newBytes == null ? null : 
innerDeserializer.deserialize(topic, newBytes);
+                return new Change<>(newValue, oldValue);
+            }
+
+            @Override
+            public void close() {
+                innerDeserializer.close();
+            }
+        };
+    }
+}
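
For reference (not part of the patch above), a minimal round-trip sketch of the new serde,
assuming Serdes.String() as the inner serde. The framing written by serialize() is a 4-byte
old-value length (-1 for null), the old-value bytes, a 4-byte new-value length (-1 for null),
then the new-value bytes:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.internals.Change;
    import org.apache.kafka.streams.kstream.internals.FullChangeSerde;

    public class FullChangeSerdeSketch {
        public static void main(final String[] args) {
            final FullChangeSerde<String> serde = new FullChangeSerde<>(Serdes.String());
            // Change(newValue, oldValue); "old" and "new" are 3 UTF-8 bytes each,
            // so the payload is 4 + 3 + 4 + 3 = 14 bytes.
            final byte[] bytes = serde.serializer().serialize("topic", new Change<>("new", "old"));
            final Change<String> restored = serde.deserializer().deserialize("topic", bytes);
            System.out.println(bytes.length + ": " + restored.oldValue + " -> " + restored.newValue);
        }
    }
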
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 2330fad1b16..ea5c3049e7f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -364,8 +364,13 @@ public String queryableStoreName() {
     public KTable<K, V> suppress(final Suppressed<K> suppressed) {
         final String name = builder.newProcessorName(SUPPRESS_NAME);
 
+        // TODO: follow-up pr to forward the k/v serdes
         final ProcessorSupplier<K, Change<V>> suppressionSupplier =
-            () -> new KTableSuppressProcessor<>(buildSuppress(suppressed));
+            () -> new KTableSuppressProcessor<>(
+                buildSuppress(suppressed),
+                null,
+                null
+            );
 
         final ProcessorParameters<K, Change<V>> processorParameters = new 
ProcessorParameters<>(
             suppressionSupplier,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
index 548f5991dbb..db09307d48c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java
@@ -34,7 +34,8 @@ public FinalResultsSuppressionBuilder(final 
Suppressed.StrictBufferConfig buffer
         return new SuppressedImpl<>(
             gracePeriod,
             bufferConfig,
-            (ProcessorContext context, K key) -> key.window().end()
+            (ProcessorContext context, K key) -> key.window().end(),
+            true
         );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
index f65f2b4af20..6f0021fbc49 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
@@ -23,12 +24,21 @@
 
 import java.time.Duration;
 
+import static java.util.Objects.requireNonNull;
+
 public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> {
     private final SuppressedImpl<K> suppress;
     private InternalProcessorContext internalProcessorContext;
 
-    public KTableSuppressProcessor(final SuppressedImpl<K> suppress) {
-        this.suppress = suppress;
+    private final Serde<K> keySerde;
+    private final Serde<Change<V>> valueSerde;
+
+    public KTableSuppressProcessor(final SuppressedImpl<K> suppress,
+                                   final Serde<K> keySerde,
+                                   final Serde<Change<V>> valueSerde) {
+        this.suppress = requireNonNull(suppress);
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
     }
 
     @Override
@@ -39,12 +49,18 @@ public void init(final ProcessorContext context) {
     @Override
     public void process(final K key, final Change<V> value) {
         if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && 
definedRecordTime(key) <= internalProcessorContext.streamTime()) {
-            internalProcessorContext.forward(key, value);
+            if (shouldForward(value)) {
+                internalProcessorContext.forward(key, value);
+            } // else skip
         } else {
             throw new NotImplementedException();
         }
     }
 
+    private boolean shouldForward(final Change<V> value) {
+        return !(value.newValue == null && suppress.suppressTombstones());
+    }
+
     private long definedRecordTime(final K key) {
         return suppress.getTimeDefinition().time(internalProcessorContext, 
key);
     }
@@ -55,10 +71,14 @@ public void close() {
 
     @Override
     public String toString() {
-        return "KTableSuppressProcessor{suppress=" + suppress + '}';
+        return "KTableSuppressProcessor{" +
+            "suppress=" + suppress +
+            ", keySerde=" + keySerde +
+            ", valueSerde=" + valueSerde +
+            '}';
     }
 
-    static class NotImplementedException extends RuntimeException {
+    public static class NotImplementedException extends RuntimeException {
         NotImplementedException() {
             super();
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
index cffc42b66d5..a3bf2db63a2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java
@@ -29,13 +29,16 @@
     private final BufferConfig bufferConfig;
     private final Duration timeToWaitForMoreEvents;
     private final TimeDefinition<K> timeDefinition;
+    private final boolean suppressTombstones;
 
     public SuppressedImpl(final Duration suppressionTime,
                           final BufferConfig bufferConfig,
-                          final TimeDefinition<K> timeDefinition) {
+                          final TimeDefinition<K> timeDefinition,
+                          final boolean suppressTombstones) {
         this.timeToWaitForMoreEvents = suppressionTime == null ? 
DEFAULT_SUPPRESSION_TIME : suppressionTime;
         this.timeDefinition = timeDefinition == null ? (context, anyKey) -> 
context.timestamp() : timeDefinition;
         this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : 
bufferConfig;
+        this.suppressTombstones = suppressTombstones;
     }
 
     interface TimeDefinition<K> {
@@ -73,4 +76,8 @@ public String toString() {
             ", timeDefinition=" + timeDefinition +
             '}';
     }
+
+    boolean suppressTombstones() {
+        return suppressTombstones;
+    }
 }
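
For reference (not part of the patch), the two construction paths above now differ only in the
trailing suppressTombstones flag: the public Suppressed.untilTimeLimit factory passes false, so
time-limited suppression keeps forwarding tombstones (Change records whose newValue is null),
while FinalResultsSuppressionBuilder passes true, so final-results suppression drops them. A
minimal sketch of the public-API side, with the equivalence taken from SuppressedTest below:

    import static java.time.Duration.ofMillis;
    import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
    import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;

    import org.apache.kafka.streams.kstream.Suppressed;

    public class SuppressedSketch {
        public static void main(final String[] args) {
            // Per SuppressedTest, this is equivalent to
            // new SuppressedImpl<>(ofMillis(2), unbounded(), null, false),
            // i.e. tombstones are still forwarded.
            final Suppressed<String> timeLimited = untilTimeLimit(ofMillis(2), unbounded());
            System.out.println(timeLimited);
        }
    }
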
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
index a0e78580d45..af91abaf2b1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java
@@ -42,9 +42,14 @@
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -58,12 +63,18 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import static java.lang.Long.MAX_VALUE;
+import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
+import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 @Category({IntegrationTest.class})
 public class SuppressionIntegrationTest {
@@ -75,10 +86,12 @@
     private static final LongDeserializer LONG_DESERIALIZER = new 
LongDeserializer();
     private static final int COMMIT_INTERVAL = 100;
     private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2;
+    private static final long TIMEOUT_MS = 30_000L;
 
+    @Ignore
     @Test
-    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws 
InterruptedException {
-        final String testId = 
"-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
+    public void shouldSuppressIntermediateEventsWithEmitAfter() throws 
InterruptedException {
+        final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter";
         final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
         final String input = "input" + testId;
         final String outputSuppressed = "output-suppressed" + testId;
@@ -87,7 +100,55 @@ public void 
shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr
         cleanStateBeforeTest(input, outputSuppressed, outputRaw);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<String, Long> valueCounts = builder
+        final KTable<String, Long> valueCounts = buildCountsTable(input, 
builder);
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(scaledTime(2L)), unbounded()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    // this record is just here to advance stream time and 
flush the other records through the buffer
+                    new KeyValueTimestamp<>("tick", "tick", scaledTime(5L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("tick", 1L, scaledTime(5L))
+                )
+            );
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L))
+                )
+            );
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
+    private KTable<String, Long> buildCountsTable(final String input, final 
StreamsBuilder builder) {
+        return builder
             .table(
                 input,
                 Consumed.with(STRING_SERDE, STRING_SERDE),
@@ -97,6 +158,20 @@ public void 
shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr
             )
             .groupBy((k, v) -> new KeyValue<>(v, k), 
Serialized.with(STRING_SERDE, STRING_SERDE))
             .count(Materialized.<String, Long, KeyValueStore<Bytes, 
byte[]>>as("counts").withCachingDisabled());
+    }
+
+    @Test
+    public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws 
InterruptedException {
+        final String testId = 
"-shouldNotSuppressIntermediateEventsWithZeroEmitAfter";
+        final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputSuppressed, outputRaw);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, 
builder);
 
         valueCounts
             .suppress(untilTimeLimit(Duration.ZERO, unbounded()))
@@ -145,10 +220,200 @@ public void 
shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr
         }
     }
 
-    private void cleanStateBeforeTest(final String... topic) throws 
InterruptedException {
-        CLUSTER.deleteAllTopicsAndWait(30_000L);
-        for (final String s : topic) {
-            CLUSTER.createTopic(s, 1, 1);
+    @Ignore
+    @Test
+    public void shouldSuppressIntermediateEventsWithRecordLimit() throws 
InterruptedException {
+        final String testId = "-shouldSuppressIntermediateEventsWithKeyLimit";
+        final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, 
builder);
+
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(MAX_VALUE), 
maxRecords(1L).emitEarlyWhenFull()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", 1L, scaledTime(3L))
+                )
+            );
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    // consecutive updates to v1 get suppressed into only the 
latter.
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L))
+                )
+            );
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
+    @Ignore
+    @Test
+    public void shouldSuppressIntermediateEventsWithBytesLimit() throws 
InterruptedException {
+        final String testId = 
"-shouldSuppressIntermediateEventsWithBytesLimit";
+        final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = buildCountsTable(input, 
builder);
+
+        valueCounts
+            // this is a bit brittle, but I happen to know that the entries 
are a little over 100 bytes in size.
+            .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), 
maxBytes(200L).emitEarlyWhenFull()))
+            .toStream()
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        try {
+            produceSynchronously(
+                input,
+                asList(
+                    new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                    new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)),
+                    new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", "x", scaledTime(3L))
+                )
+            );
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)),
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)),
+                    new KeyValueTimestamp<>("x", 1L, scaledTime(3L))
+                )
+            );
+            verifyOutput(
+                outputSuppressed,
+                asList(
+                    // consecutive updates to v1 get suppressed into only the 
latter.
+                    new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)),
+                    new KeyValueTimestamp<>("v1", 1L, scaledTime(2L))
+                )
+            );
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
+    @Ignore
+    @Test
+    public void shouldSupportFinalResultsForTimeWindows() throws 
InterruptedException {
+        final String testId = "-shouldSupportFinalResultsForTimeWindows";
+        final String appId = 
getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId;
+        final String input = "input" + testId;
+        final String outputSuppressed = "output-suppressed" + testId;
+        final String outputRaw = "output-raw" + testId;
+
+        cleanStateBeforeTest(input, outputRaw, outputSuppressed);
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Windowed<String>, Long> valueCounts = builder
+            .stream(input,
+                    Consumed.with(STRING_SERDE, STRING_SERDE)
+            )
+            .groupBy((String k1, String v1) -> k1, 
Serialized.with(STRING_SERDE, STRING_SERDE))
+            .windowedBy(TimeWindows.of(scaledTime(2L)).grace(scaledTime(1L)))
+            .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("counts").withCachingDisabled().withLoggingDisabled());
+
+        valueCounts
+            .suppress(untilWindowCloses(unbounded()))
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        valueCounts
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));
+
+        final KafkaStreams driver = getCleanStartedStreams(appId, builder);
+        try {
+            produceSynchronously(input, asList(
+                new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
+                new KeyValueTimestamp<>("k1", "v1", scaledTime(2L)),
+                new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
+                new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)),
+                new KeyValueTimestamp<>("k1", "v1", scaledTime(4L)),
+                // note this event is dropped since it is out of the grace 
period
+                new KeyValueTimestamp<>("k1", "v1", scaledTime(0L))
+            ));
+            verifyOutput(
+                outputRaw,
+                asList(
+                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 1L, 
scaledTime(0L)),
+                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 2L, 
scaledTime(1L)),
+                    new KeyValueTimestamp<>(scaledWindowKey("k1", 2L, 4L), 1L, 
scaledTime(2L)),
+                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 3L, 
scaledTime(1L)),
+                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, 
scaledTime(0L)),
+                    new KeyValueTimestamp<>(scaledWindowKey("k1", 4L, 6L), 1L, 
scaledTime(4L))
+                )
+            );
+
+            verifyOutput(
+                outputSuppressed,
+                singletonList(
+                    new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, 
scaledTime(0L))
+                )
+            );
+        } finally {
+            driver.close();
+            cleanStateAfterTest(driver);
+        }
+    }
+
+    private String scaledWindowKey(final String key, final long unscaledStart, 
final long unscaledEnd) {
+        return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), 
scaledTime(unscaledEnd))).toString();
+    }
+
+    private void cleanStateBeforeTest(final String... topics) throws 
InterruptedException {
+        CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS);
+        for (final String topic : topics) {
+            CLUSTER.createTopic(topic, 1, 1);
         }
     }
 
@@ -167,7 +432,7 @@ private KafkaStreams getCleanStartedStreams(final String 
appId, final StreamsBui
 
     private void cleanStateAfterTest(final KafkaStreams driver) throws 
InterruptedException {
         driver.cleanUp();
-        CLUSTER.deleteAllTopicsAndWait(30_000L);
+        CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS);
     }
 
     private long scaledTime(final long unscaledTime) {
@@ -195,13 +460,7 @@ private void produceSynchronously(final String topic, 
final List<KeyValueTimesta
                 );
                 futures.add(f);
             }
-            for (final Future<RecordMetadata> future : futures) {
-                try {
-                    future.get();
-                } catch (final InterruptedException | ExecutionException e) {
-                    throw new RuntimeException(e);
-                }
-            }
+
             // TODO: test EOS
             //noinspection ConstantConditions
             if (false) {
@@ -209,6 +468,14 @@ private void produceSynchronously(final String topic, 
final List<KeyValueTimesta
             } else {
                 producer.flush();
             }
+
+            for (final Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (final InterruptedException | ExecutionException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java 
b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
index 53f24b58aac..7650c59759e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java
@@ -61,31 +61,31 @@ public void 
intermediateEventsShouldAcceptAnyBufferAndSetBounds() {
         assertThat(
             "time alone should be set",
             untilTimeLimit(ofMillis(2), unbounded()),
-            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null))
+            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false))
         );
 
         assertThat(
             "time and unbounded buffer should be set",
             untilTimeLimit(ofMillis(2), unbounded()),
-            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null))
+            is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false))
         );
 
         assertThat(
             "time and keys buffer should be set",
             untilTimeLimit(ofMillis(2), maxRecords(2)),
-            is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null))
+            is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null, false))
         );
 
         assertThat(
             "time and size buffer should be set",
             untilTimeLimit(ofMillis(2), maxBytes(2)),
-            is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null))
+            is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null, false))
         );
 
         assertThat(
             "all constraints should be set",
             untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)),
-            is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 
2L), null))
+            is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 
2L), null, false))
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
new file mode 100644
index 00000000000..a6a888840fd
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import static java.util.Collections.emptyMap;
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class FullChangeSerdeTest {
+    private final FullChangeSerde<String> serde = new 
FullChangeSerde<>(Serdes.String());
+
+    @Test
+    public void shouldRoundTripNull() {
+        final byte[] serialized = serde.serializer().serialize(null, null);
+        assertThat(
+            serde.deserializer().deserialize(null, serialized),
+            nullValue()
+        );
+    }
+
+
+    @Test
+    public void shouldRoundTripNullChange() {
+        final byte[] serialized = serde.serializer().serialize(null, new 
Change<>(null, null));
+        assertThat(
+            serde.deserializer().deserialize(null, serialized),
+            is(new Change<>(null, null))
+        );
+    }
+
+    @Test
+    public void shouldRoundTripOldNull() {
+        final byte[] serialized = serde.serializer().serialize(null, new 
Change<>("new", null));
+        assertThat(
+            serde.deserializer().deserialize(null, serialized),
+            is(new Change<>("new", null))
+        );
+    }
+
+    @Test
+    public void shouldRoundTripNewNull() {
+        final byte[] serialized = serde.serializer().serialize(null, new 
Change<>(null, "old"));
+        assertThat(
+            serde.deserializer().deserialize(null, serialized),
+            is(new Change<>(null, "old"))
+        );
+    }
+
+    @Test
+    public void shouldRoundTripChange() {
+        final byte[] serialized = serde.serializer().serialize(null, new 
Change<>("new", "old"));
+        assertThat(
+            serde.deserializer().deserialize(null, serialized),
+            is(new Change<>("new", "old"))
+        );
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldConfigureSerde() {
+        final Serde<Void> mock = EasyMock.mock(Serde.class);
+        mock.configure(emptyMap(), false);
+        EasyMock.expectLastCall();
+        EasyMock.replay(mock);
+        final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+        serde.configure(emptyMap(), false);
+        EasyMock.verify(mock);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCloseSerde() {
+        final Serde<Void> mock = EasyMock.mock(Serde.class);
+        mock.close();
+        EasyMock.expectLastCall();
+        EasyMock.replay(mock);
+        final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock);
+        serde.close();
+        EasyMock.verify(mock);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldConfigureSerializer() {
+        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
+        final Serializer<Void> mockSerializer = 
EasyMock.mock(Serializer.class);
+        EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
+        EasyMock.replay(mockSerde);
+        mockSerializer.configure(emptyMap(), false);
+        EasyMock.expectLastCall();
+        EasyMock.replay(mockSerializer);
+        final Serializer<Change<Void>> serializer = new 
FullChangeSerde<>(mockSerde).serializer();
+        serializer.configure(emptyMap(), false);
+        EasyMock.verify(mockSerde);
+        EasyMock.verify(mockSerializer);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCloseSerializer() {
+        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
+        final Serializer<Void> mockSerializer = 
EasyMock.mock(Serializer.class);
+        EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer);
+        EasyMock.replay(mockSerde);
+        mockSerializer.close();
+        EasyMock.expectLastCall();
+        EasyMock.replay(mockSerializer);
+        final Serializer<Change<Void>> serializer = new 
FullChangeSerde<>(mockSerde).serializer();
+        serializer.close();
+        EasyMock.verify(mockSerde);
+        EasyMock.verify(mockSerializer);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldConfigureDeserializer() {
+        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
+        final Deserializer<Void> mockDeserializer = 
EasyMock.mock(Deserializer.class);
+        EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
+        EasyMock.replay(mockSerde);
+        mockDeserializer.configure(emptyMap(), false);
+        EasyMock.expectLastCall();
+        EasyMock.replay(mockDeserializer);
+        final Deserializer<Change<Void>> serializer = new 
FullChangeSerde<>(mockSerde).deserializer();
+        serializer.configure(emptyMap(), false);
+        EasyMock.verify(mockSerde);
+        EasyMock.verify(mockDeserializer);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCloseDeserializer() {
+        final Serde<Void> mockSerde = EasyMock.mock(Serde.class);
+        final Deserializer<Void> mockDeserializer = 
EasyMock.mock(Deserializer.class);
+        EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer);
+        EasyMock.replay(mockSerde);
+        mockDeserializer.close();
+        EasyMock.expectLastCall();
+        EasyMock.replay(mockDeserializer);
+        final Deserializer<Change<Void>> serializer = new 
FullChangeSerde<>(mockSerde).deserializer();
+        serializer.close();
+        EasyMock.verify(mockSerde);
+        EasyMock.verify(mockDeserializer);
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
index fead6788eb4..d98a15e093b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.LongDeserializer;
@@ -31,16 +32,24 @@
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Serialized;
+import org.apache.kafka.streams.kstream.SessionWindows;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import 
org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.WindowStore;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
 import org.apache.kafka.streams.test.OutputVerifier;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -49,10 +58,14 @@
 import java.util.Properties;
 
 import static java.time.Duration.ZERO;
+import static java.time.Duration.ofMillis;
 import static java.util.Arrays.asList;
 import static java.util.Collections.singletonList;
+import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes;
+import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
 
 public class SuppressScenarioTest {
     private static final StringDeserializer STRING_DESERIALIZER = new 
StringDeserializer();
@@ -146,6 +159,403 @@ public void 
shouldImmediatelyEmitEventsWithZeroEmitAfter() {
         }
     }
 
+    @Test(expected = ProcessorStateException.class)
+    public void shouldSuppressIntermediateEventsWithTimeLimit() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                "input",
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), 
Serialized.with(STRING_SERDE, STRING_SERDE))
+            .count();
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(2L), unbounded()))
+            .toStream()
+            .to("output-suppressed", Produced.with(STRING_SERDE, 
Serdes.Long()));
+        valueCounts
+            .toStream()
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+        final Topology topology = builder.build();
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+        final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            // note that the current stream time is 2, which causes v1 to age 
out of the buffer, since
+            // it has been buffered since time 0 (even though the current 
version of it in the buffer has timestamp 1)
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("v1", 0L, 1L))
+            );
+            // inserting a dummy "tick" record just to advance stream time
+            driver.pipeInput(recordFactory.create("input", "tick", "tick", 
3L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("tick", 1L, 3L))
+            );
+            // the stream time is now 3, so it's time to emit this record
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(new KeyValueTimestamp<>("v2", 1L, 1L))
+            );
+
+
+            driver.pipeInput(recordFactory.create("input", "tick", "tick", 
4L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("tick", 0L, 4L),
+                    new KeyValueTimestamp<>("tick", 1L, 4L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "tick", "tick", 
5L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("tick", 0L, 5L),
+                    new KeyValueTimestamp<>("tick", 1L, 5L)
+                )
+            );
+            // Note that because the punctuate runs before the process call, 
the tick at time 5 causes
+            // the previous tick to age out of the buffer, so at this point, 
we see the prior value emitted
+            // and the new value is still buffered.
+
+            // Also worth noting is that "tick" ages out because it has been 
buffered since time 3, even though
+            // the current timestamp of the buffered record is 4.
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("tick", 1L, 4L)
+                )
+            );
+        }
+    }
+
+    @Test(expected = ProcessorStateException.class)
+    public void shouldSuppressIntermediateEventsWithRecordLimit() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                "input",
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), 
Serialized.with(STRING_SERDE, STRING_SERDE))
+            .count(Materialized.with(STRING_SERDE, Serdes.Long()));
+        valueCounts
+            .suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), 
maxRecords(1L).emitEarlyWhenFull()))
+            .toStream()
+            .to("output-suppressed", Produced.with(STRING_SERDE, 
Serdes.Long()));
+        valueCounts
+            .toStream()
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+        final Topology topology = builder.build();
+        System.out.println(topology.describe());
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+        final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
+            driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    // consecutive updates to v1 get suppressed into only the 
latter.
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L)
+                    // the last update won't be evicted until another key 
comes along.
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 3L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    // now we see that last update to v1, but we won't see the 
update to x until it gets evicted
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+        }
+    }
+
+    @Test(expected = ProcessorStateException.class)
+    public void shouldSuppressIntermediateEventsWithBytesLimit() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<String, Long> valueCounts = builder
+            .table(
+                "input",
+                Consumed.with(STRING_SERDE, STRING_SERDE),
+                Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>with(STRING_SERDE, STRING_SERDE)
+                    .withCachingDisabled()
+                    .withLoggingDisabled()
+            )
+            .groupBy((k, v) -> new KeyValue<>(v, k), 
Serialized.with(STRING_SERDE, STRING_SERDE))
+            .count();
+        valueCounts
+            // this is a bit brittle, but I happen to know that the entries 
are a little over 100 bytes in size.
+            .suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), 
maxBytes(200L).emitEarlyWhenFull()))
+            .toStream()
+            .to("output-suppressed", Produced.with(STRING_SERDE, 
Serdes.Long()));
+        valueCounts
+            .toStream()
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+        final Topology topology = builder.build();
+        System.out.println(topology.describe());
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+        final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L));
+            final ConsumerRecord<byte[], byte[]> consumerRecord = 
recordFactory.create("input", "k2", "v1", 2L);
+            driver.pipeInput(consumerRecord);
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("v1", 1L, 0L),
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L),
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    // consecutive updates to v1 get suppressed into only the 
latter.
+                    new KeyValueTimestamp<>("v1", 0L, 1L),
+                    new KeyValueTimestamp<>("v2", 1L, 1L)
+                    // the last update won't be evicted until another key 
comes along.
+                )
+            );
+            driver.pipeInput(recordFactory.create("input", "x", "x", 3L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    new KeyValueTimestamp<>("x", 1L, 3L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                singletonList(
+                    // now we see that last update to v1, but we won't see the 
update to x until it gets evicted
+                    new KeyValueTimestamp<>("v1", 1L, 2L)
+                )
+            );
+        }
+    }
+
+    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    public void shouldSupportFinalResultsForTimeWindows() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Windowed<String>, Long> valueCounts = builder
+            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
+            .groupBy((String k, String v) -> k, Serialized.with(STRING_SERDE, 
STRING_SERDE))
+            .windowedBy(TimeWindows.of(2L).grace(1L))
+            .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("counts").withCachingDisabled());
+        valueCounts
+            .suppress(untilWindowCloses(unbounded()))
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to("output-suppressed", Produced.with(STRING_SERDE, 
Serdes.Long()));
+        valueCounts
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+        final Topology topology = builder.build();
+        System.out.println(topology.describe());
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+        final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L));
+            // note this last record gets dropped because it is out of the 
grace period
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("[k1@0/2]", 1L, 0L),
+                    new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L),
+                    new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L),
+                    new KeyValueTimestamp<>("[k1@0/2]", 3L, 1L),
+                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+                    new KeyValueTimestamp<>("[k1@4/6]", 1L, 5L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+                    new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L)
+                )
+            );
+        }
+    }
+
+    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Windowed<String>, Long> valueCounts = builder
+            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
+            .groupBy((String k, String v) -> k, Serialized.with(STRING_SERDE, 
STRING_SERDE))
+            .windowedBy(TimeWindows.of(2L).grace(2L))
+            .count(Materialized.<String, Long, WindowStore<Bytes, 
byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE));
+        valueCounts
+            .suppress(untilWindowCloses(unbounded()))
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to("output-suppressed", Produced.with(STRING_SERDE, 
Serdes.Long()));
+        valueCounts
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+        final Topology topology = builder.build();
+        System.out.println(topology.describe());
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+        final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, config)) {
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 3L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 4L));
+            // this update should get dropped, since the previous event 
advanced the stream time and closed the window.
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("[k1@0/2]", 1L, 0L),
+                    new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L),
+                    new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L),
+                    new KeyValueTimestamp<>("[k1@0/2]", 3L, 0L),
+                    new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L),
+                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+                    new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L),
+                    new KeyValueTimestamp<>("[k1@30/32]", 1L, 30L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L),
+                    new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L),
+                    new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L)
+                )
+            );
+        }
+    }
+
+    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    public void shouldSupportFinalResultsForSessionWindows() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final KTable<Windowed<String>, Long> valueCounts = builder
+            .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE))
+            .groupBy((String k, String v) -> k, Serialized.with(STRING_SERDE, 
STRING_SERDE))
+            .windowedBy(SessionWindows.with(5L).grace(5L))
+            .count(Materialized.<String, Long, SessionStore<Bytes, 
byte[]>>as("counts").withCachingDisabled());
+        valueCounts
+            .suppress(untilWindowCloses(unbounded()))
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to("output-suppressed", Produced.with(STRING_SERDE, 
Serdes.Long()));
+        valueCounts
+            .toStream()
+            .map((final Windowed<String> k, final Long v) -> new 
KeyValue<>(k.toString(), v))
+            .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long()));
+        final Topology topology = builder.build();
+        System.out.println(topology.describe());
+        final Properties config = Utils.mkProperties(Utils.mkMap(
+            Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, 
getClass().getSimpleName().toLowerCase(Locale.getDefault())),
+            Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus")
+        ));
+        final ConsumerRecordFactory<String, String> recordFactory = new 
ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, config)) {
+            // first window
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L));
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            // new window
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 7L));
+            // late event for first window - this should get dropped from all 
streams, since the first window is now closed.
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L));
+            // just pushing stream time forward to flush the other events 
through.
+            driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L));
+            verify(
+                drainProducerRecords(driver, "output-raw", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L),
+                    new KeyValueTimestamp<>("[k1@0/0]", null, 1L),
+                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
+                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L),
+                    new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L)
+                )
+            );
+            verify(
+                drainProducerRecords(driver, "output-suppressed", 
STRING_DESERIALIZER, LONG_DESERIALIZER),
+                asList(
+                    new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L),
+                    new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L)
+                )
+            );
+        }
+    }
+
+
     private <K, V> void verify(final List<ProducerRecord<K, V>> results, final 
List<KeyValueTimestamp<K, V>> expectedResults) {
         if (results.size() != expectedResults.size()) {
             throw new AssertionError(printRecords(results) + " != " + 
expectedResults);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
index 466033316c7..a38d1d58f43 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.streams.kstream.internals.suppress;
 
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.Suppressed;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.Change;
+import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
+import org.apache.kafka.streams.kstream.internals.SessionWindow;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.test.MockInternalProcessorContext;
@@ -33,29 +36,31 @@
 
 import static java.time.Duration.ZERO;
 import static java.time.Duration.ofMillis;
+import static org.apache.kafka.common.serialization.Serdes.Long;
+import static org.apache.kafka.common.serialization.Serdes.String;
+import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
 import static 
org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
 import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
 import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+import static 
org.apache.kafka.streams.kstream.WindowedSerdes.sessionWindowedSerdeFrom;
+import static 
org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.fail;
 
 @SuppressWarnings("PointlessArithmeticExpression")
 public class KTableSuppressProcessorTest {
-    /**
-     * Use this value to indicate that the test correctness does not depend on 
any particular number
-     */
     private static final long ARBITRARY_LONG = 5L;
 
-    /**
-     * Use this value to indicate that the test correctness does not depend on 
any particular window
-     */
+    private static final long ARBITRARY_TIMESTAMP = 1993L;
+
+    private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L);
+
     private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 
100L);
 
     @Test
     public void zeroTimeLimitShouldImmediatelyEmit() {
         final KTableSuppressProcessor<String, Long> processor =
-            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, 
unbounded())));
+            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, 
unbounded())), String(), new FullChangeSerde<>(Long()));
 
         final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
         processor.init(context);
@@ -64,7 +69,7 @@ public void zeroTimeLimitShouldImmediatelyEmit() {
         context.setTimestamp(timestamp);
         context.setStreamTime(timestamp);
         final String key = "hey";
-        final Change<Long> value = new Change<>(ARBITRARY_LONG, 
ARBITRARY_LONG);
+        final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -76,7 +81,11 @@ public void zeroTimeLimitShouldImmediatelyEmit() {
     @Test
     public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         final KTableSuppressProcessor<Windowed<String>, Long> processor =
-            new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, 
unbounded())));
+            new KTableSuppressProcessor<>(
+                getImpl(untilTimeLimit(ZERO, unbounded())),
+                timeWindowedSerdeFrom(String.class),
+                new FullChangeSerde<>(Long())
+            );
 
         final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
         processor.init(context);
@@ -85,7 +94,7 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         context.setTimestamp(timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
-        final Change<Long> value = new Change<>(ARBITRARY_LONG, 
ARBITRARY_LONG);
+        final Change<Long> value = ARBITRARY_CHANGE;
         processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));
@@ -94,21 +103,32 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() {
         assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @Test
-    public void intermediateSuppressionShouldThrow() {
+    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    public void intermediateSuppressionShouldBufferAndEmitLater() {
         final KTableSuppressProcessor<String, Long> processor =
-            new 
KTableSuppressProcessor<>(getImpl(untilTimeLimit(Duration.ofMillis(1), 
unbounded())));
+            new KTableSuppressProcessor<>(
+                getImpl(untilTimeLimit(ofMillis(1), unbounded())),
+                String(),
+                new FullChangeSerde<>(Long())
+            );
 
         final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
         processor.init(context);
 
-        try {
-            processor.process("hey", new Change<>(null, 1L));
-            fail("expected an exception for now");
-        } catch (final KTableSuppressProcessor.NotImplementedException e) {
-            // expected
-        }
+        final long timestamp = 0L;
+        context.setRecordMetadata("topic", 0, 0, null, timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, 1L);
+        processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
+
+        assertThat(context.scheduledPunctuators(), hasSize(1));
+        context.scheduledPunctuators().get(0).getPunctuator().punctuate(1);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
 
@@ -118,49 +138,97 @@ public void intermediateSuppressionShouldThrow() {
     }
 
 
-    @Test
-    public void finalResultsSuppressionShouldThrow() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor =
-            new KTableSuppressProcessor<>(finalResults(ofMillis(1)));
+    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
+            finalResults(ofMillis(1L)),
+            timeWindowedSerdeFrom(String.class),
+            new FullChangeSerde<>(Long())
+        );
 
         final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
         processor.init(context);
 
-        context.setTimestamp(ARBITRARY_LONG);
-        try {
-            processor.process(new Windowed<>("hey", ARBITRARY_WINDOW), new 
Change<>(ARBITRARY_LONG, ARBITRARY_LONG));
-            fail("expected an exception for now");
-        } catch (final KTableSuppressProcessor.NotImplementedException e) {
-            // expected
-        }
+        final long timestamp = ARBITRARY_TIMESTAMP;
+        context.setRecordMetadata("topic", 0, 0, null, timestamp);
+        final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW);
+        final Change<Long> value = ARBITRARY_CHANGE;
+        processor.process(key, value);
         assertThat(context.forwarded(), hasSize(0));
+
+        assertThat(context.scheduledPunctuators(), hasSize(1));
+        
context.scheduledPunctuators().get(0).getPunctuator().punctuate(timestamp + 1L);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
-    @Test
-    public void finalResultsWith0GraceBeforeWindowEndShouldThrow() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor =
-            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+    /**
+     * Testing a special case of final results: that even with a grace period 
of 0,
+     * it will still buffer events and emit only after the end of the window.
+     * As opposed to emitting immediately the way regular suppression would 
with a time limit of 0.
+     */
+    @Test(expected = KTableSuppressProcessor.NotImplementedException.class)
+    public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
+            finalResults(ofMillis(0)),
+            timeWindowedSerdeFrom(String.class),
+            new FullChangeSerde<>(Long())
+        );
 
         final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
         processor.init(context);
 
         final long timestamp = 5L;
+        context.setRecordMetadata("", 0, 0L, null, timestamp);
+        final long windowEnd = 100L;
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 
windowEnd));
+        final Change<Long> value = ARBITRARY_CHANGE;
+        processor.process(key, value);
+        assertThat(context.forwarded(), hasSize(0));
+
+        assertThat(context.scheduledPunctuators(), hasSize(1));
+        
context.scheduledPunctuators().get(0).getPunctuator().punctuate(windowEnd);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @Test
+    public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
+            finalResults(ofMillis(0)),
+            timeWindowedSerdeFrom(String.class),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
         context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 
100L));
-        final Change<Long> value = new Change<>(ARBITRARY_LONG, 
ARBITRARY_LONG);
-        try {
-            processor.process(key, value);
-            fail("expected an exception");
-        } catch (final KTableSuppressProcessor.NotImplementedException e) {
-            // expected
-        }
-        assertThat(context.forwarded(), hasSize(0));
+        final Change<Long> value = ARBITRARY_CHANGE;
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
     }
 
     @Test
-    public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() {
-        final KTableSuppressProcessor<Windowed<String>, Long> processor =
-            new KTableSuppressProcessor<>(finalResults(ofMillis(0)));
+    public void finalResultsShouldSuppressTombstonesForTimeWindows() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
+            finalResults(ofMillis(0)),
+            timeWindowedSerdeFrom(String.class),
+            new FullChangeSerde<>(Long())
+        );
 
         final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
         processor.init(context);
@@ -169,7 +237,100 @@ public void 
finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() {
         context.setTimestamp(timestamp);
         context.setStreamTime(timestamp);
         final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 
100L));
-        final Change<Long> value = new Change<>(ARBITRARY_LONG, 
ARBITRARY_LONG);
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+    @Test
+    public void finalResultsShouldSuppressTombstonesForSessionWindows() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<>(
+            finalResults(ofMillis(0)),
+            sessionWindowedSerdeFrom(String.class),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new 
SessionWindow(0L, 0L));
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(0));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void suppressShouldNotSuppressTombstonesForTimeWindows() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<Windowed<String>, Long>(
+            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
+            timeWindowedSerdeFrom(String.class),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 
100L));
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void suppressShouldNotSuppressTombstonesForSessionWindows() {
+        final KTableSuppressProcessor<Windowed<String>, Long> processor = new 
KTableSuppressProcessor<Windowed<String>, Long>(
+            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
+            sessionWindowedSerdeFrom(String.class),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final Windowed<String> key = new Windowed<>("hey", new 
SessionWindow(0L, 0L));
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
+        processor.process(key, value);
+
+        assertThat(context.forwarded(), hasSize(1));
+        final MockProcessorContext.CapturedForward capturedForward = 
context.forwarded().get(0);
+        assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value)));
+        assertThat(capturedForward.timestamp(), is(timestamp));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void suppressShouldNotSuppressTombstonesForKTable() {
+        final KTableSuppressProcessor<String, Long> processor = new 
KTableSuppressProcessor<String, Long>(
+            (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)),
+            Serdes.String(),
+            new FullChangeSerde<>(Long())
+        );
+
+        final MockInternalProcessorContext context = new 
MockInternalProcessorContext();
+        processor.init(context);
+
+        final long timestamp = 100L;
+        context.setTimestamp(timestamp);
+        context.setStreamTime(timestamp);
+        final String key = "hey";
+        final Change<Long> value = new Change<>(null, ARBITRARY_LONG);
         processor.process(key, value);
 
         assertThat(context.forwarded(), hasSize(1));


 

> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)
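
For reference, here is a minimal sketch of the suppression operator this ticket targets, mirroring the topology exercised by the tests in the diff above. The class name, topic names ("input", "final-counts"), store name ("counts"), and the window/grace sizes are illustrative only, not part of the patch; and since this PR only adds tests in preparation, the buffering paths still throw KTableSuppressProcessor.NotImplementedException, so the sketch shows the intended end-state API per KIP-328 rather than current behavior.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.common.utils.Bytes;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.Topology;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Serialized;
    import org.apache.kafka.streams.kstream.TimeWindows;
    import org.apache.kafka.streams.kstream.Windowed;
    import org.apache.kafka.streams.state.WindowStore;

    import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
    import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

    public class SuppressionExample {
        public static Topology buildTopology() {
            final StreamsBuilder builder = new StreamsBuilder();
            builder
                .stream("input", Consumed.with(Serdes.String(), Serdes.String()))
                .groupBy((String k, String v) -> k, Serialized.with(Serdes.String(), Serdes.String()))
                // Count per 2ms window, accepting events that arrive up to 1ms late.
                .windowedBy(TimeWindows.of(2L).grace(1L))
                .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts"))
                // KIP-328: hold intermediate counts in an in-memory buffer and emit
                // only the final result once the window's grace period has passed.
                .suppress(untilWindowCloses(unbounded()))
                .toStream()
                .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v))
                .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));
            return builder.build();
        }
    }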


