fonsdant commented on code in PR #18314: URL: https://github.com/apache/kafka/pull/18314#discussion_r1933065851
########## docs/streams/developer-guide/dsl-api.html: ########## @@ -3097,152 +3098,994 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key </div> </div> </div> - <div class="section" id="applying-processors-and-transformers-processor-api-integration"> - <span id="streams-developer-guide-dsl-process"></span><h3><a class="toc-backref" href="#id24">Applying processors and transformers (Processor API integration)</a><a class="headerlink" href="#applying-processors-and-transformers-processor-api-integration" title="Permalink to this headline"></a></h3> - <p>Beyond the aforementioned <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateless</span></a> and - <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateful</span></a> transformations, you may also - leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. - There are a number of scenarios where this may be helpful:</p> - <ul class="simple"> - <li><strong>Customization:</strong> You need to implement special, customized logic that is not or not yet available in the DSL.</li> - <li><strong>Combining ease-of-use with full flexibility where it’s needed:</strong> Even though you generally prefer to use - the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and - tinkering than the DSL provides. For example, only the Processor API provides access to a - record’s metadata such as its topic, partition, and offset information. 
- However, you don’t want to switch completely to the Processor API just because of that.</li> - <li><strong>Migrating from other tools:</strong> You are migrating from other stream processing technologies that provide an - imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than to - migrate completely to the DSL right away.</li> + <div class="section" id="applying-processors-processor-api-integration"> + <h3> + <a class="toc-backref" href="#id24">Applying processors (Processor API integration)</a> + <a class="headerlink" href="#applying-processors-processor-api-integration" title="Permalink to this headline"></a> + </h3> + <p>Beyond the aforementioned <a class="reference internal" + href="#streams-developer-guide-dsl-transformations-stateless"> + <span class="std std-ref">stateless</span></a> and + <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"> <span + class="std std-ref">stateful</span></a> transformations, you may also leverage the Processor API from the + DSL. There are a number of scenarios where this may be helpful: + </p> + <ul> + <li><strong>Customization:</strong> You need to implement special, customized logic that is not or not yet + available + in the DSL.</li> + <li><strong>Combining ease-of-use with full flexibility where it's needed:</strong> Even though you generally + prefer + to use the expressiveness of the DSL, there are certain steps in your processing that require more + flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a + record's metadata such as its topic, partition, and offset information.
However, you don't want to switch + completely to the Processor API just because of that; and</li> + <li><strong>Migrating from other tools:</strong> You are migrating from other stream processing technologies + that + provide an imperative API, and migrating some of your legacy code to the Processor API was faster and/or + easier than migrating completely to the DSL right away.</li> </ul> - <table border="1" class="non-scrolling-table width-100-percent docutils"> - <colgroup> - <col width="19%" /> - <col width="81%" /> - </colgroup> - <thead valign="bottom"> - <tr class="row-odd"><th class="head">Transformation</th> - <th class="head">Description</th> - </tr> </thead> + <h4>Operations and concepts</h4> + <ul> + <li><code>KStream#process</code>: Process all records in a stream, one record at a time, by applying a + <code>Processor</code> (provided by a given <code>ProcessorSupplier</code>); + </li> + <li><code>KStream#processValues</code>: Process all records in a stream, one record at a time, by applying a + <code>FixedKeyProcessor</code> (provided by a given <code>FixedKeyProcessorSupplier</code>); + </li> + <li><code>Processor</code>: A processor of key-value pair records;</li> + <li><code>ContextualProcessor</code>: An abstract implementation of <code>Processor</code> that manages the + <code>ProcessorContext</code> instance and provides a default no-op implementation of + <code>Processor#close</code>;
+ </li> + <li><code>FixedKeyProcessor</code>: A processor of key-value pair records where keys are immutable;</li> + <li><code>ContextualFixedKeyProcessor</code>: An abstract implementation of <code>FixedKeyProcessor</code> that + manages the <code>FixedKeyProcessorContext</code> instance and provides a default no-op implementation of + <code>FixedKeyProcessor#close</code>; + </li> + <li><code>ProcessorSupplier</code>: A processor supplier that can create one or more <code>Processor</code> + instances; and</li> + <li><code>FixedKeyProcessorSupplier</code>: A processor supplier that can create one or more + <code>FixedKeyProcessor</code> instances. + </li> + </ul> + <h4>Examples</h4> + <p>Follow the examples below to learn how to apply <code>process</code> and <code>processValues</code> to your + <code>KStream</code>. + </p> + <table> + <thead> + <tr> + <th>Example</th> + <th>Operation</th> + <th>State Type</th> + </tr> </thead> - <tbody valign="top"> - <tr class="row-even"><td><p class="first"><strong>Process</strong></p> - <ul class="last simple"> - <li>KStream -> void</li> - </ul> - </td> - <td><p class="first"><strong>Terminal operation.</strong> Applies a <code class="docutils literal"><span class="pre">Processor</span></code> to each record. - <code class="docutils literal"><span class="pre">process()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p> - <p>This is essentially equivalent to adding the <code class="docutils literal"><span class="pre">Processor</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p> - </td> - </tr> - <tr class="row-odd"><td><p class="first"><strong>Transform</strong></p> - <ul class="last simple"> - <li>KStream -> KStream</li> - </ul> - </td> - <td><p class="first">Applies a <code class="docutils literal"><span class="pre">Transformer</span></code> to each record. - <code class="docutils literal"><span class="pre">transform()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. - (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p> - <p>Each input record is transformed into zero, one, or more output records (similar to the stateless <code class="docutils literal"><span class="pre">flatMap</span></code>). - The <code class="docutils literal"><span class="pre">Transformer</span></code> must return <code class="docutils literal"><span class="pre">null</span></code> for zero output. 
- You can modify the record’s key and value, including their types.</p> - <p><strong>Marks the stream for data re-partitioning:</strong> - Applying a grouping or a join after <code class="docutils literal"><span class="pre">transform</span></code> will result in re-partitioning of the records. - If possible use <code class="docutils literal"><span class="pre">transformValues</span></code> instead, which will not cause data re-partitioning.</p> - <p><code class="docutils literal"><span class="pre">transform</span></code> is essentially equivalent to adding the <code class="docutils literal"><span class="pre">Transformer</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>. - </p> - </td> - </tr> - <tr class="row-even"><td><p class="first"><strong>Transform (values only)</strong></p> - <ul class="last simple"> - <li>KStream -> KStream</li> - <li>KTable -> KTable</li> - </ul> - </td> - <td><p class="first">Applies a <code class="docutils literal"><span class="pre">ValueTransformer</span></code> to each record, while retaining the key of the original record. - <code class="docutils literal"><span class="pre">transformValues()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. 
- (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p> - <p>Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible). - The <code class="docutils literal"><span class="pre">ValueTransformer</span></code> may return <code class="docutils literal"><span class="pre">null</span></code> as the new value for a record.</p> - <p><code class="docutils literal"><span class="pre">transformValues</span></code> is preferable to <code class="docutils literal"><span class="pre">transform</span></code> because it will not cause data re-partitioning.</p> - <p><code class="docutils literal"><span class="pre">transformValues</span></code> is essentially equivalent to adding the <code class="docutils literal"><span class="pre">ValueTransformer</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p> - </td> - </tr> + <tbody> + <tr> + <td><a href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a Loyalty Program</a> + </td> + <td><code>process</code></td> + <td>Stateful</td> + </tr> + <tr> + <td><a href="#categorizing-logs-by-severity">Categorizing Logs by Severity</a></td> + <td><code>process</code></td> + <td>Stateless</td> + </tr> + <tr> + <td><a href="#traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</a></td> + <td><code>processValues</code></td> + 
<td>Stateful</td> + </tr> + <tr> + <td><a href="#replacing-slang-in-text-messages">Replacing Slang in Text Messages</a></td> + <td><code>processValues</code></td> + <td>Stateless</td> + </tr> </tbody> </table> - <p>The following example shows how to leverage, via the <code class="docutils literal"><span class="pre">KStream#process()</span></code> method, a custom <code class="docutils literal"><span class="pre">Processor</span></code> that sends an - email notification whenever a page view count reaches a predefined threshold.</p> - <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code> - interface:</p> - <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address -public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> { - - private final String emailAddress; - private ProcessorContext<Void, Void> context; - - public PopularPageEmailAlert(String emailAddress) { - this.emailAddress = emailAddress; - } + <h5 id="cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a Loyalty Program</h5> + <ul> + <li><strong>Idea:</strong> A stream of purchase events contains user IDs and transaction amounts. Use a state + store + to accumulate the total spending of each user. When their total crosses a threshold, apply a discount on + their next transaction and update their accumulated total.</li> + <li><strong>Real-World Context:</strong> In a retail loyalty program, tracking cumulative customer spending + enables + dynamic rewards, such as issuing a discount when a customer's total purchases exceed a predefined limit.
+ </li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; - @Override - public void init(ProcessorContext<Void, Void> context) { - this.context = context; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; Review Comment: Removed with packages.
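The diff cuts off the loyalty-program listing just after its imports, so for reviewers skimming this thread, here is a minimal, self-contained sketch of the stateful `KStream#process` pattern the new section describes. The topic names (`purchases`, `charged-amounts`), the store name, the threshold, and the 10% discount rate are illustrative assumptions, not values taken from the PR; the supplier declares its state store via `ProcessorSupplier#stores()` so Kafka Streams connects it automatically.

```java
import java.util.Collections;
import java.util.Set;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class LoyaltyDiscountExample {

    static final String STORE_NAME = "total-spending"; // illustrative store name
    static final double THRESHOLD = 100.0;             // assumed loyalty threshold
    static final double DISCOUNT = 0.9;                // assumed 10% discount

    // Accumulates each user's spending; once the running total has crossed
    // the threshold, subsequent purchases are forwarded at a discount.
    static class DiscountProcessor extends ContextualProcessor<String, Double, String, Double> {
        private KeyValueStore<String, Double> store;

        @Override
        public void init(ProcessorContext<String, Double> context) {
            super.init(context);
            store = context.getStateStore(STORE_NAME);
        }

        @Override
        public void process(Record<String, Double> record) {
            Double total = store.get(record.key());
            double previous = total == null ? 0.0 : total;
            double charged = previous >= THRESHOLD ? record.value() * DISCOUNT : record.value();
            // Accumulate the undiscounted amount, then forward the charged amount.
            store.put(record.key(), previous + record.value());
            context().forward(record.withValue(charged));
        }
    }

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("purchases", Consumed.with(Serdes.String(), Serdes.Double()))
               .process(new ProcessorSupplier<String, Double, String, Double>() {
                   @Override
                   public Processor<String, Double, String, Double> get() {
                       return new DiscountProcessor();
                   }

                   @Override
                   public Set<StoreBuilder<?>> stores() {
                       // Declaring the store here connects it to every processor instance.
                       return Collections.singleton(
                           Stores.keyValueStoreBuilder(
                               Stores.inMemoryKeyValueStore(STORE_NAME),
                               Serdes.String(), Serdes.Double()));
                   }
               })
               .to("charged-amounts", Produced.with(Serdes.String(), Serdes.Double()));
        return builder.build();
    }
}
```

Because `process` may change the key type, a grouping or join downstream of it would trigger repartitioning; here the key is passed through unchanged.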
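For the stateless `processValues` side of the new section (the "Replacing Slang in Text Messages" row), a comparable sketch might look like the following. The slang dictionary and the topic names (`messages`, `clean-messages`) are assumptions for illustration; the point is that a `FixedKeyProcessor` cannot change the key, so the result is not marked for repartitioning.

```java
import java.util.Map;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

public class SlangReplacementExample {

    // Illustrative slang dictionary; not part of the PR.
    static final Map<String, String> SLANG = Map.of("u", "you", "brb", "be right back");

    // A stateless FixedKeyProcessor: it only rewrites the value, word by word.
    static class SlangProcessor extends ContextualFixedKeyProcessor<String, String, String> {
        @Override
        public void process(FixedKeyRecord<String, String> record) {
            StringBuilder expanded = new StringBuilder();
            for (String word : record.value().split(" ")) {
                expanded.append(SLANG.getOrDefault(word, word)).append(' ');
            }
            context().forward(record.withValue(expanded.toString().trim()));
        }
    }

    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("messages", Consumed.with(Serdes.String(), Serdes.String()))
               // The lambda is the FixedKeyProcessorSupplier; no state store is needed.
               .processValues(() -> new SlangProcessor())
               .to("clean-messages", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}
```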