fonsdant commented on PR #18314:
URL: https://github.com/apache/kafka/pull/18314#issuecomment-2585901473

   @mjsax, how about using the `PopularPageEmailAlert` of the _Applying 
processors and transformers (Processor API integration)_ section?
   
   ```java
   package org.apache.kafka.streams.kstream;
   
   import org.apache.kafka.streams.StreamsBuilder;
   import org.apache.kafka.streams.processor.AbstractProcessor;
   import org.apache.kafka.streams.processor.api.Processor;
   import org.apache.kafka.streams.processor.api.ProcessorContext;
   import org.apache.kafka.streams.processor.api.Record;
   
   public class PopularPageEmailAlertExample {
       private static final String ALERTS_EMAIL = "ale...@yourcompany.com";
       private static final String PAGE_VIEWS_TOPIC = "page-views-topic";
   
       public static void alertWithOldProcess(StreamsBuilder builder) {
           KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC);
           // Filter pages with exactly 1000 views and process them using the 
old API
           pageViews.filter((pageId, viewCount) -> viewCount == 1000)
                   .process(PopularPageEmailAlertOld::new);
       }
   
       public static void alertWithNewProcess(StreamsBuilder builder) {
           KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC);
           // Filter pages with exactly 1000 views and process them using the 
new API
           pageViews.filter((pageId, viewCount) -> viewCount == 1000)
                   .process(PopularPageEmailAlertNew::new);
       }
   
       private static class PopularPageEmailAlertOld extends 
AbstractProcessor<String, Long> {
           @Override
           public void init(org.apache.kafka.streams.processor.ProcessorContext 
context) {
               super.init(context);
               System.out.println("Initialized email client for: " + 
ALERTS_EMAIL);
           }
   
           @Override
           public void process(String key, Long value) {
               if (value == null) return;
   
               if (value == 1000) {
                   // Send an email alert
                   System.out.printf("ALERT (Old API): Page %s has reached 1000 
views. Sending email to %s%n", key, ALERTS_EMAIL);
               }
           }
   
           @Override
           public void close() {
               System.out.println("Tearing down email client for: " + 
ALERTS_EMAIL);
           }
       }
   
       private static class PopularPageEmailAlertNew implements 
Processor<String, Long, Void, Void> {
           @Override
           public void init(ProcessorContext<Void, Void> context) {
               System.out.println("Initialized email client for: " + 
ALERTS_EMAIL);
           }
   
           @Override
           public void process(Record<String, Long> record) {
               if (record.value() == null) return;
   
               if (record.value() == 1000) {
                   // Send an email alert
                   System.out.printf("ALERT (New API): Page %s has reached 1000 
views. Sending email to %s%n", record.key(), ALERTS_EMAIL);
               }
           }
   
           @Override
           public void close() {
               System.out.println("Tearing down email client for: " + 
ALERTS_EMAIL);
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to