fonsdant commented on PR #18314: URL: https://github.com/apache/kafka/pull/18314#issuecomment-2585901473
@mjsax, how about using the `PopularPageEmailAlert` of the _Applying processors and transformers (Processor API integration)_ section? ```java package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; import org.apache.kafka.streams.processor.api.Record; public class PopularPageEmailAlertExample { private static final String ALERTS_EMAIL = "ale...@yourcompany.com"; private static final String PAGE_VIEWS_TOPIC = "page-views-topic"; public static void alertWithOldProcess(StreamsBuilder builder) { KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC); // Filter pages with exactly 1000 views and process them using the old API pageViews.filter((pageId, viewCount) -> viewCount == 1000) .process(PopularPageEmailAlertOld::new); } public static void alertWithNewProcess(StreamsBuilder builder) { KStream<String, Long> pageViews = builder.stream(PAGE_VIEWS_TOPIC); // Filter pages with exactly 1000 views and process them using the new API pageViews.filter((pageId, viewCount) -> viewCount == 1000) .process(PopularPageEmailAlertNew::new); } private static class PopularPageEmailAlertOld extends AbstractProcessor<String, Long> { @Override public void init(org.apache.kafka.streams.processor.ProcessorContext context) { super.init(context); System.out.println("Initialized email client for: " + ALERTS_EMAIL); } @Override public void process(String key, Long value) { if (value == null) return; if (value == 1000) { // Send an email alert System.out.printf("ALERT (Old API): Page %s has reached 1000 views. Sending email to %s%n", key, ALERTS_EMAIL); } } @Override public void close() { System.out.println("Tearing down email client for: " + ALERTS_EMAIL); } } private static class PopularPageEmailAlertNew implements Processor<String, Long, Void, Void> { @Override public void init(ProcessorContext<Void, Void> context) { System.out.println("Initialized email client for: " + ALERTS_EMAIL); } @Override public void process(Record<String, Long> record) { if (record.value() == null) return; if (record.value() == 1000) { // Send an email alert System.out.printf("ALERT (New API): Page %s has reached 1000 views. Sending email to %s%n", record.key(), ALERTS_EMAIL); } } @Override public void close() { System.out.println("Tearing down email client for: " + ALERTS_EMAIL); } } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org