fonsdant commented on code in PR #18314: URL: https://github.com/apache/kafka/pull/18314#discussion_r1933119733
########## docs/streams/developer-guide/dsl-api.html: ########## @@ -3097,152 +3098,994 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key </div> </div> </div> - <div class="section" id="applying-processors-and-transformers-processor-api-integration"> - <span id="streams-developer-guide-dsl-process"></span><h3><a class="toc-backref" href="#id24">Applying processors and transformers (Processor API integration)</a><a class="headerlink" href="#applying-processors-and-transformers-processor-api-integration" title="Permalink to this headline"></a></h3> - <p>Beyond the aforementioned <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateless</span></a> and - <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateful</span></a> transformations, you may also - leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. - There are a number of scenarios where this may be helpful:</p> - <ul class="simple"> - <li><strong>Customization:</strong> You need to implement special, customized logic that is not or not yet available in the DSL.</li> - <li><strong>Combining ease-of-use with full flexibility where it’s needed:</strong> Even though you generally prefer to use - the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and - tinkering than the DSL provides. For example, only the Processor API provides access to a - record’s metadata such as its topic, partition, and offset information. 
- However, you don’t want to switch completely to the Processor API just because of that.</li> - <li><strong>Migrating from other tools:</strong> You are migrating from other stream processing technologies that provide an - imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than to - migrate completely to the DSL right away.</li> + <div class="section" id="applying-processors-processor-api-integration"> + <h3> + <a class="toc-backref" href="#id24">Applying processors (Processor API integration)</a> + <a class="headerlink" href="#applying-processors-processor-api-integration" title="Permalink to this headline"></a> + </h3> + <p>Beyond the aforementioned <a class="reference internal" + href="#streams-developer-guide-dsl-transformations-stateless"> + <span class="std std-ref">stateless</span></a> and + <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"> + <span class="std std-ref">stateful</span></a> transformations, you may also leverage the + <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> + from the DSL. There are a number of scenarios where this may be helpful: + </p> + <ul> + <li><strong>Customization:</strong> You need to implement special, customized logic that is not (yet) + available in the DSL.</li> + <li><strong>Combining ease-of-use with full flexibility where it's needed:</strong> Even though you generally + prefer to use the expressiveness of the DSL, there are certain steps in your processing that require more + flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a + record's metadata such as its topic, partition, and offset information.
However, you don't want to switch + completely to the Processor API just because of that; and</li> + <li><strong>Migrating from other tools:</strong> You are migrating from other stream processing technologies + that provide an imperative API, and migrating some of your legacy code to the Processor API was faster and/or + easier than migrating completely to the DSL right away.</li> </ul> - <table border="1" class="non-scrolling-table width-100-percent docutils"> - <colgroup> - <col width="19%" /> - <col width="81%" /> - </colgroup> - <thead valign="bottom"> - <tr class="row-odd"><th class="head">Transformation</th> - <th class="head">Description</th> - </tr> + <h4>Operations and concepts</h4> + <ul> + <li><code>KStream#process</code>: Process all records in a stream, one record at a time, by applying a + <code>Processor</code> (provided by a given <code>ProcessorSupplier</code>). Because the processor may change + the key, a subsequent grouping or join causes the records to be repartitioned; + </li> + <li><code>KStream#processValues</code>: Process all records in a stream, one record at a time, by applying a + <code>FixedKeyProcessor</code> (provided by a given <code>FixedKeyProcessorSupplier</code>). Because the key + cannot change, this does not mark the stream for repartitioning; + </li> + <li><code>Processor</code>: A processor of key-value pair records;</li> + <li><code>ContextualProcessor</code>: An abstract implementation of <code>Processor</code> that manages the + <code>ProcessorContext</code> instance and provides a default no-op implementation of + <code>Processor#close</code>;
+ </li> + <li><code>FixedKeyProcessor</code>: A processor of key-value pair records where keys are immutable;</li> + <li><code>ContextualFixedKeyProcessor</code>: An abstract implementation of <code>FixedKeyProcessor</code> that + manages the <code>FixedKeyProcessorContext</code> instance and provides a default no-op implementation of + <code>FixedKeyProcessor#close</code>; + </li> + <li><code>ProcessorSupplier</code>: A supplier that returns a new <code>Processor</code> instance each time + <code>get()</code> is called; and</li> + <li><code>FixedKeyProcessorSupplier</code>: A supplier that returns a new <code>FixedKeyProcessor</code> + instance each time <code>get()</code> is called. + </li> + </ul> + <h4>Examples</h4> + <p>Follow the examples below to learn how to apply <code>process</code> and <code>processValues</code> to your + <code>KStream</code>. + </p> + <table> + <thead> + <tr> + <th>Example</th> + <th>Operation</th> + <th>State Type</th> + </tr> </thead> - <tbody valign="top"> - <tr class="row-even"><td><p class="first"><strong>Process</strong></p> - <ul class="last simple"> - <li>KStream -> void</li> - </ul> - </td> - <td><p class="first"><strong>Terminal operation.</strong> Applies a <code class="docutils literal"><span class="pre">Processor</span></code> to each record. - <code class="docutils literal"><span class="pre">process()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p> - <p>This is essentially equivalent to adding the <code class="docutils literal"><span class="pre">Processor</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p> - </td> - </tr> - <tr class="row-odd"><td><p class="first"><strong>Transform</strong></p> - <ul class="last simple"> - <li>KStream -> KStream</li> - </ul> - </td> - <td><p class="first">Applies a <code class="docutils literal"><span class="pre">Transformer</span></code> to each record. - <code class="docutils literal"><span class="pre">transform()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. - (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p> - <p>Each input record is transformed into zero, one, or more output records (similar to the stateless <code class="docutils literal"><span class="pre">flatMap</span></code>). - The <code class="docutils literal"><span class="pre">Transformer</span></code> must return <code class="docutils literal"><span class="pre">null</span></code> for zero output. 
- You can modify the record’s key and value, including their types.</p> - <p><strong>Marks the stream for data re-partitioning:</strong> - Applying a grouping or a join after <code class="docutils literal"><span class="pre">transform</span></code> will result in re-partitioning of the records. - If possible use <code class="docutils literal"><span class="pre">transformValues</span></code> instead, which will not cause data re-partitioning.</p> - <p><code class="docutils literal"><span class="pre">transform</span></code> is essentially equivalent to adding the <code class="docutils literal"><span class="pre">Transformer</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>. - </p> - </td> - </tr> - <tr class="row-even"><td><p class="first"><strong>Transform (values only)</strong></p> - <ul class="last simple"> - <li>KStream -> KStream</li> - <li>KTable -> KTable</li> - </ul> - </td> - <td><p class="first">Applies a <code class="docutils literal"><span class="pre">ValueTransformer</span></code> to each record, while retaining the key of the original record. - <code class="docutils literal"><span class="pre">transformValues()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. 
- (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p> - <p>Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible). - The <code class="docutils literal"><span class="pre">ValueTransformer</span></code> may return <code class="docutils literal"><span class="pre">null</span></code> as the new value for a record.</p> - <p><code class="docutils literal"><span class="pre">transformValues</span></code> is preferable to <code class="docutils literal"><span class="pre">transform</span></code> because it will not cause data re-partitioning.</p> - <p><code class="docutils literal"><span class="pre">transformValues</span></code> is essentially equivalent to adding the <code class="docutils literal"><span class="pre">ValueTransformer</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p> - </td> - </tr> + <tbody> + <tr> + <td><a href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a Loyalty Program</a> + </td> + <td><code>process</code></td> + <td>Stateful</td> + </tr> + <tr> + <td><a href="#categorizing-logs-by-severity">Categorizing Logs by Severity</a></td> + <td><code>process</code></td> + <td>Stateless</td> + </tr> + <tr> + <td><a href="#traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</a></td> + <td><code>processValues</code></td> + 
<td>Stateful</td> + </tr> + <tr> + <td><a href="#replacing-slang-in-text-messages">Replacing Slang in Text Messages</a></td> + <td><code>processValues</code></td> + <td>Stateless</td> + </tr> </tbody> </table> - <p>The following example shows how to leverage, via the <code class="docutils literal"><span class="pre">KStream#process()</span></code> method, a custom <code class="docutils literal"><span class="pre">Processor</span></code> that sends an - email notification whenever a page view count reaches a predefined threshold.</p> - <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code> - interface:</p> - <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address -public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> { - - private final String emailAddress; - private ProcessorContext<Void, Void> context; - - public PopularPageEmailAlert(String emailAddress) { - this.emailAddress = emailAddress; - } + <h5 id="cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a Loyalty Program</h5> + <ul> + <li><strong>Idea:</strong> A stream of purchase events contains customer IDs and transaction amounts. Use a state + store to accumulate the total spending of each customer. When a customer's total reaches a threshold, emit a + discount notification and deduct the threshold from their accumulated total.</li> + <li><strong>Real-World Context:</strong> In a retail loyalty program, tracking cumulative customer spending + enables dynamic rewards, such as issuing a discount when a customer's total purchases exceed a predefined limit.
+ </li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; - @Override - public void init(ProcessorContext<Void, Void> context) { - this.context = context; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +public class CumulativeDiscountsForALoyaltyProgramExample { + private static final double DISCOUNT_THRESHOLD = 100.0; + private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store"; + private static final String DISCOUNT_NOTIFICATION_MESSAGE = + "Discount applied! You have received a reward for your purchases."; + private static final String DISCOUNT_NOTIFICATIONS_TOPIC = "discount-notifications-topic"; + private static final String PURCHASE_EVENTS_TOPIC = "purchase-events-topic"; + + public static void applyDiscountWithProcess(final StreamsBuilder builder) { + // Define the state store for tracking cumulative spending + builder.addStateStore( + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE), + Serdes.String(), + Serdes.Double() + ) + ); + // Read purchases with explicit serdes: customer ID (String) -> purchase amount (Double) + final KStream<String, Double> purchaseStream = + builder.stream(PURCHASE_EVENTS_TOPIC, Consumed.with(Serdes.String(), Serdes.Double())); + // Apply the Processor with the state store + final KStream<String, String> notificationStream = + purchaseStream.process(CumulativeDiscountProcessor::new, CUSTOMER_SPENDING_STORE); + // Send the notifications to the output topic + notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC, Produced.with(Serdes.String(), Serdes.String())); + } - // Here you would perform any additional initializations such as setting up an email client.
- } + private static class CumulativeDiscountProcessor implements Processor<String, Double, String, String> { + private KeyValueStore<String, Double> spendingStore; + private ProcessorContext<String, String> context; + + @Override + public void init(final ProcessorContext<String, String> context) { + this.context = context; + // Retrieve the state store for cumulative spending + spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE); + } + + @Override + public void process(final Record<String, Double> record) { + if (record.key() == null || record.value() == null) { + return; // Skip records without a customer ID or purchase amount + } - @Override - void process(Record<PageId, Long> record) { - // Here you would format and send the alert email. - // - // In this specific example, you would be able to include - // information about the page's ID and its view count - } + // Add the purchase amount to the customer's current spending total + final Double currentSpending = spendingStore.get(record.key()); + double updatedSpending = (currentSpending == null ? 0.0 : currentSpending) + record.value(); + + // Check if the customer qualifies for a discount + if (updatedSpending >= DISCOUNT_THRESHOLD) { + // Deduct the threshold from the total once the discount is applied + updatedSpending -= DISCOUNT_THRESHOLD; + // Send a discount notification + context.forward(record.withValue(DISCOUNT_NOTIFICATION_MESSAGE)); + } + spendingStore.put(record.key(), updatedSpending); + } + } +}</code></pre> + <h5 id="categorizing-logs-by-severity">Categorizing Logs by Severity</h5> + <ul> + <li><strong>Idea:</strong> You have a stream of log messages. Each message contains a severity level (e.g., + INFO, WARN, ERROR) in the value. The processor filters messages, routing ERROR messages to a dedicated topic and + discarding INFO messages.
WARN messages are also routed to a dedicated topic, and messages with an unrecognized severity go to a topic for unknown logs.</li> + <li><strong>Real-World Context:</strong> In a production monitoring system, categorizing logs by severity + ensures ERROR logs are sent to a critical incident management system and WARN logs are analyzed for potential + risks, while INFO logs are dropped to reduce noise.</li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; - @Override - void close() { - // Any code for clean up would go here, for example tearing down the email client and anything - // else you created in the init() method - // This processor instance will not be used again after this call. - } +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; + +public class CategorizingLogsBySeverityExample { + private static final String ERROR_LOGS_TOPIC = "error-logs-topic"; + private static final String INPUT_LOGS_TOPIC = "input-logs-topic"; + private static final String UNKNOWN_LOGS_TOPIC = "unknown-logs-topic"; + private static final String WARN_LOGS_TOPIC = "warn-logs-topic"; + + public static void categorizeWithProcess(final StreamsBuilder builder) { + final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC); + logStream.process(LogSeverityProcessor::new) + .to((key, value, recordContext) -> { + // Determine the target topic dynamically from the severity the processor set as the key + if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC; + if ("WARN".equals(key)) return WARN_LOGS_TOPIC; + return UNKNOWN_LOGS_TOPIC; + }); + } + + private static class LogSeverityProcessor extends ContextualProcessor<String, String, String, String> { + @Override + public void process(final Record<String, String> record) { + if (record.value() == null) { + return; // Skip null values + } + // Assume the severity is the text before the first colon in the log message + // For example: "ERROR: Disk not found" -> "ERROR" + final int colonIndex = record.value().indexOf(':'); + final String severity = colonIndex > 0 ? record.value().substring(0, colonIndex).trim() : "UNKNOWN"; + + // Re-key each record by its severity so that the topic name extractor can route it + switch (severity) { + case "ERROR": + case "WARN": + context().forward(record.withKey(severity)); + break; + case "INFO": + // INFO logs are dropped + break; + default: + // Logs with an unrecognized severity are routed to the "unknown" topic + context().forward(record.withKey("UNKNOWN")); + } + } + } }</code></pre> - <div class="admonition tip"> - <p><b>Tip</b></p> - <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by - calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. - State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access. - There are two ways to connect state stores to a processor: - <ul class="simple"> - <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li> - <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> - passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>.
In this case there is no need to call <code class="docutils literal"><span class="pre">StreamsBuilder#addStateStore()</span></code> - beforehand, the store will be automatically added for you. You can also implement <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the - <code class="docutils literal"><span class="pre">Value*</span></code> or <code class="docutils literal"><span class="pre">*WithKey</span></code> supplier variants, or <code class="docutils literal"><span class="pre">TransformerSupplier</span></code> or any of its variants. - </li> - </ul> - </div> - <p>Then we can leverage the <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the DSL via <code class="docutils literal"><span class="pre">KStream#process</span></code>.</p> - <pre class="line-numbers"><code class="language-java">KStream<String, GenericRecord> pageViews = ...; - -// Send an email notification when the view count of a page reaches one thousand. -pageViews.groupByKey() - .count() - .filter((PageId pageId, Long viewCount) -> viewCount == 1000) - // PopularPageEmailAlert is your custom processor that implements the - // `Processor` interface, see further down below. - .process(() -> new PopularPageEmailAlert("ale...@yourcompany.com"));</code></pre> - </div> + <h5 id="traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</h5> + <ul> + <li><strong>Idea:</strong> A radar monitors cars passing along a road stretch. A system counts the cars for each + day, maintaining a cumulative total for the current day in a state store. 
The updated count for the day is emitted with every event; entries for past days stay in the store, and removing them (for example, with a scheduled punctuation) is not shown.</li> + <li><strong>Real-World Context:</strong> A car counting system can help decide whether to widen a road or + introduce traffic controls, depending on the number of cars passing through the monitored stretch.</li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public class TrafficRadarMonitoringCarCountExample { + private static final String DAILY_COUNT_STORE = "daily-count-store"; + private static final String DAILY_COUNT_TOPIC = "daily-count-topic"; + private static final String RADAR_COUNT_TOPIC = "car-radar-topic"; + + public static void countWithProcessValues(final StreamsBuilder builder) { + // Define a state store for tracking daily car counts + builder.addStateStore( + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE), + Serdes.String(), + Serdes.Long() + ) + ); + final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC); + // Apply the FixedKeyProcessor with the state store + radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE) + .to(DAILY_COUNT_TOPIC); + } + + private static class DailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> { + private FixedKeyProcessorContext<Void, String> context; + private KeyValueStore<String, Long> stateStore; + private static final DateTimeFormatter DATE_FORMATTER = +
DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault()); + + @Override + public void init(final FixedKeyProcessorContext<Void, String> context) { + this.context = context; + stateStore = context.getStateStore(DAILY_COUNT_STORE); + } + + @Override + public void process(final FixedKeyRecord<Void, String> record) { + if (record.value() == null) { + return; // Skip null events + } + + // Derive the day from the record's (event) timestamp + final String day = DATE_FORMATTER.format(Instant.ofEpochMilli(record.timestamp())); + // Retrieve the current count for the day + Long dailyCount = stateStore.get(day); + if (dailyCount == null) { + dailyCount = 0L; + } + // Increment the count + dailyCount++; + stateStore.put(day, dailyCount); + + // Emit the updated count for that day + context.forward(record.withValue(String.format("Day: %s, Car Count: %s", day, dailyCount))); + } + } +}</code></pre> + <h5 id="replacing-slang-in-text-messages">Replacing Slang in Text Messages</h5> + <ul> + <li><strong>Idea:</strong> A messaging stream contains user-generated content, and you want to replace slang + words with their formal equivalents (e.g., "u" becomes "you", "brb" becomes "be right back").
The operation only modifies the message value and keeps the key intact.</li> + <li><strong>Real-World Context:</strong> In customer support chat systems, normalizing text by replacing slang + with formal equivalents helps automated sentiment analysis tools produce more accurate and reliable + insights.</li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public class ReplacingSlangTextInMessagesExample { + private static final Map<String, String> SLANG_DICTIONARY = Map.of( + "u", "you", + "brb", "be right back", + "omg", "oh my god", + "btw", "by the way" + ); + private static final String INPUT_MESSAGES_TOPIC = "input-messages-topic"; + private static final String OUTPUT_MESSAGES_TOPIC = "output-messages-topic"; + + public static void replaceWithProcessValues(final StreamsBuilder builder) { + final KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC); + messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC); + } + + private static class SlangReplacementProcessor extends ContextualFixedKeyProcessor<String, String, String> { + @Override + public void process(final FixedKeyRecord<String, String> record) { + if (record.value() == null) { + return; // Skip null values + } + + // Replace each slang word with its formal equivalent and rebuild the message + final String replacedMessage = Arrays.stream(record.value().split("\\s+")) + .map(word -> SLANG_DICTIONARY.getOrDefault(word, word)) + .collect(Collectors.joining(" ")); + // Forward a single record with the normalized text; the key stays unchanged + context().forward(record.withValue(replacedMessage)); + } + } +}</code></pre> + <h4>Key points</h4> + <ul> + <li><strong>Type Safety and Flexibility:</strong> The <code>process</code> and <code>processValues</code> APIs use
<code>ProcessorContext</code> and <code>Record</code> or <code>FixedKeyRecord</code> objects for better type + safety and flexibility of custom processing logic. + </li> + <li><strong>Clear State and Logic Management:</strong> Implementations of <code>Processor</code> or + <code>FixedKeyProcessor</code> should manage state and logic clearly. Use the processor context's + <code>forward()</code> method (for example, <code>context().forward()</code> in a <code>ContextualProcessor</code>) + to emit records downstream. + </li> + <li><strong>Unified API:</strong> <code>process</code> and <code>processValues</code> replace the separate + <code>transform</code>, <code>flatTransform</code>, <code>transformValues</code>, and + <code>flatTransformValues</code> methods.</li> + <li><strong>Future-Proof:</strong> Unlike the removed transformer methods, <code>process</code> and + <code>processValues</code> are not deprecated and are the supported way to plug custom processing into the DSL + in current Kafka Streams releases.</li> + </ul> + </div> + <div class="section" id="transformers-removal-and-migration-to-processors"> + <h3> + <a class="toc-backref" href="#id37">Transformers removal and migration to processors</a> + <a class="headerlink" href="#transformers-removal-and-migration-to-processors" title="Permalink to this headline"></a> + </h3> + <p>As of Kafka 4.0, several deprecated methods in the Kafka Streams API, such as <code>transform</code>, + <code>flatTransform</code>, <code>transformValues</code>, <code>flatTransformValues</code>, and the + <code>process</code> overload that accepts the old <code>org.apache.kafka.streams.processor.ProcessorSupplier</code>, + have been removed. They are replaced by the more versatile Processor API integration through + <code>process</code> and <code>processValues</code>. This section describes how to migrate existing code to the + new Processor API and explains the benefits of the change.</p> + <p>The following deprecated methods are no longer available in Kafka Streams:</p> + <ul> + <li><code>KStream#transform</code></li> + <li><code>KStream#flatTransform</code></li> + <li><code>KStream#transformValues</code></li> + <li><code>KStream#flatTransformValues</code></li> + <li><code>KStream#process</code> (the overload taking an <code>org.apache.kafka.streams.processor.ProcessorSupplier</code>)</li> + </ul> + <p>The Processor API now serves as a unified replacement for all these methods.
It simplifies the API surface + while maintaining support for both stateless and stateful operations.</p> + <h4>Migration Examples</h4> + <p>To migrate from the removed <code>transform</code>, <code>transformValues</code>, <code>flatTransform</code>, and + <code>flatTransformValues</code> methods to the Processor API (PAPI) in Kafka Streams, let's revisit the + previous examples. The new <code>process</code> and <code>processValues</code> methods enable a more flexible + and reusable approach by requiring implementations of the <code>Processor</code> or + <code>FixedKeyProcessor</code> interfaces.</p> + <table> + <thead> + <tr> + <th>Example</th> + <th>Migrating from</th> + <th>Migrating to</th> + <th>State Type</th> + </tr> + </thead> + <tbody> + <tr> + <td><a href="#cumulative-discounts-for-a-loyalty-program-removal">Cumulative Discounts for a Loyalty Program</a> + </td> + <td><code>transform</code></td> + <td><code>process</code></td> + <td>Stateful</td> Review Comment: Updated.
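The migration text above maps the removed `flatTransformValues` onto `processValues`. A one-to-many transformation migrates by calling `forward()` once per output record, and forwarding nothing takes the place of returning an empty collection. The following is a minimal sketch under stated assumptions: String keys and values, and the hypothetical topics `sentences-topic` and `words-topic`; the class and method names (`SentenceSplitterExample`, `splitIntoWords`, `WordSplitterProcessor`) are illustrative, not part of Kafka Streams.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

import java.util.ArrayList;
import java.util.List;

public class SentenceSplitterExample {
    // Hypothetical topic names, for illustration only
    static final String SENTENCES_TOPIC = "sentences-topic";
    static final String WORDS_TOPIC = "words-topic";

    // Splitting logic kept separate from the processor so it can be tested without Kafka
    static List<String> splitIntoWords(final String sentence) {
        final List<String> words = new ArrayList<>();
        if (sentence == null) {
            return words;
        }
        for (final String word : sentence.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                words.add(word);
            }
        }
        return words;
    }

    public static void splitWithProcessValues(final StreamsBuilder builder) {
        final KStream<String, String> sentences =
            builder.stream(SENTENCES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        sentences.processValues(WordSplitterProcessor::new)
            .to(WORDS_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
    }

    // Takes the place of a ValueTransformer used with flatTransformValues:
    // each forward() call emits one output record that keeps the input key
    static final class WordSplitterProcessor extends ContextualFixedKeyProcessor<String, String, String> {
        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            for (final String word : splitIntoWords(record.value())) {
                context().forward(record.withValue(word));
            }
            // Forwarding nothing is the counterpart of returning an empty collection
        }
    }
}
```

Using `processValues` rather than `process` here mirrors the old choice of `flatTransformValues` over `flatTransform`: the key is fixed, so the result is not marked for repartitioning.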
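The tip removed in this diff described a second way to connect a state store: implementing `ConnectedStoreProvider#stores()` on the supplier, so there is no need to call `StreamsBuilder#addStateStore()` beforehand or pass the store name to `process`. Below is a minimal sketch of that pattern with `org.apache.kafka.streams.processor.api.ProcessorSupplier`, assuming a simple per-key counter and the hypothetical names `per-key-count-store`, `input-topic`, and `per-key-count-topic`; `ConnectedStoreExample`, `CountingSupplier`, and `CountingProcessor` are illustrative names only.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

import java.util.Collections;
import java.util.Set;

public class ConnectedStoreExample {
    // Hypothetical names, for illustration only
    static final String COUNT_STORE = "per-key-count-store";
    static final String INPUT_TOPIC = "input-topic";
    static final String OUTPUT_TOPIC = "per-key-count-topic";

    // The supplier both creates processors and declares the store they use
    static final class CountingSupplier implements ProcessorSupplier<String, String, String, Long> {
        @Override
        public Processor<String, String, String, Long> get() {
            return new CountingProcessor(); // always a new instance
        }

        @Override
        public Set<StoreBuilder<?>> stores() {
            final StoreBuilder<KeyValueStore<String, Long>> storeBuilder = Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(COUNT_STORE), Serdes.String(), Serdes.Long());
            return Collections.singleton(storeBuilder);
        }
    }

    // Counts records per key and forwards the running count
    static final class CountingProcessor extends ContextualProcessor<String, String, String, Long> {
        private KeyValueStore<String, Long> store;

        @Override
        public void init(final ProcessorContext<String, Long> context) {
            super.init(context); // lets context() return this context
            store = context.getStateStore(COUNT_STORE);
        }

        @Override
        public void process(final Record<String, String> record) {
            if (record.key() == null) {
                return; // the store cannot hold null keys
            }
            final Long previous = store.get(record.key());
            final long updated = (previous == null ? 0L : previous) + 1;
            store.put(record.key(), updated);
            context().forward(record.withValue(updated));
        }
    }

    public static void countWithConnectedStore(final StreamsBuilder builder) {
        final KStream<String, String> input =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
        // No addStateStore() call and no store name: stores() connects the store
        input.process(new CountingSupplier())
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
    }
}
```

Keeping the store definition next to the processor that uses it means the two cannot drift apart; the trade-off is that a store shared by several processors is usually easier to declare once with `addStateStore()` and reference by name.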
########## docs/streams/developer-guide/dsl-api.html: ########## @@ -3097,152 +3098,994 @@ <h5><a class="toc-backref" href="#id34">KTable-KTable Foreign-Key </div> </div> </div> - <div class="section" id="applying-processors-and-transformers-processor-api-integration"> - <span id="streams-developer-guide-dsl-process"></span><h3><a class="toc-backref" href="#id24">Applying processors and transformers (Processor API integration)</a><a class="headerlink" href="#applying-processors-and-transformers-processor-api-integration" title="Permalink to this headline"></a></h3> - <p>Beyond the aforementioned <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateless</span></a> and - <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateful</span></a> transformations, you may also - leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. - There are a number of scenarios where this may be helpful:</p> - <ul class="simple"> - <li><strong>Customization:</strong> You need to implement special, customized logic that is not or not yet available in the DSL.</li> - <li><strong>Combining ease-of-use with full flexibility where it’s needed:</strong> Even though you generally prefer to use - the expressiveness of the DSL, there are certain steps in your processing that require more flexibility and - tinkering than the DSL provides. For example, only the Processor API provides access to a - record’s metadata such as its topic, partition, and offset information. 
- However, you don’t want to switch completely to the Processor API just because of that.</li> - <li><strong>Migrating from other tools:</strong> You are migrating from other stream processing technologies that provide an - imperative API, and migrating some of your legacy code to the Processor API was faster and/or easier than to - migrate completely to the DSL right away.</li> + <div class="section" id="applying-processors-processor-api-integration"> + <h3> + <a class="toc-backref" href="#id24">Applying processors (Processor API integration)</a> + <a class="headerlink" href="#applying-processors-processor-api-integration" title="Permalink to this headline"></a> + </h3> + <p>Beyond the aforementioned <a class="reference internal" + href="#streams-developer-guide-dsl-transformations-stateless"> + <span class="std std-ref">stateless</span></a> and + <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"> <span + class="std std-ref">stateful</span></a> transformations, you may also leverage the Processor API from the + DSL. There are a number of scenarios where this may be helpful: + </p> + <ul> + <li><strong>Customization:</strong> You need to implement special, customized logic that is not (or not yet) + available + in the DSL.</li> + <li><strong>Combining ease-of-use with full flexibility where it's needed:</strong> Even though you generally + prefer + to use the expressiveness of the DSL, there are certain steps in your processing that require more + flexibility and tinkering than the DSL provides. For example, only the Processor API provides access to a + record's metadata such as its topic, partition, and offset information.
However, you don't want to switch + completely to the Processor API just because of that; and</li> + <li><strong>Migrating from other tools:</strong> You are migrating from other stream processing technologies + that + provide an imperative API, and migrating some of your legacy code to the Processor API was faster and/or + easier than migrating completely to the DSL right away.</li> </ul> - <table border="1" class="non-scrolling-table width-100-percent docutils"> - <colgroup> - <col width="19%" /> - <col width="81%" /> - </colgroup> - <thead valign="bottom"> - <tr class="row-odd"><th class="head">Transformation</th> - <th class="head">Description</th> - </tr> + <h4>Operations and concepts</h4> + <ul> + <li><code>KStream#process</code>: Process all records in a stream, one record at a time, by applying a + <code>Processor</code> (provided by a given <code>ProcessorSupplier</code>); + </li> + <li><code>KStream#processValues</code>: Process all records in a stream, one record at a time, by applying a + <code>FixedKeyProcessor</code> (provided by a given <code>FixedKeyProcessorSupplier</code>); + </li> + <li><code>Processor</code>: A processor of key-value pair records;</li> + <li><code>ContextualProcessor</code>: An abstract implementation of <code>Processor</code> that manages the + <code>ProcessorContext</code> instance and provides a default no-op implementation of + <code>Processor#close</code>;
+ </li> + <li><code>FixedKeyProcessor</code>: A processor of key-value pair records where keys are immutable;</li> + <li><code>ContextualFixedKeyProcessor</code>: An abstract implementation of <code>FixedKeyProcessor</code> that + manages the <code>FixedKeyProcessorContext</code> instance and provides a default no-op implementation of + <code>FixedKeyProcessor#close</code>; + </li> + <li><code>ProcessorSupplier</code>: A processor supplier that can create one or more <code>Processor</code> + instances; and</li> + <li><code>FixedKeyProcessorSupplier</code>: A processor supplier that can create one or more + <code>FixedKeyProcessor</code> instances. + </li> + </ul> + <h4>Examples</h4> + <p>Follow the examples below to learn how to apply <code>process</code> and <code>processValues</code> to your + <code>KStream</code>. + </p> + <table> + <thead> + <tr> + <th>Example</th> + <th>Operation</th> + <th>State Type</th> + </tr> </thead> - <tbody valign="top"> - <tr class="row-even"><td><p class="first"><strong>Process</strong></p> - <ul class="last simple"> - <li>KStream -> void</li> - </ul> - </td> - <td><p class="first"><strong>Terminal operation.</strong> Applies a <code class="docutils literal"><span class="pre">Processor</span></code> to each record. - <code class="docutils literal"><span class="pre">process()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL.
- (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">details</a>)</p> - <p>This is essentially equivalent to adding the <code class="docutils literal"><span class="pre">Processor</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#process-org.apache.kafka.streams.processor.ProcessorSupplier-java.lang.String...-">javadocs</a>.</p> - </td> - </tr> - <tr class="row-odd"><td><p class="first"><strong>Transform</strong></p> - <ul class="last simple"> - <li>KStream -> KStream</li> - </ul> - </td> - <td><p class="first">Applies a <code class="docutils literal"><span class="pre">Transformer</span></code> to each record. - <code class="docutils literal"><span class="pre">transform()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. - (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">details</a>)</p> - <p>Each input record is transformed into zero, one, or more output records (similar to the stateless <code class="docutils literal"><span class="pre">flatMap</span></code>). - The <code class="docutils literal"><span class="pre">Transformer</span></code> must return <code class="docutils literal"><span class="pre">null</span></code> for zero output. 
- You can modify the record’s key and value, including their types.</p> - <p><strong>Marks the stream for data re-partitioning:</strong> - Applying a grouping or a join after <code class="docutils literal"><span class="pre">transform</span></code> will result in re-partitioning of the records. - If possible use <code class="docutils literal"><span class="pre">transformValues</span></code> instead, which will not cause data re-partitioning.</p> - <p><code class="docutils literal"><span class="pre">transform</span></code> is essentially equivalent to adding the <code class="docutils literal"><span class="pre">Transformer</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transform-org.apache.kafka.streams.kstream.TransformerSupplier-java.lang.String...-">javadocs</a>. - </p> - </td> - </tr> - <tr class="row-even"><td><p class="first"><strong>Transform (values only)</strong></p> - <ul class="last simple"> - <li>KStream -> KStream</li> - <li>KTable -> KTable</li> - </ul> - </td> - <td><p class="first">Applies a <code class="docutils literal"><span class="pre">ValueTransformer</span></code> to each record, while retaining the key of the original record. - <code class="docutils literal"><span class="pre">transformValues()</span></code> allows you to leverage the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a> from the DSL. 
- (<a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">details</a>)</p> - <p>Each input record is transformed into exactly one output record (zero output records or multiple output records are not possible). - The <code class="docutils literal"><span class="pre">ValueTransformer</span></code> may return <code class="docutils literal"><span class="pre">null</span></code> as the new value for a record.</p> - <p><code class="docutils literal"><span class="pre">transformValues</span></code> is preferable to <code class="docutils literal"><span class="pre">transform</span></code> because it will not cause data re-partitioning.</p> - <p><code class="docutils literal"><span class="pre">transformValues</span></code> is essentially equivalent to adding the <code class="docutils literal"><span class="pre">ValueTransformer</span></code> via <code class="docutils literal"><span class="pre">Topology#addProcessor()</span></code> to your - <a class="reference internal" href="../core-concepts.html#streams_topology"><span class="std std-ref">processor topology</span></a>.</p> - <p class="last">An example is available in the - <a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/kstream/KStream.html#transformValues-org.apache.kafka.streams.kstream.ValueTransformerSupplier-java.lang.String...-">javadocs</a>.</p> - </td> - </tr> + <tbody> + <tr> + <td><a href="#cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a Loyalty Program</a> + </td> + <td><code>process</code></td> + <td>Stateful</td> + </tr> + <tr> + <td><a href="#categorizing-logs-by-severity">Categorizing Logs by Severity</a></td> + <td><code>process</code></td> + <td>Stateless</td> + </tr> + <tr> + <td><a href="#traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</a></td> + <td><code>processValues</code></td> + 
<td>Stateful</td> + </tr> + <tr> + <td><a href="#replacing-slang-in-text-messages">Replacing Slang in Text Messages</a></td> + <td><code>processValues</code></td> + <td>Stateless</td> + </tr> </tbody> </table> - <p>The following example shows how to leverage, via the <code class="docutils literal"><span class="pre">KStream#process()</span></code> method, a custom <code class="docutils literal"><span class="pre">Processor</span></code> that sends an - email notification whenever a page view count reaches a predefined threshold.</p> - <p>First, we need to implement a custom stream processor, <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code>, that implements the <code class="docutils literal"><span class="pre">Processor</span></code> - interface:</p> - <pre class="line-numbers"><code class="language-java">// A processor that sends an alert message about a popular page to a configurable email address -public class PopularPageEmailAlert implements Processor<PageId, Long, Void, Void> { - - private final String emailAddress; - private ProcessorContext<Void, Void> context; - - public PopularPageEmailAlert(String emailAddress) { - this.emailAddress = emailAddress; - } + <h5 id="cumulative-discounts-for-a-loyalty-program">Cumulative Discounts for a Loyalty Program</h5> + <ul> + <li><strong>Idea:</strong> A stream of purchase events contains user IDs and transaction amounts. Use a state + store + to accumulate the total spending of each user. When their total reaches a threshold, send them a discount + notification and deduct the threshold from their accumulated total.</li> + <li><strong>Real-World Context:</strong> In a retail loyalty program, tracking cumulative customer spending + enables + dynamic rewards, such as issuing a discount when a customer's total purchases exceed a predefined limit.
+ </li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; - @Override - public void init(ProcessorContext<Void, Void> context) { - this.context = context; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +public class CumulativeDiscountsForALoyaltyProgramExample { + private static final double DISCOUNT_THRESHOLD = 100.0; + private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store"; + private static final String DISCOUNT_NOTIFICATION_MESSAGE = + "Discount applied! You have received a reward for your purchases."; + private static final String DISCOUNT_NOTIFICATIONS_TOPIC = "discount-notifications-topic"; + private static final String PURCHASE_EVENTS_TOPIC = "purchase-events-topic"; + + public static void applyDiscountWithProcess(final StreamsBuilder builder) { + // Define the state store for tracking cumulative spending + builder.addStateStore( + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE), + Serdes.String(), + Serdes.Double() + ) + ); + final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC); + // Apply the Processor with the state store + final KStream<String, String> notificationStream = + purchaseStream.process(CumulativeDiscountProcessor::new, CUSTOMER_SPENDING_STORE); + // Send the notifications to the output topic + notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC); + } - // Here you would perform any additional initializations such as setting up an email client.
- } + private static class CumulativeDiscountProcessor implements Processor<String, Double, String, String> { + private KeyValueStore<String, Double> spendingStore; + private ProcessorContext<String, String> context; + + @Override + public void init(final ProcessorContext<String, String> context) { + this.context = context; + // Retrieve the state store for cumulative spending + spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE); + } + + @Override + public void process(final Record<String, Double> record) { + if (record.value() == null) { + return; // Skip null purchase amounts + } - @Override - void process(Record<PageId, Long> record) { - // Here you would format and send the alert email. - // - // In this specific example, you would be able to include - // information about the page's ID and its view count - } + // Get the current spending total for the customer + Double currentSpending = spendingStore.get(record.key()); + if (currentSpending == null) { + currentSpending = 0.0; + } + // Update the cumulative spending + currentSpending += record.value(); + spendingStore.put(record.key(), currentSpending); + + // Check if the customer qualifies for a discount + if (currentSpending >= DISCOUNT_THRESHOLD) { + // Reset the spending after applying the discount + spendingStore.put(record.key(), currentSpending - DISCOUNT_THRESHOLD); + // Send a discount notification + context.forward(record.withValue(DISCOUNT_NOTIFICATION_MESSAGE)); + } + } + } +}</code></pre> + <h5 id="categorizing-logs-by-severity">Categorizing Logs by Severity</h5> + <ul> + <li><strong>Idea:</strong> You have a stream of log messages. Each message contains a severity level (e.g., + INFO, + WARN, ERROR) in the value. The processor filters messages, routing ERROR messages to a dedicated topic and + discarding INFO messages. 
The rest (WARN) are forwarded to a dedicated topic too.</li> + <li><strong>Real-World Context:</strong> In a production monitoring system, categorizing logs by severity + ensures + ERROR logs are sent to a critical incident management system, WARN logs are analyzed for potential risks, + and + INFO logs are stored for basic reporting purposes.</li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; - @Override - void close() { - // Any code for clean up would go here, for example tearing down the email client and anything - // else you created in the init() method - // This processor instance will not be used again after this call. - } +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; + +public class CategorizingLogsBySeverityExample { + private static final String ERROR_LOGS_TOPIC = "error-logs-topic"; + private static final String INPUT_LOGS_TOPIC = "input-logs-topic"; + private static final String UNKNOWN_LOGS_TOPIC = "unknown-logs-topic"; + private static final String WARN_LOGS_TOPIC = "warn-logs-topic"; + + public static void categorizeWithProcess(final StreamsBuilder builder) { + final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC); + logStream.process(LogSeverityProcessor::new) + // LogSeverityProcessor sets each record's key to its target topic name, + // so the topic name extractor simply returns the key + .to((key, value, recordContext) -> key); + } + + private static class LogSeverityProcessor extends ContextualProcessor<String, String, String, String> { + @Override + public void process(final Record<String, String> record) { + if (record.value() == null) { + return; // Skip null values + } + // Assume the severity
is the first word in the log message + // For example: "ERROR: Disk not found" -> "ERROR" + final int colonIndex = record.value().indexOf(':'); + final String severity = colonIndex > 0 ? record.value().substring(0, colonIndex).trim() : "UNKNOWN"; + + // Route logs based on severity + switch (severity) { + case "ERROR": + context().forward(record.withKey(ERROR_LOGS_TOPIC)); + break; + case "WARN": + context().forward(record.withKey(WARN_LOGS_TOPIC)); + break; + case "INFO": + // INFO logs are ignored + break; + default: + // Forward to an "unknown" topic for logs with unrecognized severities + context().forward(record.withKey(UNKNOWN_LOGS_TOPIC)); + } + } + } }</code></pre> - <div class="admonition tip"> - <p><b>Tip</b></p> - <p class="last">Even though we do not demonstrate it in this example, a stream processor can access any available state stores by - calling <code class="docutils literal"><span class="pre">ProcessorContext#getStateStore()</span></code>. - State stores are only available if they have been connected to the processor, or if they are global stores. While global stores do not need to be connected explicitly, they only allow for read-only access. - There are two ways to connect state stores to a processor: - <ul class="simple"> - <li>By passing the name of a store that has already been added via <code class="docutils literal"><span class="pre">Topology#addStateStore()</span></code> to the corresponding <code class="docutils literal"><span class="pre">KStream#process()</span></code> method call.</li> - <li>Implementing <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the <code class="docutils literal"><span class="pre">ProcessorSupplier</span></code> - passed to <code class="docutils literal"><span class="pre">KStream#process()</span></code>. 
In this case there is no need to call <code class="docutils literal"><span class="pre">StreamsBuilder#addStateStore()</span></code> - beforehand, the store will be automatically added for you. You can also implement <code class="docutils literal"><span class="pre">ConnectedStoreProvider#stores()</span></code> on the - <code class="docutils literal"><span class="pre">Value*</span></code> or <code class="docutils literal"><span class="pre">*WithKey</span></code> supplier variants, or <code class="docutils literal"><span class="pre">TransformerSupplier</span></code> or any of its variants. - </li> - </ul> - </div> - <p>Then we can leverage the <code class="docutils literal"><span class="pre">PopularPageEmailAlert</span></code> processor in the DSL via <code class="docutils literal"><span class="pre">KStream#process</span></code>.</p> - <pre class="line-numbers"><code class="language-java">KStream<String, GenericRecord> pageViews = ...; - -// Send an email notification when the view count of a page reaches one thousand. -pageViews.groupByKey() - .count() - .filter((PageId pageId, Long viewCount) -> viewCount == 1000) - // PopularPageEmailAlert is your custom processor that implements the - // `Processor` interface, see further down below. - .process(() -> new PopularPageEmailAlert("ale...@yourcompany.com"));</code></pre> - </div> + <h5 id="traffic-radar-monitoring-car-count">Traffic Radar Monitoring Car Count</h5> + <ul> + <li><strong>Idea:</strong> A radar monitors cars passing along a road stretch. A system counts the cars for each + day, maintaining a cumulative total for the current day in a state store. 
Each incoming event + emits the running count for its day; because counts are keyed by date, a new day + starts from zero.</li> + <li><strong>Real-World Context:</strong> A car counting system can inform decisions about + widening the road + or controlling traffic, based on the number of cars passing through the monitored stretch.</li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.processor.api.FixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.state.Stores; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; + +public class TrafficRadarMonitoringCarCountExample { + private static final String DAILY_COUNT_STORE = "daily-count-store"; + private static final String DAILY_COUNT_TOPIC = "daily-count-topic"; + private static final String RADAR_COUNT_TOPIC = "car-radar-topic"; + + public static void countWithProcessValues(final StreamsBuilder builder) { + // Define a state store for tracking daily car counts + builder.addStateStore( + Stores.keyValueStoreBuilder( + Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE), + Serdes.String(), + Serdes.Long() + ) + ); + final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC); + // Apply the FixedKeyProcessor with the state store + radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE) + .to(DAILY_COUNT_TOPIC); + } + + private static class DailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> { + private FixedKeyProcessorContext<Void, String> context; + private KeyValueStore<String, Long> stateStore; + private static final DateTimeFormatter DATE_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault()); + + @Override + public void init(final FixedKeyProcessorContext<Void, String> context) { + this.context = context; + stateStore = context.getStateStore(DAILY_COUNT_STORE); + } + + @Override + public void process(final FixedKeyRecord<Void, String> record) { + if (record.value() == null) { + return; // Skip null events + } + + // Derive the day from the record's timestamp + final long timestamp = record.timestamp(); + final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp)); + // Retrieve the current count for the day + Long dailyCount = stateStore.get(currentDay); + if (dailyCount == null) { + dailyCount = 0L; + } + // Increment the count + dailyCount++; + stateStore.put(currentDay, dailyCount); + + // Emit the current day's count + context.forward(record.withValue(String.format("Day: %s, Car Count: %s", currentDay, dailyCount))); + } + } +}</code></pre> + <h5 id="replacing-slang-in-text-messages">Replacing Slang in Text Messages</h5> + <ul> + <li><strong>Idea:</strong> A messaging stream contains user-generated content, and you want to replace slang + words + with their formal equivalents (e.g., "u" becomes "you", "brb" becomes "be + right back").
The operation only modifies the message value and keeps the key intact.</li> + <li><strong>Real-World Context:</strong> In customer support chat systems, normalizing text by replacing slang + with + formal equivalents helps automated sentiment analysis tools produce more accurate and reliable + insights.</li> + </ul> + <pre class="line-numbers"><code class="language-java">package org.apache.kafka.streams.kstream; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.processor.api.ContextualFixedKeyProcessor; +import org.apache.kafka.streams.processor.api.FixedKeyRecord; + +import java.util.Arrays; +import java.util.Map; +import java.util.stream.Collectors; + +public class ReplacingSlangTextInMessagesExample { + private static final Map<String, String> SLANG_DICTIONARY = Map.of( + "u", "you", + "brb", "be right back", + "omg", "oh my god", + "btw", "by the way" + ); + private static final String INPUT_MESSAGES_TOPIC = "input-messages-topic"; + private static final String OUTPUT_MESSAGES_TOPIC = "output-messages-topic"; + + public static void replaceWithProcessValues(final StreamsBuilder builder) { + final KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC); + messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC); + } + + private static class SlangReplacementProcessor extends ContextualFixedKeyProcessor<String, String, String> { + @Override + public void process(final FixedKeyRecord<String, String> record) { + if (record.value() == null) { + return; // Skip null values + } + + // Replace slang words and forward a single normalized message (one output record per input record) + final String normalized = Arrays.stream(record.value().split("\\s+")) + .map(word -> SLANG_DICTIONARY.getOrDefault(word, word)) + .collect(Collectors.joining(" ")); + context().forward(record.withValue(normalized)); + } + } +}</code></pre> + <h4>Key Notes</h4> + <ul> + <li><strong>Type Safety and Flexibility:</strong> The <code>process</code> and <code>processValues</code> APIs use +
<code>ProcessorContext</code> and <code>Record</code> or <code>FixedKeyRecord</code> objects for better type + safety and flexibility of custom processing logic. + </li> + <li><strong>Clear State and Logic Management:</strong> Implementations for <code>Processor</code> or + <code>FixedKeyProcessor</code> should manage state and logic clearly. Use <code>context().forward()</code> + to emit records downstream. + </li> + <li><strong>Unified API:</strong> Consolidates multiple methods into a single, versatile API.</li> + <li><strong>Future-Proof:</strong> Ensures compatibility with the latest Kafka Streams releases.</li> + </ul> + </div> + <div class="section" id="transformers-removal-and-migration-to-processors"> + <h3> + <a class="toc-backref" href="#id37">Transformers removal and migration to processors</a> + <a class="headerlink" href="#transformers-removal-and-migration-to-processors" title="Permalink to this headline"></a> + </h3> + <p>As of Kafka 4.0, several deprecated methods in the Kafka Streams API, such as <code>transform</code>, + <code>flatTransform</code>, <code>transformValues</code>, <code>flatTransformValues</code>, and the old + <code>process</code> variant, have been removed. These methods have been replaced with the more versatile Processor API. + This guide provides detailed steps for migrating existing code to use the new Processor API and explains the + benefits of the changes.</p> + <p>The following deprecated methods are no longer available in Kafka Streams:</p> + <ul> + <li><code>KStream#transform</code></li> + <li><code>KStream#flatTransform</code></li> + <li><code>KStream#transformValues</code></li> + <li><code>KStream#flatTransformValues</code></li> + <li><code>KStream#process</code> (the deprecated variant that accepts an <code>org.apache.kafka.streams.processor.ProcessorSupplier</code>)</li> + </ul> + <p>The Processor API now serves as a unified replacement for all these methods.
It simplifies the API surface + while maintaining support for both stateless and stateful operations.</p> + <h4>Migration Examples</h4> + <p>To migrate from the deprecated <code>transform</code>, <code>transformValues</code>, <code>flatTransform</code>, and + <code>flatTransformValues</code> methods to the Processor API (PAPI) in Kafka Streams, let's revisit the + previous examples. The new <code>process</code> and <code>processValues</code> methods enable a more flexible + and reusable approach by requiring implementations of the <code>Processor</code> + or <code>FixedKeyProcessor</code> interfaces.</p> + <table> + <thead> + <tr> + <th>Example</th> + <th>Migrating from</th> + <th>Migrating to</th> + <th>State Type</th> + </tr> + </thead> + <tbody> + <tr> + <td><a href="#cumulative-discounts-for-a-loyalty-program-removal">Cumulative Discounts for a Loyalty Program</a> + </td> + <td><code>transform</code></td> + <td><code>process</code></td> + <td>Stateful</td> + </tr> + <tr> + <td><a href="#categorizing-logs-by-severity-removal">Categorizing Logs by Severity</a></td> + <td><code>flatTransform</code></td> + <td><code>process</code></td> + <td>Stateless</td> + </tr> + <tr> + <td><a href="#traffic-radar-monitoring-car-count-removal">Traffic Radar Monitoring Car Count</a></td> + <td><code>transformValues</code></td> + <td><code>processValues</code></td> + <td>Stateful</td> Review Comment: Updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
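The day-bucketing in the `DailyCarCountProcessor` example (the Traffic Radar row above) can likewise be sketched in plain Java. This is a hypothetical stand-in, not the Kafka Streams API: `DailyCarCountSketch` is an illustrative name, a `HashMap` replaces the `KeyValueStore<String, Long>`, the record timestamp is passed in directly (inside a `FixedKeyProcessor`, `record.timestamp()` is one way to obtain it), and UTC is assumed instead of `ZoneId.systemDefault()` so the output is deterministic. `Map#merge` does the read-increment-write that the example performs with `get` and `put`.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;

// Hypothetical plain-Java sketch of DailyCarCountProcessor's per-record logic (not Kafka Streams code).
// A HashMap stands in for the KeyValueStore<String, Long>; UTC is assumed so results are deterministic.
final class DailyCarCountSketch {
    private static final DateTimeFormatter DATE_FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneOffset.UTC);

    private final Map<String, Long> stateStore = new HashMap<>();

    // Returns the value the real processor would forward for an event with this timestamp (epoch millis).
    String process(final long timestampMs) {
        final String day = DATE_FORMATTER.format(Instant.ofEpochMilli(timestampMs));
        // merge() stores 1 for a new day, otherwise adds 1 to the existing count
        final long dailyCount = stateStore.merge(day, 1L, Long::sum);
        return String.format("Day: %s, Car Count: %s", day, dailyCount);
    }

    public static void main(final String[] args) {
        final DailyCarCountSketch sketch = new DailyCarCountSketch();
        System.out.println(sketch.process(0L));          // Day: 1970-01-01, Car Count: 1
        System.out.println(sketch.process(1_000L));      // Day: 1970-01-01, Car Count: 2
        System.out.println(sketch.process(86_400_000L)); // Day: 1970-01-02, Car Count: 1
    }
}
```

Like the example, the sketch never removes entries for past days, so the store grows by one key per day.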