[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16637193#comment-16637193 ]
ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

guozhangwang closed pull request #5724: KAFKA-7223: Make suppression buffer durable
URL: https://github.com/apache/kafka/pull/5724

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 3ce962b1926..a12a97a2395 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; +import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode; import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; @@ -43,6 +44,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer; import java.time.Duration; import java.util.Collections; @@ -356,20 +358,24 @@ public String queryableStoreName() { @Override public KTable<K, V> suppress(final Suppressed<K> suppressed) { final String name = builder.newProcessorName(SUPPRESS_NAME); + final String storeName = builder.newStoreName(SUPPRESS_NAME); final ProcessorSupplier<K, Change<V>> suppressionSupplier = () -> new KTableSuppressProcessor<>( buildSuppress(suppressed), + storeName, keySerde, valSerde == null ? null : new FullChangeSerde<>(valSerde) ); - final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>( - suppressionSupplier, - name - ); - final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false); + final ProcessorGraphNode<K, Change<V>> node = new StatefulProcessorNode<>( + name, + new ProcessorParameters<>(suppressionSupplier, name), + null, + new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName), + false + ); builder.addGraphNode(streamsGraphNode, node); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java deleted file mode 100644 index 677a662f79d..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/InMemoryTimeOrderedKeyValueBuffer.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals.suppress; - -import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.internals.ContextualRecord; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; -import java.util.TreeMap; -import java.util.function.Consumer; -import java.util.function.Supplier; - -class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer { - private final Map<Bytes, TimeKey> index = new HashMap<>(); - private final TreeMap<TimeKey, ContextualRecord> sortedMap = new TreeMap<>(); - private long memBufferSize = 0L; - private long minTimestamp = Long.MAX_VALUE; - - @Override - public void evictWhile(final Supplier<Boolean> predicate, - final Consumer<KeyValue<Bytes, ContextualRecord>> callback) { - final Iterator<Map.Entry<TimeKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator(); - - if (predicate.get()) { - Map.Entry<TimeKey, ContextualRecord> next = null; - if (delegate.hasNext()) { - next = delegate.next(); - } - - // predicate being true means we read one record, call the callback, and then remove it - while (next != null && predicate.get()) { - callback.accept(new KeyValue<>(next.getKey().key(), next.getValue())); - - delegate.remove(); - index.remove(next.getKey().key()); - - memBufferSize = memBufferSize - computeRecordSize(next.getKey().key(), next.getValue()); - - // peek at the next record so we can update the minTimestamp - if (delegate.hasNext()) { - next = delegate.next(); - minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time(); - } else { - next = null; - minTimestamp = Long.MAX_VALUE; - } - } - } - } - - @Override - public void put(final long time, - final Bytes key, - final ContextualRecord value) { - // non-resetting semantics: - // if there was a previous version of the same record, - // then insert the new record in the same place in the priority queue - - final TimeKey previousKey = index.get(key); - if (previousKey == null) { - final TimeKey nextKey = new TimeKey(time, key); - index.put(key, nextKey); - sortedMap.put(nextKey, value); - minTimestamp = Math.min(minTimestamp, time); - memBufferSize = memBufferSize + computeRecordSize(key, value); - } else { - final ContextualRecord removedValue = sortedMap.put(previousKey, value); - memBufferSize = - memBufferSize - + computeRecordSize(key, value) - - (removedValue == null ? 
0 : computeRecordSize(key, removedValue)); - } - } - - @Override - public int numRecords() { - return index.size(); - } - - @Override - public long bufferSize() { - return memBufferSize; - } - - @Override - public long minTimestamp() { - return minTimestamp; - } - - private long computeRecordSize(final Bytes key, final ContextualRecord value) { - long size = 0L; - size += 8; // buffer time - size += key.get().length; - if (value != null) { - size += value.sizeBytes(); - } - return size; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java index 57e5066d09e..62c034d4893 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -28,6 +28,9 @@ import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.streams.state.internals.ContextualRecord; +import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer; + +import java.util.Objects; import static java.util.Objects.requireNonNull; @@ -35,25 +38,27 @@ private final long maxRecords; private final long maxBytes; private final long suppressDurationMillis; - private final TimeOrderedKeyValueBuffer buffer; private final TimeDefinition<K> bufferTimeDefinition; private final BufferFullStrategy bufferFullStrategy; private final boolean shouldSuppressTombstones; + private final String storeName; + private TimeOrderedKeyValueBuffer buffer; private InternalProcessorContext internalProcessorContext; private Serde<K> keySerde; - private Serde<Change<V>> valueSerde; + private FullChangeSerde<V> valueSerde; public KTableSuppressProcessor(final SuppressedInternal<K> suppress, + final String storeName, final Serde<K> keySerde, final FullChangeSerde<V> valueSerde) { + this.storeName = storeName; requireNonNull(suppress); this.keySerde = keySerde; this.valueSerde = valueSerde; maxRecords = suppress.getBufferConfig().maxRecords(); maxBytes = suppress.getBufferConfig().maxBytes(); suppressDurationMillis = suppress.getTimeToWaitForMoreEvents().toMillis(); - buffer = new InMemoryTimeOrderedKeyValueBuffer(); bufferTimeDefinition = suppress.getTimeDefinition(); bufferFullStrategy = suppress.getBufferConfig().bufferFullStrategy(); shouldSuppressTombstones = suppress.shouldSuppressTombstones(); @@ -63,8 +68,9 @@ public KTableSuppressProcessor(final SuppressedInternal<K> suppress, @Override public void init(final ProcessorContext context) { internalProcessorContext = (InternalProcessorContext) context; - this.keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde; - this.valueSerde = valueSerde == null ? FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde; + keySerde = keySerde == null ? (Serde<K>) context.keySerde() : keySerde; + valueSerde = valueSerde == null ? 
FullChangeSerde.castOrWrap(context.valueSerde()) : valueSerde; + buffer = Objects.requireNonNull((TimeOrderedKeyValueBuffer) context.getStateStore(storeName)); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java deleted file mode 100644 index d3ad350686a..00000000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeKey.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals.suppress; - -import org.apache.kafka.common.utils.Bytes; - -import java.util.Objects; - -class TimeKey implements Comparable<TimeKey> { - private final long time; - private final Bytes key; - - TimeKey(final long time, final Bytes key) { - this.time = time; - this.key = key; - } - - Bytes key() { - return key; - } - - long time() { - return time; - } - - @Override - public boolean equals(final Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - final TimeKey timeKey = (TimeKey) o; - return time == timeKey.time && - Objects.equals(key, timeKey.key); - } - - @Override - public int hashCode() { - return Objects.hash(time, key); - } - - @Override - public int compareTo(final TimeKey o) { - // ordering of keys within a time uses hashCode. - final int timeComparison = Long.compare(time, o.time); - return timeComparison == 0 ? key.compareTo(o.key) : timeComparison; - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java new file mode 100644 index 00000000000..3ac6fc87073 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryTimeOrderedKeyValueBuffer.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.state.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.BytesSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.ProcessorStateManager; +import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.state.StoreBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.function.Consumer; +import java.util.function.Supplier; + +public class InMemoryTimeOrderedKeyValueBuffer implements TimeOrderedKeyValueBuffer { + private static final Logger LOG = LoggerFactory.getLogger(InMemoryTimeOrderedKeyValueBuffer.class); + private static final BytesSerializer KEY_SERIALIZER = new BytesSerializer(); + private static final ByteArraySerializer VALUE_SERIALIZER = new ByteArraySerializer(); + + private final Map<Bytes, BufferKey> index = new HashMap<>(); + private final TreeMap<BufferKey, ContextualRecord> sortedMap = new TreeMap<>(); + + private final Set<Bytes> dirtyKeys = new HashSet<>(); + private final String storeName; + private final boolean loggingEnabled; + + private long memBufferSize = 0L; + private long minTimestamp = Long.MAX_VALUE; + private RecordCollector collector; + private String changelogTopic; + + private volatile boolean open; + + public static class Builder implements StoreBuilder<StateStore> { + + private final String storeName; + private boolean loggingEnabled = true; + + public Builder(final String storeName) { + this.storeName = storeName; + } + + @Override + public StoreBuilder<StateStore> withCachingEnabled() { + throw new UnsupportedOperationException(); + } + + @Override + public StoreBuilder<StateStore> withCachingDisabled() { + throw new UnsupportedOperationException(); + } + + @Override + public StoreBuilder<StateStore> withLoggingEnabled(final Map<String, String> config) { + throw new UnsupportedOperationException(); + } + + @Override + public StoreBuilder<StateStore> withLoggingDisabled() { + loggingEnabled = false; + return this; + } + + @Override + public StateStore build() { + return new InMemoryTimeOrderedKeyValueBuffer(storeName, loggingEnabled); + } + + @Override + public Map<String, String> logConfig() { + return Collections.emptyMap(); + } + + @Override + public boolean loggingEnabled() { + return loggingEnabled; + } + + @Override + public String name() { + return storeName; + } + } + + private static class BufferKey implements Comparable<BufferKey> { + private final long time; + private final Bytes key; + + private BufferKey(final long time, final Bytes key) { + this.time = time; + this.key = key; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final BufferKey bufferKey = 
(BufferKey) o; + return time == bufferKey.time && + Objects.equals(key, bufferKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(time, key); + } + + @Override + public int compareTo(final BufferKey o) { + // ordering of keys within a time uses hashCode. + final int timeComparison = Long.compare(time, o.time); + return timeComparison == 0 ? key.compareTo(o.key) : timeComparison; + } + } + + private InMemoryTimeOrderedKeyValueBuffer(final String storeName, final boolean loggingEnabled) { + this.storeName = storeName; + this.loggingEnabled = loggingEnabled; + } + + @Override + public String name() { + return storeName; + } + + + @Override + public boolean persistent() { + return false; + } + + @Override + public void init(final ProcessorContext context, final StateStore root) { + context.register(root, (RecordBatchingStateRestoreCallback) this::restoreBatch); + if (loggingEnabled) { + collector = ((RecordCollector.Supplier) context).recordCollector(); + changelogTopic = ProcessorStateManager.storeChangelogTopic(context.applicationId(), storeName); + } + open = true; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public void close() { + index.clear(); + sortedMap.clear(); + dirtyKeys.clear(); + memBufferSize = 0; + minTimestamp = Long.MAX_VALUE; + open = false; + } + + @Override + public void flush() { + if (loggingEnabled) { + // counting on this getting called before the record collector's flush + for (final Bytes key : dirtyKeys) { + + final BufferKey bufferKey = index.get(key); + + if (bufferKey == null) { + // The record was evicted from the buffer. Send a tombstone. + collector.send(changelogTopic, key, null, null, null, null, KEY_SERIALIZER, VALUE_SERIALIZER); + } else { + final ContextualRecord value = sortedMap.get(bufferKey); + + final byte[] innerValue = value.value(); + final byte[] timeAndValue = ByteBuffer.wrap(new byte[8 + innerValue.length]) + .putLong(bufferKey.time) + .put(innerValue) + .array(); + + final ProcessorRecordContext recordContext = value.recordContext(); + collector.send( + changelogTopic, + key, + timeAndValue, + recordContext.headers(), + recordContext.partition(), + recordContext.timestamp(), + KEY_SERIALIZER, + VALUE_SERIALIZER + ); + } + } + dirtyKeys.clear(); + } + } + + private void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> batch) { + for (final ConsumerRecord<byte[], byte[]> record : batch) { + final Bytes key = Bytes.wrap(record.key()); + if (record.value() == null) { + // This was a tombstone. Delete the record. 
+ final BufferKey bufferKey = index.remove(key); + if (bufferKey != null) { + sortedMap.remove(bufferKey); + } + } else { + final ByteBuffer timeAndValue = ByteBuffer.wrap(record.value()); + final long time = timeAndValue.getLong(); + final byte[] value = new byte[record.value().length - 8]; + timeAndValue.get(value); + + cleanPut( + time, + key, + new ContextualRecord( + value, + new ProcessorRecordContext( + record.timestamp(), + record.offset(), + record.partition(), + record.topic(), + record.headers() + ) + ) + ); + } + } + } + + + @Override + public void evictWhile(final Supplier<Boolean> predicate, + final Consumer<KeyValue<Bytes, ContextualRecord>> callback) { + final Iterator<Map.Entry<BufferKey, ContextualRecord>> delegate = sortedMap.entrySet().iterator(); + + if (predicate.get()) { + Map.Entry<BufferKey, ContextualRecord> next = null; + if (delegate.hasNext()) { + next = delegate.next(); + } + + // predicate being true means we read one record, call the callback, and then remove it + while (next != null && predicate.get()) { + callback.accept(new KeyValue<>(next.getKey().key, next.getValue())); + + delegate.remove(); + index.remove(next.getKey().key); + + dirtyKeys.add(next.getKey().key); + + memBufferSize = memBufferSize - computeRecordSize(next.getKey().key, next.getValue()); + + // peek at the next record so we can update the minTimestamp + if (delegate.hasNext()) { + next = delegate.next(); + minTimestamp = next == null ? Long.MAX_VALUE : next.getKey().time; + } else { + next = null; + minTimestamp = Long.MAX_VALUE; + } + } + } + } + + @Override + public void put(final long time, + final Bytes key, + final ContextualRecord value) { + cleanPut(time, key, value); + dirtyKeys.add(key); + } + + private void cleanPut(final long time, final Bytes key, final ContextualRecord value) { + // non-resetting semantics: + // if there was a previous version of the same record, + // then insert the new record in the same place in the priority queue + + final BufferKey previousKey = index.get(key); + if (previousKey == null) { + final BufferKey nextKey = new BufferKey(time, key); + index.put(key, nextKey); + sortedMap.put(nextKey, value); + minTimestamp = Math.min(minTimestamp, time); + memBufferSize = memBufferSize + computeRecordSize(key, value); + } else { + final ContextualRecord removedValue = sortedMap.put(previousKey, value); + memBufferSize = + memBufferSize + + computeRecordSize(key, value) + - (removedValue == null ? 
0 : computeRecordSize(key, removedValue)); + } + } + + @Override + public int numRecords() { + return index.size(); + } + + @Override + public long bufferSize() { + return memBufferSize; + } + + @Override + public long minTimestamp() { + return minTimestamp; + } + + private long computeRecordSize(final Bytes key, final ContextualRecord value) { + long size = 0L; + size += 8; // buffer time + size += key.get().length; + if (value != null) { + size += value.sizeBytes(); + } + return size; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java similarity index 87% rename from streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java rename to streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java index 98a4f63c83f..86a8c1e651d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/TimeOrderedKeyValueBuffer.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBuffer.java @@ -14,16 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.streams.kstream.internals.suppress; +package org.apache.kafka.streams.state.internals; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.state.internals.ContextualRecord; +import org.apache.kafka.streams.processor.StateStore; import java.util.function.Consumer; import java.util.function.Supplier; -interface TimeOrderedKeyValueBuffer { +public interface TimeOrderedKeyValueBuffer extends StateStore { void evictWhile(final Supplier<Boolean> predicate, final Consumer<KeyValue<Bytes, ContextualRecord>> callback); void put(final long time, final Bytes key, final ContextualRecord value); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java new file mode 100644 index 00000000000..93ecc53a7fc --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionDurabilityIntegrationTest.java @@ -0,0 +1,260 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.lang.Long.MAX_VALUE; +import static java.time.Duration.ofMillis; +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; +import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStartedStreams; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +@RunWith(Parameterized.class) +@Category({IntegrationTest.class}) +public class SuppressionDurabilityIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3); + private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); + private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); + private static final Serde<String> STRING_SERDE = Serdes.String(); + private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); + private static final int COMMIT_INTERVAL = 100; + private final boolean eosEnabled; + + public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) 
{ + this.eosEnabled = eosEnabled; + } + + @Parameters(name = "{index}: eosEnabled={0}") + public static Collection<Object[]> parameters() { + return Arrays.asList(new Object[] {false}, new Object[] {true}); + } + + private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) { + return builder + .table( + input, + Consumed.with(STRING_SERDE, STRING_SERDE), + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE) + .withCachingDisabled() + .withLoggingDisabled() + ) + .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)) + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled()); + } + + @Test + public void shouldRecoverBufferAfterShutdown() { + final String testId = "-shouldRecoverBufferAfterShutdown"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); + + final StreamsBuilder builder = new StreamsBuilder(); + final KTable<String, Long> valueCounts = buildCountsTable(input, builder); + + final KStream<String, Long> suppressedCounts = valueCounts + .suppress(untilTimeLimit(ofMillis(MAX_VALUE), maxRecords(3L).emitEarlyWhenFull())) + .toStream(); + + final AtomicInteger eventCount = new AtomicInteger(0); + suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet()); + + suppressedCounts + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final Properties streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosEnabled ? EXACTLY_ONCE : AT_LEAST_ONCE) + )); + + KafkaStreams driver = getStartedStreams(streamsConfig, builder, true); + try { + // start by putting some stuff in the buffer + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v2", scaledTime(2L)), + new KeyValueTimestamp<>("k3", "v3", scaledTime(3L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("v3", 1L, scaledTime(3L)) + ) + ); + assertThat(eventCount.get(), is(0)); + + // flush two of the first three events out. + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k4", "v4", scaledTime(4L)), + new KeyValueTimestamp<>("k5", "v5", scaledTime(5L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)), + new KeyValueTimestamp<>("v5", 1L, scaledTime(5L)) + ) + ); + assertThat(eventCount.get(), is(2)); + verifyOutput( + outputSuppressed, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(2L)) + ) + ); + + // bounce to ensure that the history, including retractions, + // get restored properly. 
(i.e., we shouldn't see those first events again) + + // restart the driver + driver.close(); + assertThat(driver.state(), is(KafkaStreams.State.NOT_RUNNING)); + driver = getStartedStreams(streamsConfig, builder, false); + + + // flush those recovered buffered events out. + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k6", "v6", scaledTime(6L)), + new KeyValueTimestamp<>("k7", "v7", scaledTime(7L)), + new KeyValueTimestamp<>("k8", "v8", scaledTime(8L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v6", 1L, scaledTime(6L)), + new KeyValueTimestamp<>("v7", 1L, scaledTime(7L)), + new KeyValueTimestamp<>("v8", 1L, scaledTime(8L)) + ) + ); + assertThat(eventCount.get(), is(5)); + verifyOutput( + outputSuppressed, + asList( + new KeyValueTimestamp<>("v3", 1L, scaledTime(3L)), + new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)), + new KeyValueTimestamp<>("v5", 1L, scaledTime(5L)) + ) + ); + + } finally { + driver.close(); + cleanStateAfterTest(CLUSTER, driver); + } + } + + private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) { + final Properties properties = mkProperties( + mkMap( + mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName()) + ) + ); + IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps); + + } + + /** + * scaling to ensure that there are commits in between the various test events, + * just to exercise that everything works properly in the presence of commits. 
+ */ + private long scaledTime(final long unscaledTime) { + return COMMIT_INTERVAL * 2 * unscaledTime; + } + + private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) { + final Properties producerConfig = mkProperties(mkMap( + mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()), + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) + )); + IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java index a9920e3a6f1..208f1eb3c50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -17,16 +17,12 @@ package org.apache.kafka.streams.integration; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.LongDeserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; @@ -38,10 +34,10 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; -import org.apache.kafka.streams.kstream.Serialized; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.TimeWindow; @@ -53,14 +49,9 @@ import org.junit.experimental.categories.Category; import java.time.Duration; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Locale; -import java.util.Objects; import java.util.Properties; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import static java.lang.Long.MAX_VALUE; import static java.time.Duration.ofMillis; @@ -69,6 +60,9 @@ import static org.apache.kafka.common.utils.Utils.mkEntry; import static org.apache.kafka.common.utils.Utils.mkMap; import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.StreamsConfig.AT_LEAST_ONCE; +import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateBeforeTest; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; @@ -87,18 +81,17 @@ private static final Serde<String> STRING_SERDE = Serdes.String(); private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); private static final int COMMIT_INTERVAL = 100; - private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2; private static final long TIMEOUT_MS = 30_000L; @Test - public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedException { + public void shouldSuppressIntermediateEventsWithEmitAfter() { final String testId = "-shouldSuppressIntermediateEventsWithEmitAfter"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - cleanStateBeforeTest(input, outputSuppressed, outputRaw); + cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw); final StreamsBuilder builder = new StreamsBuilder(); final KTable<String, Long> valueCounts = buildCountsTable(input, builder); @@ -112,7 +105,8 @@ public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedEx .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final KafkaStreams driver = getCleanStartedStreams(appId, builder); + final Properties streamsConfig = getStreamsConfig(appId); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( @@ -144,7 +138,7 @@ public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedEx ); } finally { driver.close(); - cleanStateAfterTest(driver); + cleanStateAfterTest(CLUSTER, driver); } } @@ -157,19 +151,19 @@ public void shouldSuppressIntermediateEventsWithEmitAfter() throws InterruptedEx .withCachingDisabled() .withLoggingDisabled() ) - .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) + .groupBy((k, v) -> new KeyValue<>(v, k), Grouped.with(STRING_SERDE, STRING_SERDE)) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled()); } @Test - public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException { + public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() { final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - cleanStateBeforeTest(input, outputSuppressed, outputRaw); + cleanStateBeforeTest(CLUSTER, input, outputSuppressed, outputRaw); final StreamsBuilder builder = new StreamsBuilder(); final KTable<String, Long> valueCounts = buildCountsTable(input, builder); @@ -183,7 +177,8 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final KafkaStreams driver = getCleanStartedStreams(appId, builder); + 
final Properties streamsConfig = getStreamsConfig(appId); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( @@ -217,19 +212,19 @@ public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws Interr ); } finally { driver.close(); - cleanStateAfterTest(driver); + cleanStateAfterTest(CLUSTER, driver); } } @Test - public void shouldSuppressIntermediateEventsWithRecordLimit() throws InterruptedException { + public void shouldSuppressIntermediateEventsWithRecordLimit() { final String testId = "-shouldSuppressIntermediateEventsWithRecordLimit"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - cleanStateBeforeTest(input, outputRaw, outputSuppressed); + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); final StreamsBuilder builder = new StreamsBuilder(); final KTable<String, Long> valueCounts = buildCountsTable(input, builder); @@ -243,7 +238,8 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() throws Interrupted .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final KafkaStreams driver = getCleanStartedStreams(appId, builder); + final Properties streamsConfig = getStreamsConfig(appId); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( input, @@ -275,7 +271,7 @@ public void shouldSuppressIntermediateEventsWithRecordLimit() throws Interrupted ); } finally { driver.close(); - cleanStateAfterTest(driver); + cleanStateAfterTest(CLUSTER, driver); } } @@ -287,7 +283,7 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - cleanStateBeforeTest(input, outputRaw, outputSuppressed); + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); final StreamsBuilder builder = new StreamsBuilder(); final KTable<String, Long> valueCounts = buildCountsTable(input, builder); @@ -301,7 +297,8 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final KafkaStreams driver = getCleanStartedStreams(appId, builder); + final Properties streamsConfig = getStreamsConfig(appId); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( input, @@ -315,19 +312,19 @@ public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedExc verifyErrorShutdown(driver); } finally { driver.close(); - cleanStateAfterTest(driver); + cleanStateAfterTest(CLUSTER, driver); } } @Test - public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedException { + public void shouldSuppressIntermediateEventsWithBytesLimit() { final String testId = "-shouldSuppressIntermediateEventsWithBytesLimit"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - cleanStateBeforeTest(input, outputRaw, outputSuppressed); + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); final StreamsBuilder builder = 
new StreamsBuilder(); final KTable<String, Long> valueCounts = buildCountsTable(input, builder); @@ -342,7 +339,8 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedE .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final KafkaStreams driver = getCleanStartedStreams(appId, builder); + final Properties streamsConfig = getStreamsConfig(appId); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( input, @@ -374,7 +372,7 @@ public void shouldSuppressIntermediateEventsWithBytesLimit() throws InterruptedE ); } finally { driver.close(); - cleanStateAfterTest(driver); + cleanStateAfterTest(CLUSTER, driver); } } @@ -386,7 +384,7 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - cleanStateBeforeTest(input, outputRaw, outputSuppressed); + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); final StreamsBuilder builder = new StreamsBuilder(); final KTable<String, Long> valueCounts = buildCountsTable(input, builder); @@ -401,7 +399,8 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce .toStream() .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final KafkaStreams driver = getCleanStartedStreams(appId, builder); + final Properties streamsConfig = getStreamsConfig(appId); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously( input, @@ -415,26 +414,26 @@ public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedExce verifyErrorShutdown(driver); } finally { driver.close(); - cleanStateAfterTest(driver); + cleanStateAfterTest(CLUSTER, driver); } } @Test - public void shouldSupportFinalResultsForTimeWindows() throws InterruptedException { + public void shouldSupportFinalResultsForTimeWindows() { final String testId = "-shouldSupportFinalResultsForTimeWindows"; final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; final String input = "input" + testId; final String outputSuppressed = "output-suppressed" + testId; final String outputRaw = "output-raw" + testId; - cleanStateBeforeTest(input, outputRaw, outputSuppressed); + cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed); final StreamsBuilder builder = new StreamsBuilder(); final KTable<Windowed<String>, Long> valueCounts = builder .stream(input, Consumed.with(STRING_SERDE, STRING_SERDE) ) - .groupBy((String k1, String v1) -> k1, Serialized.with(STRING_SERDE, STRING_SERDE)) + .groupBy((String k1, String v1) -> k1, Grouped.with(STRING_SERDE, STRING_SERDE)) .windowedBy(TimeWindows.of(scaledTime(2L)).grace(scaledTime(1L))) .count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("counts").withCachingDisabled().withLoggingDisabled()); @@ -449,7 +448,8 @@ public void shouldSupportFinalResultsForTimeWindows() throws InterruptedExceptio .map((final Windowed<String> k, final Long v) -> new KeyValue<>(k.toString(), v)) .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); - final KafkaStreams driver = getCleanStartedStreams(appId, builder); + final Properties streamsConfig = getStreamsConfig(appId); + final KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true); try { produceSynchronously(input, asList( new KeyValueTimestamp<>("k1", "v1", 
scaledTime(0L)), @@ -481,81 +481,40 @@ public void shouldSupportFinalResultsForTimeWindows() throws InterruptedExceptio ); } finally { driver.close(); - cleanStateAfterTest(driver); + cleanStateAfterTest(CLUSTER, driver); } } - private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) { - return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString(); - } - - private void cleanStateBeforeTest(final String... topics) throws InterruptedException { - CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS); - for (final String topic : topics) { - CLUSTER.createTopic(topic, 1, 1); - } - } - - private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) { - final Properties streamsConfig = mkProperties(mkMap( + private Properties getStreamsConfig(final String appId) { + return mkProperties(mkMap( mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), - mkEntry(StreamsConfig.POLL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)), - mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)) + mkEntry(StreamsConfig.POLL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Integer.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, AT_LEAST_ONCE) )); - final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig); - driver.cleanUp(); - driver.start(); - return driver; } - private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException { - driver.cleanUp(); - CLUSTER.deleteAllTopicsAndWait(TIMEOUT_MS); + private String scaledWindowKey(final String key, final long unscaledStart, final long unscaledEnd) { + return new Windowed<>(key, new TimeWindow(scaledTime(unscaledStart), scaledTime(unscaledEnd))).toString(); } + /** + * scaling to ensure that there are commits in between the various test events, + * just to exercise that everything works properly in the presence of commits. 
+ */ private long scaledTime(final long unscaledTime) { - return SCALE_FACTOR * unscaledTime; + return COMMIT_INTERVAL * 2 * unscaledTime; } private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) { final Properties producerConfig = mkProperties(mkMap( mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), - mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()), - mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ((Serializer<String>) STRING_SERIALIZER).getClass().getName()), mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) )); - try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) { - // TODO: test EOS - //noinspection ConstantConditions - if (false) { - producer.initTransactions(); - producer.beginTransaction(); - } - final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>(); - for (final KeyValueTimestamp<String, String> record : toProduce) { - final Future<RecordMetadata> f = producer.send( - new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null) - ); - futures.add(f); - } - - // TODO: test EOS - //noinspection ConstantConditions - if (false) { - producer.commitTransaction(); - } else { - producer.flush(); - } - - for (final Future<RecordMetadata> future : futures) { - try { - future.get(); - } catch (final InterruptedException | ExecutionException e) { - throw new RuntimeException(e); - } - } - } + IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce); } private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedException { @@ -563,69 +522,16 @@ private void verifyErrorShutdown(final KafkaStreams driver) throws InterruptedEx assertThat(driver.state(), is(KafkaStreams.State.ERROR)); } - private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) { - final List<ConsumerRecord<String, Long>> results; - try { - final Properties properties = mkProperties( - mkMap( - mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), - mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), - mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()), - mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName()) - ) - ); - results = IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, expected.size()); - } catch (final InterruptedException e) { - throw new RuntimeException(e); - } - - if (results.size() != expected.size()) { - throw new AssertionError(printRecords(results) + " != " + expected); - } - final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator(); - for (final ConsumerRecord<String, Long> result : results) { - final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next(); - try { - compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp()); - } catch (final AssertionError e) { - throw new AssertionError(printRecords(results) + " != " + expected, e); - } - } - } - - private <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record, final K expectedKey, final 
V expectedValue, final long expectedTimestamp) { - Objects.requireNonNull(record); - final K recordKey = record.key(); - final V recordValue = record.value(); - final long recordTimestamp = record.timestamp(); - final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + - " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp); - if (recordKey != null) { - if (!recordKey.equals(expectedKey)) { - throw error; - } - } else if (expectedKey != null) { - throw error; - } - if (recordValue != null) { - if (!recordValue.equals(expectedValue)) { - throw error; - } - } else if (expectedValue != null) { - throw error; - } - if (recordTimestamp != expectedTimestamp) { - throw error; - } - } + private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> keyValueTimestamps) { + final Properties properties = mkProperties( + mkMap( + mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName()) + ) + ); + IntegrationTestUtils.verifyKeyValueTimestamps(properties, topic, keyValueTimestamps); - private <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) { - final StringBuilder resultStr = new StringBuilder(); - resultStr.append("[\n"); - for (final ConsumerRecord<?, ?> record : result) { - resultStr.append(" ").append(record.toString()).append("\n"); - } - resultStr.append("]"); - return resultStr.toString(); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 985b57f4cd5..8bca79f8d83 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.streams.integration.utils; +import kafka.api.Request; +import kafka.server.KafkaServer; +import kafka.server.MetadataCache; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -33,11 +36,14 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.ThreadStateTransitionValidator; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; +import scala.Option; import java.io.File; import java.io.IOException; @@ -47,24 +53,23 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import 
java.util.stream.Collectors; -import kafka.api.Request; -import kafka.server.KafkaServer; -import kafka.server.MetadataCache; -import scala.Option; - /** * Utility functions to make integration testing more convenient. */ public class IntegrationTestUtils { public static final long DEFAULT_TIMEOUT = 30 * 1000L; + private static final long DEFAULT_COMMIT_INTERVAL = 100L; public static final String INTERNAL_LEAVE_GROUP_ON_CLOSE = "internal.leave.group.on.close"; /* @@ -112,6 +117,26 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration) } } + public static void cleanStateBeforeTest(final EmbeddedKafkaCluster cluster, final String... topics) { + try { + cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); + for (final String topic : topics) { + cluster.createTopic(topic, 1, 1); + } + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + + public static void cleanStateAfterTest(final EmbeddedKafkaCluster cluster, final KafkaStreams driver) { + driver.cleanUp(); + try { + cluster.deleteAllTopicsAndWait(DEFAULT_TIMEOUT); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + } + /** * @param topic Kafka topic to write the data records to * @param records Data records to write to Kafka @@ -171,15 +196,6 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration) IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, timestamp, false); } - public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, - final Collection<KeyValue<K, V>> records, - final Properties producerConfig, - final Headers headers, - final Long timestamp) - throws ExecutionException, InterruptedException { - IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, records, producerConfig, headers, timestamp, false); - } - public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, @@ -212,7 +228,42 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration) producer.flush(); } } - + + public static <V, K> void produceSynchronously(final Properties producerConfig, + final boolean eos, + final String topic, + final List<KeyValueTimestamp<K, V>> toProduce) { + try (final Producer<K, V> producer = new KafkaProducer<>(producerConfig)) { + // TODO: test EOS + //noinspection ConstantConditions + if (false) { + producer.initTransactions(); + producer.beginTransaction(); + } + final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>(); + for (final KeyValueTimestamp<K, V> record : toProduce) { + final Future<RecordMetadata> f = producer.send( + new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null) + ); + futures.add(f); + } + + if (eos) { + producer.commitTransaction(); + } else { + producer.flush(); + } + + for (final Future<RecordMetadata> future : futures) { + try { + future.get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + } + } + public static <K, V> void produceAbortedKeyValuesSynchronouslyWithTimestamp(final String topic, final Collection<KeyValue<K, V>> records, final Properties producerConfig, @@ -227,7 +278,7 @@ public static void purgeLocalStreamsState(final Properties streamsConfiguration) f.get(); producer.abortTransaction(); } - } + } } public static <V> void produceValuesSynchronously(final String 
topic, @@ -297,7 +348,7 @@ public static void waitForCompletion(final KafkaStreams streams, final int expectedNumRecords) throws InterruptedException { return waitUntilMinKeyValueRecordsReceived(consumerConfig, topic, expectedNumRecords, DEFAULT_TIMEOUT); } - + /** * Wait until enough data (key-value records) has been consumed. * @@ -483,6 +534,70 @@ public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers } + public static void verifyKeyValueTimestamps(final Properties consumerConfig, + final String topic, + final List<KeyValueTimestamp<String, Long>> expected) { + + final List<ConsumerRecord<String, Long>> results; + try { + results = IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, topic, expected.size()); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + + if (results.size() != expected.size()) { + throw new AssertionError(printRecords(results) + " != " + expected); + } + final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator(); + for (final ConsumerRecord<String, Long> result : results) { + final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next(); + try { + compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp()); + } catch (final AssertionError e) { + throw new AssertionError(printRecords(results) + " != " + expected, e); + } + } + } + + private static <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record, + final K expectedKey, + final V expectedValue, + final long expectedTimestamp) { + Objects.requireNonNull(record); + final K recordKey = record.key(); + final V recordValue = record.value(); + final long recordTimestamp = record.timestamp(); + final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + + " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp); + if (recordKey != null) { + if (!recordKey.equals(expectedKey)) { + throw error; + } + } else if (expectedKey != null) { + throw error; + } + if (recordValue != null) { + if (!recordValue.equals(expectedValue)) { + throw error; + } + } else if (expectedValue != null) { + throw error; + } + if (recordTimestamp != expectedTimestamp) { + throw error; + } + } + + private static <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) { + final StringBuilder resultStr = new StringBuilder(); + resultStr.append("[\n"); + for (final ConsumerRecord<?, ?> record : result) { + resultStr.append(" ").append(record.toString()).append("\n"); + } + resultStr.append("]"); + return resultStr.toString(); + } + /** * Returns up to `maxMessages` message-values from the topic. * @@ -520,6 +635,15 @@ public static void waitUntilMetadataIsPropagated(final List<KafkaServer> servers return consumedValues; } + public static KafkaStreams getStartedStreams(final Properties streamsConfig, final StreamsBuilder builder, final boolean clean) { + final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig); + if (clean) { + driver.cleanUp(); + } + driver.start(); + return driver; + } + /** * Returns up to `maxMessages` message-values from the topic. 
* diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java index bb7f49ce7a5..002ace2ed28 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -29,7 +29,9 @@ import org.apache.kafka.streams.kstream.internals.SessionWindow; import org.apache.kafka.streams.kstream.internals.TimeWindow; import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer; import org.apache.kafka.test.MockInternalProcessorContext; import org.hamcrest.BaseMatcher; import org.hamcrest.Description; @@ -60,13 +62,38 @@ private static final Change<Long> ARBITRARY_CHANGE = new Change<>(7L, 14L); + private static class Harness<K, V> { + private final KTableSuppressProcessor<K, V> processor; + private final MockInternalProcessorContext context; + + + Harness(final Suppressed<K> suppressed, + final Serde<K> keySerde, + final Serde<V> valueSerde) { + + final String storeName = "test-store"; + + final StateStore buffer = new InMemoryTimeOrderedKeyValueBuffer.Builder(storeName) + .withLoggingDisabled() + .build(); + final KTableSuppressProcessor<K, V> processor = + new KTableSuppressProcessor<>(getImpl(suppressed), storeName, keySerde, new FullChangeSerde<>(valueSerde)); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + buffer.init(context, buffer); + processor.init(context); + + this.processor = processor; + this.context = context; + } + } + @Test public void zeroTimeLimitShouldImmediatelyEmit() { - final KTableSuppressProcessor<String, Long> processor = - new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded())), String(), new FullChangeSerde<>(Long())); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<String, Long> harness = + new Harness<>(untilTimeLimit(ZERO, unbounded()), String(), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<String, Long> processor = harness.processor; final long timestamp = ARBITRARY_LONG; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -83,15 +110,10 @@ public void zeroTimeLimitShouldImmediatelyEmit() { @Test public void windowedZeroTimeLimitShouldImmediatelyEmit() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = - new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(ZERO, unbounded())), - timeWindowedSerdeFrom(String.class, 100L), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(untilTimeLimit(ZERO, unbounded()), timeWindowedSerdeFrom(String.class, 100L), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; final long timestamp = ARBITRARY_LONG; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -108,15 +130,10 @@ public void 
windowedZeroTimeLimitShouldImmediatelyEmit() { @Test public void intermediateSuppressionShouldBufferAndEmitLater() { - final KTableSuppressProcessor<String, Long> processor = - new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(ofMillis(1), unbounded())), - String(), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<String, Long> harness = + new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<String, Long> processor = harness.processor; final long timestamp = 0L; context.setRecordMetadata("topic", 0, 0, null, timestamp); @@ -138,14 +155,10 @@ public void intermediateSuppressionShouldBufferAndEmitLater() { @Test public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( - finalResults(ofMillis(1L)), - timeWindowedSerdeFrom(String.class, 1L), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(finalResults(ofMillis(1L)), timeWindowedSerdeFrom(String.class, 1L), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; final long windowStart = 99L; final long recordTime = 99L; @@ -184,18 +197,14 @@ public void finalResultsSuppressionShouldBufferAndEmitAtGraceExpiration() { /** * Testing a special case of final results: that even with a grace period of 0, * it will still buffer events and emit only after the end of the window. - * As opposed to emitting immediately the way regular suppresion would with a time limit of 0. + * As opposed to emitting immediately the way regular suppression would with a time limit of 0. */ @Test public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( - finalResults(ofMillis(0)), - timeWindowedSerdeFrom(String.class, 100L), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; // note the record is in the past, but the window end is in the future, so we still have to buffer, // even though the grace period is 0. 
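// Illustrative sketch, not part of the PR diff: the Harness introduced above replaces the
// per-test boilerplate of constructing KTableSuppressProcessor and MockInternalProcessorContext
// by hand. Assuming the class and method names shown in this diff, a converted test reduces to:
//
//     final Harness<String, Long> harness =
//         new Harness<>(untilTimeLimit(ofMillis(1), unbounded()), String(), Long());
//     harness.context.setRecordMetadata("topic", 0, 0L, null, 0L);
//     harness.processor.process("key", new Change<>(1L, null));
//
// The harness builds the in-memory suppression buffer with logging disabled, registers it with
// the mock context, and then initializes the processor against that same context, so the
// processor can look up its buffer by store name exactly as it would inside a real topology.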
@@ -221,14 +230,10 @@ public void finalResultsWithZeroGraceShouldStillBufferUntilTheWindowEnd() { @Test public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( - finalResults(ofMillis(0)), - timeWindowedSerdeFrom(String.class, 100L), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -245,14 +250,10 @@ public void finalResultsWithZeroGraceAtWindowEndShouldImmediatelyEmit() { @Test public void finalResultsShouldSuppressTombstonesForTimeWindows() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( - finalResults(ofMillis(0)), - timeWindowedSerdeFrom(String.class, 100L), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(finalResults(ofMillis(0L)), timeWindowedSerdeFrom(String.class, 100L), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -266,14 +267,10 @@ public void finalResultsShouldSuppressTombstonesForTimeWindows() { @Test public void finalResultsShouldSuppressTombstonesForSessionWindows() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( - finalResults(ofMillis(0)), - sessionWindowedSerdeFrom(String.class), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(finalResults(ofMillis(0L)), sessionWindowedSerdeFrom(String.class), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -287,14 +284,10 @@ public void finalResultsShouldSuppressTombstonesForSessionWindows() { @Test public void suppressShouldNotSuppressTombstonesForTimeWindows() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))), - timeWindowedSerdeFrom(String.class, 100L), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), timeWindowedSerdeFrom(String.class, 100L), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -311,14 +304,10 @@ public 
void suppressShouldNotSuppressTombstonesForTimeWindows() { @Test public void suppressShouldNotSuppressTombstonesForSessionWindows() { - final KTableSuppressProcessor<Windowed<String>, Long> processor = new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))), - sessionWindowedSerdeFrom(String.class), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<Windowed<String>, Long> harness = + new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), sessionWindowedSerdeFrom(String.class), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<Windowed<String>, Long> processor = harness.processor; final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -335,14 +324,10 @@ public void suppressShouldNotSuppressTombstonesForSessionWindows() { @Test public void suppressShouldNotSuppressTombstonesForKTable() { - final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(ofMillis(0), maxRecords(0))), - Serdes.String(), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<String, Long> harness = + new Harness<>(untilTimeLimit(ofMillis(0), maxRecords(0)), String(), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<String, Long> processor = harness.processor; final long timestamp = 100L; context.setRecordMetadata("", 0, 0L, null, timestamp); @@ -359,14 +344,10 @@ public void suppressShouldNotSuppressTombstonesForKTable() { @Test public void suppressShouldEmitWhenOverRecordCapacity() { - final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1))), - Serdes.String(), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<String, Long> harness = + new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1)), String(), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<String, Long> processor = harness.processor; final long timestamp = 100L; context.setStreamTime(timestamp); @@ -386,14 +367,10 @@ public void suppressShouldEmitWhenOverRecordCapacity() { @Test public void suppressShouldEmitWhenOverByteCapacity() { - final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L))), - Serdes.String(), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<String, Long> harness = + new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L)), String(), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<String, Long> processor = harness.processor; final long timestamp = 100L; context.setStreamTime(timestamp); @@ -413,14 +390,10 @@ public void suppressShouldEmitWhenOverByteCapacity() { @Test public void suppressShouldShutDownWhenOverRecordCapacity() { - final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>( - 
getImpl(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull())), - Serdes.String(), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<String, Long> harness = + new Harness<>(untilTimeLimit(Duration.ofDays(100), maxRecords(1).shutDownWhenFull()), String(), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<String, Long> processor = harness.processor; final long timestamp = 100L; context.setStreamTime(timestamp); @@ -441,14 +414,10 @@ public void suppressShouldShutDownWhenOverRecordCapacity() { @Test public void suppressShouldShutDownWhenOverByteCapacity() { - final KTableSuppressProcessor<String, Long> processor = new KTableSuppressProcessor<>( - getImpl(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull())), - Serdes.String(), - new FullChangeSerde<>(Long()) - ); - - final MockInternalProcessorContext context = new MockInternalProcessorContext(); - processor.init(context); + final Harness<String, Long> harness = + new Harness<>(untilTimeLimit(Duration.ofDays(100), maxBytes(60L).shutDownWhenFull()), String(), Long()); + final MockInternalProcessorContext context = harness.context; + final KTableSuppressProcessor<String, Long> processor = harness.processor; final long timestamp = 100L; context.setStreamTime(timestamp);

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> KIP-328: Add in-memory Suppression
> ----------------------------------
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
>
> As described in
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
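For context on what this ticket delivers at the DSL level, a minimal sketch of final-results suppression on a windowed count using the KIP-328 API is shown below. The topic names, serdes, and window sizes are illustrative only and do not come from the PR:

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;

public class SuppressExample {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
               .groupByKey()
               .windowedBy(TimeWindows.of(Duration.ofMinutes(2)).grace(Duration.ofMinutes(1)))
               .count()
               // Hold updates in the suppression buffer and emit a single, final count per window
               // once stream time passes the window end plus the grace period.
               .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
               .toStream((windowedKey, count) -> windowedKey.key() + "@" + windowedKey.window().start())
               .to("final-counts", Produced.with(Serdes.String(), Serdes.Long()));
        // builder.build() would then be passed to new KafkaStreams(...) as usual.
    }
}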