https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16627891#comment-16627891
ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

guozhangwang closed pull request #5687: KAFKA-7223: add tests in preparation for suppression
URL: https://github.com/apache/kafka/pull/5687

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 7488ef6ff37..49fe96ba20c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -155,6 +155,6 @@ static StrictBufferConfig unbounded() { * @return a suppression configuration */ static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) { - return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null); + return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null, false); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java new file mode 100644 index 00000000000..8a2e619b7b5 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/FullChangeSerde.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.ByteBufferDeserializer; +import org.apache.kafka.common.serialization.ByteBufferSerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serializer; + +import java.nio.ByteBuffer; +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class FullChangeSerde<T> implements Serde<Change<T>> { + private final Serde<T> inner; + + public FullChangeSerde(final Serde<T> inner) { + this.inner = requireNonNull(inner); + } + + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + inner.configure(configs, isKey); + } + + @Override + public void close() { + inner.close(); + } + + @Override + public Serializer<Change<T>> serializer() { + final Serializer<T> innerSerializer = inner.serializer(); + final ByteBufferSerializer byteBufferSerializer = new ByteBufferSerializer(); + + return new Serializer<Change<T>>() { + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + innerSerializer.configure(configs, isKey); + } + + @Override + public byte[] serialize(final String topic, final Change<T> data) { + if (data == null) { + return null; + } + final byte[] oldBytes = data.oldValue == null ? null : innerSerializer.serialize(topic, data.oldValue); + final int oldSize = oldBytes == null ? -1 : oldBytes.length; + final byte[] newBytes = data.newValue == null ? null : innerSerializer.serialize(topic, data.newValue); + final int newSize = newBytes == null ? -1 : newBytes.length; + + final ByteBuffer buffer = ByteBuffer.allocate( + 4 + (oldSize == -1 ? 0 : oldSize) + 4 + (newSize == -1 ? 0 : newSize) + ); + buffer.putInt(oldSize); + if (oldBytes != null) { + buffer.put(oldBytes); + } + buffer.putInt(newSize); + if (newBytes != null) { + buffer.put(newBytes); + } + return byteBufferSerializer.serialize(null, buffer); + } + + @Override + public void close() { + innerSerializer.close(); + } + }; + } + + @Override + public Deserializer<Change<T>> deserializer() { + final Deserializer<T> innerDeserializer = inner.deserializer(); + final ByteBufferDeserializer byteBufferDeserializer = new ByteBufferDeserializer(); + return new Deserializer<Change<T>>() { + @Override + public void configure(final Map<String, ?> configs, final boolean isKey) { + innerDeserializer.configure(configs, isKey); + } + + @Override + public Change<T> deserialize(final String topic, final byte[] data) { + if (data == null) { + return null; + } + final ByteBuffer buffer = byteBufferDeserializer.deserialize(null, data); + + final int oldSize = buffer.getInt(); + final byte[] oldBytes = oldSize == -1 ? null : new byte[oldSize]; + if (oldBytes != null) { + buffer.get(oldBytes); + } + final T oldValue = oldBytes == null ? null : innerDeserializer.deserialize(topic, oldBytes); + + final int newSize = buffer.getInt(); + final byte[] newBytes = newSize == -1 ? null : new byte[newSize]; + if (newBytes != null) { + buffer.get(newBytes); + } + final T newValue = newBytes == null ? 
null : innerDeserializer.deserialize(topic, newBytes); + return new Change<>(newValue, oldValue); + } + + @Override + public void close() { + innerDeserializer.close(); + } + }; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 2330fad1b16..ea5c3049e7f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -364,8 +364,13 @@ public String queryableStoreName() { public KTable<K, V> suppress(final Suppressed<K> suppressed) { final String name = builder.newProcessorName(SUPPRESS_NAME); + // TODO: follow-up pr to forward the k/v serdes final ProcessorSupplier<K, Change<V>> suppressionSupplier = - () -> new KTableSuppressProcessor<>(buildSuppress(suppressed)); + () -> new KTableSuppressProcessor<>( + buildSuppress(suppressed), + null, + null + ); final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>( suppressionSupplier, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java index 548f5991dbb..db09307d48c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java @@ -34,7 +34,8 @@ public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig buffer return new SuppressedImpl<>( gracePeriod, bufferConfig, - (ProcessorContext context, K key) -> key.window().end() + (ProcessorContext context, K key) -> key.window().end(), + true ); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index f65f2b4af20..6f0021fbc49 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.kstream.internals.Change; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; @@ -23,12 +24,21 @@ import java.time.Duration; +import static java.util.Objects.requireNonNull; + public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { private final SuppressedImpl<K> suppress; private InternalProcessorContext internalProcessorContext; - public KTableSuppressProcessor(final SuppressedImpl<K> suppress) { - this.suppress = suppress; + private final Serde<K> keySerde; + private final Serde<Change<V>> valueSerde; + + public KTableSuppressProcessor(final SuppressedImpl<K> suppress, + final Serde<K> keySerde, + final Serde<Change<V>> valueSerde) { + this.suppress = requireNonNull(suppress); + this.keySerde = keySerde; + this.valueSerde = valueSerde; } @Override @@ -39,12 +49,18 @@ public void init(final ProcessorContext context) { @Override public void process(final K key, final Change<V> value) { if 
(suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) { - internalProcessorContext.forward(key, value); + if (shouldForward(value)) { + internalProcessorContext.forward(key, value); + } // else skip } else { throw new NotImplementedException(); } } + private boolean shouldForward(final Change<V> value) { + return !(value.newValue == null && suppress.suppressTombstones()); + } + private long definedRecordTime(final K key) { return suppress.getTimeDefinition().time(internalProcessorContext, key); } @@ -55,10 +71,14 @@ public void close() { @Override public String toString() { - return "KTableSuppressProcessor{suppress=" + suppress + '}'; + return "KTableSuppressProcessor{" + + "suppress=" + suppress + + ", keySerde=" + keySerde + + ", valueSerde=" + valueSerde + + '}'; } - static class NotImplementedException extends RuntimeException { + public static class NotImplementedException extends RuntimeException { NotImplementedException() { super(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java index cffc42b66d5..a3bf2db63a2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java @@ -29,13 +29,16 @@ private final BufferConfig bufferConfig; private final Duration timeToWaitForMoreEvents; private final TimeDefinition<K> timeDefinition; + private final boolean suppressTombstones; public SuppressedImpl(final Duration suppressionTime, final BufferConfig bufferConfig, - final TimeDefinition<K> timeDefinition) { + final TimeDefinition<K> timeDefinition, + final boolean suppressTombstones) { this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime; this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition; this.bufferConfig = bufferConfig == null ? 
DEFAULT_BUFFER_CONFIG : bufferConfig; + this.suppressTombstones = suppressTombstones; } interface TimeDefinition<K> { @@ -73,4 +76,8 @@ public String toString() { ", timeDefinition=" + timeDefinition + '}'; } + + boolean suppressTombstones() { + return suppressTombstones; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index a0e78580d45..af91abaf2b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -42,9 +42,14 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; import org.junit.ClassRule; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -58,12 +63,18 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static java.lang.Long.MAX_VALUE; +import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; @Category({IntegrationTest.class}) public class SuppressionIntegrationTest { @@ -75,10 +86,12 @@ private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final int COMMIT_INTERVAL = 100; private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2; + private static final long TIMEOUT_MS = 30_000L; + @Ignore @Test - public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException { - final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter"; + public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedException { + final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; @@ -87,7 +100,55 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr cleanStateBeforeTest(input, outputSuppressed, outputRaw); final StreamsBuilder builder = new StreamsBuilder(); - final KTable<String, Long> valueCounts = builder + final KTable<String, Long> valueCounts = buildCountsTable(input, builder); + + valueCounts + .suppress(untilTimeLimit(ofMillis(scaledTime(2L)), unbounded())) + .toStream() + .to(outputSuppressed, 
Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + // this record is just here to advance stream time and flush the other records through the buffer + new KeyValueTimestamp<>("tick", "tick", scaledTime(5L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("tick", 1L, scaledTime(5L)) + ) + ); + verifyOutput( + outputSuppressed, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)) + ) + ); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + + private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) { + return builder .table( input, Consumed.with(STRING_SERDE, STRING_SERDE), @@ -97,6 +158,20 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr ) .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled()); + } + + @Test + public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException { + final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputSuppressed, outputRaw); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, Long> valueCounts = buildCountsTable(input, builder); valueCounts .suppress(untilTimeLimit(Duration.ZERO, unbounded())) @@ -145,10 +220,200 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr } } - private void cleanStateBeforeTest(final String... 
topic) throws InterruptedException { - CLUSTER.deleteAllTopicsAndWait(30_000L); - for (final String s : topic) { - CLUSTER.createTopic(s, 1, 1); + @Ignore + @Test + public void shouldSuppressIntermediateEventsWithRecordLimit() throws InterruptedException { + final String testId = "-shouldSuppressIntermediateEventsWithKeyLimit"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, Long> valueCounts = buildCountsTable(input, builder); + + valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(1L).emitEarlyWhenFull())) + .toStream() + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("x", 1L, scaledTime(3L)) + ) + ); + verifyOutput( + outputSuppressed, + asList( + // consecutive updates to v1 get suppressed into only the latter. + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)) + ) + ); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + + @Ignore + @Test + public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedException { + final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, Long> valueCounts = buildCountsTable(input, builder); + + valueCounts + // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size. 
+ .suppress(untilTimeLimit(Duration.ofMillis(MAX_VALUE), maxBytes(200L).emitEarlyWhenFull())) + .toStream() + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(3L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("x", 1L, scaledTime(3L)) + ) + ); + verifyOutput( + outputSuppressed, + asList( + // consecutive updates to v1 get suppressed into only the latter. + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)) + ) + ); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + + @Ignore + @Test + public void shouldSupportFinalResultsForTimeWindows() throws InterruptedException { + final String testId = "-shouldSupportFinalResultsForTimeWindows"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<Windowed<String>, Long> valueCounts = builder + .stream(input, + Consumed.with(STRING_SERDE, STRING_SERDE) + ) + .groupBy((String k1, String v1) -> k1, Serialized.with(STRING_SERDE, STRING_SERDE)) + .windowedBy(TimeWindows.of(scaledTime(2L)).grace(scaledTime(1L))) + .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withLoggingDisabled()); + + valueCounts + .suppress(untilWindowCloses(unbounded())) + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + try { + produceSynchronously(input, asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)), + new KeyValueTimestamp<>("k1", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)), + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v1", scaledTime(4L)), + // note this event is dropped since it is out of the grace period + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)) + )); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 1L, scaledTime(0L)), + new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 2L, scaledTime(1L)), + new KeyValueTimestamp<>(scaledWindowKey("k1", 2L, 4L), 1L, scaledTime(2L)), + new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 3L, scaledTime(1L)), 
+ new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L)), + new KeyValueTimestamp<>(scaledWindowKey("k1", 4L, 6L), 1L, scaledTime(4L)) + ) + ); + + verifyOutput( + outputSuppressed, + singletonList( + new KeyValueTimestamp<>(scaledWindowKey("k1", 0L, 2L), 4L, scaledTime(0L)) + ) + ); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + + private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) { + return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString(); + } + + private void cleanStateBeforeTest(final String... topics) throws InterruptedException { + CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS); + for (final String topic : topics) { + CLUSTER.createTopic(topic, 1, 1); } } @@ -167,7 +432,7 @@ private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBui private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException { driver.cleanUp(); - CLUSTER.deleteAllTopicsAndWait(30_000L); + CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS); } private long scaledTime(final long unscaledTime) { @@ -195,13 +460,7 @@ private void produceSynchronously(final String topic, final List<KeyValueTimesta ); futures.add(f); } - for (final Future<RecordMetadata> future : futures) { - try { - future.get(); - } catch (final InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } + // TODO: test EOS //noinspection ConstantConditions if (false) { @@ -209,6 +468,14 @@ private void produceSynchronously(final String topic, final List<KeyValueTimesta } else { producer.flush(); } + + for (final Future<RecordMetadata> future : futures) { + try { + future.get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java index 53f24b58aac..7650c59759e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java @@ -61,31 +61,31 @@ public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() { assertThat( "time alone should be set", untilTimeLimit(ofMillis(2), unbounded()), - is(new SuppressedImpl<>(ofMillis(2), unbounded(), null)) + is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false)) ); assertThat( "time and unbounded buffer should be set", untilTimeLimit(ofMillis(2), unbounded()), - is(new SuppressedImpl<>(ofMillis(2), unbounded(), null)) + is(new SuppressedImpl<>(ofMillis(2), unbounded(), null, false)) ); assertThat( "time and keys buffer should be set", untilTimeLimit(ofMillis(2), maxRecords(2)), - is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null)) + is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null, false)) ); assertThat( "time and size buffer should be set", untilTimeLimit(ofMillis(2), maxBytes(2)), - is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null)) + is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null, false)) ); assertThat( "all constraints should be set", untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)), - is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null)) + is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null, false)) ); } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java new file mode 100644 index 00000000000..a6a888840fd --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/FullChangeSerdeTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.easymock.EasyMock; +import org.junit.Test; + +import static java.util.Collections.emptyMap; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +public class FullChangeSerdeTest { + private final FullChangeSerde<String> serde = new FullChangeSerde<>(Serdes.String()); + + @Test + public void shouldRoundTripNull() { + final byte[] serialized = serde.serializer().serialize(null, null); + assertThat( + serde.deserializer().deserialize(null, serialized), + nullValue() + ); + } + + + @Test + public void shouldRoundTripNullChange() { + final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, null)); + assertThat( + serde.deserializer().deserialize(null, serialized), + is(new Change<>(null, null)) + ); + } + + @Test + public void shouldRoundTripOldNull() { + final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", null)); + assertThat( + serde.deserializer().deserialize(null, serialized), + is(new Change<>("new", null)) + ); + } + + @Test + public void shouldRoundTripNewNull() { + final byte[] serialized = serde.serializer().serialize(null, new Change<>(null, "old")); + assertThat( + serde.deserializer().deserialize(null, serialized), + is(new Change<>(null, "old")) + ); + } + + @Test + public void shouldRoundTripChange() { + final byte[] serialized = serde.serializer().serialize(null, new Change<>("new", "old")); + assertThat( + serde.deserializer().deserialize(null, serialized), + is(new Change<>("new", "old")) + ); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldConfigureSerde() { + final Serde<Void> mock = EasyMock.mock(Serde.class); + mock.configure(emptyMap(), false); + EasyMock.expectLastCall(); + EasyMock.replay(mock); + final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock); + serde.configure(emptyMap(), false); + EasyMock.verify(mock); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCloseSerde() { + final Serde<Void> mock = EasyMock.mock(Serde.class); + 
mock.close(); + EasyMock.expectLastCall(); + EasyMock.replay(mock); + final FullChangeSerde<Void> serde = new FullChangeSerde<>(mock); + serde.close(); + EasyMock.verify(mock); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldConfigureSerializer() { + final Serde<Void> mockSerde = EasyMock.mock(Serde.class); + final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class); + EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer); + EasyMock.replay(mockSerde); + mockSerializer.configure(emptyMap(), false); + EasyMock.expectLastCall(); + EasyMock.replay(mockSerializer); + final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer(); + serializer.configure(emptyMap(), false); + EasyMock.verify(mockSerde); + EasyMock.verify(mockSerializer); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCloseSerializer() { + final Serde<Void> mockSerde = EasyMock.mock(Serde.class); + final Serializer<Void> mockSerializer = EasyMock.mock(Serializer.class); + EasyMock.expect(mockSerde.serializer()).andReturn(mockSerializer); + EasyMock.replay(mockSerde); + mockSerializer.close(); + EasyMock.expectLastCall(); + EasyMock.replay(mockSerializer); + final Serializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).serializer(); + serializer.close(); + EasyMock.verify(mockSerde); + EasyMock.verify(mockSerializer); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldConfigureDeserializer() { + final Serde<Void> mockSerde = EasyMock.mock(Serde.class); + final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class); + EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer); + EasyMock.replay(mockSerde); + mockDeserializer.configure(emptyMap(), false); + EasyMock.expectLastCall(); + EasyMock.replay(mockDeserializer); + final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer(); + serializer.configure(emptyMap(), false); + EasyMock.verify(mockSerde); + EasyMock.verify(mockDeserializer); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCloseDeserializer() { + final Serde<Void> mockSerde = EasyMock.mock(Serde.class); + final Deserializer<Void> mockDeserializer = EasyMock.mock(Deserializer.class); + EasyMock.expect(mockSerde.deserializer()).andReturn(mockDeserializer); + EasyMock.replay(mockSerde); + mockDeserializer.close(); + EasyMock.expectLastCall(); + EasyMock.replay(mockDeserializer); + final Deserializer<Change<Void>> serializer = new FullChangeSerde<>(mockSerde).deserializer(); + serializer.close(); + EasyMock.verify(mockSerde); + EasyMock.verify(mockDeserializer); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index fead6788eb4..d98a15e093b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.kstream.internals; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; @@ -31,16 +32,24 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import 
org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.OutputVerifier; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; @@ -49,10 +58,14 @@ import java.util.Properties; import static java.time.Duration.ZERO; +import static java.time.Duration.ofMillis; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; public class SuppressScenarioTest { private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); @@ -146,6 +159,403 @@ public void shouldImmediatelyEmitEventsWithZeroEmitAfter() { } } + @Test(expected = ProcessorStateException.class) + public void shouldSuppressIntermediateEventsWithTimeLimit() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, Long> valueCounts = builder + .table( + "input", + Consumed.with(STRING_SERDE, STRING_SERDE), + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE) + .withCachingDisabled() + .withLoggingDisabled() + ) + .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) + .count(); + valueCounts + .suppress(untilTimeLimit(ofMillis(2L), unbounded())) + .toStream() + .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long())); + valueCounts + .toStream() + .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); + final Topology topology = builder.build(); + final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); + final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); + driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("v1", 1L, 0L), + new KeyValueTimestamp<>("v1", 0L, 1L), + new 
KeyValueTimestamp<>("v2", 1L, 1L), + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + // note that the current stream time is 2, which causes v1 to age out of the buffer, since + // it has been buffered since time 0 (even though the current version of it in the buffer has timestamp 1) + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList(new KeyValueTimestamp<>("v1", 0L, 1L)) + ); + // inserting a dummy "tick" record just to advance stream time + driver.pipeInput(recordFactory.create("input", "tick", "tick", 3L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList(new KeyValueTimestamp<>("tick", 1L, 3L)) + ); + // the stream time is now 3, so it's time to emit this record + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList(new KeyValueTimestamp<>("v2", 1L, 1L)) + ); + + + driver.pipeInput(recordFactory.create("input", "tick", "tick", 4L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("tick", 0L, 4L), + new KeyValueTimestamp<>("tick", 1L, 4L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + driver.pipeInput(recordFactory.create("input", "tick", "tick", 5L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("tick", 0L, 5L), + new KeyValueTimestamp<>("tick", 1L, 5L) + ) + ); + // Note that because the punctuate runs before the process call, the tick at time 5 causes + // the previous tick to age out of the buffer, so at this point, we see the prior value emitted + // and the new value is still buffered. + + // Also worth noting is that "tick" ages out because it has been buffered since time 3, even though + // the current timestamp of the buffered record is 4. 
+ verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + new KeyValueTimestamp<>("tick", 1L, 4L) + ) + ); + } + } + + @Test(expected = ProcessorStateException.class) + public void shouldSuppressIntermediateEventsWithRecordLimit() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, Long> valueCounts = builder + .table( + "input", + Consumed.with(STRING_SERDE, STRING_SERDE), + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE) + .withCachingDisabled() + .withLoggingDisabled() + ) + .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) + .count(Materialized.with(STRING_SERDE, Serdes.Long())); + valueCounts + .suppress(untilTimeLimit(ofMillis(Long.MAX_VALUE), maxRecords(1L).emitEarlyWhenFull())) + .toStream() + .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long())); + valueCounts + .toStream() + .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); + final Topology topology = builder.build(); + System.out.println(topology.describe()); + final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); + final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); + driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("v1", 1L, 0L), + new KeyValueTimestamp<>("v1", 0L, 1L), + new KeyValueTimestamp<>("v2", 1L, 1L), + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + // consecutive updates to v1 get suppressed into only the latter. + new KeyValueTimestamp<>("v1", 0L, 1L), + new KeyValueTimestamp<>("v2", 1L, 1L) + // the last update won't be evicted until another key comes along. 
+ ) + ); + driver.pipeInput(recordFactory.create("input", "x", "x", 3L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + new KeyValueTimestamp<>("x", 1L, 3L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + // now we see that last update to v1, but we won't see the update to x until it gets evicted + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + } + } + + @Test(expected = ProcessorStateException.class) + public void shouldSuppressIntermediateEventsWithBytesLimit() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, Long> valueCounts = builder + .table( + "input", + Consumed.with(STRING_SERDE, STRING_SERDE), + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE) + .withCachingDisabled() + .withLoggingDisabled() + ) + .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) + .count(); + valueCounts + // this is a bit brittle, but I happen to know that the entries are a little over 100 bytes in size. + .suppress(untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), maxBytes(200L).emitEarlyWhenFull())) + .toStream() + .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long())); + valueCounts + .toStream() + .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); + final Topology topology = builder.build(); + System.out.println(topology.describe()); + final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); + final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); + final ConsumerRecord<byte[], byte[]> consumerRecord = recordFactory.create("input", "k2", "v1", 2L); + driver.pipeInput(consumerRecord); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("v1", 1L, 0L), + new KeyValueTimestamp<>("v1", 0L, 1L), + new KeyValueTimestamp<>("v2", 1L, 1L), + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + // consecutive updates to v1 get suppressed into only the latter. + new KeyValueTimestamp<>("v1", 0L, 1L), + new KeyValueTimestamp<>("v2", 1L, 1L) + // the last update won't be evicted until another key comes along. 
+ ) + ); + driver.pipeInput(recordFactory.create("input", "x", "x", 3L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + new KeyValueTimestamp<>("x", 1L, 3L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + // now we see that last update to v1, but we won't see the update to x until it gets evicted + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + } + } + + @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + public void shouldSupportFinalResultsForTimeWindows() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<Windowed<String>, Long> valueCounts = builder + .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) + .groupBy((String k, String v) -> k, Serialized.with(STRING_SERDE, STRING_SERDE)) + .windowedBy(TimeWindows.of(2L).grace(1L)) + .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled()); + valueCounts + .suppress(untilWindowCloses(unbounded())) + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long())); + valueCounts + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); + final Topology topology = builder.build(); + System.out.println(topology.describe()); + final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); + final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 5L)); + // note this last record gets dropped because it is out of the grace period + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("[k1@0/2]", 1L, 0L), + new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L), + new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L), + new KeyValueTimestamp<>("[k1@0/2]", 3L, 1L), + new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L), + new KeyValueTimestamp<>("[k1@4/6]", 1L, 5L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L), + new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L) + ) + ); + } + } + + @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + public void shouldSupportFinalResultsForTimeWindowsWithLargeJump() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<Windowed<String>, Long> valueCounts = builder + .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) + .groupBy((String k, String v) -> k, 
Serialized.with(STRING_SERDE, STRING_SERDE)) + .windowedBy(TimeWindows.of(2L).grace(2L)) + .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withKeySerde(STRING_SERDE)); + valueCounts + .suppress(untilWindowCloses(unbounded())) + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long())); + valueCounts + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + .to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); + final Topology topology = builder.build(); + System.out.println(topology.describe()); + final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); + final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 2L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 3L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 4L)); + // this update should get dropped, since the previous event advanced the stream time and closed the window. + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("[k1@0/2]", 1L, 0L), + new KeyValueTimestamp<>("[k1@0/2]", 2L, 1L), + new KeyValueTimestamp<>("[k1@2/4]", 1L, 2L), + new KeyValueTimestamp<>("[k1@0/2]", 3L, 0L), + new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L), + new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L), + new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L), + new KeyValueTimestamp<>("[k1@30/32]", 1L, 30L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("[k1@0/2]", 4L, 0L), + new KeyValueTimestamp<>("[k1@2/4]", 2L, 3L), + new KeyValueTimestamp<>("[k1@4/6]", 1L, 4L) + ) + ); + } + } + + @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + public void shouldSupportFinalResultsForSessionWindows() { + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<Windowed<String>, Long> valueCounts = builder + .stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)) + .groupBy((String k, String v) -> k, Serialized.with(STRING_SERDE, STRING_SERDE)) + .windowedBy(SessionWindows.with(5L).grace(5L)) + .count(Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("counts").withCachingDisabled()); + valueCounts + .suppress(untilWindowCloses(unbounded())) + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long())); + valueCounts + .toStream() + .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) + 
.to("output-raw", Produced.with(STRING_SERDE, Serdes.Long())); + final Topology topology = builder.build(); + System.out.println(topology.describe()); + final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); + final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { + // first window + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L)); + // new window + driver.pipeInput(recordFactory.create("input", "k1", "v1", 7L)); + // late event for first window - this should get dropped from all streams, since the first window is now closed. + driver.pipeInput(recordFactory.create("input", "k1", "v1", 1L)); + // just pushing stream time forward to flush the other events through. + driver.pipeInput(recordFactory.create("input", "k1", "v1", 30L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("[k1@0/0]", 1L, 0L), + new KeyValueTimestamp<>("[k1@0/0]", null, 1L), + new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L), + new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L), + new KeyValueTimestamp<>("[k1@30/30]", 1L, 30L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("[k1@0/1]", 2L, 1L), + new KeyValueTimestamp<>("[k1@7/7]", 1L, 7L) + ) + ); + } + } + + private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) { if (results.size() != expectedResults.size()) { throw new AssertionError(printRecords(results) + " != " + expectedResults); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index 466033316c7..a38d1d58f43 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -16,10 +16,13 @@ */ package org.apache.kafka.streams.kstream.internals.suppress; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.FullChangeSerde; +import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.test.MockInternalProcessorContext; @@ -33,29 +36,31 @@ import static java.time.Duration.ZERO; import static java.time.Duration.ofMillis; +import static org.apache.kafka.common.serialization.Serdes.Long; +import static org.apache.kafka.common.serialization.Serdes.String; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; 
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; +import static org.apache.kafka.streams.kstream.WindowedSerdes.sessionWindowedSerdeFrom; +import static org.apache.kafka.streams.kstream.WindowedSerdes.timeWindowedSerdeFrom; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.Assert.fail; @SuppressWarnings("PointlessArithmeticExpression") public class KTableSuppressProcessorTest { - /** - * Use this value to indicate that the test correctness does not depend on any particular number - */ private static final long ARBITRARY_LONG = 5L; - /** - * Use this value to indicate that the test correctness does not depend on any particular window - */ + private static final long ARBITRARY_TIMESTAMP = 1993L; + + private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L); + private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L); @Test public void zeroTimeLimitShouldImmediatelyEmit() { final KTableSuppressProcessor<String, Long> processor = - new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded()))); + new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())), String(), new FullChangeSerde<>(Long())); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); @@ -64,7 +69,7 @@ public void zeroTimeLimitShouldImmediatelyEmit() { context.setTimestamp(timestamp); context.setStreamTime(timestamp); final String key = "hey"; - final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); + final Change<Long> value = ARBITRARY_CHANGE; processor.process(key, value); assertThat(context.forwarded(), hasSize(1)); @@ -76,7 +81,11 @@ public void zeroTimeLimitShouldImmediatelyEmit() { @Test public void windowedZeroTimeLimitShouldImmediatelyEmit() { final KTableSuppressProcessor<Windowed<String>, Long> processor = - new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded()))); + new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(ZERO, unbounded())), + timeWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); @@ -85,7 +94,7 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() { context.setTimestamp(timestamp); context.setStreamTime(timestamp); final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW); - final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); + final Change<Long> value = ARBITRARY_CHANGE; processor.process(key, value); assertThat(context.forwarded(), hasSize(1)); @@ -94,21 +103,32 @@ public void windowedZeroTimeLimitShouldImmediatelyEmit() { assertThat(capturedForward.timestamp(), is(timestamp)); } - @Test - public void intermediateSuppressionShouldThrow() { + @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + public void intermediateSuppressionShouldBufferAndEmitLater() { final KTableSuppressProcessor<String, Long> processor = - new KTableSuppressProcessor<>(getImpl(untilTimeLimit(Duration.ofMillis(1), unbounded()))); + new KTableSuppressProcessor<>( + getImpl(untilTimeLimit(ofMillis(1), unbounded())), + String(), + new FullChangeSerde<>(Long()) + ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); - try { - processor.process("hey", new Change<>(null, 1L)); - 
fail("expected an exception for now"); - } catch (final KTableSuppressProcessor.NotImplementedException e) { - // expected - } + final long timestamp = 0L; + context.setRecordMetadata("topic", 0, 0, null, timestamp); + final String key = "hey"; + final Change<Long> value = new Change<>(null, 1L); + processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); + + assertThat(context.scheduledPunctuators(), hasSize(1)); + context.scheduledPunctuators().get(0).getPunctuator().punctuate(1); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); } @@ -118,49 +138,97 @@ public void intermediateSuppressionShouldThrow() { } - @Test - public void finalResultsSuppressionShouldThrow() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = - new KTableSuppressProcessor<>(finalResults(ofMillis(1))); + @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( + finalResults(ofMillis(1L)), + timeWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); - context.setTimestamp(ARBITRARY_LONG); - try { - processor.process(new Windowed<>("hey", ARBITRARY_WINDOW), new Change<>(ARBITRARY_LONG, ARBITRARY_LONG)); - fail("expected an exception for now"); - } catch (final KTableSuppressProcessor.NotImplementedException e) { - // expected - } + final long timestamp = ARBITRARY_TIMESTAMP; + context.setRecordMetadata("topic", 0, 0, null, timestamp); + final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW); + final Change<Long> value = ARBITRARY_CHANGE; + processor.process(key, value); assertThat(context.forwarded(), hasSize(0)); + + assertThat(context.scheduledPunctuators(), hasSize(1)); + context.scheduledPunctuators().get(0).getPunctuator().punctuate(timestamp + 1L); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); } - @Test - public void finalResultsWith0GraceBeforeWindowEndShouldThrow() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = - new KTableSuppressProcessor<>(finalResults(ofMillis(0))); + /** + * Testing a special case of final results: that even with a grace period of 0, + * it will still buffer events and emit only after the end of the window. + * As opposed to emitting immediately the way regular suppresion would with a time limit of 0. 
+ */ + @Test(expected = KTableSuppressProcessor.NotImplementedException.class) + public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( + finalResults(ofMillis(0)), + timeWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); final long timestamp = 5L; + context.setRecordMetadata("", 0, 0L, null, timestamp); + final long windowEnd = 100L; + final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, windowEnd)); + final Change<Long> value = ARBITRARY_CHANGE; + processor.process(key, value); + assertThat(context.forwarded(), hasSize(0)); + + assertThat(context.scheduledPunctuators(), hasSize(1)); + context.scheduledPunctuators().get(0).getPunctuator().punctuate(windowEnd); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + @Test + public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( + finalResults(ofMillis(0)), + timeWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; context.setTimestamp(timestamp); + context.setStreamTime(timestamp); final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L)); - final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); - try { - processor.process(key, value); - fail("expected an exception"); - } catch (final KTableSuppressProcessor.NotImplementedException e) { - // expected - } - assertThat(context.forwarded(), hasSize(0)); + final Change<Long> value = ARBITRARY_CHANGE; + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); } @Test - public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = - new KTableSuppressProcessor<>(finalResults(ofMillis(0))); + public void finalResultsShouldSuppressTombstonesForTimeWindows() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( + finalResults(ofMillis(0)), + timeWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); final MockInternalProcessorContext context = new MockInternalProcessorContext(); processor.init(context); @@ -169,7 +237,100 @@ public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() { context.setTimestamp(timestamp); context.setStreamTime(timestamp); final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L)); - final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); + final Change<Long> value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(0)); + } + + @Test + public void finalResultsShouldSuppressTombstonesForSessionWindows() 
{ + final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( + finalResults(ofMillis(0)), + sessionWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setTimestamp(timestamp); + context.setStreamTime(timestamp); + final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L)); + final Change<Long> value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(0)); + } + + @SuppressWarnings("unchecked") + @Test + public void suppressShouldNotSuppressTombstonesForTimeWindows() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<Windowed<String>, Long>( + (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)), + timeWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setTimestamp(timestamp); + context.setStreamTime(timestamp); + final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0L, 100L)); + final Change<Long> value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + @SuppressWarnings("unchecked") + @Test + public void suppressShouldNotSuppressTombstonesForSessionWindows() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<Windowed<String>, Long>( + (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)), + sessionWindowedSerdeFrom(String.class), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setTimestamp(timestamp); + context.setStreamTime(timestamp); + final Windowed<String> key = new Windowed<>("hey", new SessionWindow(0L, 0L)); + final Change<Long> value = new Change<>(null, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + @SuppressWarnings("unchecked") + @Test + public void suppressShouldNotSuppressTombstonesForKTable() { + final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<String, Long>( + (SuppressedImpl) untilTimeLimit(ofMillis(0), maxRecords(0)), + Serdes.String(), + new FullChangeSerde<>(Long()) + ); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setTimestamp(timestamp); + context.setStreamTime(timestamp); + final String key = "hey"; + final Change<Long> value = new Change<>(null, ARBITRARY_LONG); processor.process(key, value); assertThat(context.forwarded(), hasSize(1)); ---------------------------------------------------------------- This is an automated message 
from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> KIP-328: Add in-memory Suppression
> ----------------------------------
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
>
> As described in
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
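For readers following KIP-328, here is a minimal sketch of how the suppression operator that these tests prepare for is intended to be used from the Streams DSL. It is illustrative only and not part of the PR above: it assumes the API as it eventually shipped in Kafka 2.1 (TimeWindows.of(...).grace(...), KTable#suppress, Suppressed.untilWindowCloses), the class name SuppressionUsageSketch and the window size are hypothetical, and the topic names are borrowed from the integration test for familiarity (the sketch uses time windows, whereas that test exercises session windows).

import java.time.Duration;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressionUsageSketch {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               // 5ms windows with no grace period: a window is closed as soon as stream time passes its end
               .windowedBy(TimeWindows.of(Duration.ofMillis(5L)).grace(Duration.ZERO))
               .count()
               // final results: emit each window's count exactly once, after the window has closed
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream()
               // Windowed#toString() renders keys in the "[key@start/end]" form seen in the expected output above
               .map((windowedKey, count) -> KeyValue.pair(windowedKey.toString(), count))
               .to("output-suppressed", Produced.with(Serdes.String(), Serdes.Long()));

        final Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}

The unit tests above pin down the contrast this sketch relies on: under final-results suppression, updates are held back until the window end plus the grace period has passed and tombstones are dropped entirely, whereas untilTimeLimit(ZERO, ...) forwards every update, tombstones included, immediately.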