So I'm trying to implement a rate-limiting processing step using Kafka Streams 
(0.10.2.1).

Basically, this step should let messages through unless similar messages have 
already been seen in the last N seconds. In that case it should aggregate them 
into a single message and send that message once N seconds have passed.
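To make the intended behaviour concrete, here's a minimal, Kafka-independent 
sketch of the rule. It rests on three assumptions: "similar" means "same key", 
the N-second window opens when the first message for a key is let through, and 
aggregation is done by a caller-supplied merge function. The names RateLimiter, 
onMessage, flushExpired and merge are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BinaryOperator;

// Hypothetical sketch of the rate-limiting rule; not tied to any Kafka API.
// Assumes "similar" == "same key" and a window that opens on the first message.
class RateLimiter<K, V> {
    // Per-key state: when the window opened and what was suppressed since.
    private static final class Window<A> {
        final long startMs;
        A pending; // aggregate of suppressed messages; null if none yet
        Window(long startMs) { this.startMs = startMs; }
    }

    private final long windowMs;
    private final BinaryOperator<V> merge; // hypothetical aggregation function
    private final Map<K, Window<V>> windows = new HashMap<>();

    RateLimiter(long windowMs, BinaryOperator<V> merge) {
        this.windowMs = windowMs;
        this.merge = merge;
    }

    // Returns the value to forward immediately, or empty if it was aggregated.
    Optional<V> onMessage(K key, V value, long nowMs) {
        Window<V> w = windows.get(key);
        if (w == null) {
            // Nothing similar seen recently: open a window, let this one through.
            windows.put(key, new Window<>(nowMs));
            return Optional.of(value);
        }
        // Something similar was seen recently: fold into the pending aggregate.
        w.pending = (w.pending == null) ? value : merge.apply(w.pending, value);
        return Optional.empty();
    }

    // Must be called periodically; returns aggregates whose window has elapsed
    // and clears those windows so the next message passes straight through.
    List<V> flushExpired(long nowMs) {
        List<V> out = new ArrayList<>();
        Iterator<Window<V>> it = windows.values().iterator();
        while (it.hasNext()) {
            Window<V> w = it.next();
            if (nowMs - w.startMs >= windowMs) {
                if (w.pending != null) {
                    out.add(w.pending);
                }
                it.remove();
            }
        }
        return out;
    }
}
```

For example, with a 1000 ms window and Integer::sum as the merge function, 
messages 1, 2, 3 for the same key at t=0, 100 and 200 yield 1 immediately and 5 
from flushExpired(1000). The catch is that flushExpired() has to be driven by 
something that runs even when no new messages arrive.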

I initially tried implementing it as a transform step, where process() either 
lets messages through or aggregates/stores them, and punctuate() sends and clears 
out any stored messages older than N seconds. However, because punctuate() only 
runs when new data arrives, stored messages won't reliably be sent when they 
should be.
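The timing problem can be shown with a simplified, hypothetical model. It is not 
the actual Kafka Streams code, and the class and method names are made up. The 
model checks the punctuation deadline only when a record arrives, so a deadline 
that passes during a quiet period is noticed only when the next record shows up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of record-driven (stream-time) punctuation.
// There is no timer: the deadline is checked only inside onRecord().
class StreamTimePunctuationModel {
    private final long intervalMs;
    private long nextDeadlineMs;
    private final List<Long> firedAt = new ArrayList<>();

    StreamTimePunctuationModel(long intervalMs) {
        this.intervalMs = intervalMs;
        this.nextDeadlineMs = intervalMs;
    }

    // Called once per incoming record; may trigger a punctuation.
    void onRecord(long recordTimestampMs) {
        if (recordTimestampMs >= nextDeadlineMs) {
            firedAt.add(recordTimestampMs);
            // Next deadline is one interval after the current stream time.
            nextDeadlineMs = recordTimestampMs + intervalMs;
        }
    }

    // Stream times at which "punctuate" ran in this model.
    List<Long> firedAt() {
        return firedAt;
    }
}
```

With a 1000 ms interval and records at t=100, 1500 and 60000, punctuation runs at 
1500 and then not again until 60000. Anything buffered after 1500 therefore waits 
about a minute instead of about a second.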

I then tried implementing my own scheduling, which periodically sends/clears out 
messages using the ProcessorContext provided to the aforementioned transform 
step. However, when I call forward() from my scheduler (i.e. outside a 
process()/punctuate() call), I get a NullPointerException at 
ProcessorContextImpl.java:81 
(https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java#L81).
I assume this is because currentNode() is null outside of process()/punctuate() 
calls.
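Here is a heavily simplified, hypothetical model of the failure I'm assuming. 
MiniContext, runProcess and Node are made-up names, and this is not Kafka's 
ProcessorContextImpl. The runtime sets the current node only around each 
process() call and forward() dereferences it, so calling forward() from any 
other code path, such as a separate scheduler thread, hits null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model: "current node" is only set while the runtime is
// invoking the processor, and forward() assumes it is non-null.
class MiniContext {
    static final class Node {
        final List<Consumer<String>> children = new ArrayList<>();
    }

    private Node currentNode; // null unless we are inside runProcess()

    // What the runtime does around each process() call in this model.
    void runProcess(Node node, Runnable processBody) {
        currentNode = node;
        try {
            processBody.run();
        } finally {
            currentNode = null;
        }
    }

    // Sends the value to the current node's children; throws
    // NullPointerException when called outside runProcess().
    void forward(String value) {
        for (Consumer<String> child : currentNode.children) {
            child.accept(value);
        }
    }
}
```

Even if the null were avoided, a context used from another thread would, as far 
as I can tell, race with the stream thread. So I'm not sure that patching around 
the NPE alone would be safe.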

I also looked at handling this via groupBy() and aggregate() operations, but it 
seems this wouldn't meet my requirements (AFAICT there's no way to make it send 
the first message immediately while also delaying subsequent ones).

Is there any other way to do this? Perhaps there's a way to work around the 
NullPointerExceptions?

PS. It seems that 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics
would make my initial approach work, but unfortunately it seems to have been 
pushed to the next release 
(https://cwiki.apache.org/confluence/display/KAFKA/Release+Plan+0.11.0.0).