fonsdant commented on PR #18314: URL: https://github.com/apache/kafka/pull/18314#issuecomment-2566716097
I have wrote a "draft" into Markdown, before to write it into HTML. I hope this helps the review. Now, I will translate it to HTML. *** # Migrating from transform to process To migrate from the deprecated transform and flatTransform methods to the process API in Kafka Streams, follow these steps. The new process and processValues APIs enable a more flexible and reusable approach by requiring implementations of the Processor or ProcessorWithKey interfaces. Here are examples to help you migrate: | Example | Migrating from | Migrating to | Type | |-------------------------------------------------------------------------------------------|-----------------------|-----------------|-----------| | [Cumulative Discounts for a Loyalty Program](#cumulative-discounts-for-a-loyalty-program) | `transform` | `process` | Stateful | | [Categorizing Logs by Severity](#categorizing-logs-by-severity) | `flatTransform` | `process` | Stateless | | [Traffic Radar Monitoring Car Count](#traffic-radar-monitoring-car-count) | `transformValues` | `processValues` | Stateful | | [Replacing Slang in Text Messages](#replacing-slang-in-text-messages) | `flatTransformValues` | `processValues` | Stateless | ## Stateless Examples ### Categorizing Logs by Severity * **Idea:** You have a stream of log messages. Each message contains a severity level (e.g., INFO, WARN, ERROR) in the value. The processor filters messages, routing ERROR messages to a dedicated topic and discarding INFO messages. The rest (WARN) are forwarded to another processor. * **Real-World Context:** In a production monitoring system, categorizing logs by severity ensures ERROR logs are sent to a critical incident management system, WARN logs are analyzed for potential risks, and INFO logs are stored for basic reporting purposes. Below, methods `categorizeWithFlatTransform` and `categorizeWithProcess` show how you can migrate from `flatTransform` to `process`. 
```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

import java.util.Collections;
import java.util.List;

public class CategorizingLogsBySeverityExample {
    private static final String ERROR_LOGS_TOPIC = "error-logs-topic";
    private static final String INPUT_LOGS_TOPIC = "input-logs-topic";
    private static final String UNKNOWN_LOGS_TOPIC = "unknown-logs-topic";
    private static final String WARN_LOGS_TOPIC = "warn-logs-topic";

    public static void categorizeWithFlatTransform(final StreamsBuilder builder) {
        final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
        logStream.flatTransform(() -> new LogSeverityTransformer())
                .to((key, value, recordContext) -> {
                    // Determine the target topic dynamically from the record key
                    if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
                    if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
                    return UNKNOWN_LOGS_TOPIC;
                });
    }

    public static void categorizeWithProcess(final StreamsBuilder builder) {
        final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
        // The processor forwards each record with its target topic as the key,
        // so the topic name can simply be read back from the key when writing out
        logStream.process(LogSeverityProcessor::new)
                .to((key, value, recordContext) -> key);
    }

    private static class LogSeverityTransformer implements Transformer<String, String, Iterable<KeyValue<String, String>>> {

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
        }

        @Override
        public Iterable<KeyValue<String, String>> transform(final String key, final String value) {
            if (value == null) {
                return Collections.emptyList(); // Skip null values
            }
            // Assume the severity is the first word in the log message
            // For example: "ERROR: Disk not found" -> "ERROR"
            final int colonIndex = value.indexOf(':');
            final String severity = colonIndex > 0 ? value.substring(0, colonIndex).trim() : "UNKNOWN";
            // Create the appropriate KeyValue pair based on the severity
            return switch (severity) {
                case "ERROR" -> List.of(new KeyValue<>("ERROR", value));
                case "WARN" -> List.of(new KeyValue<>("WARN", value));
                case "INFO" -> Collections.emptyList(); // INFO logs are ignored
                default -> List.of(new KeyValue<>("UNKNOWN", value));
            };
        }

        @Override
        public void close() {
        }
    }

    private static class LogSeverityProcessor implements Processor<String, String, String, String> {
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final Record<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }
            // Assume the severity is the first word in the log message
            // For example: "ERROR: Disk not found" -> "ERROR"
            final int colonIndex = record.value().indexOf(':');
            final String severity = colonIndex > 0 ? record.value().substring(0, colonIndex).trim() : "UNKNOWN";
            // Route logs based on severity
            switch (severity) {
                case "ERROR":
                    context.forward(new Record<>(ERROR_LOGS_TOPIC, record.value(), record.timestamp()));
                    break;
                case "WARN":
                    context.forward(new Record<>(WARN_LOGS_TOPIC, record.value(), record.timestamp()));
                    break;
                case "INFO":
                    // INFO logs are ignored
                    break;
                default:
                    // Forward to an "unknown" topic for logs with unrecognized severities
                    context.forward(new Record<>(UNKNOWN_LOGS_TOPIC, record.value(), record.timestamp()));
            }
        }
    }
}
```
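For a quick sanity check of the routing, something like the following sketch can be used. It is not part of the examples above: `TopologyTestDriver`, `TestInputTopic`, and `TestOutputTopic` come from the `kafka-streams-test-utils` artifact, and the class name and property values here are purely illustrative.

```java
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;

public class SeverityRoutingCheck {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "severity-routing-check");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);

        final StreamsBuilder builder = new StreamsBuilder();
        CategorizingLogsBySeverityExample.categorizeWithProcess(builder);

        try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
            final TestInputTopic<String, String> input = driver.createInputTopic(
                    "input-logs-topic", new StringSerializer(), new StringSerializer());
            final TestOutputTopic<String, String> errorLogs = driver.createOutputTopic(
                    "error-logs-topic", new StringDeserializer(), new StringDeserializer());

            input.pipeInput("app-1", "ERROR: Disk not found");

            // The ERROR record was routed to the error topic by the processor
            System.out.println(errorLogs.readValue()); // ERROR: Disk not found
        }
    }
}
```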
### Replacing Slang in Text Messages

* **Idea:** A messaging stream contains user-generated content, and you want to replace slang words with their formal equivalents (e.g., "u" becomes "you", and "brb" becomes "be right back"). The operation modifies only the message value and keeps the key intact.
* **Real-World Context:** In customer support chat systems, normalizing text by replacing slang with formal equivalents ensures that automated sentiment analysis tools work accurately and provide reliable insights.

Below, the methods `replaceWithFlatTransformValues` and `replaceWithProcessValues` show how you can migrate from `flatTransformValues` to `processValues`.

```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ReplacingSlangTextInMessagesExample {
    private static final Map<String, String> SLANG_DICTIONARY = Map.of(
            "u", "you",
            "brb", "be right back",
            "omg", "oh my god",
            "btw", "by the way"
    );
    private static final String INPUT_MESSAGES_TOPIC = "input-messages-topic";
    private static final String OUTPUT_MESSAGES_TOPIC = "output-messages-topic";

    public static void replaceWithFlatTransformValues(final StreamsBuilder builder) {
        final KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
        messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
    }

    public static void replaceWithProcessValues(final StreamsBuilder builder) {
        final KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
        messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
    }

    private static class SlangReplacementTransformer implements ValueTransformer<String, Iterable<String>> {

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
        }

        @Override
        public Iterable<String> transform(final String value) {
            if (value == null) {
                return Collections.emptyList(); // Skip null values
            }
            // Replace slang words and emit the normalized message as a single value
            final String replacedMessage = Arrays.stream(value.split("\\s+"))
                    .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
                    .collect(Collectors.joining(" "));
            return List.of(replacedMessage);
        }

        @Override
        public void close() {
        }
    }

    private static class SlangReplacementProcessor implements FixedKeyProcessor<String, String, String> {
        private FixedKeyProcessorContext<String, String> context;

        @Override
        public void init(final FixedKeyProcessorContext<String, String> context) {
            this.context = context;
        }

        @Override
        public void process(final FixedKeyRecord<String, String> record) {
            if (record.value() == null) {
                return; // Skip null values
            }
            // Replace slang words and forward the normalized message
            final String replacedMessage = Arrays.stream(record.value().split("\\s+"))
                    .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
                    .collect(Collectors.joining(" "));
            context.forward(record.withValue(replacedMessage));
        }
    }
}
```
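Worth noting about the choice of `processValues` here: because `FixedKeyProcessor` cannot change the key, Kafka Streams knows the stream's partitioning is preserved, so key-based operations downstream do not trigger a repartition topic. A short sketch of a hypothetical follow-up aggregation, reusing the names from the example above, illustrates the benefit:

```java
// Hypothetical follow-up: count messages per user after normalization.
// processValues() provably leaves the key untouched, so this groupByKey()
// does not insert a repartition topic. With process(), which may change the
// key, Kafka Streams would have to repartition before the aggregation.
final KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
messageStream.processValues(SlangReplacementProcessor::new)
        .groupByKey()
        .count();
```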
## Stateful Examples

### Cumulative Discounts for a Loyalty Program

* **Idea:** A stream of purchase events contains user IDs and transaction amounts. Use a state store to accumulate the total spending of each user. When the total crosses a threshold, emit a discount notification and reduce the accumulated total by the threshold.
* **Real-World Context:** In a retail loyalty program, tracking cumulative customer spending enables dynamic rewards, such as issuing a discount when a customer's total purchases exceed a predefined limit.

Below, the methods `applyDiscountWithTransform` and `applyDiscountWithProcess` show how you can migrate from `transform` to `process`.

```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class CumulativeDiscountsForALoyaltyProgramExample {
    private static final double DISCOUNT_THRESHOLD = 100.0;
    private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store";
    private static final String DISCOUNT_NOTIFICATION_MESSAGE =
            "Discount applied! You have received a reward for your purchases.";
    private static final String DISCOUNT_NOTIFICATIONS_TOPIC = "discount-notifications-topic";
    private static final String PURCHASE_EVENTS_TOPIC = "purchase-events-topic";

    public static void applyDiscountWithTransform(final StreamsBuilder builder) {
        // Define the state store for tracking cumulative spending
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                        org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Double()
                )
        );
        final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
        // Apply the Transformer with the state store
        final KStream<String, String> notificationStream =
                purchaseStream.transform(CumulativeDiscountTransformer::new, CUSTOMER_SPENDING_STORE);
        // Send the notifications to the output topic
        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
    }

    public static void applyDiscountWithProcess(final StreamsBuilder builder) {
        // Define the state store for tracking cumulative spending
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                        org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Double()
                )
        );
        final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
        // Apply the Processor with the state store
        final KStream<String, String> notificationStream =
                purchaseStream.process(CumulativeDiscountProcessor::new, CUSTOMER_SPENDING_STORE);
        // Send the notifications to the output topic
        notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
    }

    private static class CumulativeDiscountTransformer
            implements Transformer<String, Double, KeyValue<String, String>> {
        private KeyValueStore<String, Double> spendingStore;

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
            // Retrieve the state store for cumulative spending
            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
        }

        @Override
        public KeyValue<String, String> transform(final String key, final Double value) {
            if (value == null) {
                return null; // Skip null purchase amounts
            }
            // Get the current spending total for the customer
            Double currentSpending = spendingStore.get(key);
            if (currentSpending == null) {
                currentSpending = 0.0;
            }
            // Update the cumulative spending
            currentSpending += value;
            spendingStore.put(key, currentSpending);
            // Check if the customer qualifies for a discount
            if (currentSpending >= DISCOUNT_THRESHOLD) {
                // Reset the spending after applying the discount
                spendingStore.put(key, currentSpending - DISCOUNT_THRESHOLD);
                // Return a notification message
                return new KeyValue<>(key, DISCOUNT_NOTIFICATION_MESSAGE);
            }
            return null; // No discount, so no output for this record
        }

        @Override
        public void close() {
        }
    }

    private static class CumulativeDiscountProcessor implements Processor<String, Double, String, String> {
        private KeyValueStore<String, Double> spendingStore;
        private ProcessorContext<String, String> context;

        @Override
        public void init(final ProcessorContext<String, String> context) {
            this.context = context;
            // Retrieve the state store for cumulative spending
            spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
        }

        @Override
        public void process(final Record<String, Double> record) {
            if (record.value() == null) {
                return; // Skip null purchase amounts
            }
            // Get the current spending total for the customer
            Double currentSpending = spendingStore.get(record.key());
            if (currentSpending == null) {
                currentSpending = 0.0;
            }
            // Update the cumulative spending
            currentSpending += record.value();
            spendingStore.put(record.key(), currentSpending);
            // Check if the customer qualifies for a discount
            if (currentSpending >= DISCOUNT_THRESHOLD) {
                // Reset the spending after applying the discount
                spendingStore.put(record.key(), currentSpending - DISCOUNT_THRESHOLD);
                // Send a discount notification
                context.forward(new Record<>(record.key(), DISCOUNT_NOTIFICATION_MESSAGE, record.timestamp()));
            }
        }
    }
}
```
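As a side note on the `process` variant: instead of calling `builder.addStateStore(...)` and passing the store name, the `ProcessorSupplier` itself can declare the stores it needs via the `ConnectedStoreProvider#stores()` hook, and Kafka Streams adds and connects them automatically. The supplier class below is a sketch of that alternative wiring, assuming `CumulativeDiscountProcessor` is made accessible to it:

```java
import java.util.Set;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class CumulativeDiscountProcessorSupplier
        implements ProcessorSupplier<String, Double, String, String> {

    @Override
    public Set<StoreBuilder<?>> stores() {
        // The store declared here is added and connected automatically,
        // so no separate builder.addStateStore(...) call is needed
        return Set.of(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("customer-spending-store"),
                Serdes.String(),
                Serdes.Double()));
    }

    @Override
    public Processor<String, Double, String, String> get() {
        return new CumulativeDiscountProcessor();
    }
}
```

With this in place, the topology wiring shrinks to `purchaseStream.process(new CumulativeDiscountProcessorSupplier())`.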
### Traffic Radar Monitoring Car Count

* **Idea:** A radar monitors cars passing along a road stretch. The system counts the cars per day, maintaining a cumulative total for the current day in a state store and emitting the updated count with each detected car. Because the store is keyed by day, each new day starts from a fresh total.
* **Real-World Context:** A car counting system can be useful for determining measures for widening or controlling traffic, depending on the number of cars passing through the monitored stretch.

Below, the methods `countWithTransformValues` and `countWithProcessValues` show how you can migrate from `transformValues` to `processValues`.

```java
package org.apache.kafka.streams.kstream;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyRecord;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

public class TrafficRadarMonitoringCarCountExample {
    private static final String DAILY_COUNT_STORE = "daily-count-store";
    private static final String DAILY_COUNT_TOPIC = "daily-count-topic";
    private static final String RADAR_COUNT_TOPIC = "car-radar-topic";

    public static void countWithTransformValues(final StreamsBuilder builder) {
        // Define a state store for tracking daily car counts
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                        org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Long()
                )
        );
        final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
        // Apply the ValueTransformer with the state store
        radarStream.transformValues(DailyCarCountTransformer::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
    }

    public static void countWithProcessValues(final StreamsBuilder builder) {
        // Define a state store for tracking daily car counts
        builder.addStateStore(
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                        org.apache.kafka.common.serialization.Serdes.String(),
                        org.apache.kafka.common.serialization.Serdes.Long()
                )
        );
        final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
        // Apply the FixedKeyProcessor with the state store
        radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
    }

    private static class DailyCarCountTransformer implements ValueTransformer<String, String> {
        private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
        private KeyValueStore<String, Long> stateStore;

        @Override
        public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
            // Access the state store
            stateStore = context.getStateStore(DAILY_COUNT_STORE);
        }

        @Override
        public String transform(final String value) {
            if (value == null) {
                return null; // Skip null events
            }
            // Derive the current day (wall-clock time is used for simplicity)
            final long timestamp = System.currentTimeMillis();
            final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
            // Retrieve and increment the current count for the day
            Long dailyCount = stateStore.get(currentDay);
            if (dailyCount == null) {
                dailyCount = 0L;
            }
            dailyCount++;
            stateStore.put(currentDay, dailyCount);
            // Return the current day's count
            return String.format("Day: %s, Car Count: %s", currentDay, dailyCount);
        }

        @Override
        public void close() {
        }
    }

    private static class DailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> {
        private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
        private FixedKeyProcessorContext<Void, String> context;
        private KeyValueStore<String, Long> stateStore;

        @Override
        public void init(final FixedKeyProcessorContext<Void, String> context) {
            this.context = context;
            stateStore = context.getStateStore(DAILY_COUNT_STORE);
        }

        @Override
        public void process(final FixedKeyRecord<Void, String> record) {
            if (record.value() == null) {
                return; // Skip null events
            }
            // Derive the current day (wall-clock time is used for simplicity)
            final long timestamp = System.currentTimeMillis();
            final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
            // Retrieve and increment the current count for the day
            Long dailyCount = stateStore.get(currentDay);
            if (dailyCount == null) {
                dailyCount = 0L;
            }
            dailyCount++;
            stateStore.put(currentDay, dailyCount);
            // Emit the current day's count
            context.forward(record.withValue(
                    String.format("Day: %s, Car Count: %s", currentDay, dailyCount)));
        }
    }
}
```

## Keynotes

* The `process` and `processValues` APIs use typed `ProcessorContext` and `Record`/`FixedKeyRecord` objects for better type safety and flexibility.
* Implementations of `Processor` or `FixedKeyProcessor` should keep state handling and processing logic clear; use `context.forward()` to emit records downstream.
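To make the first keynote concrete, here is a minimal, self-contained skeleton (the class is purely illustrative and not part of the examples above) showing how the four type parameters of `Processor<KIn, VIn, KOut, VOut>` line up with the typed `Record` and `ProcessorContext`:

```java
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

// Reads <String, Double> records and emits <String, String> records:
// the generics make the input and output types explicit at compile time.
public class AmountFormattingProcessor implements Processor<String, Double, String, String> {
    private ProcessorContext<String, String> context;

    @Override
    public void init(final ProcessorContext<String, String> context) {
        this.context = context; // Keep the context for forwarding
    }

    @Override
    public void process(final Record<String, Double> record) {
        // Record objects are immutable; withValue() returns a typed copy
        context.forward(record.withValue(String.format("%.2f", record.value())));
    }

    @Override
    public void close() {
        // Nothing to release in this skeleton
    }
}
```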