[ 
https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651087#comment-16651087
 ] 

ASF GitHub Bot commented on KAFKA-7080:
---------------------------------------

mjsax closed pull request #5804: KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes
URL: https://github.com/apache/kafka/pull/5804
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index b26f3c339cf..c0f28efe98b 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -134,6 +134,18 @@ <h3><a id="streams_api_changes_210" href="#streams_api_changes_210">Streams API
         see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient">KIP-324</a>
     </p>
 
+    <p>
+        We deprecated the notion of segments in window stores as those are intended to be an implementation details.
+        Thus, method <code>Windows#segments()</code> and variable <code>Windows#segments</code> were deprecated.
+        If you implement custom windows, you should update your code accordingly.
+        Similarly, <code>WindowBytesStoreSupplier#segments()</code> was deprecated and replaced with <code>WindowBytesStoreSupplier#segmentInterval()</code>.
+        If you implement custom window store, you need to update your code accordingly.
+       Finally, <code>Stores#persistentWindowStore(...)</code> were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer.
+        For more details, see <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier">KIP-319</a>
+        (note: <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables">KIP-328</a> and
+       <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a> 'overlap' with KIP-319).
+    </p>
+
     <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
     <p>
         In 2.0.0 we have added a few new APIs on the <code>ReadOnlyWindowStore</code> interface (for details please read <a href="#streams_api_changes_200">Streams API changes</a> below).
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index 4dfba2306cb..feaee1e1336 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -83,21 +83,6 @@ public long maintainMs() {
         return maintainDurationMs;
     }
 
-    /**
-     * Return the segment interval in milliseconds.
-     *
-     * @return the segment interval
-     * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}.
-     */
-    @Deprecated
-    public long segmentInterval() {
-        // Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient.
-        final long minimumSegmentInterval = 60_000L;
-        // Scaled to the (possibly overridden) retention period
-        return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval);
-    }
-
-
     /**
      * Set the number of segments to be used for rolling the window store.
      * This function is not exposed to users but can be called by developers that extend this class.
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index 30e51403714..f7a182472be 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.state;
 
-import java.time.Duration;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
@@ -32,6 +31,7 @@
 import org.apache.kafka.streams.state.internals.SessionStoreBuilder;
 import org.apache.kafka.streams.state.internals.WindowStoreBuilder;
 
+import java.time.Duration;
 import java.util.Objects;
 
 /**
@@ -155,7 +155,7 @@ public String metricsScope() {
      *                              careful to set it the same as the windowed keys you're actually storing.
      * @param retainDuplicates      whether or not to retain duplicates.
      * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead
+     * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead
      */
     @Deprecated
     public static WindowBytesStoreSupplier persistentWindowStore(final String name,
@@ -178,28 +178,6 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
         );
     }
 
-    /**
-     * Create a persistent {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
-     *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
-     * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
-     * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
-     */
-    @Deprecated
-    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
-                                                                 final long retentionPeriod,
-                                                                 final long windowSize,
-                                                                 final boolean retainDuplicates) {
-        // we're arbitrarily defaulting to segments no smaller than one minute.
-        final long defaultSegmentInterval = Math.max(retentionPeriod / 2, 60_000L);
-        return persistentWindowStore(name, retentionPeriod, windowSize, retainDuplicates, defaultSegmentInterval);
-    }
-
     /**
      * Create a persistent {@link WindowBytesStoreSupplier}.
      * @param name                  name of the store (cannot be {@code null})
@@ -220,28 +198,16 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
         ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
         ApiUtils.validateMillisecondDuration(windowSize, "windowSize");
 
-        return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates);
+        final long defaultSegmentInterval = Math.max(retentionPeriod.toMillis() / 2, 60_000L);
+
+        return persistentWindowStore(name, retentionPeriod.toMillis(), windowSize.toMillis(), retainDuplicates, defaultSegmentInterval);
     }
 
-    /**
-     * Create a persistent {@link WindowBytesStoreSupplier}.
-     * @param name                  name of the store (cannot be {@code null})
-     * @param retentionPeriod       length of time to retain data in the store (cannot be negative)
-     *                              Note that the retention period must be at least long enough to contain the
-     *                              windowed data's entire life cycle, from window-start through window-end,
-     *                              and for the entire grace period.
-     * @param segmentInterval       size of segments in ms (cannot be negative)
-     * @param windowSize            size of the windows (cannot be negative)
-     * @param retainDuplicates      whether or not to retain duplicates.
-     * @return an instance of {@link WindowBytesStoreSupplier}
-     * @deprecated Use {@link #persistentWindowStore(String, Duration, Duration, boolean)} instead
-     */
-    @Deprecated
-    public static WindowBytesStoreSupplier persistentWindowStore(final String name,
-                                                                 final long retentionPeriod,
-                                                                 final long windowSize,
-                                                                 final boolean retainDuplicates,
-                                                                 final long segmentInterval) {
+    private static WindowBytesStoreSupplier persistentWindowStore(final String name,
+                                                                  final long retentionPeriod,
+                                                                  final long windowSize,
+                                                                  final boolean retainDuplicates,
+                                                                  final long segmentInterval) {
         Objects.requireNonNull(name, "name cannot be null");
         if (retentionPeriod < 0L) {
             throw new IllegalArgumentException("retentionPeriod cannot be negative");
@@ -269,7 +235,9 @@ public static WindowBytesStoreSupplier persistentWindowStore(final String name,
      *                          windowed data's entire life cycle, from window-start through window-end,
      *                          and for the entire grace period.
      * @return an instance of a {@link  SessionBytesStoreSupplier}
+     * @deprecated since 2.1 Use {@link Stores#persistentSessionStore(String, Duration)} instead
      */
+    @Deprecated
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final long retentionPeriod) {
         Objects.requireNonNull(name, "name cannot be null");
@@ -288,6 +256,7 @@ public static SessionBytesStoreSupplier persistentSessionStore(final String name
      *                          and for the entire grace period.
      * @return an instance of a {@link  SessionBytesStoreSupplier}
      */
+    @SuppressWarnings("deprecation")
     public static SessionBytesStoreSupplier persistentSessionStore(final String name,
                                                                    final Duration retentionPeriod) {
         ApiUtils.validateMillisecondDuration(retentionPeriod, "retentionPeriod");
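
Note on the Stores.java hunk above: the Duration-based persistentWindowStore
overload now computes a default segment interval itself before delegating to
the (now private) long-based method. The following is a minimal standalone
sketch of that arithmetic only. The class and method names are hypothetical
and not part of Kafka; only the expression
Math.max(retentionPeriod.toMillis() / 2, 60_000L) is taken from the diff.

```java
import java.time.Duration;

// Hypothetical helper, not a Kafka class: it only mirrors the default
// segment interval expression that appears in the diff above.
final class DefaultSegmentIntervalSketch {

    // Floor of one minute, matching the 60_000L literal in the diff.
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L;

    // Half of the retention period, but never less than one minute.
    static long defaultSegmentIntervalMs(final Duration retentionPeriod) {
        return Math.max(retentionPeriod.toMillis() / 2, MIN_SEGMENT_INTERVAL_MS);
    }

    public static void main(final String[] args) {
        // One day of retention: half of 86,400,000 ms.
        System.out.println(defaultSegmentIntervalMs(Duration.ofDays(1)));     // 43200000
        // Thirty seconds of retention: the one-minute floor applies.
        System.out.println(defaultSegmentIntervalMs(Duration.ofSeconds(30))); // 60000
    }
}
```

Because the segment interval is derived from the retention period, callers of
the public overload no longer pass a segment count or interval at all.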


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -------------------------------------------------------------
>
>                 Key: KAFKA-7080
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7080
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.1.0
>
>
> When caching is enabled on the WindowStoreBuilder, it creates a 
> CachingWindowStore. However, it incorrectly passes storeSupplier.segments() 
> (the number of segments) to the segmentInterval argument.
>  
> The impact is low, since any valid number of segments is also a valid segment 
> size, but it likely results in much smaller segments than intended. For 
> example, the segments may be sized 3ms instead of 60,000ms.
>  
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to 
> advertise their segment size instead of segment count. I plan to create a KIP 
> to propose this.
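
To make the size of the mismatch described above concrete, here is a rough,
standalone sketch. intendedSegmentIntervalMs copies the formula of the
Windows#segmentInterval() method shown in the diff; segmentsNeeded is a
hypothetical illustration (simple ceiling division) and is not claimed to be
how Kafka lays out segments internally. The class name is also hypothetical.

```java
// Hypothetical illustration, not a Kafka class.
final class SegmentCountVsIntervalSketch {

    // Formula of Windows#segmentInterval() as shown in the diff.
    // Assumes segments >= 2 so that the divisor is non-zero.
    static long intendedSegmentIntervalMs(final long maintainMs, final int segments) {
        return Math.max(maintainMs / (segments - 1), 60_000L);
    }

    // Assumption for illustration only: number of fixed-size segments
    // needed to cover the retention period (ceiling division).
    static long segmentsNeeded(final long retentionMs, final long segmentIntervalMs) {
        return (retentionMs + segmentIntervalMs - 1) / segmentIntervalMs;
    }

    public static void main(final String[] args) {
        final long retentionMs = 120_000L; // two minutes, chosen for the example
        final int segmentCount = 3;        // the count that was passed by mistake

        final long intended = intendedSegmentIntervalMs(retentionMs, segmentCount);
        System.out.println(intended);                                 // 60000
        System.out.println(segmentsNeeded(retentionMs, intended));    // 2
        // Using the count (3) as if it were an interval in milliseconds:
        System.out.println(segmentsNeeded(retentionMs, segmentCount)); // 40000
    }
}
```

With these example numbers, treating the segment count as an interval yields
3 ms segments instead of 60,000 ms segments, which matches the example given
in the issue description.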



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
