[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458456#comment-16458456
 ] 

ASF GitHub Bot commented on KAFKA-6535:
---------------------------------------

mjsax closed pull request #4730: KAFKA-6535: Set default retention ms for Streams repartition topics to Long.MAX_VALUE
URL: https://github.com/apache/kafka/pull/4730

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 565bd0b263c..462824fdcb4 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -100,6 +100,7 @@ <h1>Upgrade Guide and API Changes</h1>
     </ul>
 
     <!-- TODO: verify release verion and update `id` and `href` attributes (also at other places that link to this headline) -->
+    
     <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API changes in 1.2.0</a></h3>
     <p>
         We have removed the <code>skippedDueToDeserializationError-rate</code> and <code>skippedDueToDeserializationError-total</code> metrics.
@@ -156,7 +157,10 @@ <h3><a id="streams_api_changes_120" href="#streams_api_changes_120">Streams API
       The new class <code>To</code> allows you to send records to all or specific downstream processors by name and to set the timestamp for the output record.
       Forwarding based on child index is not supported in the new API any longer.
     </p>
-
+    <p>
+        <a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.
+        Instead of relying on data retention Kafka Streams uses the new purge data API to delete consumed data from those topics and to keep used storage small now.
+    </p>
     <p>
       Kafka Streams DSL for Scala is a new Kafka Streams client library available for developers authoring Kafka Streams applications in Scala.  It wraps core Kafka Streams DSL types to make it easier to call when
       interoperating with Scala code.  For example, it includes higher order functions as parameters for transformations avoiding the need anonymous classes in Java 7 or experimental SAM type conversions in Scala 2.11, automatic conversion between Java and Scala collection types, a way
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 08cc892d24c..4fe7e20794e 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -75,6 +75,7 @@ <h5><a id="upgrade_120_notable" href="#upgrade_120_notable">Notable changes in 1
         updated to aggregate across different versions.
     </li>
     <li> New Kafka Streams configuration parameter <code>upgrade.from</code> added that allows rolling bounce upgrade from older version. </li>
+    <li><a href="https://cwiki.apache.org/confluence/x/DVyHB">KIP-284</a> changed the retention time for repartition topics by setting its default value to <code>Long.MAX_VALUE</code>.</li>
 </ul>
 
 <h5><a id="upgrade_120_new_protocols" href="#upgrade_120_new_protocols">New Protocol Versions</a></h5>
@@ -87,7 +88,6 @@ <h5><a id="upgrade_120_streams" href="#upgrade_120_streams">Upgrading a 1.2.0 Ka
     <li> See <a href="/{{version}}/documentation/streams/upgrade-guide#streams_api_changes_120">Streams API changes in 1.2.0</a> for more details. </li>
 </ul>
 
-
 <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x or 1.0.x to 1.1.x</a></h4>
 <p>Kafka 1.1.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
     you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_110_notable">notable changes in 1.1.0</a> before upgrading.
@@ -132,6 +132,7 @@ <h4><a id="upgrade_1_1_0" href="#upgrade_1_1_0">Upgrading from 0.8.x, 0.9.x, 0.1
         Hot-swaping the jar-file only might not work.</li>
 </ol>
 
+
 <!-- TODO add if 1.1.1 gets release
 <h5><a id="upgrade_111_notable" href="#upgrade_111_notable">Notable changes in 1.1.1</a></h5>
 <ul>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
index 1459310309c..ca8fbff4a29 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopicConfig.java
@@ -33,9 +33,10 @@
     static {
         final Map<String, String> tempTopicDefaultOverrides = new HashMap<>();
         tempTopicDefaultOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");     // 50 MB
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");           // 50 MB
-        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");                // 10 min
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, "52428800");               // 50 MB
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_BYTES_CONFIG, "52428800");                     // 50 MB
+        tempTopicDefaultOverrides.put(TopicConfig.SEGMENT_MS_CONFIG, "600000");                          // 10 min
+        tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Long.MAX_VALUE));  // Infinity
         REPARTITION_TOPIC_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempTopicDefaultOverrides);
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 1469d180d24..2d9c8c4bdc9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -164,7 +164,7 @@ public void shouldCompactTopicsForKeyValueStoreChangelogs() throws Exception {
 
         final Properties repartitionProps = getTopicProperties(appID + "-Counts-repartition");
         assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
-        assertEquals(4, repartitionProps.size());
+        assertEquals(5, repartitionProps.size());
     }
 
     @Test
@@ -213,6 +213,6 @@ public void shouldCompactAndDeleteTopicsForWindowStoreChangelogs() throws Except
 
         final Properties repartitionProps = getTopicProperties(appID + "-CountWindows-repartition");
         assertEquals(LogConfig.Delete(), repartitionProps.getProperty(LogConfig.CleanupPolicyProp()));
-        assertEquals(4, repartitionProps.size());
+        assertEquals(5, repartitionProps.size());
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index f948c2c7fae..e3b888dea1d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -588,7 +588,8 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() {
         final TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(4, properties.size());
+        assertEquals(5, properties.size());
+        assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals("appId-foo", topicConfig.name());
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
index eaae352f36d..ea7a926c1f1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicConfigTest.java
@@ -57,4 +57,12 @@ public void shouldUseSuppliedConfigs() {
         assertEquals("1000", properties.get("retention.ms"));
         assertEquals("10000", properties.get("retention.bytes"));
     }
+
+    @Test
+    public void shouldUseSuppliedConfigsForRepartitionConfig() {
+        final Map<String, String> configs = new HashMap<>();
+        configs.put("retention.ms", "1000");
+        final RepartitionTopicConfig topicConfig = new RepartitionTopicConfig("name", configs);
+        assertEquals("1000", topicConfig.getProperties(Collections.<String, String>emptyMap(), 0).get(TopicConfig.RETENTION_MS_CONFIG));
+    }
 }
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
index 25468c0d449..b3663fa0279 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java
@@ -554,7 +554,8 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() throws Exception
         final InternalTopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next();
         final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo");
         final Map<String, String> properties = topicConfig.getProperties(Collections.<String, String>emptyMap(), 10000);
-        assertEquals(4, properties.size());
+        assertEquals(5, properties.size());
+        assertEquals(String.valueOf(Long.MAX_VALUE), properties.get(TopicConfig.RETENTION_MS_CONFIG));
         assertEquals(TopicConfig.CLEANUP_POLICY_DELETE, properties.get(TopicConfig.CLEANUP_POLICY_CONFIG));
         assertEquals("appId-foo", topicConfig.name());
         assertTrue(topicConfig instanceof RepartitionTopicConfig);


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Set default retention ms for Streams repartition topics to Long.MAX_VALUE
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-6535
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6535
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Guozhang Wang
>            Assignee: Khaireddine Rezgui
>            Priority: Major
>              Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity. This allows records with 
> old timestamps to be pushed to them (think: bootstrapping, re-processing), and 
> Streams can rely on the purging API to keep their storage small.
> More specifically, {{RepartitionTopicConfig}} has a few default 
> overrides for repartition topic configs; we should just add an override for 
> {{TopicConfig.RETENTION_MS_CONFIG}} that sets it to Long.MAX_VALUE. Users can 
> still override this value themselves via {{StreamsConfig.TOPIC_PREFIX}}. We 
> need to add a unit test to verify that this update takes effect.
> In addition to the code change, we also need doc changes in 
> streams/upgrade_guide.html describing this default value change.
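
The override precedence described above can be illustrated with a small standalone sketch. It does not use the Kafka classes: the class name RepartitionRetentionSketch and the method effectiveTopicConfig are hypothetical, and the literals "retention.ms", "cleanup.policy" and the "topic." prefix are assumed to match the values behind TopicConfig.RETENTION_MS_CONFIG, TopicConfig.CLEANUP_POLICY_CONFIG and StreamsConfig.TOPIC_PREFIX. The sketch only shows the intended behaviour (an infinite default that a user-supplied, prefixed setting can replace), not how Streams resolves configs internally.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class RepartitionRetentionSketch {
    // Assumed literal values of the Kafka config names (not taken from the Kafka classes).
    static final String RETENTION_MS = "retention.ms";
    static final String CLEANUP_POLICY = "cleanup.policy";
    static final String TOPIC_PREFIX = "topic.";

    // Defaults in the spirit of REPARTITION_TOPIC_DEFAULT_OVERRIDES from the diff:
    // delete cleanup policy and an effectively infinite retention time.
    static final Map<String, String> DEFAULT_OVERRIDES;

    static {
        final Map<String, String> tmp = new HashMap<>();
        tmp.put(CLEANUP_POLICY, "delete");
        tmp.put(RETENTION_MS, String.valueOf(Long.MAX_VALUE));
        DEFAULT_OVERRIDES = Collections.unmodifiableMap(tmp);
    }

    // Hypothetical helper: start from the defaults, then let every application
    // setting that carries the "topic." prefix replace the matching default.
    static Map<String, String> effectiveTopicConfig(final Map<String, String> appConfig) {
        final Map<String, String> result = new HashMap<>(DEFAULT_OVERRIDES);
        for (final Map.Entry<String, String> entry : appConfig.entrySet()) {
            if (entry.getKey().startsWith(TOPIC_PREFIX)) {
                result.put(entry.getKey().substring(TOPIC_PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(final String[] args) {
        // No user setting: the infinite default applies.
        System.out.println(effectiveTopicConfig(new HashMap<>()).get(RETENTION_MS));

        // User setting with the topic prefix: the supplied value wins.
        final Map<String, String> appConfig = new HashMap<>();
        appConfig.put(TOPIC_PREFIX + RETENTION_MS, "1000");
        System.out.println(effectiveTopicConfig(appConfig).get(RETENTION_MS));
    }
}
```

In a real application the equivalent setting would be supplied through the Streams configuration with the topic prefix, which is what the new shouldUseSuppliedConfigsForRepartitionConfig test in the diff exercises at the RepartitionTopicConfig level.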



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
