I'm trying to implement a rate-limiting processing step using Kafka Streams (0.10.2.1).
Basically, this step should let messages through unless similar messages have already been seen in the last N seconds. In that case, it should aggregate them into a single message and send that message once N seconds have passed.

I initially tried implementing it as a transform step, where process() either lets messages through or aggregates/stores them, and punctuate() sends and clears out any stored messages older than N seconds. However, because punctuate() only runs when new data arrives, messages won't reliably be sent when they should be.

I then tried implementing my own scheduling that periodically sends and clears out messages using the ProcessorContext provided to the transform step. However, when I call forward() from my scheduler (i.e. outside a process()/punctuate() call), I get a NullPointerException at ProcessorContextImpl.java:81 (https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81). I assume this is because currentNode() is null outside of process()/punctuate() calls.

I also looked at handling this via groupBy() and aggregate(), but that doesn't seem to meet my requirements (AFAICT there's no way to have it send the first message instantly while also delaying subsequent messages).

Is there any other way to do this? Perhaps there's a way to work around the NullPointerException?

PS. It seems that KIP-138 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics) would make my initial approach work, but unfortunately it seems to have been pushed to the next release (https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0).
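
To make the behaviour I'm after concrete, here is a minimal plain-Java sketch of the semantics only, with no Kafka Streams dependency. Everything in it is hypothetical and for illustration: the class name RateLimiterSketch, String keys and values, and the "key:v1,v2" aggregate format are assumptions, not my actual code. It also assumes flush() is driven by a wall-clock timer, which is exactly the part I can't get working inside Kafka Streams.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of Kafka Streams: it models only the desired semantics.
class RateLimiterSketch {

    // Per-key state: when the window started and which messages are being held back.
    private static final class Window {
        final long startMs;
        final List<String> held = new ArrayList<>();

        Window(long startMs) {
            this.startMs = startMs;
        }
    }

    private final long windowMs;
    private final Map<String, Window> windows = new LinkedHashMap<>();

    RateLimiterSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // Returns the message if it should be forwarded immediately,
    // or an empty Optional if it was held for later aggregation.
    Optional<String> process(String key, String value, long nowMs) {
        Window w = windows.get(key);
        if (w == null) {
            // First message for this key: open a window and let it through.
            windows.put(key, new Window(nowMs));
            return Optional.of(value);
        }
        // A window is already open for this key: hold the message back.
        w.held.add(value);
        return Optional.empty();
    }

    // Assumed to be called periodically by a wall-clock timer.
    // Emits one aggregated message per expired window that held anything,
    // then clears that window's state.
    List<String> flush(long nowMs) {
        List<String> out = new ArrayList<>();
        Iterator<Map.Entry<String, Window>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Window> e = it.next();
            Window w = e.getValue();
            if (nowMs - w.startMs >= windowMs) {
                if (!w.held.isEmpty()) {
                    out.add(e.getKey() + ":" + String.join(",", w.held));
                }
                it.remove();
            }
        }
        return out;
    }
}
```

With a 1000 ms window, the first message for a key is returned from process() straight away, later messages for that key are held, and flush() emits them as one aggregate once the window has expired. One simplification in this sketch: a message that arrives after the window has expired but before the next flush() is still held rather than passed through.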