Hey folks, I'm trying to intercept outgoing Kafka messages before they're sent to do some basic tracking (i.e., get the actual assigned partition number, which isn't available in the StreamTask or MessageCollector classes). I've written a class that implements Kafka's ProducerInterceptor interface [0] and have added the following line to my streamtask properties file:

    systems.kafka.producer.interceptor.classes=com.cavulus.kafka.CavulusKafkaProducerInterceptor

Here's that class' package and signature:

    package com.cavulus.kafka;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    ...
    public class CavulusKafkaProducerInterceptor implements ProducerInterceptor { .. }

The class compiles fine, so I assume that I'm adhering correctly to the interface, but when I try to start up the task in YARN, it stalls -- no logging, no errors, just nothing. I have no problem starting this streamtask when omitting the above line from my properties file.

KIP-42 [1] explicitly references Samza as an inspiration for adding the ProducerInterceptor to Kafka 0.10, so I've been assuming some sort of compatibility (but I could be mistaken). Has anybody attempted this sort of combination?

Cheers,
Malcolm McFarland
Cavulus

[0] https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html
[1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
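
For context on the "basic tracking" part, here is a minimal sketch of the kind of bookkeeping an interceptor could delegate to. It is only an illustration under assumptions: PartitionTracker and its methods are hypothetical names, not part of Kafka or Samza, and it uses only the JDK. It is written to be thread-safe because, per the ProducerInterceptor javadoc, onAcknowledgement generally runs on the producer's background I/O thread.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/**
 * Hypothetical helper (not a Kafka or Samza class): counts acknowledged
 * records per topic-partition. Safe to call from multiple threads.
 */
final class PartitionTracker {
    // Key is "<topic>-<partition>"; LongAdder keeps increments cheap under contention.
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Record one acknowledged message for the given topic and partition. */
    void record(String topic, int partition) {
        counts.computeIfAbsent(key(topic, partition), k -> new LongAdder()).increment();
    }

    /** Number of messages recorded so far for the given topic and partition. */
    long count(String topic, int partition) {
        LongAdder adder = counts.get(key(topic, partition));
        return adder == null ? 0L : adder.sum();
    }

    /** Sorted point-in-time copy of all counters, e.g. for periodic logging. */
    Map<String, Long> snapshot() {
        Map<String, Long> copy = new TreeMap<>();
        counts.forEach((k, v) -> copy.put(k, v.sum()));
        return copy;
    }
}
```

An interceptor's onAcknowledgement(RecordMetadata metadata, Exception exception) could then call something like tracker.record(metadata.topic(), metadata.partition()), after first checking for a non-null exception or a null metadata, since the assigned partition is only known once the send has been acknowledged.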