Hey Malcolm,

Sorry for not responding to this earlier. Unfortunately, we don't have an example of a Samza application that uses a ProducerInterceptor. It's odd that the whole task stalls when you add your interceptor. The only explanation I can think of is that the Kafka producer fails to initialize properly and that this keeps the task from starting. Even so, I would expect to see some logs pointing to where the error occurred.

One thing you might want to double-check is that the version of Kafka pulled in transitively by Samza matches the version of Kafka you implemented your ProducerInterceptor against. The interface may have changed between Kafka versions, so your compile-time Kafka version might not match your runtime Kafka version.
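If it helps narrow things down, here is a rough, stdlib-only Java sketch you could run from a small main() on the same classpath as the job (or log from task init, if the container gets that far). It is not a Samza or Kafka API; InterceptorClassCheck, describe() and locationOf() are hypothetical names. It loads the class named in interceptor.classes, checks that it is concrete and implements the named interface, invokes its public no-argument constructor (Kafka creates interceptors reflectively, so as far as I know one is required), and reports which jar the class came from.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of Samza or Kafka). It mimics what a
// reflective factory does with a class name taken from config and reports where
// the class was loaded from, which helps spot compile-time vs runtime version skew.
final class InterceptorClassCheck {

    private InterceptorClassCheck() {}

    /** Returns the jar or directory a class was loaded from, or a marker for JDK/bootstrap classes. */
    static String locationOf(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "<bootstrap/unknown>";
        }
        return source.getLocation().toString();
    }

    /** True if cls, a superclass, or any (super)interface has the given fully qualified name. */
    static boolean implementsByName(Class<?> cls, String interfaceName) {
        for (Class<?> c = cls; c != null; c = c.getSuperclass()) {
            for (Class<?> iface : c.getInterfaces()) {
                if (iface.getName().equals(interfaceName) || implementsByName(iface, interfaceName)) {
                    return true;
                }
            }
        }
        return false;
    }

    /** Loads className, checks it against interfaceName, and returns a one-line report. */
    static String describe(String className, String interfaceName) {
        Class<?> cls;
        try {
            // Class.forName also runs static initializers, so failures there surface here too.
            cls = Class.forName(className);
        } catch (ClassNotFoundException | LinkageError e) {
            return "FAIL: cannot load " + className + " (" + e + ")";
        }
        if (Modifier.isAbstract(cls.getModifiers())) {
            return "FAIL: " + className + " is abstract or an interface";
        }
        if (!implementsByName(cls, interfaceName)) {
            return "FAIL: " + className + " does not implement " + interfaceName;
        }
        try {
            Constructor<?> ctor = cls.getConstructor(); // public no-arg constructor
            ctor.newInstance();                         // surfaces exceptions thrown by the constructor
        } catch (NoSuchMethodException e) {
            return "FAIL: " + className + " has no public no-arg constructor";
        } catch (ReflectiveOperationException e) {
            return "FAIL: constructing " + className + " threw " + e;
        }
        return "OK: " + className + " loaded from " + locationOf(cls);
    }
}
```

For your case that would be describe("com.cavulus.kafka.CavulusKafkaProducerInterceptor", "org.apache.kafka.clients.producer.ProducerInterceptor"), plus locationOf(Class.forName("org.apache.kafka.clients.producer.ProducerInterceptor")) to see which kafka-clients jar is actually on the runtime classpath. Note that describe() really does call the constructor, so any side effects in it will run.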
That being said, if you just want to track partitions that stop getting processed, you might not need a Kafka interceptor at all. Samza already has metrics for message consumption and production, so you can watch for the consumption or production rate dropping to zero for any partition or container. TaskInstanceMetrics includes metrics such as "process-calls" and "send-calls", and there are also KafkaSystemProducerMetrics and KafkaSystemConsumerMetrics (although I think the Kafka metrics are aggregated across the whole container). Do you think these metrics are sufficient for your use case?

Cameron

On Mon, Oct 7, 2019 at 1:29 PM Malcolm McFarland <mmcfarl...@cavulus.com> wrote:

> Hey y'all, just wanted to follow up on this: does anybody have a working
> example of using Kafka's ProducerInterceptor interface under YARN that I
> could take a look at, or that they could provide some guidance with?
>
> More generally, we're trying to keep an eye on Samza SSP instances that
> occasionally seem to go "zombie"; i.e., the AM doesn't see any problems, no
> errors are surfaced, but they've just stopped listening for messages. My idea
> here is to record the topic/partition/offset on each message delivery, and
> record the subsequent topic/partition/offset when Samza picks up the
> message for processing. Are there any ideas out there about how to do this?
>
> Cheers,
> Malcolm McFarland
> Cavulus
>
>
> This correspondence is from HealthPlanCRM, LLC, d/b/a Cavulus. Any
> unauthorized or improper disclosure, copying, distribution, or use of the
> contents of this message is prohibited. The information contained in this
> message is intended only for the personal and confidential use of the
> recipient(s) named above. If you have received this message in error,
> please notify the sender immediately and delete the original message.
>
>
> On Tue, Sep 10, 2019 at 3:13 PM Malcolm McFarland <mmcfarl...@cavulus.com>
> wrote:
>
> > Hey folks,
> >
> > I'm trying to intercept outgoing Kafka messages before they're sent to
> > do some basic tracking (i.e., get the actual assigned partition number,
> > which isn't available in the StreamTask or MessageCollector classes).
> > I've written a class that conforms to Kafka's ProducerInterceptor
> > interface [0] and have added the following line to my streamtask
> > properties file:
> >
> >     systems.kafka.producer.interceptor.classes=com.cavulus.kafka.CavulusKafkaProducerInterceptor
> >
> > Here's that class' package and signature:
> >
> >     package com.cavulus.kafka;
> >     import org.apache.kafka.clients.producer.ProducerInterceptor;
> >     ...
> >     public class CavulusKafkaProducerInterceptor implements
> >     ProducerInterceptor { .. }
> >
> > The class compiles fine, so I assume that I'm adhering correctly to
> > the interface, but when I try to start up the task in YARN, it stalls
> > -- no logging, no errors, just nothing. I have no problem starting
> > this streamtask when omitting the above line from my properties file.
> >
> > KIP-42 [1] explicitly references Samza as an inspiration for adding
> > the ProducerInterceptor to Kafka 0.10, so I've been assuming some sort
> > of compatibility (but I could be mistaken).
> >
> > Has anybody attempted this sort of combination?
> >
> > Cheers,
> > Malcolm McFarland
> > Cavulus
> >
> > [0] https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html
> > [1] https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors
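On the "zombie" SSP question: here is a rough sketch of the produced-versus-processed comparison Malcolm describes, which complements the metric-based approach above. It is plain Java with no Samza or Kafka dependencies, and PartitionProgressTracker and its methods are hypothetical names. The intended wiring is also an assumption I haven't tested: call recordProduced() from the interceptor's onAcknowledgement() using RecordMetadata's topic(), partition() and offset() when the exception argument is null, and call recordProcessed() from the task using the envelope's SystemStreamPartition and offset (Samza exposes the offset as a String, so it would need parsing for Kafka inputs). If the producing and consuming sides live in different jobs, they would need to report into a shared store rather than one in-memory map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not a Samza or Kafka API). Per partition, it compares the latest
// offset acknowledged by the producer with the latest offset the consuming task has
// processed, and flags partitions that are behind and have not advanced for
// `threshold` consecutive checks.
final class PartitionProgressTracker {
    private final int threshold;
    // Written from producer callbacks and task threads, so use concurrent maps.
    private final Map<String, Long> produced = new ConcurrentHashMap<>();
    private final Map<String, Long> processed = new ConcurrentHashMap<>();
    // Only touched by checkStalled(), which is assumed to run on a single thread.
    private final Map<String, Long> lastSeenProcessed = new HashMap<>();
    private final Map<String, Integer> stalledChecks = new HashMap<>();

    PartitionProgressTracker(int threshold) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be >= 1");
        }
        this.threshold = threshold;
    }

    private static String key(String topic, int partition) {
        return topic + "-" + partition;
    }

    /** Records the highest offset acknowledged by the producer for a partition. */
    void recordProduced(String topic, int partition, long offset) {
        produced.merge(key(topic, partition), offset, Long::max);
    }

    /** Records the highest offset the task has processed for a partition. */
    void recordProcessed(String topic, int partition, long offset) {
        processed.merge(key(topic, partition), offset, Long::max);
    }

    /** Call periodically; returns partitions that are behind and have stopped advancing. */
    Set<String> checkStalled() {
        Set<String> stalled = new TreeSet<>();
        for (Map.Entry<String, Long> entry : produced.entrySet()) {
            String k = entry.getKey();
            long latestProduced = entry.getValue();
            long latestProcessed = processed.getOrDefault(k, -1L); // -1 means nothing processed yet
            Long previous = lastSeenProcessed.put(k, latestProcessed);
            // The first check for a partition only establishes a baseline.
            boolean advanced = previous == null || previous.longValue() != latestProcessed;
            boolean behind = latestProcessed < latestProduced;
            int count = (behind && !advanced) ? stalledChecks.getOrDefault(k, 0) + 1 : 0;
            stalledChecks.put(k, count);
            if (count >= threshold) {
                stalled.add(k);
            }
        }
        return stalled;
    }
}
```

A partition that has produced records but never reports a processed offset starts at -1, so it is also flagged once it has gone `threshold` checks without progress.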