[ https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651087#comment-16651087 ]
ASF GitHub Bot commented on KAFKA-7080:
---------------------------------------

mjsax closed pull request #5804: KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes
URL: https://github.com/apache/kafka/pull/5804

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b26f3c339cf..c0f28efe98b 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -134,6 +134,18 @@ <h3><a id="streams_api_changes_210" href="#streams_api_changes_210">Streams API
         see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient">KIP-324</a>
     </p>
 
+    <p>
+        We deprecated the notion of segments in window stores as those are intended to be an implementation details.
+        Thus, method <code>Windows#segments()</code> and variable <code>Windows#segments</code> were deprecated.
+        If you implement custom windows, you should update your code accordingly.
+        Similarly, <code>WindowBytesStoreSupplier#segments()</code> was deprecated and replaced with <code>WindowBytesStoreSupplier#segmentInterval()</code>.
+        If you implement custom window store, you need to update your code accordingly.
+        Finally, <code>Stores#persistentWindowStore(...)</code> were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer.
+        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>
+        (note: <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a> and
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a> 'overlap' with KIP-319).
+    </p>
+
     <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
     <p>
         In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 4dfba2306cb..feaee1e1336 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -83,21 +83,6 @@ public long maintainMs() {
         return maintainDurationMs;
     }
 
-    /**
-     * Return the segment interval in milliseconds.
-     *
-     * @return the segment interval
-     * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
-     */
-    @Deprecated
-    public long segmentInterval() {
-        // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
-        final long minimumSegmentInterval = 60_000L;
-        // Scaled to the (possibly overridden) retention period
-        return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
-    }
-
-
     /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 30e51403714..f7a182472be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state;
 
-import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -32,6 +31,7 @@
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
+import java.time.Duration;
 import java.util.Objects;
 
 /**
@@ -155,7 +155,7 @@ public String metricsScope() {
      *                              careful to set it the same as the windowed keys you're actually storing.
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
+     * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
      */
     @Deprecated
     public static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -178,28 +178,6 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
         );
     }
 
-    /**
-     * Create a persistent {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
-     *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
-     * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
-     * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
-     */
-    @Deprecated
-    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
-                                                                 final long retentionPeriod,
-                                                                 final long windowSize,
-                                                                 final boolean retainDuplicates) {
-        // we're arbitrarily defaulting to segments no smaller than one minute.
-        final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
-        return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
-    }
-
     /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
@@ -220,28 +198,16 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
         ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
         ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
 
-        return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates);
+        final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
+
+        return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval);
     }
 
-    /**
-     * Create a persistent {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
-     *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
-     * @param segmentInterval       size of segments in ms (cannot be negative)
-     * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
-     * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
-     */
-    @Deprecated
-    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
-                                                                 final long retentionPeriod,
-                                                                 final long windowSize,
-                                                                 final boolean retainDuplicates,
-                                                                 final long segmentInterval) {
+    private static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                  final long retentionPeriod,
+                                                                  final long windowSize,
+                                                                  final boolean retainDuplicates,
+                                                                  final long segmentInterval) {
         Objects.requireNonNull(name, "name cannot be null");
         if (retentionPeriod < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
@@ -269,7 +235,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
      *                              windowed data's entire life cycle, from window-start through window-end,
      *                              and for the entire grace period.
      * @return an instance of a {@link SessionBytesStoreSupplier}
+     * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
      */
+    @Deprecated
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final long retentionPeriod) {
         Objects.requireNonNull(name, "name cannot be null");
@@ -288,6 +256,7 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name
      *                              and for the entire grace period.
      * @return an instance of a {@link SessionBytesStoreSupplier}
      */
+    @SuppressWarnings("deprecation")
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final Duration retentionPeriod) {
         ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -------------------------------------------------------------
>
>                 Key: KAFKA-7080
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7080
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.1.0
>
>
> When caching is enabled on the WindowStoreBuilder, it creates a
> CachingWindowStore. However, it incorrectly passes storeSupplier.segments()
> (the number of segments) to the segmentInterval argument.
>
> The impact is low, since any valid number of segments is also a valid segment
> size, but it likely results in much smaller segments than intended. For
> example, the segments may be sized 3ms instead of 60,000ms.
>
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to
> advertise their segment size instead of segment count. I plan to create a KIP
> to propose this.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
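
For illustration only: a minimal Java sketch of the count-versus-interval mix-up described in the issue, using the two formulas that appear in the diff above. The class name SegmentIntervalSketch and its method names are hypothetical and are not part of the Kafka API. The 120,000 ms retention period and the 3 segments are assumed values, chosen so that the numbers match the "3ms instead of 60,000ms" example.

```java
// Hypothetical sketch; none of these names exist in Kafka.
final class SegmentIntervalSketch {

    // Lower bound on the segment interval used in the diff (one minute).
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L;

    // Formula from the Duration-based Stores#persistentWindowStore in the diff:
    // half the retention period, but never less than one minute.
    static long defaultSegmentInterval(final long retentionPeriodMs) {
        return Math.max(retentionPeriodMs / 2, MIN_SEGMENT_INTERVAL_MS);
    }

    // Formula from the removed Windows#segmentInterval(): the retention period
    // divided by (segments - 1), again pinned to a one-minute minimum.
    static long intervalFromSegmentCount(final long retentionPeriodMs, final int segments) {
        if (segments < 2) {
            // Guard added for this sketch so the division below cannot be by zero.
            throw new IllegalArgumentException("segments must be at least 2");
        }
        return Math.max(retentionPeriodMs / (segments - 1), MIN_SEGMENT_INTERVAL_MS);
    }

    public static void main(final String[] args) {
        final long retentionPeriodMs = 120_000L; // assumed value
        final int segments = 3;                  // assumed value

        // Intended: derive an interval in milliseconds from the segment count.
        final long intended = intervalFromSegmentCount(retentionPeriodMs, segments);

        // The bug in the issue: the raw segment count is passed where an
        // interval in milliseconds is expected, so 3 segments becomes 3 ms.
        final long buggy = segments;

        System.out.println("intended segment interval (ms): " + intended);
        System.out.println("buggy segment interval (ms):    " + buggy);
        System.out.println("default for this retention (ms): " + defaultSegmentInterval(retentionPeriodMs));
    }
}
```

Both values are accepted as a long, which is presumably why the mix-up compiled and went unnoticed; the issue's suggestion to expose a segment interval rather than a count removes the ambiguity at the interface.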