vvcephei commented on a change in pull request #9027:
URL: https://github.com/apache/kafka/pull/9027#discussion_r455299303
##########
File path: docs/streams/developer-guide/config-streams.html
##########

@@ -181,6 +193,7 @@ <h4><a class="toc-backref" href="#id5">bootstrap.servers</a><a class="headerlink
              <tr class="row-even"><td>cache.max.bytes.buffering</td>
                <td>Medium</td>
                <td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>
+               <td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td>

Review comment:
       duplicate?

##########
File path: docs/streams/developer-guide/config-streams.html
##########

@@ -270,43 +319,47 @@ <h4><a class="toc-backref" href="#id5">bootstrap.servers</a><a class="headerlink
                <td colspan="2">The amount of time in milliseconds, before a request is retried. This applies if the <code class="docutils literal"><span class="pre">retries</span></code> parameter is configured to be greater than 0. </td>
                <td>100</td>
              </tr>
-             <tr class="row-even"><td>rocksdb.config.setter</td>
+             <tr class="row-odd"><td>rocksdb.config.setter</td>
                <td>Medium</td>
                <td colspan="2">The RocksDB configuration.</td>
                <td></td>
              </tr>
-             <tr class="row-odd"><td>state.cleanup.delay.ms</td>
+             <tr class="row-even"><td>state.cleanup.delay.ms</td>
                <td>Low</td>
                <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td>
                <td>600000 milliseconds</td>
              </tr>
-             <tr class="row-even"><td>state.dir</td>
+             <tr class="row-odd"><td>state.dir</td>
                <td>High</td>
                <td colspan="2">Directory location for state stores.</td>
                <td><code class="docutils literal"><span class="pre">/tmp/kafka-streams</span></code></td>
              </tr>
-             <tr class="row-odd"><td>timestamp.extractor</td>
+             <tr class="row-even"><td>topology.optimization</td>
                <td>Medium</td>
-               <td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td>
-               <td>See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td>
+               <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology</td>
+               <td>none</td>
              </tr>
-             <tr class="row-even"><td>upgrade.from</td>
+             <tr class="row-odd"><td>upgrade.from</td>
                <td>Medium</td>
                <td colspan="2">The version you are upgrading from during a rolling upgrade.</td>
                <td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td>
              </tr>
-             <tr class="row-odd"><td>value.serde</td>
-               <td>Medium</td>
-               <td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also key.serde).</td>
-               <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td>
-             </tr>
              <tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td>
                <td>Low</td>
                <td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td>
                <td>86400000 milliseconds = 1 day</td>
              </tr>
            </tbody>
          </table>
+         <div class="section" id="acceptable-recovery-lag">
+           <h4><a class="toc-backref" href="#id27">acceptable.recovery.lag</a><a class="headerlink" href="#acceptable-recovery-lag" title="Permalink to this headline"></a></h4>
+           <blockquote>
+             <div>
+               The maximum acceptable lag (total number of offsets to catch up from the changelog) for an instance to be considered caught-up and able to receive an active task. Streams will only assign
+               stateful active tasks to instances whose state stores are within the acceptable recovery lag, if any exist, and assign warmup replicas to restore state in the background for instances
+               that are not yet caught up. Should correspond to a recovery time of well under a minute for a given workload. Must be at least 0.

Review comment:
       Maybe we can also mention that if you set it to Long.MAX_VALUE, it effectively disables warmups and HA task migration, allowing Streams to produce a balanced assignment in one shot.

##########
File path: docs/streams/developer-guide/running-app.html
##########

@@ -110,6 +110,18 @@ <h3><a class="toc-backref" href="#id6">Removing capacity from your application</
              <li>If a local state store exists, the changelog is replayed from the previously checkpointed offset. The changes are applied and the state is restored to the most recent snapshot. This method takes less time because it is applying a smaller portion of the changelog.</li>
            </ul>
            <p>For more information, see <a class="reference internal" href="config-streams.html#num-standby-replicas"><span class="std std-ref">Standby Replicas</span></a>.</p>
+           <p>
+             As of version 2.6, Streams will now do most of a task's restoration in the background through warmup replicas. These will be assigned to instances that need to restore a lot of state for a task.
+             A stateful active task will only be assigned to an instance once it's state is within the configured

Review comment:
       ```suggestion
             A stateful active task will only be assigned to an instance once its state is within the configured
       ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########

@@ -639,16 +643,16 @@
                     Serdes.ByteArraySerde.class.getName(),
                     Importance.MEDIUM,
                     DEFAULT_VALUE_SERDE_CLASS_DOC)
-            .define(NUM_STANDBY_REPLICAS_CONFIG,
-                    Type.INT,
-                    0,
+            .define(DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS,
+                    Type.CLASS,
+                    null,
                     Importance.MEDIUM,
-                    NUM_STANDBY_REPLICAS_DOC)
-            .define(NUM_STREAM_THREADS_CONFIG,
-                    Type.INT,
-                    1,
+                    DEFAULT_WINDOWED_KEY_SERDE_INNER_CLASS_DOC)
+            .define(DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS,
+                    Type.CLASS,
+                    null,
                     Importance.MEDIUM,
-                    NUM_STREAM_THREADS_DOC)
+                    DEFAULT_WINDOWED_VALUE_SERDE_INNER_CLASS_DOC)

Review comment:
       Woah. Good catch.
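To make the reviewer's point about `Long.MAX_VALUE` concrete, here is a minimal, dependency-free sketch using plain `java.util.Properties` and the literal config key from the docs above. The helper name `highAvailabilityOff` is hypothetical; in a real application you would pass these properties to `new KafkaStreams(topology, props)`.

```java
import java.util.Properties;

public class RecoveryLagSketch {
    // Sketch only: with acceptable.recovery.lag set to Long.MAX_VALUE, every
    // instance counts as "caught up", so warmup replicas and HA task migration
    // are effectively disabled and Streams produces a balanced assignment in
    // one shot.
    static Properties highAvailabilityOff() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // hypothetical app id
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker
        props.put("acceptable.recovery.lag", Long.MAX_VALUE);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(highAvailabilityOff().get("acceptable.recovery.lag"));
    }
}
```

The default (as of 2.6) is a small finite lag, so leaving the config unset keeps warmup replicas enabled.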
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
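For context on the `topology.optimization` row added in the first diff above, a minimal sketch of how that config is set; it assumes the string values `"none"` (the documented default) and `"all"`, and the class/method names here are hypothetical.

```java
import java.util.Properties;

public class OptimizationSketch {
    // Sketch only: "topology.optimization" tells Kafka Streams whether to
    // optimize the topology; "none" is the default, "all" enables optimization.
    static Properties optimized() {
        Properties props = new Properties();
        props.put("topology.optimization", "all");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(optimized().get("topology.optimization"));
    }
}
```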