[ 
https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16651091#comment-16651091
 ] 

ASF GitHub Bot commented on KAFKA-7223:
---------------------------------------

mjsax closed pull request #5787: KAFKA-7223: Suppression documentation
URL: https://github.com/apache/kafka/pull/5787
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/ops.html b/docs/ops.html
index d57f1cf20ff..158602b96a1 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -257,7 +257,7 @@ <h4><a id="basic_ops_consumer_group" 
href="#basic_ops_consumer_group">Managing C
     </li>
   </ul>
 
-  Please note, that out of range offsets will be adjusted to available offset 
end. For example, if offset end is at 10 and offset shift request is 
+  Please note, that out of range offsets will be adjusted to available offset 
end. For example, if offset end is at 10 and offset shift request is
   of 15, then, offset at 10 will actually be selected.
 
   <p>
@@ -1546,6 +1546,16 @@ <h5><a id="kafka_streams_task_monitoring" 
href="#kafka_streams_task_monitoring">
         <td>The total number of commit calls. </td>
         
<td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
       </tr>
+      <tr>
+        <td>record-lateness-avg</td>
+        <td>The average observed lateness of records.</td>
+        
<td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
+      </tr>
+      <tr>
+        <td>record-lateness-max</td>
+        <td>The max observed lateness of records.</td>
+        
<td>kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+)</td>
+      </tr>
  </tbody>
 </table>
 
diff --git a/docs/streams/developer-guide/dsl-api.html 
b/docs/streams/developer-guide/dsl-api.html
index 1622702c540..0ff28a8db69 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -58,6 +58,7 @@
                             <li><a class="reference internal" 
href="#hopping-time-windows" id="id21">Hopping time windows</a></li>
                             <li><a class="reference internal" 
href="#sliding-time-windows" id="id22">Sliding time windows</a></li>
                             <li><a class="reference internal" 
href="#session-windows" id="id23">Session Windows</a></li>
+                            <li><a class="reference internal" 
href="#window-final-results" id="id31">Window Final Results</a></li>
                         </ul>
                         </li>
                     </ul>
@@ -65,6 +66,7 @@
                     <li><a class="reference internal" 
href="#applying-processors-and-transformers-processor-api-integration" 
id="id24">Applying processors and transformers (Processor API 
integration)</a></li>
                 </ul>
                 </li>
+                <li><a class="reference internal" 
href="#controlling-emit-rate" id="id32">Controlling KTable update rate</a></li>
                 <li><a class="reference internal" 
href="#writing-streams-back-to-kafka" id="id25">Writing streams back to 
Kafka</a></li>
                 <li><a class="reference internal" 
href="#testing-a-streams-app" id="id26">Testing a Streams application</a></li>
                 <li><a class="reference internal" href="#scala-dsl" 
id="id27">Kafka Streams DSL for Scala</a></li>
@@ -2969,6 +2971,73 @@ <h4><a id="streams_concepts_globalktable" 
href="#streams_concepts_globalktable">
 t=5 (blue), which lead to a merge of sessions and an extension of a session, 
respectively.</span></p>
                         </div>
                     </div>
+                   <div class="section" id="window-final-results">
+                           <span id="windowing-final-results"></span><h5><a 
class="toc-backref" href="#id31">Window Final Results</a><a class="headerlink" 
href="#window-final-results" title="Permalink to this headline"></a></h5>
+                           <p>In Kafka Streams, windowed computations update 
their results continuously.
+                              As new data arrives for a window, freshly 
computed results are emitted downstream.
+                              For many applications, this is ideal, since 
fresh results are always available,
+                              and Kafka Streams is designed to make 
programming continuous computations seamless.
+                              However, some applications need to take action 
<strong>only</strong> on the final result of a windowed computation.
+                              Common examples of this are sending alerts or 
delivering results to a system that doesn't support updates.
+                           </p>
+                           <p>Suppose that you have an hourly windowed count 
of events per user.
+                              If you want to send an alert when a user has 
<em>fewer than</em> three events in an hour, you have a real challenge.
+                              All users would match this condition at first, 
until they accrue enough events, so you cannot simply
+                              send an alert when someone matches the 
condition; you have to wait until you know you won't see any more events for a 
particular window
+                              and <em>then</em> send the alert.
+                           </p>
+                            <p>Kafka Streams offers a clean way to define this 
logic: after defining your windowed computation, you can
+                              <span class="pre">suppress</span> the 
intermediate results, emitting the final count for each user when the window is 
<strong>closed</strong>.
+                           </p>
+                           <p>For example:</p>
+                           <div class="highlight-java"><div class="highlight">
+<pre>
+KGroupedStream&lt;UserId, Event&gt; grouped = ...;
+grouped
+    .windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10)))
+    .count()
+    .suppress(Suppressed.untilWindowCloses(unbounded()))
+    .filter((windowedUserId, count) -&gt; count &lt; 3)
+    .toStream()
+    .foreach((windowedUserId, count) -&gt; sendAlert(windowedUserId.window(), 
windowedUserId.key(), count));
+</pre>
+                           </div></div>
+                           <p>The key parts of this program are:
+                           <dl>
+                                   <dt><code>grace(ofMinutes(10))</code></dt>
+                                   <dd>This allows us to bound the lateness of 
events the window will accept.
+                                       For example, the 09:00 to 10:00 window 
will accept late-arriving records until 10:10, at which point, the window is 
<strong>closed</strong>.
+                                   </dd>
+                                   
<dt><code>.suppress(Suppressed.untilWindowCloses(...))</code></dt>
+                                   <dd>This configures the suppression 
operator to emit nothing for a window until it closes, and then emit the final 
result.
+                                       For example, if user <code>U</code> 
gets 10 events between 09:00 and 10:10, the <code>filter</code> downstream of 
the suppression
+                                       will get no events for the windowed key 
<code>U@09:00-10:00</code> until 10:10, and then it will get exactly one with 
the value <code>10</code>.
+                                       This is the final result of the 
windowed count.
+                                   </dd>
+                                   <dt><code>unbounded()</code></dt>
+                                   <dd>
+                                     This configures the buffer used for 
storing events
+                                     until their windows close.
+                                     Production code is able to put a cap on 
the amount
+                                     of memory to use for the buffer,
+                                     but this simple example creates a buffer 
with no
+                                     upper bound.
+                                   </dd>
+                           </dl>
+                           </p>
+                           <p>
+                             One thing to note is that suppression is just 
like any other
+                             Kafka Streams operator, so you can build a 
topology with two
+                             branches emerging from the <code>count</code>,
+                             one suppressed, and one not, or even multiple 
differently
+                             configured suppressions.
+                             This allows you to apply suppressions where they 
are needed
+                             and otherwise rely on the default continuous 
update behavior.
+                           </p>
+                           <p>For more detailed information, see the JavaDoc 
on the <code>Suppressed</code> config object
+                              and <a 
href="https://cwiki.apache.org/confluence/x/sQU0BQ" title="KIP-328">KIP-328</a>.
+                           </p>
+                   </div>
                 </div>
             </div>
             <div class="section" 
id="applying-processors-and-transformers-processor-api-integration">
@@ -3134,6 +3203,50 @@ <h4><a id="streams_concepts_globalktable" 
href="#streams_concepts_globalktable">
                 </div>
             </div>
         </div>
+       
+       <div class="section" id="controlling-emit-rate">
+            <span id="streams-developer-guide-dsl-suppression"></span><h2><a 
class="toc-backref" href="#id32">Controlling KTable emit rate</a><a 
class="headerlink" href="#controlling-emit-rate" title="Permalink to this 
headline"></a></h2>
+           <p>A KTable is logically a continuously updated table.
+              These updates make their way to downstream operators whenever 
new data is available, ensuring that the whole computation is as fresh as 
possible.
+              Logically speaking, most programs describe a series of 
transformations, and the update rate is not a factor in the program behavior.
+              In these cases, the rate of update is more of a performance 
concern.
+              Operators are able to optimize both the network traffic (to the 
Kafka brokers) and the disk traffic (to the local state stores) by adjusting
+              commit interval and batch size configurations.
+           </p>
+           <p>However, some applications need to take other actions, such as 
calling out to external systems, 
+              and therefore need to exercise some control over the rate of 
invocations, for example of <code>KStream#foreach</code>.
+           </p>
+           <p>Rather than achieving this as a side-effect of the  <a 
class="reference internal" 
href="memory-mgmt.html#streams-developer-guide-memory-management-record-cache"><span
 class="std std-ref">KTable record cache</span></a>,
+              you can directly impose a rate limit via the 
<code>KTable#suppress</code> operator.
+           </p>
+           <p>For example:
+           </p>
+           <div class="highlight-java"><div class="highlight">
+<pre>
+KGroupedTable&lt;String, String&gt; groupedTable = ...;
+groupedTable
+    .count()
+    .suppress(untilTimeLimit(ofMinutes(5), 
maxBytes(1_000_000L).emitEarlyWhenFull()))
+    .toStream()
+    .foreach((key, count) -&gt; updateCountsDatabase(key, count));
+</pre>
+           </div></div>
+           <p>This configuration ensures that 
<code>updateCountsDatabase</code> gets events for each <code>key</code> no more 
than once every 5 minutes.
+              Note that the latest state for each key has to be buffered in 
memory for that 5-minute period.
+              You have the option to control the maximum amount of memory to 
use for this buffer (in this case, 1MB).
+              There is also an option to impose a limit in terms of number of 
records (or to leave both limits unspecified).
+           </p>
+           <p>Additionally, it is possible to choose what happens if the 
buffer fills up.
+              This example takes a relaxed approach and just emits the oldest 
records before their 5-minute time limit to bring the buffer back down to size.
+              Alternatively, you can choose to stop processing and shut the 
application down.
+              This may seem extreme, but it gives you a guarantee that the 
5-minute time limit will be absolutely enforced.
+              After the application shuts down, you could allocate more memory 
for the buffer and resume processing.
+              Emitting early is preferable for most applications.
+           </p>
+           <p>For more detailed information, see the JavaDoc on the 
<code>Suppressed</code> config object
+              and <a href="https://cwiki.apache.org/confluence/x/sQU0BQ" 
title="KIP-328">KIP-328</a>.
+           </p>
+       </div>
         <div class="section" id="writing-streams-back-to-kafka">
             <span id="streams-developer-guide-dsl-destinations"></span><h2><a 
class="toc-backref" href="#id25">Writing streams back to Kafka</a><a 
class="headerlink" href="#writing-streams-back-to-kafka" title="Permalink to 
this headline"></a></h2>
             <p>Any streams and tables may be (continuously) written back to a 
Kafka topic.  As we will describe in more detail below, the output data might be
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index a85bbb8250d..5b0d8b59233 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -60,7 +60,7 @@ public static Sensor recordLatenessSensor(final 
InternalProcessorContext context
         sensor.add(
             new MetricName(
                 "record-lateness-avg",
-                "stream-processor-node-metrics",
+                "stream-task-metrics",
                 "The average observed lateness of records.",
                 tags),
             new Avg()
@@ -68,7 +68,7 @@ public static Sensor recordLatenessSensor(final 
InternalProcessorContext context
         sensor.add(
             new MetricName(
                 "record-lateness-max",
-                "stream-processor-node-metrics",
+                "stream-task-metrics",
                 "The max observed lateness of records.",
                 tags),
             new Max()
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 236cd8c6e3b..1e39bd3339d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -390,7 +390,7 @@ private void assertLatenessMetrics(final TopologyTestDriver 
driver,
 
         final MetricName latenessMaxMetric = new MetricName(
             "record-lateness-max",
-            "stream-processor-node-metrics",
+            "stream-task-metrics",
             "The max observed lateness of records.",
             mkMap(
                 mkEntry("client-id", "topology-test-driver-virtual-thread"),
@@ -401,7 +401,7 @@ private void assertLatenessMetrics(final TopologyTestDriver 
driver,
 
         final MetricName latenessAvgMetric = new MetricName(
             "record-lateness-avg",
-            "stream-processor-node-metrics",
+            "stream-task-metrics",
             "The average observed lateness of records.",
             mkMap(
                 mkEntry("client-id", "topology-test-driver-virtual-thread"),
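
The window-final behavior described in the dsl-api.html changes above (09:00 to 10:00 window, 10-minute grace, one result once the window closes) can be sketched in plain Java with no Kafka Streams dependency. This is only a toy model under stated assumptions, not the library's implementation: the class FinalResultModel, its process method, and the "user@windowStart" key format are hypothetical names chosen for illustration, timestamps are assumed non-negative, and "stream time" is modeled as the largest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of window-final suppression (hypothetical; NOT Kafka Streams code). */
class FinalResultModel {
    private final long windowSizeMs;
    private final long graceMs;
    // Modeled stream time: the largest record timestamp observed so far.
    private long streamTimeMs = Long.MIN_VALUE;
    // Windowed key ("user@windowStart") -> {windowStart, count}, held until the window closes.
    private final Map<String, long[]> buffer = new LinkedHashMap<>();

    FinalResultModel(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    /** Counts one event and returns "key=count" for every window that has just closed. */
    List<String> process(String user, long timestampMs) {
        streamTimeMs = Math.max(streamTimeMs, timestampMs);
        final long windowStart = timestampMs - (timestampMs % windowSizeMs);

        // Accept the record only while its window is still open (end + grace not yet reached).
        if (windowStart + windowSizeMs + graceMs > streamTimeMs) {
            long[] entry = buffer.computeIfAbsent(
                user + "@" + windowStart, k -> new long[] {windowStart, 0L});
            entry[1]++;
        }

        // Emit exactly one final result per window once stream time reaches end + grace.
        List<String> emitted = new ArrayList<>();
        Iterator<Map.Entry<String, long[]>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> e = it.next();
            long closeTimeMs = e.getValue()[0] + windowSizeMs + graceMs;
            if (closeTimeMs <= streamTimeMs) {
                emitted.add(e.getKey() + "=" + e.getValue()[1]);
                it.remove();
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        final long min = 60_000L;
        FinalResultModel model = new FinalResultModel(60 * min, 10 * min);
        System.out.println(model.process("U", 540 * min)); // 09:00, nothing emitted yet
        System.out.println(model.process("U", 595 * min)); // 09:55, still buffered
        System.out.println(model.process("U", 610 * min)); // 10:10 closes the 09:00 window
    }
}
```

In this model, a record that arrives for a window after it has closed is silently dropped, which mirrors the role the docs assign to grace(ofMinutes(10)): it bounds how late a record may be and still count.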


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KIP-328: Add in-memory Suppression
> ----------------------------------
>
>                 Key: KAFKA-7223
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7223
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Major
>
> As described in 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables.]
>  
> This ticket is to implement Suppress, but only for in-memory buffers.
> (depends on KAFKA-7222)
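
The in-memory buffering this ticket refers to, together with the emit-early-when-full policy from the documentation in the diff, can be modeled roughly as follows. This is a sketch under stated assumptions rather than the Kafka Streams implementation: RateLimitBufferModel and update are hypothetical names, the cap is expressed as a record count (the docs mention that option alongside a byte limit), and time is whatever timestamp the caller passes in, assumed to be non-decreasing.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a time-limited, size-capped suppression buffer (hypothetical; NOT Kafka Streams code). */
class RateLimitBufferModel {
    private final long timeLimitMs;
    private final int maxRecords;
    // key -> {bufferedSinceMs, latestValue}; insertion order keeps the oldest entry first.
    private final Map<String, long[]> buffer = new LinkedHashMap<>();

    RateLimitBufferModel(long timeLimitMs, int maxRecords) {
        this.timeLimitMs = timeLimitMs;
        this.maxRecords = maxRecords;
    }

    /** Buffers the latest value for a key and returns the "key=value" records emitted by this call. */
    List<String> update(String key, long value, long nowMs) {
        List<String> emitted = new ArrayList<>();

        // 1. Emit every entry that has been buffered for at least the time limit.
        Iterator<Map.Entry<String, long[]>> it = buffer.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, long[]> e = it.next();
            if (e.getValue()[0] + timeLimitMs <= nowMs) {
                emitted.add(e.getKey() + "=" + e.getValue()[1]);
                it.remove();
            }
        }

        // 2. Keep only the latest value per key; the timer runs from the first buffered update.
        long[] entry = buffer.get(key);
        if (entry == null) {
            buffer.put(key, new long[] {nowMs, value});
        } else {
            entry[1] = value;
        }

        // 3. Emit-early policy: when over capacity, release the oldest entries before their time limit.
        while (buffer.size() > maxRecords) {
            Iterator<Map.Entry<String, long[]>> oldest = buffer.entrySet().iterator();
            Map.Entry<String, long[]> e = oldest.next();
            emitted.add(e.getKey() + "=" + e.getValue()[1]);
            oldest.remove();
        }
        return emitted;
    }

    public static void main(String[] args) {
        final long min = 60_000L;
        RateLimitBufferModel model = new RateLimitBufferModel(5 * min, 2);
        System.out.println(model.update("a", 1, 0));
        System.out.println(model.update("b", 1, 2 * min));
        System.out.println(model.update("c", 1, 3 * min)); // over capacity, oldest key leaves early
    }
}
```

The alternative policy mentioned in the docs, shutting the application down when the buffer fills, would replace step 3 with a thrown exception; the model above takes the relaxed route because the documentation calls emitting early preferable for most applications.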



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
