fonsdant commented on PR #18314:
URL: https://github.com/apache/kafka/pull/18314#issuecomment-2566716097

   I have written a draft in Markdown before converting it to HTML, in the hope that it makes the review easier. Next, I
will translate it to HTML.
   
   ***
   
   # Migrating from transform to process
   
   To migrate from the deprecated `transform` and `flatTransform` methods to the `process` API in Kafka Streams, follow
these steps. The new `process` and `processValues` APIs enable a more flexible and reusable approach by requiring
implementations of the `Processor` or `FixedKeyProcessor` interfaces.
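   
   As a quick orientation, a minimal `Processor` implementation has the shape sketched below (`MyProcessor` is a
hypothetical name; the concrete examples later in this document flesh this pattern out):
   
   ```java
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   
   public class MyProcessor implements Processor<String, String, String, String> {
       private ProcessorContext<String, String> context;
   
       @Override
       public void init(final ProcessorContext<String, String> context) {
           this.context = context; // Keep the context so records can be forwarded later
       }
   
       @Override
       public void process(final Record<String, String> record) {
           // Unlike transform(), nothing is returned: forward zero, one, or many records instead
           context.forward(record);
       }
   }
   ```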
   
   Here are examples to help you migrate:
   
   | Example                                                                                   | Migrating from        | Migrating to    | Type      |
   |-------------------------------------------------------------------------------------------|-----------------------|-----------------|-----------|
   | [Cumulative Discounts for a Loyalty Program](#cumulative-discounts-for-a-loyalty-program) | `transform`           | `process`       | Stateful  |
   | [Categorizing Logs by Severity](#categorizing-logs-by-severity)                           | `flatTransform`       | `process`       | Stateless |
   | [Traffic Radar Monitoring Car Count](#traffic-radar-monitoring-car-count)                 | `transformValues`     | `processValues` | Stateful  |
   | [Replacing Slang in Text Messages](#replacing-slang-in-text-messages)                     | `flatTransformValues` | `processValues` | Stateless |
   
   ## Stateless Examples
   
   ### Categorizing Logs by Severity
   
   * **Idea:** You have a stream of log messages. Each message contains a severity level (e.g., INFO, WARN, ERROR) in the
     value. The processor routes ERROR messages to a dedicated topic, discards INFO messages, forwards WARN messages to a
     WARN topic, and sends anything with an unrecognized severity to an "unknown" topic.
   * **Real-World Context:** In a production monitoring system, categorizing 
logs by severity ensures ERROR logs are sent
     to a critical incident management system, WARN logs are analyzed for 
potential risks, and INFO logs are stored for
     basic reporting purposes.
   
   Below, methods `categorizeWithFlatTransform` and `categorizeWithProcess` 
show how you can migrate from `flatTransform`
   to `process`.
   
   ```java
   package org.apache.kafka.streams.kstream;
   
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   
   import java.util.Collections;
   import java.util.List;
   
   public class CategorizingLogsBySeverityExample {
       private static final String ERROR_LOGS_TOPIC = "error-logs-topic";
       private static final String INPUT_LOGS_TOPIC = "input-logs-topic";
       private static final String UNKNOWN_LOGS_TOPIC = "unknown-logs-topic";
       private static final String WARN_LOGS_TOPIC = "warn-logs-topic";
   
        public static void categorizeWithFlatTransform(final StreamsBuilder builder) {
            final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
            logStream.flatTransform(() -> new LogSeverityTransformer())
                .to((key, value, recordContext) -> {
                    // Determine the target topic dynamically
                    if ("ERROR".equals(key)) return ERROR_LOGS_TOPIC;
                    if ("WARN".equals(key)) return WARN_LOGS_TOPIC;
                    return UNKNOWN_LOGS_TOPIC;
                });
        }
   
        public static void categorizeWithProcess(final StreamsBuilder builder) {
            final KStream<String, String> logStream = builder.stream(INPUT_LOGS_TOPIC);
            // The processor forwards each record with its target topic as the key,
            // so the dynamic to() below can route on the key
            logStream.process(LogSeverityProcessor::new)
                .to((key, value, recordContext) -> key);
        }
   
        private static class LogSeverityTransformer implements Transformer<String, String, Iterable<KeyValue<String, String>>> {
            @Override
            public void init(org.apache.kafka.streams.processor.ProcessorContext context) {
            }

            @Override
            public Iterable<KeyValue<String, String>> transform(String key, String value) {
               if (value == null) {
                   return Collections.emptyList(); // Skip null values
               }
   
               // Assume the severity is the first word in the log message
               // For example: "ERROR: Disk not found" -> "ERROR"
               int colonIndex = value.indexOf(':');
                String severity = colonIndex > 0 ? value.substring(0, colonIndex).trim() : "UNKNOWN";
   
               // Create appropriate KeyValue pair based on severity
                return switch (severity) {
                    case "ERROR" -> List.of(new KeyValue<>("ERROR", value));
                    case "WARN" -> List.of(new KeyValue<>("WARN", value));
                    case "INFO" -> Collections.emptyList(); // INFO logs are ignored
                    default -> List.of(new KeyValue<>("UNKNOWN", value));
                };
           }
   
           @Override
           public void close() {
           }
       }
   
        private static class LogSeverityProcessor implements Processor<String, String, String, String> {
           private ProcessorContext<String, String> context;
   
           @Override
           public void init(final ProcessorContext<String, String> context) {
               this.context = context;
           }
   
           @Override
           public void process(final Record<String, String> record) {
               if (record.value() == null) {
                   return; // Skip null values
               }
   
               // Assume the severity is the first word in the log message
               // For example: "ERROR: Disk not found" -> "ERROR"
               final int colonIndex = record.value().indexOf(':');
                final String severity = colonIndex > 0 ? record.value().substring(0, colonIndex).trim() : "UNKNOWN";
   
               // Route logs based on severity
                switch (severity) {
                    case "ERROR":
                        context.forward(new Record<>(ERROR_LOGS_TOPIC, record.value(), record.timestamp()));
                        break;
                    case "WARN":
                        context.forward(new Record<>(WARN_LOGS_TOPIC, record.value(), record.timestamp()));
                        break;
                    case "INFO":
                        // INFO logs are ignored
                        break;
                    default:
                        // Forward to an "unknown" topic for logs with unrecognized severities
                        context.forward(new Record<>(UNKNOWN_LOGS_TOPIC, record.value(), record.timestamp()));
                }
           }
       }
   }
   ```
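   
   Note the difference in where routing happens: with `flatTransform`, the routing logic lives in the `TopicNameExtractor`
   passed to `to()`, while the migrated processor encodes the destination topic in the record key it forwards, so the
   dynamic `to()` simply routes on that key.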
   
   ### Replacing Slang in Text Messages
   
   * **Idea:** A messaging stream contains user-generated content, and you want 
to replace slang words with their formal
     equivalents (e.g., "u" becomes "you", "brb" becomes "be right back"). The 
operation only modifies the message value
     and keeps the key intact.
   * **Real-World Context:** In customer support chat systems, normalizing text 
by replacing slang with formal equivalents
     ensures that automated sentiment analysis tools work accurately and 
provide reliable insights.
   
   Below, methods `replaceWithFlatTransformValues` and 
`replaceWithProcessValues` show how you can migrate from
   `flatTransformValues` to `processValues`.
   
   ```java
   package org.apache.kafka.streams.kstream;
   
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
   import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
   import org.apache.kafka.streams.processor.api.FixedKeyRecord;
   
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.Map;
   
   public class ReplacingSlangTextInMessagesExample {
       private static final Map<String, String> SLANG_DICTIONARY = Map.of(
           "u", "you",
           "brb", "be right back",
           "omg", "oh my god",
           "btw", "by the way"
       );
        private static final String INPUT_MESSAGES_TOPIC = "input-messages-topic";
        private static final String OUTPUT_MESSAGES_TOPIC = "output-messages-topic";
   
        public static void replaceWithFlatTransformValues(final StreamsBuilder builder) {
            KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
            messageStream.flatTransformValues(SlangReplacementTransformer::new).to(OUTPUT_MESSAGES_TOPIC);
        }
   
        public static void replaceWithProcessValues(final StreamsBuilder builder) {
            KStream<String, String> messageStream = builder.stream(INPUT_MESSAGES_TOPIC);
            messageStream.processValues(SlangReplacementProcessor::new).to(OUTPUT_MESSAGES_TOPIC);
        }
   
        private static class SlangReplacementTransformer implements ValueTransformer<String, Iterable<String>> {
            @Override
            public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
            }
   
            @Override
            public Iterable<String> transform(final String value) {
                if (value == null) {
                    return Collections.emptyList(); // Skip null values
                }

                // Replace slang words and emit the rebuilt message as a single value
                final String[] words = value.split("\\s+");
                final String[] replaced = Arrays.stream(words)
                    .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
                    .toArray(String[]::new);
                return Collections.singletonList(String.join(" ", replaced));
            }
   
           @Override
           public void close() {
           }
       }
   
        private static class SlangReplacementProcessor implements FixedKeyProcessor<String, String, String> {
            private FixedKeyProcessorContext<String, String> context;

            @Override
            public void init(final FixedKeyProcessorContext<String, String> context) {
                this.context = context;
            }
   
            @Override
            public void process(final FixedKeyRecord<String, String> record) {
                if (record.value() == null) {
                    return; // Skip null values
                }

                // Replace slang words in the message, mirroring the transformer above
                final String[] words = record.value().split("\\s+");
                final String[] replaced = Arrays.stream(words)
                    .map(word -> SLANG_DICTIONARY.getOrDefault(word, word))
                    .toArray(String[]::new);
                context.forward(record.withValue(String.join(" ", replaced)));
            }
       }
   }
   ```
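   
   One way to verify that a migrated topology behaves like the original is the `TopologyTestDriver`. Below is a minimal
   sketch for the slang example (topic names are taken from the example above; the explicit default serde configuration is
   an assumption, needed because the example relies on default serdes):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   import org.apache.kafka.streams.TestInputTopic;
   import org.apache.kafka.streams.TestOutputTopic;
   import org.apache.kafka.streams.TopologyTestDriver;
   
   public class ReplacingSlangTextInMessagesSketch {
       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();
           ReplacingSlangTextInMessagesExample.replaceWithProcessValues(builder);
   
           // Assumed configuration: default String serdes for the source and sink nodes
           final Properties props = new Properties();
           props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
           props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
   
           try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) {
               final TestInputTopic<String, String> input = driver.createInputTopic(
                   "input-messages-topic", Serdes.String().serializer(), Serdes.String().serializer());
               final TestOutputTopic<String, String> output = driver.createOutputTopic(
                   "output-messages-topic", Serdes.String().deserializer(), Serdes.String().deserializer());
   
               input.pipeInput("user-1", "brb u omg");
               System.out.println(output.readValue()); // Expected: "be right back you oh my god"
           }
       }
   }
   ```
   
   Running the same input through `replaceWithFlatTransformValues` should produce identical output, which makes this a
   handy regression check during the migration.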
   
   ## Stateful Examples
   
   ### Cumulative Discounts for a Loyalty Program
   
   * **Idea:** A stream of purchase events contains user IDs and transaction amounts. Use a state store to accumulate the
     total spending of each user. When a user's total crosses a threshold, emit a discount notification and reset their
     accumulated total.
   * **Real-World Context:** In a retail loyalty program, tracking cumulative 
customer spending enables dynamic rewards,
     such as issuing a discount when a customer’s total purchases exceed a 
predefined limit.
   
   Below, methods `applyDiscountWithTransform` and `applyDiscountWithProcess` 
show how you can migrate from `transform` to
   `process`.
   
   ```java
   package org.apache.kafka.streams.kstream;
   
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.state.Stores;
   
   public class CumulativeDiscountsForALoyaltyProgramExample {
       private static final double DISCOUNT_THRESHOLD = 100.0;
        private static final String CUSTOMER_SPENDING_STORE = "customer-spending-store";
        private static final String DISCOUNT_NOTIFICATION_MESSAGE =
            "Discount applied! You have received a reward for your purchases.";
        private static final String DISCOUNT_NOTIFICATIONS_TOPIC = "discount-notifications-topic";
        private static final String PURCHASE_EVENTS_TOPIC = "purchase-events-topic";
   
        public static void applyDiscountWithTransform(final StreamsBuilder builder) {
            // Define the state store for tracking cumulative spending
            builder.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                    org.apache.kafka.common.serialization.Serdes.String(),
                    org.apache.kafka.common.serialization.Serdes.Double()
                )
            );
            final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
            // Apply the Transformer with the state store
            final KStream<String, String> notificationStream =
                purchaseStream.transform(CumulativeDiscountTransformer::new, CUSTOMER_SPENDING_STORE);
            // Send the notifications to the output topic
            notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
        }
   
        public static void applyDiscountWithProcess(final StreamsBuilder builder) {
            // Define the state store for tracking cumulative spending
            builder.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore(CUSTOMER_SPENDING_STORE),
                    org.apache.kafka.common.serialization.Serdes.String(),
                    org.apache.kafka.common.serialization.Serdes.Double()
                )
            );
            final KStream<String, Double> purchaseStream = builder.stream(PURCHASE_EVENTS_TOPIC);
            // Apply the Processor with the state store
            final KStream<String, String> notificationStream =
                purchaseStream.process(CumulativeDiscountProcessor::new, CUSTOMER_SPENDING_STORE);
            // Send the notifications to the output topic
            notificationStream.to(DISCOUNT_NOTIFICATIONS_TOPIC);
        }
   
        private static class CumulativeDiscountTransformer implements Transformer<String, Double, KeyValue<String, String>> {
           private KeyValueStore<String, Double> spendingStore;
   
           @Override
            public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
               // Retrieve the state store for cumulative spending
               spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
           }
   
           @Override
            public KeyValue<String, String> transform(final String key, final Double value) {
               if (value == null) {
                   return null; // Skip null purchase amounts
               }
   
               // Get the current spending total for the customer
               Double currentSpending = spendingStore.get(key);
               if (currentSpending == null) {
                   currentSpending = 0.0;
               }
               // Update the cumulative spending
               currentSpending += value;
               spendingStore.put(key, currentSpending);
   
               // Check if the customer qualifies for a discount
               if (currentSpending >= DISCOUNT_THRESHOLD) {
                   // Reset the spending after applying the discount
                   spendingStore.put(key, currentSpending - DISCOUNT_THRESHOLD);
                   // Return a notification message
                   return new KeyValue<>(key, DISCOUNT_NOTIFICATION_MESSAGE);
               }
               return null; // No discount, so no output for this record
           }
   
           @Override
           public void close() {
           }
       }
   
        private static class CumulativeDiscountProcessor implements Processor<String, Double, String, String> {
           private KeyValueStore<String, Double> spendingStore;
           private ProcessorContext<String, String> context;
   
           @Override
           public void init(final ProcessorContext<String, String> context) {
               this.context = context;
               // Retrieve the state store for cumulative spending
               spendingStore = context.getStateStore(CUSTOMER_SPENDING_STORE);
           }
   
           @Override
           public void process(final Record<String, Double> record) {
               if (record.value() == null) {
                   return; // Skip null purchase amounts
               }
   
               // Get the current spending total for the customer
               Double currentSpending = spendingStore.get(record.key());
               if (currentSpending == null) {
                   currentSpending = 0.0;
               }
               // Update the cumulative spending
               currentSpending += record.value();
               spendingStore.put(record.key(), currentSpending);
   
               // Check if the customer qualifies for a discount
               if (currentSpending >= DISCOUNT_THRESHOLD) {
                   // Reset the spending after applying the discount
                    spendingStore.put(record.key(), currentSpending - DISCOUNT_THRESHOLD);
                    // Send a discount notification
                    context.forward(new Record<>(record.key(), DISCOUNT_NOTIFICATION_MESSAGE, record.timestamp()));
               }
           }
       }
   }
   ```
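   
   Both variants above register the state store with `builder.addStateStore()` and pass the store name to
   `transform`/`process`. With the new API there is also the option of letting the supplier declare its own stores via
   `ProcessorSupplier#stores()`, so no separate `addStateStore()` call is needed. A sketch of that alternative (the
   supplier class name is hypothetical; the store name and serdes are taken from the example above):
   
   ```java
   import java.util.Collections;
   import java.util.Set;
   import org.apache.kafka.common.serialization.Serdes;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorSupplier;
   import org.apache.kafka.streams.state.StoreBuilder;
   import org.apache.kafka.streams.state.Stores;
   
   public class CumulativeDiscountProcessorSupplier implements ProcessorSupplier<String, Double, String, String> {
       @Override
       public Processor<String, Double, String, String> get() {
           return new CumulativeDiscountProcessor(); // The processor defined in the example above
       }
   
       @Override
       public Set<StoreBuilder<?>> stores() {
           // Streams connects the returned stores to the processor automatically
           final StoreBuilder<?> storeBuilder = Stores.keyValueStoreBuilder(
               Stores.inMemoryKeyValueStore("customer-spending-store"),
               Serdes.String(),
               Serdes.Double());
           return Collections.singleton(storeBuilder);
       }
   }
   ```
   
   With such a supplier, `purchaseStream.process(new CumulativeDiscountProcessorSupplier())` is enough; no store name
   argument is required.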
   
   ### Traffic Radar Monitoring Car Count
   
   * **Idea:** A radar monitors cars passing along a stretch of road. The system counts the cars for each day, maintaining
     a cumulative total for the current day in a state store and emitting the running count with every event. (A true
     end-of-day emission would need a punctuator; see the sketch after the example.)
   * **Real-World Context:** A car-counting system can help decide whether a monitored stretch of road needs widening or
     additional traffic controls, based on how many cars pass through it each day.
   
   Below, methods `countWithTransformValues` and `countWithProcessValues` show 
how you can migrate from `transformValues`
   to `processValues`.
   
   ```java
   package org.apache.kafka.streams.kstream;
   
   import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
   import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
   import org.apache.kafka.streams.processor.api.FixedKeyRecord;
   import org.apache.kafka.streams.state.KeyValueStore;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.state.Stores;
   
   import java.time.Instant;
   import java.time.ZoneId;
   import java.time.format.DateTimeFormatter;
   
   public class TrafficRadarMonitoringCarCountExample {
        private static final String DAILY_COUNT_STORE = "daily-count-store";
        private static final String DAILY_COUNT_TOPIC = "daily-count-topic";
       private static final String RADAR_COUNT_TOPIC = "car-radar-topic";
   
        public static void countWithTransformValues(final StreamsBuilder builder) {
            // Define a state store for tracking daily car counts
            builder.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                    org.apache.kafka.common.serialization.Serdes.String(),
                    org.apache.kafka.common.serialization.Serdes.Long()
                )
            );
            final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
            // Apply the ValueTransformer with the state store
            radarStream.transformValues(DailyCarCountTransformer::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
        }
   
        public static void countWithProcessValues(final StreamsBuilder builder) {
            // Define a state store for tracking daily car counts
            builder.addStateStore(
                Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore(DAILY_COUNT_STORE),
                    org.apache.kafka.common.serialization.Serdes.String(),
                    org.apache.kafka.common.serialization.Serdes.Long()
                )
            );
            final KStream<Void, String> radarStream = builder.stream(RADAR_COUNT_TOPIC);
            // Apply the FixedKeyProcessor with the state store
            radarStream.processValues(DailyCarCountProcessor::new, DAILY_COUNT_STORE)
                .to(DAILY_COUNT_TOPIC);
        }
   
        public static class DailyCarCountTransformer implements ValueTransformer<String, String> {
            private KeyValueStore<String, Long> stateStore;
            private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
   
           @Override
            public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
               // Access the state store
               stateStore = context.getStateStore(DAILY_COUNT_STORE);
           }
   
           @Override
           public String transform(final String value) {
               if (value == null) {
                   return null; // Skip null events
               }
   
                // Determine the current day (wall-clock time is used for simplicity)
                final long timestamp = System.currentTimeMillis();
                final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
               // Retrieve the current count for the day
               Long dailyCount = stateStore.get(currentDay);
               if (dailyCount == null) {
                   dailyCount = 0L;
               }
               // Increment the count
               dailyCount++;
               stateStore.put(currentDay, dailyCount);
   
               // Return the current day's count
                return String.format("Day: %s, Car Count: %s", currentDay, dailyCount);
           }
   
           @Override
           public void close() {
           }
       }
   
        private static class DailyCarCountProcessor implements FixedKeyProcessor<Void, String, String> {
            private FixedKeyProcessorContext<Void, String> context;
            private KeyValueStore<String, Long> stateStore;
            private static final DateTimeFormatter DATE_FORMATTER =
                DateTimeFormatter.ofPattern("yyyy-MM-dd").withZone(ZoneId.systemDefault());
   
           @Override
            public void init(final FixedKeyProcessorContext<Void, String> context) {
               this.context = context;
               stateStore = context.getStateStore(DAILY_COUNT_STORE);
           }
   
           @Override
           public void process(final FixedKeyRecord<Void, String> record) {
               if (record.value() == null) {
                   return; // Skip null events
               }
   
                // Determine the current day (wall-clock time is used for simplicity)
                final long timestamp = System.currentTimeMillis();
                final String currentDay = DATE_FORMATTER.format(Instant.ofEpochMilli(timestamp));
               // Retrieve the current count for the day
               Long dailyCount = stateStore.get(currentDay);
               if (dailyCount == null) {
                   dailyCount = 0L;
               }
               // Increment the count
               dailyCount++;
               stateStore.put(currentDay, dailyCount);
   
                // Emit the current day's count
                context.forward(record.withValue(String.format("Day: %s, Car Count: %s", currentDay, dailyCount)));
           }
       }
   }
   ```
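   
   The example above emits a running count per record. The end-of-day emission mentioned in the idea would need a
   punctuator, which cannot create new records through a `FixedKeyProcessorContext`; below is a sketch using a full
   `Processor` instead (the class name, store name, and the 24-hour wall-clock interval are assumptions):
   
   ```java
   import java.time.Duration;
   import org.apache.kafka.streams.KeyValue;
   import org.apache.kafka.streams.processor.PunctuationType;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   import org.apache.kafka.streams.state.KeyValueIterator;
   import org.apache.kafka.streams.state.KeyValueStore;
   
   public class EndOfDayCarCountProcessor implements Processor<Void, String, Void, String> {
       private ProcessorContext<Void, String> context;
       private KeyValueStore<String, Long> stateStore;
   
       @Override
       public void init(final ProcessorContext<Void, String> context) {
           this.context = context;
           stateStore = context.getStateStore("daily-count-store"); // Assumed store name
           // Once per day of wall-clock time, flush all per-day counts and clear the state
           context.schedule(Duration.ofHours(24), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
               try (final KeyValueIterator<String, Long> iterator = stateStore.all()) {
                   while (iterator.hasNext()) {
                       final KeyValue<String, Long> entry = iterator.next();
                       context.forward(new Record<>(null,
                           String.format("Day: %s, Car Count: %d", entry.key, entry.value), timestamp));
                       stateStore.delete(entry.key); // Reset the state for the next day
                   }
               }
           });
       }
   
       @Override
       public void process(final Record<Void, String> record) {
           if (record.value() == null) {
               return; // Skip null events
           }
           // Only update state here; emission happens in the punctuator
           final String currentDay = java.time.LocalDate.now().toString(); // Wall-clock day, for simplicity
           final Long dailyCount = stateStore.get(currentDay);
           stateStore.put(currentDay, dailyCount == null ? 1L : dailyCount + 1);
       }
   }
   ```
   
   To wire this variant in, you would call `radarStream.process(EndOfDayCarCountProcessor::new, DAILY_COUNT_STORE)` instead
   of `processValues`.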
   
   ## Key Takeaways
   
   * The `process` and `processValues` APIs use `ProcessorContext` and `Record`/`FixedKeyRecord` objects for better type
     safety and flexibility.
   * Implementations of `Processor` or `FixedKeyProcessor` should manage state and logic clearly, and should emit records
     downstream by calling `forward()` on the context.
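   
   For completeness, here is a minimal sketch of running one of the migrated topologies (the application id and bootstrap
   servers are assumptions):
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.streams.KafkaStreams;
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.StreamsConfig;
   
   public class MigrationExampleRunner {
       public static void main(final String[] args) {
           final StreamsBuilder builder = new StreamsBuilder();
           // Any of the example topologies above can be used here
           CategorizingLogsBySeverityExample.categorizeWithProcess(builder);
   
           final Properties props = new Properties();
           props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transform-migration-example"); // Assumed id
           props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Assumed broker
   
           final KafkaStreams streams = new KafkaStreams(builder.build(), props);
           streams.start();
           // Close the Streams client cleanly on shutdown
           Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
       }
   }
   ```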
   
   

