[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16626419#comment-16626419 ]
ASF GitHub Bot commented on KAFKA-7223: --------------------------------------- guozhangwang closed pull request #5567: KAFKA-7223: Suppress API with only immediate emit URL: https://github.com/apache/kafka/pull/5567 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index bdd6dc3b37a..293bc6b7a86 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -389,6 +389,16 @@ */ <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper); + /** + * Suppress some updates from this changelog stream, determined by the supplied {@link Suppressed} configuration. + * + * This controls what updates downstream table and stream operations will receive. + * + * @param suppressed Configuration object determining what, if any, updates to suppress + * @return A new KTable with the desired suppression characteristics. + */ + KTable<K, V> suppress(final Suppressed<K> suppressed); + /** * Create a new {@code KTable} by transforming the value of each record in this {@code KTable} into a new value * (with possibly a new type), with default serializers, deserializers, and state store. diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java new file mode 100644 index 00000000000..7488ef6ff37 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl; +import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; +import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl; +import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl; + +import java.time.Duration; + +public interface Suppressed<K> { + + /** + * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly + * enforce the time bound and never emit early. + */ + interface StrictBufferConfig extends BufferConfig<StrictBufferConfig> { + + } + + interface BufferConfig<BC extends BufferConfig<BC>> { + /** + * Create a size-constrained buffer in terms of the maximum number of keys it will store. 
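+ * <p>
+ * As a minimal sketch of composing these options via the methods declared on this
+ * interface (the limits {@code 1000L} and {@code 10_000_000L} are arbitrary
+ * illustration values, not defaults):
+ * <pre>{@code
+ * final BufferConfig<?> config = BufferConfig.maxRecords(1000L).withMaxBytes(10_000_000L);
+ * }</pre>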
+ */ + static BufferConfig<?> maxRecords(final long recordLimit) { + return new EagerBufferConfigImpl(recordLimit, Long.MAX_VALUE); + } + + /** + * Set a size constraint on the buffer in terms of the maximum number of keys it will store. + */ + BC withMaxRecords(final long recordLimit); + + /** + * Create a size-constrained buffer in terms of the maximum number of bytes it will use. + */ + static BufferConfig<?> maxBytes(final long byteLimit) { + return new EagerBufferConfigImpl(Long.MAX_VALUE, byteLimit); + } + + /** + * Set a size constraint on the buffer, the maximum number of bytes it will use. + */ + BC withMaxBytes(final long byteLimit); + + /** + * Create a buffer unconstrained by size (either keys or bytes). + * + * As a result, the buffer will consume as much memory as it needs, dictated by the time bound. + * + * If there isn't enough heap available to meet the demand, the application will encounter an + * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that + * JVM processes under extreme memory pressure may exhibit poor GC behavior. + * + * This is a convenient option if you doubt that your buffer will be that large, but also don't + * wish to pick particular constraints, such as in testing. + * + * This buffer is "strict" in the sense that it will enforce the time bound or crash. + * It will never emit early. + */ + static StrictBufferConfig unbounded() { + return new StrictBufferConfigImpl(); + } + + /** + * Set the buffer to be unconstrained by size (either keys or bytes). + * + * As a result, the buffer will consume as much memory as it needs, dictated by the time bound. + * + * If there isn't enough heap available to meet the demand, the application will encounter an + * {@link OutOfMemoryError} and shut down (not guaranteed to be a graceful exit). Also, note that + * JVM processes under extreme memory pressure may exhibit poor GC behavior. + * + * This is a convenient option if you doubt that your buffer will be that large, but also don't + * wish to pick particular constraints, such as in testing. + * + * This buffer is "strict" in the sense that it will enforce the time bound or crash. + * It will never emit early. + */ + StrictBufferConfig withNoBound(); + + /** + * Set the buffer to gracefully shut down the application when any of its constraints are violated + * + * This buffer is "strict" in the sense that it will enforce the time bound or shut down. + * It will never emit early. + */ + StrictBufferConfig shutDownWhenFull(); + + /** + * Sets the buffer to use on-disk storage if it requires more memory than the constraints allow. + * + * This buffer is "strict" in the sense that it will never emit early. + */ + StrictBufferConfig spillToDiskWhenFull(); + + /** + * Set the buffer to just emit the oldest records when any of its constraints are violated. + * + * This buffer is "not strict" in the sense that it may emit early, so it is suitable for reducing + * duplicate results downstream, but does not promise to eliminate them. + */ + BufferConfig emitEarlyWhenFull(); + } + + /** + * Configure the suppression to emit only the "final results" from the window. + * + * By default all Streams operators emit results whenever new results are available. + * This includes windowed operations. + * + * This configuration will instead emit just one result per key for each window, guaranteeing + * to deliver only the final result. 
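+ * <p>
+ * For example (a sketch; {@code windowedCounts} is a hypothetical windowed
+ * {@code KTable}, not part of this change):
+ * <pre>{@code
+ * windowedCounts.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
+ * }</pre>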
This option is suitable for use cases in which the business logic + * requires a hard guarantee that only the final result is propagated. For example, sending alerts. + * + * To accomplish this, the operator will buffer events from the window until the window close (that is, + * until the end-time passes, and additionally until the grace period expires). Since windowed operators + * are required to reject late events for a window whose grace period is expired, there is an additional + * guarantee that the final results emitted from this suppression will match any queriable state upstream. + * + * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results. + * This is required to be a "strict" config, since it would violate the "final results" + * property to emit early and then issue an update later. + * @param <K> The key type for the KTable to apply this suppression to. "Final results" mode is only available + * on Windowed KTables (this is enforced by the type parameter). + * @return a "final results" mode suppression configuration + */ + static <K extends Windowed> Suppressed<K> untilWindowCloses(final StrictBufferConfig bufferConfig) { + return new FinalResultsSuppressionBuilder<>(bufferConfig); + } + + /** + * Configure the suppression to wait {@code timeToWaitForMoreEvents} amount of time after receiving a record + * before emitting it further downstream. If another record for the same key arrives in the mean time, it replaces + * the first record in the buffer but does <em>not</em> re-start the timer. + * + * @param timeToWaitForMoreEvents The amount of time to wait, per record, for new events. + * @param bufferConfig A configuration specifying how much space to use for buffering intermediate results. + * @param <K> The key type for the KTable to apply this suppression to. + * @return a suppression configuration + */ + static <K> Suppressed<K> untilTimeLimit(final Duration timeToWaitForMoreEvents, final BufferConfig bufferConfig) { + return new SuppressedImpl<>(timeToWaitForMoreEvents, bufferConfig, null); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java index 5a3c897f781..b89399bf7b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregate.java @@ -38,7 +38,7 @@ import java.util.ArrayList; import java.util.List; -class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { +public class KStreamSessionWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> { private static final Logger LOG = LoggerFactory.getLogger(KStreamSessionWindowAggregate.class); private final String storeName; @@ -49,11 +49,11 @@ private boolean sendOldValues = false; - KStreamSessionWindowAggregate(final SessionWindows windows, - final String storeName, - final Initializer<Agg> initializer, - final Aggregator<? super K, ? super V, Agg> aggregator, - final Merger<? super K, Agg> sessionMerger) { + public KStreamSessionWindowAggregate(final SessionWindows windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator, + final Merger<? 
super K, Agg> sessionMerger) { this.windows = windows; this.storeName = storeName; this.initializer = initializer; @@ -66,6 +66,10 @@ return new KStreamSessionWindowAggregateProcessor(); } + public SessionWindows windows() { + return windows; + } + @Override public void enableSendingOldValues() { sendOldValues = true; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 57542847b96..f29251573e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -44,10 +44,10 @@ private boolean sendOldValues = false; - KStreamWindowAggregate(final Windows<W> windows, - final String storeName, - final Initializer<Agg> initializer, - final Aggregator<? super K, ? super V, Agg> aggregator) { + public KStreamWindowAggregate(final Windows<W> windows, + final String storeName, + final Initializer<Agg> initializer, + final Aggregator<? super K, ? super V, Agg> aggregator) { this.windows = windows; this.storeName = storeName; this.initializer = initializer; @@ -59,6 +59,10 @@ return new KStreamWindowAggregateProcessor(); } + public Windows<W> windows() { + return windows; + } + @Override public void enableSendingOldValues() { sendOldValues = true; diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 352e42d3918..2330fad1b16 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -26,21 +26,30 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.kstream.Suppressed; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.kstream.ValueMapperWithKey; import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier; +import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.internals.graph.KTableKTableJoinNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode; import org.apache.kafka.streams.kstream.internals.graph.TableProcessorNode; +import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; +import org.apache.kafka.streams.kstream.internals.suppress.KTableSuppressProcessor; +import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.state.KeyValueStore; +import java.time.Duration; +import java.util.Collections; import java.util.Objects; import java.util.Set; +import static org.apache.kafka.streams.kstream.internals.graph.GraphGraceSearchUtil.findAndVerifyWindowGrace; + /** * The implementation class of {@link KTable}. 
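 * <p>
 * Note on the new {@code suppress} operation below: it installs a
 * {@code KTableSuppressProcessor} built from the supplied {@link Suppressed} config,
 * and for a {@code FinalResultsSuppressionBuilder} the buffer's time bound is taken
 * from the upstream window's grace period via {@code findAndVerifyWindowGrace}.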
* @@ -66,6 +75,8 @@ private static final String SELECT_NAME = "KTABLE-SELECT-"; + private static final String SUPPRESS_NAME = "KTABLE-SUPPRESS-"; + private static final String TOSTREAM_NAME = "KTABLE-TOSTREAM-"; private static final String TRANSFORMVALUES_NAME = "KTABLE-TRANSFORMVALUES-"; @@ -349,6 +360,53 @@ public String queryableStoreName() { return toStream().selectKey(mapper); } + @Override + public KTable<K, V> suppress(final Suppressed<K> suppressed) { + final String name = builder.newProcessorName(SUPPRESS_NAME); + + final ProcessorSupplier<K, Change<V>> suppressionSupplier = + () -> new KTableSuppressProcessor<>(buildSuppress(suppressed)); + + final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>( + suppressionSupplier, + name + ); + + final ProcessorGraphNode<K, Change<V>> node = new ProcessorGraphNode<>(name, processorParameters, false); + + builder.addGraphNode(streamsGraphNode, node); + + return new KTableImpl<K, S, V>( + builder, + name, + suppressionSupplier, + keySerde, + valSerde, + Collections.singleton(this.name), + null, + false, + node + ); + } + + @SuppressWarnings("unchecked") + private SuppressedImpl<K> buildSuppress(final Suppressed<K> suppress) { + if (suppress instanceof FinalResultsSuppressionBuilder) { + final long grace = findAndVerifyWindowGrace(streamsGraphNode); + + final FinalResultsSuppressionBuilder<?> builder = (FinalResultsSuppressionBuilder) suppress; + + final SuppressedImpl<? extends Windowed> finalResultsSuppression = + builder.buildFinalResultsSuppression(Duration.ofMillis(grace)); + + return (SuppressedImpl<K>) finalResultsSuppression; + } else if (suppress instanceof SuppressedImpl) { + return (SuppressedImpl<K>) suppress; + } else { + throw new IllegalArgumentException("Custom subclasses of Suppressed are not allowed."); + } + } + @Override public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? 
extends R> joiner) { @@ -492,12 +550,12 @@ public String queryableStoreName() { final ProcessorParameters joinMergeProcessorParameters = new ProcessorParameters(joinMerge, joinMergeName); kTableJoinNodeBuilder.withJoinMergeProcessorParameters(joinMergeProcessorParameters) - .withJoinOtherProcessorParameters(joinOtherProcessorParameters) - .withJoinThisProcessorParameters(joinThisProcessorParameters) - .withJoinThisStoreNames(valueGetterSupplier().storeNames()) - .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames()) - .withOtherJoinSideNodeName(((KTableImpl) other).name) - .withThisJoinSideNodeName(name); + .withJoinOtherProcessorParameters(joinOtherProcessorParameters) + .withJoinThisProcessorParameters(joinThisProcessorParameters) + .withJoinThisStoreNames(valueGetterSupplier().storeNames()) + .withJoinOtherStoreNames(((KTableImpl) other).valueGetterSupplier().storeNames()) + .withOtherJoinSideNodeName(((KTableImpl) other).name) + .withThisJoinSideNodeName(name); final KTableKTableJoinNode kTableKTableJoinNode = kTableJoinNodeBuilder.build(); builder.addGraphNode(this.streamsGraphNode, kTableKTableJoinNode); @@ -526,10 +584,10 @@ public String queryableStoreName() { final String selectName = builder.newProcessorName(SELECT_NAME); final KTableProcessorSupplier<K, V, KeyValue<K1, V1>> selectSupplier = new KTableRepartitionMap<>(this, selector); - final ProcessorParameters processorParameters = new ProcessorParameters<>(selectSupplier, selectName); + final ProcessorParameters<K, Change<V>> processorParameters = new ProcessorParameters<>(selectSupplier, selectName); // select the aggregate key and values (old and new), it would require parent to send old values - final ProcessorGraphNode<K1, V1> groupByMapNode = new ProcessorGraphNode<>( + final ProcessorGraphNode<K, Change<V>> groupByMapNode = new ProcessorGraphNode<>( selectName, processorParameters, false diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java new file mode 100644 index 00000000000..306ddf5cf5e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.Windows; +import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate; +import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +public final class GraphGraceSearchUtil { + private GraphGraceSearchUtil() {} + + public static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode) { + return findAndVerifyWindowGrace(streamsGraphNode, ""); + } + + private static long findAndVerifyWindowGrace(final StreamsGraphNode streamsGraphNode, final String chain) { + // error base case: we traversed off the end of the graph without finding a window definition + if (streamsGraphNode == null) { + throw new TopologyException( + "Window close time is only defined for windowed computations. Got [" + chain + "]." + ); + } + // base case: return if this node defines a grace period. + { + final Long gracePeriod = extractGracePeriod(streamsGraphNode); + if (gracePeriod != null) { + return gracePeriod; + } + } + + final String newChain = chain.equals("") ? streamsGraphNode.nodeName() : streamsGraphNode.nodeName() + "->" + chain; + + if (streamsGraphNode.parentNodes().isEmpty()) { + // error base case: we traversed to the end of the graph without finding a window definition + throw new TopologyException( + "Window close time is only defined for windowed computations. Got [" + newChain + "]." + ); + } + + // recursive case: all parents must define a grace period, and we use the max of our parents' graces. + long inheritedGrace = -1; + for (final StreamsGraphNode parentNode : streamsGraphNode.parentNodes()) { + final long parentGrace = findAndVerifyWindowGrace(parentNode, newChain); + inheritedGrace = Math.max(inheritedGrace, parentGrace); + } + + if (inheritedGrace == -1) { + throw new IllegalStateException(); // shouldn't happen, and it's not a legal grace period + } + + return inheritedGrace; + } + + private static Long extractGracePeriod(final StreamsGraphNode node) { + if (node instanceof StatefulProcessorNode) { + final ProcessorSupplier processorSupplier = ((StatefulProcessorNode) node).processorParameters().processorSupplier(); + if (processorSupplier instanceof KStreamWindowAggregate) { + final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier; + final Windows windows = kStreamWindowAggregate.windows(); + return windows.gracePeriodMs(); + } else if (processorSupplier instanceof KStreamSessionWindowAggregate) { + final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier; + final SessionWindows windows = kStreamSessionWindowAggregate.windows(); + return windows.gracePeriodMs(); + } else { + return null; + } + } else { + return null; + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java new file mode 100644 index 00000000000..e731dc6f5e1 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferConfigImpl.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; + +import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; + +abstract class BufferConfigImpl<BC extends Suppressed.BufferConfig<BC>> implements Suppressed.BufferConfig<BC> { + public abstract long maxKeys(); + + public abstract long maxBytes(); + + @SuppressWarnings("unused") + public abstract BufferFullStrategy bufferFullStrategy(); + + @Override + public Suppressed.StrictBufferConfig withNoBound() { + return new StrictBufferConfigImpl( + Long.MAX_VALUE, + Long.MAX_VALUE, + SHUT_DOWN // doesn't matter, given the bounds + ); + } + + @Override + public Suppressed.StrictBufferConfig shutDownWhenFull() { + return new StrictBufferConfigImpl(maxKeys(), maxBytes(), SHUT_DOWN); + } + + @Override + public Suppressed.BufferConfig emitEarlyWhenFull() { + return new EagerBufferConfigImpl(maxKeys(), maxBytes()); + } + + @Override + public Suppressed.StrictBufferConfig spillToDiskWhenFull() { + throw new UnsupportedOperationException("not implemented"); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java new file mode 100644 index 00000000000..2da7c141825 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/BufferFullStrategy.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +public enum BufferFullStrategy { + EMIT, + SPILL_TO_DISK, + SHUT_DOWN +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java new file mode 100644 index 00000000000..0c2c883e18a --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/EagerBufferConfigImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; + +import java.util.Objects; + +public class EagerBufferConfigImpl extends BufferConfigImpl { + + private final long maxKeys; + private final long maxBytes; + + public EagerBufferConfigImpl(final long maxKeys, final long maxBytes) { + this.maxKeys = maxKeys; + this.maxBytes = maxBytes; + } + + @Override + public Suppressed.BufferConfig withMaxRecords(final long recordLimit) { + return new EagerBufferConfigImpl(recordLimit, maxBytes); + } + + @Override + public Suppressed.BufferConfig withMaxBytes(final long byteLimit) { + return new EagerBufferConfigImpl(maxKeys, byteLimit); + } + + @Override + public long maxKeys() { + return maxKeys; + } + + @Override + public long maxBytes() { + return maxBytes; + } + + @Override + public BufferFullStrategy bufferFullStrategy() { + return BufferFullStrategy.EMIT; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final EagerBufferConfigImpl that = (EagerBufferConfigImpl) o; + return maxKeys == that.maxKeys && + maxBytes == that.maxBytes; + } + + @Override + public int hashCode() { + return Objects.hash(maxKeys, maxBytes); + } + + @Override + public String toString() { + return "EagerBufferConfigImpl{maxKeys=" + maxKeys + ", maxBytes=" + maxBytes + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java new file mode 100644 index 00000000000..548f5991dbb --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/FinalResultsSuppressionBuilder.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.ProcessorContext; + +import java.time.Duration; +import java.util.Objects; + +public class FinalResultsSuppressionBuilder<K extends Windowed> implements Suppressed<K> { + private final StrictBufferConfig bufferConfig; + + public FinalResultsSuppressionBuilder(final Suppressed.StrictBufferConfig bufferConfig) { + this.bufferConfig = bufferConfig; + } + + public SuppressedImpl<K> buildFinalResultsSuppression(final Duration gracePeriod) { + return new SuppressedImpl<>( + gracePeriod, + bufferConfig, + (ProcessorContext context, K key) -> key.window().end() + ); + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final FinalResultsSuppressionBuilder<?> that = (FinalResultsSuppressionBuilder<?>) o; + return Objects.equals(bufferConfig, that.bufferConfig); + } + + @Override + public int hashCode() { + return Objects.hash(bufferConfig); + } + + @Override + public String toString() { + return "FinalResultsSuppressionBuilder{bufferConfig=" + bufferConfig + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java new file mode 100644 index 00000000000..f65f2b4af20 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessor.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; + +import java.time.Duration; + +public class KTableSuppressProcessor<K, V> implements Processor<K, Change<V>> { + private final SuppressedImpl<K> suppress; + private InternalProcessorContext internalProcessorContext; + + public KTableSuppressProcessor(final SuppressedImpl<K> suppress) { + this.suppress = suppress; + } + + @Override + public void init(final ProcessorContext context) { + internalProcessorContext = (InternalProcessorContext) context; + } + + @Override + public void process(final K key, final Change<V> value) { + if (suppress.getTimeToWaitForMoreEvents() == Duration.ZERO && definedRecordTime(key) <= internalProcessorContext.streamTime()) { + internalProcessorContext.forward(key, value); + } else { + throw new NotImplementedException(); + } + } + + private long definedRecordTime(final K key) { + return suppress.getTimeDefinition().time(internalProcessorContext, key); + } + + @Override + public void close() { + } + + @Override + public String toString() { + return "KTableSuppressProcessor{suppress=" + suppress + '}'; + } + + static class NotImplementedException extends RuntimeException { + NotImplementedException() { + super(); + } + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java new file mode 100644 index 00000000000..0634a748a5b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/StrictBufferConfigImpl.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; + +import java.util.Objects; + +import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; + +public class StrictBufferConfigImpl extends BufferConfigImpl<Suppressed.StrictBufferConfig> implements Suppressed.StrictBufferConfig { + + private final long maxKeys; + private final long maxBytes; + private final BufferFullStrategy bufferFullStrategy; + + public StrictBufferConfigImpl(final long maxKeys, + final long maxBytes, + final BufferFullStrategy bufferFullStrategy) { + this.maxKeys = maxKeys; + this.maxBytes = maxBytes; + this.bufferFullStrategy = bufferFullStrategy; + } + + public StrictBufferConfigImpl() { + this.maxKeys = Long.MAX_VALUE; + this.maxBytes = Long.MAX_VALUE; + this.bufferFullStrategy = SHUT_DOWN; + } + + @Override + public Suppressed.StrictBufferConfig withMaxRecords(final long recordLimit) { + return new StrictBufferConfigImpl(recordLimit, maxBytes, bufferFullStrategy); + } + + @Override + public Suppressed.StrictBufferConfig withMaxBytes(final long byteLimit) { + return new StrictBufferConfigImpl(maxKeys, byteLimit, bufferFullStrategy); + } + + @Override + public long maxKeys() { + return maxKeys; + } + + @Override + public long maxBytes() { + return maxBytes; + } + + @Override + public BufferFullStrategy bufferFullStrategy() { + return bufferFullStrategy; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final StrictBufferConfigImpl that = (StrictBufferConfigImpl) o; + return maxKeys == that.maxKeys && + maxBytes == that.maxBytes && + bufferFullStrategy == that.bufferFullStrategy; + } + + @Override + public int hashCode() { + return Objects.hash(maxKeys, maxBytes, bufferFullStrategy); + } + + @Override + public String toString() { + return "StrictBufferConfigImpl{maxKeys=" + maxKeys + + ", maxBytes=" + maxBytes + + ", bufferFullStrategy=" + bufferFullStrategy + '}'; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java new file mode 100644 index 00000000000..cffc42b66d5 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/suppress/SuppressedImpl.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.processor.ProcessorContext; + +import java.time.Duration; +import java.util.Objects; + +public class SuppressedImpl<K> implements Suppressed<K> { + private static final Duration DEFAULT_SUPPRESSION_TIME = Duration.ofMillis(Long.MAX_VALUE); + private static final StrictBufferConfig DEFAULT_BUFFER_CONFIG = BufferConfig.unbounded(); + + private final BufferConfig bufferConfig; + private final Duration timeToWaitForMoreEvents; + private final TimeDefinition<K> timeDefinition; + + public SuppressedImpl(final Duration suppressionTime, + final BufferConfig bufferConfig, + final TimeDefinition<K> timeDefinition) { + this.timeToWaitForMoreEvents = suppressionTime == null ? DEFAULT_SUPPRESSION_TIME : suppressionTime; + this.timeDefinition = timeDefinition == null ? (context, anyKey) -> context.timestamp() : timeDefinition; + this.bufferConfig = bufferConfig == null ? DEFAULT_BUFFER_CONFIG : bufferConfig; + } + + interface TimeDefinition<K> { + long time(final ProcessorContext context, final K key); + } + + TimeDefinition<K> getTimeDefinition() { + return timeDefinition; + } + + Duration getTimeToWaitForMoreEvents() { + return timeToWaitForMoreEvents == null ? Duration.ZERO : timeToWaitForMoreEvents; + } + + @Override + public boolean equals(final Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + final SuppressedImpl<?> that = (SuppressedImpl<?>) o; + return Objects.equals(bufferConfig, that.bufferConfig) && + Objects.equals(getTimeToWaitForMoreEvents(), that.getTimeToWaitForMoreEvents()) && + Objects.equals(getTimeDefinition(), that.getTimeDefinition()); + } + + @Override + public int hashCode() { + return Objects.hash(bufferConfig, getTimeToWaitForMoreEvents(), getTimeDefinition()); + } + + @Override + public String toString() { + return "SuppressedImpl{" + + ", bufferConfig=" + bufferConfig + + ", timeToWaitForMoreEvents=" + timeToWaitForMoreEvents + + ", timeDefinition=" + timeDefinition + + '}'; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java new file mode 100644 index 00000000000..421311257b4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/KeyValueTimestamp.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams; + +public class KeyValueTimestamp<K, V> { + private final K key; + private final V value; + private final long timestamp; + + public KeyValueTimestamp(final K key, final V value, final long timestamp) { + this.key = key; + this.value = value; + this.timestamp = timestamp; + } + + public K key() { + return key; + } + + public V value() { + return value; + } + + public long timestamp() { + return timestamp; + } + + @Override + public String toString() { + return "KeyValueTimestamp{key=" + key + ", value=" + value + ", timestamp=" + timestamp + '}'; + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java new file mode 100644 index 00000000000..a0e78580d45 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/SuppressionIntegrationTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.integration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.test.IntegrationTest; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import java.time.Duration; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Objects; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import static java.util.Arrays.asList; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.apache.kafka.common.utils.Utils.mkProperties; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; + +@Category({IntegrationTest.class}) +public class SuppressionIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); + private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); + private static final Serde<String> STRING_SERDE = Serdes.String(); + private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); + private static final int COMMIT_INTERVAL = 100; + private static final int SCALE_FACTOR = COMMIT_INTERVAL * 2; + + @Test + public void shouldNotSuppressIntermediateEventsWithZeroEmitAfter() throws InterruptedException { + final String testId = "-shouldNotSuppressIntermediateEventsWithZeroEmitAfter"; + final String appId = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + testId; + final String input = "input" + testId; + final String outputSuppressed = "output-suppressed" + testId; + final String outputRaw = "output-raw" + testId; + + cleanStateBeforeTest(input, outputSuppressed, outputRaw); + + final StreamsBuilder builder = new StreamsBuilder(); + final 
KTable<String, Long> valueCounts = builder + .table( + input, + Consumed.with(STRING_SERDE, STRING_SERDE), + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE) + .withCachingDisabled() + .withLoggingDisabled() + ) + .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts").withCachingDisabled()); + + valueCounts + .suppress(untilTimeLimit(Duration.ZERO, unbounded())) + .toStream() + .to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long())); + + final KafkaStreams driver = getCleanStartedStreams(appId, builder); + + try { + produceSynchronously( + input, + asList( + new KeyValueTimestamp<>("k1", "v1", scaledTime(0L)), + new KeyValueTimestamp<>("k1", "v2", scaledTime(1L)), + new KeyValueTimestamp<>("k2", "v1", scaledTime(2L)), + new KeyValueTimestamp<>("x", "x", scaledTime(4L)) + ) + ); + verifyOutput( + outputRaw, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("x", 1L, scaledTime(4L)) + ) + ); + verifyOutput( + outputSuppressed, + asList( + new KeyValueTimestamp<>("v1", 1L, scaledTime(0L)), + new KeyValueTimestamp<>("v1", 0L, scaledTime(1L)), + new KeyValueTimestamp<>("v2", 1L, scaledTime(1L)), + new KeyValueTimestamp<>("v1", 1L, scaledTime(2L)), + new KeyValueTimestamp<>("x", 1L, scaledTime(4L)) + ) + ); + } finally { + driver.close(); + cleanStateAfterTest(driver); + } + } + + private void cleanStateBeforeTest(final String... 
topic) throws InterruptedException { + CLUSTER.deleteAllTopicsAndWait(30_000L); + for (final String s : topic) { + CLUSTER.createTopic(s, 1, 1); + } + } + + private KafkaStreams getCleanStartedStreams(final String appId, final StreamsBuilder builder) { + final Properties streamsConfig = mkProperties(mkMap( + mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId), + mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(StreamsConfig.POLL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)), + mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Objects.toString(COMMIT_INTERVAL)) + )); + final KafkaStreams driver = new KafkaStreams(builder.build(), streamsConfig); + driver.cleanUp(); + driver.start(); + return driver; + } + + private void cleanStateAfterTest(final KafkaStreams driver) throws InterruptedException { + driver.cleanUp(); + CLUSTER.deleteAllTopicsAndWait(30_000L); + } + + private long scaledTime(final long unscaledTime) { + return SCALE_FACTOR * unscaledTime; + } + + private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) { + final Properties producerConfig = mkProperties(mkMap( + mkEntry(ProducerConfig.CLIENT_ID_CONFIG, "anything"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, STRING_SERIALIZER.getClass().getName()), + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()) + )); + try (final Producer<String, String> producer = new KafkaProducer<>(producerConfig)) { + // TODO: test EOS + //noinspection ConstantConditions + if (false) { + producer.initTransactions(); + producer.beginTransaction(); + } + final LinkedList<Future<RecordMetadata>> futures = new LinkedList<>(); + for (final KeyValueTimestamp<String, String> record : toProduce) { + final Future<RecordMetadata> f = producer.send( + new ProducerRecord<>(topic, null, record.timestamp(), record.key(), record.value(), null) + ); + futures.add(f); + } + for (final Future<RecordMetadata> future : futures) { + try { + future.get(); + } catch (final InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + // TODO: test EOS + //noinspection ConstantConditions + if (false) { + producer.commitTransaction(); + } else { + producer.flush(); + } + } + } + + private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) { + final List<ConsumerRecord<String, Long>> results; + try { + final Properties properties = mkProperties( + mkMap( + mkEntry(ConsumerConfig.GROUP_ID_CONFIG, "test-group"), + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ((Deserializer<String>) STRING_DESERIALIZER).getClass().getName()), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ((Deserializer<Long>) LONG_DESERIALIZER).getClass().getName()) + ) + ); + results = IntegrationTestUtils.waitUntilMinRecordsReceived(properties, topic, expected.size()); + } catch (final InterruptedException e) { + throw new RuntimeException(e); + } + + if (results.size() != expected.size()) { + throw new AssertionError(printRecords(results) + " != " + expected); + } + final Iterator<KeyValueTimestamp<String, Long>> expectedIterator = expected.iterator(); + for (final ConsumerRecord<String, Long> result : results) { + final KeyValueTimestamp<String, Long> expected1 = expectedIterator.next(); + try { + 
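+ // Compare each consumed record to the expected key, value, and timestamp;
+ // on mismatch, rethrow with the full list of consumed records for easier debugging.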
compareKeyValueTimestamp(result, expected1.key(), expected1.value(), expected1.timestamp()); + } catch (final AssertionError e) { + throw new AssertionError(printRecords(results) + " != " + expected, e); + } + } + } + + private <K, V> void compareKeyValueTimestamp(final ConsumerRecord<K, V> record, final K expectedKey, final V expectedValue, final long expectedTimestamp) { + Objects.requireNonNull(record); + final K recordKey = record.key(); + final V recordValue = record.value(); + final long recordTimestamp = record.timestamp(); + final AssertionError error = new AssertionError("Expected <" + expectedKey + ", " + expectedValue + "> with timestamp=" + expectedTimestamp + + " but was <" + recordKey + ", " + recordValue + "> with timestamp=" + recordTimestamp); + if (recordKey != null) { + if (!recordKey.equals(expectedKey)) { + throw error; + } + } else if (expectedKey != null) { + throw error; + } + if (recordValue != null) { + if (!recordValue.equals(expectedValue)) { + throw error; + } + } else if (expectedValue != null) { + throw error; + } + if (recordTimestamp != expectedTimestamp) { + throw error; + } + } + + private <K, V> String printRecords(final List<ConsumerRecord<K, V>> result) { + final StringBuilder resultStr = new StringBuilder(); + resultStr.append("[\n"); + for (final ConsumerRecord<?, ?> record : result) { + resultStr.append(" ").append(record.toString()).append("\n"); + } + resultStr.append("]"); + return resultStr.toString(); + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java new file mode 100644 index 00000000000..53f24b58aac --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/SuppressedTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.kstream.internals.suppress.EagerBufferConfigImpl; +import org.apache.kafka.streams.kstream.internals.suppress.FinalResultsSuppressionBuilder; +import org.apache.kafka.streams.kstream.internals.suppress.StrictBufferConfigImpl; +import org.apache.kafka.streams.kstream.internals.suppress.SuppressedImpl; +import org.junit.Test; + +import static java.lang.Long.MAX_VALUE; +import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxBytes; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; +import static org.apache.kafka.streams.kstream.internals.suppress.BufferFullStrategy.SHUT_DOWN; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class SuppressedTest { + + @Test + public void bufferBuilderShouldBeConsistent() { + assertThat( + "noBound should remove bounds", + maxBytes(2L).withMaxRecords(4L).withNoBound(), + is(unbounded()) + ); + + assertThat( + "keys alone should be set", + maxRecords(2L), + is(new EagerBufferConfigImpl(2L, MAX_VALUE)) + ); + + assertThat( + "size alone should be set", + maxBytes(2L), + is(new EagerBufferConfigImpl(MAX_VALUE, 2L)) + ); + } + + @Test + public void intermediateEventsShouldAcceptAnyBufferAndSetBounds() { + assertThat( + "time alone should be set", + untilTimeLimit(ofMillis(2), unbounded()), + is(new SuppressedImpl<>(ofMillis(2), unbounded(), null)) + ); + + assertThat( + "time and unbounded buffer should be set", + untilTimeLimit(ofMillis(2), unbounded()), + is(new SuppressedImpl<>(ofMillis(2), unbounded(), null)) + ); + + assertThat( + "time and keys buffer should be set", + untilTimeLimit(ofMillis(2), maxRecords(2)), + is(new SuppressedImpl<>(ofMillis(2), maxRecords(2), null)) + ); + + assertThat( + "time and size buffer should be set", + untilTimeLimit(ofMillis(2), maxBytes(2)), + is(new SuppressedImpl<>(ofMillis(2), maxBytes(2), null)) + ); + + assertThat( + "all constraints should be set", + untilTimeLimit(ofMillis(2L), maxRecords(3L).withMaxBytes(2L)), + is(new SuppressedImpl<>(ofMillis(2), new EagerBufferConfigImpl(3L, 2L), null)) + ); + } + + @Test + public void finalEventsShouldAcceptStrictBuffersAndSetBounds() { + + assertThat( + untilWindowCloses(unbounded()), + is(new FinalResultsSuppressionBuilder<>(unbounded())) + ); + + assertThat( + untilWindowCloses(maxRecords(2L).shutDownWhenFull()), + is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(2L, MAX_VALUE, SHUT_DOWN)) + ) + ); + + assertThat( + untilWindowCloses(maxBytes(2L).shutDownWhenFull()), + is(new FinalResultsSuppressionBuilder<>(new StrictBufferConfigImpl(MAX_VALUE, 2L, SHUT_DOWN)) + ) + ); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java new file mode 100644 index 00000000000..fead6788eb4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.KeyValueTimestamp; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.kstream.Consumed; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.kstream.Serialized; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.streams.test.OutputVerifier; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Locale; +import java.util.Properties; + +import static java.time.Duration.ZERO; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; + +public class SuppressScenarioTest { + private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer(); + private static final StringSerializer STRING_SERIALIZER = new StringSerializer(); + private static final Serde<String> STRING_SERDE = Serdes.String(); + private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer(); + + @Test + public void shouldImmediatelyEmitEventsWithZeroEmitAfter() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KTable<String, Long> valueCounts = builder + .table( + "input", + Consumed.with(STRING_SERDE, STRING_SERDE), + Materialized.<String, String, KeyValueStore<Bytes, byte[]>>with(STRING_SERDE, STRING_SERDE) + .withCachingDisabled() + .withLoggingDisabled() + ) + .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(STRING_SERDE, STRING_SERDE)) + .count(); + + valueCounts + .suppress(untilTimeLimit(ZERO, unbounded())) + .toStream() + .to("output-suppressed", Produced.with(STRING_SERDE, Serdes.Long())); + + valueCounts + .toStream() + .to("output-raw", Produced.with(STRING_SERDE, 
Serdes.Long())); + + final Topology topology = builder.build(); + + final Properties config = Utils.mkProperties(Utils.mkMap( + Utils.mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, getClass().getSimpleName().toLowerCase(Locale.getDefault())), + Utils.mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "bogus") + )); + + final ConsumerRecordFactory<String, String> recordFactory = new ConsumerRecordFactory<>(STRING_SERIALIZER, STRING_SERIALIZER); + + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, config)) { + driver.pipeInput(recordFactory.create("input", "k1", "v1", 0L)); + driver.pipeInput(recordFactory.create("input", "k1", "v2", 1L)); + driver.pipeInput(recordFactory.create("input", "k2", "v1", 2L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("v1", 1L, 0L), + new KeyValueTimestamp<>("v1", 0L, 1L), + new KeyValueTimestamp<>("v2", 1L, 1L), + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("v1", 1L, 0L), + new KeyValueTimestamp<>("v1", 0L, 1L), + new KeyValueTimestamp<>("v2", 1L, 1L), + new KeyValueTimestamp<>("v1", 1L, 2L) + ) + ); + driver.pipeInput(recordFactory.create("input", "x", "x", 3L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + new KeyValueTimestamp<>("x", 1L, 3L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + singletonList( + new KeyValueTimestamp<>("x", 1L, 3L) + ) + ); + driver.pipeInput(recordFactory.create("input", "x", "x", 4L)); + verify( + drainProducerRecords(driver, "output-raw", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("x", 0L, 4L), + new KeyValueTimestamp<>("x", 1L, 4L) + ) + ); + verify( + drainProducerRecords(driver, "output-suppressed", STRING_DESERIALIZER, LONG_DESERIALIZER), + asList( + new KeyValueTimestamp<>("x", 0L, 4L), + new KeyValueTimestamp<>("x", 1L, 4L) + ) + ); + } + } + + private <K, V> void verify(final List<ProducerRecord<K, V>> results, final List<KeyValueTimestamp<K, V>> expectedResults) { + if (results.size() != expectedResults.size()) { + throw new AssertionError(printRecords(results) + " != " + expectedResults); + } + final Iterator<KeyValueTimestamp<K, V>> expectedIterator = expectedResults.iterator(); + for (final ProducerRecord<K, V> result : results) { + final KeyValueTimestamp<K, V> expected = expectedIterator.next(); + try { + OutputVerifier.compareKeyValueTimestamp(result, expected.key(), expected.value(), expected.timestamp()); + } catch (final AssertionError e) { + throw new AssertionError(printRecords(results) + " != " + expectedResults, e); + } + } + } + + private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver, final String topic, final Deserializer<K> keyDeserializer, final Deserializer<V> valueDeserializer) { + final List<ProducerRecord<K, V>> result = new LinkedList<>(); + for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer); + next != null; + next = driver.readOutput(topic, keyDeserializer, valueDeserializer)) { + result.add(next); + } + return new ArrayList<>(result); + } + + private <K, V> String printRecords(final List<ProducerRecord<K, V>> result) { + final StringBuilder resultStr = new StringBuilder(); + resultStr.append("[\n"); + 
for (final ProducerRecord<?, ?> record : result) { + resultStr.append(" ").append(record.toString()).append("\n"); + } + resultStr.append("]"); + return resultStr.toString(); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java new file mode 100644 index 00000000000..2b054230839 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals.graph; + +import org.apache.kafka.streams.errors.TopologyException; +import org.apache.kafka.streams.kstream.SessionWindows; +import org.apache.kafka.streams.kstream.TimeWindows; +import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate; +import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +public class GraphGraceSearchUtilTest { + @Test + public void shouldThrowOnNull() { + try { + GraphGraceSearchUtil.findAndVerifyWindowGrace(null); + fail("Should have thrown."); + } catch (final TopologyException e) { + assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations. Got [].")); + } + } + + @Test + public void shouldFailIfThereIsNoGraceAncestor() { + // doesn't matter if this ancestor is stateless or stateful. The important thing is that there is + // no grace period defined on any ancestor of the node + final StatefulProcessorNode<String, Long> gracelessAncestor = new StatefulProcessorNode<>( + "stateful", + new ProcessorParameters<>( + () -> new Processor<Object, Object>() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void close() {} + }, + "dummy" + ), + null, + null, + false + ); + + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); + gracelessAncestor.addChild(node); + + try { + GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + fail("should have thrown."); + } catch (final TopologyException e) { + assertThat(e.getMessage(), is("Invalid topology: Window close time is only defined for windowed computations.
Got [stateful->stateless].")); + } + } + + @Test + public void shouldExtractGraceFromKStreamWindowAggregateNode() { + final TimeWindows windows = TimeWindows.of(10L).grace(1234L); + final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>( + new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( + windows, + "asdf", + null, + null + ), + "asdf" + ), + null, + null, + false + ); + + final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + assertThat(extracted, is(windows.gracePeriodMs())); + } + + @Test + public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { + final SessionWindows windows = SessionWindows.with(10L).grace(1234L); + + final StatefulProcessorNode<String, Long> node = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>( + new KStreamSessionWindowAggregate<String, Long, Integer>( + windows, + "asdf", + null, + null, + null + ), + "asdf" + ), + null, + null, + false + ); + + final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + assertThat(extracted, is(windows.gracePeriodMs())); + } + + @Test + public void shouldExtractGraceFromAncestorThroughStatefulParent() { + final SessionWindows windows = SessionWindows.with(10L).grace(1234L); + final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>(new KStreamSessionWindowAggregate<String, Long, Integer>( + windows, "asdf", null, null, null + ), "asdf"), + null, + null, + false + ); + + final StatefulProcessorNode<String, Long> statefulParent = new StatefulProcessorNode<>( + "stateful", + new ProcessorParameters<>( + () -> new Processor<Object, Object>() { + @Override + public void init(final ProcessorContext context) {} + + @Override + public void process(final Object key, final Object value) {} + + @Override + public void close() {} + }, + "dummy" + ), + null, + null, + false + ); + graceGrandparent.addChild(statefulParent); + + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); + statefulParent.addChild(node); + + final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + assertThat(extracted, is(windows.gracePeriodMs())); + } + + @Test + public void shouldExtractGraceFromAncestorThroughStatelessParent() { + final SessionWindows windows = SessionWindows.with(10L).grace(1234L); + final StatefulProcessorNode<String, Long> graceGrandparent = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>( + new KStreamSessionWindowAggregate<String, Long, Integer>( + windows, + "asdf", + null, + null, + null + ), + "asdf" + ), + null, + null, + false + ); + + final ProcessorGraphNode<String, Long> statelessParent = new ProcessorGraphNode<>("stateless", null); + graceGrandparent.addChild(statelessParent); + + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); + statelessParent.addChild(node); + + final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + assertThat(extracted, is(windows.gracePeriodMs())); + } + + @Test + public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { + final StatefulProcessorNode<String, Long> leftParent = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>( + new KStreamSessionWindowAggregate<String, Long, Integer>( + SessionWindows.with(10L).grace(1234L), + "asdf", + null, + null, + null + ), + "asdf" + ), + null, + null, + false + ); + + final 
StatefulProcessorNode<String, Long> rightParent = new StatefulProcessorNode<>( + "asdf", + new ProcessorParameters<>( + new KStreamWindowAggregate<String, Long, Integer, TimeWindow>( + TimeWindows.of(10L).grace(4321L), + "asdf", + null, + null + ), + "asdf" + ), + null, + null, + false + ); + + final ProcessorGraphNode<String, Long> node = new ProcessorGraphNode<>("stateless", null); + leftParent.addChild(node); + rightParent.addChild(node); + + final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); + assertThat(extracted, is(4321L)); + } + +} diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java new file mode 100644 index 00000000000..466033316c7 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/suppress/KTableSuppressProcessorTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals.suppress; + +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.Suppressed; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.Change; +import org.apache.kafka.streams.kstream.internals.TimeWindow; +import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.test.MockInternalProcessorContext; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.junit.Test; + +import java.time.Duration; +import java.util.Collection; + +import static java.time.Duration.ZERO; +import static java.time.Duration.ofMillis; +import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded; +import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit; +import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.fail; + +@SuppressWarnings("PointlessArithmeticExpression") +public class KTableSuppressProcessorTest { + /** + * Use this value to indicate that the test correctness does not depend on any particular number + */ + private static final long ARBITRARY_LONG = 5L; + + /** + * Use this value to indicate that the test correctness does not depend on any particular window + */ + private static final TimeWindow ARBITRARY_WINDOW = new TimeWindow(0L, 100L); + + @Test + public void zeroTimeLimitShouldImmediatelyEmit() { + final KTableSuppressProcessor<String, Long> processor = + new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded()))); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = ARBITRARY_LONG; + context.setTimestamp(timestamp); + context.setStreamTime(timestamp); + final String key = "hey"; + final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + @Test + public void windowedZeroTimeLimitShouldImmediatelyEmit() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = + new KTableSuppressProcessor<>(getImpl(untilTimeLimit(ZERO, unbounded()))); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = ARBITRARY_LONG; + context.setTimestamp(timestamp); + context.setStreamTime(timestamp); + final Windowed<String> key = new Windowed<>("hey", ARBITRARY_WINDOW); + final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + @Test + public void intermediateSuppressionShouldThrow() { + final KTableSuppressProcessor<String, Long> processor = + new KTableSuppressProcessor<>(getImpl(untilTimeLimit(Duration.ofMillis(1), unbounded()))); + + final MockInternalProcessorContext context = new 
MockInternalProcessorContext(); + processor.init(context); + + try { + processor.process("hey", new Change<>(null, 1L)); + fail("expected an exception for now"); + } catch (final KTableSuppressProcessor.NotImplementedException e) { + // expected + } + assertThat(context.forwarded(), hasSize(0)); + } + + + @SuppressWarnings("unchecked") + private <K extends Windowed> SuppressedImpl<K> finalResults(final Duration grace) { + return ((FinalResultsSuppressionBuilder) untilWindowCloses(unbounded())).buildFinalResultsSuppression(grace); + } + + + @Test + public void finalResultsSuppressionShouldThrow() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = + new KTableSuppressProcessor<>(finalResults(ofMillis(1))); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + context.setTimestamp(ARBITRARY_LONG); + try { + processor.process(new Windowed<>("hey", ARBITRARY_WINDOW), new Change<>(ARBITRARY_LONG, ARBITRARY_LONG)); + fail("expected an exception for now"); + } catch (final KTableSuppressProcessor.NotImplementedException e) { + // expected + } + assertThat(context.forwarded(), hasSize(0)); + } + + @Test + public void finalResultsWith0GraceBeforeWindowEndShouldThrow() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = + new KTableSuppressProcessor<>(finalResults(ofMillis(0))); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 5L; + context.setTimestamp(timestamp); + final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L)); + final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); + try { + processor.process(key, value); + fail("expected an exception"); + } catch (final KTableSuppressProcessor.NotImplementedException e) { + // expected + } + assertThat(context.forwarded(), hasSize(0)); + } + + @Test + public void finalResultsWith0GraceAtWindowEndShouldImmediatelyEmit() { + final KTableSuppressProcessor<Windowed<String>, Long> processor = + new KTableSuppressProcessor<>(finalResults(ofMillis(0))); + + final MockInternalProcessorContext context = new MockInternalProcessorContext(); + processor.init(context); + + final long timestamp = 100L; + context.setTimestamp(timestamp); + context.setStreamTime(timestamp); + final Windowed<String> key = new Windowed<>("hey", new TimeWindow(0, 100L)); + final Change<Long> value = new Change<>(ARBITRARY_LONG, ARBITRARY_LONG); + processor.process(key, value); + + assertThat(context.forwarded(), hasSize(1)); + final MockProcessorContext.CapturedForward capturedForward = context.forwarded().get(0); + assertThat(capturedForward.keyValue(), is(new KeyValue<>(key, value))); + assertThat(capturedForward.timestamp(), is(timestamp)); + } + + private static <E> Matcher<Collection<E>> hasSize(final int i) { + return new BaseMatcher<Collection<E>>() { + @Override + public void describeTo(final Description description) { + description.appendText("a collection of size " + i); + } + + @SuppressWarnings("unchecked") + @Override + public boolean matches(final Object item) { + if (item == null) { + return false; + } else { + return ((Collection<E>) item).size() == i; + } + } + + }; + } + + private static <K> SuppressedImpl<K> getImpl(final Suppressed<K> suppressed) { + return (SuppressedImpl<K>) suppressed; + } +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java 
b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java new file mode 100644 index 00000000000..14f8561030f --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.streams.processor.MockProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorNode; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.state.internals.ThreadCache; + +public class MockInternalProcessorContext extends MockProcessorContext implements InternalProcessorContext { + private ProcessorNode currentNode; + private long streamTime; + + @Override + public StreamsMetricsImpl metrics() { + return (StreamsMetricsImpl) super.metrics(); + } + + @Override + public ProcessorRecordContext recordContext() { + return new ProcessorRecordContext(timestamp(), offset(), partition(), topic(), headers()); + } + + @Override + public void setRecordContext(final ProcessorRecordContext recordContext) { + setRecordMetadata( + recordContext.topic(), + recordContext.partition(), + recordContext.offset(), + recordContext.headers(), + recordContext.timestamp() + ); + } + + @Override + public void setCurrentNode(final ProcessorNode currentNode) { + this.currentNode = currentNode; + } + + @Override + public ProcessorNode currentNode() { + return currentNode; + } + + @Override + public ThreadCache getCache() { + return null; + } + + @Override + public void initialize() { + + } + + @Override + public void uninitialize() { + + } + + @Override + public long streamTime() { + return streamTime; + } + + public void setStreamTime(final long streamTime) { + this.streamTime = streamTime; + } +} \ No newline at end of file diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java index cba02573b59..dc854b0a5f5 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/processor/MockProcessorContext.java @@ -405,13 +405,18 @@ public void cancel() { @SuppressWarnings("unchecked") @Override public <K, V> void forward(final K key, final V value) { - capturedForwards.add(new CapturedForward(To.all(), new KeyValue(key, value))); + forward(key, value, To.all()); } @SuppressWarnings("unchecked") @Override public <K, V> void forward(final K 
key, final V value, final To to) { - capturedForwards.add(new CapturedForward(to, new KeyValue(key, value))); + capturedForwards.add( + new CapturedForward( + to.timestamp == -1 ? to.withTimestamp(timestamp == null ? -1 : timestamp) : to, + new KeyValue(key, value) + ) + ); } @SuppressWarnings("deprecation")

> KIP-328: Add in-memory Suppression
> ----------------------------------
>
> Key: KAFKA-7223
> URL: https://issues.apache.org/jira/browse/KAFKA-7223
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
>
> As described in
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)
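
As a usage note for readers skimming the flattened diff: the code below is a minimal, self-contained sketch of the new suppress API, assembled from the SuppressScenarioTest topology in this PR. The class name SuppressUsageSketch and the topic names are illustrative, not part of the PR; per the test, a ZERO time limit with an unbounded buffer forwards every update immediately, which is the only emit mode this PR implements.

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Serialized;

import static java.time.Duration.ZERO;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;

// Illustrative class name; mirrors the topology built in SuppressScenarioTest.
public class SuppressUsageSketch {
    public static Topology buildTopology() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder
            .table("input", Consumed.with(Serdes.String(), Serdes.String()))
            // Re-key by value and count occurrences, as the scenario test does.
            .groupBy((k, v) -> new KeyValue<>(v, k), Serialized.with(Serdes.String(), Serdes.String()))
            .count()
            // ZERO time limit + unbounded buffer: every update passes through immediately.
            .suppress(untilTimeLimit(ZERO, unbounded()))
            .toStream()
            .to("output-suppressed", Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }
}

Driven through a TopologyTestDriver as the scenario test does, the suppressed output should match the unsuppressed output record for record.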
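
Similarly, SuppressedTest above pins down how the BufferConfig builders compose. The fragment below is a hedged sketch of just those builder calls, wrapped in a throwaway class (BufferConfigSketch, an illustrative name) so it compiles; it uses only methods exercised by the test.

import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.maxRecords;
import static org.apache.kafka.streams.kstream.Suppressed.untilTimeLimit;
import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;

// Illustrative holder class, not part of the PR.
public class BufferConfigSketch {
    public static void main(final String[] args) {
        // Eager buffer: bounded by record count and byte size; per the test this
        // is equivalent to an EagerBufferConfigImpl(3, 2) and may emit early.
        untilTimeLimit(ofMillis(2), maxRecords(3L).withMaxBytes(2L));

        // Strict buffer: shutDownWhenFull() upgrades the bound to a
        // StrictBufferConfig, which untilWindowCloses() requires so that
        // final-results suppression never emits early.
        untilWindowCloses(maxRecords(2L).shutDownWhenFull());
    }
}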